Merge pull request #2324 from lonvia/generic-external-postcodes

Rework postcode handling and generalised external postcode support
This commit is contained in:
Sarah Hoffmann
2021-05-13 14:52:19 +02:00
committed by GitHub
20 changed files with 731 additions and 329 deletions

View File

@@ -45,12 +45,19 @@ class UpdateRefresh:
@staticmethod
def run(args):
from ..tools import refresh
from ..tools import refresh, postcodes
from ..tokenizer import factory as tokenizer_factory
from ..indexer.indexer import Indexer
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
if args.postcodes:
LOG.warning("Update postcodes centroid")
refresh.update_postcodes(args.config.get_libpq_dsn(), args.sqllib_dir)
postcodes.update_postcodes(args.config.get_libpq_dsn(),
args.project_dir, tokenizer)
indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
args.threads or 1)
indexer.index_postcodes()
if args.word_counts:
LOG.warning('Recompute frequency of full-word search terms')
@@ -67,7 +74,6 @@ class UpdateRefresh:
with connect(args.config.get_libpq_dsn()) as conn:
refresh.create_functions(conn, args.config,
args.diffs, args.enable_debug_statements)
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
tokenizer.update_sql_functions(args.config)
if args.wiki_data:

View File

@@ -116,8 +116,8 @@ class SetupAll:
if args.continue_at is None or args.continue_at == 'load-data':
LOG.warning('Calculate postcodes')
postcodes.import_postcodes(args.config.get_libpq_dsn(), args.project_dir,
tokenizer)
postcodes.update_postcodes(args.config.get_libpq_dsn(),
args.project_dir, tokenizer)
if args.continue_at is None or args.continue_at in ('load-data', 'indexing'):
if args.continue_at is not None and args.continue_at != 'load-data':

View File

@@ -263,6 +263,16 @@ class LegacyICUNameAnalyzer:
"""
return self.normalizer.transliterate(phrase)
@staticmethod
def normalize_postcode(postcode):
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
'token_normalized_postcode()'.
"""
return postcode.strip().upper()
@functools.lru_cache(maxsize=1024)
def make_standard_word(self, name):
""" Create the normalised version of the input.
@@ -285,25 +295,44 @@ class LegacyICUNameAnalyzer:
return self.transliterator.transliterate(hnr)
def add_postcodes_from_db(self):
""" Add postcodes from the location_postcode table to the word table.
def update_postcodes_from_db(self):
""" Update postcode tokens in the word table from the location_postcode
table.
"""
to_delete = []
copystr = io.StringIO()
with self.conn.cursor() as cur:
cur.execute("SELECT distinct(postcode) FROM location_postcode")
for (postcode, ) in cur:
copystr.write(postcode)
copystr.write('\t ')
copystr.write(self.transliterator.transliterate(postcode))
copystr.write('\tplace\tpostcode\t0\n')
# This finds us the rows in location_postcode and word that are
# missing in the other table.
cur.execute("""SELECT * FROM
(SELECT pc, word FROM
(SELECT distinct(postcode) as pc FROM location_postcode) p
FULL JOIN
(SELECT word FROM word
WHERE class ='place' and type = 'postcode') w
ON pc = word) x
WHERE pc is null or word is null""")
copystr.seek(0)
cur.copy_from(copystr, 'word',
columns=['word', 'word_token', 'class', 'type',
'search_name_count'])
# Don't really need an ID for postcodes....
# cur.execute("""UPDATE word SET word_id = nextval('seq_word')
# WHERE word_id is null and type = 'postcode'""")
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
copystr.write(postcode)
copystr.write('\t ')
copystr.write(self.transliterator.transliterate(postcode))
copystr.write('\tplace\tpostcode\t0\n')
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE class ='place' and type = 'postcode'
and word = any(%s)
""", (to_delete, ))
if copystr.getvalue():
copystr.seek(0)
cur.copy_from(copystr, 'word',
columns=['word', 'word_token', 'class', 'type',
'search_name_count'])
def update_special_phrases(self, phrases):
@@ -435,22 +464,25 @@ class LegacyICUNameAnalyzer:
def _add_postcode(self, postcode):
""" Make sure the normalized postcode is present in the word table.
"""
if re.search(r'[:,;]', postcode) is None and not postcode in self._cache.postcodes:
term = self.make_standard_word(postcode)
if not term:
return
if re.search(r'[:,;]', postcode) is None:
postcode = self.normalize_postcode(postcode)
with self.conn.cursor() as cur:
# no word_id needed for postcodes
cur.execute("""INSERT INTO word (word, word_token, class, type,
search_name_count)
(SELECT pc, %s, 'place', 'postcode', 0
FROM (VALUES (%s)) as v(pc)
WHERE NOT EXISTS
(SELECT * FROM word
WHERE word = pc and class='place' and type='postcode'))
""", (' ' + term, postcode))
self._cache.postcodes.add(postcode)
if postcode not in self._cache.postcodes:
term = self.make_standard_word(postcode)
if not term:
return
with self.conn.cursor() as cur:
# no word_id needed for postcodes
cur.execute("""INSERT INTO word (word, word_token, class, type,
search_name_count)
(SELECT pc, %s, 'place', 'postcode', 0
FROM (VALUES (%s)) as v(pc)
WHERE NOT EXISTS
(SELECT * FROM word
WHERE word = pc and class='place' and type='postcode'))
""", (' ' + term, postcode))
self._cache.postcodes.add(postcode)
@staticmethod
def _split_housenumbers(hnrs):

View File

@@ -305,13 +305,51 @@ class LegacyNameAnalyzer:
return self.normalizer.transliterate(phrase)
def add_postcodes_from_db(self):
""" Add postcodes from the location_postcode table to the word table.
@staticmethod
def normalize_postcode(postcode):
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
'token_normalized_postcode()'.
"""
return postcode.strip().upper()
def update_postcodes_from_db(self):
""" Update postcode tokens in the word table from the location_postcode
table.
"""
with self.conn.cursor() as cur:
cur.execute("""SELECT count(create_postcode_id(pc))
FROM (SELECT distinct(postcode) as pc
FROM location_postcode) x""")
# This finds us the rows in location_postcode and word that are
# missing in the other table.
cur.execute("""SELECT * FROM
(SELECT pc, word FROM
(SELECT distinct(postcode) as pc FROM location_postcode) p
FULL JOIN
(SELECT word FROM word
WHERE class ='place' and type = 'postcode') w
ON pc = word) x
WHERE pc is null or word is null""")
to_delete = []
to_add = []
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
to_add.append(postcode)
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE class ='place' and type = 'postcode'
and word = any(%s)
""", (to_delete, ))
if to_add:
cur.execute("""SELECT count(create_postcode_id(pc))
FROM unnest(%s) as pc
""", (to_add, ))
def update_special_phrases(self, phrases):
@@ -421,7 +459,8 @@ class LegacyNameAnalyzer:
cur.execute('SELECT create_postcode_id(%s)', (pcode, ))
if re.search(r'[:,;]', postcode) is None:
self._cache.postcodes.get(postcode.strip().upper(), _create_postcode_from_db)
self._cache.postcodes.get(self.normalize_postcode(postcode),
_create_postcode_from_db)
class _TokenInfo:

View File

@@ -2,80 +2,196 @@
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
import csv
import gzip
import logging
from math import isfinite
from psycopg2.extras import execute_values
from nominatim.db.utils import execute_file
from nominatim.db.connection import connect
def import_postcodes(dsn, project_dir, tokenizer):
""" Set up the initial list of postcodes.
LOG = logging.getLogger()
def _to_float(num, max_value):
""" Convert the number in string into a float. The number is expected
to be in the range of [-max_value, max_value]. Otherwise rises a
ValueError.
"""
num = float(num)
if not isfinite(num) or num <= -max_value or num >= max_value:
raise ValueError()
return num
class _CountryPostcodesCollector:
""" Collector for postcodes of a single country.
"""
with connect(dsn) as conn:
conn.drop_table('gb_postcode')
conn.drop_table('us_postcode')
def __init__(self, country):
self.country = country
self.collected = dict()
def add(self, postcode, x, y):
""" Add the given postcode to the collection cache. If the postcode
already existed, it is overwritten with the new centroid.
"""
self.collected[postcode] = (x, y)
def commit(self, conn, analyzer, project_dir):
""" Update postcodes for the country from the postcodes selected so far
as well as any externally supplied postcodes.
"""
self._update_from_external(analyzer, project_dir)
to_add, to_delete, to_update = self._compute_changes(conn)
LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
self.country, len(to_add), len(to_delete), len(to_update))
with conn.cursor() as cur:
cur.execute("""CREATE TABLE gb_postcode (
id integer,
postcode character varying(9),
geometry GEOMETRY(Point, 4326))""")
if to_add:
execute_values(cur,
"""INSERT INTO location_postcode
(place_id, indexed_status, country_code,
postcode, geometry) VALUES %s""",
to_add,
template="""(nextval('seq_place'), 1, '{}',
%s, 'SRID=4326;POINT(%s %s)')
""".format(self.country))
if to_delete:
cur.execute("""DELETE FROM location_postcode
WHERE country_code = %s and postcode = any(%s)
""", (self.country, to_delete))
if to_update:
execute_values(cur,
"""UPDATE location_postcode
SET indexed_status = 2,
geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
FROM (VALUES %s) AS v (pc, x, y)
WHERE country_code = '{}' and postcode = pc
""".format(self.country),
to_update)
def _compute_changes(self, conn):
""" Compute which postcodes from the collected postcodes have to be
added or modified and which from the location_postcode table
have to be deleted.
"""
to_update = []
to_delete = []
with conn.cursor() as cur:
cur.execute("""CREATE TABLE us_postcode (
postcode text,
x double precision,
y double precision)""")
conn.commit()
cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
FROM location_postcode
WHERE country_code = %s""",
(self.country, ))
for postcode, x, y in cur:
newx, newy = self.collected.pop(postcode, (None, None))
if newx is not None:
dist = (x - newx)**2 + (y - newy)**2
if dist > 0.0000001:
to_update.append((postcode, newx, newy))
else:
to_delete.append(postcode)
gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
if gb_postcodes.is_file():
execute_file(dsn, gb_postcodes)
to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
self.collected = []
us_postcodes = project_dir / 'us_postcode_data.sql.gz'
if us_postcodes.is_file():
execute_file(dsn, us_postcodes)
return to_add, to_delete, to_update
with conn.cursor() as cur:
cur.execute("TRUNCATE location_postcode")
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, country_code,
token_normalized_postcode(address->'postcode') as pc,
ST_Centroid(ST_Collect(ST_Centroid(geometry)))
FROM placex
WHERE address ? 'postcode'
and token_normalized_postcode(address->'postcode') is not null
AND geometry IS NOT null
GROUP BY country_code, pc
""")
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, 'us',
token_normalized_postcode(postcode),
ST_SetSRID(ST_Point(x,y),4326)
FROM us_postcode WHERE token_normalized_postcode(postcode) NOT IN
(SELECT postcode FROM location_postcode
WHERE country_code = 'us')
""")
def _update_from_external(self, analyzer, project_dir):
""" Look for an external postcode file for the active country in
the project directory and add missing postcodes when found.
"""
csvfile = self._open_external(project_dir)
if csvfile is None:
return
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, 'gb',
token_normalized_postcode(postcode), geometry
FROM gb_postcode WHERE token_normalized_postcode(postcode) NOT IN
(SELECT postcode FROM location_postcode
WHERE country_code = 'gb')
""")
try:
reader = csv.DictReader(csvfile)
for row in reader:
if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
LOG.warning("Bad format for external postcode file for country '%s'."
" Ignored.", self.country)
return
postcode = analyzer.normalize_postcode(row['postcode'])
if postcode not in self.collected:
try:
self.collected[postcode] = (_to_float(row['lon'], 180),
_to_float(row['lat'], 90))
except ValueError:
LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
row['lat'], row['lon'], self.country)
cur.execute("""
DELETE FROM word WHERE class='place' and type='postcode'
and word NOT IN (SELECT postcode FROM location_postcode)
""")
conn.commit()
finally:
csvfile.close()
with tokenizer.name_analyzer() as analyzer:
analyzer.add_postcodes_from_db()
def _open_external(self, project_dir):
fname = project_dir / '{}_postcodes.csv'.format(self.country)
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return open(fname, 'r')
fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return gzip.open(fname, 'rt')
return None
def update_postcodes(dsn, project_dir, tokenizer):
""" Update the table of artificial postcodes.
Computes artificial postcode centroids from the placex table,
potentially enhances it with external data and then updates the
postcodes in the table 'location_postcode'.
"""
with tokenizer.name_analyzer() as analyzer:
with connect(dsn) as conn:
# First get the list of countries that currently have postcodes.
# (Doing this before starting to insert, so it is fast on import.)
with conn.cursor() as cur:
cur.execute("SELECT DISTINCT country_code FROM location_postcode")
todo_countries = set((row[0] for row in cur))
# Recompute the list of valid postcodes from placex.
with conn.cursor(name="placex_postcodes") as cur:
cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
FROM (
SELECT country_code,
token_normalized_postcode(address->'postcode') as pc,
ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
FROM placex
WHERE address ? 'postcode' and geometry IS NOT null
and country_code is not null
GROUP BY country_code, pc) xx
WHERE pc is not null
ORDER BY country_code, pc""")
collector = None
for country, postcode, x, y in cur:
if collector is None or country != collector.country:
if collector is not None:
collector.commit(conn, analyzer, project_dir)
collector = _CountryPostcodesCollector(country)
todo_countries.discard(country)
collector.add(postcode, x, y)
if collector is not None:
collector.commit(conn, analyzer, project_dir)
# Now handle any countries that are only in the postcode table.
for country in todo_countries:
_CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
conn.commit()
analyzer.update_postcodes_from_db()

View File

@@ -13,12 +13,6 @@ from nominatim.version import NOMINATIM_VERSION
LOG = logging.getLogger()
def update_postcodes(dsn, sql_dir):
""" Recalculate postcode centroids and add, remove and update entries in the
location_postcode table. `conn` is an opne connection to the database.
"""
execute_file(dsn, sql_dir / 'update-postcodes.sql')
def recompute_word_counts(dsn, sql_dir):
""" Compute the frequency of full-word search terms.