diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py
index a4f55496..db4b89ce 100644
--- a/nominatim/db/async_connection.py
+++ b/nominatim/db/async_connection.py
@@ -6,6 +6,9 @@
 """ Database helper functions for the indexer.
 """
 import logging
+import select
+import time
+
 import psycopg2
 from psycopg2.extras import wait_select
 
@@ -25,8 +28,9 @@ class DeadlockHandler:
         normally.
     """
 
-    def __init__(self, handler):
+    def __init__(self, handler, ignore_sql_errors=False):
         self.handler = handler
+        self.ignore_sql_errors = ignore_sql_errors
 
     def __enter__(self):
         pass
@@ -41,6 +45,11 @@ class DeadlockHandler:
             if exc_value.pgcode == '40P01':
                 self.handler()
                 return True
+
+        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+            LOG.info("SQL error ignored: %s", exc_value)
+            return True
+
         return False
 
 
@@ -48,10 +57,11 @@ class DBConnection:
     """ A single non-blocking database connection.
     """
 
-    def __init__(self, dsn, cursor_factory=None):
+    def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
+        self.ignore_sql_errors = ignore_sql_errors
 
         self.conn = None
         self.cursor = None
@@ -98,7 +108,7 @@ class DBConnection:
         """ Block until any pending operation is done.
        """
         while True:
-            with DeadlockHandler(self._deadlock_handler):
+            with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
                 wait_select(self.conn)
                 self.current_query = None
                 return
@@ -125,9 +135,78 @@ class DBConnection:
         if self.current_query is None:
             return True
 
-        with DeadlockHandler(self._deadlock_handler):
+        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
 
         return False
+
+
+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool may be used as a context manager.
+    """
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+        self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+                        for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
+
+
+    def finish_all(self):
+        """ Wait for all connections to finish.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+    def close(self):
+        """ Close all connections and clear the pool.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+        """
+        return next(self.free_workers)
+
+
+    def _yield_free_worker(self):
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                tstart = time.time()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.finish_all()
+        self.close()
diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py
index b7673aba..5ab0eac3 100644
--- a/nominatim/indexer/indexer.py
+++ b/nominatim/indexer/indexer.py
@@ -2,14 +2,13 @@
 Main work horse for indexing (computing addresses) the database.
 """
 import logging
-import select
 import time
 
 import psycopg2.extras
 
 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
 from nominatim.db.connection import connect
 
 LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
             self.conn.wait()
             self.close()
 
-class WorkerPool:
-    """ A pool of asynchronous database connections.
-
-        The pool may be used as a context manager.
-    """
-    REOPEN_CONNECTIONS_AFTER = 100000
-
-    def __init__(self, dsn, pool_size):
-        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
-        self.free_workers = self._yield_free_worker()
-        self.wait_time = 0
-
-
-    def finish_all(self):
-        """ Wait for all connection to finish.
-        """
-        for thread in self.threads:
-            while not thread.is_done():
-                thread.wait()
-
-        self.free_workers = self._yield_free_worker()
-
-    def close(self):
-        """ Close all connections and clear the pool.
-        """
-        for thread in self.threads:
-            thread.close()
-        self.threads = []
-        self.free_workers = None
-
-
-    def next_free_worker(self):
-        """ Get the next free connection.
-        """
-        return next(self.free_workers)
-
-
-    def _yield_free_worker(self):
-        ready = self.threads
-        command_stat = 0
-        while True:
-            for thread in ready:
-                if thread.is_done():
-                    command_stat += 1
-                    yield thread
-
-            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
-                for thread in self.threads:
-                    while not thread.is_done():
-                        thread.wait()
-                    thread.connect()
-                ready = self.threads
-                command_stat = 0
-            else:
-                tstart = time.time()
-                _, ready, _ = select.select([], self.threads, [])
-                self.wait_time += time.time() - tstart
-
-
-    def __enter__(self):
-        return self
-
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.finish_all()
-        self.close()
-
 
 class Indexer:
     """ Main indexing routine.
diff --git a/nominatim/tools/tiger_data.py b/nominatim/tools/tiger_data.py
index 07772c70..fbcdb077 100644
--- a/nominatim/tools/tiger_data.py
+++ b/nominatim/tools/tiger_data.py
@@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarbar and directory files
 import logging
 import os
 import tarfile
-import selectors
 
 from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
 from nominatim.db.sql_preprocessor import SQLPreprocessor
 
 
@@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir):
     return sql_files, tar
 
 
-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
     """ Handles sql statement with multiplexing
     """
     lines = 0
-    end_of_file = False
     # Using pool of database connections to execute sql statements
-    while not end_of_file:
-        for key, _ in sel.select(1):
-            conn = key.data
-            try:
-                if conn.is_done():
-                    sql_query = file.readline()
-                    lines += 1
-                    if not sql_query:
-                        end_of_file = True
-                        break
-                    conn.perform(sql_query)
-                    if lines == 1000:
-                        print('. ', end='', flush=True)
-                        lines = 0
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
+    for sql_query in file:
+        pool.next_free_worker().perform(sql_query)
 
-def handle_unregister_connection_pool(sel, place_threads):
-    """ Handles unregistering pool of connections
-    """
+        lines += 1
+        if lines == 1000:
+            print('.', end='', flush=True)
+            lines = 0
 
-    while place_threads > 0:
-        for key, _ in sel.select(1):
-            conn = key.data
-            sel.unregister(conn)
-            try:
-                conn.wait()
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-            conn.close()
-            place_threads -= 1
 
 def add_tiger_data(data_dir, config, threads):
     """ Import tiger data from directory or tar file `data dir`.
@@ -91,25 +66,16 @@
 
     # Reading sql_files and then for each file line handling
     # sql_query in chunks.
-    sel = selectors.DefaultSelector()
     place_threads = max(1, threads - 1)
 
-    # Creates a pool of database connections
-    for _ in range(place_threads):
-        conn = DBConnection(dsn)
-        conn.connect()
-        sel.register(conn, selectors.EVENT_WRITE, conn)
+    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+        for sql_file in sql_files:
+            if not tar:
+                file = open(sql_file)
+            else:
+                file = tar.extractfile(sql_file)
 
-    for sql_file in sql_files:
-        if not tar:
-            file = open(sql_file)
-        else:
-            file = tar.extractfile(sql_file)
-
-        handle_threaded_sql_statements(sel, file)
-
-    # Unregistering pool of database connections
-    handle_unregister_connection_pool(sel, place_threads)
+            handle_threaded_sql_statements(pool, file)
 
     if tar:
         tar.close()
diff --git a/test/python/test_db_async_connection.py b/test/python/test_db_async_connection.py
index b52f7053..330b86f7 100644
--- a/test/python/test_db_async_connection.py
+++ b/test/python/test_db_async_connection.py
@@ -56,13 +56,21 @@ def test_bad_query(conn):
         conn.wait()
 
 
+def test_bad_query_ignore(temp_db):
+    with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+        conn.connect()
+
+        conn.perform('SELECT efasfjsea')
+
+        conn.wait()
+
+
 def exec_with_deadlock(cur, sql, detector):
     with DeadlockHandler(lambda *args: detector.append(1)):
         cur.execute(sql)
 
 
 def test_deadlock(simple_conns):
-    print(psycopg2.__version__)
     cur1, cur2 = simple_conns
 
     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
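Reviewer note: a minimal usage sketch of the relocated WorkerPool, assuming the API
shown in the hunks above (context-manager protocol, next_free_worker() and
DBConnection.perform()) and that DBConnection connects on construction, as its
previous use in indexer.py implies. The DSN and the statement are placeholders,
not part of this change.

    from nominatim.db.async_connection import WorkerPool

    dsn = 'dbname=nominatim'  # placeholder libpq connection string

    # One asynchronous DBConnection per worker; leaving the context manager
    # runs finish_all() and close(), so all pending statements complete.
    with WorkerPool(dsn, 4, ignore_sql_errors=True) as pool:
        for _ in range(100):
            # next_free_worker() blocks in select() until a connection is
            # idle; perform() then sends the statement without waiting.
            pool.next_free_worker().perform('SELECT pg_sleep(0.01)')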