properly close connections of indexer after use

This commit is contained in:
Sarah Hoffmann
2021-02-26 12:10:54 +01:00
parent 57db5819ef
commit 3ee8d9fa75

View File

@@ -124,8 +124,25 @@ class Indexer:
""" """
def __init__(self, dsn, num_threads): def __init__(self, dsn, num_threads):
self.conn = psycopg2.connect(dsn) self.dsn = dsn
self.threads = [DBConnection(dsn) for _ in range(num_threads)] self.num_threads = num_threads
self.conn = None
self.threads = []
def _setup_connections(self):
    """ Open the main connection and one worker connection per thread.

        The connections are stored in `self.conn` and `self.threads`.
        When opening one of the worker connections fails, everything
        that was opened so far is closed again before the exception is
        passed on, so that a partial setup does not leak connections.
    """
    self.conn = psycopg2.connect(self.dsn)
    self.threads = []

    try:
        # Register each worker as soon as it exists, so that
        # _close_connections() can reach it if a later one fails.
        for _ in range(self.num_threads):
            self.threads.append(DBConnection(self.dsn))
    except Exception:
        self._close_connections()
        raise
def _close_connections(self):
if self.conn:
self.conn.close()
self.conn = None
for thread in self.threads:
thread.close()
threads = []
def index_full(self, analyse=True): def index_full(self, analyse=True):
@@ -134,34 +151,44 @@ class Indexer:
database will be analysed at the appropriate places to database will be analysed at the appropriate places to
ensure that database statistics are updated. ensure that database statistics are updated.
""" """
self.index_by_rank(0, 4) conn = psycopg2.connect(self.dsn)
self._analyse_db_if(analyse)
self.index_boundaries(0, 30) try:
self._analyse_db_if(analyse) self.index_by_rank(0, 4)
self._analyse_db_if(conn, analyse)
self.index_by_rank(5, 25) self.index_boundaries(0, 30)
self._analyse_db_if(analyse) self._analyse_db_if(conn, analyse)
self.index_by_rank(26, 30) self.index_by_rank(5, 25)
self._analyse_db_if(analyse) self._analyse_db_if(conn, analyse)
self.index_postcodes() self.index_by_rank(26, 30)
self._analyse_db_if(analyse) self._analyse_db_if(conn, analyse)
def _analyse_db_if(self, condition): self.index_postcodes()
self._analyse_db_if(conn, analyse)
finally:
conn.close()
def _analyse_db_if(self, conn, condition):
if condition: if condition:
with self.conn.cursor() as cur: with conn.cursor() as cur:
cur.execute('ANALYSE') cur.execute('ANALYSE')
def index_boundaries(self, minrank, maxrank): def index_boundaries(self, minrank, maxrank):
""" Index only administrative boundaries within the given rank range. """ Index only administrative boundaries within the given rank range.
""" """
LOG.warning("Starting indexing boundaries using %s threads", LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads)) self.num_threads)
for rank in range(max(minrank, 4), min(maxrank, 26)): self._setup_connections()
self.index(BoundaryRunner(rank))
try:
for rank in range(max(minrank, 4), min(maxrank, 26)):
self.index(BoundaryRunner(rank))
finally:
self._close_connections()
def index_by_rank(self, minrank, maxrank): def index_by_rank(self, minrank, maxrank):
""" Index all entries of placex in the given rank range (inclusive) """ Index all entries of placex in the given rank range (inclusive)
@@ -172,30 +199,48 @@ class Indexer:
""" """
maxrank = min(maxrank, 30) maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads", LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, len(self.threads)) minrank, maxrank, self.num_threads)
for rank in range(max(1, minrank), maxrank): self._setup_connections()
self.index(RankRunner(rank))
if maxrank == 30: try:
self.index(RankRunner(0)) for rank in range(max(1, minrank), maxrank):
self.index(InterpolationRunner(), 20) self.index(RankRunner(rank))
self.index(RankRunner(30), 20)
else: if maxrank == 30:
self.index(RankRunner(maxrank)) self.index(RankRunner(0))
self.index(InterpolationRunner(), 20)
self.index(RankRunner(30), 20)
else:
self.index(RankRunner(maxrank))
finally:
self._close_connections()
def index_postcodes(self): def index_postcodes(self):
"""Index the entries ofthe location_postcode table. """Index the entries ofthe location_postcode table.
""" """
self.index(PostcodeRunner(), 20) LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
self._setup_connections()
try:
self.index(PostcodeRunner(), 20)
finally:
self._close_connections()
def update_status_table(self): def update_status_table(self):
""" Update the status in the status table to 'indexed'. """ Update the status in the status table to 'indexed'.
""" """
with self.conn.cursor() as cur: conn = psycopg2.connect(self.dsn)
cur.execute('UPDATE import_status SET indexed = true')
self.conn.commit() try:
with conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true')
conn.commit()
finally:
conn.close()
def index(self, obj, batch=1): def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use """ Index a single rank or table. `obj` describes the SQL to use