Mirror of https://github.com/osm-search/Nominatim.git
The algorithm is similar to the PHP reranking: it checks the terms from the display name against the query terms. However, instead of exact matching it uses a per-word edit distance, making it less strict about mismatched accents and other one-letter differences. Country names get a higher penalty because they currently receive no penalty during token matching. This works badly with the legacy tokenizer, but given that the legacy tokenizer is marked for removal, it is not worth optimising for it.
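For illustration, a minimal self-contained sketch of this per-word measure (the helper name and sample words are invented; the real implementation lives in rerank_by_query below):

import difflib

def word_penalty(qword: str, result_words: set) -> float:
    # Best fuzzy match of the query word against any word of the result name.
    best = max(difflib.SequenceMatcher(a=qword, b=w).quick_ratio()
               for w in result_words)
    # Below 0.5 the word counts as a complete miss and costs its full length;
    # above that, the cost shrinks with the quality of the match.
    if best < 0.5:
        return float(len(qword))
    return (1.0 - best) * len(qword)

print(word_penalty('munchen', {'münchen', 'bayern'}))  # small cost: accent only
print(word_penalty('paris', {'münchen', 'bayern'}))    # full cost: 5.0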
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Public interface to the search code.
"""
from typing import List, Any, Optional, Iterator, Tuple
import itertools
import re
import datetime as dt
import difflib

from nominatim.api.connection import SearchConnection
from nominatim.api.types import SearchDetails
from nominatim.api.results import SearchResults, add_result_details
from nominatim.api.search.token_assignment import yield_token_assignments
from nominatim.api.search.db_search_builder import SearchBuilder, build_poi_search, wrap_near_search
from nominatim.api.search.db_searches import AbstractSearch
from nominatim.api.search.query_analyzer_factory import make_query_analyzer, AbstractQueryAnalyzer
from nominatim.api.search.query import Phrase, QueryStruct
from nominatim.api.logging import log

class ForwardGeocoder:
    """ Main class responsible for place search.
    """

    def __init__(self, conn: SearchConnection,
                 params: SearchDetails, timeout: Optional[int]) -> None:
        self.conn = conn
        self.params = params
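        # No timeout given means effectively no limit (roughly 11 days).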
        self.timeout = dt.timedelta(seconds=timeout or 1000000)
        self.query_analyzer: Optional[AbstractQueryAnalyzer] = None


    @property
    def limit(self) -> int:
        """ Return the configured maximum number of search results.
        """
        return self.params.max_results


    async def build_searches(self,
                             phrases: List[Phrase]) -> Tuple[QueryStruct, List[AbstractSearch]]:
        """ Analyse the query and return the tokenized query and list of
            possible searches over it.
        """
        if self.query_analyzer is None:
            self.query_analyzer = await make_query_analyzer(self.conn)

        query = await self.query_analyzer.analyze_query(phrases)

        searches: List[AbstractSearch] = []
        if query.num_token_slots() > 0:
            # Compute all possible search interpretations.
            log().section('Compute abstract searches')
            search_builder = SearchBuilder(query, self.params)
            num_searches = 0
            for assignment in yield_token_assignments(query):
                searches.extend(search_builder.build(assignment))
                if num_searches < len(searches):
                    log().table_dump('Searches for assignment',
                                     _dump_searches(searches, query, num_searches))
                num_searches = len(searches)
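            # Cheaper (lower penalty) searches are tried first.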
            searches.sort(key=lambda s: s.penalty)

        return query, searches


    async def execute_searches(self, query: QueryStruct,
                               searches: List[AbstractSearch]) -> SearchResults:
        """ Run the abstract searches against the database until a result
            is found.
        """
        log().section('Execute database searches')
        results = SearchResults()
        end_time = dt.datetime.now() + self.timeout

        num_results = 0
        min_ranking = 1000.0
        prev_penalty = 0.0
        for i, search in enumerate(searches):
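            # Stop once the penalty increases and either exceeds the ranking
            # of the results found so far or more than 20 searches have run.
            # Searches sharing the previous penalty are always completed.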
            if search.penalty > prev_penalty and (search.penalty > min_ranking or i > 20):
                break
            log().table_dump(f"{i + 1}. Search", _dump_searches([search], query))
            for result in await search.lookup(self.conn, self.params):
                results.append(result)
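                # Each result tightens the cut-off for subsequent searches.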
                min_ranking = min(min_ranking, result.ranking + 0.5, search.penalty + 0.3)
            log().result_dump('Results', ((r.accuracy, r) for r in results[num_results:]))
            num_results = len(results)
            prev_penalty = search.penalty
            if dt.datetime.now() >= end_time:
                break

        return results


    def sort_and_cut_results(self, results: SearchResults) -> SearchResults:
        """ Remove badly matching results, sort by ranking and
            limit to the configured number of results.
        """
        if results:
            min_ranking = min(r.ranking for r in results)
            results = SearchResults(r for r in results if r.ranking < min_ranking + 0.5)
            results.sort(key=lambda r: r.ranking)

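        # Each step in search rank above the best result's rank adds
        # 0.05 to the effective ranking before the cut-off is applied.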
        if results:
            min_rank = results[0].rank_search
            results = SearchResults(r for r in results
                                    if r.ranking + 0.05 * (r.rank_search - min_rank)
                                       < min_ranking + 0.5)

            results = SearchResults(results[:self.limit])

        return results


    def rerank_by_query(self, query: QueryStruct, results: SearchResults) -> None:
        """ Adjust the accuracy of the localized results according to how well
            they match the original query.
        """
        assert self.query_analyzer is not None
        qwords = [word for phrase in query.source
                  for word in re.split('[, ]+', phrase.text) if word]
        if not qwords:
            return

        for result in results:
            if not result.display_name:
                continue
            distance = 0.0
            norm = self.query_analyzer.normalize_text(result.display_name)
            words = set((w for w in norm.split(' ') if w))
            if not words:
                continue
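            # Use the best fuzzy match for each query word; completely
            # unmatched words are penalised with their full length.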
            for qword in qwords:
                wdist = max(difflib.SequenceMatcher(a=qword, b=w).quick_ratio() for w in words)
                if wdist < 0.5:
                    distance += len(qword)
                else:
                    distance += (1.0 - wdist) * len(qword)
            result.accuracy += distance * 0.5 / sum(len(w) for w in qwords)


    async def lookup_pois(self, categories: List[Tuple[str, str]],
                          phrases: List[Phrase]) -> SearchResults:
        """ Look up places by category. If phrases are given, a place search
            over the phrases is executed first and places close to the
            results are returned.
        """
        log().function('forward_lookup_pois', categories=categories, params=self.params)

        if phrases:
            query, searches = await self.build_searches(phrases)

            if query:
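                # Run only the 50 most promising searches, each wrapped
                # into a near-search for the requested categories.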
                searches = [wrap_near_search(categories, s) for s in searches[:50]]
                results = await self.execute_searches(query, searches)
                await add_result_details(self.conn, results, self.params)
                log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
                results = self.sort_and_cut_results(results)
            else:
                results = SearchResults()
        else:
            search = build_poi_search(categories, self.params.countries)
            results = await search.lookup(self.conn, self.params)
            await add_result_details(self.conn, results, self.params)

        log().result_dump('Final Results', ((r.accuracy, r) for r in results))

        return results


    async def lookup(self, phrases: List[Phrase]) -> SearchResults:
        """ Look up a single free-text query.
        """
        log().function('forward_lookup', phrases=phrases, params=self.params)
        results = SearchResults()

        if self.params.is_impossible():
            return results

        query, searches = await self.build_searches(phrases)

        if searches:
            # Execute SQL until an appropriate result is found.
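            # Only the 50 best interpretations are ever tried.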
            results = await self.execute_searches(query, searches[:50])
            await add_result_details(self.conn, results, self.params)
            log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
            self.rerank_by_query(query, results)
            log().result_dump('Results after reranking', ((r.accuracy, r) for r in results))
            results = self.sort_and_cut_results(results)
            log().result_dump('Final Results', ((r.accuracy, r) for r in results))

        return results


# pylint: disable=invalid-name,too-many-locals
def _dump_searches(searches: List[AbstractSearch], query: QueryStruct,
                   start: int = 0) -> Iterator[Optional[List[Any]]]:
    yield ['Penalty', 'Lookups', 'Housenr', 'Postcode', 'Countries',
           'Qualifier', 'Category', 'Rankings']

    def tk(tl: List[int]) -> str:
        tstr = [f"{query.find_lookup_word_by_id(t)}({t})" for t in tl]

        return f"[{','.join(tstr)}]"

    def fmt_ranking(f: Any) -> str:
        if not f:
            return ''
        ranks = ','.join((f"{tk(r.tokens)}^{r.penalty:.3g}" for r in f.rankings))
        if len(ranks) > 100:
            ranks = ranks[:100] + '...'
        return f"{f.column}({ranks},def={f.default:.3g})"

    def fmt_lookup(l: Any) -> str:
        if not l:
            return ''

        return f"{l.lookup_type}({l.column}{tk(l.tokens)})"

    def fmt_cstr(c: Any) -> str:
        if not c:
            return ''

        return f'{c[0]}^{c[1]}'
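    # One block of table rows per search; a trailing None ends each block.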
    for search in searches[start:]:
        fields = ('lookups', 'rankings', 'countries', 'housenumbers',
                  'postcodes', 'qualifiers')
        if hasattr(search, 'search'):
            iters = itertools.zip_longest([f"{search.penalty:.3g}"],
                                          *(getattr(search.search, attr, []) for attr in fields),
                                          getattr(search, 'categories', []),
                                          fillvalue='')
        else:
            iters = itertools.zip_longest([f"{search.penalty:.3g}"],
                                          *(getattr(search, attr, []) for attr in fields),
                                          [],
                                          fillvalue='')
        for penalty, lookup, rank, cc, hnr, pc, qual, cat in iters:
            yield [penalty, fmt_lookup(lookup), fmt_cstr(hnr),
                   fmt_cstr(pc), fmt_cstr(cc), fmt_cstr(qual), fmt_cstr(cat), fmt_ranking(rank)]
        yield None
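# Rough usage sketch (not part of the original file): how a caller might
# drive ForwardGeocoder. This assumes an already established SearchConnection
# and that PhraseType.NONE from nominatim.api.search.query marks a phrase
# without a special role; see nominatim.api.core for the canonical wiring.
#
#     from nominatim.api.search.query import Phrase, PhraseType
#
#     async def find(conn: SearchConnection, text: str) -> SearchResults:
#         phrases = [Phrase(PhraseType.NONE, p.strip()) for p in text.split(',')]
#         geocoder = ForwardGeocoder(conn, SearchDetails(max_results=10),
#                                    timeout=30)
#         return await geocoder.lookup(phrases)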