split db_searches moving each class in its own file

This commit is contained in:
Sarah Hoffmann
2025-07-01 22:57:04 +02:00
parent a7797f8b37
commit 11d624e92a
8 changed files with 1043 additions and 867 deletions

View File

@@ -1,867 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of the actual database accesses for forward search.
"""
from typing import List, Tuple, AsyncIterator, Dict, Any, Callable, cast
import abc
import sqlalchemy as sa
from ..typing import SaFromClause, SaScalarSelect, SaColumn, \
SaExpression, SaSelect, SaLambdaSelect, SaRow, SaBind
from ..sql.sqlalchemy_types import Geometry, IntArray
from ..connection import SearchConnection
from ..types import SearchDetails, DataLayer, GeometryFormat, Bbox
from .. import results as nres
from .db_search_fields import SearchData, WeightedCategories
def no_index(expr: SaColumn) -> SaColumn:
""" Wrap the given expression, so that the query planner will
refrain from using the expression for index lookup.
"""
return sa.func.coalesce(sa.null(), expr)
def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]:
""" Create a dictionary from search parameters that can be used
as bind parameter for SQL execute.
"""
return {'limit': details.max_results,
'min_rank': details.min_rank,
'max_rank': details.max_rank,
'viewbox': details.viewbox,
'viewbox2': details.viewbox_x2,
'near': details.near,
'near_radius': details.near_radius,
'excluded': details.excluded,
'countries': details.countries}
LIMIT_PARAM: SaBind = sa.bindparam('limit')
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
def filter_by_area(sql: SaSelect, t: SaFromClause,
details: SearchDetails, avoid_index: bool = False) -> SaSelect:
""" Apply SQL statements for filtering by viewbox and near point,
if applicable.
"""
if details.near is not None and details.near_radius is not None:
if details.near_radius < 0.1 and not avoid_index:
sql = sql.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM))
else:
sql = sql.where(t.c.geometry.ST_Distance(NEAR_PARAM) <= NEAR_RADIUS_PARAM)
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM,
use_index=not avoid_index and
details.viewbox.area < 0.2))
return sql
def _exclude_places(t: SaFromClause) -> Callable[[], SaExpression]:
return lambda: t.c.place_id.not_in(sa.bindparam('excluded'))
def _select_placex(t: SaFromClause) -> SaSelect:
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type,
t.c.address, t.c.extratags,
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.wikipedia,
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
t.c.linked_place_id, t.c.admin_level,
t.c.centroid,
t.c.geometry.ST_Expand(0).label('bbox'))
def _add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
out = []
if details.geometry_simplification > 0.0:
col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
if details.geometry_output & GeometryFormat.GEOJSON:
out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
if details.geometry_output & GeometryFormat.TEXT:
out.append(sa.func.ST_AsText(col).label('geometry_text'))
if details.geometry_output & GeometryFormat.KML:
out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
if details.geometry_output & GeometryFormat.SVG:
out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
return sql.add_columns(*out)
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
numerals: List[int], details: SearchDetails) -> SaScalarSelect:
all_ids = sa.func.ArrayAgg(table.c.place_id)
sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
if len(numerals) == 1:
sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
.where((numerals[0] - table.c.startnumber) % table.c.step == 0)
else:
sql = sql.where(sa.or_(
*(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
(n - table.c.startnumber) % table.c.step == 0)
for n in numerals)))
if details.excluded:
sql = sql.where(_exclude_places(table))
return sql.scalar_subquery()
def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
orexpr: List[SaExpression] = []
if layers & DataLayer.ADDRESS and layers & DataLayer.POI:
orexpr.append(no_index(table.c.rank_address).between(1, 30))
elif layers & DataLayer.ADDRESS:
orexpr.append(no_index(table.c.rank_address).between(1, 29))
orexpr.append(sa.func.IsAddressPoint(table))
elif layers & DataLayer.POI:
orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
table.c.class_.not_in(('place', 'building'))))
if layers & DataLayer.MANMADE:
exclude = []
if not layers & DataLayer.RAILWAY:
exclude.append('railway')
if not layers & DataLayer.NATURAL:
exclude.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)),
no_index(table.c.rank_address) == 0))
else:
include = []
if layers & DataLayer.RAILWAY:
include.append('railway')
if layers & DataLayer.NATURAL:
include.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.in_(tuple(include)),
no_index(table.c.rank_address) == 0))
if len(orexpr) == 1:
return orexpr[0]
return sa.or_(*orexpr)
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
return sa.case(
(table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
async def _get_placex_housenumbers(conn: SearchConnection,
place_ids: List[int],
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.placex
sql = _select_placex(t).add_columns(t.c.importance)\
.where(t.c.place_id.in_(place_ids))
if details.geometry_output:
sql = _add_geometry_columns(sql, t.c.geometry, details)
for row in await conn.execute(sql):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
yield result
def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
""" Create a subselect that returns the given list of integers
as rows in the column 'nr'.
"""
vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
.table_valued(sa.column('value', type_=sa.JSON))
return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
numerals: List[int],
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.osmline
values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.osm_id,
t.c.parent_place_id, t.c.address,
values.c.nr.label('housenumber'),
_interpolated_position(t, values.c.nr),
t.c.postcode, t.c.country_code)\
.where(t.c.place_id.in_(place_ids))\
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
if details.geometry_output:
sub = sql.subquery()
sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_osmline_row(row, nres.SearchResult)
assert result
yield result
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
numerals: List[int], osm_id: int,
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.tiger
values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.parent_place_id,
sa.literal('W').label('osm_type'),
sa.literal(osm_id).label('osm_id'),
values.c.nr.label('housenumber'),
_interpolated_position(t, values.c.nr),
t.c.postcode)\
.where(t.c.place_id.in_(place_ids))\
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
if details.geometry_output:
sub = sql.subquery()
sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_tiger_row(row, nres.SearchResult)
assert result
yield result
class AbstractSearch(abc.ABC):
""" Encapuslation of a single lookup in the database.
"""
SEARCH_PRIO: int = 2
def __init__(self, penalty: float) -> None:
self.penalty = penalty
@abc.abstractmethod
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
class NearSearch(AbstractSearch):
""" Category search of a place type near the result of another search.
"""
def __init__(self, penalty: float, categories: WeightedCategories,
search: AbstractSearch) -> None:
super().__init__(penalty)
self.search = search
self.categories = categories
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
results = nres.SearchResults()
base = await self.search.lookup(conn, details)
if not base:
return results
base.sort(key=lambda r: (r.accuracy, r.rank_search))
max_accuracy = base[0].accuracy + 0.5
if base[0].rank_address == 0:
min_rank = 0
max_rank = 0
elif base[0].rank_address < 26:
min_rank = 1
max_rank = min(25, base[0].rank_address + 4)
else:
min_rank = 26
max_rank = 30
base = nres.SearchResults(r for r in base
if (r.source_table == nres.SourceTable.PLACEX
and r.accuracy <= max_accuracy
and r.bbox and r.bbox.area < 20
and r.rank_address >= min_rank
and r.rank_address <= max_rank))
if base:
baseids = [b.place_id for b in base[:5] if b.place_id]
for category, penalty in self.categories:
await self.lookup_category(results, conn, baseids, category, penalty, details)
if len(results) >= details.max_results:
break
return results
async def lookup_category(self, results: nres.SearchResults,
conn: SearchConnection, ids: List[int],
category: Tuple[str, str], penalty: float,
details: SearchDetails) -> None:
""" Find places of the given category near the list of
place ids and add the results to 'results'.
"""
table = await conn.get_class_table(*category)
tgeom = conn.t.placex.alias('pgeom')
if table is None:
# No classtype table available, do a simplified lookup in placex.
table = conn.t.placex
sql = sa.select(table.c.place_id,
sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
.label('dist'))\
.join(tgeom, table.c.geometry.intersects(tgeom.c.centroid.ST_Expand(0.01)))\
.where(table.c.class_ == category[0])\
.where(table.c.type == category[1])
else:
# Use classtype table. We can afford to use a larger
# radius for the lookup.
sql = sa.select(table.c.place_id,
sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
.label('dist'))\
.join(tgeom,
table.c.centroid.ST_CoveredBy(
sa.case((sa.and_(tgeom.c.rank_address > 9,
tgeom.c.geometry.is_area()),
tgeom.c.geometry),
else_=tgeom.c.centroid.ST_Expand(0.05))))
inner = sql.where(tgeom.c.place_id.in_(ids))\
.group_by(table.c.place_id).subquery()
t = conn.t.placex
sql = _select_placex(t).add_columns((-inner.c.dist).label('importance'))\
.join(inner, inner.c.place_id == t.c.place_id)\
.order_by(inner.c.dist)
sql = sql.where(no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM))
if details.countries:
sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM))
if details.excluded:
sql = sql.where(_exclude_places(t))
if details.layers is not None:
sql = sql.where(_filter_by_layer(t, details.layers))
sql = sql.limit(LIMIT_PARAM)
for row in await conn.execute(sql, _details_to_bind_params(details)):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + penalty
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
class PoiSearch(AbstractSearch):
""" Category search in a geographic area.
"""
def __init__(self, sdata: SearchData) -> None:
super().__init__(sdata.penalty)
self.qualifiers = sdata.qualifiers
self.countries = sdata.countries
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
bind_params = _details_to_bind_params(details)
t = conn.t.placex
rows: List[SaRow] = []
if details.near and details.near_radius is not None and details.near_radius < 0.2:
# simply search in placex table
def _base_query() -> SaSelect:
return _select_placex(t) \
.add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM))
.label('importance'))\
.where(t.c.linked_place_id == None) \
.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \
.order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \
.limit(LIMIT_PARAM)
classtype = self.qualifiers.values
if len(classtype) == 1:
cclass, ctype = classtype[0]
sql: SaLambdaSelect = sa.lambda_stmt(
lambda: _base_query().where(t.c.class_ == cclass)
.where(t.c.type == ctype))
else:
sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ)
for cls, typ in classtype)))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM))
rows.extend(await conn.execute(sql, bind_params))
else:
# use the class type tables
for category in self.qualifiers.values:
table = await conn.get_class_table(*category)
if table is not None:
sql = _select_placex(t)\
.add_columns(t.c.importance)\
.join(table, t.c.place_id == table.c.place_id)\
.where(t.c.class_ == category[0])\
.where(t.c.type == category[1])
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(table.c.centroid.intersects(VIEWBOX_PARAM))
if details.near and details.near_radius is not None:
sql = sql.order_by(table.c.centroid.ST_Distance(NEAR_PARAM))\
.where(table.c.centroid.within_distance(NEAR_PARAM,
NEAR_RADIUS_PARAM))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
sql = sql.limit(LIMIT_PARAM)
rows.extend(await conn.execute(sql, bind_params))
results = nres.SearchResults()
for row in rows:
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type))
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
return results
class CountrySearch(AbstractSearch):
""" Search for a country name or country code.
"""
SEARCH_PRIO = 0
def __init__(self, sdata: SearchData) -> None:
super().__init__(sdata.penalty)
self.countries = sdata.countries
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.placex
ccodes = self.countries.values
sql = _select_placex(t)\
.add_columns(t.c.importance)\
.where(t.c.country_code.in_(ccodes))\
.where(t.c.rank_address == 4)
if details.geometry_output:
sql = _add_geometry_columns(sql, t.c.geometry, details)
if details.excluded:
sql = sql.where(_exclude_places(t))
sql = filter_by_area(sql, t, details)
results = nres.SearchResults()
for row in await conn.execute(sql, _details_to_bind_params(details)):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
if not results:
results = await self.lookup_in_country_table(conn, details)
if results:
details.min_rank = min(5, details.max_rank)
details.max_rank = min(25, details.max_rank)
return results
async def lookup_in_country_table(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Look up the country in the fallback country tables.
"""
# Avoid the fallback search when this is a more search. Country results
# usually are in the first batch of results and it is not possible
# to exclude these fallbacks.
if details.excluded:
return nres.SearchResults()
t = conn.t.country_name
tgrid = conn.t.country_grid
sql = sa.select(tgrid.c.country_code,
tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
.label('centroid'),
tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
.where(tgrid.c.country_code.in_(self.countries.values))\
.group_by(tgrid.c.country_code)
sql = filter_by_area(sql, tgrid, details, avoid_index=True)
sub = sql.subquery('grid')
sql = sa.select(t.c.country_code,
t.c.name.merge(t.c.derived_name).label('name'),
sub.c.centroid, sub.c.bbox)\
.join(sub, t.c.country_code == sub.c.country_code)
if details.geometry_output:
sql = _add_geometry_columns(sql, sub.c.centroid, details)
results = nres.SearchResults()
for row in await conn.execute(sql, _details_to_bind_params(details)):
result = nres.create_from_country_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
results.append(result)
return results
class PostcodeSearch(AbstractSearch):
""" Search for a postcode.
"""
def __init__(self, extra_penalty: float, sdata: SearchData) -> None:
super().__init__(sdata.penalty + extra_penalty)
self.countries = sdata.countries
self.postcodes = sdata.postcodes
self.lookups = sdata.lookups
self.rankings = sdata.rankings
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.postcode
pcs = self.postcodes.values
sql = sa.select(t.c.place_id, t.c.parent_place_id,
t.c.rank_search, t.c.rank_address,
t.c.postcode, t.c.country_code,
t.c.geometry.label('centroid'))\
.where(t.c.postcode.in_(pcs))
if details.geometry_output:
sql = _add_geometry_columns(sql, t.c.geometry, details)
penalty: SaExpression = sa.literal(self.penalty)
if details.viewbox is not None and not details.bounded_viewbox:
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM), 0.0),
(t.c.geometry.intersects(VIEWBOX2_PARAM), 0.5),
else_=1.0)
if details.near is not None:
sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM))
sql = filter_by_area(sql, t, details)
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if details.excluded:
sql = sql.where(_exclude_places(t))
if self.lookups:
assert len(self.lookups) == 1
tsearch = conn.t.search_name
sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
.where((tsearch.c.name_vector + tsearch.c.nameaddress_vector)
.contains(sa.type_coerce(self.lookups[0].tokens,
IntArray)))
# Do NOT add rerank penalties based on the address terms.
# The standard rerank penalty only checks the address vector
# while terms may appear in name and address vector. This would
# lead to overly high penalties.
# We assume that a postcode is precise enough to not require
# additional full name matches.
penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes),
else_=1.0)
sql = sql.add_columns(penalty.label('accuracy'))
sql = sql.order_by('accuracy').limit(LIMIT_PARAM)
results = nres.SearchResults()
for row in await conn.execute(sql, _details_to_bind_params(details)):
p = conn.t.placex
placex_sql = _select_placex(p)\
.add_columns(p.c.importance)\
.where(sa.text("""class = 'boundary'
AND type = 'postal_code'
AND osm_type = 'R'"""))\
.where(p.c.country_code == row.country_code)\
.where(p.c.postcode == row.postcode)\
.limit(1)
if details.geometry_output:
placex_sql = _add_geometry_columns(placex_sql, p.c.geometry, details)
for prow in await conn.execute(placex_sql, _details_to_bind_params(details)):
result = nres.create_from_placex_row(prow, nres.SearchResult)
if result is not None:
result.bbox = Bbox.from_wkb(prow.bbox)
break
else:
result = nres.create_from_postcode_row(row, nres.SearchResult)
assert result
if result.place_id not in details.excluded:
result.accuracy = row.accuracy
results.append(result)
return results
class PlaceSearch(AbstractSearch):
""" Generic search for an address or named place.
"""
SEARCH_PRIO = 1
def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None:
super().__init__(sdata.penalty + extra_penalty)
self.countries = sdata.countries
self.postcodes = sdata.postcodes
self.housenumbers = sdata.housenumbers
self.qualifiers = sdata.qualifiers
self.lookups = sdata.lookups
self.rankings = sdata.rankings
self.expected_count = expected_count
def _inner_search_name_cte(self, conn: SearchConnection,
details: SearchDetails) -> 'sa.CTE':
""" Create a subquery that preselects the rows in the search_name
table.
"""
t = conn.t.search_name
penalty: SaExpression = sa.literal(self.penalty)
for ranking in self.rankings:
penalty += ranking.sql_penalty(t)
sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank,
t.c.country_code, t.c.centroid,
t.c.name_vector, t.c.nameaddress_vector,
sa.case((t.c.importance > 0, t.c.importance),
else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75))
.label('importance'),
penalty.label('penalty'))
for lookup in self.lookups:
sql = sql.where(lookup.sql_condition(t))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if self.postcodes:
# if a postcode is given, don't search for state or country level objects
sql = sql.where(t.c.address_rank > 9)
if self.expected_count > 10000:
# Many results expected. Restrict by postcode.
tpc = conn.t.postcode
sql = sql.where(sa.select(tpc.c.postcode)
.where(tpc.c.postcode.in_(self.postcodes.values))
.where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
.exists())
if details.viewbox is not None:
if details.bounded_viewbox:
sql = sql.where(t.c.centroid
.intersects(VIEWBOX_PARAM,
use_index=details.viewbox.area < 0.2))
elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000:
sql = sql.where(t.c.centroid
.intersects(VIEWBOX2_PARAM,
use_index=details.viewbox.area < 0.5))
if details.near is not None and details.near_radius is not None:
if details.near_radius < 0.1:
sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM,
NEAR_RADIUS_PARAM))
else:
sql = sql.where(t.c.centroid
.ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
if self.housenumbers:
sql = sql.where(t.c.address_rank.between(16, 30))
else:
if details.excluded:
sql = sql.where(_exclude_places(t))
if details.min_rank > 0:
sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM,
t.c.search_rank >= MIN_RANK_PARAM))
if details.max_rank < 30:
sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM,
t.c.search_rank <= MAX_RANK_PARAM))
inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
inner.c.country_code, inner.c.centroid, inner.c.importance,
inner.c.penalty)
# If the query is not an address search or has a geographic preference,
# preselect most important items to restrict the number of places
# that need to be looked up in placex.
if not self.housenumbers\
and (details.viewbox is None or details.bounded_viewbox)\
and (details.near is None or details.near_radius is not None)\
and not self.qualifiers:
sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance)
.over(order_by=inner.c.penalty - inner.c.importance)
.label('min_penalty'))
inner = sql.subquery()
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
inner.c.country_code, inner.c.centroid, inner.c.importance,
inner.c.penalty)\
.where(inner.c.penalty - inner.c.importance < inner.c.min_penalty + 0.5)
return sql.cte('searches')
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.placex
tsearch = self._inner_search_name_cte(conn, details)
sql = _select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id)
if details.geometry_output:
sql = _add_geometry_columns(sql, t.c.geometry, details)
penalty: SaExpression = tsearch.c.penalty
if self.postcodes:
tpc = conn.t.postcode
pcs = self.postcodes.values
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
.where(tpc.c.postcode.in_(pcs))\
.scalar_subquery()
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
if details.viewbox is not None and not details.bounded_viewbox:
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
(t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5),
else_=1.0)
if details.near is not None:
sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
.label('importance'))
sql = sql.order_by(sa.desc(sa.text('importance')))
else:
sql = sql.order_by(penalty - tsearch.c.importance)
sql = sql.add_columns(tsearch.c.importance)
sql = sql.add_columns(penalty.label('accuracy'))\
.order_by(sa.text('accuracy'))
if self.housenumbers:
hnr_list = '|'.join(self.housenumbers.values)
inner = sql.where(sa.or_(tsearch.c.address_rank < 30,
sa.func.RegexpWord(hnr_list, t.c.housenumber)))\
.subquery()
# Housenumbers from placex
thnr = conn.t.placex.alias('hnr')
pid_list = sa.func.ArrayAgg(thnr.c.place_id)
place_sql = sa.select(pid_list)\
.where(thnr.c.parent_place_id == inner.c.place_id)\
.where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
.where(thnr.c.linked_place_id == None)\
.where(thnr.c.indexed_status == 0)
if details.excluded:
place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
if self.qualifiers:
place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
numerals = [int(n) for n in self.housenumbers.values
if n.isdigit() and len(n) < 8]
interpol_sql: SaColumn
tiger_sql: SaColumn
if numerals and \
(not self.qualifiers or ('place', 'house') in self.qualifiers.values):
# Housenumbers from interpolations
interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
numerals, details)
# Housenumbers from Tiger
tiger_sql = sa.case((inner.c.country_code == 'us',
_make_interpolation_subquery(conn.t.tiger, inner,
numerals, details)
), else_=None)
else:
interpol_sql = sa.null()
tiger_sql = sa.null()
unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
interpol_sql.label('interpol_hnr'),
tiger_sql.label('tiger_hnr')).subquery('unsort')
sql = sa.select(unsort)\
.order_by(sa.case((unsort.c.placex_hnr != None, 1),
(unsort.c.interpol_hnr != None, 2),
(unsort.c.tiger_hnr != None, 3),
else_=4),
unsort.c.accuracy)
else:
sql = sql.where(t.c.linked_place_id == None)\
.where(t.c.indexed_status == 0)
if self.qualifiers:
sql = sql.where(self.qualifiers.sql_restrict(t))
if details.layers is not None:
sql = sql.where(_filter_by_layer(t, details.layers))
sql = sql.limit(LIMIT_PARAM)
results = nres.SearchResults()
for row in await conn.execute(sql, _details_to_bind_params(details)):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = row.accuracy
if self.housenumbers and row.rank_address < 30:
if row.placex_hnr:
subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
elif row.interpol_hnr:
subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
elif row.tiger_hnr:
subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
else:
subs = None
if subs is not None:
async for sub in subs:
assert sub.housenumber
sub.accuracy = result.accuracy
if not any(nr in self.housenumbers.values
for nr in sub.housenumber.split(';')):
sub.accuracy += 0.6
results.append(sub)
# Only add the street as a result, if it meets all other
# filter conditions.
if (not details.excluded or result.place_id not in details.excluded)\
and (not self.qualifiers or result.category in self.qualifiers.values)\
and result.rank_address >= details.min_rank:
result.accuracy += 1.0 # penalty for missing housenumber
results.append(result)
else:
results.append(result)
return results

