use tokenizer during Tiger data import

This also changes the required import format to CSV.
This commit is contained in:
Sarah Hoffmann
2021-05-13 22:11:41 +02:00
parent d7f9d2bde9
commit 35efe3b41c
5 changed files with 57 additions and 34 deletions

View File

@@ -12,4 +12,4 @@ ignored-modules=icu
ignored-classes=NominatimArgs,closing ignored-classes=NominatimArgs,closing
disable=too-few-public-methods,duplicate-code disable=too-few-public-methods,duplicate-code
good-names=i,x,y good-names=i,x,y,fd

View File

@@ -12,4 +12,6 @@ ALTER TABLE location_property_tiger_import RENAME TO location_property_tiger;
ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id; ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id;
ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id; ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id;
DROP FUNCTION tiger_line_import (linegeo geometry, in_startnumber integer, in_endnumber integer, interpolationtype text, in_street text, in_isin text, in_postcode text); DROP FUNCTION tiger_line_import (linegeo GEOMETRY, in_startnumber INTEGER,
in_endnumber INTEGER, interpolationtype TEXT,
token_info JSONB, in_postcode TEXT);

View File

@@ -1,9 +1,9 @@
DROP TABLE IF EXISTS location_property_tiger_import; DROP TABLE IF EXISTS location_property_tiger_import;
CREATE TABLE location_property_tiger_import (linegeo GEOMETRY, place_id BIGINT, partition INTEGER, parent_place_id BIGINT, startnumber INTEGER, endnumber INTEGER, interpolationtype TEXT, postcode TEXT); CREATE TABLE location_property_tiger_import (linegeo GEOMETRY, place_id BIGINT, partition INTEGER, parent_place_id BIGINT, startnumber INTEGER, endnumber INTEGER, interpolationtype TEXT, postcode TEXT);
CREATE OR REPLACE FUNCTION tiger_line_import(linegeo GEOMETRY, in_startnumber INTEGER, CREATE OR REPLACE FUNCTION tiger_line_import(linegeo GEOMETRY, in_startnumber INTEGER,
in_endnumber INTEGER, interpolationtype TEXT, in_endnumber INTEGER, interpolationtype TEXT,
in_street TEXT, in_isin TEXT, in_postcode TEXT) RETURNS INTEGER token_info JSONB, in_postcode TEXT) RETURNS INTEGER
AS $$ AS $$
DECLARE DECLARE
startnumber INTEGER; startnumber INTEGER;
@@ -27,13 +27,13 @@ BEGIN
END IF; END IF;
IF startnumber < 0 THEN IF startnumber < 0 THEN
RAISE WARNING 'Negative house number range (% to %) on %, %', startnumber, endnumber, in_street, in_isin; RAISE WARNING 'Negative house number range (% to %)', startnumber, endnumber;
RETURN 0; RETURN 0;
END IF; END IF;
numberrange := endnumber - startnumber; numberrange := endnumber - startnumber;
IF (interpolationtype = 'odd' AND startnumber%2 = 0) OR (interpolationtype = 'even' AND startnumber%2 = 1) THEN IF (interpolationtype = 'odd' AND startnumber % 2 = 0) OR (interpolationtype = 'even' AND startnumber % 2 = 1) THEN
startnumber := startnumber + 1; startnumber := startnumber + 1;
stepsize := 2; stepsize := 2;
ELSE ELSE
@@ -45,10 +45,10 @@ BEGIN
END IF; END IF;
-- Filter out really broken tiger data -- Filter out really broken tiger data
IF numberrange > 0 AND (numberrange::float/stepsize::float > 500) IF numberrange > 0 AND (numberrange::float/stepsize::float > 500)
AND ST_length(linegeo)/(numberrange::float/stepsize::float) < 0.000001 THEN AND ST_length(linegeo)/(numberrange::float/stepsize::float) < 0.000001 THEN
RAISE WARNING 'Road too short for number range % to % on %, % (%)',startnumber,endnumber,in_street,in_isin, RAISE WARNING 'Road too short for number range % to % (%)',startnumber,endnumber,
ST_length(linegeo)/(numberrange::float/stepsize::float); ST_length(linegeo)/(numberrange::float/stepsize::float);
RETURN 0; RETURN 0;
END IF; END IF;
@@ -56,7 +56,7 @@ BEGIN
out_partition := get_partition('us'); out_partition := get_partition('us');
out_parent_place_id := null; out_parent_place_id := null;
address_street_word_ids := word_ids_from_name(in_street); address_street_word_ids := token_addr_street_match_tokens(token_info);
IF address_street_word_ids IS NOT NULL THEN IF address_street_word_ids IS NOT NULL THEN
out_parent_place_id := getNearestNamedRoadPlaceId(out_partition, place_centroid, out_parent_place_id := getNearestNamedRoadPlaceId(out_partition, place_centroid,
address_street_word_ids); address_street_word_ids);

View File

