# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Datastructures for a tokenized query.
"""
from typing import Dict, List, Tuple, Optional, Iterator
from abc import ABC, abstractmethod
from collections import defaultdict
import dataclasses


# Precomputed denominator for the computation of the linear regression slope
# used to determine the query direction.
# The x value for the regression computation will be the position of the
# token in the query. Thus we know the x values will be [0, query length).
# As the denominator only depends on the x values, we can pre-compute here
# the denominator to use for a given query length.
# Note that a query length of two or less is special-cased and will not use
# the values from this array. Thus it is not a problem that they are 0.
LINFAC = [i * (sum(si * si for si in range(i)) - (i - 1) * i * (i - 1) / 4)
          for i in range(50)]
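
# The slope of the regression through the points (i, ratio_i) for a query
# of length n is then (n * sum(i * ratio_i) - sum(ratio_i) * n * (n - 1) / 2)
# divided by LINFAC[n] (see compute_direction_penalty() below).
# Worked example: LINFAC[3] = 3 * ((0 + 1 + 4) - 2 * 3 * 2 / 4) = 6.0.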


BreakType = str
""" Type of break between tokens.
"""
BREAK_START = '<'
""" Beginning of the query. """
BREAK_END = '>'
""" End of the query. """
BREAK_PHRASE = ','
""" Hard break between two phrases. Address parts cannot cross hard
    phrase boundaries."""
BREAK_SOFT_PHRASE = ':'
""" Likely break between two phrases. Address parts should not cross soft
    phrase boundaries. Soft breaks can be inserted by a preprocessor
    that is analysing the input string.
"""
BREAK_WORD = ' '
""" Break between words. """
BREAK_PART = '-'
""" Break inside a word, for example a hyphen or apostrophe. """
BREAK_TOKEN = '`'
""" Break created as a result of tokenization.
    This may happen in languages without spaces between words.
"""


TokenType = str
""" Type of token.
"""
TOKEN_WORD = 'W'
""" Full name of a place. """
TOKEN_PARTIAL = 'w'
""" Word term without breaks, does not necessarily represent a full name. """
TOKEN_HOUSENUMBER = 'H'
""" Housenumber term. """
TOKEN_POSTCODE = 'P'
""" Postal code term. """
TOKEN_COUNTRY = 'C'
""" Country name or reference. """
TOKEN_QUALIFIER = 'Q'
""" Special term used together with a name (e.g. _Hotel_ Bellevue). """
TOKEN_NEAR_ITEM = 'N'
""" Special term used as a searchable object (e.g. supermarket in ...). """


PhraseType = int
""" Designation of a phrase.
"""
PHRASE_ANY = 0
""" No specific designation (i.e. source is a free-form query). """
PHRASE_AMENITY = 1
""" Contains name or type of a POI. """
PHRASE_STREET = 2
""" Contains a street name optionally with a housenumber. """
PHRASE_CITY = 3
""" Contains the postal city. """
PHRASE_COUNTY = 4
""" Contains the equivalent of a county. """
PHRASE_STATE = 5
""" Contains a state or province. """
PHRASE_POSTCODE = 6
""" Contains a postal code. """
PHRASE_COUNTRY = 7
""" Contains the country name or code. """


def _phrase_compatible_with(ptype: PhraseType, ttype: TokenType,
                            is_full_phrase: bool) -> bool:
    """ Check if the given token type can be used with the phrase type.
    """
    if ptype == PHRASE_ANY:
        return not is_full_phrase or ttype != TOKEN_QUALIFIER
    if ptype == PHRASE_AMENITY:
        return ttype in (TOKEN_WORD, TOKEN_PARTIAL)\
               or (is_full_phrase and ttype == TOKEN_NEAR_ITEM)\
               or (not is_full_phrase and ttype == TOKEN_QUALIFIER)
    if ptype == PHRASE_STREET:
        return ttype in (TOKEN_WORD, TOKEN_PARTIAL, TOKEN_HOUSENUMBER)
    if ptype == PHRASE_POSTCODE:
        return ttype == TOKEN_POSTCODE
    if ptype == PHRASE_COUNTRY:
        return ttype == TOKEN_COUNTRY

    return ttype in (TOKEN_WORD, TOKEN_PARTIAL)
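
# Example: in a PHRASE_STREET phrase, housenumber tokens are accepted
# while postcode tokens are not:
#     _phrase_compatible_with(PHRASE_STREET, TOKEN_HOUSENUMBER, False)  # True
#     _phrase_compatible_with(PHRASE_STREET, TOKEN_POSTCODE, False)     # False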


@dataclasses.dataclass
class Token(ABC):
    """ Base type for tokens.
        Specific query analyzers must implement the concrete token class.
    """

    penalty: float
    token: int
    count: int
    addr_count: int
    lookup_word: str

    @abstractmethod
    def get_category(self) -> Tuple[str, str]:
        """ Return the category restriction for qualifier terms and
            category objects.
        """


@dataclasses.dataclass
class TokenRange:
    """ Indexes of query nodes over which a token spans.
    """
    start: int
    end: int

    def __lt__(self, other: 'TokenRange') -> bool:
        return self.end <= other.start

    def __le__(self, other: 'TokenRange') -> bool:
        return NotImplemented

    def __gt__(self, other: 'TokenRange') -> bool:
        return self.start >= other.end

    def __ge__(self, other: 'TokenRange') -> bool:
        return NotImplemented

    def replace_start(self, new_start: int) -> 'TokenRange':
        """ Return a new token range with the new start.
        """
        return TokenRange(new_start, self.end)

    def replace_end(self, new_end: int) -> 'TokenRange':
        """ Return a new token range with the new end.
        """
        return TokenRange(self.start, new_end)

    def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
        """ Split the span into two spans at the given index.
            The index must be within the span.
        """
        return self.replace_end(index), self.replace_start(index)
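
    # Examples: TokenRange(0, 2) < TokenRange(2, 4) is True because the
    # first range ends where the second starts; TokenRange(1, 4).split(2)
    # yields (TokenRange(1, 2), TokenRange(2, 4)).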


@dataclasses.dataclass
class TokenList:
    """ List of all tokens of a given type going from one breakpoint to another.
    """
    end: int
    ttype: TokenType
    tokens: List[Token]

    def add_penalty(self, penalty: float) -> None:
        """ Add the given penalty to all tokens in the list.
        """
        for token in self.tokens:
            token.penalty += penalty


@dataclasses.dataclass
class QueryNode:
    """ A node of the query representing a break between terms.

        The node also contains information on the source term
        ending at the node. The tokens are created from this information.
    """
    btype: BreakType
    ptype: PhraseType

    penalty: float
    """ Penalty for having a word break at this position. The penalty
        may be negative, when a word break is more likely than continuing
        the word after the node.
    """
    term_lookup: str
    """ Transliterated term ending at this node.
    """
    term_normalized: str
    """ Normalised form of term ending at this node.
    """

    starting: List[TokenList] = dataclasses.field(default_factory=list)
    """ List of all full tokens starting at this node.
    """
    partial: Optional[Token] = None
    """ Base token going to the next node.
        May be None when the query has parts for which no words are known.
        Note that the query may still be parsable when there are other
        types of tokens spanning over the gap.
    """

    @property
    def word_break_penalty(self) -> float:
        """ Penalty to apply when a word ends at this node.
        """
        return max(0, self.penalty)

    @property
    def word_continuation_penalty(self) -> float:
        """ Penalty to apply when a word continues over this node
            (i.e. is a multi-term word).
        """
        return max(0, -self.penalty)

    def name_address_ratio(self) -> float:
        """ Return the probability that the partial token belonging to
            this node forms part of a name (as opposed to part of the address).
        """
        if self.partial is None:
            return 0.5

        return self.partial.count / (self.partial.count + self.partial.addr_count)
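
    # Example: a partial token with count=8 and addr_count=2 yields a
    # name_address_ratio() of 8 / (8 + 2) = 0.8, i.e. the term is much
    # more likely to be part of a name than of an address.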

    def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
        """ Check if there are tokens of the given types ending at the
            given node.
        """
        return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)

    def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
        """ Get the list of tokens of the given type starting at this node
            and ending at the node 'end'. Returns 'None' if no such
            tokens exist.
        """
        for tlist in self.starting:
            if tlist.end == end and tlist.ttype == ttype:
                return tlist.tokens
        return None


@dataclasses.dataclass
class Phrase:
    """ A normalized query part. Phrases may be typed, which means that
        they then represent a specific part of the address.
    """
    ptype: PhraseType
    text: str


class QueryStruct:
    """ A tokenized search query together with the normalized source
        from which the tokens have been parsed.

        The query contains a list of nodes that represent the breaks
        between words. Tokens span between nodes, which don't necessarily
        need to be direct neighbours. Thus the query is represented as a
        directed acyclic graph.

        A query also has a direction penalty 'dir_penalty'. This describes
        the likelihood that the query should be read from left-to-right or
        vice versa. A negative 'dir_penalty' should be read as a penalty on
        right-to-left reading, while a positive value represents a penalty
        for left-to-right reading. The default value is 0, which is equivalent
        to having no information about the reading direction.

        When created, a query contains a single node: the start of the
        query. Further nodes can be added by appending to 'nodes'.
    """

    def __init__(self, source: List[Phrase]) -> None:
        self.source = source
        self.dir_penalty = 0.0
        self.nodes: List[QueryNode] = \
            [QueryNode(BREAK_START, source[0].ptype if source else PHRASE_ANY,
                       0.0, '', '')]

    def num_token_slots(self) -> int:
        """ Return the length of the query in vertex steps.
        """
        return len(self.nodes) - 1

    def add_node(self, btype: BreakType, ptype: PhraseType,
                 term_lookup: str = '', term_normalized: str = '') -> None:
        """ Append a new break node with the given break type.
            The phrase type denotes the type for any tokens starting
            at the node.
        """
        self.nodes.append(QueryNode(btype, ptype, 0.0, term_lookup, term_normalized))

    def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
        """ Add a token to the query. 'trange' contains the indexes of the
            nodes between which the token spans. The indexes must exist
            and are expected to be in the same phrase.
            'ttype' denotes the type of the token and 'token' the token to
            be inserted.

            If the token type is not compatible with the phrase it should
            be added to, then the token is silently dropped.
        """
        snode = self.nodes[trange.start]
        if ttype == TOKEN_PARTIAL:
            assert snode.partial is None
            if _phrase_compatible_with(snode.ptype, TOKEN_PARTIAL, False):
                snode.partial = token
        else:
            full_phrase = snode.btype in (BREAK_START, BREAK_PHRASE)\
                          and self.nodes[trange.end].btype in (BREAK_PHRASE, BREAK_END)
            if _phrase_compatible_with(snode.ptype, ttype, full_phrase):
                tlist = snode.get_tokens(trange.end, ttype)
                if tlist is None:
                    snode.starting.append(TokenList(trange.end, ttype, [token]))
                else:
                    tlist.append(token)
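
    # Illustrative example: a TOKEN_HOUSENUMBER added to a node inside a
    # PHRASE_COUNTRY phrase fails the _phrase_compatible_with() check above
    # and is silently dropped.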

    def compute_direction_penalty(self) -> None:
        """ Recompute the direction probability from the partial tokens
            of each node.
        """
        n = len(self.nodes) - 1
        if n <= 1 or n >= 50:
            self.dir_penalty = 0.0
        elif n == 2:
            self.dir_penalty = (self.nodes[1].name_address_ratio()
                                - self.nodes[0].name_address_ratio()) / 3
        else:
            ratios = [node.name_address_ratio() for node in self.nodes[:-1]]
            self.dir_penalty = (n * sum(i * r for i, r in enumerate(ratios))
                                - sum(ratios) * n * (n - 1) / 2) / LINFAC[n]
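
    # Worked example: for n = 4 with ratios [1.0, 1.0, 0.0, 0.0]
    # (name-like terms first, address-like terms last), the slope is
    # (4 * 1 - 2 * 6) / LINFAC[4] = -8 / 20 = -0.4, i.e. a penalty
    # on right-to-left reading.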

    def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
        """ Get the list of tokens of a given type, spanning the given
            nodes. The nodes must exist. If no tokens exist, an
            empty list is returned.

            Cannot be used to get the partial token.
        """
        assert ttype != TOKEN_PARTIAL
        return self.nodes[trange.start].get_tokens(trange.end, ttype) or []

    def get_in_word_penalty(self, trange: TokenRange) -> float:
        """ Get the sum of penalties for all token transitions
            within the given range.
        """
        return sum(n.word_continuation_penalty
                   for n in self.nodes[trange.start + 1:trange.end])

    def iter_partials(self, trange: TokenRange) -> Iterator[Token]:
        """ Iterate over the partial tokens between the given nodes.
            Missing partials are ignored.
        """
        return (n.partial for n in self.nodes[trange.start:trange.end] if n.partial is not None)

    def iter_tokens_by_edge(self) -> Iterator[Tuple[int, int, Dict[TokenType, List[Token]]]]:
        """ Iterate over all tokens except partial ones, grouped by edge.

            Yields the start and end node indexes together with a dictionary
            of token lists keyed by token type.
        """
        for i, node in enumerate(self.nodes):
            by_end: Dict[int, Dict[TokenType, List[Token]]] = defaultdict(dict)
            for tlist in node.starting:
                by_end[tlist.end][tlist.ttype] = tlist.tokens
            for end, endlist in by_end.items():
                yield i, end, endlist

    def find_lookup_word_by_id(self, token: int) -> str:
        """ Find the first token with the given token ID and return
            its lookup word. Returns the string 'None' if no such
            token exists. The function is very slow and must only be
            used for debugging.
        """
        for node in self.nodes:
            if node.partial is not None and node.partial.token == token:
                return f"[P]{node.partial.lookup_word}"
            for tlist in node.starting:
                for t in tlist.tokens:
                    if t.token == token:
                        return f"[{tlist.ttype}]{t.lookup_word}"
        return 'None'

    def extract_words(self, start: int = 0,
                      endpos: Optional[int] = None) -> Dict[str, List[TokenRange]]:
        """ Return all combinations of words that can be formed from the terms
            between the given start and end node. The terms are joined with
            spaces for each break. Words can never go across a BREAK_PHRASE.

            The function returns a dictionary of possible words with their
            positions within the query.
        """
        if endpos is None:
            endpos = len(self.nodes)

        words: Dict[str, List[TokenRange]] = defaultdict(list)

        for first, first_node in enumerate(self.nodes[start + 1:endpos], start):
            word = first_node.term_lookup
            words[word].append(TokenRange(first, first + 1))
            if first_node.btype != BREAK_PHRASE:
                max_last = min(first + 20, endpos)
                for last, last_node in enumerate(self.nodes[first + 2:max_last], first + 2):
                    word = ' '.join((word, last_node.term_lookup))
                    words[word].append(TokenRange(first, last))
                    if last_node.btype == BREAK_PHRASE:
                        break

        return words
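
    # Example: for a query with the terms 'foo', 'bar', 'baz' and a
    # BREAK_PHRASE after 'bar', extract_words() returns ranges for
    # 'foo', 'bar', 'baz' and 'foo bar' but not 'bar baz', because
    # words never cross a phrase break.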