View File

@@ -0,0 +1,16 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Module implementing the actual database accesses for forward search.
"""
from .base import AbstractSearch as AbstractSearch
from .near_search import NearSearch as NearSearch
from .poi_search import PoiSearch as PoiSearch
from .country_search import CountrySearch as CountrySearch
from .postcode_search import PostcodeSearch as PostcodeSearch
from .place_search import PlaceSearch as PlaceSearch

View File

@@ -0,0 +1,144 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Interface for classes implementing a database search.
"""
from typing import Callable, List
import abc
import sqlalchemy as sa
from ...typing import SaFromClause, SaSelect, SaColumn, SaExpression, SaLambdaSelect
from ...sql.sqlalchemy_types import Geometry
from ...connection import SearchConnection
from ...types import SearchDetails, DataLayer, GeometryFormat
from ...results import SearchResults
class AbstractSearch(abc.ABC):
""" Encapuslation of a single lookup in the database.
"""
SEARCH_PRIO: int = 2
def __init__(self, penalty: float) -> None:
self.penalty = penalty
@abc.abstractmethod
async def lookup(self, conn: SearchConnection, details: SearchDetails) -> SearchResults:
""" Find results for the search in the database.
"""
def select_placex(t: SaFromClause) -> SaSelect:
""" Return the basic select query for placex which returns all
fields necessary to fill a Nominatim result. 't' must either be
the placex table or a subquery returning appropriate fields from
a placex-related query.
"""
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type,
t.c.address, t.c.extratags,
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.wikipedia,
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
t.c.linked_place_id, t.c.admin_level,
t.c.centroid,
t.c.geometry.ST_Expand(0).label('bbox'))
def exclude_places(t: SaFromClause) -> Callable[[], SaExpression]:
""" Return an expression to exclude place IDs from the list in the
SearchDetails.
Requires the excluded IDs to be supplied as a bind parameter in SQL.
"""
return lambda: t.c.place_id.not_in(sa.bindparam('excluded'))
def filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
""" Return an expression that filters the given table by layers.
"""
orexpr: List[SaExpression] = []
if layers & DataLayer.ADDRESS and layers & DataLayer.POI:
orexpr.append(no_index(table.c.rank_address).between(1, 30))
elif layers & DataLayer.ADDRESS:
orexpr.append(no_index(table.c.rank_address).between(1, 29))
orexpr.append(sa.func.IsAddressPoint(table))
elif layers & DataLayer.POI:
orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
table.c.class_.not_in(('place', 'building'))))
if layers & DataLayer.MANMADE:
exclude = []
if not layers & DataLayer.RAILWAY:
exclude.append('railway')
if not layers & DataLayer.NATURAL:
exclude.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)),
no_index(table.c.rank_address) == 0))
else:
include = []
if layers & DataLayer.RAILWAY:
include.append('railway')
if layers & DataLayer.NATURAL:
include.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.in_(tuple(include)),
no_index(table.c.rank_address) == 0))
if len(orexpr) == 1:
return orexpr[0]
return sa.or_(*orexpr)
def no_index(expr: SaColumn) -> SaColumn:
""" Wrap the given expression, so that the query planner will
refrain from using the expression for index lookup.
"""
return sa.func.coalesce(sa.null(), expr)
def filter_by_area(sql: SaSelect, t: SaFromClause,
details: SearchDetails, avoid_index: bool = False) -> SaSelect:
""" Apply SQL statements for filtering by viewbox and near point,
if applicable.
"""
if details.near is not None and details.near_radius is not None:
if details.near_radius < 0.1 and not avoid_index:
sql = sql.where(
t.c.geometry.within_distance(sa.bindparam('near', type_=Geometry),
sa.bindparam('near_radius')))
else:
sql = sql.where(
t.c.geometry.ST_Distance(
sa.bindparam('near', type_=Geometry)) <= sa.bindparam('near_radius'))
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(t.c.geometry.intersects(sa.bindparam('viewbox', type_=Geometry),
use_index=not avoid_index and
details.viewbox.area < 0.2))
return sql
def add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
""" Add columns for requested geometry formats and return the new query.
"""
out = []
if details.geometry_simplification > 0.0:
col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
if details.geometry_output & GeometryFormat.GEOJSON:
out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
if details.geometry_output & GeometryFormat.TEXT:
out.append(sa.func.ST_AsText(col).label('geometry_text'))
if details.geometry_output & GeometryFormat.KML:
out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
if details.geometry_output & GeometryFormat.SVG:
out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
return sql.add_columns(*out)

