Merge pull request #2326 from lonvia/wokerpool-for-tiger-data

Use WorkerPool when importing Tiger data
Sarah Hoffmann
2021-05-13 22:09:56 +02:00
committed by GitHub
4 changed files with 108 additions and 123 deletions

nominatim/db/async_connection.py

@@ -6,6 +6,9 @@
 """ Database helper functions for the indexer.
 """
 import logging
+import select
+import time
+
 import psycopg2
 from psycopg2.extras import wait_select
@@ -25,8 +28,9 @@ class DeadlockHandler:
normally. normally.
""" """
def __init__(self, handler): def __init__(self, handler, ignore_sql_errors=False):
self.handler = handler self.handler = handler
self.ignore_sql_errors = ignore_sql_errors
def __enter__(self): def __enter__(self):
pass pass
@@ -41,6 +45,11 @@ class DeadlockHandler:
             if exc_value.pgcode == '40P01':
                 self.handler()
                 return True
+
+        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+            LOG.info("SQL error ignored: %s", exc_value)
+            return True
+
         return False
@@ -48,10 +57,11 @@ class DBConnection:
     """ A single non-blocking database connection.
     """

-    def __init__(self, dsn, cursor_factory=None):
+    def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
+        self.ignore_sql_errors = ignore_sql_errors

         self.conn = None
         self.cursor = None
@@ -98,7 +108,7 @@ class DBConnection:
         """ Block until any pending operation is done.
         """
         while True:
-            with DeadlockHandler(self._deadlock_handler):
+            with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
                 wait_select(self.conn)
                 self.current_query = None
                 return
@@ -125,9 +135,78 @@ class DBConnection:
         if self.current_query is None:
             return True

-        with DeadlockHandler(self._deadlock_handler):
+        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True

         return False
+
+
+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool may be used as a context manager.
+    """
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+        self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+                        for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
+
+
+    def finish_all(self):
+        """ Wait for all connection to finish.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+
+    def close(self):
+        """ Close all connections and clear the pool.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+        """
+        return next(self.free_workers)
+
+
+    def _yield_free_worker(self):
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                tstart = time.time()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.finish_all()
+        self.close()

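For reference, the new pool is driven like the minimal sketch below. It is not part of the commit: the DSN and statements are placeholders, and it assumes DBConnection opens its connection on construction (WorkerPool never calls connect() before first use).

from nominatim.db.async_connection import WorkerPool

dsn = 'dbname=nominatim'  # placeholder connection string

# Four connections execute statements concurrently. next_free_worker()
# yields a connection whose previous statement has completed, blocking
# in select() on all pool members when none is free yet.
with WorkerPool(dsn, 4) as pool:
    for _ in range(20):
        pool.next_free_worker().perform('SELECT pg_sleep(0.01)')
# leaving the with-block calls finish_all() and then close()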
nominatim/indexer/indexer.py

@@ -2,14 +2,13 @@
 Main work horse for indexing (computing addresses) the database.
 """
 import logging
-import select
 import time

 import psycopg2.extras

 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
 from nominatim.db.connection import connect

 LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
self.conn.wait() self.conn.wait()
self.close() self.close()
class WorkerPool:
""" A pool of asynchronous database connections.
The pool may be used as a context manager.
"""
REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size):
self.threads = [DBConnection(dsn) for _ in range(pool_size)]
self.free_workers = self._yield_free_worker()
self.wait_time = 0
def finish_all(self):
""" Wait for all connection to finish.
"""
for thread in self.threads:
while not thread.is_done():
thread.wait()
self.free_workers = self._yield_free_worker()
def close(self):
""" Close all connections and clear the pool.
"""
for thread in self.threads:
thread.close()
self.threads = []
self.free_workers = None
def next_free_worker(self):
""" Get the next free connection.
"""
return next(self.free_workers)
def _yield_free_worker(self):
ready = self.threads
command_stat = 0
while True:
for thread in ready:
if thread.is_done():
command_stat += 1
yield thread
if command_stat > self.REOPEN_CONNECTIONS_AFTER:
for thread in self.threads:
while not thread.is_done():
thread.wait()
thread.connect()
ready = self.threads
command_stat = 0
else:
tstart = time.time()
_, ready, _ = select.select([], self.threads, [])
self.wait_time += time.time() - tstart
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.finish_all()
self.close()
class Indexer: class Indexer:
""" Main indexing routine. """ Main indexing routine.

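With the class moved, the indexer keeps only the import; its consumer loop follows the same pattern as the sketch below. This is illustrative, not the actual Indexer code: the 'batches' iterable and its (sql, params) shape are assumptions.

from nominatim.db.async_connection import WorkerPool

def index_batches(dsn, batches, num_threads):
    # Illustrative: 'batches' is assumed to yield (sql, params) pairs,
    # e.g. per-place UPDATE statements produced by a runner.
    with WorkerPool(dsn, num_threads) as pool:
        for sql, params in batches:
            # perform() sends the statement asynchronously; the pool
            # hands out whichever connection finished first.
            pool.next_free_worker().perform(sql, params)
    # the context manager waits for outstanding statements on exit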
nominatim/tools/tiger_data.py

@@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarbar and directory files
 import logging
 import os
 import tarfile
-import selectors

 from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
 from nominatim.db.sql_preprocessor import SQLPreprocessor
@@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir):
     return sql_files, tar


-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
     """ Handles sql statement with multiplexing
     """
     lines = 0
-    end_of_file = False
     # Using pool of database connections to execute sql statements
-    while not end_of_file:
-        for key, _ in sel.select(1):
-            conn = key.data
-            try:
-                if conn.is_done():
-                    sql_query = file.readline()
-                    lines += 1
-                    if not sql_query:
-                        end_of_file = True
-                        break
-                    conn.perform(sql_query)
-                    if lines == 1000:
-                        print('. ', end='', flush=True)
-                        lines = 0
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-
-
-def handle_unregister_connection_pool(sel, place_threads):
-    """ Handles unregistering pool of connections
-    """
-    while place_threads > 0:
-        for key, _ in sel.select(1):
-            conn = key.data
-            sel.unregister(conn)
-            try:
-                conn.wait()
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-            conn.close()
-            place_threads -= 1
+    for sql_query in file:
+        pool.next_free_worker().perform(sql_query)
+
+        lines += 1
+        if lines == 1000:
+            print('.', end='', flush=True)
+            lines = 0


 def add_tiger_data(data_dir, config, threads):
     """ Import tiger data from directory or tar file `data dir`.
@@ -91,25 +66,16 @@ def add_tiger_data(data_dir, config, threads):
     # Reading sql_files and then for each file line handling
     # sql_query in <threads - 1> chunks.
-    sel = selectors.DefaultSelector()
     place_threads = max(1, threads - 1)

-    # Creates a pool of database connections
-    for _ in range(place_threads):
-        conn = DBConnection(dsn)
-        conn.connect()
-        sel.register(conn, selectors.EVENT_WRITE, conn)
-
-    for sql_file in sql_files:
-        if not tar:
-            file = open(sql_file)
-        else:
-            file = tar.extractfile(sql_file)
-
-        handle_threaded_sql_statements(sel, file)
-
-    # Unregistering pool of database connections
-    handle_unregister_connection_pool(sel, place_threads)
+    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+        for sql_file in sql_files:
+            if not tar:
+                file = open(sql_file)
+            else:
+                file = tar.extractfile(sql_file)
+
+            handle_threaded_sql_statements(pool, file)

     if tar:
         tar.close()

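Condensed, the new import path boils down to the stand-alone sketch below (dsn and path are placeholders; ignore_sql_errors=True makes the pool log and skip lines with broken SQL instead of aborting the whole import).

from nominatim.db.async_connection import WorkerPool

def import_tiger_sql(dsn, path, threads):
    # One thread is reserved for the main loop, mirroring add_tiger_data().
    place_threads = max(1, threads - 1)
    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
        with open(path) as file:
            # Iterating the file yields one SQL statement per line.
            for lineno, sql_query in enumerate(file, 1):
                pool.next_free_worker().perform(sql_query)
                if lineno % 1000 == 0:
                    print('.', end='', flush=True)  # progress marker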
test/python/test_db_async_connection.py

@@ -56,13 +56,21 @@ def test_bad_query(conn):
         conn.wait()


+def test_bad_query_ignore(temp_db):
+    with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+        conn.connect()
+        conn.perform('SELECT efasfjsea')
+        conn.wait()
+
+
 def exec_with_deadlock(cur, sql, detector):
     with DeadlockHandler(lambda *args: detector.append(1)):
         cur.execute(sql)


 def test_deadlock(simple_conns):
+    print(psycopg2.__version__)
     cur1, cur2 = simple_conns

     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);