indexer: fetch extra place data asynchronously

The indexer now fetches any extra data besides the place_id
asynchronously, while the places from the previous batch are still
being processed. As a side effect, more places are fetched at once.
Sarah Hoffmann
2021-04-30 16:17:28 +02:00
parent 6ce6f62b8e
commit 20891abe1c
2 changed files with 96 additions and 54 deletions
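
Reduced to a sketch, the new processing loop introduced in the first
file below works like this (setup, logging and progress handling
elided; names follow the diff):

    with PlaceFetcher(dsn, conn) as fetcher:
        with WorkerPool(dsn, num_threads) as pool:
            # request the first batch of place ids; any extra details
            # are queried asynchronously on the fetcher's own connection
            has_more = fetcher.fetch_next_batch(cur, runner)
            while has_more:
                # wait, if necessary, until batch N has arrived ...
                places = fetcher.get_batch()
                # ... immediately request batch N+1 ...
                has_more = fetcher.fetch_next_batch(cur, runner)
                # ... and index batch N while N+1 is in flight
                for idx in range(0, len(places), batch):
                    runner.index_places(pool.next_free_worker(),
                                        places[idx:idx + batch])
            pool.finish_all()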

nominatim/indexer/indexer.py

@@ -14,6 +14,73 @@ from nominatim.db.connection import connect
 
 LOG = logging.getLogger()
 
+
+class PlaceFetcher:
+    """ Asynchronous connection that fetches place details for processing.
+    """
+
+    def __init__(self, dsn, setup_conn):
+        self.wait_time = 0
+        self.current_ids = None
+        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
+
+        with setup_conn.cursor() as cur:
+            # need to fetch those manually because register_hstore cannot
+            # fetch them on an asynchronous connection below.
+            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
+            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
+
+        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
+                                        array_oid=hstore_array_oid)
+
+    def close(self):
+        """ Close the underlying asynchronous connection.
+        """
+        if self.conn:
+            self.conn.close()
+            self.conn = None
+
+    def fetch_next_batch(self, cur, runner):
+        """ Send a request for the next batch of places.
+            If details for the places are required, they will be fetched
+            asynchronously.
+
+            Returns true if there is still data available.
+        """
+        ids = cur.fetchmany(100)
+
+        if not ids:
+            self.current_ids = None
+            return False
+
+        if hasattr(runner, 'get_place_details'):
+            runner.get_place_details(self.conn, ids)
+            self.current_ids = []
+        else:
+            self.current_ids = ids
+
+        return True
+
+    def get_batch(self):
+        """ Get the next batch of data, previously requested with
+            `fetch_next_batch`.
+        """
+        if self.current_ids is not None and not self.current_ids:
+            tstart = time.time()
+            self.conn.wait()
+            self.wait_time += time.time() - tstart
+            self.current_ids = self.conn.cursor.fetchall()
+
+        return self.current_ids
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.conn.wait()
+        self.close()
+
+
 class WorkerPool:
     """ A pool of asynchronous database connections.
@@ -24,6 +91,7 @@ class WorkerPool:
     def __init__(self, dsn, pool_size):
         self.threads = [DBConnection(dsn) for _ in range(pool_size)]
         self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
 
     def finish_all(self):
@@ -67,7 +135,9 @@ class WorkerPool:
                 ready = self.threads
                 command_stat = 0
             else:
+                tstart = time.time()
                 _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
 
     def __enter__(self):
@@ -75,6 +145,7 @@ class WorkerPool:
     def __exit__(self, exc_type, exc_value, traceback):
         self.finish_all()
         self.close()
+
@@ -184,70 +255,33 @@ class Indexer:
                 total_tuples = cur.scalar(runner.sql_count_objects())
                 LOG.debug("Total number of rows: %i", total_tuples)
 
-                # need to fetch those manually because register_hstore cannot
-                # fetch them on an asynchronous connection below.
-                hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
-                hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
-
             conn.commit()
 
             progress = ProgressLogger(runner.name(), total_tuples)
 
-            fetcher_wait = 0
-            pool_wait = 0
-
             if total_tuples > 0:
                 with conn.cursor(name='places') as cur:
                     cur.execute(runner.sql_get_objects())
 
-                    fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor)
-                    psycopg2.extras.register_hstore(fetcher.conn,
-                                                    oid=hstore_oid,
-                                                    array_oid=hstore_array_oid)
-
-                    with WorkerPool(self.dsn, self.num_threads) as pool:
-                        places = self._fetch_next_batch(cur, fetcher, runner)
-                        while places is not None:
-                            if not places:
-                                t0 = time.time()
-                                fetcher.wait()
-                                fetcher_wait += time.time() - t0
-                                places = fetcher.cursor.fetchall()
-
-                            # asynchronously get the next batch
-                            next_places = self._fetch_next_batch(cur, fetcher, runner)
-
-                            # And insert the current batch
-                            for idx in range(0, len(places), batch):
-                                t0 = time.time()
-                                worker = pool.next_free_worker()
-                                pool_wait += time.time() - t0
-                                part = places[idx:idx+batch]
-                                LOG.debug("Processing places: %s", str(part))
-                                runner.index_places(worker, part)
-                                progress.add(len(part))
-
-                            places = next_places
-
-                        pool.finish_all()
-
-                    fetcher.wait()
-                    fetcher.close()
+                    with PlaceFetcher(self.dsn, conn) as fetcher:
+                        with WorkerPool(self.dsn, self.num_threads) as pool:
+                            has_more = fetcher.fetch_next_batch(cur, runner)
+                            while has_more:
+                                places = fetcher.get_batch()
+
+                                # asynchronously get the next batch
+                                has_more = fetcher.fetch_next_batch(cur, runner)
+
+                                # And insert the current batch
+                                for idx in range(0, len(places), batch):
+                                    part = places[idx:idx+batch]
+                                    LOG.debug("Processing places: %s", str(part))
+                                    runner.index_places(pool.next_free_worker(), part)
+                                    progress.add(len(part))
+
+                            pool.finish_all()
+
+                        LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
+                                 fetcher.wait_time, pool.wait_time)
 
             conn.commit()
 
         progress.done()
-        LOG.warning("Wait time: fetcher: {}s, pool: {}s".format(fetcher_wait, pool_wait))
-
-
-    def _fetch_next_batch(self, cur, fetcher, runner):
-        ids = cur.fetchmany(100)
-
-        if not ids:
-            return None
-
-        if not hasattr(runner, 'get_place_details'):
-            return ids
-
-        runner.get_place_details(fetcher, ids)
-        return []
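
Whether any extra data is fetched at all is decided by duck typing:
fetch_next_batch only issues the detail query when the runner defines a
get_place_details hook; otherwise the plain ids from the main cursor are
passed through unchanged. The second file adds this hook to the runners.
For illustration, a hypothetical runner (table and column names
invented) would only need to provide:

    class CustomRunner:
        # The mere presence of this staticmethod makes PlaceFetcher
        # fetch the extra columns asynchronously for each batch of ids.
        @staticmethod
        def get_place_details(worker, ids):
            worker.perform("""SELECT place_id, address
                              FROM my_place_table WHERE place_id IN %s""",
                           (tuple((p[0] for p in ids)), ))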

nominatim/indexer/runners.py

@@ -28,7 +28,8 @@ class AbstractPlacexRunner:
         """.format(','.join(["(%s, %s::hstore, %s::jsonb)"] * num_places))
 
-    def get_place_details(self, worker, ids):
+    @staticmethod
+    def get_place_details(worker, ids):
         worker.perform("""SELECT place_id, (placex_prepare_update(placex)).*
                           FROM placex WHERE place_id IN %s""",
                        (tuple((p[0] for p in ids)), ))
@@ -103,12 +104,19 @@ class InterpolationRunner:
 
     @staticmethod
     def sql_get_objects():
-        return """SELECT place_id, get_interpolation_address(address, osm_id) as address
+        return """SELECT place_id
                   FROM location_property_osmline
                   WHERE indexed_status > 0
                   ORDER BY geometry_sector"""
 
+    @staticmethod
+    def get_place_details(worker, ids):
+        worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
+                          FROM location_property_osmline WHERE place_id IN %s""",
+                       (tuple((p[0] for p in ids)), ))
+
+
     @staticmethod
     @functools.lru_cache(maxsize=1)
     def _index_sql(num_places):