@@ -13,7 +13,6 @@ from nominatim.tools.exec_utils import run_legacy_script, run_php_server
from nominatim.errors import UsageError from nominatim.errors import UsageError
from nominatim import clicmd from nominatim import clicmd
from nominatim.clicmd.args import NominatimArgs from nominatim.clicmd.args import NominatimArgs
from nominatim.tools import tiger_data
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -147,9 +146,14 @@ class UpdateAddData:
@staticmethod @staticmethod
def run(args): def run(args):
from nominatim.tokenizer import factory as tokenizer_factory
from nominatim.tools import tiger_data
if args.tiger_data: if args.tiger_data:
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
return tiger_data.add_tiger_data(args.tiger_data, return tiger_data.add_tiger_data(args.tiger_data,
args.config, args.threads or 1) args.config, args.threads or 1,
tokenizer)
params = ['update.php'] params = ['update.php']
if args.file: if args.file:

View File

@@ -1,10 +1,14 @@
""" """
Functions for importing tiger data and handling tarball and directory files Functions for importing tiger data and handling tarball and directory files
""" """
import csv
import io
import logging import logging
import os import os
import tarfile import tarfile
import psycopg2.extras
from nominatim.db.connection import connect from nominatim.db.connection import connect
from nominatim.db.async_connection import WorkerPool from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor from nominatim.db.sql_preprocessor import SQLPreprocessor
@@ -20,30 +24,40 @@ def handle_tarfile_or_directory(data_dir):
tar = None tar = None
if data_dir.endswith('.tar.gz'): if data_dir.endswith('.tar.gz'):
tar = tarfile.open(data_dir) tar = tarfile.open(data_dir)
sql_files = [i for i in tar.getmembers() if i.name.endswith('.sql')] csv_files = [i for i in tar.getmembers() if i.name.endswith('.csv')]
LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir) LOG.warning("Found %d CSV files in tarfile with path %s", len(csv_files), data_dir)
if not sql_files: if not csv_files:
LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir) LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
return None, None return None, None
else: else:
files = os.listdir(data_dir) files = os.listdir(data_dir)
sql_files = [os.path.join(data_dir, i) for i in files if i.endswith('.sql')] csv_files = [os.path.join(data_dir, i) for i in files if i.endswith('.csv')]
LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir) LOG.warning("Found %d CSV files in path %s", len(csv_files), data_dir)
if not sql_files: if not csv_files:
LOG.warning("Tiger data import selected but no files found in path %s", data_dir) LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
return None, None return None, None
return sql_files, tar return csv_files, tar
def handle_threaded_sql_statements(pool, file): def handle_threaded_sql_statements(pool, fd, analyzer):
""" Handles sql statement with multiplexing """ Handles sql statement with multiplexing
""" """
lines = 0 lines = 0
# Using pool of database connections to execute sql statements # Using pool of database connections to execute sql statements
for sql_query in file:
pool.next_free_worker().perform(sql_query) sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
for row in csv.DictReader(fd, delimiter=';'):
try:
address = dict(street=row['street'], postcode=row['postcode'])
args = ('SRID=4326;' + row['geometry'],
int(row['from']), int(row['to']), row['interpolation'],
psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
analyzer.normalize_postcode(row['postcode']))
except ValueError:
continue
pool.next_free_worker().perform(sql, args=args)
lines += 1 lines += 1
if lines == 1000: if lines == 1000:
@@ -51,31 +65,34 @@ def handle_threaded_sql_statements(pool, file):
lines = 0 lines = 0
def add_tiger_data(data_dir, config, threads): def add_tiger_data(data_dir, config, threads, tokenizer):
""" Import tiger data from directory or tar file `data dir`. """ Import tiger data from directory or tar file `data dir`.
""" """
dsn = config.get_libpq_dsn() dsn = config.get_libpq_dsn()
sql_files, tar = handle_tarfile_or_directory(data_dir) files, tar = handle_tarfile_or_directory(data_dir)
if not sql_files: if not files:
return return
with connect(dsn) as conn: with connect(dsn) as conn:
sql = SQLPreprocessor(conn, config) sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'tiger_import_start.sql') sql.run_sql_file(conn, 'tiger_import_start.sql')
# Reading sql_files and then for each file line handling # Reading files and then for each file line handling
# sql_query in <threads - 1> chunks. # sql_query in <threads - 1> chunks.
place_threads = max(1, threads - 1) place_threads = max(1, threads - 1)
with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool: with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
for sql_file in sql_files: with tokenizer.name_analyzer() as analyzer:
if not tar: for fname in files:
file = open(sql_file) if not tar:
else: fd = open(fname)
file = tar.extractfile(sql_file) else:
fd = io.TextIOWrapper(tar.extractfile(fname))
handle_threaded_sql_statements(pool, file) handle_threaded_sql_statements(pool, fd, analyzer)
fd.close()
if tar: if tar:
tar.close() tar.close()