Merge pull request #2328 from lonvia/convert-tiger-to-csv

Switch external Tiger data to CSV format
This commit is contained in:
Sarah Hoffmann
2021-05-14 09:58:50 +02:00
committed by GitHub
9 changed files with 6390 additions and 6286 deletions

View File

@@ -12,4 +12,4 @@ ignored-modules=icu
ignored-classes=NominatimArgs,closing ignored-classes=NominatimArgs,closing
disable=too-few-public-methods,duplicate-code disable=too-few-public-methods,duplicate-code
good-names=i,x,y good-names=i,x,y,fd

View File

@@ -51,11 +51,11 @@ entire US adds about 10GB to your database.
1. Get preprocessed TIGER 2020 data: 1. Get preprocessed TIGER 2020 data:
cd $PROJECT_DIR cd $PROJECT_DIR
wget https://nominatim.org/data/tiger2020-nominatim-preprocessed.tar.gz wget https://nominatim.org/data/tiger2020-nominatim-preprocessed.csv.tar.gz
2. Import the data into your Nominatim database: 2. Import the data into your Nominatim database:
nominatim add-data --tiger-data tiger2020-nominatim-preprocessed.tar.gz nominatim add-data --tiger-data tiger2020-nominatim-preprocessed.csv.tar.gz
3. Enable use of the Tiger data in your `.env` by adding: 3. Enable use of the Tiger data in your `.env` by adding:

View File

@@ -12,4 +12,6 @@ ALTER TABLE location_property_tiger_import RENAME TO location_property_tiger;
ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id; ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id;
ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id; ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id;
DROP FUNCTION tiger_line_import (linegeo geometry, in_startnumber integer, in_endnumber integer, interpolationtype text, in_street text, in_isin text, in_postcode text); DROP FUNCTION tiger_line_import (linegeo GEOMETRY, in_startnumber INTEGER,
in_endnumber INTEGER, interpolationtype TEXT,
token_info JSONB, in_postcode TEXT);

View File

@@ -1,9 +1,9 @@
DROP TABLE IF EXISTS location_property_tiger_import; DROP TABLE IF EXISTS location_property_tiger_import;
CREATE TABLE location_property_tiger_import (linegeo GEOMETRY, place_id BIGINT, partition INTEGER, parent_place_id BIGINT, startnumber INTEGER, endnumber INTEGER, interpolationtype TEXT, postcode TEXT); CREATE TABLE location_property_tiger_import (linegeo GEOMETRY, place_id BIGINT, partition INTEGER, parent_place_id BIGINT, startnumber INTEGER, endnumber INTEGER, interpolationtype TEXT, postcode TEXT);
CREATE OR REPLACE FUNCTION tiger_line_import(linegeo GEOMETRY, in_startnumber INTEGER, CREATE OR REPLACE FUNCTION tiger_line_import(linegeo GEOMETRY, in_startnumber INTEGER,
in_endnumber INTEGER, interpolationtype TEXT, in_endnumber INTEGER, interpolationtype TEXT,
in_street TEXT, in_isin TEXT, in_postcode TEXT) RETURNS INTEGER token_info JSONB, in_postcode TEXT) RETURNS INTEGER
AS $$ AS $$
DECLARE DECLARE
startnumber INTEGER; startnumber INTEGER;
@@ -27,13 +27,13 @@ BEGIN
END IF; END IF;
IF startnumber < 0 THEN IF startnumber < 0 THEN
RAISE WARNING 'Negative house number range (% to %) on %, %', startnumber, endnumber, in_street, in_isin; RAISE WARNING 'Negative house number range (% to %)', startnumber, endnumber;
RETURN 0; RETURN 0;
END IF; END IF;
numberrange := endnumber - startnumber; numberrange := endnumber - startnumber;
IF (interpolationtype = 'odd' AND startnumber%2 = 0) OR (interpolationtype = 'even' AND startnumber%2 = 1) THEN IF (interpolationtype = 'odd' AND startnumber % 2 = 0) OR (interpolationtype = 'even' AND startnumber % 2 = 1) THEN
startnumber := startnumber + 1; startnumber := startnumber + 1;
stepsize := 2; stepsize := 2;
ELSE ELSE
@@ -45,10 +45,10 @@ BEGIN
END IF; END IF;
-- Filter out really broken tiger data -- Filter out really broken tiger data
IF numberrange > 0 AND (numberrange::float/stepsize::float > 500) IF numberrange > 0 AND (numberrange::float/stepsize::float > 500)
AND ST_length(linegeo)/(numberrange::float/stepsize::float) < 0.000001 THEN AND ST_length(linegeo)/(numberrange::float/stepsize::float) < 0.000001 THEN
RAISE WARNING 'Road too short for number range % to % on %, % (%)',startnumber,endnumber,in_street,in_isin, RAISE WARNING 'Road too short for number range % to % (%)',startnumber,endnumber,
ST_length(linegeo)/(numberrange::float/stepsize::float); ST_length(linegeo)/(numberrange::float/stepsize::float);
RETURN 0; RETURN 0;
END IF; END IF;
@@ -56,7 +56,7 @@ BEGIN
out_partition := get_partition('us'); out_partition := get_partition('us');
out_parent_place_id := null; out_parent_place_id := null;
address_street_word_ids := word_ids_from_name(in_street); address_street_word_ids := token_addr_street_match_tokens(token_info);
IF address_street_word_ids IS NOT NULL THEN IF address_street_word_ids IS NOT NULL THEN
out_parent_place_id := getNearestNamedRoadPlaceId(out_partition, place_centroid, out_parent_place_id := getNearestNamedRoadPlaceId(out_partition, place_centroid,
address_street_word_ids); address_street_word_ids);

