mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-14 01:47:57 +00:00
Compare commits
7 Commits
ab5f348a4a
...
06d5ab4c2d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06d5ab4c2d | ||
|
|
e327512667 | ||
|
|
3e04eb2ffe | ||
|
|
970d81fb27 | ||
|
|
cecdbeb7cf | ||
|
|
c634e9fc5f | ||
|
|
13eaea8aae |
@@ -180,7 +180,7 @@ class SearchBuilder:
|
||||
dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
|
||||
|
||||
sdata.housenumbers = dbf.WeightedStrings([], [])
|
||||
yield dbs.PlaceSearch(0.05, sdata, expected_count)
|
||||
yield dbs.PlaceSearch(0.05, sdata, expected_count, True)
|
||||
|
||||
def build_name_search(self, sdata: dbf.SearchData,
|
||||
name: qmod.TokenRange, address: List[qmod.TokenRange],
|
||||
@@ -194,7 +194,10 @@ class SearchBuilder:
|
||||
sdata.rankings.append(ranking)
|
||||
for penalty, count, lookup in self.yield_lookups(name, address):
|
||||
sdata.lookups = lookup
|
||||
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
|
||||
if sdata.housenumbers:
|
||||
yield dbs.AddressSearch(penalty + name_penalty, sdata, count, bool(address))
|
||||
else:
|
||||
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count, bool(address))
|
||||
|
||||
def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
|
||||
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
|
||||
|
||||
@@ -14,3 +14,4 @@ from .poi_search import PoiSearch as PoiSearch
|
||||
from .country_search import CountrySearch as CountrySearch
|
||||
from .postcode_search import PostcodeSearch as PostcodeSearch
|
||||
from .place_search import PlaceSearch as PlaceSearch
|
||||
from .address_search import AddressSearch as AddressSearch
|
||||
|
||||
359
src/nominatim_api/search/db_searches/address_search.py
Normal file
359
src/nominatim_api/search/db_searches/address_search.py
Normal file
@@ -0,0 +1,359 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Implementation of search for an address (search with housenumber).
|
||||
"""
|
||||
from typing import cast, List, AsyncIterator
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from . import base
|
||||
from ...typing import SaBind, SaExpression, SaColumn, SaFromClause, SaScalarSelect
|
||||
from ...types import SearchDetails, Bbox
|
||||
from ...sql.sqlalchemy_types import Geometry
|
||||
from ...connection import SearchConnection
|
||||
from ... import results as nres
|
||||
from ..db_search_fields import SearchData
|
||||
|
||||
|
||||
LIMIT_PARAM: SaBind = sa.bindparam('limit')
|
||||
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
|
||||
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
|
||||
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
|
||||
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
|
||||
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
|
||||
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
|
||||
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
|
||||
|
||||
|
||||
def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
|
||||
""" Create a subselect that returns the given list of integers
|
||||
as rows in the column 'nr'.
|
||||
"""
|
||||
vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
|
||||
.table_valued(sa.column('value', type_=sa.JSON))
|
||||
return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
|
||||
|
||||
|
||||
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
|
||||
pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
|
||||
return sa.case(
|
||||
(table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
|
||||
else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
|
||||
|
||||
|
||||
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
|
||||
numerals: List[int], details: SearchDetails) -> SaScalarSelect:
|
||||
all_ids = sa.func.ArrayAgg(table.c.place_id)
|
||||
sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
|
||||
|
||||
if len(numerals) == 1:
|
||||
sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
|
||||
.where((numerals[0] - table.c.startnumber) % table.c.step == 0)
|
||||
else:
|
||||
sql = sql.where(sa.or_(
|
||||
*(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
|
||||
(n - table.c.startnumber) % table.c.step == 0)
|
||||
for n in numerals)))
|
||||
|
||||
if details.excluded:
|
||||
sql = sql.where(base.exclude_places(table))
|
||||
|
||||
return sql.scalar_subquery()
|
||||
|
||||
|
||||
async def _get_placex_housenumbers(conn: SearchConnection,
|
||||
place_ids: List[int],
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.placex
|
||||
sql = base.select_placex(t).add_columns(t.c.importance)\
|
||||
.where(t.c.place_id.in_(place_ids))
|
||||
|
||||
if details.geometry_output:
|
||||
sql = base.add_geometry_columns(sql, t.c.geometry, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_placex_row(row, nres.SearchResult)
|
||||
assert result
|
||||
result.bbox = Bbox.from_wkb(row.bbox)
|
||||
yield result
|
||||
|
||||
|
||||
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
|
||||
numerals: List[int],
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.osmline
|
||||
|
||||
values = _int_list_to_subquery(numerals)
|
||||
sql = sa.select(t.c.place_id, t.c.osm_id,
|
||||
t.c.parent_place_id, t.c.address,
|
||||
values.c.nr.label('housenumber'),
|
||||
_interpolated_position(t, values.c.nr),
|
||||
t.c.postcode, t.c.country_code)\
|
||||
.where(t.c.place_id.in_(place_ids))\
|
||||
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
|
||||
|
||||
if details.geometry_output:
|
||||
sub = sql.subquery()
|
||||
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_osmline_row(row, nres.SearchResult)
|
||||
assert result
|
||||
yield result
|
||||
|
||||
|
||||
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
|
||||
numerals: List[int], osm_id: int,
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.tiger
|
||||
values = _int_list_to_subquery(numerals)
|
||||
sql = sa.select(t.c.place_id, t.c.parent_place_id,
|
||||
sa.literal('W').label('osm_type'),
|
||||
sa.literal(osm_id).label('osm_id'),
|
||||
values.c.nr.label('housenumber'),
|
||||
_interpolated_position(t, values.c.nr),
|
||||
t.c.postcode)\
|
||||
.where(t.c.place_id.in_(place_ids))\
|
||||
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
|
||||
|
||||
if details.geometry_output:
|
||||
sub = sql.subquery()
|
||||
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_tiger_row(row, nres.SearchResult)
|
||||
assert result
|
||||
yield result
|
||||
|
||||
|
||||
class AddressSearch(base.AbstractSearch):
|
||||
""" Generic search for an address or named place.
|
||||
"""
|
||||
SEARCH_PRIO = 1
|
||||
|
||||
def __init__(self, extra_penalty: float, sdata: SearchData,
|
||||
expected_count: int, has_address_terms: bool) -> None:
|
||||
assert sdata.housenumbers
|
||||
super().__init__(sdata.penalty + extra_penalty)
|
||||
self.countries = sdata.countries
|
||||
self.postcodes = sdata.postcodes
|
||||
self.housenumbers = sdata.housenumbers
|
||||
self.qualifiers = sdata.qualifiers
|
||||
self.lookups = sdata.lookups
|
||||
self.rankings = sdata.rankings
|
||||
self.expected_count = expected_count
|
||||
self.has_address_terms = has_address_terms
|
||||
|
||||
def _inner_search_name_cte(self, conn: SearchConnection,
|
||||
details: SearchDetails) -> 'sa.CTE':
|
||||
""" Create a subquery that preselects the rows in the search_name
|
||||
table.
|
||||
"""
|
||||
t = conn.t.search_name
|
||||
|
||||
penalty: SaExpression = sa.literal(self.penalty)
|
||||
for ranking in self.rankings:
|
||||
penalty += ranking.sql_penalty(t)
|
||||
|
||||
sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank,
|
||||
t.c.country_code, t.c.centroid,
|
||||
t.c.name_vector, t.c.nameaddress_vector,
|
||||
sa.case((t.c.importance > 0, t.c.importance),
|
||||
else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75))
|
||||
.label('importance'),
|
||||
penalty.label('penalty'))
|
||||
|
||||
for lookup in self.lookups:
|
||||
sql = sql.where(lookup.sql_condition(t))
|
||||
|
||||
if self.countries:
|
||||
sql = sql.where(t.c.country_code.in_(self.countries.values))
|
||||
|
||||
if self.postcodes:
|
||||
if self.expected_count > 10000:
|
||||
# Many results expected. Restrict by postcode.
|
||||
tpc = conn.t.postcode
|
||||
sql = sql.where(sa.select(tpc.c.postcode)
|
||||
.where(tpc.c.postcode.in_(self.postcodes.values))
|
||||
.where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
|
||||
.exists())
|
||||
|
||||
if details.viewbox is not None:
|
||||
if details.bounded_viewbox:
|
||||
sql = sql.where(t.c.centroid
|
||||
.intersects(VIEWBOX_PARAM,
|
||||
use_index=details.viewbox.area < 0.2))
|
||||
|
||||
if details.near is not None and details.near_radius is not None:
|
||||
if details.near_radius < 0.1:
|
||||
sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM,
|
||||
NEAR_RADIUS_PARAM))
|
||||
else:
|
||||
sql = sql.where(t.c.centroid
|
||||
.ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
|
||||
|
||||
if self.has_address_terms:
|
||||
sql = sql.where(t.c.address_rank.between(16, 30))
|
||||
else:
|
||||
# If no further address terms are given, then the base street must
|
||||
# be in the name. No search for named POIs with the given house number.
|
||||
sql = sql.where(t.c.address_rank.between(16, 27))
|
||||
|
||||
inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()
|
||||
|
||||
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
|
||||
inner.c.country_code, inner.c.centroid, inner.c.importance,
|
||||
inner.c.penalty)
|
||||
|
||||
return sql.cte('searches')
|
||||
|
||||
async def lookup(self, conn: SearchConnection,
|
||||
details: SearchDetails) -> nres.SearchResults:
|
||||
""" Find results for the search in the database.
|
||||
"""
|
||||
t = conn.t.placex
|
||||
tsearch = self._inner_search_name_cte(conn, details)
|
||||
|
||||
sql = base.select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id)
|
||||
|
||||
if details.geometry_output:
|
||||
sql = base.add_geometry_columns(sql, t.c.geometry, details)
|
||||
|
||||
penalty: SaExpression = tsearch.c.penalty
|
||||
|
||||
if self.postcodes:
|
||||
tpc = conn.t.postcode
|
||||
pcs = self.postcodes.values
|
||||
|
||||
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)
|
||||
* (tpc.c.rank_search - 19)))\
|
||||
.where(tpc.c.postcode.in_(pcs))\
|
||||
.scalar_subquery()
|
||||
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
|
||||
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
|
||||
|
||||
if details.viewbox is not None and not details.bounded_viewbox:
|
||||
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
|
||||
(t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5),
|
||||
else_=1.0)
|
||||
|
||||
if details.near is not None:
|
||||
sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
|
||||
.label('importance'))
|
||||
sql = sql.order_by(sa.desc(sa.text('importance')))
|
||||
else:
|
||||
sql = sql.order_by(penalty - tsearch.c.importance)
|
||||
sql = sql.add_columns(tsearch.c.importance)
|
||||
|
||||
sql = sql.add_columns(penalty.label('accuracy'))\
|
||||
.order_by(sa.text('accuracy'))
|
||||
|
||||
hnr_list = '|'.join(self.housenumbers.values)
|
||||
|
||||
if self.has_address_terms:
|
||||
sql = sql.where(sa.or_(tsearch.c.address_rank < 30,
|
||||
sa.func.RegexpWord(hnr_list, t.c.housenumber)))
|
||||
|
||||
inner = sql.subquery()
|
||||
|
||||
# Housenumbers from placex
|
||||
thnr = conn.t.placex.alias('hnr')
|
||||
pid_list = sa.func.ArrayAgg(thnr.c.place_id)
|
||||
place_sql = sa.select(pid_list)\
|
||||
.where(thnr.c.parent_place_id == inner.c.place_id)\
|
||||
.where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
|
||||
.where(thnr.c.linked_place_id == None)\
|
||||
.where(thnr.c.indexed_status == 0)
|
||||
|
||||
if details.excluded:
|
||||
place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
|
||||
if self.qualifiers:
|
||||
place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
|
||||
|
||||
numerals = [int(n) for n in self.housenumbers.values
|
||||
if n.isdigit() and len(n) < 8]
|
||||
interpol_sql: SaColumn
|
||||
tiger_sql: SaColumn
|
||||
if numerals and \
|
||||
(not self.qualifiers or ('place', 'house') in self.qualifiers.values):
|
||||
# Housenumbers from interpolations
|
||||
interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
|
||||
numerals, details)
|
||||
# Housenumbers from Tiger
|
||||
tiger_sql = sa.case((inner.c.country_code == 'us',
|
||||
_make_interpolation_subquery(conn.t.tiger, inner,
|
||||
numerals, details)
|
||||
), else_=None)
|
||||
else:
|
||||
interpol_sql = sa.null()
|
||||
tiger_sql = sa.null()
|
||||
|
||||
unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
|
||||
interpol_sql.label('interpol_hnr'),
|
||||
tiger_sql.label('tiger_hnr')).subquery('unsort')
|
||||
sql = sa.select(unsort)\
|
||||
.order_by(unsort.c.accuracy +
|
||||
sa.case((unsort.c.placex_hnr != None, 0),
|
||||
(unsort.c.interpol_hnr != None, 0),
|
||||
(unsort.c.tiger_hnr != None, 0),
|
||||
else_=1),
|
||||
sa.case((unsort.c.placex_hnr != None, 1),
|
||||
(unsort.c.interpol_hnr != None, 2),
|
||||
(unsort.c.tiger_hnr != None, 3),
|
||||
else_=4))
|
||||
|
||||
sql = sql.limit(LIMIT_PARAM)
|
||||
|
||||
bind_params = {
|
||||
'limit': details.max_results,
|
||||
'min_rank': details.min_rank,
|
||||
'max_rank': details.max_rank,
|
||||
'viewbox': details.viewbox,
|
||||
'viewbox2': details.viewbox_x2,
|
||||
'near': details.near,
|
||||
'near_radius': details.near_radius,
|
||||
'excluded': details.excluded,
|
||||
'countries': details.countries
|
||||
}
|
||||
|
||||
results = nres.SearchResults()
|
||||
for row in await conn.execute(sql, bind_params):
|
||||
result = nres.create_from_placex_row(row, nres.SearchResult)
|
||||
assert result
|
||||
result.bbox = Bbox.from_wkb(row.bbox)
|
||||
result.accuracy = row.accuracy
|
||||
if row.rank_address < 30:
|
||||
if row.placex_hnr:
|
||||
subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
|
||||
elif row.interpol_hnr:
|
||||
subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
|
||||
elif row.tiger_hnr:
|
||||
subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
|
||||
else:
|
||||
subs = None
|
||||
|
||||
if subs is not None:
|
||||
async for sub in subs:
|
||||
assert sub.housenumber
|
||||
sub.accuracy = result.accuracy
|
||||
if not any(nr in self.housenumbers.values
|
||||
for nr in sub.housenumber.split(';')):
|
||||
sub.accuracy += 0.6
|
||||
results.append(sub)
|
||||
|
||||
# Only add the street as a result, if it meets all other
|
||||
# filter conditions.
|
||||
if (not details.excluded or result.place_id not in details.excluded)\
|
||||
and (not self.qualifiers or result.category in self.qualifiers.values)\
|
||||
and result.rank_address >= details.min_rank:
|
||||
result.accuracy += 1.0 # penalty for missing housenumber
|
||||
results.append(result)
|
||||
else:
|
||||
results.append(result)
|
||||
|
||||
return results
|
||||
@@ -5,14 +5,14 @@
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Implementation of search for a named place.
|
||||
Implementation of search for a named place (without housenumber).
|
||||
"""
|
||||
from typing import cast, List, AsyncIterator
|
||||
from typing import cast
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from . import base
|
||||
from ...typing import SaBind, SaExpression, SaColumn, SaFromClause, SaScalarSelect
|
||||
from ...typing import SaBind, SaExpression, SaColumn
|
||||
from ...types import SearchDetails, Bbox
|
||||
from ...sql.sqlalchemy_types import Geometry
|
||||
from ...connection import SearchConnection
|
||||
@@ -30,121 +30,22 @@ NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
|
||||
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
|
||||
|
||||
|
||||
def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
|
||||
""" Create a subselect that returns the given list of integers
|
||||
as rows in the column 'nr'.
|
||||
"""
|
||||
vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
|
||||
.table_valued(sa.column('value', type_=sa.JSON))
|
||||
return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
|
||||
|
||||
|
||||
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
|
||||
pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
|
||||
return sa.case(
|
||||
(table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
|
||||
else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
|
||||
|
||||
|
||||
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
|
||||
numerals: List[int], details: SearchDetails) -> SaScalarSelect:
|
||||
all_ids = sa.func.ArrayAgg(table.c.place_id)
|
||||
sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
|
||||
|
||||
if len(numerals) == 1:
|
||||
sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
|
||||
.where((numerals[0] - table.c.startnumber) % table.c.step == 0)
|
||||
else:
|
||||
sql = sql.where(sa.or_(
|
||||
*(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
|
||||
(n - table.c.startnumber) % table.c.step == 0)
|
||||
for n in numerals)))
|
||||
|
||||
if details.excluded:
|
||||
sql = sql.where(base.exclude_places(table))
|
||||
|
||||
return sql.scalar_subquery()
|
||||
|
||||
|
||||
async def _get_placex_housenumbers(conn: SearchConnection,
|
||||
place_ids: List[int],
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.placex
|
||||
sql = base.select_placex(t).add_columns(t.c.importance)\
|
||||
.where(t.c.place_id.in_(place_ids))
|
||||
|
||||
if details.geometry_output:
|
||||
sql = base.add_geometry_columns(sql, t.c.geometry, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_placex_row(row, nres.SearchResult)
|
||||
assert result
|
||||
result.bbox = Bbox.from_wkb(row.bbox)
|
||||
yield result
|
||||
|
||||
|
||||
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
|
||||
numerals: List[int],
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.osmline
|
||||
|
||||
values = _int_list_to_subquery(numerals)
|
||||
sql = sa.select(t.c.place_id, t.c.osm_id,
|
||||
t.c.parent_place_id, t.c.address,
|
||||
values.c.nr.label('housenumber'),
|
||||
_interpolated_position(t, values.c.nr),
|
||||
t.c.postcode, t.c.country_code)\
|
||||
.where(t.c.place_id.in_(place_ids))\
|
||||
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
|
||||
|
||||
if details.geometry_output:
|
||||
sub = sql.subquery()
|
||||
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_osmline_row(row, nres.SearchResult)
|
||||
assert result
|
||||
yield result
|
||||
|
||||
|
||||
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
|
||||
numerals: List[int], osm_id: int,
|
||||
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
|
||||
t = conn.t.tiger
|
||||
values = _int_list_to_subquery(numerals)
|
||||
sql = sa.select(t.c.place_id, t.c.parent_place_id,
|
||||
sa.literal('W').label('osm_type'),
|
||||
sa.literal(osm_id).label('osm_id'),
|
||||
values.c.nr.label('housenumber'),
|
||||
_interpolated_position(t, values.c.nr),
|
||||
t.c.postcode)\
|
||||
.where(t.c.place_id.in_(place_ids))\
|
||||
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
|
||||
|
||||
if details.geometry_output:
|
||||
sub = sql.subquery()
|
||||
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
|
||||
|
||||
for row in await conn.execute(sql):
|
||||
result = nres.create_from_tiger_row(row, nres.SearchResult)
|
||||
assert result
|
||||
yield result
|
||||
|
||||
|
||||
class PlaceSearch(base.AbstractSearch):
|
||||
""" Generic search for an address or named place.
|
||||
""" Generic search for a named place.
|
||||
"""
|
||||
SEARCH_PRIO = 1
|
||||
|
||||
def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None:
|
||||
def __init__(self, extra_penalty: float, sdata: SearchData,
|
||||
expected_count: int, has_address_terms: bool) -> None:
|
||||
assert not sdata.housenumbers
|
||||
super().__init__(sdata.penalty + extra_penalty)
|
||||
self.countries = sdata.countries
|
||||
self.postcodes = sdata.postcodes
|
||||
self.housenumbers = sdata.housenumbers
|
||||
self.qualifiers = sdata.qualifiers
|
||||
self.lookups = sdata.lookups
|
||||
self.rankings = sdata.rankings
|
||||
self.expected_count = expected_count
|
||||
self.has_address_terms = has_address_terms
|
||||
|
||||
def _inner_search_name_cte(self, conn: SearchConnection,
|
||||
details: SearchDetails) -> 'sa.CTE':
|
||||
@@ -187,7 +88,7 @@ class PlaceSearch(base.AbstractSearch):
|
||||
sql = sql.where(t.c.centroid
|
||||
.intersects(VIEWBOX_PARAM,
|
||||
use_index=details.viewbox.area < 0.2))
|
||||
elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000:
|
||||
elif not self.postcodes and self.expected_count >= 10000:
|
||||
sql = sql.where(t.c.centroid
|
||||
.intersects(VIEWBOX2_PARAM,
|
||||
use_index=details.viewbox.area < 0.5))
|
||||
@@ -200,19 +101,18 @@ class PlaceSearch(base.AbstractSearch):
|
||||
sql = sql.where(t.c.centroid
|
||||
.ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
|
||||
|
||||
if self.housenumbers:
|
||||
sql = sql.where(t.c.address_rank.between(16, 30))
|
||||
else:
|
||||
if details.excluded:
|
||||
sql = sql.where(base.exclude_places(t))
|
||||
if details.min_rank > 0:
|
||||
sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM,
|
||||
t.c.search_rank >= MIN_RANK_PARAM))
|
||||
if details.max_rank < 30:
|
||||
sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM,
|
||||
t.c.search_rank <= MAX_RANK_PARAM))
|
||||
if details.excluded:
|
||||
sql = sql.where(base.exclude_places(t))
|
||||
if details.min_rank > 0:
|
||||
sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM,
|
||||
t.c.search_rank >= MIN_RANK_PARAM))
|
||||
if details.max_rank < 30:
|
||||
sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM,
|
||||
t.c.search_rank <= MAX_RANK_PARAM))
|
||||
|
||||
inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()
|
||||
inner = sql.limit(5000 if self.qualifiers else 1000)\
|
||||
.order_by(sa.desc(sa.text('importance')))\
|
||||
.subquery()
|
||||
|
||||
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
|
||||
inner.c.country_code, inner.c.centroid, inner.c.importance,
|
||||
@@ -221,8 +121,7 @@ class PlaceSearch(base.AbstractSearch):
|
||||
# If the query is not an address search or has a geographic preference,
|
||||
# preselect most important items to restrict the number of places
|
||||
# that need to be looked up in placex.
|
||||
if not self.housenumbers\
|
||||
and (details.viewbox is None or details.bounded_viewbox)\
|
||||
if (details.viewbox is None or details.bounded_viewbox)\
|
||||
and (details.near is None or details.near_radius is not None)\
|
||||
and not self.qualifiers:
|
||||
sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance)
|
||||
@@ -253,14 +152,19 @@ class PlaceSearch(base.AbstractSearch):
|
||||
penalty: SaExpression = tsearch.c.penalty
|
||||
|
||||
if self.postcodes:
|
||||
tpc = conn.t.postcode
|
||||
pcs = self.postcodes.values
|
||||
if self.has_address_terms:
|
||||
tpc = conn.t.postcode
|
||||
pcs = self.postcodes.values
|
||||
|
||||
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
|
||||
.where(tpc.c.postcode.in_(pcs))\
|
||||
.scalar_subquery()
|
||||
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
|
||||
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
|
||||
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
|
||||
.where(tpc.c.postcode.in_(pcs))\
|
||||
.scalar_subquery()
|
||||
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
|
||||
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
|
||||
else:
|
||||
# High penalty if the postcode is not an exact match.
|
||||
# The postcode search needs to get priority here.
|
||||
penalty += sa.case((t.c.postcode.in_(self.postcodes.values), 0.0), else_=1.0)
|
||||
|
||||
if details.viewbox is not None and not details.bounded_viewbox:
|
||||
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
|
||||
@@ -278,60 +182,12 @@ class PlaceSearch(base.AbstractSearch):
|
||||
sql = sql.add_columns(penalty.label('accuracy'))\
|
||||
.order_by(sa.text('accuracy'))
|
||||
|
||||
if self.housenumbers:
|
||||
hnr_list = '|'.join(self.housenumbers.values)
|
||||
inner = sql.where(sa.or_(tsearch.c.address_rank < 30,
|
||||
sa.func.RegexpWord(hnr_list, t.c.housenumber)))\
|
||||
.subquery()
|
||||
|
||||
# Housenumbers from placex
|
||||
thnr = conn.t.placex.alias('hnr')
|
||||
pid_list = sa.func.ArrayAgg(thnr.c.place_id)
|
||||
place_sql = sa.select(pid_list)\
|
||||
.where(thnr.c.parent_place_id == inner.c.place_id)\
|
||||
.where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
|
||||
.where(thnr.c.linked_place_id == None)\
|
||||
.where(thnr.c.indexed_status == 0)
|
||||
|
||||
if details.excluded:
|
||||
place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
|
||||
if self.qualifiers:
|
||||
place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
|
||||
|
||||
numerals = [int(n) for n in self.housenumbers.values
|
||||
if n.isdigit() and len(n) < 8]
|
||||
interpol_sql: SaColumn
|
||||
tiger_sql: SaColumn
|
||||
if numerals and \
|
||||
(not self.qualifiers or ('place', 'house') in self.qualifiers.values):
|
||||
# Housenumbers from interpolations
|
||||
interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
|
||||
numerals, details)
|
||||
# Housenumbers from Tiger
|
||||
tiger_sql = sa.case((inner.c.country_code == 'us',
|
||||
_make_interpolation_subquery(conn.t.tiger, inner,
|
||||
numerals, details)
|
||||
), else_=None)
|
||||
else:
|
||||
interpol_sql = sa.null()
|
||||
tiger_sql = sa.null()
|
||||
|
||||
unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
|
||||
interpol_sql.label('interpol_hnr'),
|
||||
tiger_sql.label('tiger_hnr')).subquery('unsort')
|
||||
sql = sa.select(unsort)\
|
||||
.order_by(sa.case((unsort.c.placex_hnr != None, 1),
|
||||
(unsort.c.interpol_hnr != None, 2),
|
||||
(unsort.c.tiger_hnr != None, 3),
|
||||
else_=4),
|
||||
unsort.c.accuracy)
|
||||
else:
|
||||
sql = sql.where(t.c.linked_place_id == None)\
|
||||
.where(t.c.indexed_status == 0)
|
||||
if self.qualifiers:
|
||||
sql = sql.where(self.qualifiers.sql_restrict(t))
|
||||
if details.layers is not None:
|
||||
sql = sql.where(base.filter_by_layer(t, details.layers))
|
||||
sql = sql.where(t.c.linked_place_id == None)\
|
||||
.where(t.c.indexed_status == 0)
|
||||
if self.qualifiers:
|
||||
sql = sql.where(self.qualifiers.sql_restrict(t))
|
||||
if details.layers is not None:
|
||||
sql = sql.where(base.filter_by_layer(t, details.layers))
|
||||
|
||||
sql = sql.limit(LIMIT_PARAM)
|
||||
|
||||
@@ -353,33 +209,6 @@ class PlaceSearch(base.AbstractSearch):
|
||||
assert result
|
||||
result.bbox = Bbox.from_wkb(row.bbox)
|
||||
result.accuracy = row.accuracy
|
||||
if self.housenumbers and row.rank_address < 30:
|
||||
if row.placex_hnr:
|
||||
subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
|
||||
elif row.interpol_hnr:
|
||||
subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
|
||||
elif row.tiger_hnr:
|
||||
subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
|
||||
else:
|
||||
subs = None
|
||||
|
||||
if subs is not None:
|
||||
async for sub in subs:
|
||||
assert sub.housenumber
|
||||
sub.accuracy = result.accuracy
|
||||
if not any(nr in self.housenumbers.values
|
||||
for nr in sub.housenumber.split(';')):
|
||||
sub.accuracy += 0.6
|
||||
results.append(sub)
|
||||
|
||||
# Only add the street as a result, if it meets all other
|
||||
# filter conditions.
|
||||
if (not details.excluded or result.place_id not in details.excluded)\
|
||||
and (not self.qualifiers or result.category in self.qualifiers.values)\
|
||||
and result.rank_address >= details.min_rank:
|
||||
result.accuracy += 1.0 # penalty for missing housenumber
|
||||
results.append(result)
|
||||
else:
|
||||
results.append(result)
|
||||
results.append(result)
|
||||
|
||||
return results
|
||||
|
||||
@@ -37,9 +37,7 @@ Feature: Searching of simple objects
|
||||
And geocoding "Wood Street 45"
|
||||
Then exactly 0 results are returned
|
||||
When geocoding "Red Way 34"
|
||||
Then all results contain
|
||||
| object |
|
||||
| N20 |
|
||||
Then exactly 0 results are returned
|
||||
|
||||
Scenario: when the housenumber is missing the street is still returned
|
||||
Given the grid
|
||||
|
||||
@@ -188,7 +188,6 @@ def test_name_only_search():
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert not search.postcodes.values
|
||||
assert not search.countries.values
|
||||
assert not search.housenumbers.values
|
||||
assert not search.qualifiers.values
|
||||
assert len(search.lookups) == 1
|
||||
assert len(search.rankings) == 1
|
||||
@@ -209,7 +208,6 @@ def test_name_with_qualifier():
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert not search.postcodes.values
|
||||
assert not search.countries.values
|
||||
assert not search.housenumbers.values
|
||||
assert search.qualifiers.values == [('this', 'that')]
|
||||
assert len(search.lookups) == 1
|
||||
assert len(search.rankings) == 1
|
||||
@@ -227,7 +225,7 @@ def test_name_with_housenumber_search():
|
||||
assert len(searches) == 1
|
||||
search = searches[0]
|
||||
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert isinstance(search, dbs.AddressSearch)
|
||||
assert not search.postcodes.values
|
||||
assert not search.countries.values
|
||||
assert search.housenumbers.values == ['66']
|
||||
@@ -254,7 +252,6 @@ def test_name_and_address():
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert not search.postcodes.values
|
||||
assert not search.countries.values
|
||||
assert not search.housenumbers.values
|
||||
assert len(search.lookups) == 2
|
||||
assert len(search.rankings) == 3
|
||||
|
||||
@@ -279,7 +276,6 @@ def test_name_and_complex_address():
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert not search.postcodes.values
|
||||
assert not search.countries.values
|
||||
assert not search.housenumbers.values
|
||||
assert len(search.lookups) == 2
|
||||
assert len(search.rankings) == 2
|
||||
|
||||
@@ -385,7 +381,6 @@ def test_name_only_search_with_countries():
|
||||
assert isinstance(search, dbs.PlaceSearch)
|
||||
assert not search.postcodes.values
|
||||
assert set(search.countries.values) == {'de', 'en'}
|
||||
assert not search.housenumbers.values
|
||||
|
||||
|
||||
def make_counted_searches(name_part, name_full, address_part, address_full,
|
||||
|
||||
269
test/python/api/search/test_search_address.py
Normal file
269
test/python/api/search/test_search_address.py
Normal file
@@ -0,0 +1,269 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Tests for running the address searcher.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
import nominatim_api as napi
|
||||
from nominatim_api.types import SearchDetails
|
||||
from nominatim_api.search.db_searches import AddressSearch
|
||||
from nominatim_api.search.db_search_fields import WeightedStrings, WeightedCategories, \
|
||||
FieldLookup, FieldRanking, RankedTokens
|
||||
from nominatim_api.search.db_search_lookups import LookupAll
|
||||
|
||||
APIOPTIONS = ['search']
|
||||
|
||||
|
||||
def run_search(apiobj, frontend, global_penalty, lookup, ranking, count=2,
|
||||
hnrs=[], pcs=[], ccodes=[], quals=[], has_address=False,
|
||||
details=SearchDetails()):
|
||||
class MySearchData:
|
||||
penalty = global_penalty
|
||||
postcodes = WeightedStrings(pcs, [0.0] * len(pcs))
|
||||
countries = WeightedStrings(ccodes, [0.0] * len(ccodes))
|
||||
housenumbers = WeightedStrings(hnrs, [0.0] * len(hnrs))
|
||||
qualifiers = WeightedCategories(quals, [0.0] * len(quals))
|
||||
lookups = lookup
|
||||
rankings = ranking
|
||||
|
||||
search = AddressSearch(0.0, MySearchData(), count, has_address)
|
||||
|
||||
if frontend is None:
|
||||
api = apiobj
|
||||
else:
|
||||
api = frontend(apiobj, options=APIOPTIONS)
|
||||
|
||||
async def run():
|
||||
async with api._async_api.begin() as conn:
|
||||
return await search.lookup(conn, details)
|
||||
|
||||
results = api._loop.run_until_complete(run())
|
||||
results.sort(key=lambda r: r.accuracy)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
class TestStreetWithHousenumber:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=1, class_='place', type='house',
|
||||
parent_place_id=1000,
|
||||
housenumber='20 a', country_code='es')
|
||||
apiobj.add_placex(place_id=2, class_='place', type='house',
|
||||
parent_place_id=1000,
|
||||
housenumber='21;22', country_code='es')
|
||||
apiobj.add_placex(place_id=1000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='es')
|
||||
apiobj.add_search_name(1000, names=[1, 2, 10, 11],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='es')
|
||||
apiobj.add_placex(place_id=91, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='20', country_code='pt')
|
||||
apiobj.add_placex(place_id=92, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='22', country_code='pt')
|
||||
apiobj.add_placex(place_id=93, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='24', country_code='pt')
|
||||
apiobj.add_placex(place_id=2000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='pt')
|
||||
apiobj.add_search_name(2000, names=[1, 2, 20, 21],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='pt')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('20', [91, 1]), ('20 a', [1]),
|
||||
('21', [2]), ('22', [2, 92]),
|
||||
('24', [93]), ('25', [])])
|
||||
def test_lookup_by_single_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [1000, 2000]
|
||||
|
||||
@pytest.mark.parametrize('cc,res', [('es', [2, 1000]), ('pt', [92, 2000])])
|
||||
def test_lookup_with_country_restriction(self, apiobj, frontend, cc, res):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
ccodes=[cc])
|
||||
|
||||
assert [r.place_id for r in results] == res
|
||||
|
||||
def test_lookup_exclude_housenumber_placeid(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(excluded=[92]))
|
||||
|
||||
assert [r.place_id for r in results] == [2, 1000, 2000]
|
||||
|
||||
def test_lookup_exclude_street_placeid(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(excluded=[1000]))
|
||||
|
||||
assert [r.place_id for r in results] == [2, 92, 2000]
|
||||
|
||||
def test_lookup_only_house_qualifier(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
quals=[('place', 'house')])
|
||||
|
||||
assert [r.place_id for r in results] == [2, 92]
|
||||
|
||||
def test_lookup_only_street_qualifier(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
quals=[('highway', 'residential')])
|
||||
|
||||
assert [r.place_id for r in results] == [1000, 2000]
|
||||
|
||||
@pytest.mark.parametrize('rank,found', [(26, True), (27, False), (30, False)])
|
||||
def test_lookup_min_rank(self, apiobj, frontend, rank, found):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(min_rank=rank))
|
||||
|
||||
assert [r.place_id for r in results] == ([2, 92, 1000, 2000] if found else [2, 92])
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_return_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['20', '21', '22'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results
|
||||
assert all(geom.name.lower() in r.geometry for r in results)
|
||||
|
||||
|
||||
def test_very_large_housenumber(apiobj, frontend):
|
||||
apiobj.add_placex(place_id=93, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='2467463524544', country_code='pt')
|
||||
apiobj.add_placex(place_id=2000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='pt')
|
||||
apiobj.add_search_name(2000, names=[1, 2],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='pt')
|
||||
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['2467463524544'],
|
||||
details=SearchDetails())
|
||||
|
||||
assert results
|
||||
assert [r.place_id for r in results] == [93, 2000]
|
||||
|
||||
|
||||
class TestInterpolations:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=990, class_='highway', type='service',
|
||||
rank_search=27, rank_address=27,
|
||||
centroid=(10.0, 10.0),
|
||||
geometry='LINESTRING(9.995 10, 10.005 10)')
|
||||
apiobj.add_search_name(990, names=[111],
|
||||
search_rank=27, address_rank=27)
|
||||
apiobj.add_placex(place_id=991, class_='place', type='house',
|
||||
parent_place_id=990,
|
||||
rank_search=30, rank_address=30,
|
||||
housenumber='23',
|
||||
centroid=(10.0, 10.00002))
|
||||
apiobj.add_osmline(place_id=992,
|
||||
parent_place_id=990,
|
||||
startnumber=21, endnumber=29, step=2,
|
||||
centroid=(10.0, 10.00001),
|
||||
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
|
||||
def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [990]
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_osmline_with_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results[0].place_id == 992
|
||||
assert geom.name.lower() in results[0].geometry
|
||||
|
||||
|
||||
class TestTiger:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=990, class_='highway', type='service',
|
||||
rank_search=27, rank_address=27,
|
||||
country_code='us',
|
||||
centroid=(10.0, 10.0),
|
||||
geometry='LINESTRING(9.995 10, 10.005 10)')
|
||||
apiobj.add_search_name(990, names=[111], country_code='us',
|
||||
search_rank=27, address_rank=27)
|
||||
apiobj.add_placex(place_id=991, class_='place', type='house',
|
||||
parent_place_id=990,
|
||||
rank_search=30, rank_address=30,
|
||||
housenumber='23',
|
||||
country_code='us',
|
||||
centroid=(10.0, 10.00002))
|
||||
apiobj.add_tiger(place_id=992,
|
||||
parent_place_id=990,
|
||||
startnumber=21, endnumber=29, step=2,
|
||||
centroid=(10.0, 10.00001),
|
||||
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
|
||||
def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [990]
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_tiger_with_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results[0].place_id == 992
|
||||
assert geom.name.lower() in results[0].geometry
|
||||
@@ -32,7 +32,7 @@ def run_search(apiobj, frontend, global_penalty, cat, cat_penalty=None, ccodes=[
|
||||
if ccodes is not None:
|
||||
details.countries = ccodes
|
||||
|
||||
place_search = PlaceSearch(0.0, PlaceSearchData(), 2)
|
||||
place_search = PlaceSearch(0.0, PlaceSearchData(), 2, False)
|
||||
|
||||
if cat_penalty is None:
|
||||
cat_penalty = [0.0] * len(cat)
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Tests for running the generic place searcher.
|
||||
Tests for running the named place searcher.
|
||||
"""
|
||||
import json
|
||||
|
||||
@@ -22,18 +22,18 @@ APIOPTIONS = ['search']
|
||||
|
||||
|
||||
def run_search(apiobj, frontend, global_penalty, lookup, ranking, count=2,
|
||||
hnrs=[], pcs=[], ccodes=[], quals=[],
|
||||
pcs=[], ccodes=[], quals=[], has_address=False,
|
||||
details=SearchDetails()):
|
||||
class MySearchData:
|
||||
penalty = global_penalty
|
||||
postcodes = WeightedStrings(pcs, [0.0] * len(pcs))
|
||||
countries = WeightedStrings(ccodes, [0.0] * len(ccodes))
|
||||
housenumbers = WeightedStrings(hnrs, [0.0] * len(hnrs))
|
||||
qualifiers = WeightedCategories(quals, [0.0] * len(quals))
|
||||
lookups = lookup
|
||||
rankings = ranking
|
||||
housenumbers = None
|
||||
|
||||
search = PlaceSearch(0.0, MySearchData(), count)
|
||||
search = PlaceSearch(0.0, MySearchData(), count, has_address)
|
||||
|
||||
if frontend is None:
|
||||
api = apiobj
|
||||
@@ -205,139 +205,6 @@ class TestNameOnlySearches:
|
||||
assert [r.place_id for r in results] == [100]
|
||||
|
||||
|
||||
class TestStreetWithHousenumber:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=1, class_='place', type='house',
|
||||
parent_place_id=1000,
|
||||
housenumber='20 a', country_code='es')
|
||||
apiobj.add_placex(place_id=2, class_='place', type='house',
|
||||
parent_place_id=1000,
|
||||
housenumber='21;22', country_code='es')
|
||||
apiobj.add_placex(place_id=1000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='es')
|
||||
apiobj.add_search_name(1000, names=[1, 2, 10, 11],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='es')
|
||||
apiobj.add_placex(place_id=91, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='20', country_code='pt')
|
||||
apiobj.add_placex(place_id=92, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='22', country_code='pt')
|
||||
apiobj.add_placex(place_id=93, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='24', country_code='pt')
|
||||
apiobj.add_placex(place_id=2000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='pt')
|
||||
apiobj.add_search_name(2000, names=[1, 2, 20, 21],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='pt')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('20', [91, 1]), ('20 a', [1]),
|
||||
('21', [2]), ('22', [2, 92]),
|
||||
('24', [93]), ('25', [])])
|
||||
def test_lookup_by_single_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [1000, 2000]
|
||||
|
||||
@pytest.mark.parametrize('cc,res', [('es', [2, 1000]), ('pt', [92, 2000])])
|
||||
def test_lookup_with_country_restriction(self, apiobj, frontend, cc, res):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
ccodes=[cc])
|
||||
|
||||
assert [r.place_id for r in results] == res
|
||||
|
||||
def test_lookup_exclude_housenumber_placeid(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(excluded=[92]))
|
||||
|
||||
assert [r.place_id for r in results] == [2, 1000, 2000]
|
||||
|
||||
def test_lookup_exclude_street_placeid(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(excluded=[1000]))
|
||||
|
||||
assert [r.place_id for r in results] == [2, 92, 2000]
|
||||
|
||||
def test_lookup_only_house_qualifier(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
quals=[('place', 'house')])
|
||||
|
||||
assert [r.place_id for r in results] == [2, 92]
|
||||
|
||||
def test_lookup_only_street_qualifier(self, apiobj, frontend):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
quals=[('highway', 'residential')])
|
||||
|
||||
assert [r.place_id for r in results] == [1000, 2000]
|
||||
|
||||
@pytest.mark.parametrize('rank,found', [(26, True), (27, False), (30, False)])
|
||||
def test_lookup_min_rank(self, apiobj, frontend, rank, found):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
|
||||
details=SearchDetails(min_rank=rank))
|
||||
|
||||
assert [r.place_id for r in results] == ([2, 92, 1000, 2000] if found else [2, 92])
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_return_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['20', '21', '22'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results
|
||||
assert all(geom.name.lower() in r.geometry for r in results)
|
||||
|
||||
|
||||
def test_very_large_housenumber(apiobj, frontend):
|
||||
apiobj.add_placex(place_id=93, class_='place', type='house',
|
||||
parent_place_id=2000,
|
||||
housenumber='2467463524544', country_code='pt')
|
||||
apiobj.add_placex(place_id=2000, class_='highway', type='residential',
|
||||
rank_search=26, rank_address=26,
|
||||
country_code='pt')
|
||||
apiobj.add_search_name(2000, names=[1, 2],
|
||||
search_rank=26, address_rank=26,
|
||||
country_code='pt')
|
||||
|
||||
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['2467463524544'],
|
||||
details=SearchDetails())
|
||||
|
||||
assert results
|
||||
assert [r.place_id for r in results] == [93, 2000]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('wcount,rids', [(2, [990, 991]), (30000, [990])])
|
||||
def test_name_and_postcode(apiobj, frontend, wcount, rids):
|
||||
apiobj.add_placex(place_id=990, class_='highway', type='service',
|
||||
@@ -366,94 +233,6 @@ def test_name_and_postcode(apiobj, frontend, wcount, rids):
|
||||
assert [r.place_id for r in results] == rids
|
||||
|
||||
|
||||
class TestInterpolations:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=990, class_='highway', type='service',
|
||||
rank_search=27, rank_address=27,
|
||||
centroid=(10.0, 10.0),
|
||||
geometry='LINESTRING(9.995 10, 10.005 10)')
|
||||
apiobj.add_search_name(990, names=[111],
|
||||
search_rank=27, address_rank=27)
|
||||
apiobj.add_placex(place_id=991, class_='place', type='house',
|
||||
parent_place_id=990,
|
||||
rank_search=30, rank_address=30,
|
||||
housenumber='23',
|
||||
centroid=(10.0, 10.00002))
|
||||
apiobj.add_osmline(place_id=992,
|
||||
parent_place_id=990,
|
||||
startnumber=21, endnumber=29, step=2,
|
||||
centroid=(10.0, 10.00001),
|
||||
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
|
||||
def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [990]
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_osmline_with_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results[0].place_id == 992
|
||||
assert geom.name.lower() in results[0].geometry
|
||||
|
||||
|
||||
class TestTiger:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fill_database(self, apiobj):
|
||||
apiobj.add_placex(place_id=990, class_='highway', type='service',
|
||||
rank_search=27, rank_address=27,
|
||||
country_code='us',
|
||||
centroid=(10.0, 10.0),
|
||||
geometry='LINESTRING(9.995 10, 10.005 10)')
|
||||
apiobj.add_search_name(990, names=[111], country_code='us',
|
||||
search_rank=27, address_rank=27)
|
||||
apiobj.add_placex(place_id=991, class_='place', type='house',
|
||||
parent_place_id=990,
|
||||
rank_search=30, rank_address=30,
|
||||
housenumber='23',
|
||||
country_code='us',
|
||||
centroid=(10.0, 10.00002))
|
||||
apiobj.add_tiger(place_id=992,
|
||||
parent_place_id=990,
|
||||
startnumber=21, endnumber=29, step=2,
|
||||
centroid=(10.0, 10.00001),
|
||||
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
|
||||
|
||||
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
|
||||
def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
|
||||
|
||||
assert [r.place_id for r in results] == res + [990]
|
||||
|
||||
@pytest.mark.parametrize('geom', [napi.GeometryFormat.GEOJSON,
|
||||
napi.GeometryFormat.KML,
|
||||
napi.GeometryFormat.SVG,
|
||||
napi.GeometryFormat.TEXT])
|
||||
def test_tiger_with_geometries(self, apiobj, frontend, geom):
|
||||
lookup = FieldLookup('name_vector', [111], LookupAll)
|
||||
|
||||
results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
|
||||
details=SearchDetails(geometry_output=geom))
|
||||
|
||||
assert results[0].place_id == 992
|
||||
assert geom.name.lower() in results[0].geometry
|
||||
|
||||
|
||||
class TestLayersRank30:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
|
||||
Reference in New Issue
Block a user