move WorkerPool into db module

The pool is independent of the indexer and may also be used
by other parts of the software.
This commit is contained in:
Sarah Hoffmann
2021-05-13 17:11:17 +02:00
parent 96e6bbe3a1
commit b9a09129fa
2 changed files with 72 additions and 69 deletions

View File

@@ -6,6 +6,9 @@
""" Database helper functions for the indexer. """ Database helper functions for the indexer.
""" """
import logging import logging
import select
import time
import psycopg2 import psycopg2
from psycopg2.extras import wait_select from psycopg2.extras import wait_select
@@ -131,3 +134,71 @@ class DBConnection:
return True return True
return False return False
class WorkerPool:
""" A pool of asynchronous database connections.
The pool may be used as a context manager.
"""
REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size):
self.threads = [DBConnection(dsn) for _ in range(pool_size)]
self.free_workers = self._yield_free_worker()
self.wait_time = 0
def finish_all(self):
""" Wait for all connection to finish.
"""
for thread in self.threads:
while not thread.is_done():
thread.wait()
self.free_workers = self._yield_free_worker()
def close(self):
""" Close all connections and clear the pool.
"""
for thread in self.threads:
thread.close()
self.threads = []
self.free_workers = None
def next_free_worker(self):
""" Get the next free connection.
"""
return next(self.free_workers)
def _yield_free_worker(self):
ready = self.threads
command_stat = 0
while True:
for thread in ready:
if thread.is_done():
command_stat += 1
yield thread
if command_stat > self.REOPEN_CONNECTIONS_AFTER:
for thread in self.threads:
while not thread.is_done():
thread.wait()
thread.connect()
ready = self.threads
command_stat = 0
else:
tstart = time.time()
_, ready, _ = select.select([], self.threads, [])
self.wait_time += time.time() - tstart
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.finish_all()
self.close()

View File

@@ -2,14 +2,13 @@
Main work horse for indexing (computing addresses) the database. Main work horse for indexing (computing addresses) the database.
""" """
import logging import logging
import select
import time import time
import psycopg2.extras import psycopg2.extras
from nominatim.indexer.progress import ProgressLogger from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect from nominatim.db.connection import connect
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
self.conn.wait() self.conn.wait()
self.close() self.close()
class WorkerPool:
""" A pool of asynchronous database connections.
The pool may be used as a context manager.
"""
REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size):
self.threads = [DBConnection(dsn) for _ in range(pool_size)]
self.free_workers = self._yield_free_worker()
self.wait_time = 0
def finish_all(self):
""" Wait for all connection to finish.
"""
for thread in self.threads:
while not thread.is_done():
thread.wait()
self.free_workers = self._yield_free_worker()
def close(self):
""" Close all connections and clear the pool.
"""
for thread in self.threads:
thread.close()
self.threads = []
self.free_workers = None
def next_free_worker(self):
""" Get the next free connection.
"""
return next(self.free_workers)
def _yield_free_worker(self):
ready = self.threads
command_stat = 0
while True:
for thread in ready:
if thread.is_done():
command_stat += 1
yield thread
if command_stat > self.REOPEN_CONNECTIONS_AFTER:
for thread in self.threads:
while not thread.is_done():
thread.wait()
thread.connect()
ready = self.threads
command_stat = 0
else:
tstart = time.time()
_, ready, _ = select.select([], self.threads, [])
self.wait_time += time.time() - tstart
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.finish_all()
self.close()
class Indexer: class Indexer:
""" Main indexing routine. """ Main indexing routine.