View File

@@ -0,0 +1,119 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of searches for a country.
"""
import sqlalchemy as sa
from . import base
from ..db_search_fields import SearchData
from ... import results as nres
from ...connection import SearchConnection
from ...types import SearchDetails, Bbox
class CountrySearch(base.AbstractSearch):
""" Search for a country name or country code.
"""
SEARCH_PRIO = 0
def __init__(self, sdata: SearchData) -> None:
super().__init__(sdata.penalty)
self.countries = sdata.countries
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.placex
ccodes = self.countries.values
sql = base.select_placex(t)\
.add_columns(t.c.importance)\
.where(t.c.country_code.in_(ccodes))\
.where(t.c.rank_address == 4)
if details.geometry_output:
sql = base.add_geometry_columns(sql, t.c.geometry, details)
if details.excluded:
sql = sql.where(base.exclude_places(t))
sql = base.filter_by_area(sql, t, details)
bind_params = {
'excluded': details.excluded,
'viewbox': details.viewbox,
'near': details.near,
'near_radius': details.near_radius
}
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
if not results:
results = await self.lookup_in_country_table(conn, details)
if results:
details.min_rank = min(5, details.max_rank)
details.max_rank = min(25, details.max_rank)
return results
async def lookup_in_country_table(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Look up the country in the fallback country tables.
"""
# Avoid the fallback search when this is a more search. Country results
# usually are in the first batch of results and it is not possible
# to exclude these fallbacks.
if details.excluded:
return nres.SearchResults()
t = conn.t.country_name
tgrid = conn.t.country_grid
sql = sa.select(tgrid.c.country_code,
tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
.label('centroid'),
tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
.where(tgrid.c.country_code.in_(self.countries.values))\
.group_by(tgrid.c.country_code)
sql = base.filter_by_area(sql, tgrid, details, avoid_index=True)
sub = sql.subquery('grid')
sql = sa.select(t.c.country_code,
t.c.name.merge(t.c.derived_name).label('name'),
sub.c.centroid, sub.c.bbox)\
.join(sub, t.c.country_code == sub.c.country_code)
if details.geometry_output:
sql = base.add_geometry_columns(sql, sub.c.centroid, details)
bind_params = {
'viewbox': details.viewbox,
'near': details.near,
'near_radius': details.near_radius
}
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_country_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
results.append(result)
return results

View File

@@ -0,0 +1,136 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of a category search around a place.
"""
from typing import List, Tuple
import sqlalchemy as sa
from . import base
from ...typing import SaBind
from ...types import SearchDetails, Bbox
from ...connection import SearchConnection
from ... import results as nres
from ..db_search_fields import WeightedCategories
LIMIT_PARAM: SaBind = sa.bindparam('limit')
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
class NearSearch(base.AbstractSearch):
""" Category search of a place type near the result of another search.
"""
def __init__(self, penalty: float, categories: WeightedCategories,
search: base.AbstractSearch) -> None:
super().__init__(penalty)
self.search = search
self.categories = categories
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
results = nres.SearchResults()
base = await self.search.lookup(conn, details)
if not base:
return results
base.sort(key=lambda r: (r.accuracy, r.rank_search))
max_accuracy = base[0].accuracy + 0.5
if base[0].rank_address == 0:
min_rank = 0
max_rank = 0
elif base[0].rank_address < 26:
min_rank = 1
max_rank = min(25, base[0].rank_address + 4)
else:
min_rank = 26
max_rank = 30
base = nres.SearchResults(r for r in base
if (r.source_table == nres.SourceTable.PLACEX
and r.accuracy <= max_accuracy
and r.bbox and r.bbox.area < 20
and r.rank_address >= min_rank
and r.rank_address <= max_rank))
if base:
baseids = [b.place_id for b in base[:5] if b.place_id]
for category, penalty in self.categories:
await self.lookup_category(results, conn, baseids, category, penalty, details)
if len(results) >= details.max_results:
break
return results
async def lookup_category(self, results: nres.SearchResults,
conn: SearchConnection, ids: List[int],
category: Tuple[str, str], penalty: float,
details: SearchDetails) -> None:
""" Find places of the given category near the list of
place ids and add the results to 'results'.
"""
table = await conn.get_class_table(*category)
tgeom = conn.t.placex.alias('pgeom')
if table is None:
# No classtype table available, do a simplified lookup in placex.
table = conn.t.placex
sql = sa.select(table.c.place_id,
sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
.label('dist'))\
.join(tgeom, table.c.geometry.intersects(tgeom.c.centroid.ST_Expand(0.01)))\
.where(table.c.class_ == category[0])\
.where(table.c.type == category[1])
else:
# Use classtype table. We can afford to use a larger
# radius for the lookup.
sql = sa.select(table.c.place_id,
sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
.label('dist'))\
.join(tgeom,
table.c.centroid.ST_CoveredBy(
sa.case((sa.and_(tgeom.c.rank_address > 9,
tgeom.c.geometry.is_area()),
tgeom.c.geometry),
else_=tgeom.c.centroid.ST_Expand(0.05))))
inner = sql.where(tgeom.c.place_id.in_(ids))\
.group_by(table.c.place_id).subquery()
t = conn.t.placex
sql = base.select_placex(t).add_columns((-inner.c.dist).label('importance'))\
.join(inner, inner.c.place_id == t.c.place_id)\
.order_by(inner.c.dist)
sql = sql.where(base.no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM))
if details.countries:
sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM))
if details.excluded:
sql = sql.where(base.exclude_places(t))
if details.layers is not None:
sql = sql.where(base.filter_by_layer(t, details.layers))
sql = sql.limit(LIMIT_PARAM)
bind_params = {'limit': details.max_results,
'min_rank': details.min_rank,
'max_rank': details.max_rank,
'excluded': details.excluded,
'countries': details.countries}
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + penalty
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)

View File

@@ -0,0 +1,385 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of search for a named place.
"""
from typing import cast, List, AsyncIterator
import sqlalchemy as sa
from . import base
from ...typing import SaBind, SaExpression, SaColumn, SaFromClause, SaScalarSelect
from ...types import SearchDetails, Bbox
from ...sql.sqlalchemy_types import Geometry
from ...connection import SearchConnection
from ... import results as nres
from ..db_search_fields import SearchData
LIMIT_PARAM: SaBind = sa.bindparam('limit')
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
""" Create a subselect that returns the given list of integers
as rows in the column 'nr'.
"""
vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
.table_valued(sa.column('value', type_=sa.JSON))
return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
return sa.case(
(table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
numerals: List[int], details: SearchDetails) -> SaScalarSelect:
all_ids = sa.func.ArrayAgg(table.c.place_id)
sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
if len(numerals) == 1:
sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
.where((numerals[0] - table.c.startnumber) % table.c.step == 0)
else:
sql = sql.where(sa.or_(
*(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
(n - table.c.startnumber) % table.c.step == 0)
for n in numerals)))
if details.excluded:
sql = sql.where(base.exclude_places(table))
return sql.scalar_subquery()
async def _get_placex_housenumbers(conn: SearchConnection,
place_ids: List[int],
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.placex
sql = base.select_placex(t).add_columns(t.c.importance)\
.where(t.c.place_id.in_(place_ids))
if details.geometry_output:
sql = base.add_geometry_columns(sql, t.c.geometry, details)
for row in await conn.execute(sql):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
yield result
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
numerals: List[int],
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.osmline
values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.osm_id,
t.c.parent_place_id, t.c.address,
values.c.nr.label('housenumber'),
_interpolated_position(t, values.c.nr),
t.c.postcode, t.c.country_code)\
.where(t.c.place_id.in_(place_ids))\
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
if details.geometry_output:
sub = sql.subquery()
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_osmline_row(row, nres.SearchResult)
assert result
yield result
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
numerals: List[int], osm_id: int,
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.tiger
values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.parent_place_id,
sa.literal('W').label('osm_type'),
sa.literal(osm_id).label('osm_id'),
values.c.nr.label('housenumber'),
_interpolated_position(t, values.c.nr),
t.c.postcode)\
.where(t.c.place_id.in_(place_ids))\
.join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
if details.geometry_output:
sub = sql.subquery()
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_tiger_row(row, nres.SearchResult)
assert result
yield result
class PlaceSearch(base.AbstractSearch):
""" Generic search for an address or named place.
"""
SEARCH_PRIO = 1
def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None:
super().__init__(sdata.penalty + extra_penalty)
self.countries = sdata.countries
self.postcodes = sdata.postcodes
self.housenumbers = sdata.housenumbers
self.qualifiers = sdata.qualifiers
self.lookups = sdata.lookups
self.rankings = sdata.rankings
self.expected_count = expected_count
def _inner_search_name_cte(self, conn: SearchConnection,
details: SearchDetails) -> 'sa.CTE':
""" Create a subquery that preselects the rows in the search_name
table.
"""
t = conn.t.search_name
penalty: SaExpression = sa.literal(self.penalty)
for ranking in self.rankings:
penalty += ranking.sql_penalty(t)
sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank,
t.c.country_code, t.c.centroid,
t.c.name_vector, t.c.nameaddress_vector,
sa.case((t.c.importance > 0, t.c.importance),
else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75))
.label('importance'),
penalty.label('penalty'))
for lookup in self.lookups:
sql = sql.where(lookup.sql_condition(t))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if self.postcodes:
# if a postcode is given, don't search for state or country level objects
sql = sql.where(t.c.address_rank > 9)
if self.expected_count > 10000:
# Many results expected. Restrict by postcode.
tpc = conn.t.postcode
sql = sql.where(sa.select(tpc.c.postcode)
.where(tpc.c.postcode.in_(self.postcodes.values))
.where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
.exists())
if details.viewbox is not None:
if details.bounded_viewbox:
sql = sql.where(t.c.centroid
.intersects(VIEWBOX_PARAM,
use_index=details.viewbox.area < 0.2))
elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000:
sql = sql.where(t.c.centroid
.intersects(VIEWBOX2_PARAM,
use_index=details.viewbox.area < 0.5))
if details.near is not None and details.near_radius is not None:
if details.near_radius < 0.1:
sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM,
NEAR_RADIUS_PARAM))
else:
sql = sql.where(t.c.centroid
.ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
if self.housenumbers:
sql = sql.where(t.c.address_rank.between(16, 30))
else:
if details.excluded:
sql = sql.where(base.exclude_places(t))
if details.min_rank > 0:
sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM,
t.c.search_rank >= MIN_RANK_PARAM))
if details.max_rank < 30:
sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM,
t.c.search_rank <= MAX_RANK_PARAM))
inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
inner.c.country_code, inner.c.centroid, inner.c.importance,
inner.c.penalty)
# If the query is not an address search or has a geographic preference,
# preselect most important items to restrict the number of places
# that need to be looked up in placex.
if not self.housenumbers\
and (details.viewbox is None or details.bounded_viewbox)\
and (details.near is None or details.near_radius is not None)\
and not self.qualifiers:
sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance)
.over(order_by=inner.c.penalty - inner.c.importance)
.label('min_penalty'))
inner = sql.subquery()
sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
inner.c.country_code, inner.c.centroid, inner.c.importance,
inner.c.penalty)\
.where(inner.c.penalty - inner.c.importance < inner.c.min_penalty + 0.5)
return sql.cte('searches')
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.placex
tsearch = self._inner_search_name_cte(conn, details)
sql = base.select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id)
if details.geometry_output:
sql = base.add_geometry_columns(sql, t.c.geometry, details)
penalty: SaExpression = tsearch.c.penalty
if self.postcodes:
tpc = conn.t.postcode
pcs = self.postcodes.values
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
.where(tpc.c.postcode.in_(pcs))\
.scalar_subquery()
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
if details.viewbox is not None and not details.bounded_viewbox:
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
(t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5),
else_=1.0)
if details.near is not None:
sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
.label('importance'))
sql = sql.order_by(sa.desc(sa.text('importance')))
else:
sql = sql.order_by(penalty - tsearch.c.importance)
sql = sql.add_columns(tsearch.c.importance)
sql = sql.add_columns(penalty.label('accuracy'))\
.order_by(sa.text('accuracy'))
if self.housenumbers:
hnr_list = '|'.join(self.housenumbers.values)
inner = sql.where(sa.or_(tsearch.c.address_rank < 30,
sa.func.RegexpWord(hnr_list, t.c.housenumber)))\
.subquery()
# Housenumbers from placex
thnr = conn.t.placex.alias('hnr')
pid_list = sa.func.ArrayAgg(thnr.c.place_id)
place_sql = sa.select(pid_list)\
.where(thnr.c.parent_place_id == inner.c.place_id)\
.where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
.where(thnr.c.linked_place_id == None)\
.where(thnr.c.indexed_status == 0)
if details.excluded:
place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
if self.qualifiers:
place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
numerals = [int(n) for n in self.housenumbers.values
if n.isdigit() and len(n) < 8]
interpol_sql: SaColumn
tiger_sql: SaColumn
if numerals and \
(not self.qualifiers or ('place', 'house') in self.qualifiers.values):
# Housenumbers from interpolations
interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
numerals, details)
# Housenumbers from Tiger
tiger_sql = sa.case((inner.c.country_code == 'us',
_make_interpolation_subquery(conn.t.tiger, inner,
numerals, details)
), else_=None)
else:
interpol_sql = sa.null()
tiger_sql = sa.null()
unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
interpol_sql.label('interpol_hnr'),
tiger_sql.label('tiger_hnr')).subquery('unsort')
sql = sa.select(unsort)\
.order_by(sa.case((unsort.c.placex_hnr != None, 1),
(unsort.c.interpol_hnr != None, 2),
(unsort.c.tiger_hnr != None, 3),
else_=4),
unsort.c.accuracy)
else:
sql = sql.where(t.c.linked_place_id == None)\
.where(t.c.indexed_status == 0)
if self.qualifiers:
sql = sql.where(self.qualifiers.sql_restrict(t))
if details.layers is not None:
sql = sql.where(base.filter_by_layer(t, details.layers))
sql = sql.limit(LIMIT_PARAM)
bind_params = {
'limit': details.max_results,
'min_rank': details.min_rank,
'max_rank': details.max_rank,
'viewbox': details.viewbox,
'viewbox2': details.viewbox_x2,
'near': details.near,
'near_radius': details.near_radius,
'excluded': details.excluded,
'countries': details.countries
}
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = row.accuracy
if self.housenumbers and row.rank_address < 30:
if row.placex_hnr:
subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
elif row.interpol_hnr:
subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
elif row.tiger_hnr:
subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
else:
subs = None
if subs is not None:
async for sub in subs:
assert sub.housenumber
sub.accuracy = result.accuracy
if not any(nr in self.housenumbers.values
for nr in sub.housenumber.split(';')):
sub.accuracy += 0.6
results.append(sub)
# Only add the street as a result, if it meets all other
# filter conditions.
if (not details.excluded or result.place_id not in details.excluded)\
and (not self.qualifiers or result.category in self.qualifiers.values)\
and result.rank_address >= details.min_rank:
result.accuracy += 1.0 # penalty for missing housenumber
results.append(result)
else:
results.append(result)
return results