View File

@@ -13,7 +13,6 @@ from nominatim.tools.exec_utils import run_legacy_script, run_php_server
from nominatim.errors import UsageError from nominatim.errors import UsageError
from nominatim import clicmd from nominatim import clicmd
from nominatim.clicmd.args import NominatimArgs from nominatim.clicmd.args import NominatimArgs
from nominatim.tools import tiger_data
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -147,9 +146,14 @@ class UpdateAddData:
@staticmethod @staticmethod
def run(args): def run(args):
from nominatim.tokenizer import factory as tokenizer_factory
from nominatim.tools import tiger_data
if args.tiger_data: if args.tiger_data:
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
return tiger_data.add_tiger_data(args.tiger_data, return tiger_data.add_tiger_data(args.tiger_data,
args.config, args.threads or 1) args.config, args.threads or 1,
tokenizer)
params = ['update.php'] params = ['update.php']
if args.file: if args.file:

View File

@@ -1,14 +1,18 @@
""" """
Functions for importing tiger data and handling tarball and directory files Functions for importing tiger data and handling tarball and directory files
""" """
import csv
import io
import logging import logging
import os import os
import tarfile import tarfile
import psycopg2.extras
from nominatim.db.connection import connect from nominatim.db.connection import connect
from nominatim.db.async_connection import WorkerPool from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor from nominatim.db.sql_preprocessor import SQLPreprocessor
from nominatim.errors import UsageError
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -19,31 +23,46 @@ def handle_tarfile_or_directory(data_dir):
tar = None tar = None
if data_dir.endswith('.tar.gz'): if data_dir.endswith('.tar.gz'):
tar = tarfile.open(data_dir) try:
sql_files = [i for i in tar.getmembers() if i.name.endswith('.sql')] tar = tarfile.open(data_dir)
LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir) except tarfile.ReadError as err:
if not sql_files: LOG.fatal("Cannot open '%s'. Is this a tar file?", data_dir)
raise UsageError("Cannot open Tiger data file.") from err
csv_files = [i for i in tar.getmembers() if i.name.endswith('.csv')]
LOG.warning("Found %d CSV files in tarfile with path %s", len(csv_files), data_dir)
if not csv_files:
LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir) LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
return None, None return None, None
else: else:
files = os.listdir(data_dir) files = os.listdir(data_dir)
sql_files = [os.path.join(data_dir, i) for i in files if i.endswith('.sql')] csv_files = [os.path.join(data_dir, i) for i in files if i.endswith('.csv')]
LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir) LOG.warning("Found %d CSV files in path %s", len(csv_files), data_dir)
if not sql_files: if not csv_files:
LOG.warning("Tiger data import selected but no files found in path %s", data_dir) LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
return None, None return None, None
return sql_files, tar return csv_files, tar
def handle_threaded_sql_statements(pool, file): def handle_threaded_sql_statements(pool, fd, analyzer):
""" Handles sql statement with multiplexing """ Handles sql statement with multiplexing
""" """
lines = 0 lines = 0
# Using pool of database connections to execute sql statements # Using pool of database connections to execute sql statements
for sql_query in file:
pool.next_free_worker().perform(sql_query) sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
for row in csv.DictReader(fd, delimiter=';'):
try:
address = dict(street=row['street'], postcode=row['postcode'])
args = ('SRID=4326;' + row['geometry'],
int(row['from']), int(row['to']), row['interpolation'],
psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
analyzer.normalize_postcode(row['postcode']))
except ValueError:
continue
pool.next_free_worker().perform(sql, args=args)
lines += 1 lines += 1
if lines == 1000: if lines == 1000:
@@ -51,31 +70,34 @@ def handle_threaded_sql_statements(pool, file):
lines = 0 lines = 0
def add_tiger_data(data_dir, config, threads): def add_tiger_data(data_dir, config, threads, tokenizer):
""" Import tiger data from directory or tar file `data dir`. """ Import tiger data from directory or tar file `data dir`.
""" """
dsn = config.get_libpq_dsn() dsn = config.get_libpq_dsn()
sql_files, tar = handle_tarfile_or_directory(data_dir) files, tar = handle_tarfile_or_directory(data_dir)
if not sql_files: if not files:
return return
with connect(dsn) as conn: with connect(dsn) as conn:
sql = SQLPreprocessor(conn, config) sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'tiger_import_start.sql') sql.run_sql_file(conn, 'tiger_import_start.sql')
# Reading sql_files and then for each file line handling # Reading files and then for each file line handling
# sql_query in <threads - 1> chunks. # sql_query in <threads - 1> chunks.
place_threads = max(1, threads - 1) place_threads = max(1, threads - 1)
with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool: with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
for sql_file in sql_files: with tokenizer.name_analyzer() as analyzer:
if not tar: for fname in files:
file = open(sql_file) if not tar:
else: fd = open(fname)
file = tar.extractfile(sql_file) else:
fd = io.TextIOWrapper(tar.extractfile(fname))
handle_threaded_sql_statements(pool, file) handle_threaded_sql_statements(pool, fd, analyzer)
fd.close()
if tar: if tar:
tar.close() tar.close()

