indexing: precompute row counts

Doing this once for the whole batch of ranks saves about
half a second for small imports. Speeds up BDD tests.
This commit is contained in:
Sarah Hoffmann
2024-08-12 14:57:26 +02:00
parent bd0316b5c3
commit 3905dd68dd

View File

@@ -7,7 +7,7 @@
""" """
Main work horse for indexing (computing addresses) the database. Main work horse for indexing (computing addresses) the database.
""" """
from typing import cast, List, Any from typing import cast, List, Any, Optional
import logging import logging
import time import time
@@ -83,9 +83,30 @@ class Indexer:
LOG.warning("Starting indexing boundaries using %s threads", LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads) self.num_threads)
minrank = max(minrank, 4)
maxrank = min(maxrank, 25)
# Precompute number of rows to process for all ranks
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)
with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_search, count(*)
FROM placex
WHERE rank_search between %s and %s
AND class = 'boundary' and type = 'administrative'
AND indexed_status > 0
GROUP BY rank_search""",
(minrank, maxrank))
total_tuples = {row.rank_search: row.count for row in cur}
with self.tokenizer.name_analyzer() as analyzer: with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(minrank, 4), min(maxrank, 26)): for rank in range(minrank, maxrank + 1):
total += await self._index(runners.BoundaryRunner(rank, analyzer)) total += await self._index(runners.BoundaryRunner(rank, analyzer),
total_tuples=total_tuples.get(rank, 0))
return total return total
@@ -101,6 +122,23 @@ class Indexer:
LOG.warning("Starting indexing rank (%i to %i) using %i threads", LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads) minrank, maxrank, self.num_threads)
# Precompute number of rows to process for all ranks
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)
with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_address, count(*)
FROM placex
WHERE rank_address between %s and %s
AND indexed_status > 0
GROUP BY rank_address""",
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}
with self.tokenizer.name_analyzer() as analyzer: with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1): for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30: if rank >= 30:
@@ -109,11 +147,12 @@ class Indexer:
batch = 5 batch = 5
else: else:
batch = 1 batch = 1
total += await self._index(runners.RankRunner(rank, analyzer), batch) total += await self._index(runners.RankRunner(rank, analyzer),
batch=batch, total_tuples=total_tuples.get(rank, 0))
if maxrank == 30: if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer)) total += await self._index(runners.RankRunner(0, analyzer))
total += await self._index(runners.InterpolationRunner(analyzer), 20) total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
return total return total
@@ -123,7 +162,7 @@ class Indexer:
""" """
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
return await self._index(runners.PostcodeRunner(), 20) return await self._index(runners.PostcodeRunner(), batch=20)
def update_status_table(self) -> None: def update_status_table(self) -> None:
@@ -135,14 +174,20 @@ class Indexer:
conn.commit() conn.commit()
async def _index(self, runner: runners.Runner, batch: int = 1) -> int: async def _index(self, runner: runners.Runner, batch: int = 1,
total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use """ Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement should be processed with a single SQL statement.
`total_tuples` may contain the total number of rows to process.
When not supplied, the value will be computed using the
appropriate runner function.
""" """
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch) LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
total_tuples = self._prepare_indexing(runner) if total_tuples is None:
total_tuples = self._prepare_indexing(runner)
progress = ProgressLogger(runner.name(), total_tuples) progress = ProgressLogger(runner.name(), total_tuples)