Merge pull request #3828 from lonvia/code-cleanup

Code cleanup
This commit is contained in:
Sarah Hoffmann
2025-09-06 16:59:52 +02:00
committed by GitHub
12 changed files with 17 additions and 116 deletions

View File

@@ -19,7 +19,7 @@ from .logging import log
from . import types as ntyp
from . import results as nres
RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
RowFunc = Callable[[SaRow, Type[nres.BaseResultT]], nres.BaseResultT]
GEOMETRY_TYPE_MAP = {
'POINT': 'ST_Point',
@@ -74,7 +74,6 @@ class LookupCollector:
for row in await conn.execute(sql):
result = row_func(row, nres.SearchResult)
assert result is not None
if hasattr(row, 'bbox'):
result.bbox = ntyp.Bbox.from_wkb(row.bbox)
@@ -116,7 +115,6 @@ class DetailedCollector:
for row in await conn.execute(sql):
self.result = row_func(row, nres.DetailedResult)
assert self.result is not None
# add missing details
if 'type' in self.result.geometry:
self.result.geometry['type'] = \

View File

@@ -310,15 +310,11 @@ def _filter_geometries(row: SaRow) -> Dict[str, str]:
if k.startswith('geometry_')}
def create_from_placex_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
def create_from_placex_row(row: SaRow, class_type: Type[BaseResultT]) -> BaseResultT:
""" Construct a new result and add the data from the result row
from the placex table. 'class_type' defines the type of result
to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.PLACEX,
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
@@ -340,8 +336,7 @@ def create_from_placex_row(row: Optional[SaRow],
geometry=_filter_geometries(row))
def create_from_osmline_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
def create_from_osmline_row(row: SaRow, class_type: Type[BaseResultT]) -> BaseResultT:
""" Construct a new result and add the data from the result row
from the address interpolation table osmline. 'class_type' defines
the type of result to return. Returns None if the row is None.
@@ -349,9 +344,6 @@ def create_from_osmline_row(row: Optional[SaRow],
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.OSMLINE,
@@ -375,10 +367,10 @@ def create_from_osmline_row(row: Optional[SaRow],
return res
def create_from_tiger_row(row: Optional[SaRow],
def create_from_tiger_row(row: SaRow,
class_type: Type[BaseResultT],
osm_type: Optional[str] = None,
osm_id: Optional[int] = None) -> Optional[BaseResultT]:
osm_id: Optional[int] = None) -> BaseResultT:
""" Construct a new result and add the data from the result row
from the Tiger data interpolation table. 'class_type' defines
the type of result to return. Returns None if the row is None.
@@ -386,9 +378,6 @@ def create_from_tiger_row(row: Optional[SaRow],
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.TIGER,
@@ -411,15 +400,11 @@ def create_from_tiger_row(row: Optional[SaRow],
return res
def create_from_postcode_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
def create_from_postcode_row(row: SaRow, class_type: Type[BaseResultT]) -> BaseResultT:
""" Construct a new result and add the data from the result row
from the postcode table. 'class_type' defines
the type of result to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
parent_place_id=row.parent_place_id,
@@ -432,15 +417,11 @@ def create_from_postcode_row(row: Optional[SaRow],
geometry=_filter_geometries(row))
def create_from_country_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
def create_from_country_row(row: SaRow, class_type: Type[BaseResultT]) -> BaseResultT:
""" Construct a new result and add the data from the result row
from the fallback country tables. 'class_type' defines
the type of result to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.COUNTRY,
category=('place', 'country'),
centroid=Point.from_wkb(row.centroid),

View File

@@ -21,7 +21,7 @@ from .logging import log
from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
RowFunc = Callable[[SaRow, Type[nres.ReverseResult]], nres.ReverseResult]
WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
@@ -596,12 +596,13 @@ class ReverseGeocoder:
if row is None and self.layer_enabled(DataLayer.ADDRESS):
row, row_func = await self.lookup_country(ccodes)
if row is None:
return None
result = row_func(row, nres.ReverseResult)
if result is not None:
assert row is not None
result.distance = getattr(row, 'distance', 0)
if hasattr(row, 'bbox'):
result.bbox = Bbox.from_wkb(row.bbox)
await nres.add_result_details(self.conn, [result], self.params)
result.distance = getattr(row, 'distance', 0)
if hasattr(row, 'bbox'):
result.bbox = Bbox.from_wkb(row.bbox)
await nres.add_result_details(self.conn, [result], self.params)
return result

View File

@@ -295,46 +295,6 @@ class SearchBuilder:
yield penalty, exp_count, lookup
def get_name_address_ranking(self, name_tokens: List[int],
addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
addr_restrict_tokens = []
addr_lookup_tokens = []
for t in addr_partials:
if t.addr_count > 20000:
addr_restrict_tokens.append(t.token)
else:
addr_lookup_tokens.append(t.token)
if addr_restrict_tokens:
lookup.append(dbf.FieldLookup('nameaddress_vector',
addr_restrict_tokens, lookups.Restrict))
if addr_lookup_tokens:
lookup.append(dbf.FieldLookup('nameaddress_vector',
addr_lookup_tokens, lookups.LookupAll))
return lookup
def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
address lookups will use the index, when the occurrences are not
too many.
"""
if use_lookup:
addr_restrict_tokens = []
addr_lookup_tokens = [t.token for t in addr_partials]
else:
addr_restrict_tokens = [t.token for t in addr_partials]
addr_lookup_tokens = []
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.

View File

@@ -271,17 +271,6 @@ class SearchData:
self.penalty += ranking.default
def lookup_by_names(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
""" Create a lookup list where name tokens are looked up via index
and potential address tokens are used to restrict the search further.
"""
lookup = [FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
if addr_tokens:
lookup.append(FieldLookup('nameaddress_vector', addr_tokens, lookups.Restrict))
return lookup
def lookup_by_any_name(name_tokens: List[int], addr_restrict_tokens: List[int],
addr_lookup_tokens: List[int]) -> List[FieldLookup]:
""" Create a lookup list where name tokens are looked up via index
@@ -295,11 +284,3 @@ def lookup_by_any_name(name_tokens: List[int], addr_restrict_tokens: List[int],
lookup.append(FieldLookup('nameaddress_vector', addr_lookup_tokens, lookups.LookupAll))
return lookup
def lookup_by_addr(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
""" Create a lookup list where address tokens are looked up via index
and the name tokens are only used to restrict the search further.
"""
return [FieldLookup('name_vector', name_tokens, lookups.Restrict),
FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll)]

View File

@@ -78,7 +78,6 @@ async def _get_placex_housenumbers(conn: SearchConnection,
for row in await conn.execute(sql):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
yield result
@@ -102,9 +101,7 @@ async def _get_osmline(conn: SearchConnection, place_ids: List[int],
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_osmline_row(row, nres.SearchResult)
assert result
yield result
yield nres.create_from_osmline_row(row, nres.SearchResult)
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
@@ -126,9 +123,7 @@ async def _get_tiger(conn: SearchConnection, place_ids: List[int],
sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)
for row in await conn.execute(sql):
result = nres.create_from_tiger_row(row, nres.SearchResult)
assert result
yield result
yield nres.create_from_tiger_row(row, nres.SearchResult)
class AddressSearch(base.AbstractSearch):
@@ -325,7 +320,6 @@ class AddressSearch(base.AbstractSearch):
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = row.accuracy
if row.rank_address < 30:

View File

@@ -56,7 +56,6 @@ class CountrySearch(base.AbstractSearch):
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)
@@ -111,7 +110,6 @@ class CountrySearch(base.AbstractSearch):
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_country_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
results.append(result)

View File

@@ -130,7 +130,6 @@ class NearSearch(base.AbstractSearch):
'countries': details.countries}
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + penalty
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)

View File

@@ -202,7 +202,6 @@ class PlaceSearch(base.AbstractSearch):
results = nres.SearchResults()
for row in await conn.execute(sql, bind_params):
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.bbox = Bbox.from_wkb(row.bbox)
result.accuracy = row.accuracy
results.append(result)

View File

@@ -106,7 +106,6 @@ class PoiSearch(base.AbstractSearch):
results = nres.SearchResults()
for row in rows:
result = nres.create_from_placex_row(row, nres.SearchResult)
assert result
result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type))
result.bbox = Bbox.from_wkb(row.bbox)
results.append(result)

View File

@@ -121,7 +121,6 @@ class PostcodeSearch(base.AbstractSearch):
else:
result = nres.create_from_postcode_row(row, nres.SearchResult)
assert result
if result.place_id not in details.excluded:
result.accuracy = row.accuracy
results.append(result)

View File

@@ -48,14 +48,6 @@ def test_detailed_result_custom_importance():
assert res.calculated_importance() == 0.4563
@pytest.mark.parametrize('func', (nresults.create_from_placex_row,
nresults.create_from_osmline_row,
nresults.create_from_tiger_row,
nresults.create_from_postcode_row))
def test_create_row_none(func):
assert func(None, DetailedResult) is None
@pytest.mark.parametrize('func', (nresults.create_from_osmline_row,
nresults.create_from_tiger_row))
def test_create_row_with_housenumber(func):