Resolve conflicts

AntoJvlt
2021-05-17 13:52:35 +02:00
37 changed files with 7285 additions and 6729 deletions

View File

@@ -13,7 +13,6 @@ from nominatim.tools.exec_utils import run_legacy_script, run_php_server
 from nominatim.errors import UsageError
 from nominatim import clicmd
 from nominatim.clicmd.args import NominatimArgs
-from nominatim.tools import tiger_data

 LOG = logging.getLogger()
@@ -147,9 +146,14 @@ class UpdateAddData:
     @staticmethod
     def run(args):
+        from nominatim.tokenizer import factory as tokenizer_factory
+        from nominatim.tools import tiger_data
+
         if args.tiger_data:
+            tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
             return tiger_data.add_tiger_data(args.tiger_data,
-                                             args.config, args.threads or 1)
+                                             args.config, args.threads or 1,
+                                             tokenizer)

         params = ['update.php']
         if args.file:

View File

@@ -45,12 +45,19 @@ class UpdateRefresh:
     @staticmethod
     def run(args):
-        from ..tools import refresh
+        from ..tools import refresh, postcodes
+        from ..tokenizer import factory as tokenizer_factory
+        from ..indexer.indexer import Indexer
+
+        tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)

         if args.postcodes:
             LOG.warning("Update postcodes centroid")
-            refresh.update_postcodes(args.config.get_libpq_dsn(), args.sqllib_dir)
+            postcodes.update_postcodes(args.config.get_libpq_dsn(),
+                                       args.project_dir, tokenizer)
+            indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
+                              args.threads or 1)
+            indexer.index_postcodes()

         if args.word_counts:
             LOG.warning('Recompute frequency of full-word search terms')
@@ -67,7 +74,6 @@ class UpdateRefresh:
         with connect(args.config.get_libpq_dsn()) as conn:
             refresh.create_functions(conn, args.config,
                                      args.diffs, args.enable_debug_statements)
-            tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
             tokenizer.update_sql_functions(args.config)

         if args.wiki_data:
@@ -88,6 +94,6 @@ class UpdateRefresh:
         if args.website:
             webdir = args.project_dir / 'website'
             LOG.warning('Setting up website directory at %s', webdir)
-            refresh.setup_website(webdir, args.config)
+            with connect(args.config.get_libpq_dsn()) as conn:
+                refresh.setup_website(webdir, args.config, conn)

         return 0
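
Taken together, the new --postcodes branch recomputes the location_postcode table and then immediately indexes the changed rows. A condensed sketch of that call sequence, using the names from the diff above (config and project_dir stand in for the parsed command-line arguments):

    from nominatim.tokenizer import factory as tokenizer_factory
    from nominatim.tools import postcodes
    from nominatim.indexer.indexer import Indexer

    tokenizer = tokenizer_factory.get_tokenizer_for_db(config)
    postcodes.update_postcodes(config.get_libpq_dsn(), project_dir, tokenizer)

    # new rows are inserted with indexed_status = 1 and moved centroids are
    # reset to 2, so a postcode indexing pass picks up exactly the changes
    Indexer(config.get_libpq_dsn(), tokenizer, 1).index_postcodes()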

View File

@@ -116,8 +116,8 @@ class SetupAll:
         if args.continue_at is None or args.continue_at == 'load-data':
             LOG.warning('Calculate postcodes')
-            postcodes.import_postcodes(args.config.get_libpq_dsn(), args.project_dir,
-                                       tokenizer)
+            postcodes.update_postcodes(args.config.get_libpq_dsn(),
+                                       args.project_dir, tokenizer)

         if args.continue_at is None or args.continue_at in ('load-data', 'indexing'):
             if args.continue_at is not None and args.continue_at != 'load-data':
@@ -139,7 +139,8 @@ class SetupAll:
             webdir = args.project_dir / 'website'
             LOG.warning('Setup website at %s', webdir)
-            refresh.setup_website(webdir, args.config)
+            with connect(args.config.get_libpq_dsn()) as conn:
+                refresh.setup_website(webdir, args.config, conn)

         with connect(args.config.get_libpq_dsn()) as conn:
             try:

View File

@@ -6,6 +6,9 @@
""" Database helper functions for the indexer.
"""
import logging
import select
import time
import psycopg2
from psycopg2.extras import wait_select
@@ -25,8 +28,9 @@ class DeadlockHandler:
         normally.
     """
-    def __init__(self, handler):
+    def __init__(self, handler, ignore_sql_errors=False):
         self.handler = handler
+        self.ignore_sql_errors = ignore_sql_errors

     def __enter__(self):
         pass
@@ -41,6 +45,11 @@ class DeadlockHandler:
             if exc_value.pgcode == '40P01':
                 self.handler()
                 return True

+        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+            LOG.info("SQL error ignored: %s", exc_value)
+            return True

         return False
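
Because `__exit__` returning True tells Python to suppress the active exception, `ignore_sql_errors=True` turns any psycopg2 error inside the managed block into a log entry instead of a crash. A minimal sketch of that context-manager contract (toy handler, no real database needed):

    import psycopg2

    class DemoHandler:
        """Mimics DeadlockHandler's new exit logic for illustration."""
        def __init__(self, ignore_sql_errors=False):
            self.ignore_sql_errors = ignore_sql_errors

        def __enter__(self):
            pass

        def __exit__(self, exc_type, exc_value, traceback):
            # returning True swallows the exception
            return self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error)

    with DemoHandler(ignore_sql_errors=True):
        raise psycopg2.ProgrammingError('bad statement')   # ignored, not raised
    print('still running')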
@@ -48,10 +57,11 @@ class DBConnection:
""" A single non-blocking database connection.
"""
def __init__(self, dsn, cursor_factory=None):
def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
self.current_query = None
self.current_params = None
self.dsn = dsn
self.ignore_sql_errors = ignore_sql_errors
self.conn = None
self.cursor = None
@@ -98,7 +108,7 @@ class DBConnection:
""" Block until any pending operation is done.
"""
while True:
with DeadlockHandler(self._deadlock_handler):
with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
wait_select(self.conn)
self.current_query = None
return
@@ -125,9 +135,78 @@ class DBConnection:
         if self.current_query is None:
             return True

-        with DeadlockHandler(self._deadlock_handler):
+        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True

         return False


+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool may be used as a context manager.
+    """
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+        self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+                        for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
+
+    def finish_all(self):
+        """ Wait for all connections to finish.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+    def close(self):
+        """ Close all connections and clear the pool.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+        """
+        return next(self.free_workers)
+
+    def _yield_free_worker(self):
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                tstart = time.time()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.finish_all()
+        self.close()
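
A usage sketch for the relocated pool (DSN and table name are illustrative; `perform()` queues a statement on an idle connection and returns immediately):

    from nominatim.db.async_connection import WorkerPool

    dsn = 'dbname=nominatim'          # hypothetical connection string

    with WorkerPool(dsn, pool_size=4, ignore_sql_errors=True) as pool:
        for i in range(1000):
            worker = pool.next_free_worker()   # blocks until a connection is idle
            worker.perform('INSERT INTO demo_table VALUES (%s)', args=(i,))
    # __exit__ calls finish_all() and close(): all pending statements complete
    # before the connections are torn down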

View File

@@ -2,14 +2,13 @@
 Main work horse for indexing (computing addresses) the database.
 """
 import logging
-import select
-import time

 import psycopg2.extras

 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
 from nominatim.db.connection import connect

 LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
         self.conn.wait()
         self.close()


-class WorkerPool:
-    """ A pool of asynchronous database connections.
-
-        The pool may be used as a context manager.
-    """
-    REOPEN_CONNECTIONS_AFTER = 100000
-
-    def __init__(self, dsn, pool_size):
-        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
-        self.free_workers = self._yield_free_worker()
-        self.wait_time = 0
-
-    def finish_all(self):
-        """ Wait for all connections to finish.
-        """
-        for thread in self.threads:
-            while not thread.is_done():
-                thread.wait()
-
-        self.free_workers = self._yield_free_worker()
-
-    def close(self):
-        """ Close all connections and clear the pool.
-        """
-        for thread in self.threads:
-            thread.close()
-        self.threads = []
-        self.free_workers = None
-
-    def next_free_worker(self):
-        """ Get the next free connection.
-        """
-        return next(self.free_workers)
-
-    def _yield_free_worker(self):
-        ready = self.threads
-        command_stat = 0
-        while True:
-            for thread in ready:
-                if thread.is_done():
-                    command_stat += 1
-                    yield thread
-
-            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
-                for thread in self.threads:
-                    while not thread.is_done():
-                        thread.wait()
-                    thread.connect()
-                ready = self.threads
-                command_stat = 0
-            else:
-                tstart = time.time()
-                _, ready, _ = select.select([], self.threads, [])
-                self.wait_time += time.time() - tstart
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.finish_all()
-        self.close()


 class Indexer:
     """ Main indexing routine.

View File

@@ -263,6 +263,16 @@ class LegacyICUNameAnalyzer:
"""
return self.normalizer.transliterate(phrase)
@staticmethod
def normalize_postcode(postcode):
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
'token_normalized_postcode()'.
"""
return postcode.strip().upper()
@functools.lru_cache(maxsize=1024)
def make_standard_word(self, name):
""" Create the normalised version of the input.
@@ -285,25 +295,44 @@ class LegacyICUNameAnalyzer:
         return self.transliterator.transliterate(hnr)

-    def add_postcodes_from_db(self):
-        """ Add postcodes from the location_postcode table to the word table.
+    def update_postcodes_from_db(self):
+        """ Update postcode tokens in the word table from the location_postcode
+            table.
         """
+        to_delete = []
         copystr = io.StringIO()
         with self.conn.cursor() as cur:
-            cur.execute("SELECT distinct(postcode) FROM location_postcode")
-            for (postcode, ) in cur:
-                copystr.write(postcode)
-                copystr.write('\t ')
-                copystr.write(self.transliterator.transliterate(postcode))
-                copystr.write('\tplace\tpostcode\t0\n')
+            # This finds us the rows in location_postcode and word that are
+            # missing in the other table.
+            cur.execute("""SELECT * FROM
+                             (SELECT pc, word FROM
+                               (SELECT distinct(postcode) as pc FROM location_postcode) p
+                               FULL JOIN
+                               (SELECT word FROM word
+                                 WHERE class ='place' and type = 'postcode') w
+                               ON pc = word) x
+                           WHERE pc is null or word is null""")

-            copystr.seek(0)
-            cur.copy_from(copystr, 'word',
-                          columns=['word', 'word_token', 'class', 'type',
-                                   'search_name_count'])
-            # Don't really need an ID for postcodes....
-            # cur.execute("""UPDATE word SET word_id = nextval('seq_word')
-            #                WHERE word_id is null and type = 'postcode'""")
+            for postcode, word in cur:
+                if postcode is None:
+                    to_delete.append(word)
+                else:
+                    copystr.write(postcode)
+                    copystr.write('\t ')
+                    copystr.write(self.transliterator.transliterate(postcode))
+                    copystr.write('\tplace\tpostcode\t0\n')
+
+            if to_delete:
+                cur.execute("""DELETE FROM WORD
+                               WHERE class ='place' and type = 'postcode'
+                                     and word = any(%s)
+                            """, (to_delete, ))
+
+            if copystr.getvalue():
+                copystr.seek(0)
+                cur.copy_from(copystr, 'word',
+                              columns=['word', 'word_token', 'class', 'type',
+                                       'search_name_count'])

     def update_special_phrases(self, phrases, should_replace):
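
The FULL JOIN above computes a two-way set difference in one pass: a row with a NULL word side is a postcode still missing from the word table, a NULL pc side is a stale token to remove. A pure-Python sketch of the same computation (toy values):

    in_location_postcode = {'12345', '99999'}
    in_word = {'12345', '00001'}

    to_add = in_location_postcode - in_word       # rows where word IS NULL
    to_delete = in_word - in_location_postcode    # rows where pc IS NULL

    assert to_add == {'99999'} and to_delete == {'00001'}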
@@ -435,22 +464,25 @@ class LegacyICUNameAnalyzer:
     def _add_postcode(self, postcode):
         """ Make sure the normalized postcode is present in the word table.
         """
-        if re.search(r'[:,;]', postcode) is None and not postcode in self._cache.postcodes:
-            term = self.make_standard_word(postcode)
-            if not term:
-                return
+        if re.search(r'[:,;]', postcode) is None:
+            postcode = self.normalize_postcode(postcode)

-            with self.conn.cursor() as cur:
-                # no word_id needed for postcodes
-                cur.execute("""INSERT INTO word (word, word_token, class, type,
-                                                 search_name_count)
-                               (SELECT pc, %s, 'place', 'postcode', 0
-                                FROM (VALUES (%s)) as v(pc)
-                                WHERE NOT EXISTS
-                                 (SELECT * FROM word
-                                  WHERE word = pc and class='place' and type='postcode'))
-                            """, (' ' + term, postcode))
-            self._cache.postcodes.add(postcode)
+            if postcode not in self._cache.postcodes:
+                term = self.make_standard_word(postcode)
+                if not term:
+                    return
+
+                with self.conn.cursor() as cur:
+                    # no word_id needed for postcodes
+                    cur.execute("""INSERT INTO word (word, word_token, class, type,
+                                                     search_name_count)
+                                   (SELECT pc, %s, 'place', 'postcode', 0
+                                    FROM (VALUES (%s)) as v(pc)
+                                    WHERE NOT EXISTS
+                                     (SELECT * FROM word
+                                      WHERE word = pc and class='place' and type='postcode'))
+                                """, (' ' + term, postcode))
+                self._cache.postcodes.add(postcode)

     @staticmethod
     def _split_housenumbers(hnrs):

View File

@@ -305,13 +305,51 @@ class LegacyNameAnalyzer:
         return self.normalizer.transliterate(phrase)

-    def add_postcodes_from_db(self):
-        """ Add postcodes from the location_postcode table to the word table.
+    @staticmethod
+    def normalize_postcode(postcode):
+        """ Convert the postcode to a standardized form.
+
+            This function must yield exactly the same result as the SQL function
+            'token_normalized_postcode()'.
+        """
+        return postcode.strip().upper()
+
+    def update_postcodes_from_db(self):
+        """ Update postcode tokens in the word table from the location_postcode
+            table.
         """
         with self.conn.cursor() as cur:
-            cur.execute("""SELECT count(create_postcode_id(pc))
-                           FROM (SELECT distinct(postcode) as pc
-                                 FROM location_postcode) x""")
+            # This finds us the rows in location_postcode and word that are
+            # missing in the other table.
+            cur.execute("""SELECT * FROM
+                             (SELECT pc, word FROM
+                               (SELECT distinct(postcode) as pc FROM location_postcode) p
+                               FULL JOIN
+                               (SELECT word FROM word
+                                 WHERE class ='place' and type = 'postcode') w
+                               ON pc = word) x
+                           WHERE pc is null or word is null""")
+
+            to_delete = []
+            to_add = []
+
+            for postcode, word in cur:
+                if postcode is None:
+                    to_delete.append(word)
+                else:
+                    to_add.append(postcode)
+
+            if to_delete:
+                cur.execute("""DELETE FROM WORD
+                               WHERE class ='place' and type = 'postcode'
+                                     and word = any(%s)
+                            """, (to_delete, ))
+            if to_add:
+                cur.execute("""SELECT count(create_postcode_id(pc))
+                               FROM unnest(%s) as pc
+                            """, (to_add, ))

     def update_special_phrases(self, phrases, should_replace):
@@ -416,12 +454,8 @@ class LegacyNameAnalyzer:
     def _add_postcode(self, postcode):
         """ Make sure the normalized postcode is present in the word table.
         """
-        def _create_postcode_from_db(pcode):
-            with self.conn.cursor() as cur:
-                cur.execute('SELECT create_postcode_id(%s)', (pcode, ))
-
         if re.search(r'[:,;]', postcode) is None:
-            self._cache.postcodes.get(postcode.strip().upper(), _create_postcode_from_db)
+            self._cache.add_postcode(self.conn, self.normalize_postcode(postcode))


 class _TokenInfo:
@@ -552,16 +586,19 @@ class _TokenCache:
                                FROM generate_series(1, 100) as i""")
             self._cached_housenumbers = {str(r[0]) : r[1] for r in cur}

-        # Get postcodes that are already saved
-        postcodes = OrderedDict()
-        with conn.cursor() as cur:
-            cur.execute("""SELECT word FROM word
-                           WHERE class ='place' and type = 'postcode'""")
-            for row in cur:
-                postcodes[row[0]] = None
-        self.postcodes = _LRU(maxsize=32, init_data=postcodes)
+        # For postcodes remember the ones that have already been added
+        self.postcodes = set()

     def get_housenumber(self, number):
         """ Get a housenumber token from the cache.
         """
         return self._cached_housenumbers.get(number)

+    def add_postcode(self, conn, postcode):
+        """ Make sure the given postcode is in the database.
+        """
+        if postcode not in self.postcodes:
+            with conn.cursor() as cur:
+                cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
+            self.postcodes.add(postcode)

View File

@@ -185,8 +185,8 @@ def install_legacy_tokenizer(conn, config, **_):
                                        WHERE table_name = %s
                                          and column_name = 'token_info'""",
                                        (table, ))
-            if has_column == 0:
-                cur.execute('ALTER TABLE {} ADD COLUMN token_info JSONB'.format(table))
+                if has_column == 0:
+                    cur.execute('ALTER TABLE {} ADD COLUMN token_info JSONB'.format(table))

     tokenizer = tokenizer_factory.create_tokenizer(config, init_db=False,
                                                    module_name='legacy')

View File

@@ -2,80 +2,196 @@
 Functions for importing, updating and otherwise maintaining the table
 of artificial postcode centroids.
 """
+import csv
+import gzip
+import logging
+from math import isfinite

+from psycopg2.extras import execute_values

-from nominatim.db.utils import execute_file
 from nominatim.db.connection import connect

-def import_postcodes(dsn, project_dir, tokenizer):
-    """ Set up the initial list of postcodes.
+LOG = logging.getLogger()


+def _to_float(num, max_value):
+    """ Convert the number in string into a float. The number is expected
+        to be in the range of [-max_value, max_value]. Otherwise raises a
+        ValueError.
+    """
+    num = float(num)
+    if not isfinite(num) or num <= -max_value or num >= max_value:
+        raise ValueError()
+
+    return num


+class _CountryPostcodesCollector:
+    """ Collector for postcodes of a single country.
     """

-    with connect(dsn) as conn:
-        conn.drop_table('gb_postcode')
-        conn.drop_table('us_postcode')
+    def __init__(self, country):
+        self.country = country
+        self.collected = dict()

+    def add(self, postcode, x, y):
+        """ Add the given postcode to the collection cache. If the postcode
+            already existed, it is overwritten with the new centroid.
+        """
+        self.collected[postcode] = (x, y)

+    def commit(self, conn, analyzer, project_dir):
+        """ Update postcodes for the country from the postcodes selected so far
+            as well as any externally supplied postcodes.
+        """
+        self._update_from_external(analyzer, project_dir)
+        to_add, to_delete, to_update = self._compute_changes(conn)

+        LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
+                 self.country, len(to_add), len(to_delete), len(to_update))

         with conn.cursor() as cur:
-            cur.execute("""CREATE TABLE gb_postcode (
-                            id integer,
-                            postcode character varying(9),
-                            geometry GEOMETRY(Point, 4326))""")
+            if to_add:
+                execute_values(cur,
+                               """INSERT INTO location_postcode
+                                      (place_id, indexed_status, country_code,
+                                       postcode, geometry) VALUES %s""",
+                               to_add,
+                               template="""(nextval('seq_place'), 1, '{}',
+                                            %s, 'SRID=4326;POINT(%s %s)')
+                                         """.format(self.country))
+            if to_delete:
+                cur.execute("""DELETE FROM location_postcode
+                               WHERE country_code = %s and postcode = any(%s)
+                            """, (self.country, to_delete))
+            if to_update:
+                execute_values(cur,
+                               """UPDATE location_postcode
+                                  SET indexed_status = 2,
+                                      geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
+                                  FROM (VALUES %s) AS v (pc, x, y)
+                                  WHERE country_code = '{}' and postcode = pc
+                               """.format(self.country),
+                               to_update)


+    def _compute_changes(self, conn):
+        """ Compute which postcodes from the collected postcodes have to be
+            added or modified and which from the location_postcode table
+            have to be deleted.
+        """
+        to_update = []
+        to_delete = []
         with conn.cursor() as cur:
-            cur.execute("""CREATE TABLE us_postcode (
-                            postcode text,
-                            x double precision,
-                            y double precision)""")
-        conn.commit()
+            cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
+                           FROM location_postcode
+                           WHERE country_code = %s""",
+                        (self.country, ))
+            for postcode, x, y in cur:
+                newx, newy = self.collected.pop(postcode, (None, None))
+                if newx is not None:
+                    dist = (x - newx)**2 + (y - newy)**2
+                    if dist > 0.0000001:
+                        to_update.append((postcode, newx, newy))
+                else:
+                    to_delete.append(postcode)

-        gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
-        if gb_postcodes.is_file():
-            execute_file(dsn, gb_postcodes)
+        to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
+        self.collected = []

-        us_postcodes = project_dir / 'us_postcode_data.sql.gz'
-        if us_postcodes.is_file():
-            execute_file(dsn, us_postcodes)
+        return to_add, to_delete, to_update

-        with conn.cursor() as cur:
-            cur.execute("TRUNCATE location_postcode")
-            cur.execute("""
-                INSERT INTO location_postcode
-                 (place_id, indexed_status, country_code, postcode, geometry)
-                SELECT nextval('seq_place'), 1, country_code,
-                       token_normalized_postcode(address->'postcode') as pc,
-                       ST_Centroid(ST_Collect(ST_Centroid(geometry)))
-                  FROM placex
-                 WHERE address ? 'postcode'
-                       and token_normalized_postcode(address->'postcode') is not null
-                       AND geometry IS NOT null
-                 GROUP BY country_code, pc
-            """)

-            cur.execute("""
-                INSERT INTO location_postcode
-                 (place_id, indexed_status, country_code, postcode, geometry)
-                SELECT nextval('seq_place'), 1, 'us',
-                       token_normalized_postcode(postcode),
-                       ST_SetSRID(ST_Point(x,y),4326)
-                  FROM us_postcode WHERE token_normalized_postcode(postcode) NOT IN
-                        (SELECT postcode FROM location_postcode
-                          WHERE country_code = 'us')
-            """)
+    def _update_from_external(self, analyzer, project_dir):
+        """ Look for an external postcode file for the active country in
+            the project directory and add missing postcodes when found.
+        """
+        csvfile = self._open_external(project_dir)
+        if csvfile is None:
+            return

-            cur.execute("""
-                INSERT INTO location_postcode
-                 (place_id, indexed_status, country_code, postcode, geometry)
-                SELECT nextval('seq_place'), 1, 'gb',
-                       token_normalized_postcode(postcode), geometry
-                  FROM gb_postcode WHERE token_normalized_postcode(postcode) NOT IN
-                        (SELECT postcode FROM location_postcode
-                          WHERE country_code = 'gb')
-            """)
+        try:
+            reader = csv.DictReader(csvfile)
+            for row in reader:
+                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
+                    LOG.warning("Bad format for external postcode file for country '%s'."
+                                " Ignored.", self.country)
+                    return
+                postcode = analyzer.normalize_postcode(row['postcode'])
+                if postcode not in self.collected:
+                    try:
+                        self.collected[postcode] = (_to_float(row['lon'], 180),
+                                                    _to_float(row['lat'], 90))
+                    except ValueError:
+                        LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
+                                    row['lat'], row['lon'], self.country)

-            cur.execute("""
-                DELETE FROM word WHERE class='place' and type='postcode'
-                and word NOT IN (SELECT postcode FROM location_postcode)
-            """)
-        conn.commit()
+        finally:
+            csvfile.close()

-    with tokenizer.name_analyzer() as analyzer:
-        analyzer.add_postcodes_from_db()
+    def _open_external(self, project_dir):
+        fname = project_dir / '{}_postcodes.csv'.format(self.country)
+
+        if fname.is_file():
+            LOG.info("Using external postcode file '%s'.", fname)
+            return open(fname, 'r')
+
+        fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
+
+        if fname.is_file():
+            LOG.info("Using external postcode file '%s'.", fname)
+            return gzip.open(fname, 'rt')
+
+        return None


+def update_postcodes(dsn, project_dir, tokenizer):
+    """ Update the table of artificial postcodes.
+
+        Computes artificial postcode centroids from the placex table,
+        potentially enhances it with external data and then updates the
+        postcodes in the table 'location_postcode'.
+    """
+    with tokenizer.name_analyzer() as analyzer:
+        with connect(dsn) as conn:
+            # First get the list of countries that currently have postcodes.
+            # (Doing this before starting to insert, so it is fast on import.)
+            with conn.cursor() as cur:
+                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
+                todo_countries = set((row[0] for row in cur))
+
+            # Recompute the list of valid postcodes from placex.
+            with conn.cursor(name="placex_postcodes") as cur:
+                cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
+                               FROM (
+                                 SELECT country_code,
+                                        token_normalized_postcode(address->'postcode') as pc,
+                                        ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
+                                 FROM placex
+                                 WHERE address ? 'postcode' and geometry IS NOT null
+                                       and country_code is not null
+                                 GROUP BY country_code, pc) xx
+                               WHERE pc is not null
+                               ORDER BY country_code, pc""")
+
+                collector = None
+
+                for country, postcode, x, y in cur:
+                    if collector is None or country != collector.country:
+                        if collector is not None:
+                            collector.commit(conn, analyzer, project_dir)
+                        collector = _CountryPostcodesCollector(country)
+                        todo_countries.discard(country)
+                    collector.add(postcode, x, y)
+
+                if collector is not None:
+                    collector.commit(conn, analyzer, project_dir)
+
+            # Now handle any countries that are only in the postcode table.
+            for country in todo_countries:
+                _CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
+
+            conn.commit()
+
+        analyzer.update_postcodes_from_db()
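
For reference, a sketch of the external postcode file that `_update_from_external()` accepts: a plain CSV named `<country>_postcodes.csv` (optionally gzipped) in the project directory, with `postcode`, `lat` and `lon` columns. The values here are illustrative only:

    import csv
    import io

    sample = io.StringIO(
        "postcode,lat,lon\n"          # header row read by csv.DictReader
        "10117,52.517,13.389\n"
        "20095,53.550,10.000\n")

    for row in csv.DictReader(sample):
        # update_postcodes() normalizes the code and range-checks the
        # coordinates via _to_float() (lat within ±90, lon within ±180)
        print(row['postcode'], float(row['lat']), float(row['lon']))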

View File

@@ -13,12 +13,6 @@ from nominatim.version import NOMINATIM_VERSION
 LOG = logging.getLogger()


-def update_postcodes(dsn, sql_dir):
-    """ Recalculate postcode centroids and add, remove and update entries in the
-        location_postcode table. `conn` is an open connection to the database.
-    """
-    execute_file(dsn, sql_dir / 'update-postcodes.sql')


 def recompute_word_counts(dsn, sql_dir):
     """ Compute the frequency of full-word search terms.
@@ -161,7 +155,7 @@ def recompute_importance(conn):
     conn.commit()


-def setup_website(basedir, config):
+def setup_website(basedir, config, conn):
     """ Create the website script stubs.
     """
     if not basedir.exists():
@@ -193,5 +187,10 @@ def setup_website(basedir, config):
     template += "\nrequire_once('{}/website/{{}}');\n".format(config.lib_dir.php)

+    search_name_table_exists = bool(conn and conn.table_exists('search_name'))

     for script in WEBSITE_SCRIPTS:
-        (basedir / script).write_text(template.format(script), 'utf-8')
+        if not search_name_table_exists and script == 'search.php':
+            (basedir / script).write_text(template.format('reverse-only-search.php'), 'utf-8')
+        else:
+            (basedir / script).write_text(template.format(script), 'utf-8')
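
The effect of the new conn parameter: on a database imported without search tables, the search endpoint is generated from the reverse-only stub. A minimal sketch of just that decision (names from the diff above; data illustrative):

    def choose_source(script, search_name_table_exists):
        # mirror of the branch added above
        if not search_name_table_exists and script == 'search.php':
            return 'reverse-only-search.php'
        return script

    assert choose_source('search.php', False) == 'reverse-only-search.php'
    assert choose_source('search.php', True) == 'search.php'
    assert choose_source('reverse.php', False) == 'reverse.php'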

View File

@@ -1,15 +1,18 @@
"""
Functions for importing tiger data and handling tarbar and directory files
"""
import csv
import io
import logging
import os
import tarfile
import selectors
import psycopg2.extras
from nominatim.db.connection import connect
from nominatim.db.async_connection import DBConnection
from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor
from nominatim.errors import UsageError
LOG = logging.getLogger()
@@ -20,96 +23,81 @@ def handle_tarfile_or_directory(data_dir):
     tar = None
     if data_dir.endswith('.tar.gz'):
-        tar = tarfile.open(data_dir)
-        sql_files = [i for i in tar.getmembers() if i.name.endswith('.sql')]
-        LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir)
-        if not sql_files:
+        try:
+            tar = tarfile.open(data_dir)
+        except tarfile.ReadError as err:
+            LOG.fatal("Cannot open '%s'. Is this a tar file?", data_dir)
+            raise UsageError("Cannot open Tiger data file.") from err
+
+        csv_files = [i for i in tar.getmembers() if i.name.endswith('.csv')]
+        LOG.warning("Found %d CSV files in tarfile with path %s", len(csv_files), data_dir)
+        if not csv_files:
             LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
             return None, None
     else:
         files = os.listdir(data_dir)
-        sql_files = [os.path.join(data_dir, i) for i in files if i.endswith('.sql')]
-        LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir)
-        if not sql_files:
+        csv_files = [os.path.join(data_dir, i) for i in files if i.endswith('.csv')]
+        LOG.warning("Found %d CSV files in path %s", len(csv_files), data_dir)
+        if not csv_files:
             LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
             return None, None

-    return sql_files, tar
+    return csv_files, tar


-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, fd, analyzer):
     """ Handles SQL statements with multiplexing
     """
     lines = 0
-    end_of_file = False
     # Using pool of database connections to execute sql statements
-    while not end_of_file:
-        for key, _ in sel.select(1):
-            conn = key.data
-            try:
-                if conn.is_done():
-                    sql_query = file.readline()
-                    lines += 1
-                    if not sql_query:
-                        end_of_file = True
-                        break
-                    conn.perform(sql_query)
-                    if lines == 1000:
-                        print('. ', end='', flush=True)
-                        lines = 0
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
+    sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"

-def handle_unregister_connection_pool(sel, place_threads):
-    """ Handles unregistering pool of connections
-    """
-    while place_threads > 0:
-        for key, _ in sel.select(1):
-            conn = key.data
-            sel.unregister(conn)
-            try:
-                conn.wait()
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-            conn.close()
-            place_threads -= 1
+    for row in csv.DictReader(fd, delimiter=';'):
+        try:
+            address = dict(street=row['street'], postcode=row['postcode'])
+            args = ('SRID=4326;' + row['geometry'],
+                    int(row['from']), int(row['to']), row['interpolation'],
+                    psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
+                    analyzer.normalize_postcode(row['postcode']))
+        except ValueError:
+            continue
+        pool.next_free_worker().perform(sql, args=args)
+
+        lines += 1
+        if lines == 1000:
+            print('.', end='', flush=True)
+            lines = 0


-def add_tiger_data(data_dir, config, threads):
+def add_tiger_data(data_dir, config, threads, tokenizer):
     """ Import tiger data from directory or tar file `data dir`.
     """
     dsn = config.get_libpq_dsn()
-    sql_files, tar = handle_tarfile_or_directory(data_dir)
+    files, tar = handle_tarfile_or_directory(data_dir)

-    if not sql_files:
+    if not files:
         return

     with connect(dsn) as conn:
         sql = SQLPreprocessor(conn, config)
         sql.run_sql_file(conn, 'tiger_import_start.sql')

-    # Reading sql_files and then for each file line handling
+    # Reading files and then for each file line handling
     # sql_query in <threads - 1> chunks.
-    sel = selectors.DefaultSelector()
     place_threads = max(1, threads - 1)

-    # Creates a pool of database connections
-    for _ in range(place_threads):
-        conn = DBConnection(dsn)
-        conn.connect()
-        sel.register(conn, selectors.EVENT_WRITE, conn)
-
-    for sql_file in sql_files:
-        if not tar:
-            file = open(sql_file)
-        else:
-            file = tar.extractfile(sql_file)
-
-        handle_threaded_sql_statements(sel, file)
-
-    # Unregistering pool of database connections
-    handle_unregister_connection_pool(sel, place_threads)
+    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+        with tokenizer.name_analyzer() as analyzer:
+            for fname in files:
+                if not tar:
+                    fd = open(fname)
+                else:
+                    fd = io.TextIOWrapper(tar.extractfile(fname))
+
+                handle_threaded_sql_statements(pool, fd, analyzer)
+
+                fd.close()

     if tar:
         tar.close()
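
A sketch of one row of the new semicolon-delimited Tiger CSV input and the parameters it becomes. Only the fields the reader above actually consumes are shown, and the values are made up for illustration:

    import csv
    import io

    sample = io.StringIO(
        "from;to;interpolation;street;postcode;geometry\n"
        "100;198;even;Main St;90210;LINESTRING(-118.40 34.09,-118.41 34.10)\n")

    row = next(csv.DictReader(sample, delimiter=';'))
    address = dict(street=row['street'], postcode=row['postcode'])

    # handle_threaded_sql_statements() turns such a row into the parameters of
    #   SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)
    # i.e. geometry, from, to, interpolation, tokenised address, postcode
    print('SRID=4326;' + row['geometry'], int(row['from']), int(row['to']),
          row['interpolation'], address)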