indexer: make self.conn function-local

Also switches to our internal connect function which gives us
a cursor with a sclar() function.
This commit is contained in:
Sarah Hoffmann
2021-04-19 18:15:09 +02:00
parent 6430371d7d
commit 26a81654a8

View File

@@ -4,11 +4,10 @@ Main work horse for indexing (computing addresses) the database.
import logging import logging
import select import select
import psycopg2
from nominatim.indexer.progress import ProgressLogger from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -20,20 +19,14 @@ class Indexer:
def __init__(self, dsn, num_threads): def __init__(self, dsn, num_threads):
self.dsn = dsn self.dsn = dsn
self.num_threads = num_threads self.num_threads = num_threads
self.conn = None
self.threads = [] self.threads = []
def _setup_connections(self): def _setup_connections(self):
self.conn = psycopg2.connect(self.dsn)
self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
def _close_connections(self): def _close_connections(self):
if self.conn:
self.conn.close()
self.conn = None
for thread in self.threads: for thread in self.threads:
thread.close() thread.close()
self.threads = [] self.threads = []
@@ -45,7 +38,7 @@ class Indexer:
database will be analysed at the appropriate places to database will be analysed at the appropriate places to
ensure that database statistics are updated. ensure that database statistics are updated.
""" """
with psycopg2.connect(self.dsn) as conn: with connect(self.dsn) as conn:
conn.autocommit = True conn.autocommit = True
if analyse: if analyse:
@@ -128,15 +121,11 @@ class Indexer:
def update_status_table(self): def update_status_table(self):
""" Update the status in the status table to 'indexed'. """ Update the status in the status table to 'indexed'.
""" """
conn = psycopg2.connect(self.dsn) with connect(self.dsn) as conn:
try:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true') cur.execute('UPDATE import_status SET indexed = true')
conn.commit() conn.commit()
finally:
conn.close()
def _index(self, runner, batch=1): def _index(self, runner, batch=1):
""" Index a single rank or table. `runner` describes the SQL to use """ Index a single rank or table. `runner` describes the SQL to use
@@ -145,36 +134,35 @@ class Indexer:
""" """
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch) LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
cur = self.conn.cursor() with connect(self.dsn) as conn:
cur.execute(runner.sql_count_objects()) with conn.cursor() as cur:
total_tuples = cur.scalar(runner.sql_count_objects())
LOG.debug("Total number of rows: %i", total_tuples)
total_tuples = cur.fetchone()[0] conn.commit()
LOG.debug("Total number of rows: %i", total_tuples)
cur.close() progress = ProgressLogger(runner.name(), total_tuples)
progress = ProgressLogger(runner.name(), total_tuples) if total_tuples > 0:
with conn.cursor(name='places') as cur:
cur.execute(runner.sql_get_objects())
if total_tuples > 0: next_thread = self.find_free_thread()
cur = self.conn.cursor(name='places') while True:
cur.execute(runner.sql_get_objects()) places = [p[0] for p in cur.fetchmany(batch)]
if not places:
break
next_thread = self.find_free_thread() LOG.debug("Processing places: %s", str(places))
while True: thread = next(next_thread)
places = [p[0] for p in cur.fetchmany(batch)]
if not places:
break
LOG.debug("Processing places: %s", str(places)) thread.perform(runner.sql_index_place(places))
thread = next(next_thread) progress.add(len(places))
thread.perform(runner.sql_index_place(places)) conn.commit()
progress.add(len(places))
cur.close() for thread in self.threads:
thread.wait()
for thread in self.threads:
thread.wait()
progress.done() progress.done()