move filling of postcode table to python

The Python code now takes care of reading postcodes from placex,
enhancing them with potentially existing external postcodes and
updating location_postcodes accordingly. The initial setup and
updates use exactly the same function.

External postcode handling has been generalized. External postcodes
for any country are now accepted. The format of the external postcode
file has changed. We now expect CSV, potentially gzipped. The
postcodes are no longer saved in the database.
This commit is contained in:
Sarah Hoffmann
2021-05-12 19:57:48 +02:00
parent cae0cf3546
commit a4aba23a83
8 changed files with 264 additions and 166 deletions

View File

@@ -2,80 +2,167 @@
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
import csv
import gzip
import logging
from psycopg2.extras import execute_values
from nominatim.db.utils import execute_file
from nominatim.db.connection import connect
def import_postcodes(dsn, project_dir, tokenizer):
""" Set up the initial list of postcodes.
LOG = logging.getLogger()
class _CountryPostcodesCollector:
""" Collector for postcodes of a single country.
"""
with connect(dsn) as conn:
conn.drop_table('gb_postcode')
conn.drop_table('us_postcode')
def __init__(self, country):
self.country = country
self.collected = dict()
def add(self, postcode, x, y):
""" Add the given postcode to the collection cache. If the postcode
already existed, it is overwritten with the new centroid.
"""
self.collected[postcode] = (x, y)
def commit(self, conn, analyzer, project_dir):
""" Update postcodes for the country from the postcodes selected so far
as well as any externally supplied postcodes.
"""
self._update_from_external(analyzer, project_dir)
to_add, to_delete, to_update = self._compute_changes(conn)
with conn.cursor() as cur:
cur.execute("""CREATE TABLE gb_postcode (
id integer,
postcode character varying(9),
geometry GEOMETRY(Point, 4326))""")
if to_add:
execute_values(cur,
"""INSERT INTO location_postcodes
(place_id, indexed_status, countrycode,
postcode, geometry) VALUES %s""",
to_add,
template="""(nextval('seq_place'), 1, '{}',
%s, 'SRID=4326;POINT(%s %s)')
""".format(self.country))
if to_delete:
cur.execute("""DELETE FROM location_postcodes
WHERE country_code = %s and postcode = any(%s)
""", (self.country, to_delete))
if to_update:
execute_values(cur,
"""UPDATE location_postcodes
SET indexed_status = 2,
geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
FROM (VALUES %s) AS v (pc, x, y)
WHERE country_code = '{}' and postcode = pc
""".format(self.country),
to_update)
def _compute_changes(self, conn):
""" Compute which postcodes from the collected postcodes have to be
added or modified and which from the location_postcodes table
have to be deleted.
"""
to_update = []
to_delete = []
with conn.cursor() as cur:
cur.execute("""CREATE TABLE us_postcode (
postcode text,
x double precision,
y double precision)""")
conn.commit()
cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
FROM location_postcodes
WHERE country_code = %s""",
(self.country, ))
for postcode, x, y in cur:
oldx, oldy = self.collected.pop(postcode, (None, None))
if oldx is not None:
dist = (x - oldx)**2 + (y - oldy)**2
if dist > 0.000001:
to_update.append(postcode, x, y)
else:
to_delete.append(postcode)
gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
if gb_postcodes.is_file():
execute_file(dsn, gb_postcodes)
to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
self.collected = []
us_postcodes = project_dir / 'us_postcode_data.sql.gz'
if us_postcodes.is_file():
execute_file(dsn, us_postcodes)
return to_add, to_delete, to_update
with conn.cursor() as cur:
cur.execute("TRUNCATE location_postcode")
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, country_code,
token_normalized_postcode(address->'postcode') as pc,
ST_Centroid(ST_Collect(ST_Centroid(geometry)))
FROM placex
WHERE address ? 'postcode'
and token_normalized_postcode(address->'postcode') is not null
AND geometry IS NOT null
GROUP BY country_code, pc
""")
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, 'us',
token_normalized_postcode(postcode),
ST_SetSRID(ST_Point(x,y),4326)
FROM us_postcode WHERE token_normalized_postcode(postcode) NOT IN
(SELECT postcode FROM location_postcode
WHERE country_code = 'us')
""")
def _update_from_external(self, analyzer, project_dir):
""" Look for an external postcode file for the active country in
the project directory and add missing postcodes when found.
"""
csvfile = self._open_external(project_dir)
if csvfile is None:
return
cur.execute("""
INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode, geometry)
SELECT nextval('seq_place'), 1, 'gb',
token_normalized_postcode(postcode), geometry
FROM gb_postcode WHERE token_normalized_postcode(postcode) NOT IN
(SELECT postcode FROM location_postcode
WHERE country_code = 'gb')
""")
try:
reader = csv.DictReader(csvfile)
for row in reader:
if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
LOG.warning("Bad format for external postcode file for country '%s'."
" Ignored.", self.country)
return
postcode = analyzer.normalize_postcode(row['postcode'])
if postcode not in self.collected:
try:
self.collected[postcode] = (float(row['lon'], float(row['lat'])))
except ValueError:
LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
row['lat'], row['lon'], self.country)
cur.execute("""
DELETE FROM word WHERE class='place' and type='postcode'
and word NOT IN (SELECT postcode FROM location_postcode)
""")
conn.commit()
finally:
csvfile.close()
with tokenizer.name_analyzer() as analyzer:
analyzer.add_postcodes_from_db()
def _open_external(self, project_dir):
fname = project_dir / '{}_postcodes.csv'.format(self.country)
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return open(fname, 'r')
fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return gzip.open(fname, 'rt')
return None
def update_postcodes(dsn, project_dir, tokenizer):
""" Update the table of artificial postcodes.
Computes artificial postcode centroids from the placex table,
potentially enhances it with external data and then updates the
postcodes in the table 'location_postcode'.
"""
with tokenizer.name_analyzer() as analyzer:
with connect(dsn) as conn:
with conn.cursor("placex_postcodes") as cur:
cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
FROM (
SELECT country_code,
token_normalized_postcode(address->'postcode') as pc,
ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
FROM placex
WHERE address ? 'postcode' and geometry IS NOT null
GROUP BY country_code, pc) xx
WHERE pc is not null
ORDER BY country_code, pc""")
collector = None
for country, postcode, x, y in cur:
if collector is None or country != collector.country:
if collector is not None:
collector.commit(conn, analyzer, project_dir)
collector = _CountryPostcodesCollector(country)
collector.add(postcode, x, y)
if collector is not None:
collector.commit(conn, analyzer, project_dir)
conn.commit()
analyzer.add_postcodes_from_db()