Nominatim/nominatim/api/search/geocoder.py
Commit 4ce13f5c1f by Sarah Hoffmann: prefilter bad results before adding details and reranking

Move the first cutting of the result list to before the reranking
by query match. This means that results with significantly less
importance are removed early, independently of how well they match
the original query.

Fixes #3266.
2024-02-06 20:29:48 +01:00


# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Public interface to the search code.
"""
from typing import List, Any, Optional, Iterator, Tuple, Dict
import itertools
import re
import datetime as dt
import difflib

from nominatim.api.connection import SearchConnection
from nominatim.api.types import SearchDetails
from nominatim.api.results import SearchResult, SearchResults, add_result_details
from nominatim.api.search.token_assignment import yield_token_assignments
from nominatim.api.search.db_search_builder import SearchBuilder, build_poi_search, wrap_near_search
from nominatim.api.search.db_searches import AbstractSearch
from nominatim.api.search.query_analyzer_factory import make_query_analyzer, AbstractQueryAnalyzer
from nominatim.api.search.query import Phrase, QueryStruct
from nominatim.api.logging import log


class ForwardGeocoder:
    """ Main class responsible for place search.
    """

    def __init__(self, conn: SearchConnection,
                 params: SearchDetails, timeout: Optional[int]) -> None:
        self.conn = conn
        self.params = params
        self.timeout = dt.timedelta(seconds=timeout or 1000000)
        self.query_analyzer: Optional[AbstractQueryAnalyzer] = None

    @property
    def limit(self) -> int:
        """ Return the configured maximum number of search results.
        """
        return self.params.max_results
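
    # A hedged usage sketch (not part of the original file): assumes an open
    # SearchConnection `conn`, an async context and a prepared list of query
    # phrases; exact Phrase construction is version-dependent.
    #
    #   geocoder = ForwardGeocoder(conn, SearchDetails(max_results=10), timeout=30)
    #   results = await geocoder.lookup(phrases)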

    async def build_searches(self,
                             phrases: List[Phrase]) -> Tuple[QueryStruct, List[AbstractSearch]]:
        """ Analyse the query and return the tokenized query and list of
            possible searches over it.
        """
        if self.query_analyzer is None:
            self.query_analyzer = await make_query_analyzer(self.conn)

        query = await self.query_analyzer.analyze_query(phrases)

        searches: List[AbstractSearch] = []
        if query.num_token_slots() > 0:
            # Compute all possible search interpretations.
            log().section('Compute abstract searches')
            search_builder = SearchBuilder(query, self.params)
            num_searches = 0
            for assignment in yield_token_assignments(query):
                searches.extend(search_builder.build(assignment))
                if num_searches < len(searches):
                    log().table_dump('Searches for assignment',
                                     _dump_searches(searches, query, num_searches))
                num_searches = len(searches)
            searches.sort(key=lambda s: (s.penalty, s.SEARCH_PRIO))

        return query, searches
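
    # Note on build_searches(): the sort key (penalty, SEARCH_PRIO) means
    # that among interpretations with equal penalty, search classes with a
    # lower SEARCH_PRIO value are handed to execute_searches() first.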

    async def execute_searches(self, query: QueryStruct,
                               searches: List[AbstractSearch]) -> SearchResults:
        """ Run the abstract searches against the database until a result
            is found.
        """
        log().section('Execute database searches')
        results: Dict[Any, SearchResult] = {}

        end_time = dt.datetime.now() + self.timeout

        min_ranking = searches[0].penalty + 2.0
        prev_penalty = 0.0
        for i, search in enumerate(searches):
            if search.penalty > prev_penalty and (search.penalty > min_ranking or i > 20):
                break
            log().table_dump(f"{i + 1}. Search", _dump_searches([search], query))
            log().var_dump('Params', self.params)
            lookup_results = await search.lookup(self.conn, self.params)
            for result in lookup_results:
                rhash = (result.source_table, result.place_id,
                         result.housenumber, result.country_code)
                prevresult = results.get(rhash)
                if prevresult:
                    prevresult.accuracy = min(prevresult.accuracy, result.accuracy)
                else:
                    results[rhash] = result
                min_ranking = min(min_ranking, result.accuracy * 1.2)
            log().result_dump('Results', ((r.accuracy, r) for r in lookup_results))
            prev_penalty = search.penalty
            if dt.datetime.now() >= end_time:
                break

        return SearchResults(results.values())
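
    # Note on execute_searches(): the break condition stops the loop as soon
    # as a search is more heavily penalized than its predecessor and is
    # either above the adaptive min_ranking cutoff or beyond the first 20
    # searches. min_ranking tightens to 1.2x the best accuracy seen so far,
    # so a good early hit prunes the remaining, more expensive searches.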

    def pre_filter_results(self, results: SearchResults) -> SearchResults:
        """ Remove results that are significantly worse than the
            best match.
        """
        if results:
            max_ranking = min(r.ranking for r in results) + 0.5
            results = SearchResults(r for r in results if r.ranking < max_ranking)

        return results
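
    # Illustrative cutoff (hypothetical numbers): with rankings
    # [0.3, 0.6, 0.9], max_ranking becomes 0.8, so the result ranked 0.9 is
    # dropped before the expensive detail lookup and reranking (this early
    # cut is the point of the commit above).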

    def sort_and_cut_results(self, results: SearchResults) -> SearchResults:
        """ Remove badly matching results, sort by ranking and
            limit to the configured number of results.
        """
        if results:
            results.sort(key=lambda r: r.ranking)
            min_rank = results[0].rank_search
            min_ranking = results[0].ranking
            results = SearchResults(r for r in results
                                    if r.ranking + 0.03 * (r.rank_search - min_rank)
                                       < min_ranking + 0.5)
            results = SearchResults(results[:self.limit])

        return results
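
    # Illustrative numbers for the cut above: a result whose rank_search is
    # 10 above the best match receives a 0.3 handicap, so its ranking must
    # beat min_ranking + 0.2 to stay in the list.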

    def rerank_by_query(self, query: QueryStruct, results: SearchResults) -> None:
        """ Adjust the accuracy of the localized results according to how
            well they match the original query.
        """
        assert self.query_analyzer is not None
        qwords = [word for phrase in query.source
                  for word in re.split('[, ]+', phrase.text) if word]
        if not qwords:
            return

        for result in results:
            # Negative importance indicates ordering by distance, which is
            # more important than word matching.
            if not result.display_name\
               or (result.importance is not None and result.importance < 0):
                continue
            distance = 0.0
            norm = self.query_analyzer.normalize_text(' '.join((result.display_name,
                                                                result.country_code or '')))
            words = set((w for w in norm.split(' ') if w))
            if not words:
                continue
            for qword in qwords:
                wdist = max(difflib.SequenceMatcher(a=qword, b=w).quick_ratio() for w in words)
                if wdist < 0.5:
                    distance += len(qword)
                else:
                    distance += (1.0 - wdist) * len(qword)
            # Compensate for the fact that the tokenizer does not yet apply
            # a match penalty to country names.
            # Temporary hack that needs to be removed!
            if result.rank_address == 4:
                distance *= 2
            result.accuracy += distance * 0.4 / sum(len(w) for w in qwords)
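
    # Worked example for the loop above (hypothetical scores): for the query
    # word 'berlin' (length 6), a best quick_ratio() of 0.8 adds
    # (1.0 - 0.8) * 6 = 1.2 to `distance`, while a word with no match above
    # 0.5 contributes its full length of 6.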

    async def lookup_pois(self, categories: List[Tuple[str, str]],
                          phrases: List[Phrase]) -> SearchResults:
        """ Look up places by category. If phrases are given, a place search
            over the phrases is executed first and places close to the
            results are returned.
        """
        log().function('forward_lookup_pois', categories=categories, params=self.params)

        if phrases:
            query, searches = await self.build_searches(phrases)

            if query:
                searches = [wrap_near_search(categories, s) for s in searches[:50]]
                results = await self.execute_searches(query, searches)
                results = self.pre_filter_results(results)
                await add_result_details(self.conn, results, self.params)
                log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
                results = self.sort_and_cut_results(results)
            else:
                results = SearchResults()
        else:
            search = build_poi_search(categories, self.params.countries)
            results = await search.lookup(self.conn, self.params)
            await add_result_details(self.conn, results, self.params)

        log().result_dump('Final Results', ((r.accuracy, r) for r in results))

        return results
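
    # Hedged example call (hypothetical category): searching for restaurants
    # near the places matching `phrases`:
    #
    #   results = await geocoder.lookup_pois([('amenity', 'restaurant')], phrases)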

    async def lookup(self, phrases: List[Phrase]) -> SearchResults:
        """ Look up a single free-text query.
        """
        log().function('forward_lookup', phrases=phrases, params=self.params)
        results = SearchResults()

        if self.params.is_impossible():
            return results

        query, searches = await self.build_searches(phrases)

        if searches:
            # Execute SQL until an appropriate result is found.
            results = await self.execute_searches(query, searches[:50])
            results = self.pre_filter_results(results)
            await add_result_details(self.conn, results, self.params)
            log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
            self.rerank_by_query(query, results)
            log().result_dump('Results after reranking', ((r.accuracy, r) for r in results))
            results = self.sort_and_cut_results(results)
            log().result_dump('Final Results', ((r.accuracy, r) for r in results))

        return results
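
    # Note on lookup(): the pipeline is build_searches -> execute_searches ->
    # pre_filter_results -> add_result_details -> rerank_by_query ->
    # sort_and_cut_results. Pre-filtering happens before the detail lookup so
    # that clearly inferior results never pay for details and reranking
    # (see the commit message above).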


# pylint: disable=invalid-name,too-many-locals
def _dump_searches(searches: List[AbstractSearch], query: QueryStruct,
                   start: int = 0) -> Iterator[Optional[List[Any]]]:
    yield ['Penalty', 'Lookups', 'Housenr', 'Postcode', 'Countries',
           'Qualifier', 'Category', 'Rankings']

    def tk(tl: List[int]) -> str:
        tstr = [f"{query.find_lookup_word_by_id(t)}({t})" for t in tl]

        return f"[{','.join(tstr)}]"

    def fmt_ranking(f: Any) -> str:
        if not f:
            return ''

        ranks = ','.join((f"{tk(r.tokens)}^{r.penalty:.3g}" for r in f.rankings))
        if len(ranks) > 100:
            ranks = ranks[:100] + '...'

        return f"{f.column}({ranks},def={f.default:.3g})"

    def fmt_lookup(l: Any) -> str:
        if not l:
            return ''

        return f"{l.lookup_type}({l.column}{tk(l.tokens)})"

    def fmt_cstr(c: Any) -> str:
        if not c:
            return ''

        return f'{c[0]}^{c[1]}'

    for search in searches[start:]:
        fields = ('lookups', 'rankings', 'countries', 'housenumbers',
                  'postcodes', 'qualifiers')
        if hasattr(search, 'search'):
            iters = itertools.zip_longest([f"{search.penalty:.3g}"],
                                          *(getattr(search.search, attr, []) for attr in fields),
                                          getattr(search, 'categories', []),
                                          fillvalue='')
        else:
            iters = itertools.zip_longest([f"{search.penalty:.3g}"],
                                          *(getattr(search, attr, []) for attr in fields),
                                          [],
                                          fillvalue='')
        for penalty, lookup, rank, cc, hnr, pc, qual, cat in iters:
            yield [penalty, fmt_lookup(lookup), fmt_cstr(hnr),
                   fmt_cstr(pc), fmt_cstr(cc), fmt_cstr(qual), fmt_cstr(cat), fmt_ranking(rank)]
        yield None
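
# Note on _dump_searches(): each yielded list is one row of the debug table
# passed to log().table_dump(); the trailing `None` after each search is
# assumed here to act as a row separator in the log output.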