View File

@@ -2,60 +2,137 @@
Test for tiger data function Test for tiger data function
""" """
from pathlib import Path from pathlib import Path
from textwrap import dedent
import pytest import pytest
import tarfile import tarfile
from nominatim.tools import tiger_data, database_import from nominatim.tools import tiger_data, database_import
from nominatim.errors import UsageError
class MockTigerTable:
def __init__(self, conn):
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE tiger (linegeo GEOMETRY,
start INTEGER,
stop INTEGER,
interpol TEXT,
token_info JSONB,
postcode TEXT)""")
def count(self):
with self.conn.cursor() as cur:
return cur.scalar("SELECT count(*) FROM tiger")
def row(self):
with self.conn.cursor() as cur:
cur.execute("SELECT * FROM tiger LIMIT 1")
return cur.fetchone()
@pytest.fixture
def tiger_table(def_config, temp_db_conn, sql_preprocessor,
temp_db_with_extensions, tmp_path):
def_config.lib_dir.sql = tmp_path / 'sql'
def_config.lib_dir.sql.mkdir()
(def_config.lib_dir.sql / 'tiger_import_start.sql').write_text(
"""CREATE OR REPLACE FUNCTION tiger_line_import(linegeo GEOMETRY, start INTEGER,
stop INTEGER, interpol TEXT,
token_info JSONB, postcode TEXT)
RETURNS INTEGER AS $$
INSERT INTO tiger VALUES(linegeo, start, stop, interpol, token_info, postcode) RETURNING 1
$$ LANGUAGE SQL;""")
(def_config.lib_dir.sql / 'tiger_import_finish.sql').write_text(
"""DROP FUNCTION tiger_line_import (linegeo GEOMETRY, in_startnumber INTEGER,
in_endnumber INTEGER, interpolationtype TEXT,
token_info JSONB, in_postcode TEXT);""")
return MockTigerTable(temp_db_conn)
@pytest.fixture
def csv_factory(tmp_path):
def _mk_file(fname, hnr_from=1, hnr_to=9, interpol='odd', street='Main St',
city='Newtown', state='AL', postcode='12345',
geometry='LINESTRING(-86.466995 32.428956,-86.466923 32.428933)'):
(tmp_path / (fname + '.csv')).write_text(dedent("""\
from;to;interpolation;street;city;state;postcode;geometry
{};{};{};{};{};{};{};{}
""".format(hnr_from, hnr_to, interpol, street, city, state,
postcode, geometry)))
return _mk_file
@pytest.mark.parametrize("threads", (1, 5)) @pytest.mark.parametrize("threads", (1, 5))
def test_add_tiger_data(def_config, tmp_path, sql_preprocessor, def test_add_tiger_data(def_config, src_dir, tiger_table, tokenizer_mock, threads):
temp_db_cursor, threads, temp_db_with_extensions): tiger_data.add_tiger_data(str(src_dir / 'test' / 'testdb' / 'tiger'),
temp_db_cursor.execute('CREATE TABLE place (id INT)') def_config, threads, tokenizer_mock())
sqlfile = tmp_path / '1010.sql'
sqlfile.write_text("""INSERT INTO place values (1);
INSERT INTO non_existant_table values (1);""")
tiger_data.add_tiger_data(str(tmp_path), def_config, threads)
assert temp_db_cursor.table_rows('place') == 1 assert tiger_table.count() == 6213
@pytest.mark.parametrize("threads", (1, 5)) def test_add_tiger_data_no_files(def_config, tiger_table, tokenizer_mock,
def test_add_tiger_data_bad_file(def_config, tmp_path, sql_preprocessor, tmp_path):
temp_db_cursor, threads, temp_db_with_extensions): tiger_data.add_tiger_data(str(tmp_path), def_config, 1, tokenizer_mock())
temp_db_cursor.execute('CREATE TABLE place (id INT)')
sqlfile = tmp_path / '1010.txt' assert tiger_table.count() == 0
def test_add_tiger_data_bad_file(def_config, tiger_table, tokenizer_mock,
tmp_path):
sqlfile = tmp_path / '1010.csv'
sqlfile.write_text("""Random text""") sqlfile.write_text("""Random text""")
tiger_data.add_tiger_data(str(tmp_path), def_config, threads)
assert temp_db_cursor.table_rows('place') == 0 tiger_data.add_tiger_data(str(tmp_path), def_config, 1, tokenizer_mock())
assert tiger_table.count() == 0
def test_add_tiger_data_hnr_nan(def_config, tiger_table, tokenizer_mock,
csv_factory, tmp_path):
csv_factory('file1', hnr_from=99)
csv_factory('file2', hnr_from='L12')
csv_factory('file3', hnr_to='12.4')
tiger_data.add_tiger_data(str(tmp_path), def_config, 1, tokenizer_mock())
assert tiger_table.count() == 1
assert tiger_table.row()['start'] == 99
@pytest.mark.parametrize("threads", (1, 5)) @pytest.mark.parametrize("threads", (1, 5))
def test_add_tiger_data_tarfile(def_config, tmp_path, temp_db_cursor, def test_add_tiger_data_tarfile(def_config, tiger_table, tokenizer_mock,
threads, temp_db_with_extensions, sql_preprocessor): tmp_path, src_dir, threads):
temp_db_cursor.execute('CREATE TABLE place (id INT)')
sqlfile = tmp_path / '1010.sql'
sqlfile.write_text("""INSERT INTO place values (1);
INSERT INTO non_existant_table values (1);""")
tar = tarfile.open(str(tmp_path / 'sample.tar.gz'), "w:gz") tar = tarfile.open(str(tmp_path / 'sample.tar.gz'), "w:gz")
tar.add(sqlfile) tar.add(str(src_dir / 'test' / 'testdb' / 'tiger' / '01001.csv'))
tar.close() tar.close()
tiger_data.add_tiger_data(str(tmp_path / 'sample.tar.gz'), def_config, threads)
assert temp_db_cursor.table_rows('place') == 1 tiger_data.add_tiger_data(str(tmp_path / 'sample.tar.gz'), def_config, 1,
tokenizer_mock())
assert tiger_table.count() == 6213
@pytest.mark.parametrize("threads", (1, 5)) def test_add_tiger_data_bad_tarfile(def_config, tiger_table, tokenizer_mock,
def test_add_tiger_data_bad_tarfile(def_config, tmp_path, temp_db_cursor, threads, tmp_path):
temp_db_with_extensions, sql_preprocessor): tarfile = tmp_path / 'sample.tar.gz'
temp_db_cursor.execute('CREATE TABLE place (id INT)') tarfile.write_text("""Random text""")
sqlfile = tmp_path / '1010.txt'
sqlfile.write_text("""Random text""") with pytest.raises(UsageError):
tiger_data.add_tiger_data(str(tarfile), def_config, 1, tokenizer_mock())
def test_add_tiger_data_empty_tarfile(def_config, tiger_table, tokenizer_mock,
tmp_path, src_dir):
tar = tarfile.open(str(tmp_path / 'sample.tar.gz'), "w:gz") tar = tarfile.open(str(tmp_path / 'sample.tar.gz'), "w:gz")
tar.add(sqlfile) tar.add(__file__)
tar.close() tar.close()
tiger_data.add_tiger_data(str(tmp_path / 'sample.tar.gz'), def_config, threads)
assert temp_db_cursor.table_rows('place') == 0 tiger_data.add_tiger_data(str(tmp_path / 'sample.tar.gz'), def_config, 1,
tokenizer_mock())
assert tiger_table.count() == 0

6214
test/testdb/tiger/01001.csv Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff