handle postcodes properly on word table updates

update_postcodes_from_db() needs to do the full postcode treatment
in order to derive the correct word table entries.
This commit is contained in:
Sarah Hoffmann
2022-06-21 22:05:35 +02:00
parent 5be320368c
commit 612d34930b
5 changed files with 214 additions and 54 deletions

View File

@@ -290,33 +290,72 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
""" Update postcode tokens in the word table from the location_postcode
table.
"""
to_delete = []
analyzer = self.token_analysis.analysis.get('@postcode')
with self.conn.cursor() as cur:
# This finds us the rows in location_postcode and word that are
# missing in the other table.
cur.execute("""SELECT * FROM
(SELECT pc, word FROM
(SELECT distinct(postcode) as pc FROM location_postcode) p
FULL JOIN
(SELECT word FROM word WHERE type = 'P') w
ON pc = word) x
WHERE pc is null or word is null""")
# First get all postcode names currently in the word table.
cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
word_entries = set((entry[0] for entry in cur))
with CopyBuffer() as copystr:
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
copystr.add(self._search_normalized(postcode),
'P', postcode)
# Then compute the required postcode names from the postcode table.
needed_entries = set()
cur.execute("SELECT country_code, postcode FROM location_postcode")
for cc, postcode in cur:
info = PlaceInfo({'country_code': cc,
'class': 'place', 'type': 'postcode',
'address': {'postcode': postcode}})
address = self.sanitizer.process_names(info)[1]
for place in address:
if place.kind == 'postcode':
if analyzer is None:
postcode_name = place.name.strip().upper()
variant_base = None
else:
postcode_name = analyzer.normalize(place.name)
variant_base = place.get_attr("variant")
if variant_base:
needed_entries.add(f'{postcode_name}@{variant_base}')
else:
needed_entries.add(postcode_name)
break
# Now update the word table.
self._delete_unused_postcode_words(word_entries - needed_entries)
self._add_missing_postcode_words(needed_entries - word_entries)
def _delete_unused_postcode_words(self, tokens):
if tokens:
with self.conn.cursor() as cur:
cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
(list(tokens), ))
def _add_missing_postcode_words(self, tokens):
if not tokens:
return
analyzer = self.token_analysis.analysis.get('@postcode')
terms = []
for postcode_name in tokens:
if '@' in postcode_name:
term, variant = postcode_name.split('@', 2)
term = self._search_normalized(term)
variants = {term}
if analyzer is not None:
variants.update(analyzer.get_variants_ascii(variant))
variants = list(variants)
else:
variants = [self._search_normalized(postcode_name)]
terms.append((postcode_name, variants))
if terms:
with self.conn.cursor() as cur:
cur.execute_values("""SELECT create_postcode_word(pc, var)
FROM (VALUES %s) AS v(pc, var)""",
terms)
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE type ='P' and word = any(%s)
""", (to_delete, ))
copystr.copy_out(cur, 'word',
columns=['word_token', 'type', 'word'])
def update_special_phrases(self, phrases, should_replace):
@@ -616,7 +655,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
postcode_name = analyzer.normalize(item.name)
variant_base = item.get_attr("variant")
if variant_base is not None:
if variant_base:
postcode = f'{postcode_name}@{variant_base}'
else:
postcode = postcode_name
@@ -627,7 +666,7 @@ class LegacyICUNameAnalyzer(AbstractAnalyzer):
return None
variants = {term}
if analyzer is not None and variant_base is not None:
if analyzer is not None and variant_base:
variants.update(analyzer.get_variants_ascii(variant_base))
with self.conn.cursor() as cur:

View File

@@ -25,8 +25,18 @@ def create(normalizer, transliterator, config): # pylint: disable=W0613
"""
return PostcodeTokenAnalysis(normalizer, transliterator)
class PostcodeTokenAnalysis:
""" Detects common housenumber patterns and normalizes them.
""" Special normalization and variant generation for postcodes.
This analyser must not be used with anything but postcodes as
it follows some special rules: `normalize` doesn't necessarily
need to return a standard form as per normalization rules. It
needs to return the canonical form of the postcode that is also
used for output. `get_variants_ascii` then needs to ensure that
the generated variants once more follow the standard normalization
and transliteration, so that postcodes are correctly recognised by
the search algorithm.
"""
def __init__(self, norm, trans):
self.norm = norm
@@ -44,11 +54,12 @@ class PostcodeTokenAnalysis:
def get_variants_ascii(self, norm_name):
""" Compute the spelling variants for the given normalized postcode.
The official form creates one variant. If a 'lookup version' is
given, then it will create variants with optional spaces.
Takes the canonical form of the postcode, normalizes it using the
standard rules and then creates variants of the result where
all spaces are optional.
"""
# Postcodes follow their own transliteration rules.
# Make sure at this point, that the terms are normalized in a way
# that they are searchable with the standard transliteration rules.
return [self.trans.transliterate(term) for term in
self.mutator.generate([self.norm.transliterate(norm_name)])]
self.mutator.generate([self.norm.transliterate(norm_name)]) if term]

View File

@@ -18,13 +18,18 @@ from nominatim.tokenizer import factory as tokenizer_factory
def check_database_integrity(context):
""" Check some generic constraints on the tables.
"""
# place_addressline should not have duplicate (place_id, address_place_id)
cur = context.db.cursor()
cur.execute("""SELECT count(*) FROM
(SELECT place_id, address_place_id, count(*) as c
FROM place_addressline GROUP BY place_id, address_place_id) x
WHERE c > 1""")
assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
with context.db.cursor() as cur:
# place_addressline should not have duplicate (place_id, address_place_id)
cur.execute("""SELECT count(*) FROM
(SELECT place_id, address_place_id, count(*) as c
FROM place_addressline GROUP BY place_id, address_place_id) x
WHERE c > 1""")
assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
# word table must not have empty word_tokens
cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
################################ GIVEN ##################################

View File

@@ -72,7 +72,8 @@ def analyzer(tokenizer_factory, test_config, monkeypatch,
def _mk_analyser(norm=("[[:Punctuation:][:Space:]]+ > ' '",), trans=(':: upper()',),
variants=('~gasse -> gasse', 'street => st', ),
sanitizers=[], with_housenumber=False):
sanitizers=[], with_housenumber=False,
with_postcode=False):
cfgstr = {'normalization': list(norm),
'sanitizers': sanitizers,
'transliteration': list(trans),
@@ -81,6 +82,9 @@ def analyzer(tokenizer_factory, test_config, monkeypatch,
if with_housenumber:
cfgstr['token-analysis'].append({'id': '@housenumber',
'analyzer': 'housenumbers'})
if with_postcode:
cfgstr['token-analysis'].append({'id': '@postcode',
'analyzer': 'postcodes'})
(test_config.project_dir / 'icu_tokenizer.yaml').write_text(yaml.dump(cfgstr))
tok.loader = nominatim.tokenizer.icu_rule_loader.ICURuleLoader(test_config)
@@ -246,28 +250,69 @@ def test_normalize_postcode(analyzer):
anl.normalize_postcode('38 Б') == '38 Б'
def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
class TestPostcodes:
with analyzer() as anl:
anl.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
@pytest.fixture(autouse=True)
def setup(self, analyzer, sql_functions):
sanitizers = [{'step': 'clean-postcodes'}]
with analyzer(sanitizers=sanitizers, with_postcode=True) as anl:
self.analyzer = anl
yield anl
def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('45BC', ), ('XX45', )))
word_table.add_postcode(' 1234', '1234')
word_table.add_postcode(' 5678', '5678')
def process_postcode(self, cc, postcode):
return self.analyzer.process_place(PlaceInfo({'country_code': cc,
'address': {'postcode': postcode}}))
with analyzer() as anl:
anl.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
def test_update_postcodes_from_db_empty(self, table_factory, word_table):
table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
content=(('de', '12345'), ('se', '132 34'),
('bm', 'AB23'), ('fr', '12345')))
self.analyzer.update_postcodes_from_db()
assert word_table.count() == 5
assert word_table.get_postcodes() == {'12345', '132 34@132 34', 'AB 23@AB 23'}
def test_update_postcodes_from_db_ambigious(self, table_factory, word_table):
table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
content=(('in', '123456'), ('sg', '123456')))
self.analyzer.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'123456', '123456@123 456'}
def test_update_postcodes_from_db_add_and_remove(self, table_factory, word_table):
table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
content=(('ch', '1234'), ('bm', 'BC 45'), ('bm', 'XX45')))
word_table.add_postcode(' 1234', '1234')
word_table.add_postcode(' 5678', '5678')
self.analyzer.update_postcodes_from_db()
assert word_table.count() == 5
assert word_table.get_postcodes() == {'1234', 'BC 45@BC 45', 'XX 45@XX 45'}
def test_process_place_postcode_simple(self, word_table):
info = self.process_postcode('de', '12345')
assert info['postcode'] == '12345'
assert word_table.get_postcodes() == {'12345', }
def test_process_place_postcode_with_space(self, word_table):
info = self.process_postcode('in', '123 567')
assert info['postcode'] == '123567'
assert word_table.get_postcodes() == {'123567@123 567', }
def test_update_special_phrase_empty_table(analyzer, word_table):

View File

@@ -0,0 +1,60 @@
# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for special postcode analysis and variant generation.
"""
import pytest
from icu import Transliterator
import nominatim.tokenizer.token_analysis.postcodes as module
from nominatim.errors import UsageError
DEFAULT_NORMALIZATION = """ :: NFD ();
'🜳' > ' ';
[[:Nonspacing Mark:] [:Cf:]] >;
:: lower ();
[[:Punctuation:][:Space:]]+ > ' ';
:: NFC ();
"""
DEFAULT_TRANSLITERATION = """ :: Latin ();
'🜵' > ' ';
"""
@pytest.fixture
def analyser():
rules = { 'analyzer': 'postcodes'}
config = module.configure(rules, DEFAULT_NORMALIZATION)
trans = Transliterator.createFromRules("test_trans", DEFAULT_TRANSLITERATION)
norm = Transliterator.createFromRules("test_norm", DEFAULT_NORMALIZATION)
return module.create(norm, trans, config)
def get_normalized_variants(proc, name):
norm = Transliterator.createFromRules("test_norm", DEFAULT_NORMALIZATION)
return proc.get_variants_ascii(norm.transliterate(name).strip())
@pytest.mark.parametrize('name,norm', [('12', '12'),
('A 34 ', 'A 34'),
('34-av', '34-AV')])
def test_normalize(analyser, name, norm):
assert analyser.normalize(name) == norm
@pytest.mark.parametrize('postcode,variants', [('12345', {'12345'}),
('AB-998', {'ab 998', 'ab998'}),
('23 FGH D3', {'23 fgh d3', '23fgh d3',
'23 fghd3', '23fghd3'})])
def test_get_variants_ascii(analyser, postcode, variants):
out = analyser.get_variants_ascii(postcode)
assert len(out) == len(set(out))
assert set(out) == variants