use WorkerPool for Tiger data import

Requires adding an option so that SQL errors are ignored.
This commit is contained in:
Sarah Hoffmann
2021-05-13 20:16:30 +02:00
parent b9a09129fa
commit 5feece64c1
3 changed files with 38 additions and 56 deletions

View File

@@ -28,8 +28,9 @@ class DeadlockHandler:
normally. normally.
""" """
def __init__(self, handler): def __init__(self, handler, ignore_sql_errors=False):
self.handler = handler self.handler = handler
self.ignore_sql_errors = ignore_sql_errors
def __enter__(self): def __enter__(self):
pass pass
@@ -44,6 +45,11 @@ class DeadlockHandler:
if exc_value.pgcode == '40P01': if exc_value.pgcode == '40P01':
self.handler() self.handler()
return True return True
if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
LOG.info("SQL error ignored: %s", exc_value)
return True
return False return False
@@ -51,10 +57,11 @@ class DBConnection:
""" A single non-blocking database connection. """ A single non-blocking database connection.
""" """
def __init__(self, dsn, cursor_factory=None): def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
self.current_query = None self.current_query = None
self.current_params = None self.current_params = None
self.dsn = dsn self.dsn = dsn
self.ignore_sql_errors = ignore_sql_errors
self.conn = None self.conn = None
self.cursor = None self.cursor = None
@@ -101,7 +108,7 @@ class DBConnection:
""" Block until any pending operation is done. """ Block until any pending operation is done.
""" """
while True: while True:
with DeadlockHandler(self._deadlock_handler): with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
wait_select(self.conn) wait_select(self.conn)
self.current_query = None self.current_query = None
return return
@@ -128,7 +135,7 @@ class DBConnection:
if self.current_query is None: if self.current_query is None:
return True return True
with DeadlockHandler(self._deadlock_handler): with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
if self.conn.poll() == psycopg2.extensions.POLL_OK: if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None self.current_query = None
return True return True
@@ -143,8 +150,9 @@ class WorkerPool:
""" """
REOPEN_CONNECTIONS_AFTER = 100000 REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size): def __init__(self, dsn, pool_size, ignore_sql_errors=False):
self.threads = [DBConnection(dsn) for _ in range(pool_size)] self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
for _ in range(pool_size)]
self.free_workers = self._yield_free_worker() self.free_workers = self._yield_free_worker()
self.wait_time = 0 self.wait_time = 0

View File

@@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarbar and directory files
import logging import logging
import os import os
import tarfile import tarfile
import selectors
from nominatim.db.connection import connect from nominatim.db.connection import connect
from nominatim.db.async_connection import DBConnection from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor from nominatim.db.sql_preprocessor import SQLPreprocessor
@@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir):
return sql_files, tar return sql_files, tar
def handle_threaded_sql_statements(sel, file): def handle_threaded_sql_statements(pool, file):
""" Handles sql statement with multiplexing """ Handles sql statement with multiplexing
""" """
lines = 0 lines = 0
end_of_file = False
# Using pool of database connections to execute sql statements # Using pool of database connections to execute sql statements
while not end_of_file: for sql_query in file:
for key, _ in sel.select(1): pool.next_free_worker().perform(sql_query)
conn = key.data
try:
if conn.is_done():
sql_query = file.readline()
lines += 1
if not sql_query:
end_of_file = True
break
conn.perform(sql_query)
if lines == 1000:
print('. ', end='', flush=True)
lines = 0
except Exception as exc: # pylint: disable=broad-except
LOG.info('Wrong SQL statement: %s', exc)
def handle_unregister_connection_pool(sel, place_threads): lines += 1
""" Handles unregistering pool of connections if lines == 1000:
""" print('.', end='', flush=True)
lines = 0
while place_threads > 0:
for key, _ in sel.select(1):
conn = key.data
sel.unregister(conn)
try:
conn.wait()
except Exception as exc: # pylint: disable=broad-except
LOG.info('Wrong SQL statement: %s', exc)
conn.close()
place_threads -= 1
def add_tiger_data(data_dir, config, threads): def add_tiger_data(data_dir, config, threads):
""" Import tiger data from directory or tar file `data dir`. """ Import tiger data from directory or tar file `data dir`.
@@ -91,25 +66,16 @@ def add_tiger_data(data_dir, config, threads):
# Reading sql_files and then for each file line handling # Reading sql_files and then for each file line handling
# sql_query in <threads - 1> chunks. # sql_query in <threads - 1> chunks.
sel = selectors.DefaultSelector()
place_threads = max(1, threads - 1) place_threads = max(1, threads - 1)
# Creates a pool of database connections with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
for _ in range(place_threads): for sql_file in sql_files:
conn = DBConnection(dsn) if not tar:
conn.connect() file = open(sql_file)
sel.register(conn, selectors.EVENT_WRITE, conn) else:
file = tar.extractfile(sql_file)
for sql_file in sql_files: handle_threaded_sql_statements(pool, file)
if not tar:
file = open(sql_file)
else:
file = tar.extractfile(sql_file)
handle_threaded_sql_statements(sel, file)
# Unregistering pool of database connections
handle_unregister_connection_pool(sel, place_threads)
if tar: if tar:
tar.close() tar.close()

View File

@@ -56,13 +56,21 @@ def test_bad_query(conn):
conn.wait() conn.wait()
def test_bad_query_ignore(temp_db):
with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
conn.connect()
conn.perform('SELECT efasfjsea')
conn.wait()
def exec_with_deadlock(cur, sql, detector): def exec_with_deadlock(cur, sql, detector):
with DeadlockHandler(lambda *args: detector.append(1)): with DeadlockHandler(lambda *args: detector.append(1)):
cur.execute(sql) cur.execute(sql)
def test_deadlock(simple_conns): def test_deadlock(simple_conns):
print(psycopg2.__version__)
cur1, cur2 = simple_conns cur1, cur2 = simple_conns
cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT); cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);