adapt search frontend to new postcode table

This commit is contained in:
Sarah Hoffmann
2025-12-23 19:10:01 +01:00
parent f9cf320794
commit 6a67cfcddf
8 changed files with 59 additions and 78 deletions

View File

@@ -2,7 +2,7 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of place lookup by ID (doing many places at once).
@@ -291,12 +291,30 @@ async def find_in_postcode(conn: SearchConnection, collector: Collector) -> bool
.table_valued(sa.column('value', type_=sa.JSON))
t = conn.t.postcode
sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
t.c.place_id, t.c.parent_place_id,
t.c.rank_search, t.c.rank_address,
t.c.osm_id, t.c.place_id, t.c.parent_place_id,
t.c.rank_search,
t.c.indexed_date, t.c.postcode, t.c.country_code,
t.c.geometry.label('centroid'))\
t.c.centroid)\
.where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))
if await collector.add_rows_from_sql(conn, sql, t.c.geometry,
nres.create_from_postcode_row):
return True
osm_ids = [{'i': i, 'oi': p.osm_id}
for i, p in collector.enumerate_free_osm_ids() if p.osm_type == 'R']
if osm_ids:
pid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\
.table_valued(sa.column('value', type_=sa.JSON))
t = conn.t.postcode
sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
t.c.osm_id, t.c.place_id, t.c.parent_place_id,
t.c.rank_search,
t.c.indexed_date, t.c.postcode, t.c.country_code,
t.c.centroid)\
.where(t.c.osm_id == pid_tab.c.value['oi'].as_string().cast(sa.BigInteger))
return await collector.add_rows_from_sql(conn, sql, t.c.geometry,
nres.create_from_postcode_row)

View File

