consistently use query module as qmod

This commit is contained in:
Sarah Hoffmann
2025-02-21 09:31:21 +01:00
parent b56edf3d0a
commit 9bf1428d81

View File

@@ -11,7 +11,7 @@ from typing import Optional, List, Tuple, Iterator, Dict
import heapq import heapq
from ..types import SearchDetails, DataLayer from ..types import SearchDetails, DataLayer
from .query import QueryStruct, Token, TokenType, TokenRange, BreakType from . import query as qmod
from .token_assignment import TokenAssignment from .token_assignment import TokenAssignment
from . import db_search_fields as dbf from . import db_search_fields as dbf
from . import db_searches as dbs from . import db_searches as dbs
@@ -51,7 +51,7 @@ class SearchBuilder:
""" Build the abstract search queries from token assignments. """ Build the abstract search queries from token assignments.
""" """
def __init__(self, query: QueryStruct, details: SearchDetails) -> None: def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None:
self.query = query self.query = query
self.details = details self.details = details
@@ -97,7 +97,7 @@ class SearchBuilder:
builder = self.build_poi_search(sdata) builder = self.build_poi_search(sdata)
elif assignment.housenumber: elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber, hnr_tokens = self.query.get_tokens(assignment.housenumber,
TokenType.HOUSENUMBER) qmod.TokenType.HOUSENUMBER)
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address) builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else: else:
builder = self.build_special_search(sdata, assignment.address, builder = self.build_special_search(sdata, assignment.address,
@@ -128,7 +128,7 @@ class SearchBuilder:
yield dbs.PoiSearch(sdata) yield dbs.PoiSearch(sdata)
def build_special_search(self, sdata: dbf.SearchData, def build_special_search(self, sdata: dbf.SearchData,
address: List[TokenRange], address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]: is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve """ Build abstract search queries for searches that do not involve
a named place. a named place.
@@ -150,8 +150,8 @@ class SearchBuilder:
lookups.Restrict)] lookups.Restrict)]
yield dbs.PostcodeSearch(penalty, sdata) yield dbs.PostcodeSearch(penalty, sdata)
def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token], def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]: address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the """ Build a simple address search for special entries where the
housenumber is the main name token. housenumber is the main name token.
""" """
@@ -173,7 +173,7 @@ class SearchBuilder:
list(partials), lookups.LookupAll)) list(partials), lookups.LookupAll))
else: else:
addr_fulls = [t.token for t addr_fulls = [t.token for t
in self.query.get_tokens(address[0], TokenType.WORD)] in self.query.get_tokens(address[0], qmod.TokenType.WORD)]
if len(addr_fulls) > 5: if len(addr_fulls) > 5:
return return
sdata.lookups.append( sdata.lookups.append(
@@ -183,7 +183,7 @@ class SearchBuilder:
yield dbs.PlaceSearch(0.05, sdata, expected_count) yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData, def build_name_search(self, sdata: dbf.SearchData,
name: TokenRange, address: List[TokenRange], name: qmod.TokenRange, address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]: is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches. """ Build abstract search queries for simple name or address searches.
""" """
@@ -196,7 +196,7 @@ class SearchBuilder:
sdata.lookups = lookup sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count) yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
def yield_lookups(self, name: TokenRange, address: List[TokenRange] def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]: ) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best """ Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms be searched for. This takes into account how frequent the terms
@@ -216,7 +216,7 @@ class SearchBuilder:
addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000 addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000
# Partial term to frequent. Try looking up by rare full names first. # Partial term to frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD) name_fulls = self.query.get_tokens(name, qmod.TokenType.WORD)
if name_fulls: if name_fulls:
fulls_count = sum(t.count for t in name_fulls) fulls_count = sum(t.count for t in name_fulls)
@@ -235,7 +235,7 @@ class SearchBuilder:
self.get_name_address_ranking(list(name_partials.keys()), addr_partials) self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int], def get_name_address_ranking(self, name_tokens: List[int],
addr_partials: List[Token]) -> List[dbf.FieldLookup]: addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address. """ Create a ranking expression looking up by name and address.
""" """
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)] lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
@@ -257,7 +257,7 @@ class SearchBuilder:
return lookup return lookup
def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token], def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]: use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and """ Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then additional address lookup. When 'use_lookup' is true, then
@@ -281,11 +281,11 @@ class SearchBuilder:
return dbf.lookup_by_any_name([t.token for t in name_fulls], return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens) addr_restrict_tokens, addr_lookup_tokens)
def get_name_ranking(self, trange: TokenRange, def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking: db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range. """ Create a ranking expression for a name term in the given range.
""" """
name_fulls = self.query.get_tokens(trange, TokenType.WORD) name_fulls = self.query.get_tokens(trange, qmod.TokenType.WORD)
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls] ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty) ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials # Fallback, sum of penalty for partials
@@ -293,7 +293,7 @@ class SearchBuilder:
default = sum(t.penalty for t in name_partials) + 0.2 default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks) return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking: def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term """ Create a list of ranking expressions for an address term
for the given ranges. for the given ranges.
""" """
@@ -304,10 +304,10 @@ class SearchBuilder:
while todo: while todo:
neglen, pos, rank = heapq.heappop(todo) neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting: for tlist in self.query.nodes[pos].starting:
if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD): if tlist.ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.WORD):
if tlist.end < trange.end: if tlist.end < trange.end:
chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype] chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
if tlist.ttype == TokenType.PARTIAL: if tlist.ttype == qmod.TokenType.PARTIAL:
penalty = rank.penalty + chgpenalty \ penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens) + max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end, heapq.heappush(todo, (neglen - 1, tlist.end,
@@ -317,7 +317,7 @@ class SearchBuilder:
heapq.heappush(todo, (neglen - 1, tlist.end, heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty))) rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end: elif tlist.end == trange.end:
if tlist.ttype == TokenType.PARTIAL: if tlist.ttype == qmod.TokenType.PARTIAL:
ranks.append(dbf.RankedTokens(rank.penalty ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens), + max(t.penalty for t in tlist.tokens),
rank.tokens)) rank.tokens))
@@ -357,11 +357,11 @@ class SearchBuilder:
if assignment.housenumber: if assignment.housenumber:
sdata.set_strings('housenumbers', sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber, self.query.get_tokens(assignment.housenumber,
TokenType.HOUSENUMBER)) qmod.TokenType.HOUSENUMBER))
if assignment.postcode: if assignment.postcode:
sdata.set_strings('postcodes', sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode, self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE)) qmod.TokenType.POSTCODE))
if assignment.qualifier: if assignment.qualifier:
tokens = self.get_qualifier_tokens(assignment.qualifier) tokens = self.get_qualifier_tokens(assignment.qualifier)
if not tokens: if not tokens:
@@ -386,23 +386,23 @@ class SearchBuilder:
return sdata return sdata
def get_country_tokens(self, trange: TokenRange) -> List[Token]: def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of country tokens for the given range, """ Return the list of country tokens for the given range,
optionally filtered by the country list from the details optionally filtered by the country list from the details
parameters. parameters.
""" """
tokens = self.query.get_tokens(trange, TokenType.COUNTRY) tokens = self.query.get_tokens(trange, qmod.TokenType.COUNTRY)
if self.details.countries: if self.details.countries:
tokens = [t for t in tokens if t.lookup_word in self.details.countries] tokens = [t for t in tokens if t.lookup_word in self.details.countries]
return tokens return tokens
def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]: def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of qualifier tokens for the given range, """ Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details optionally filtered by the qualifier list from the details
parameters. parameters.
""" """
tokens = self.query.get_tokens(trange, TokenType.QUALIFIER) tokens = self.query.get_tokens(trange, qmod.TokenType.QUALIFIER)
if self.details.categories: if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories] tokens = [t for t in tokens if t.get_category() in self.details.categories]
@@ -415,7 +415,7 @@ class SearchBuilder:
""" """
if assignment.near_item: if assignment.near_item:
tokens: Dict[Tuple[str, str], float] = {} tokens: Dict[Tuple[str, str], float] = {}
for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM): for t in self.query.get_tokens(assignment.near_item, qmod.TokenType.NEAR_ITEM):
cat = t.get_category() cat = t.get_category()
# The category of a near search will be that of near_item. # The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter, # Thus, if search is restricted to a category parameter,
@@ -429,11 +429,11 @@ class SearchBuilder:
PENALTY_WORDCHANGE = { PENALTY_WORDCHANGE = {
BreakType.START: 0.0, qmod.BreakType.START: 0.0,
BreakType.END: 0.0, qmod.BreakType.END: 0.0,
BreakType.PHRASE: 0.0, qmod.BreakType.PHRASE: 0.0,
BreakType.SOFT_PHRASE: 0.0, qmod.BreakType.SOFT_PHRASE: 0.0,
BreakType.WORD: 0.1, qmod.BreakType.WORD: 0.1,
BreakType.PART: 0.2, qmod.BreakType.PART: 0.2,
BreakType.TOKEN: 0.4 qmod.BreakType.TOKEN: 0.4
} }