Merge pull request #3658 from lonvia/minor-query-parsing-optimisations

Minor query parsing optimisations
Authored by Sarah Hoffmann on 2025-02-24 10:16:47 +01:00, committed by GitHub
13 changed files with 486 additions and 471 deletions

View File

@@ -60,13 +60,19 @@ The order of phrases matters to Nominatim when doing further processing.
Thus, while you may split or join phrases, you should not reorder them
unless you really know what you are doing.
Phrase types (`nominatim_api.search.PhraseType`) can further help narrow
down how the tokens in the phrase are interpreted. The following phrase types
are known:
Phrase types can further help narrow down how the tokens in the phrase
are interpreted. The following phrase types are known (a short usage sketch
follows the table):
::: nominatim_api.search.PhraseType
options:
heading_level: 6
| Name | Description |
|----------------|-------------|
| PHRASE_ANY | No specific designation (i.e. source is free-form query) |
| PHRASE_AMENITY | Contains name or type of a POI |
| PHRASE_STREET | Contains a street name optionally with a housenumber |
| PHRASE_CITY | Contains the postal city |
| PHRASE_COUNTY | Contains the equivalent of a county |
| PHRASE_STATE | Contains a state or province |
| PHRASE_POSTCODE| Contains a postal code |
| PHRASE_COUNTRY | Contains the country name or code |
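A phrase's type constrains which token interpretations are allowed inside it. As an illustrative sketch, using the `Phrase` dataclass and the constants re-exported from `nominatim_api.search` in this commit (the phrase texts are made up):

```python
from nominatim_api.search import Phrase, PHRASE_STREET, PHRASE_CITY

# Structured input arrives as typed phrases; free-form input uses PHRASE_ANY.
phrases = [Phrase(PHRASE_STREET, 'Hauptstr. 5'),   # may contain a housenumber
           Phrase(PHRASE_CITY, 'Berlin')]          # interpreted as a city name

for p in phrases:
    print(p.ptype, p.text)
```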
## Custom sanitizer modules

View File

@@ -26,7 +26,7 @@ from .connection import SearchConnection
from .status import get_status, StatusResult
from .lookup import get_places, get_detailed_place
from .reverse import ReverseGeocoder
from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
from . import search as nsearch
from . import types as ntyp
from .results import DetailedResult, ReverseResult, SearchResults
@@ -207,7 +207,7 @@ class NominatimAPIAsync:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
await nsearch.make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
@@ -219,7 +219,7 @@ class NominatimAPIAsync:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
await nsearch.make_query_analyzer(conn)
return await get_places(conn, places, details)
async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
@@ -237,7 +237,7 @@ class NominatimAPIAsync:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
await nsearch.make_query_analyzer(conn)
geocoder = ReverseGeocoder(conn, details,
self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)
@@ -251,10 +251,10 @@ class NominatimAPIAsync:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
geocoder = nsearch.ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
async def search_address(self, amenity: Optional[str] = None,
@@ -271,22 +271,22 @@ class NominatimAPIAsync:
conn.set_query_timeout(self.query_timeout)
details = ntyp.SearchDetails.from_kwargs(params)
phrases: List[Phrase] = []
phrases: List[nsearch.Phrase] = []
if amenity:
phrases.append(Phrase(PhraseType.AMENITY, amenity))
phrases.append(nsearch.Phrase(nsearch.PHRASE_AMENITY, amenity))
if street:
phrases.append(Phrase(PhraseType.STREET, street))
phrases.append(nsearch.Phrase(nsearch.PHRASE_STREET, street))
if city:
phrases.append(Phrase(PhraseType.CITY, city))
phrases.append(nsearch.Phrase(nsearch.PHRASE_CITY, city))
if county:
phrases.append(Phrase(PhraseType.COUNTY, county))
phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTY, county))
if state:
phrases.append(Phrase(PhraseType.STATE, state))
phrases.append(nsearch.Phrase(nsearch.PHRASE_STATE, state))
if postalcode:
phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
phrases.append(nsearch.Phrase(nsearch.PHRASE_POSTCODE, postalcode))
if country:
phrases.append(Phrase(PhraseType.COUNTRY, country))
phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTRY, country))
if not phrases:
raise UsageError('Nothing to search for.')
@@ -309,9 +309,9 @@ class NominatimAPIAsync:
if amenity:
details.layers |= ntyp.DataLayer.POI
geocoder = ForwardGeocoder(conn, details,
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
geocoder = nsearch.ForwardGeocoder(conn, details,
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
async def search_category(self, categories: List[Tuple[str, str]],
@@ -328,15 +328,15 @@ class NominatimAPIAsync:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if near_query:
phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p) for p in near_query.split(',')]
else:
phrases = []
if details.keywords:
await make_query_analyzer(conn)
await nsearch.make_query_analyzer(conn)
geocoder = ForwardGeocoder(conn, details,
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
geocoder = nsearch.ForwardGeocoder(conn, details,
self.config.get_int('REQUEST_TIMEOUT')
if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
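
Taken together, the call sites now spell phrase types via the module alias. A simplified sketch of how the two entry points build their phrase lists (method bodies trimmed, values made up):

```python
from nominatim_api.search import Phrase, PHRASE_ANY, PHRASE_STREET, PHRASE_CITY

def free_text_phrases(query: str):
    # as in search(): one PHRASE_ANY phrase per comma-separated part
    return [Phrase(PHRASE_ANY, p.strip()) for p in query.split(',')]

def structured_phrases(street=None, city=None):
    # as in search_address(): each keyword argument gets its own phrase type
    phrases = []
    if street:
        phrases.append(Phrase(PHRASE_STREET, street))
    if city:
        phrases.append(Phrase(PHRASE_CITY, city))
    return phrases

print(free_text_phrases('Hauptstr. 5, Berlin'))
print(structured_phrases(street='Hauptstr. 5', city='Berlin'))
```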

View File

@@ -9,5 +9,12 @@ Module for forward search.
"""
from .geocoder import (ForwardGeocoder as ForwardGeocoder)
from .query import (Phrase as Phrase,
PhraseType as PhraseType)
PHRASE_ANY as PHRASE_ANY,
PHRASE_AMENITY as PHRASE_AMENITY,
PHRASE_STREET as PHRASE_STREET,
PHRASE_CITY as PHRASE_CITY,
PHRASE_COUNTY as PHRASE_COUNTY,
PHRASE_STATE as PHRASE_STATE,
PHRASE_POSTCODE as PHRASE_POSTCODE,
PHRASE_COUNTRY as PHRASE_COUNTRY)
from .query_analyzer_factory import (make_query_analyzer as make_query_analyzer)
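
For code that imports from `nominatim_api.search`, the visible change is only in how phrase types are spelled. A sketch of the migration:

```python
# before this commit: enum member
#   from nominatim_api.search import Phrase, PhraseType
#   p = Phrase(PhraseType.NONE, 'foo bar')

# after this commit: plain integer constant
from nominatim_api.search import Phrase, PHRASE_ANY
p = Phrase(PHRASE_ANY, 'foo bar')
```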

View File

@@ -11,7 +11,7 @@ from typing import Optional, List, Tuple, Iterator, Dict
import heapq
from ..types import SearchDetails, DataLayer
from .query import QueryStruct, Token, TokenType, TokenRange, BreakType
from . import query as qmod
from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
@@ -51,7 +51,7 @@ class SearchBuilder:
""" Build the abstract search queries from token assignments.
"""
def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None:
self.query = query
self.details = details
@@ -97,7 +97,7 @@ class SearchBuilder:
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
TokenType.HOUSENUMBER)
qmod.TOKEN_HOUSENUMBER)
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
@@ -128,7 +128,7 @@ class SearchBuilder:
yield dbs.PoiSearch(sdata)
def build_special_search(self, sdata: dbf.SearchData,
address: List[TokenRange],
address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
@@ -150,8 +150,8 @@ class SearchBuilder:
lookups.Restrict)]
yield dbs.PostcodeSearch(penalty, sdata)
def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
@@ -173,7 +173,7 @@ class SearchBuilder:
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
in self.query.get_tokens(address[0], TokenType.WORD)]
in self.query.get_tokens(address[0], qmod.TOKEN_WORD)]
if len(addr_fulls) > 5:
return
sdata.lookups.append(
@@ -183,7 +183,7 @@ class SearchBuilder:
yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
name: TokenRange, address: List[TokenRange],
name: qmod.TokenRange, address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
@@ -196,7 +196,7 @@ class SearchBuilder:
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
def yield_lookups(self, name: TokenRange, address: List[TokenRange]
def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
@@ -216,7 +216,7 @@ class SearchBuilder:
addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000
# Partial term too frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
name_fulls = self.query.get_tokens(name, qmod.TOKEN_WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
@@ -235,7 +235,7 @@ class SearchBuilder:
self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
addr_partials: List[Token]) -> List[dbf.FieldLookup]:
addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
@@ -257,7 +257,7 @@ class SearchBuilder:
return lookup
def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
@@ -281,11 +281,11 @@ class SearchBuilder:
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
def get_name_ranking(self, trange: TokenRange,
def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
name_fulls = self.query.get_tokens(trange, TokenType.WORD)
name_fulls = self.query.get_tokens(trange, qmod.TOKEN_WORD)
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
@@ -293,7 +293,7 @@ class SearchBuilder:
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
"""
@@ -304,10 +304,10 @@ class SearchBuilder:
while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
if tlist.ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_WORD):
if tlist.end < trange.end:
chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
if tlist.ttype == TokenType.PARTIAL:
if tlist.ttype == qmod.TOKEN_PARTIAL:
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
@@ -317,7 +317,7 @@ class SearchBuilder:
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
if tlist.ttype == TokenType.PARTIAL:
if tlist.ttype == qmod.TOKEN_PARTIAL:
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
@@ -357,11 +357,11 @@ class SearchBuilder:
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
TokenType.HOUSENUMBER))
qmod.TOKEN_HOUSENUMBER))
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
qmod.TOKEN_POSTCODE))
if assignment.qualifier:
tokens = self.get_qualifier_tokens(assignment.qualifier)
if not tokens:
@@ -386,23 +386,23 @@ class SearchBuilder:
return sdata
def get_country_tokens(self, trange: TokenRange) -> List[Token]:
def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
tokens = self.query.get_tokens(trange, qmod.TOKEN_COUNTRY)
if self.details.countries:
tokens = [t for t in tokens if t.lookup_word in self.details.countries]
return tokens
def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
tokens = self.query.get_tokens(trange, qmod.TOKEN_QUALIFIER)
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
@@ -415,7 +415,7 @@ class SearchBuilder:
"""
if assignment.near_item:
tokens: Dict[Tuple[str, str], float] = {}
for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
for t in self.query.get_tokens(assignment.near_item, qmod.TOKEN_NEAR_ITEM):
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,
@@ -429,11 +429,11 @@ class SearchBuilder:
PENALTY_WORDCHANGE = {
BreakType.START: 0.0,
BreakType.END: 0.0,
BreakType.PHRASE: 0.0,
BreakType.SOFT_PHRASE: 0.0,
BreakType.WORD: 0.1,
BreakType.PART: 0.2,
BreakType.TOKEN: 0.4
qmod.BREAK_START: 0.0,
qmod.BREAK_END: 0.0,
qmod.BREAK_PHRASE: 0.0,
qmod.BREAK_SOFT_PHRASE: 0.0,
qmod.BREAK_WORD: 0.1,
qmod.BREAK_PART: 0.2,
qmod.BREAK_TOKEN: 0.4
}

View File

@@ -29,36 +29,37 @@ from .query_analyzer_factory import AbstractQueryAnalyzer
DB_TO_TOKEN_TYPE = {
'W': qmod.TokenType.WORD,
'w': qmod.TokenType.PARTIAL,
'H': qmod.TokenType.HOUSENUMBER,
'P': qmod.TokenType.POSTCODE,
'C': qmod.TokenType.COUNTRY
'W': qmod.TOKEN_WORD,
'w': qmod.TOKEN_PARTIAL,
'H': qmod.TOKEN_HOUSENUMBER,
'P': qmod.TOKEN_POSTCODE,
'C': qmod.TOKEN_COUNTRY
}
PENALTY_IN_TOKEN_BREAK = {
qmod.BreakType.START: 0.5,
qmod.BreakType.END: 0.5,
qmod.BreakType.PHRASE: 0.5,
qmod.BreakType.SOFT_PHRASE: 0.5,
qmod.BreakType.WORD: 0.1,
qmod.BreakType.PART: 0.0,
qmod.BreakType.TOKEN: 0.0
qmod.BREAK_START: 0.5,
qmod.BREAK_END: 0.5,
qmod.BREAK_PHRASE: 0.5,
qmod.BREAK_SOFT_PHRASE: 0.5,
qmod.BREAK_WORD: 0.1,
qmod.BREAK_PART: 0.0,
qmod.BREAK_TOKEN: 0.0
}
@dataclasses.dataclass
class QueryPart:
""" Normalized and transliterated form of a single term in the query.
When the term came out of a split during the transliteration,
the normalized string is the full word before transliteration.
The word number keeps track of the word before transliteration
and can be used to identify partial transliterated terms.
Check the subsequent break type to figure out if the word is
continued.
Penalty is the break penalty for the break following the token.
"""
token: str
normalized: str
word_number: int
penalty: float
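
With `word_number` gone, a query part carries only the transliterated term, its normalized source word, and the penalty of the break that follows it. A standalone sketch of the new layout (example values made up):

```python
import dataclasses

@dataclasses.dataclass
class QueryPart:                # mirrors the slimmed-down dataclass above
    token: str                  # transliterated term
    normalized: str             # full normalized word before transliteration
    penalty: float              # break penalty for the following break

# a term produced by splitting during transliteration
part = QueryPart(token='strasse', normalized='straße', penalty=0.0)
```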
@@ -66,19 +67,20 @@ QueryParts = List[QueryPart]
WordDict = Dict[str, List[qmod.TokenRange]]
def yield_words(terms: List[QueryPart], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
""" Return all combinations of words in the terms list after the
given position.
def extract_words(terms: List[QueryPart], start: int, words: WordDict) -> None:
""" Add all combinations of words in the terms list after the
given position to the word list.
"""
total = len(terms)
base_penalty = PENALTY_IN_TOKEN_BREAK[qmod.BREAK_WORD]
for first in range(start, total):
word = terms[first].token
penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType.WORD]
yield word, qmod.TokenRange(first, first + 1, penalty=penalty)
penalty = base_penalty
words[word].append(qmod.TokenRange(first, first + 1, penalty=penalty))
for last in range(first + 1, min(first + 20, total)):
word = ' '.join((word, terms[last].token))
penalty += terms[last - 1].penalty
yield word, qmod.TokenRange(first, last + 1, penalty=penalty)
words[word].append(qmod.TokenRange(first, last + 1, penalty=penalty))
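
The rewrite replaces the generator (`yield_words`) with in-place filling of the caller's `words` dict, saving a Python-level loop per word. A self-contained sketch of the same logic with simplified types (plain tuples standing in for `QueryPart` and `TokenRange`):

```python
from collections import defaultdict

BREAK_WORD_PENALTY = 0.1   # stands in for PENALTY_IN_TOKEN_BREAK[BREAK_WORD]

def extract_words(terms, start, words):
    """terms: list of (token, penalty_of_following_break) pairs."""
    total = len(terms)
    for first in range(start, total):
        word = terms[first][0]
        penalty = BREAK_WORD_PENALTY
        words[word].append((first, first + 1, penalty))
        for last in range(first + 1, min(first + 20, total)):
            word = ' '.join((word, terms[last][0]))
            penalty += terms[last - 1][1]
            words[word].append((first, last + 1, penalty))

words = defaultdict(list)
extract_words([('foo', 0.0), ('bar', 0.0), ('baz', 0.0)], 0, words)
# words: {'foo': [(0, 1, 0.1)], 'foo bar': [(0, 2, 0.1)],
#         'foo bar baz': [(0, 3, 0.1)], 'bar': [(1, 2, 0.1)], ...}
```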
@dataclasses.dataclass
@@ -223,12 +225,12 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
if row.type == 'S':
if row.info['op'] in ('in', 'near'):
if trange.start == 0:
query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
if trange.start == 0 and trange.end == query.num_token_slots():
query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
query.add_token(trange, qmod.TokenType.QUALIFIER, token)
query.add_token(trange, qmod.TOKEN_QUALIFIER, token)
else:
query.add_token(trange, DB_TO_TOKEN_TYPE[row.type], token)
@@ -255,8 +257,7 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
"""
parts: QueryParts = []
phrase_start = 0
words = defaultdict(list)
wordnr = 0
words: WordDict = defaultdict(list)
for phrase in query.source:
query.nodes[-1].ptype = phrase.ptype
phrase_split = re.split('([ :-])', phrase.text)
@@ -271,18 +272,16 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
if trans:
for term in trans.split(' '):
if term:
parts.append(QueryPart(term, word, wordnr,
PENALTY_IN_TOKEN_BREAK[qmod.BreakType.TOKEN]))
query.add_node(qmod.BreakType.TOKEN, phrase.ptype)
query.nodes[-1].btype = qmod.BreakType(breakchar)
parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType(breakchar)]
wordnr += 1
parts.append(QueryPart(term, word,
PENALTY_IN_TOKEN_BREAK[qmod.BREAK_TOKEN]))
query.add_node(qmod.BREAK_TOKEN, phrase.ptype)
query.nodes[-1].btype = breakchar
parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[breakchar]
for word, wrange in yield_words(parts, phrase_start):
words[word].append(wrange)
extract_words(parts, phrase_start, words)
phrase_start = len(parts)
query.nodes[-1].btype = qmod.BreakType.END
query.nodes[-1].btype = qmod.BREAK_END
return parts, words
@@ -298,8 +297,8 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
"""
for part, node, i in zip(parts, query.nodes, range(1000)):
if len(part.token) <= 4 and part.token.isdigit()\
and not node.has_tokens(i+1, qmod.TokenType.HOUSENUMBER):
query.add_token(qmod.TokenRange(i, i+1), qmod.TokenType.HOUSENUMBER,
and not node.has_tokens(i+1, qmod.TOKEN_HOUSENUMBER):
query.add_token(qmod.TokenRange(i, i+1), qmod.TOKEN_HOUSENUMBER,
ICUToken(penalty=0.5, token=0,
count=1, addr_count=1, lookup_word=part.token,
word_token=part.token, info=None))
@@ -308,31 +307,31 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
""" Add penalties to tokens that depend on presence of other token.
"""
for i, node, tlist in query.iter_token_lists():
if tlist.ttype == qmod.TokenType.POSTCODE:
if tlist.ttype == qmod.TOKEN_POSTCODE:
for repl in node.starting:
if repl.end == tlist.end and repl.ttype != qmod.TokenType.POSTCODE \
and (repl.ttype != qmod.TokenType.HOUSENUMBER
if repl.end == tlist.end and repl.ttype != qmod.TOKEN_POSTCODE \
and (repl.ttype != qmod.TOKEN_HOUSENUMBER
or len(tlist.tokens[0].lookup_word) > 4):
repl.add_penalty(0.39)
elif (tlist.ttype == qmod.TokenType.HOUSENUMBER
elif (tlist.ttype == qmod.TOKEN_HOUSENUMBER
and len(tlist.tokens[0].lookup_word) <= 3):
if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
for repl in node.starting:
if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
if repl.end == tlist.end and repl.ttype != qmod.TOKEN_HOUSENUMBER:
repl.add_penalty(0.5 - tlist.tokens[0].penalty)
elif tlist.ttype not in (qmod.TokenType.COUNTRY, qmod.TokenType.PARTIAL):
elif tlist.ttype not in (qmod.TOKEN_COUNTRY, qmod.TOKEN_PARTIAL):
norm = parts[i].normalized
for j in range(i + 1, tlist.end):
if parts[j - 1].word_number != parts[j].word_number:
if node.btype != qmod.BREAK_TOKEN:
norm += ' ' + parts[j].normalized
for token in tlist.tokens:
cast(ICUToken, token).rematch(norm)
def _dump_transliterated(query: qmod.QueryStruct, parts: QueryParts) -> str:
out = query.nodes[0].btype.value
out = query.nodes[0].btype
for node, part in zip(query.nodes[1:], parts):
out += part.token + node.btype.value
out += part.token + node.btype
return out
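
Because a node's `btype` is now the literal break character, the debug dump concatenates it directly instead of going through `.value`. A worked example with assumed values for a two-word query:

```python
BREAK_START, BREAK_WORD, BREAK_END = '<', ' ', '>'

node_btypes = [BREAK_START, BREAK_WORD, BREAK_END]   # query.nodes[i].btype
part_tokens = ['foo', 'bar']                         # parts[i].token

out = node_btypes[0]
for btype, token in zip(node_btypes[1:], part_tokens):
    out += token + btype
assert out == '<foo bar>'
```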
@@ -342,7 +341,7 @@ def _dump_word_tokens(query: qmod.QueryStruct) -> Iterator[List[Any]]:
for tlist in node.starting:
for token in tlist.tokens:
t = cast(ICUToken, token)
yield [tlist.ttype.name, t.token, t.word_token or '',
yield [tlist.ttype, t.token, t.word_token or '',
t.lookup_word or '', t.penalty, t.count, t.info]

View File

@@ -10,91 +10,91 @@ Datastructures for a tokenized query.
from typing import List, Tuple, Optional, Iterator
from abc import ABC, abstractmethod
import dataclasses
import enum
class BreakType(enum.Enum):
""" Type of break between tokens.
"""
START = '<'
""" Begin of the query. """
END = '>'
""" End of the query. """
PHRASE = ','
""" Hard break between two phrases. Address parts cannot cross hard
phrase boundaries."""
SOFT_PHRASE = ':'
""" Likely break between two phrases. Address parts should not cross soft
phrase boundaries. Soft breaks can be inserted by a preprocessor
that is analysing the input string.
"""
WORD = ' '
""" Break between words. """
PART = '-'
""" Break inside a word, for example a hyphen or apostrophe. """
TOKEN = '`'
""" Break created as a result of tokenization.
This may happen in languages without spaces between words.
"""
BreakType = str
""" Type of break between tokens.
"""
BREAK_START = '<'
""" Begin of the query. """
BREAK_END = '>'
""" End of the query. """
BREAK_PHRASE = ','
""" Hard break between two phrases. Address parts cannot cross hard
phrase boundaries."""
BREAK_SOFT_PHRASE = ':'
""" Likely break between two phrases. Address parts should not cross soft
phrase boundaries. Soft breaks can be inserted by a preprocessor
that is analysing the input string.
"""
BREAK_WORD = ' '
""" Break between words. """
BREAK_PART = '-'
""" Break inside a word, for example a hyphen or apostrophe. """
BREAK_TOKEN = '`'
""" Break created as a result of tokenization.
This may happen in languages without spaces between words.
"""
TokenType = str
""" Type of token.
"""
TOKEN_WORD = 'W'
""" Full name of a place. """
TOKEN_PARTIAL = 'w'
""" Word term without breaks, does not necessarily represent a full name. """
TOKEN_HOUSENUMBER = 'H'
""" Housenumber term. """
TOKEN_POSTCODE = 'P'
""" Postal code term. """
TOKEN_COUNTRY = 'C'
""" Country name or reference. """
TOKEN_QUALIFIER = 'Q'
""" Special term used together with name (e.g. _Hotel_ Bellevue). """
TOKEN_NEAR_ITEM = 'N'
""" Special term used as searchable object(e.g. supermarket in ...). """
PhraseType = int
""" Designation of a phrase.
"""
PHRASE_ANY = 0
""" No specific designation (i.e. source is free-form query). """
PHRASE_AMENITY = 1
""" Contains name or type of a POI. """
PHRASE_STREET = 2
""" Contains a street name optionally with a housenumber. """
PHRASE_CITY = 3
""" Contains the postal city. """
PHRASE_COUNTY = 4
""" Contains the equivalent of a county. """
PHRASE_STATE = 5
""" Contains a state or province. """
PHRASE_POSTCODE = 6
""" Contains a postal code. """
PHRASE_COUNTRY = 7
""" Contains the country name or code. """
def _phrase_compatible_with(ptype: PhraseType, ttype: TokenType,
is_full_phrase: bool) -> bool:
""" Check if the given token type can be used with the phrase type.
"""
if ptype == PHRASE_ANY:
return not is_full_phrase or ttype != TOKEN_QUALIFIER
if ptype == PHRASE_AMENITY:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)\
or (is_full_phrase and ttype == TOKEN_NEAR_ITEM)\
or (not is_full_phrase and ttype == TOKEN_QUALIFIER)
if ptype == PHRASE_STREET:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL, TOKEN_HOUSENUMBER)
if ptype == PHRASE_POSTCODE:
return ttype == TOKEN_POSTCODE
if ptype == PHRASE_COUNTRY:
return ttype == TOKEN_COUNTRY
class TokenType(enum.Enum):
""" Type of token.
"""
WORD = enum.auto()
""" Full name of a place. """
PARTIAL = enum.auto()
""" Word term without breaks, does not necessarily represent a full name. """
HOUSENUMBER = enum.auto()
""" Housenumber term. """
POSTCODE = enum.auto()
""" Postal code term. """
COUNTRY = enum.auto()
""" Country name or reference. """
QUALIFIER = enum.auto()
""" Special term used together with name (e.g. _Hotel_ Bellevue). """
NEAR_ITEM = enum.auto()
""" Special term used as searchable object(e.g. supermarket in ...). """
class PhraseType(enum.Enum):
""" Designation of a phrase.
"""
NONE = 0
""" No specific designation (i.e. source is free-form query). """
AMENITY = enum.auto()
""" Contains name or type of a POI. """
STREET = enum.auto()
""" Contains a street name optionally with a housenumber. """
CITY = enum.auto()
""" Contains the postal city. """
COUNTY = enum.auto()
""" Contains the equivalent of a county. """
STATE = enum.auto()
""" Contains a state or province. """
POSTCODE = enum.auto()
""" Contains a postal code. """
COUNTRY = enum.auto()
""" Contains the country name or code. """
def compatible_with(self, ttype: TokenType,
is_full_phrase: bool) -> bool:
""" Check if the given token type can be used with the phrase type.
"""
if self == PhraseType.NONE:
return not is_full_phrase or ttype != TokenType.QUALIFIER
if self == PhraseType.AMENITY:
return ttype in (TokenType.WORD, TokenType.PARTIAL)\
or (is_full_phrase and ttype == TokenType.NEAR_ITEM)\
or (not is_full_phrase and ttype == TokenType.QUALIFIER)
if self == PhraseType.STREET:
return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
if self == PhraseType.POSTCODE:
return ttype == TokenType.POSTCODE
if self == PhraseType.COUNTRY:
return ttype == TokenType.COUNTRY
return ttype in (TokenType.WORD, TokenType.PARTIAL)
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)
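
The former `PhraseType.compatible_with()` method becomes the module-level helper above; the behaviour is unchanged. A usage sketch, mirroring the assertions in the updated tests:

```python
from nominatim_api.search import query as qmod

assert qmod._phrase_compatible_with(qmod.PHRASE_STREET, qmod.TOKEN_HOUSENUMBER, False)
assert qmod._phrase_compatible_with(qmod.PHRASE_CITY, qmod.TOKEN_WORD, False)
assert not qmod._phrase_compatible_with(qmod.PHRASE_COUNTRY, qmod.TOKEN_PARTIAL, True)
```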
@dataclasses.dataclass
@@ -218,7 +218,7 @@ class QueryStruct:
def __init__(self, source: List[Phrase]) -> None:
self.source = source
self.nodes: List[QueryNode] = \
[QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
[QueryNode(BREAK_START, source[0].ptype if source else PHRASE_ANY)]
def num_token_slots(self) -> int:
""" Return the length of the query in vertice steps.
@@ -243,9 +243,9 @@ class QueryStruct:
be added to, then the token is silently dropped.
"""
snode = self.nodes[trange.start]
full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
if snode.ptype.compatible_with(ttype, full_phrase):
full_phrase = snode.btype in (BREAK_START, BREAK_PHRASE)\
and self.nodes[trange.end].btype in (BREAK_PHRASE, BREAK_END)
if _phrase_compatible_with(snode.ptype, ttype, full_phrase):
tlist = snode.get_tokens(trange.end, ttype)
if tlist is None:
snode.starting.append(TokenList(trange.end, ttype, [token]))
@@ -265,7 +265,7 @@ class QueryStruct:
going to the subsequent node. Such PARTIAL tokens are
assumed to exist.
"""
return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
return [next(iter(self.get_tokens(TokenRange(i, i+1), TOKEN_PARTIAL)))
for i in range(trange.start, trange.end)]
def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
@@ -285,5 +285,5 @@ class QueryStruct:
for tlist in node.starting:
for t in tlist.tokens:
if t.token == token:
return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
return f"[{tlist.ttype}]{t.lookup_word}"
return 'None'

View File

@@ -24,13 +24,13 @@ class TypedRange:
PENALTY_TOKENCHANGE = {
qmod.BreakType.START: 0.0,
qmod.BreakType.END: 0.0,
qmod.BreakType.PHRASE: 0.0,
qmod.BreakType.SOFT_PHRASE: 0.0,
qmod.BreakType.WORD: 0.1,
qmod.BreakType.PART: 0.2,
qmod.BreakType.TOKEN: 0.4
qmod.BREAK_START: 0.0,
qmod.BREAK_END: 0.0,
qmod.BREAK_PHRASE: 0.0,
qmod.BREAK_SOFT_PHRASE: 0.0,
qmod.BREAK_WORD: 0.1,
qmod.BREAK_PART: 0.2,
qmod.BREAK_TOKEN: 0.4
}
TypedRangeSeq = List[TypedRange]
@@ -56,17 +56,17 @@ class TokenAssignment:
"""
out = TokenAssignment()
for token in ranges:
if token.ttype == qmod.TokenType.PARTIAL:
if token.ttype == qmod.TOKEN_PARTIAL:
out.address.append(token.trange)
elif token.ttype == qmod.TokenType.HOUSENUMBER:
elif token.ttype == qmod.TOKEN_HOUSENUMBER:
out.housenumber = token.trange
elif token.ttype == qmod.TokenType.POSTCODE:
elif token.ttype == qmod.TOKEN_POSTCODE:
out.postcode = token.trange
elif token.ttype == qmod.TokenType.COUNTRY:
elif token.ttype == qmod.TOKEN_COUNTRY:
out.country = token.trange
elif token.ttype == qmod.TokenType.NEAR_ITEM:
elif token.ttype == qmod.TOKEN_NEAR_ITEM:
out.near_item = token.trange
elif token.ttype == qmod.TokenType.QUALIFIER:
elif token.ttype == qmod.TOKEN_QUALIFIER:
out.qualifier = token.trange
return out
@@ -84,7 +84,7 @@ class _TokenSequence:
self.penalty = penalty
def __str__(self) -> str:
seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype.name}]' for r in self.seq)
seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype}]' for r in self.seq)
return f'{seq} (dir: {self.direction}, penalty: {self.penalty})'
@property
@@ -105,7 +105,7 @@ class _TokenSequence:
"""
# Country and category must be the final term for left-to-right
return len(self.seq) > 1 and \
self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
self.seq[-1].ttype in (qmod.TOKEN_COUNTRY, qmod.TOKEN_NEAR_ITEM)
def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
""" Check if the give token type is appendable to the existing sequence.
@@ -114,23 +114,23 @@ class _TokenSequence:
new direction of the sequence after adding such a type. The
token is not added.
"""
if ttype == qmod.TokenType.WORD:
if ttype == qmod.TOKEN_WORD:
return None
if not self.seq:
# Append unconditionally to the empty list
if ttype == qmod.TokenType.COUNTRY:
if ttype == qmod.TOKEN_COUNTRY:
return -1
if ttype in (qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
if ttype in (qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return 1
return self.direction
# Name tokens are always acceptable and don't change direction
if ttype == qmod.TokenType.PARTIAL:
if ttype == qmod.TOKEN_PARTIAL:
# qualifiers cannot appear in the middle of the query. They need
# to be near the next phrase.
if self.direction == -1 \
and any(t.ttype == qmod.TokenType.QUALIFIER for t in self.seq[:-1]):
and any(t.ttype == qmod.TOKEN_QUALIFIER for t in self.seq[:-1]):
return None
return self.direction
@@ -138,54 +138,54 @@ class _TokenSequence:
if self.has_types(ttype):
return None
if ttype == qmod.TokenType.HOUSENUMBER:
if ttype == qmod.TOKEN_HOUSENUMBER:
if self.direction == 1:
if len(self.seq) == 1 and self.seq[0].ttype == qmod.TokenType.QUALIFIER:
if len(self.seq) == 1 and self.seq[0].ttype == qmod.TOKEN_QUALIFIER:
return None
if len(self.seq) > 2 \
or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY):
return None # direction left-to-right: housenumber must come before anything
elif (self.direction == -1
or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY)):
or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY)):
return -1 # force direction right-to-left if after other terms
return self.direction
if ttype == qmod.TokenType.POSTCODE:
if ttype == qmod.TOKEN_POSTCODE:
if self.direction == -1:
if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
if self.has_types(qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return None
return -1
if self.direction == 1:
return None if self.has_types(qmod.TokenType.COUNTRY) else 1
if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
return None if self.has_types(qmod.TOKEN_COUNTRY) else 1
if self.has_types(qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return 1
return self.direction
if ttype == qmod.TokenType.COUNTRY:
if ttype == qmod.TOKEN_COUNTRY:
return None if self.direction == -1 else 1
if ttype == qmod.TokenType.NEAR_ITEM:
if ttype == qmod.TOKEN_NEAR_ITEM:
return self.direction
if ttype == qmod.TokenType.QUALIFIER:
if ttype == qmod.TOKEN_QUALIFIER:
if self.direction == 1:
if (len(self.seq) == 1
and self.seq[0].ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.NEAR_ITEM)) \
and self.seq[0].ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_NEAR_ITEM)) \
or (len(self.seq) == 2
and self.seq[0].ttype == qmod.TokenType.NEAR_ITEM
and self.seq[1].ttype == qmod.TokenType.PARTIAL):
and self.seq[0].ttype == qmod.TOKEN_NEAR_ITEM
and self.seq[1].ttype == qmod.TOKEN_PARTIAL):
return 1
return None
if self.direction == -1:
return -1
tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TokenType.NEAR_ITEM else self.seq
tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TOKEN_NEAR_ITEM else self.seq
if len(tempseq) == 0:
return 1
if len(tempseq) == 1 and self.seq[0].ttype == qmod.TokenType.HOUSENUMBER:
if len(tempseq) == 1 and self.seq[0].ttype == qmod.TOKEN_HOUSENUMBER:
return None
if len(tempseq) > 1 or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
if len(tempseq) > 1 or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY):
return -1
return 0
@@ -205,7 +205,7 @@ class _TokenSequence:
new_penalty = 0.0
else:
last = self.seq[-1]
if btype != qmod.BreakType.PHRASE and last.ttype == ttype:
if btype != qmod.BREAK_PHRASE and last.ttype == ttype:
# extend the existing range
newseq = self.seq[:-1] + [TypedRange(ttype, last.trange.replace_end(end_pos))]
new_penalty = 0.0
@@ -240,18 +240,18 @@ class _TokenSequence:
# housenumbers may not be further than 2 words from the beginning.
# If there are two words in front, give it a penalty.
hnrpos = next((i for i, tr in enumerate(self.seq)
if tr.ttype == qmod.TokenType.HOUSENUMBER),
if tr.ttype == qmod.TOKEN_HOUSENUMBER),
None)
if hnrpos is not None:
if self.direction != -1:
priors = sum(1 for t in self.seq[:hnrpos] if t.ttype == qmod.TokenType.PARTIAL)
priors = sum(1 for t in self.seq[:hnrpos] if t.ttype == qmod.TOKEN_PARTIAL)
if not self._adapt_penalty_from_priors(priors, -1):
return False
if self.direction != 1:
priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TokenType.PARTIAL)
priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TOKEN_PARTIAL)
if not self._adapt_penalty_from_priors(priors, 1):
return False
if any(t.ttype == qmod.TokenType.NEAR_ITEM for t in self.seq):
if any(t.ttype == qmod.TOKEN_NEAR_ITEM for t in self.seq):
self.penalty += 1.0
return True
@@ -293,7 +293,7 @@ class _TokenSequence:
# * the containing phrase is strictly typed
if (base.housenumber and first.end < base.housenumber.start)\
or (base.qualifier and base.qualifier > first)\
or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
or (query.nodes[first.start].ptype != qmod.PHRASE_ANY):
return
penalty = self.penalty
@@ -329,7 +329,7 @@ class _TokenSequence:
# * the containing phrase is strictly typed
if (base.housenumber and last.start > base.housenumber.end)\
or (base.qualifier and base.qualifier < last)\
or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
or (query.nodes[last.start].ptype != qmod.PHRASE_ANY):
return
penalty = self.penalty
@@ -393,7 +393,7 @@ def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment
another. It does not include penalties for transitions within a
type.
"""
todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PhraseType.NONE else 1)]
todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PHRASE_ANY else 1)]
while todo:
state = todo.pop()

View File

@@ -26,9 +26,9 @@ def run_preprocessor_on(query, norm):
def test_normalize_simple():
norm = ':: lower();'
query = [qmod.Phrase(qmod.PhraseType.NONE, 'Hallo')]
query = [qmod.Phrase(qmod.PHRASE_ANY, 'Hallo')]
out = run_preprocessor_on(query, norm)
assert len(out) == 1
assert out == [qmod.Phrase(qmod.PhraseType.NONE, 'hallo')]
assert out == [qmod.Phrase(qmod.PHRASE_ANY, 'hallo')]

View File

@@ -27,8 +27,8 @@ def run_preprocessor_on(query):
('大阪府大阪', '大阪府:大阪'),
('大阪市大阪', '大阪市:大阪')])
def test_split_phrases(inp, outp):
query = [qmod.Phrase(qmod.PhraseType.NONE, inp)]
query = [qmod.Phrase(qmod.PHRASE_ANY, inp)]
out = run_preprocessor_on(query)
assert out == [qmod.Phrase(qmod.PhraseType.NONE, outp)]
assert out == [qmod.Phrase(qmod.PHRASE_ANY, outp)]

View File

@@ -22,42 +22,42 @@ def mktoken(tid: int):
lookup_word='foo')
@pytest.mark.parametrize('ptype,ttype', [('NONE', 'WORD'),
('AMENITY', 'QUALIFIER'),
('STREET', 'PARTIAL'),
('CITY', 'WORD'),
('COUNTRY', 'COUNTRY'),
('POSTCODE', 'POSTCODE')])
@pytest.mark.parametrize('ptype,ttype', [(query.PHRASE_ANY, 'W'),
(query.PHRASE_AMENITY, 'Q'),
(query.PHRASE_STREET, 'w'),
(query.PHRASE_CITY, 'W'),
(query.PHRASE_COUNTRY, 'C'),
(query.PHRASE_POSTCODE, 'P')])
def test_phrase_compatible(ptype, ttype):
assert query.PhraseType[ptype].compatible_with(query.TokenType[ttype], False)
assert query._phrase_compatible_with(ptype, ttype, False)
@pytest.mark.parametrize('ptype', ['COUNTRY', 'POSTCODE'])
@pytest.mark.parametrize('ptype', [query.PHRASE_COUNTRY, query.PHRASE_POSTCODE])
def test_phrase_incompatible(ptype):
assert not query.PhraseType[ptype].compatible_with(query.TokenType.PARTIAL, True)
assert not query._phrase_compatible_with(ptype, query.TOKEN_PARTIAL, True)
def test_query_node_empty():
qn = query.QueryNode(query.BreakType.PHRASE, query.PhraseType.NONE)
qn = query.QueryNode(query.BREAK_PHRASE, query.PHRASE_ANY)
assert not qn.has_tokens(3, query.TokenType.PARTIAL)
assert qn.get_tokens(3, query.TokenType.WORD) is None
assert not qn.has_tokens(3, query.TOKEN_PARTIAL)
assert qn.get_tokens(3, query.TOKEN_WORD) is None
def test_query_node_with_content():
qn = query.QueryNode(query.BreakType.PHRASE, query.PhraseType.NONE)
qn.starting.append(query.TokenList(2, query.TokenType.PARTIAL, [mktoken(100), mktoken(101)]))
qn.starting.append(query.TokenList(2, query.TokenType.WORD, [mktoken(1000)]))
qn = query.QueryNode(query.BREAK_PHRASE, query.PHRASE_ANY)
qn.starting.append(query.TokenList(2, query.TOKEN_PARTIAL, [mktoken(100), mktoken(101)]))
qn.starting.append(query.TokenList(2, query.TOKEN_WORD, [mktoken(1000)]))
assert not qn.has_tokens(3, query.TokenType.PARTIAL)
assert not qn.has_tokens(2, query.TokenType.COUNTRY)
assert qn.has_tokens(2, query.TokenType.PARTIAL)
assert qn.has_tokens(2, query.TokenType.WORD)
assert not qn.has_tokens(3, query.TOKEN_PARTIAL)
assert not qn.has_tokens(2, query.TOKEN_COUNTRY)
assert qn.has_tokens(2, query.TOKEN_PARTIAL)
assert qn.has_tokens(2, query.TOKEN_WORD)
assert qn.get_tokens(3, query.TokenType.PARTIAL) is None
assert qn.get_tokens(2, query.TokenType.COUNTRY) is None
assert len(qn.get_tokens(2, query.TokenType.PARTIAL)) == 2
assert len(qn.get_tokens(2, query.TokenType.WORD)) == 1
assert qn.get_tokens(3, query.TOKEN_PARTIAL) is None
assert qn.get_tokens(2, query.TOKEN_COUNTRY) is None
assert len(qn.get_tokens(2, query.TOKEN_PARTIAL)) == 2
assert len(qn.get_tokens(2, query.TOKEN_WORD)) == 1
def test_query_struct_empty():
@@ -67,19 +67,19 @@ def test_query_struct_empty():
def test_query_struct_with_tokens():
q = query.QueryStruct([query.Phrase(query.PhraseType.NONE, 'foo bar')])
q.add_node(query.BreakType.WORD, query.PhraseType.NONE)
q.add_node(query.BreakType.END, query.PhraseType.NONE)
q = query.QueryStruct([query.Phrase(query.PHRASE_ANY, 'foo bar')])
q.add_node(query.BREAK_WORD, query.PHRASE_ANY)
q.add_node(query.BREAK_END, query.PHRASE_ANY)
assert q.num_token_slots() == 2
q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
q.add_token(query.TokenRange(1, 2), query.TokenType.PARTIAL, mktoken(2))
q.add_token(query.TokenRange(1, 2), query.TokenType.WORD, mktoken(99))
q.add_token(query.TokenRange(1, 2), query.TokenType.WORD, mktoken(98))
q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
q.add_token(query.TokenRange(1, 2), query.TOKEN_PARTIAL, mktoken(2))
q.add_token(query.TokenRange(1, 2), query.TOKEN_WORD, mktoken(99))
q.add_token(query.TokenRange(1, 2), query.TOKEN_WORD, mktoken(98))
assert q.get_tokens(query.TokenRange(0, 2), query.TokenType.WORD) == []
assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.WORD)) == 2
assert q.get_tokens(query.TokenRange(0, 2), query.TOKEN_WORD) == []
assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_WORD)) == 2
partials = q.get_partials_list(query.TokenRange(0, 2))
@@ -91,45 +91,45 @@ def test_query_struct_with_tokens():
def test_query_struct_incompatible_token():
q = query.QueryStruct([query.Phrase(query.PhraseType.COUNTRY, 'foo bar')])
q.add_node(query.BreakType.WORD, query.PhraseType.COUNTRY)
q.add_node(query.BreakType.END, query.PhraseType.NONE)
q = query.QueryStruct([query.Phrase(query.PHRASE_COUNTRY, 'foo bar')])
q.add_node(query.BREAK_WORD, query.PHRASE_COUNTRY)
q.add_node(query.BREAK_END, query.PHRASE_ANY)
q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
q.add_token(query.TokenRange(1, 2), query.TokenType.COUNTRY, mktoken(100))
q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
q.add_token(query.TokenRange(1, 2), query.TOKEN_COUNTRY, mktoken(100))
assert q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL) == []
assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.COUNTRY)) == 1
assert q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL) == []
assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_COUNTRY)) == 1
def test_query_struct_amenity_single_word():
q = query.QueryStruct([query.Phrase(query.PhraseType.AMENITY, 'bar')])
q.add_node(query.BreakType.END, query.PhraseType.NONE)
q = query.QueryStruct([query.Phrase(query.PHRASE_AMENITY, 'bar')])
q.add_node(query.BREAK_END, query.PHRASE_ANY)
q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
q.add_token(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM, mktoken(2))
q.add_token(query.TokenRange(0, 1), query.TokenType.QUALIFIER, mktoken(3))
q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
q.add_token(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM, mktoken(2))
q.add_token(query.TokenRange(0, 1), query.TOKEN_QUALIFIER, mktoken(3))
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.QUALIFIER)) == 0
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_QUALIFIER)) == 0
def test_query_struct_amenity_two_words():
q = query.QueryStruct([query.Phrase(query.PhraseType.AMENITY, 'foo bar')])
q.add_node(query.BreakType.WORD, query.PhraseType.AMENITY)
q.add_node(query.BreakType.END, query.PhraseType.NONE)
q = query.QueryStruct([query.Phrase(query.PHRASE_AMENITY, 'foo bar')])
q.add_node(query.BREAK_WORD, query.PHRASE_AMENITY)
q.add_node(query.BREAK_END, query.PHRASE_ANY)
for trange in [(0, 1), (1, 2)]:
q.add_token(query.TokenRange(*trange), query.TokenType.PARTIAL, mktoken(1))
q.add_token(query.TokenRange(*trange), query.TokenType.NEAR_ITEM, mktoken(2))
q.add_token(query.TokenRange(*trange), query.TokenType.QUALIFIER, mktoken(3))
q.add_token(query.TokenRange(*trange), query.TOKEN_PARTIAL, mktoken(1))
q.add_token(query.TokenRange(*trange), query.TOKEN_NEAR_ITEM, mktoken(2))
q.add_token(query.TokenRange(*trange), query.TOKEN_QUALIFIER, mktoken(3))
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM)) == 0
assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.QUALIFIER)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM)) == 0
assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_QUALIFIER)) == 1
assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.NEAR_ITEM)) == 0
assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.QUALIFIER)) == 1
assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_PARTIAL)) == 1
assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_NEAR_ITEM)) == 0
assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_QUALIFIER)) == 1

View File

@@ -9,7 +9,8 @@ Tests for creating abstract searches from token assignments.
"""
import pytest
from nominatim_api.search.query import Token, TokenRange, BreakType, PhraseType, TokenType, QueryStruct, Phrase
from nominatim_api.search.query import Token, TokenRange, QueryStruct, Phrase
import nominatim_api.search.query as qmod
from nominatim_api.search.db_search_builder import SearchBuilder
from nominatim_api.search.token_assignment import TokenAssignment
from nominatim_api.types import SearchDetails
@@ -21,17 +22,17 @@ class MyToken(Token):
def make_query(*args):
q = QueryStruct([Phrase(PhraseType.NONE, '')])
q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
for _ in range(max(inner[0] for tlist in args for inner in tlist)):
q.add_node(BreakType.WORD, PhraseType.NONE)
q.add_node(BreakType.END, PhraseType.NONE)
q.add_node(qmod.BREAK_WORD, qmod.PHRASE_ANY)
q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
for start, tlist in enumerate(args):
for end, ttype, tinfo in tlist:
for tid, word in tinfo:
q.add_token(TokenRange(start, end), ttype,
MyToken(penalty=0.5 if ttype == TokenType.PARTIAL else 0.0,
MyToken(penalty=0.5 if ttype == qmod.TOKEN_PARTIAL else 0.0,
token=tid, count=1, addr_count=1,
lookup_word=word))
@@ -40,7 +41,7 @@ def make_query(*args):
def test_country_search():
q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
@@ -54,7 +55,7 @@ def test_country_search():
def test_country_search_with_country_restriction():
q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'en,fr'}))
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
@@ -68,7 +69,7 @@ def test_country_search_with_country_restriction():
def test_country_search_with_conflicting_country_restriction():
q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'fr'}))
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
@@ -77,7 +78,7 @@ def test_country_search_with_conflicting_country_restriction():
def test_postcode_search_simple():
q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])])
q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1))))
@@ -93,8 +94,8 @@ def test_postcode_search_simple():
def test_postcode_with_country():
q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
[(2, TokenType.COUNTRY, [(1, 'xx')])])
q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
[(2, qmod.TOKEN_COUNTRY, [(1, 'xx')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
@@ -111,8 +112,8 @@ def test_postcode_with_country():
def test_postcode_with_address():
q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
[(2, TokenType.PARTIAL, [(100, 'word')])])
q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
[(2, qmod.TOKEN_PARTIAL, [(100, 'word')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
@@ -129,9 +130,9 @@ def test_postcode_with_address():
def test_postcode_with_address_with_full_word():
q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
[(2, TokenType.PARTIAL, [(100, 'word')]),
(2, TokenType.WORD, [(1, 'full')])])
q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
[(2, qmod.TOKEN_PARTIAL, [(100, 'word')]),
(2, qmod.TOKEN_WORD, [(1, 'full')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
@@ -150,7 +151,7 @@ def test_postcode_with_address_with_full_word():
@pytest.mark.parametrize('kwargs', [{'viewbox': '0,0,1,1', 'bounded_viewbox': True},
{'near': '10,10'}])
def test_near_item_only(kwargs):
q = make_query([(1, TokenType.NEAR_ITEM, [(2, 'foo')])])
q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(2, 'foo')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs(kwargs))
searches = list(builder.build(TokenAssignment(near_item=TokenRange(0, 1))))
@@ -166,7 +167,7 @@ def test_near_item_only(kwargs):
@pytest.mark.parametrize('kwargs', [{'viewbox': '0,0,1,1'},
{}])
def test_near_item_skipped(kwargs):
q = make_query([(1, TokenType.NEAR_ITEM, [(2, 'foo')])])
q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(2, 'foo')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs(kwargs))
searches = list(builder.build(TokenAssignment(near_item=TokenRange(0, 1))))
@@ -175,8 +176,8 @@ def test_near_item_skipped(kwargs):
def test_name_only_search():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
@@ -194,9 +195,9 @@ def test_name_only_search():
def test_name_with_qualifier():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])],
[(2, TokenType.QUALIFIER, [(55, 'hotel')])])
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])],
[(2, qmod.TOKEN_QUALIFIER, [(55, 'hotel')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1),
@@ -215,9 +216,9 @@ def test_name_with_qualifier():
def test_name_with_housenumber_search():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])],
[(2, TokenType.HOUSENUMBER, [(66, '66')])])
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])],
[(2, qmod.TOKEN_HOUSENUMBER, [(66, '66')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1),
@@ -235,12 +236,12 @@ def test_name_with_housenumber_search():
def test_name_and_address():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])],
[(2, TokenType.PARTIAL, [(2, 'b')]),
(2, TokenType.WORD, [(101, 'b')])],
[(3, TokenType.PARTIAL, [(3, 'c')]),
(3, TokenType.WORD, [(102, 'c')])]
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])],
[(2, qmod.TOKEN_PARTIAL, [(2, 'b')]),
(2, qmod.TOKEN_WORD, [(101, 'b')])],
[(3, qmod.TOKEN_PARTIAL, [(3, 'c')]),
(3, qmod.TOKEN_WORD, [(102, 'c')])]
)
builder = SearchBuilder(q, SearchDetails())
@@ -260,13 +261,13 @@ def test_name_and_address():
def test_name_and_complex_address():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])],
[(2, TokenType.PARTIAL, [(2, 'b')]),
(3, TokenType.WORD, [(101, 'bc')])],
[(3, TokenType.PARTIAL, [(3, 'c')])],
[(4, TokenType.PARTIAL, [(4, 'd')]),
(4, TokenType.WORD, [(103, 'd')])]
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])],
[(2, qmod.TOKEN_PARTIAL, [(2, 'b')]),
(3, qmod.TOKEN_WORD, [(101, 'bc')])],
[(3, qmod.TOKEN_PARTIAL, [(3, 'c')])],
[(4, qmod.TOKEN_PARTIAL, [(4, 'd')]),
(4, qmod.TOKEN_WORD, [(103, 'd')])]
)
builder = SearchBuilder(q, SearchDetails())
@@ -286,9 +287,9 @@ def test_name_and_complex_address():
def test_name_only_near_search():
q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
[(2, TokenType.PARTIAL, [(1, 'a')]),
(2, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
[(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
@@ -302,8 +303,8 @@ def test_name_only_near_search():
def test_name_only_search_with_category():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
@@ -316,9 +317,9 @@ def test_name_only_search_with_category():
def test_name_with_near_item_search_with_category_mismatch():
q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
[(2, TokenType.PARTIAL, [(1, 'a')]),
(2, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
[(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
@@ -328,9 +329,9 @@ def test_name_with_near_item_search_with_category_mismatch():
def test_name_with_near_item_search_with_category_match():
q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
[(2, TokenType.PARTIAL, [(1, 'a')]),
(2, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
[(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar'),
('this', 'that')]}))
@@ -345,9 +346,9 @@ def test_name_with_near_item_search_with_category_match():
def test_name_with_qualifier_search_with_category_mismatch():
q = make_query([(1, TokenType.QUALIFIER, [(88, 'g')])],
[(2, TokenType.PARTIAL, [(1, 'a')]),
(2, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_QUALIFIER, [(88, 'g')])],
[(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
@@ -357,9 +358,9 @@ def test_name_with_qualifier_search_with_category_mismatch():
def test_name_with_qualifier_search_with_category_match():
q = make_query([(1, TokenType.QUALIFIER, [(88, 'g')])],
[(2, TokenType.PARTIAL, [(1, 'a')]),
(2, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_QUALIFIER, [(88, 'g')])],
[(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar'),
('this', 'that')]}))
@@ -374,8 +375,8 @@ def test_name_with_qualifier_search_with_category_match():
def test_name_only_search_with_countries():
q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
(1, TokenType.WORD, [(100, 'a')])])
q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
(1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'de,en'}))
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
@@ -391,19 +392,19 @@ def test_name_only_search_with_countries():
def make_counted_searches(name_part, name_full, address_part, address_full,
num_address_parts=1):
q = QueryStruct([Phrase(PhraseType.NONE, '')])
q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
for i in range(1 + num_address_parts):
q.add_node(BreakType.WORD, PhraseType.NONE)
q.add_node(BreakType.END, PhraseType.NONE)
q.add_node(qmod.BREAK_WORD, qmod.PHRASE_ANY)
q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
q.add_token(TokenRange(0, 1), TokenType.PARTIAL,
q.add_token(TokenRange(0, 1), qmod.TOKEN_PARTIAL,
MyToken(0.5, 1, name_part, 1, 'name_part'))
q.add_token(TokenRange(0, 1), TokenType.WORD,
q.add_token(TokenRange(0, 1), qmod.TOKEN_WORD,
MyToken(0, 101, name_full, 1, 'name_full'))
for i in range(num_address_parts):
q.add_token(TokenRange(i + 1, i + 2), TokenType.PARTIAL,
q.add_token(TokenRange(i + 1, i + 2), qmod.TOKEN_PARTIAL,
MyToken(0.5, 2, address_part, 1, 'address_part'))
q.add_token(TokenRange(i + 1, i + 2), TokenType.WORD,
q.add_token(TokenRange(i + 1, i + 2), qmod.TOKEN_WORD,
MyToken(0, 102, address_full, 1, 'address_full'))
builder = SearchBuilder(q, SearchDetails())
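
The tests above all follow the same substitution: the former `TokenType`, `BreakType` and `PhraseType` enums give way to module-level constants (`TOKEN_*`, `BREAK_*`, `PHRASE_*`) in `nominatim_api.search.query`, imported as `qmod`. A minimal sketch of the new construction style, condensed from `make_counted_searches()` above (token ids and counts are illustrative only; `MyToken` is the small `Token` subclass these test modules define):

import nominatim_api.search.query as qmod
from nominatim_api.search.query import Phrase, QueryStruct, TokenRange

q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])      # one free-form phrase
q.add_node(qmod.BREAK_WORD, qmod.PHRASE_ANY)        # slot for an address part
q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)         # terminating node
q.add_token(TokenRange(0, 1), qmod.TOKEN_PARTIAL,
            MyToken(0.5, 1, 100, 1, 'name_part'))   # penalty, id, count, addr_count, word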

View File

@@ -11,7 +11,8 @@ import pytest
import pytest_asyncio
from nominatim_api import NominatimAPIAsync
from nominatim_api.search.query import Phrase, PhraseType, TokenType, BreakType
from nominatim_api.search.query import Phrase
import nominatim_api.search.query as qmod
import nominatim_api.search.icu_tokenizer as tok
from nominatim_api.logging import set_log_output, get_and_disable
@@ -25,7 +26,7 @@ async def add_word(conn, word_id, word_token, wtype, word, info = None):
def make_phrase(query):
return [Phrase(PhraseType.NONE, s) for s in query.split(',')]
return [Phrase(qmod.PHRASE_ANY, s) for s in query.split(',')]
@pytest_asyncio.fixture
async def conn(table_factory):
@@ -62,7 +63,7 @@ async def test_single_phrase_with_unknown_terms(conn):
query = await ana.analyze_query(make_phrase('foo BAR'))
assert len(query.source) == 1
assert query.source[0].ptype == PhraseType.NONE
assert query.source[0].ptype == qmod.PHRASE_ANY
assert query.source[0].text == 'foo bar'
assert query.num_token_slots() == 2
@@ -96,12 +97,12 @@ async def test_splitting_in_transliteration(conn):
assert query.num_token_slots() == 2
assert query.nodes[0].starting
assert query.nodes[1].starting
assert query.nodes[1].btype == BreakType.TOKEN
assert query.nodes[1].btype == qmod.BREAK_TOKEN
@pytest.mark.asyncio
@pytest.mark.parametrize('term,order', [('23456', ['POSTCODE', 'HOUSENUMBER', 'WORD', 'PARTIAL']),
('3', ['HOUSENUMBER', 'POSTCODE', 'WORD', 'PARTIAL'])
@pytest.mark.parametrize('term,order', [('23456', ['P', 'H', 'W', 'w']),
('3', ['H', 'P', 'W', 'w'])
])
async def test_penalty_postcodes_and_housenumbers(conn, term, order):
ana = await tok.create_query_analyzer(conn)
@@ -115,7 +116,7 @@ async def test_penalty_postcodes_and_housenumbers(conn, term, order):
assert query.num_token_slots() == 1
torder = [(tl.tokens[0].penalty, tl.ttype.name) for tl in query.nodes[0].starting]
torder = [(tl.tokens[0].penalty, tl.ttype) for tl in query.nodes[0].starting]
torder.sort()
assert [t[1] for t in torder] == order
@@ -131,7 +132,7 @@ async def test_category_words_only_at_beginning(conn):
assert query.num_token_slots() == 3
assert len(query.nodes[0].starting) == 1
assert query.nodes[0].starting[0].ttype == TokenType.NEAR_ITEM
assert query.nodes[0].starting[0].ttype == qmod.TOKEN_NEAR_ITEM
assert not query.nodes[2].starting
@@ -145,7 +146,7 @@ async def test_freestanding_qualifier_words_become_category(conn):
assert query.num_token_slots() == 1
assert len(query.nodes[0].starting) == 1
assert query.nodes[0].starting[0].ttype == TokenType.NEAR_ITEM
assert query.nodes[0].starting[0].ttype == qmod.TOKEN_NEAR_ITEM
@pytest.mark.asyncio
@@ -158,9 +159,9 @@ async def test_qualifier_words(conn):
query = await ana.analyze_query(make_phrase('foo BAR foo BAR foo'))
assert query.num_token_slots() == 5
assert set(t.ttype for t in query.nodes[0].starting) == {TokenType.QUALIFIER}
assert set(t.ttype for t in query.nodes[2].starting) == {TokenType.QUALIFIER}
assert set(t.ttype for t in query.nodes[4].starting) == {TokenType.QUALIFIER}
assert set(t.ttype for t in query.nodes[0].starting) == {qmod.TOKEN_QUALIFIER}
assert set(t.ttype for t in query.nodes[2].starting) == {qmod.TOKEN_QUALIFIER}
assert set(t.ttype for t in query.nodes[4].starting) == {qmod.TOKEN_QUALIFIER}
@pytest.mark.asyncio
@@ -172,10 +173,10 @@ async def test_add_unknown_housenumbers(conn):
query = await ana.analyze_query(make_phrase('466 23 99834 34a'))
assert query.num_token_slots() == 4
assert query.nodes[0].starting[0].ttype == TokenType.HOUSENUMBER
assert query.nodes[0].starting[0].ttype == qmod.TOKEN_HOUSENUMBER
assert len(query.nodes[0].starting[0].tokens) == 1
assert query.nodes[0].starting[0].tokens[0].token == 0
assert query.nodes[1].starting[0].ttype == TokenType.HOUSENUMBER
assert query.nodes[1].starting[0].ttype == qmod.TOKEN_HOUSENUMBER
assert len(query.nodes[1].starting[0].tokens) == 1
assert query.nodes[1].starting[0].tokens[0].token == 1
assert not query.nodes[2].starting
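
Two details in the hunks above imply that the new `TOKEN_*` constants are plain one-character strings: the expected penalty order changes from enum member names ('POSTCODE', 'HOUSENUMBER', ...) to 'P', 'H', 'W', 'w', and the sort key drops `.name` in favour of the constant itself. A small illustration under that assumption:

import nominatim_api.search.query as qmod

# Values inferred from the parametrized test data above:
# 'P' postcode, 'H' housenumber, 'W' full word, 'w' partial.
ttypes = [qmod.TOKEN_POSTCODE, qmod.TOKEN_HOUSENUMBER,
          qmod.TOKEN_WORD, qmod.TOKEN_PARTIAL]
# Plain strings compare and sort directly, no .name needed.
assert sorted(ttypes) == ['H', 'P', 'W', 'w']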

View File

@@ -9,7 +9,8 @@ Test for creation of token assignments from tokenized queries.
"""
import pytest
from nominatim_api.search.query import QueryStruct, Phrase, PhraseType, BreakType, TokenType, TokenRange, Token
from nominatim_api.search.query import QueryStruct, Phrase, TokenRange, Token
import nominatim_api.search.query as qmod
from nominatim_api.search.token_assignment import yield_token_assignments, TokenAssignment, PENALTY_TOKENCHANGE
class MyToken(Token):
@@ -24,7 +25,7 @@ def make_query(*args):
for btype, ptype, _ in args[1:]:
q.add_node(btype, ptype)
q.add_node(BreakType.END, PhraseType.NONE)
q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
for start, t in enumerate(args):
for end, ttype in t[2]:
@@ -43,52 +44,52 @@ def check_assignments(actual, *expected):
def test_query_with_missing_tokens():
q = QueryStruct([Phrase(PhraseType.NONE, '')])
q.add_node(BreakType.END, PhraseType.NONE)
q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
assert list(yield_token_assignments(q)) == []
def test_one_word_query():
q = make_query((BreakType.START, PhraseType.NONE,
[(1, TokenType.PARTIAL),
(1, TokenType.WORD),
(1, TokenType.HOUSENUMBER)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
[(1, qmod.TOKEN_PARTIAL),
(1, qmod.TOKEN_WORD),
(1, qmod.TOKEN_HOUSENUMBER)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(name=TokenRange(0, 1))]
def test_single_postcode():
q = make_query((BreakType.START, PhraseType.NONE,
[(1, TokenType.POSTCODE)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
[(1, qmod.TOKEN_POSTCODE)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(postcode=TokenRange(0, 1))]
def test_single_country_name():
q = make_query((BreakType.START, PhraseType.NONE,
[(1, TokenType.COUNTRY)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
[(1, qmod.TOKEN_COUNTRY)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(country=TokenRange(0, 1))]
def test_single_word_poi_search():
q = make_query((BreakType.START, PhraseType.NONE,
[(1, TokenType.NEAR_ITEM),
(1, TokenType.QUALIFIER)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
[(1, qmod.TOKEN_NEAR_ITEM),
(1, qmod.TOKEN_QUALIFIER)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(near_item=TokenRange(0, 1))]
@pytest.mark.parametrize('btype', [BreakType.WORD, BreakType.PART, BreakType.TOKEN])
@pytest.mark.parametrize('btype', [qmod.BREAK_WORD, qmod.BREAK_PART, qmod.BREAK_TOKEN])
def test_multiple_simple_words(btype):
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(btype, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(btype, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(btype, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(btype, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
penalty = PENALTY_TOKENCHANGE[btype]
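
`PENALTY_TOKENCHANGE` is indexed directly with the break-type constant here, so the mapping must now be keyed by the `BREAK_*` constants instead of enum members. A sketch of the lookup (the actual penalty values live in `token_assignment` and are not shown in this diff):

from nominatim_api.search.token_assignment import PENALTY_TOKENCHANGE
import nominatim_api.search.query as qmod

for btype in (qmod.BREAK_WORD, qmod.BREAK_PART, qmod.BREAK_TOKEN):
    print(btype, PENALTY_TOKENCHANGE[btype])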
@@ -106,8 +107,8 @@ def test_multiple_simple_words(btype):
def test_multiple_words_respect_phrase_break():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
@@ -117,8 +118,8 @@ def test_multiple_words_respect_phrase_break():
def test_housenumber_and_street():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(1, 2),
@@ -128,8 +129,8 @@ def test_housenumber_and_street():
def test_housenumber_and_street_backwards():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
@@ -139,10 +140,10 @@ def test_housenumber_and_street_backwards():
def test_housenumber_and_postcode():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(4, TokenType.POSTCODE)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=pytest.approx(0.3),
@@ -156,10 +157,10 @@ def test_housenumber_and_postcode():
postcode=TokenRange(3, 4)))
def test_postcode_and_housenumber():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.POSTCODE)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(4, TokenType.HOUSENUMBER)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_POSTCODE)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_HOUSENUMBER)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=pytest.approx(0.3),
@@ -174,38 +175,38 @@ def test_postcode_and_housenumber():
def test_country_housenumber_postcode():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.COUNTRY)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(4, TokenType.POSTCODE)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_COUNTRY)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q))
@pytest.mark.parametrize('ttype', [TokenType.POSTCODE, TokenType.COUNTRY,
TokenType.NEAR_ITEM, TokenType.QUALIFIER])
@pytest.mark.parametrize('ttype', [qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY,
qmod.TOKEN_NEAR_ITEM, qmod.TOKEN_QUALIFIER])
def test_housenumber_with_only_special_terms(ttype):
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(2, ttype)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, ttype)]))
check_assignments(yield_token_assignments(q))
@pytest.mark.parametrize('ttype', [TokenType.POSTCODE, TokenType.HOUSENUMBER, TokenType.COUNTRY])
@pytest.mark.parametrize('ttype', [qmod.TOKEN_POSTCODE, qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_COUNTRY])
def test_multiple_special_tokens(ttype):
q = make_query((BreakType.START, PhraseType.NONE, [(1, ttype)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(3, ttype)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, ttype)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(3, ttype)]))
check_assignments(yield_token_assignments(q))
def test_housenumber_many_phrases():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(4, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(4, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1,
@@ -220,8 +221,8 @@ def test_housenumber_many_phrases():
def test_country_at_beginning():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.COUNTRY)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_COUNTRY)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
@@ -229,8 +230,8 @@ def test_country_at_beginning():
def test_country_at_end():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.COUNTRY)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_COUNTRY)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(0, 1),
@@ -238,16 +239,16 @@ def test_country_at_end():
def test_country_in_middle():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.COUNTRY)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_COUNTRY)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_postcode_with_designation():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.POSTCODE)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_POSTCODE)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
@@ -257,8 +258,8 @@ def test_postcode_with_designation():
def test_postcode_with_designation_backwards():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.POSTCODE)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
@@ -268,8 +269,8 @@ def test_postcode_with_designation_backwards():
def test_near_item_at_beginning():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.NEAR_ITEM)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_NEAR_ITEM)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
@@ -277,8 +278,8 @@ def test_near_item_at_beginning():
def test_near_item_at_end():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.NEAR_ITEM)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_NEAR_ITEM)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(0, 1),
@@ -286,17 +287,17 @@ def test_near_item_at_end():
def test_near_item_in_middle():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.NEAR_ITEM)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_NEAR_ITEM)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_at_beginning():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.QUALIFIER)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_QUALIFIER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
@@ -308,11 +309,11 @@ def test_qualifier_at_beginning():
def test_qualifier_after_name():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.QUALIFIER)]),
(BreakType.WORD, PhraseType.NONE, [(4, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_QUALIFIER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
@@ -325,27 +326,27 @@ def test_qualifier_after_name():
def test_qualifier_before_housenumber():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.QUALIFIER)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_QUALIFIER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_after_housenumber():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
(BreakType.WORD, PhraseType.NONE, [(2, TokenType.QUALIFIER)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_QUALIFIER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_in_middle_of_phrase():
q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
(BreakType.WORD, PhraseType.NONE, [(3, TokenType.QUALIFIER)]),
(BreakType.WORD, PhraseType.NONE, [(4, TokenType.PARTIAL)]),
(BreakType.PHRASE, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_QUALIFIER)]),
(qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_PARTIAL)]),
(qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
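
For reference, the enumeration these tests exercise can also be driven directly. A minimal sketch using the `make_query()` helper defined at the top of this file together with `yield_token_assignments` (field access assumes the `TokenAssignment` dataclass as imported above):

q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
               (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
for assignment in yield_token_assignments(q):
    # Each assignment proposes one split of the query into name, address,
    # housenumber etc. ranges, with an accumulated penalty.
    print(assignment.penalty, assignment.name, assignment.address)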