@@ -2,7 +2,7 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Dataclasses for search results and helper functions to fill them.
@@ -407,11 +407,13 @@ def create_from_postcode_row(row: SaRow, class_type: Type[BaseResultT]) -> BaseR
"""
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
osm_object=None if row.osm_id is None else ('R', row.osm_id),
parent_place_id=row.parent_place_id,
category=('place', 'postcode'),
category=(('place', 'postcode') if row.osm_id is None
else ('boundary', 'postal_code')),
names={'ref': row.postcode},
rank_search=row.rank_search,
rank_address=row.rank_address,
rank_address=5,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
@@ -494,17 +496,15 @@ def _get_address_lookup_id(result: BaseResultT) -> int:
async def _finalize_entry(conn: SearchConnection, result: BaseResultT) -> None:
assert result.address_rows is not None
if result.category[0] not in ('boundary', 'place')\
or result.category[1] not in ('postal_code', 'postcode'):
postcode = result.postcode
if not postcode and result.address:
postcode = result.address.get('postcode')
if postcode and ',' not in postcode and ';' not in postcode:
result.address_rows.append(AddressLine(
category=('place', 'postcode'),
names={'ref': postcode},
fromarea=False, isaddress=True, rank_address=5,
distance=0.0))
postcode = result.postcode or (result.address and result.address.get('postcode'))
if postcode and ',' not in postcode and ';' not in postcode:
result.address_rows.append(AddressLine(
category=('place', 'postcode'),
names={'ref': postcode},
fromarea=False, isaddress=True, rank_address=5,
distance=0.0))
if result.country_code:
async def _get_country_names() -> Optional[Dict[str, str]]:
t = conn.t.country_name
@@ -627,13 +627,6 @@ async def complete_address_details(conn: SearchConnection, results: List[BaseRes
if current_result.country_code is None and row.country_code:
current_result.country_code = row.country_code
if row.type in ('postcode', 'postal_code') and location_isaddress:
if not row.fromarea or \
(current_result.address and 'postcode' in current_result.address):
location_isaddress = False
else:
current_result.postcode = None
assert current_result.address_rows is not None
current_result.address_rows.append(_result_row_to_address_row(row, location_isaddress))
current_rank_address = row.rank_address

View File

@@ -175,7 +175,8 @@ class AddressSearch(base.AbstractSearch):
sql = sql.where(sa.select(tpc.c.postcode)
.where(tpc.c.postcode.in_(self.postcodes.values))
.where(tpc.c.country_code == t.c.country_code)
.where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
.where(t.c.centroid.intersects(tpc.c.geometry,
use_index=False))
.exists())
if details.viewbox is not None:
@@ -225,7 +226,7 @@ class AddressSearch(base.AbstractSearch):
tpc = conn.t.postcode
pcs = self.postcodes.values
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)
pc_near = sa.select(sa.func.min(tpc.c.centroid.ST_Distance(t.c.centroid)
* (tpc.c.rank_search - 19)))\
.where(tpc.c.postcode.in_(pcs))\
.where(tpc.c.country_code == t.c.country_code)\

View File

@@ -79,7 +79,8 @@ class PlaceSearch(base.AbstractSearch):
tpc = conn.t.postcode
sql = sql.where(sa.select(tpc.c.postcode)
.where(tpc.c.postcode.in_(self.postcodes.values))
.where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
.where(t.c.centroid.intersects(tpc.c.geometry,
use_index=False))
.exists())
if details.viewbox is not None:
@@ -157,7 +158,7 @@ class PlaceSearch(base.AbstractSearch):
tpc = conn.t.postcode
pcs = self.postcodes.values
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
pc_near = sa.select(sa.func.min(tpc.c.centroid.ST_Distance(t.c.centroid)))\
.where(tpc.c.postcode.in_(pcs))\
.scalar_subquery()
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),

View File

@@ -14,7 +14,7 @@ from . import base
from ...typing import SaBind, SaExpression
from ...sql.sqlalchemy_types import Geometry, IntArray
from ...connection import SearchConnection
from ...types import SearchDetails, Bbox
from ...types import SearchDetails
from ... import results as nres
from ..db_search_fields import SearchData
@@ -42,10 +42,9 @@ class PostcodeSearch(base.AbstractSearch):
t = conn.t.postcode
pcs = self.postcodes.values
sql = sa.select(t.c.place_id, t.c.parent_place_id,
t.c.rank_search, t.c.rank_address,
t.c.postcode, t.c.country_code,
t.c.geometry.label('centroid'))\
sql = sa.select(t.c.place_id, t.c.parent_place_id, t.c.osm_id,
t.c.rank_search, t.c.postcode, t.c.country_code,
t.c.centroid)\
.where(t.c.postcode.in_(pcs))
if details.geometry_output:
@@ -59,7 +58,7 @@ class PostcodeSearch(base.AbstractSearch):
else_=1.0)
if details.near is not None:
sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM))
sql = sql.order_by(t.c.centroid.ST_Distance(NEAR_PARAM))
sql = base.filter_by_area(sql, t, details)
@@ -100,29 +99,9 @@ class PostcodeSearch(base.AbstractSearch):
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
p = conn.t.placex
placex_sql = base.select_placex(p)\
.add_columns(p.c.importance)\
.where(sa.text("""class = 'boundary'
AND type = 'postal_code'
AND osm_type = 'R'"""))\
.where(p.c.country_code == row.country_code)\
.where(p.c.postcode == row.postcode)\
.limit(1)
result = nres.create_from_postcode_row(row, nres.SearchResult)
if details.geometry_output:
placex_sql = base.add_geometry_columns(placex_sql, p.c.geometry, details)
for prow in await conn.execute(placex_sql, bind_params):
result = nres.create_from_placex_row(prow, nres.SearchResult)
if result is not None:
result.bbox = Bbox.from_wkb(prow.bbox)
break
else:
result = nres.create_from_postcode_row(row, nres.SearchResult)
if result.place_id not in details.excluded:
result.accuracy = row.accuracy
results.append(result)
result.accuracy = row.accuracy
results.append(result)
return results

View File

@@ -2,7 +2,7 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Custom types for SQLAlchemy.
@@ -178,6 +178,8 @@ class Geometry(types.UserDefinedType): # type: ignore[type-arg]
def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
def process(value: Any) -> str:
if value is None:
return 'null'
if isinstance(value, str):
return value

View File

@@ -2,7 +2,7 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Hard-coded information about tag categories.
@@ -20,7 +20,9 @@ def get_label_tag(category: Tuple[str, str], extratags: Optional[Mapping[str, st
rank: int, country: Optional[str]) -> str:
""" Create a label tag for the given place that can be used as an XML name.
"""
if rank < 26 and extratags and 'place' in extratags:
if category in (('place', 'postcode'), ('boundary', 'postal_code')):
label = 'postcode'
elif rank < 26 and extratags and 'place' in extratags:
label = extratags['place']
elif rank < 26 and extratags and 'linked_place' in extratags:
label = extratags['linked_place']
@@ -28,8 +30,6 @@ def get_label_tag(category: Tuple[str, str], extratags: Optional[Mapping[str, st
label = ADMIN_LABELS.get((country or '', rank // 2))\
or ADMIN_LABELS.get(('', rank // 2))\
or 'Administrative'
elif category[1] == 'postal_code':
label = 'postcode'
elif rank < 26:
label = category[1] if category[1] != 'yes' else category[0]
elif rank < 28:

View File

@@ -2,7 +2,7 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Helper function for parsing parameters and and outputting data
@@ -12,7 +12,7 @@ from typing import Tuple, Optional, Any, Dict, Iterable
from itertools import chain
import re
from ..results import SearchResult, SearchResults, SourceTable
from ..results import SearchResults, SourceTable
from ..types import SearchDetails, GeometryFormat
@@ -106,10 +106,6 @@ def deduplicate_results(results: SearchResults, max_results: int) -> SearchResul
classification_done = set()
deduped = SearchResults()
for result in results:
if result.source_table == SourceTable.POSTCODE:
assert result.names and 'ref' in result.names
if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
continue
if result.source_table == SourceTable.PLACEX:
classification = (result.osm_object[0] if result.osm_object else None,
result.category,
@@ -128,15 +124,6 @@ def deduplicate_results(results: SearchResults, max_results: int) -> SearchResul
return deduped
def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
return result.source_table == SourceTable.PLACEX \
and result.osm_object is not None \
and result.osm_object[0] == 'R' \
and result.category == ('boundary', 'postal_code') \
and result.names is not None \
and result.names.get('ref') == postcode
def _deg(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"