add type annotations for ICU tokenizer

Sarah Hoffmann
2022-07-16 17:33:19 +02:00
parent 18b16e06ca
commit 6c6bbe5747
2 changed files with 118 additions and 83 deletions

nominatim/tokenizer/icu_token_analysis.py

@@ -36,7 +36,7 @@ class ICUTokenAnalysis:
                          for name, arules in analysis_rules.items()}


-    def get_analyzer(self, name: str) -> Analyser:
+    def get_analyzer(self, name: Optional[str]) -> Analyser:
         """ Return the given named analyzer. If no analyzer with that
             name exists, return the default analyzer.
         """

nominatim/tokenizer/icu_tokenizer.py

@@ -8,41 +8,47 @@
 Tokenizer implementing normalisation as used before Nominatim 4 but using
 libICU instead of the PostgreSQL module.
 """
+from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, Dict, Set, Iterable
 import itertools
 import json
 import logging
+from pathlib import Path
 from textwrap import dedent
 
-from nominatim.db.connection import connect
+from nominatim.db.connection import connect, Connection, Cursor
+from nominatim.config import Configuration
 from nominatim.db.utils import CopyBuffer
 from nominatim.db.sql_preprocessor import SQLPreprocessor
 from nominatim.data.place_info import PlaceInfo
 from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
+from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
+from nominatim.tokenizer.sanitizers.base import PlaceName
+from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
 
 DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
 
 LOG = logging.getLogger()
 
 
-def create(dsn, data_dir):
+def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
     """ Create a new instance of the tokenizer provided by this module.
     """
-    return LegacyICUTokenizer(dsn, data_dir)
+    return ICUTokenizer(dsn, data_dir)
 
 
-class LegacyICUTokenizer(AbstractTokenizer):
+class ICUTokenizer(AbstractTokenizer):
     """ This tokenizer uses libICU to covert names and queries to ASCII.
         Otherwise it uses the same algorithms and data structures as the
         normalization routines in Nominatim 3.
     """
 
-    def __init__(self, dsn, data_dir):
+    def __init__(self, dsn: str, data_dir: Path) -> None:
         self.dsn = dsn
         self.data_dir = data_dir
-        self.loader = None
+        self.loader: Optional[ICURuleLoader] = None
 
 
-    def init_new_db(self, config, init_db=True):
+    def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
         """ Set up a new tokenizer for the database.
 
             This copies all necessary data in the project directory to make
@@ -58,7 +64,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
             self._init_db_tables(config)
 
 
-    def init_from_project(self, config):
+    def init_from_project(self, config: Configuration) -> None:
         """ Initialise the tokenizer from the project directory.
         """
         self.loader = ICURuleLoader(config)
@@ -69,7 +75,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
             self._install_php(config.lib_dir.php, overwrite=False)
 
 
-    def finalize_import(self, config):
+    def finalize_import(self, config: Configuration) -> None:
         """ Do any required postprocessing to make the tokenizer data ready
             for use.
         """
@@ -78,7 +84,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
 
 
-    def update_sql_functions(self, config):
+    def update_sql_functions(self, config: Configuration) -> None:
         """ Reimport the SQL functions for this tokenizer.
         """
         with connect(self.dsn) as conn:
@@ -86,14 +92,14 @@ class LegacyICUTokenizer(AbstractTokenizer):
             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
 
 
-    def check_database(self, config):
+    def check_database(self, config: Configuration) -> None:
         """ Check that the tokenizer is set up correctly.
         """
         # Will throw an error if there is an issue.
         self.init_from_project(config)
 
 
-    def update_statistics(self):
+    def update_statistics(self) -> None:
         """ Recompute frequencies for all name words.
         """
         with connect(self.dsn) as conn:
@@ -113,7 +119,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
             conn.commit()
 
 
-    def _cleanup_housenumbers(self):
+    def _cleanup_housenumbers(self) -> None:
         """ Remove unused house numbers.
         """
         with connect(self.dsn) as conn:
@@ -148,7 +154,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
 
 
-    def update_word_tokens(self):
+    def update_word_tokens(self) -> None:
         """ Remove unused tokens.
         """
         LOG.warning("Cleaning up housenumber tokens.")
@@ -156,7 +162,7 @@ class LegacyICUTokenizer(AbstractTokenizer):
         LOG.warning("Tokenizer house-keeping done.")
 
 
-    def name_analyzer(self):
+    def name_analyzer(self) -> 'ICUNameAnalyzer':
         """ Create a new analyzer for tokenizing names and queries
             using this tokinzer. Analyzers are context managers and should
             be used accordingly:
@@ -171,13 +177,15 @@ class LegacyICUTokenizer(AbstractTokenizer):
 
             Analyzers are not thread-safe. You need to instantiate one per thread.
         """
-        return LegacyICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
-                                     self.loader.make_token_analysis())
+        assert self.loader is not None
+        return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
+                               self.loader.make_token_analysis())
 
 
-    def _install_php(self, phpdir, overwrite=True):
+    def _install_php(self, phpdir: Path, overwrite: bool = True) -> None:
         """ Install the php script for the tokenizer.
         """
+        assert self.loader is not None
         php_file = self.data_dir / "tokenizer.php"
 
         if not php_file.exists() or overwrite:
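
Note: the recurring "assert self.loader is not None" lines added throughout this commit are the standard mypy narrowing pattern for attributes that stay None until an init method runs. A minimal self-contained sketch (class and method names are illustrative):

    from typing import Optional

    class RuleLoader:
        def make_sanitizer(self) -> str:
            return "sanitizer"

    class Tokenizer:
        def __init__(self) -> None:
            # Not loaded yet; mypy types this attribute Optional[RuleLoader].
            self.loader: Optional[RuleLoader] = None

        def init_from_project(self) -> None:
            self.loader = RuleLoader()

        def name_analyzer(self) -> str:
            # Without the assert, mypy reports: Item "None" of
            # "Optional[RuleLoader]" has no attribute "make_sanitizer".
            assert self.loader is not None
            return self.loader.make_sanitizer()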
@@ -189,15 +197,16 @@ class LegacyICUTokenizer(AbstractTokenizer):
                 require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
 
 
-    def _save_config(self):
+    def _save_config(self) -> None:
         """ Save the configuration that needs to remain stable for the given
             database as database properties.
         """
+        assert self.loader is not None
         with connect(self.dsn) as conn:
             self.loader.save_config_to_db(conn)
 
 
-    def _init_db_tables(self, config):
+    def _init_db_tables(self, config: Configuration) -> None:
         """ Set up the word table and fill it with pre-computed word
             frequencies.
         """
@@ -207,15 +216,16 @@ class LegacyICUTokenizer(AbstractTokenizer):
             conn.commit()
 
 
-class LegacyICUNameAnalyzer(AbstractAnalyzer):
-    """ The legacy analyzer uses the ICU library for splitting names.
+class ICUNameAnalyzer(AbstractAnalyzer):
+    """ The ICU analyzer uses the ICU library for splitting names.
 
         Each instance opens a connection to the database to request the
         normalization.
     """
 
-    def __init__(self, dsn, sanitizer, token_analysis):
-        self.conn = connect(dsn).connection
+    def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
+                 token_analysis: ICUTokenAnalysis) -> None:
+        self.conn: Optional[Connection] = connect(dsn).connection
         self.conn.autocommit = True
         self.sanitizer = sanitizer
         self.token_analysis = token_analysis
@@ -223,7 +233,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         self._cache = _TokenCache()
 
 
-    def close(self):
+    def close(self) -> None:
         """ Free all resources used by the analyzer.
         """
         if self.conn:
@@ -231,20 +241,20 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
             self.conn = None
 
 
-    def _search_normalized(self, name):
+    def _search_normalized(self, name: str) -> str:
         """ Return the search token transliteration of the given name.
         """
-        return self.token_analysis.search.transliterate(name).strip()
+        return cast(str, self.token_analysis.search.transliterate(name)).strip()
 
 
-    def _normalized(self, name):
+    def _normalized(self, name: str) -> str:
         """ Return the normalized version of the given name with all
             non-relevant information removed.
         """
-        return self.token_analysis.normalizer.transliterate(name).strip()
+        return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
 
 
-    def get_word_token_info(self, words):
+    def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
         """ Return token information for the given list of words.
 
             If a word starts with # it is assumed to be a full name
             otherwise is a partial name.
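
Note: the cast(str, ...) wrappers are needed because the transliterate() call comes from an untyped extension (presumably PyICU ships no type stubs), so mypy sees its result as Any. typing.cast is a no-op at runtime that only informs the checker; a minimal sketch with a stand-in for the untyped call:

    from typing import Any, cast

    def transliterate(name: str) -> Any:
        # Stand-in for an untyped C-extension call such as PyICU's.
        return name.upper()

    def search_normalized(name: str) -> str:
        # cast() costs nothing at runtime; it merely pins the Any down
        # to str so strict mypy accepts the declared return type.
        return cast(str, transliterate(name)).strip()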
@@ -255,6 +265,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
             The function is used for testing and debugging only
             and not necessarily efficient.
         """
+        assert self.conn is not None
         full_tokens = {}
         partial_tokens = {}
         for word in words:
@@ -277,7 +288,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
 
 
-    def normalize_postcode(self, postcode):
+    def normalize_postcode(self, postcode: str) -> str:
         """ Convert the postcode to a standardized form.
 
             This function must yield exactly the same result as the SQL function
@@ -286,10 +297,11 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         return postcode.strip().upper()
 
 
-    def update_postcodes_from_db(self):
+    def update_postcodes_from_db(self) -> None:
         """ Update postcode tokens in the word table from the location_postcode
             table.
         """
+        assert self.conn is not None
         analyzer = self.token_analysis.analysis.get('@postcode')
 
         with self.conn.cursor() as cur:
@@ -324,13 +336,15 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                 self._delete_unused_postcode_words(word_entries - needed_entries)
                 self._add_missing_postcode_words(needed_entries - word_entries)
 
-    def _delete_unused_postcode_words(self, tokens):
+    def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
+        assert self.conn is not None
         if tokens:
             with self.conn.cursor() as cur:
                 cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
                             (list(tokens), ))
 
-    def _add_missing_postcode_words(self, tokens):
+    def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
+        assert self.conn is not None
         if not tokens:
             return
@@ -341,10 +355,12 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
             if '@' in postcode_name:
                 term, variant = postcode_name.split('@', 2)
                 term = self._search_normalized(term)
-                variants = {term}
-                if analyzer is not None:
-                    variants.update(analyzer.get_variants_ascii(variant))
-                    variants = list(variants)
+                if analyzer is None:
+                    variants = [term]
+                else:
+                    variants = analyzer.get_variants_ascii(variant)
+                    if term not in variants:
+                        variants.append(term)
             else:
                 variants = [self._search_normalized(postcode_name)]
             terms.append((postcode_name, variants))
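
Note: besides helping the type checker (the old code rebound variants from set to list, which mypy dislikes), this hunk also makes the variant list deterministic: the analyzer's ordering is preserved and the normalized term is appended only when missing, instead of round-tripping through an unordered set. A small sketch of that reworked logic under those assumptions (helper name and sample values are hypothetical):

    from typing import List

    def build_variants(term: str, analyzer_variants: List[str]) -> List[str]:
        # Keep the analyzer's ordering; add the plain term only if missing.
        variants = list(analyzer_variants)
        if term not in variants:
            variants.append(term)
        return variants

    assert build_variants("ec1a", ["ec 1a", "ec1a"]) == ["ec 1a", "ec1a"]
    assert build_variants("ec1a", ["ec 1a"]) == ["ec 1a", "ec1a"]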
@@ -358,12 +374,14 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
 
 
-    def update_special_phrases(self, phrases, should_replace):
+    def update_special_phrases(self, phrases: Sequence[Tuple[str, str, str, str]],
+                               should_replace: bool) -> None:
         """ Replace the search index for special phrases with the new phrases.
             If `should_replace` is True, then the previous set of will be
             completely replaced. Otherwise the phrases are added to the
             already existing ones.
         """
+        assert self.conn is not None
         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
                             for p in phrases))
@@ -386,7 +404,9 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                  len(norm_phrases), added, deleted)
 
 
-    def _add_special_phrases(self, cursor, new_phrases, existing_phrases):
+    def _add_special_phrases(self, cursor: Cursor,
+                             new_phrases: Set[Tuple[str, str, str, str]],
+                             existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
         """ Add all phrases to the database that are not yet there.
         """
         to_add = new_phrases - existing_phrases
@@ -407,8 +427,9 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         return added
 
 
-    @staticmethod
-    def _remove_special_phrases(cursor, new_phrases, existing_phrases):
+    def _remove_special_phrases(self, cursor: Cursor,
+                                new_phrases: Set[Tuple[str, str, str, str]],
+                                existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
         """ Remove all phrases from the databse that are no longer in the
             new phrase list.
         """
@@ -425,7 +446,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         return len(to_delete)
 
 
-    def add_country_names(self, country_code, names):
+    def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
         """ Add default names for the given country to the search index.
         """
         # Make sure any name preprocessing for country names applies.
@@ -437,10 +458,12 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                                          internal=True)
 
 
-    def _add_country_full_names(self, country_code, names, internal=False):
+    def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
+                                internal: bool = False) -> None:
         """ Add names for the given country from an already sanitized
             name list.
         """
+        assert self.conn is not None
         word_tokens = set()
         for name in names:
             norm_name = self._search_normalized(name.name)
@@ -453,7 +476,8 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                            FROM word
                            WHERE type = 'C' and word = %s""",
                         (country_code, ))
-            existing_tokens = {True: set(), False: set()}  # internal/external names
+            # internal/external names
+            existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
             for word in cur:
                 existing_tokens[word[1]].add(word[0])
@@ -486,7 +510,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                 cur.execute(sql, (country_code, list(new_tokens)))
 
 
-    def process_place(self, place):
+    def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
         """ Determine tokenizer information about the given place.
 
             Returns a JSON-serializable structure that will be handed into
@@ -500,6 +524,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
             token_info.set_names(*self._compute_name_tokens(names))
 
             if place.is_country():
+                assert place.country_code is not None
                 self._add_country_full_names(place.country_code, names)
 
         if address:
@@ -508,7 +533,8 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         return token_info.to_dict()
 
 
-    def _process_place_address(self, token_info, address):
+    def _process_place_address(self, token_info: '_TokenInfo',
+                               address: Sequence[PlaceName]) -> None:
         for item in address:
             if item.kind == 'postcode':
                 token_info.set_postcode(self._add_postcode(item))
@@ -524,12 +550,13 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                 token_info.add_address_term(item.kind, self._compute_partial_tokens(item.name))
 
 
-    def _compute_housenumber_token(self, hnr):
+    def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
         """ Normalize the housenumber and return the word token and the
             canonical form.
         """
+        assert self.conn is not None
         analyzer = self.token_analysis.analysis.get('@housenumber')
-        result = None, None
+        result: Tuple[Optional[int], Optional[str]] = (None, None)
 
         if analyzer is None:
             # When no custom analyzer is set, simply normalize and transliterate
@@ -539,7 +566,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
             if result[0] is None:
                 with self.conn.cursor() as cur:
                     cur.execute("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
-                    result = cur.fetchone()[0], norm_name
+                    result = cur.fetchone()[0], norm_name  # type: ignore[no-untyped-call]
                 self._cache.housenumbers[norm_name] = result
         else:
             # Otherwise use the analyzer to determine the canonical name.
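
Note: the "# type: ignore[no-untyped-call]" markers silence mypy's disallow-untyped-calls check on cur.fetchone(), which the database driver leaves unannotated. A self-contained stand-in (not psycopg2 itself) showing the same situation:

    from typing import Optional, Tuple

    def fetchone():  # deliberately unannotated, like a legacy DB driver
        return (42,)

    def lookup_id() -> int:
        # Under mypy --strict, calling an unannotated function is an error;
        # the narrow ignore code keeps the rest of the file strictly checked.
        row: Optional[Tuple[int, ...]] = fetchone()  # type: ignore[no-untyped-call]
        assert row is not None  # the SELECT is expected to return a row
        return row[0]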
@@ -554,16 +581,17 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                     with self.conn.cursor() as cur:
                         cur.execute("SELECT create_analyzed_hnr_id(%s, %s)",
                                     (norm_name, list(variants)))
-                        result = cur.fetchone()[0], variants[0]
+                        result = cur.fetchone()[0], variants[0]  # type: ignore[no-untyped-call]
                     self._cache.housenumbers[norm_name] = result
 
         return result
 
 
-    def _compute_partial_tokens(self, name):
+    def _compute_partial_tokens(self, name: str) -> List[int]:
         """ Normalize the given term, split it into partial words and return
             then token list for them.
         """
+        assert self.conn is not None
         norm_name = self._search_normalized(name)
 
         tokens = []
@@ -582,16 +610,18 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                             (need_lookup, ))
                 for partial, token in cur:
+                    assert token is not None
                     tokens.append(token)
                     self._cache.partials[partial] = token
 
         return tokens
 
 
-    def _retrieve_full_tokens(self, name):
+    def _retrieve_full_tokens(self, name: str) -> List[int]:
         """ Get the full name token for the given name, if it exists.
             The name is only retrived for the standard analyser.
         """
+        assert self.conn is not None
         norm_name = self._search_normalized(name)
 
         # return cached if possible
@@ -608,12 +638,13 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
         return full
 
 
-    def _compute_name_tokens(self, names):
+    def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
         """ Computes the full name and partial name tokens for the given
             dictionary of names.
         """
-        full_tokens = set()
-        partial_tokens = set()
+        assert self.conn is not None
+        full_tokens: Set[int] = set()
+        partial_tokens: Set[int] = set()
 
         for name in names:
             analyzer_id = name.get_attr('analyzer')
@@ -633,19 +664,23 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
                 with self.conn.cursor() as cur:
                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
                                 (token_id, variants))
-                    full, part = cur.fetchone()
+                    full, part = cast(Tuple[int, List[int]],
+                                      cur.fetchone())  # type: ignore[no-untyped-call]
 
                 self._cache.names[token_id] = (full, part)
 
+            assert part is not None
+
             full_tokens.add(full)
             partial_tokens.update(part)
 
         return full_tokens, partial_tokens
 
 
-    def _add_postcode(self, item):
+    def _add_postcode(self, item: PlaceName) -> Optional[str]:
         """ Make sure the normalized postcode is present in the word table.
         """
+        assert self.conn is not None
         analyzer = self.token_analysis.analysis.get('@postcode')
 
         if analyzer is None:
@@ -680,25 +715,24 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
 class _TokenInfo:
     """ Collect token information to be sent back to the database.
     """
-    def __init__(self):
-        self.names = None
-        self.housenumbers = set()
-        self.housenumber_tokens = set()
-        self.street_tokens = set()
-        self.place_tokens = set()
-        self.address_tokens = {}
-        self.postcode = None
+    def __init__(self) -> None:
+        self.names: Optional[str] = None
+        self.housenumbers: Set[str] = set()
+        self.housenumber_tokens: Set[int] = set()
+        self.street_tokens: Set[int] = set()
+        self.place_tokens: Set[int] = set()
+        self.address_tokens: Dict[str, str] = {}
+        self.postcode: Optional[str] = None
 
 
-    @staticmethod
-    def _mk_array(tokens):
+    def _mk_array(self, tokens: Iterable[Any]) -> str:
         return f"{{{','.join((str(s) for s in tokens))}}}"
 
 
-    def to_dict(self):
+    def to_dict(self) -> Dict[str, Any]:
         """ Return the token information in database importable format.
         """
-        out = {}
+        out: Dict[str, Any] = {}
 
         if self.names:
             out['names'] = self.names
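
Note: _mk_array renders token ids as a PostgreSQL array literal. The tripled braces are f-string escaping: "{{" and "}}" emit literal braces around the interpolated, comma-joined values. A standalone version to illustrate:

    from typing import Any, Iterable

    def mk_array(tokens: Iterable[Any]) -> str:
        # {{ and }} are literal braces in an f-string; the inner {...}
        # interpolates the comma-joined token list.
        return f"{{{','.join(str(s) for s in tokens)}}}"

    print(mk_array([1, 2, 3]))  # -> {1,2,3}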
@@ -722,40 +756,41 @@ class _TokenInfo:
         return out
 
 
-    def set_names(self, fulls, partials):
+    def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
         """ Adds token information for the normalised names.
         """
         self.names = self._mk_array(itertools.chain(fulls, partials))
 
 
-    def add_housenumber(self, token, hnr):
+    def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
         """ Extract housenumber information from a list of normalised
             housenumbers.
         """
         if token:
+            assert hnr is not None
             self.housenumbers.add(hnr)
             self.housenumber_tokens.add(token)
 
 
-    def add_street(self, tokens):
+    def add_street(self, tokens: Iterable[int]) -> None:
         """ Add addr:street match terms.
         """
         self.street_tokens.update(tokens)
 
 
-    def add_place(self, tokens):
+    def add_place(self, tokens: Iterable[int]) -> None:
         """ Add addr:place search and match terms.
         """
         self.place_tokens.update(tokens)
 
 
-    def add_address_term(self, key, partials):
+    def add_address_term(self, key: str, partials: Iterable[int]) -> None:
         """ Add additional address terms.
         """
         if partials:
             self.address_tokens[key] = self._mk_array(partials)
 
 
-    def set_postcode(self, postcode):
+    def set_postcode(self, postcode: Optional[str]) -> None:
         """ Set the postcode to the given one.
         """
         self.postcode = postcode
@@ -767,9 +802,9 @@ class _TokenCache:
         This cache is not thread-safe and needs to be instantiated per
         analyzer.
     """
-    def __init__(self):
-        self.names = {}
-        self.partials = {}
-        self.fulls = {}
-        self.postcodes = set()
-        self.housenumbers = {}
+    def __init__(self) -> None:
+        self.names: Dict[str, Tuple[int, List[int]]] = {}
+        self.partials: Dict[str, int] = {}
+        self.fulls: Dict[str, List[int]] = {}
+        self.postcodes: Set[str] = set()
+        self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}