remove legacy tokenizer and direct tests

Sarah Hoffmann
2024-09-21 11:38:08 +02:00
parent e92e03e2e6
commit b87d6226fb
9 changed files with 0 additions and 2360 deletions


@@ -1,273 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of query analysis for the legacy tokenizer.
"""
from typing import Tuple, Dict, List, Optional, Iterator, Any, cast
from copy import copy
from collections import defaultdict
import dataclasses
import sqlalchemy as sa
from ..typing import SaRow
from ..connection import SearchConnection
from ..logging import log
from . import query as qmod
from .query_analyzer_factory import AbstractQueryAnalyzer
def yield_words(terms: List[str], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
""" Return all combinations of words in the terms list after the
given position.
"""
total = len(terms)
for first in range(start, total):
word = terms[first]
yield word, qmod.TokenRange(first, first + 1)
for last in range(first + 1, min(first + 20, total)):
word = ' '.join((word, terms[last]))
yield word, qmod.TokenRange(first, last + 1)
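# A minimal illustration (hypothetical input, not part of the original file):
# for terms = ['new', 'york', 'usa'] and start = 0 the generator yields
#   ('new', 0-1), ('new york', 0-2), ('new york usa', 0-3),
#   ('york', 1-2), ('york usa', 1-3), ('usa', 2-3)
# i.e. every run of consecutive terms, at most 20 terms long.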
@dataclasses.dataclass
class LegacyToken(qmod.Token):
""" Specialised token for legacy tokenizer.
"""
word_token: str
category: Optional[Tuple[str, str]]
country: Optional[str]
operator: Optional[str]
@property
def info(self) -> Dict[str, Any]:
""" Dictionary of additional properties of the token.
Should only be used for debugging purposes.
"""
return {'category': self.category,
'country': self.country,
'operator': self.operator}
def get_category(self) -> Tuple[str, str]:
assert self.category
return self.category
class LegacyQueryAnalyzer(AbstractQueryAnalyzer):
""" Converter for query strings into a tokenized query
using the tokens created by a legacy tokenizer.
"""
def __init__(self, conn: SearchConnection) -> None:
self.conn = conn
async def setup(self) -> None:
""" Set up static data structures needed for the analysis.
"""
self.max_word_freq = int(await self.conn.get_property('tokenizer_maxwordfreq'))
if 'word' not in self.conn.t.meta.tables:
sa.Table('word', self.conn.t.meta,
sa.Column('word_id', sa.Integer),
sa.Column('word_token', sa.Text, nullable=False),
sa.Column('word', sa.Text),
sa.Column('class', sa.Text),
sa.Column('type', sa.Text),
sa.Column('country_code', sa.Text),
sa.Column('search_name_count', sa.Integer),
sa.Column('operator', sa.Text))
async def analyze_query(self, phrases: List[qmod.Phrase]) -> qmod.QueryStruct:
""" Analyze the given list of phrases and return the
tokenized query.
"""
log().section('Analyze query (using Legacy tokenizer)')
normalized = []
if phrases:
for row in await self.conn.execute(sa.select(*(sa.func.make_standard_name(p.text)
for p in phrases))):
normalized = [qmod.Phrase(p.ptype, r) for r, p in zip(row, phrases) if r]
break
query = qmod.QueryStruct(normalized)
log().var_dump('Normalized query', query.source)
if not query.source:
return query
parts, words = self.split_query(query)
lookup_words = list(words.keys())
log().var_dump('Split query', parts)
log().var_dump('Extracted words', lookup_words)
for row in await self.lookup_in_db(lookup_words):
for trange in words[row.word_token.strip()]:
token, ttype = self.make_token(row)
if ttype == qmod.TokenType.NEAR_ITEM:
if trange.start == 0:
query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
elif ttype == qmod.TokenType.QUALIFIER:
query.add_token(trange, qmod.TokenType.QUALIFIER, token)
if trange.start == 0 or trange.end == query.num_token_slots():
token = copy(token)
token.penalty += 0.1 * (query.num_token_slots())
query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
elif ttype != qmod.TokenType.PARTIAL or trange.start + 1 == trange.end:
query.add_token(trange, ttype, token)
self.add_extra_tokens(query, parts)
self.rerank_tokens(query)
log().table_dump('Word tokens', _dump_word_tokens(query))
return query
def normalize_text(self, text: str) -> str:
""" Bring the given text into a normalized form.
This only lowercases the text, so some differences from the
normalization applied to the phrases remain.
"""
return text.lower()
def split_query(self, query: qmod.QueryStruct) -> Tuple[List[str],
Dict[str, List[qmod.TokenRange]]]:
""" Transliterate the phrases and split them into tokens.
Returns a list of transliterated tokens and a dictionary
of words for lookup together with their position.
"""
parts: List[str] = []
phrase_start = 0
words = defaultdict(list)
for phrase in query.source:
query.nodes[-1].ptype = phrase.ptype
for trans in phrase.text.split(' '):
if trans:
for term in trans.split(' '):
if term:
parts.append(trans)
query.add_node(qmod.BreakType.TOKEN, phrase.ptype)
query.nodes[-1].btype = qmod.BreakType.WORD
query.nodes[-1].btype = qmod.BreakType.PHRASE
for word, wrange in yield_words(parts, phrase_start):
words[word].append(wrange)
phrase_start = len(parts)
query.nodes[-1].btype = qmod.BreakType.END
return parts, words
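# Illustration based on the code above (hypothetical input): for the phrases
# 'new york' and 'ny', parts becomes ['new', 'york', 'ny'] and words maps
# 'new' -> [0-1], 'york' -> [1-2], 'new york' -> [0-2], 'ny' -> [2-3];
# combinations never cross a phrase boundary because yield_words restarts
# at phrase_start for every phrase.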
async def lookup_in_db(self, words: List[str]) -> 'sa.Result[Any]':
""" Return the token information from the database for the
given word tokens.
"""
t = self.conn.t.meta.tables['word']
sql = t.select().where(t.c.word_token.in_(words + [' ' + w for w in words]))
return await self.conn.execute(sql)
def make_token(self, row: SaRow) -> Tuple[LegacyToken, qmod.TokenType]:
""" Create a LegacyToken from the row of the word table.
Also determines the type of token.
"""
penalty = 0.0
is_indexed = True
rowclass = getattr(row, 'class')
if row.country_code is not None:
ttype = qmod.TokenType.COUNTRY
lookup_word = row.country_code
elif rowclass is not None:
if rowclass == 'place' and row.type == 'house':
ttype = qmod.TokenType.HOUSENUMBER
lookup_word = row.word_token[1:]
elif rowclass == 'place' and row.type == 'postcode':
ttype = qmod.TokenType.POSTCODE
lookup_word = row.word
else:
ttype = qmod.TokenType.NEAR_ITEM if row.operator in ('in', 'near')\
else qmod.TokenType.QUALIFIER
lookup_word = row.word
elif row.word_token.startswith(' '):
ttype = qmod.TokenType.WORD
lookup_word = row.word or row.word_token[1:]
else:
ttype = qmod.TokenType.PARTIAL
lookup_word = row.word_token
penalty = 0.21
if row.search_name_count > self.max_word_freq:
is_indexed = False
return LegacyToken(penalty=penalty, token=row.word_id,
count=max(1, row.search_name_count or 1),
addr_count=1, # not supported
lookup_word=lookup_word,
word_token=row.word_token.strip(),
category=(rowclass, row.type) if rowclass is not None else None,
country=row.country_code,
operator=row.operator,
is_indexed=is_indexed),\
ttype
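# Rough mapping of word-table rows to token types as implemented above
# (example values are hypothetical):
#   country_code='de'                        -> COUNTRY
#   class='place', type='house'              -> HOUSENUMBER
#   class='place', type='postcode'           -> POSTCODE
#   other class, operator in ('in', 'near')  -> NEAR_ITEM
#   other class, any other operator          -> QUALIFIER
#   word_token with a leading blank          -> WORD (full word)
#   anything else                            -> PARTIAL (penalty 0.21)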
def add_extra_tokens(self, query: qmod.QueryStruct, parts: List[str]) -> None:
""" Add tokens to query that are not saved in the database.
"""
for part, node, i in zip(parts, query.nodes, range(1000)):
if len(part) <= 4 and part.isdigit()\
and not node.has_tokens(i+1, qmod.TokenType.HOUSENUMBER):
query.add_token(qmod.TokenRange(i, i+1), qmod.TokenType.HOUSENUMBER,
LegacyToken(penalty=0.5, token=0, count=1, addr_count=1,
lookup_word=part, word_token=part,
category=None, country=None,
operator=None, is_indexed=True))
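# Example (hypothetical): a purely numeric part like '25' that has no
# HOUSENUMBER token from the word table still receives a synthetic
# HOUSENUMBER token with word_id 0 and a penalty of 0.5.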
def rerank_tokens(self, query: qmod.QueryStruct) -> None:
""" Add penalties to tokens that depend on presence of other token.
"""
for _, node, tlist in query.iter_token_lists():
if tlist.ttype == qmod.TokenType.POSTCODE:
for repl in node.starting:
if repl.end == tlist.end and repl.ttype != qmod.TokenType.POSTCODE \
and (repl.ttype != qmod.TokenType.HOUSENUMBER
or len(tlist.tokens[0].lookup_word) > 4):
repl.add_penalty(0.39)
elif tlist.ttype == qmod.TokenType.HOUSENUMBER \
and len(tlist.tokens[0].lookup_word) <= 3:
if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
for repl in node.starting:
if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
repl.add_penalty(0.5 - tlist.tokens[0].penalty)
def _dump_word_tokens(query: qmod.QueryStruct) -> Iterator[List[Any]]:
yield ['type', 'token', 'word_token', 'lookup_word', 'penalty', 'count', 'info', 'indexed']
for node in query.nodes:
for tlist in node.starting:
for token in tlist.tokens:
t = cast(LegacyToken, token)
yield [tlist.ttype.name, t.token, t.word_token or '',
t.lookup_word or '', t.penalty, t.count, t.info,
'Y' if t.is_indexed else 'N']
async def create_query_analyzer(conn: SearchConnection) -> AbstractQueryAnalyzer:
""" Create and set up a new query analyzer for a database based
on the ICU tokenizer.
"""
out = LegacyQueryAnalyzer(conn)
await out.setup()
return out


@@ -1,666 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tokenizer implementing normalisation as used before Nominatim 4.
"""
from typing import Optional, Sequence, List, Tuple, Mapping, Any, Callable, \
cast, Dict, Set, Iterable
from collections import OrderedDict
import logging
from pathlib import Path
import re
import shutil
from icu import Transliterator
import psycopg
from psycopg import sql as pysql
from ..errors import UsageError
from ..db.connection import connect, Connection, drop_tables, table_exists,\
execute_scalar, register_hstore
from ..config import Configuration
from ..db import properties
from ..db import utils as db_utils
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from .base import AbstractAnalyzer, AbstractTokenizer
DBCFG_NORMALIZATION = "tokenizer_normalization"
DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
LOG = logging.getLogger()
def create(dsn: str, data_dir: Path) -> 'LegacyTokenizer':
""" Create a new instance of the tokenizer provided by this module.
"""
LOG.warning('WARNING: the legacy tokenizer is deprecated '
'and will be removed in Nominatim 5.0.')
return LegacyTokenizer(dsn, data_dir)
def _install_module(config_module_path: str, src_dir: Optional[Path], module_dir: Path) -> str:
""" Copies the PostgreSQL normalisation module into the project
directory if necessary. For historical reasons the module is
saved in the '/module' subdirectory and not with the other tokenizer
data.
The function detects when the installation is run from the
build directory. It doesn't touch the module in that case.
"""
# Custom module locations are simply used as is.
if config_module_path:
LOG.info("Using custom path for database module at '%s'", config_module_path)
return config_module_path
# Otherwise a source dir must be given.
if src_dir is None:
raise UsageError("The legacy tokenizer cannot be used with the Nominatim pip module.")
# Compatibility mode for builddir installations.
if module_dir.exists() and src_dir.samefile(module_dir):
LOG.info('Running from build directory. Leaving database module as is.')
return str(module_dir)
# In any other case install the module in the project directory.
if not module_dir.exists():
module_dir.mkdir()
destfile = module_dir / 'nominatim.so'
shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
destfile.chmod(0o755)
LOG.info('Database module installed at %s', str(destfile))
return str(module_dir)
def _check_module(module_dir: str, conn: Connection) -> None:
""" Try to use the PostgreSQL module to confirm that it is correctly
installed and accessible from PostgreSQL.
"""
with conn.cursor() as cur:
try:
cur.execute(pysql.SQL("""CREATE FUNCTION nominatim_test_import_func(text)
RETURNS text AS {}, 'transliteration'
LANGUAGE c IMMUTABLE STRICT;
DROP FUNCTION nominatim_test_import_func(text)
""").format(pysql.Literal(f'{module_dir}/nominatim.so')))
except psycopg.DatabaseError as err:
LOG.fatal("Error accessing database module: %s", err)
raise UsageError("Database module cannot be accessed.") from err
class LegacyTokenizer(AbstractTokenizer):
""" The legacy tokenizer uses a special PostgreSQL module to normalize
names and queries. The tokenizer thus implements normalization through
calls to the database.
"""
def __init__(self, dsn: str, data_dir: Path) -> None:
self.dsn = dsn
self.data_dir = data_dir
self.normalization: Optional[str] = None
def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
""" Set up a new tokenizer for the database.
This copies all necessary data into the project directory to make
sure the tokenizer remains stable across updates.
"""
assert config.project_dir is not None
module_dir = _install_module(config.DATABASE_MODULE_PATH,
config.lib_dir.module,
config.project_dir / 'module')
self.normalization = config.TERM_NORMALIZATION
with connect(self.dsn) as conn:
_check_module(module_dir, conn)
self._save_config(conn, config)
conn.commit()
if init_db:
self.update_sql_functions(config)
self._init_db_tables(config)
def init_from_project(self, config: Configuration) -> None:
""" Initialise the tokenizer from the project directory.
"""
assert config.project_dir is not None
with connect(self.dsn) as conn:
self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
if not (config.project_dir / 'module' / 'nominatim.so').exists():
_install_module(config.DATABASE_MODULE_PATH,
config.lib_dir.module,
config.project_dir / 'module')
def finalize_import(self, config: Configuration) -> None:
""" Do any required postprocessing to make the tokenizer data ready
for use.
"""
with connect(self.dsn) as conn:
sqlp = SQLPreprocessor(conn, config)
sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
def update_sql_functions(self, config: Configuration) -> None:
""" Reimport the SQL functions for this tokenizer.
"""
assert config.project_dir is not None
with connect(self.dsn) as conn:
max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
modulepath = config.DATABASE_MODULE_PATH or \
str((config.project_dir / 'module').resolve())
sqlp = SQLPreprocessor(conn, config)
sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
max_word_freq=max_word_freq,
modulepath=modulepath)
def check_database(self, _: Configuration) -> Optional[str]:
""" Check that the tokenizer is set up correctly.
"""
hint = """\
The PostgreSQL extension nominatim.so was not correctly loaded.
Error: {error}
Hints:
* Check the output of the CMake/make installation step
* Does nominatim.so exist?
* Does nominatim.so exist on the database server?
* Can nominatim.so be accessed by the database user?
"""
with connect(self.dsn) as conn:
try:
out = execute_scalar(conn, "SELECT make_standard_name('a')")
except psycopg.Error as err:
return hint.format(error=str(err))
if out != 'a':
return hint.format(error='Unexpected result for make_standard_name()')
return None
def migrate_database(self, config: Configuration) -> None:
""" Initialise the project directory of an existing database for
use with this tokenizer.
This is a special migration function for updating existing databases
to new software versions.
"""
assert config.project_dir is not None
self.normalization = config.TERM_NORMALIZATION
module_dir = _install_module(config.DATABASE_MODULE_PATH,
config.lib_dir.module,
config.project_dir / 'module')
with connect(self.dsn) as conn:
_check_module(module_dir, conn)
self._save_config(conn, config)
def update_statistics(self, config: Configuration, threads: int = 1) -> None:
""" Recompute the frequency of full words.
"""
with connect(self.dsn) as conn:
if table_exists(conn, 'search_name'):
drop_tables(conn, "word_frequencies")
with conn.cursor() as cur:
LOG.info("Computing word frequencies")
cur.execute("""CREATE TEMP TABLE word_frequencies AS
SELECT unnest(name_vector) as id, count(*)
FROM search_name GROUP BY id""")
cur.execute("CREATE INDEX ON word_frequencies(id)")
LOG.info("Update word table with recomputed frequencies")
cur.execute("""UPDATE word SET search_name_count = count
FROM word_frequencies
WHERE word_token like ' %' and word_id = id""")
drop_tables(conn, "word_frequencies")
conn.commit()
def update_word_tokens(self) -> None:
""" No house-keeping implemented for the legacy tokenizer.
"""
LOG.info("No tokenizer clean-up available.")
def name_analyzer(self) -> 'LegacyNameAnalyzer':
""" Create a new analyzer for tokenizing names and queries
using this tokenizer. Analyzers are context managers and should
be used accordingly:
```
with tokenizer.name_analyzer() as analyzer:
analyzer.tokenize()
```
When used outside a with construct, the caller must make sure to
call the close() function before the analyzer is destroyed.
Analyzers are not thread-safe. You need to instantiate one per thread.
"""
normalizer = Transliterator.createFromRules("phrase normalizer",
self.normalization)
return LegacyNameAnalyzer(self.dsn, normalizer)
def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
""" Return a list of the `num` most frequent full words
in the database.
"""
with conn.cursor() as cur:
cur.execute(""" SELECT word FROM word WHERE word is not null
ORDER BY search_name_count DESC LIMIT %s""", (num,))
return list(s[0] for s in cur)
def _init_db_tables(self, config: Configuration) -> None:
""" Set up the word table and fill it with pre-computed word
frequencies.
"""
with connect(self.dsn) as conn:
sqlp = SQLPreprocessor(conn, config)
sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
conn.commit()
LOG.warning("Precomputing word tokens")
db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
def _save_config(self, conn: Connection, config: Configuration) -> None:
""" Save the configuration that needs to remain stable for the given
database as database properties.
"""
assert self.normalization is not None
properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
class LegacyNameAnalyzer(AbstractAnalyzer):
""" The legacy analyzer uses the special Postgresql module for
splitting names.
Each instance opens a connection to the database to request the
normalization.
"""
def __init__(self, dsn: str, normalizer: Any):
self.conn: Optional[Connection] = connect(dsn)
self.conn.autocommit = True
self.normalizer = normalizer
register_hstore(self.conn)
self._cache = _TokenCache(self.conn)
def close(self) -> None:
""" Free all resources used by the analyzer.
"""
if self.conn:
self.conn.close()
self.conn = None
def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
""" Return token information for the given list of words.
If a word starts with '#' it is assumed to be a full name,
otherwise a partial name.
The function returns a list of tuples with
(original word, word token, word id).
The function is used for testing and debugging only
and is not necessarily efficient.
"""
assert self.conn is not None
with self.conn.cursor() as cur:
cur.execute("""SELECT t.term, word_token, word_id
FROM word, (SELECT unnest(%s::TEXT[]) as term) t
WHERE word_token = (CASE
WHEN left(t.term, 1) = '#' THEN
' ' || make_standard_name(substring(t.term from 2))
ELSE
make_standard_name(t.term)
END)
and class is null and country_code is null""",
(words, ))
return [(r[0], r[1], r[2]) for r in cur]
def normalize(self, phrase: str) -> str:
""" Normalize the given phrase, i.e. remove all properties that
are irrelevant for search.
"""
return cast(str, self.normalizer.transliterate(phrase))
def normalize_postcode(self, postcode: str) -> str:
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
'token_normalized_postcode()'.
"""
return postcode.strip().upper()
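# Example: ' se1 9gf ' -> 'SE1 9GF' (whitespace stripped, letters uppercased).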
def update_postcodes_from_db(self) -> None:
""" Update postcode tokens in the word table from the location_postcode
table.
"""
assert self.conn is not None
with self.conn.cursor() as cur:
# This finds the rows in location_postcode and word that are
# missing from the other table.
cur.execute("""SELECT * FROM
(SELECT pc, word FROM
(SELECT distinct(postcode) as pc FROM location_postcode) p
FULL JOIN
(SELECT word FROM word
WHERE class ='place' and type = 'postcode') w
ON pc = word) x
WHERE pc is null or word is null""")
to_delete = []
to_add = []
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
to_add.append(postcode)
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE class ='place' and type = 'postcode'
and word = any(%s)
""", (to_delete, ))
if to_add:
cur.execute("""SELECT count(create_postcode_id(pc))
FROM unnest(%s::text[]) as pc
""", (to_add, ))
def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
should_replace: bool) -> None:
""" Replace the search index for special phrases with the new phrases.
"""
assert self.conn is not None
norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
for p in phrases))
with self.conn.cursor() as cur:
# Get the old phrases.
existing_phrases = set()
cur.execute("""SELECT word, class as cls, type, operator FROM word
WHERE class != 'place'
OR (type != 'house' AND type != 'postcode')""")
for label, cls, typ, oper in cur:
existing_phrases.add((label, cls, typ, oper or '-'))
to_add = norm_phrases - existing_phrases
to_delete = existing_phrases - norm_phrases
if to_add:
cur.executemany(
""" INSERT INTO word (word_id, word_token, word, class, type,
search_name_count, operator)
(SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
class, type, 0,
CASE WHEN op in ('in', 'near') THEN op ELSE null END
FROM (VALUES (%s, %s, %s, %s)) as v(name, class, type, op))""",
to_add)
if to_delete and should_replace:
cur.executemany(
""" DELETE FROM word
USING (VALUES (%s, %s, %s, %s)) as v(name, in_class, in_type, op)
WHERE word = name and class = in_class and type = in_type
and ((op = '-' and operator is null) or op = operator)""",
to_delete)
LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
len(norm_phrases), len(to_add), len(to_delete))
def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
""" Add names for the given country to the search index.
"""
assert self.conn is not None
with self.conn.cursor() as cur:
cur.execute(
"""INSERT INTO word (word_id, word_token, country_code)
(SELECT nextval('seq_word'), lookup_token, %s
FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
FROM unnest(%s::TEXT[])n) y
WHERE NOT EXISTS(SELECT * FROM word
WHERE word_token = lookup_token and country_code = %s))
""", (country_code, list(names.values()), country_code))
def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
""" Determine tokenizer information about the given place.
Returns a JSON-serialisable structure that will be handed into
the database via the token_info field.
"""
assert self.conn is not None
token_info = _TokenInfo(self._cache)
names = place.name
if names:
token_info.add_names(self.conn, names)
if place.is_country():
assert place.country_code is not None
self.add_country_names(place.country_code, names)
address = place.address
if address:
self._process_place_address(token_info, address)
return token_info.data
def _process_place_address(self, token_info: '_TokenInfo', address: Mapping[str, str]) -> None:
assert self.conn is not None
hnrs = []
addr_terms = []
for key, value in address.items():
if key == 'postcode':
# Make sure the normalized postcode is present in the word table.
if re.search(r'[:,;]', value) is None:
norm_pc = self.normalize_postcode(value)
token_info.set_postcode(norm_pc)
self._cache.add_postcode(self.conn, norm_pc)
elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
hnrs.append(value)
elif key == 'street':
token_info.add_street(self.conn, value)
elif key == 'place':
token_info.add_place(self.conn, value)
elif not key.startswith('_') \
and key not in ('country', 'full', 'inclusion'):
addr_terms.append((key, value))
if hnrs:
token_info.add_housenumbers(self.conn, hnrs)
if addr_terms:
token_info.add_address_terms(self.conn, addr_terms)
class _TokenInfo:
""" Collect token information to be sent back to the database.
"""
def __init__(self, cache: '_TokenCache') -> None:
self.cache = cache
self.data: Dict[str, Any] = {}
def add_names(self, conn: Connection, names: Mapping[str, str]) -> None:
""" Add token information for the names of the place.
"""
# Create the token IDs for all names.
self.data['names'] = execute_scalar(conn, "SELECT make_keywords(%s)::text",
(names, ))
def add_housenumbers(self, conn: Connection, hnrs: Sequence[str]) -> None:
""" Extract housenumber information from the address.
"""
if len(hnrs) == 1:
token = self.cache.get_housenumber(hnrs[0])
if token is not None:
self.data['hnr_tokens'] = token
self.data['hnr'] = hnrs[0]
return
# split numbers if necessary
simple_list: List[str] = []
for hnr in hnrs:
simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
if len(simple_list) > 1:
simple_list = list(set(simple_list))
with conn.cursor() as cur:
cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
result = cur.fetchone()
assert result is not None
self.data['hnr_tokens'], self.data['hnr'] = result
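# Example (hypothetical): hnrs = ['4;6', '8'] is expanded and deduplicated
# to ['4', '6', '8'] before being handed to create_housenumbers().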
def set_postcode(self, postcode: str) -> None:
""" Set or replace the postcode token with the given value.
"""
self.data['postcode'] = postcode
def add_street(self, conn: Connection, street: str) -> None:
""" Add addr:street match terms.
"""
def _get_street(name: str) -> Optional[str]:
return cast(Optional[str],
execute_scalar(conn, "SELECT word_ids_from_name(%s)::text", (name, )))
tokens = self.cache.streets.get(street, _get_street)
self.data['street'] = tokens or '{}'
def add_place(self, conn: Connection, place: str) -> None:
""" Add addr:place search and match terms.
"""
def _get_place(name: str) -> Tuple[List[int], List[int]]:
with conn.cursor() as cur:
cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
word_ids_from_name(%s)::text""",
(name, name))
return cast(Tuple[List[int], List[int]], cur.fetchone())
self.data['place_search'], self.data['place_match'] = \
self.cache.places.get(place, _get_place)
def add_address_terms(self, conn: Connection, terms: Sequence[Tuple[str, str]]) -> None:
""" Add additional address terms.
"""
def _get_address_term(name: str) -> Tuple[List[int], List[int]]:
with conn.cursor() as cur:
cur.execute("""SELECT addr_ids_from_name(%s)::text,
word_ids_from_name(%s)::text""",
(name, name))
return cast(Tuple[List[int], List[int]], cur.fetchone())
tokens = {}
for key, value in terms:
items = self.cache.address_terms.get(value, _get_address_term)
if items[0] or items[1]:
tokens[key] = items
if tokens:
self.data['addr'] = tokens
class _LRU:
""" Least recently used cache that accepts a generator function to
produce the item when there is a cache miss.
"""
def __init__(self, maxsize: int = 128):
self.data: 'OrderedDict[str, Any]' = OrderedDict()
self.maxsize = maxsize
def get(self, key: str, generator: Callable[[str], Any]) -> Any:
""" Get the item with the given key from the cache. If nothing
is found in the cache, generate the value through the
generator function and store it in the cache.
"""
value = self.data.get(key)
if value is not None:
self.data.move_to_end(key)
else:
value = generator(key)
if len(self.data) >= self.maxsize:
self.data.popitem(last=False)
self.data[key] = value
return value
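# Usage sketch (hypothetical values):
#   cache = _LRU(maxsize=2)
#   cache.get('a', str.upper)   # miss, stores 'A'
#   cache.get('b', str.upper)   # miss, stores 'B'
#   cache.get('a', str.upper)   # hit, 'a' becomes most recently used
#   cache.get('c', str.upper)   # miss, evicts 'b' (least recently used)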
class _TokenCache:
""" Cache for token information to avoid repeated database queries.
This cache is not thread-safe and needs to be instantiated per
analyzer.
"""
def __init__(self, conn: Connection):
# various LRU caches
self.streets = _LRU(maxsize=256)
self.places = _LRU(maxsize=128)
self.address_terms = _LRU(maxsize=1024)
# Look up housenumbers up to 100 and cache them
with conn.cursor() as cur:
cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
FROM generate_series(1, 100) as i""")
self._cached_housenumbers: Dict[str, str] = {str(r[0]): r[1] for r in cur}
# For postcodes remember the ones that have already been added
self.postcodes: Set[str] = set()
def get_housenumber(self, number: str) -> Optional[str]:
""" Get a housenumber token from the cache.
"""
return self._cached_housenumbers.get(number)
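# Example: get_housenumber('5') returns the precomputed token string, while
# get_housenumber('250') returns None (only 1-100 are cached), in which case
# add_housenumbers() falls back to create_housenumbers() in the database.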
def add_postcode(self, conn: Connection, postcode: str) -> None:
""" Make sure the given postcode is in the database.
"""
if postcode not in self.postcodes:
with conn.cursor() as cur:
cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
self.postcodes.add(postcode)