View File

@@ -0,0 +1,114 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of category search.
"""
from typing import List
import sqlalchemy as sa
from . import base
from ..db_search_fields import SearchData
from ... import results as nres
from ...typing import SaBind, SaRow, SaSelect, SaLambdaSelect
from ...sql.sqlalchemy_types import Geometry
from ...connection import SearchConnection
from ...types import SearchDetails, Bbox
LIMIT_PARAM: SaBind = sa.bindparam('limit')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
class PoiSearch(base.AbstractSearch):
""" Category search in a geographic area.
"""
def __init__(self, sdata: SearchData) -> None:
super().__init__(sdata.penalty)
self.qualifiers = sdata.qualifiers
self.countries = sdata.countries
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
bind_params = {
'limit': details.max_results,
'viewbox': details.viewbox,
'near': details.near,
'near_radius': details.near_radius,
'excluded': details.excluded
}
t = conn.t.placex
rows: List[SaRow] = []
if details.near and details.near_radius is not None and details.near_radius < 0.2:
# simply search in placex table
def _base_query() -> SaSelect:
return base.select_placex(t) \
.add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM))
.label('importance'))\
.where(t.c.linked_place_id == None) \
.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \
.order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \
.limit(LIMIT_PARAM)
classtype = self.qualifiers.values
if len(classtype) == 1:
cclass, ctype = classtype[0]
sql: SaLambdaSelect = sa.lambda_stmt(
lambda: _base_query().where(t.c.class_ == cclass)
.where(t.c.type == ctype))
else:
sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ)
for cls, typ in classtype)))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM))
rows.extend(await conn.execute(sql, bind_params))
else:
# use the class type tables
for category in self.qualifiers.values:
table = await conn.get_class_table(*category)
if table is not None:
sql = base.select_placex(t)\
.add_columns(t.c.importance)\
.join(table, t.c.place_id == table.c.place_id)\
.where(t.c.class_ == category[0])\
.where(t.c.type == category[1])
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(table.c.centroid.intersects(VIEWBOX_PARAM))
if details.near and details.near_radius is not None:
sql = sql.order_by(table.c.centroid.ST_Distance(NEAR_PARAM))\
.where(table.c.centroid.within_distance(NEAR_PARAM,
NEAR_RADIUS_PARAM))
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
sql = sql.limit(LIMIT_PARAM)
rows.extend(await conn.execute(sql, bind_params))
results = nres.SearchResults()
for row in rows:
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type))
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
return results

View File

@@ -0,0 +1,129 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of search for a postcode.
"""
import sqlalchemy as sa
from . import base
from ...typing import SaBind, SaExpression
from ...sql.sqlalchemy_types import Geometry, IntArray
from ...connection import SearchConnection
from ...types import SearchDetails, Bbox
from ... import results as nres
from ..db_search_fields import SearchData
LIMIT_PARAM: SaBind = sa.bindparam('limit')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
class PostcodeSearch(base.AbstractSearch):
""" Search for a postcode.
"""
def __init__(self, extra_penalty: float, sdata: SearchData) -> None:
super().__init__(sdata.penalty + extra_penalty)
self.countries = sdata.countries
self.postcodes = sdata.postcodes
self.lookups = sdata.lookups
self.rankings = sdata.rankings
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
"""
t = conn.t.postcode
pcs = self.postcodes.values
sql = sa.select(t.c.place_id, t.c.parent_place_id,
t.c.rank_search, t.c.rank_address,
t.c.postcode, t.c.country_code,
t.c.geometry.label('centroid'))\
.where(t.c.postcode.in_(pcs))
if details.geometry_output:
sql = base.add_geometry_columns(sql, t.c.geometry, details)
penalty: SaExpression = sa.literal(self.penalty)
if details.viewbox is not None and not details.bounded_viewbox:
penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM), 0.0),
(t.c.geometry.intersects(VIEWBOX2_PARAM), 0.5),
else_=1.0)
if details.near is not None:
sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM))
sql = base.filter_by_area(sql, t, details)
if self.countries:
sql = sql.where(t.c.country_code.in_(self.countries.values))
if details.excluded:
sql = sql.where(base.exclude_places(t))
if self.lookups:
assert len(self.lookups) == 1
tsearch = conn.t.search_name
sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
.where((tsearch.c.name_vector + tsearch.c.nameaddress_vector)
.contains(sa.type_coerce(self.lookups[0].tokens,
IntArray)))
# Do NOT add rerank penalties based on the address terms.
# The standard rerank penalty only checks the address vector
# while terms may appear in name and address vector. This would
# lead to overly high penalties.
# We assume that a postcode is precise enough to not require
# additional full name matches.
penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes),
else_=1.0)
sql = sql.add_columns(penalty.label('accuracy'))
sql = sql.order_by('accuracy').limit(LIMIT_PARAM)
bind_params = {
'limit': details.max_results,
'viewbox': details.viewbox,
'viewbox2': details.viewbox_x2,
'near': details.near,
'near_radius': details.near_radius,
'excluded': details.excluded
}
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
p = conn.t.placex
placex_sql = base.select_placex(p)\
.add_columns(p.c.importance)\
.where(sa.text("""class = 'boundary'
AND type = 'postal_code'
AND osm_type = 'R'"""))\
.where(p.c.country_code == row.country_code)\
.where(p.c.postcode == row.postcode)\
.limit(1)
if details.geometry_output:
placex_sql = base.add_geometry_columns(placex_sql, p.c.geometry, details)
for prow in await conn.execute(placex_sql, bind_params):
result = nres.create_from_placex_row(prow, nres.SearchResult)
if result is not None:
result.bbox = Bbox.from_wkb(prow.bbox)
break
else:
result = nres.create_from_postcode_row(row, nres.SearchResult)
assert result
if result.place_id not in details.excluded:
result.accuracy = row.accuracy
results.append(result)
return results