diff --git a/src/nominatim_db/db/query_pool.py b/src/nominatim_db/db/query_pool.py index 08a92048..2f7307a3 100644 --- a/src/nominatim_db/db/query_pool.py +++ b/src/nominatim_db/db/query_pool.py @@ -2,7 +2,7 @@ # # This file is part of Nominatim. (https://nominatim.org) # -# Copyright (C) 2024 by the Nominatim developer community. +# Copyright (C) 2026 by the Nominatim developer community. # For a full list of authors see the git log. """ A connection pool that executes incoming queries in parallel. @@ -27,20 +27,28 @@ class QueryPool: The results of the queries is discarded. """ def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None: + self.is_cancelled = False self.wait_time = 0.0 self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size) - self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args)) + self.pool = [asyncio.create_task(self._worker_loop_cancellable(dsn, **conn_args)) for _ in range(pool_size)] async def put_query(self, query: psycopg.abc.Query, params: Any) -> None: """ Schedule a query for execution. """ + if self.is_cancelled: + await self.finish() + return + tstart = time.time() await self.query_queue.put((query, params)) self.wait_time += time.time() - tstart await asyncio.sleep(0) + if self.is_cancelled: + await self.finish() + async def finish(self) -> None: """ Wait for all queries to finish and close the pool. """ @@ -56,6 +64,25 @@ class QueryPool: if excp is not None: raise excp + def clear_queue(self) -> None: + """ Drop all items silently that might still be queued. + """ + try: + while True: + self.query_queue.get_nowait() + except asyncio.QueueEmpty: + pass # expected + + async def _worker_loop_cancellable(self, dsn: str, **conn_args: Any) -> None: + try: + await self._worker_loop(dsn, **conn_args) + except Exception as e: + # Make sure the exception is forwarded to the main function + self.is_cancelled = True + # clear the queue here to ensure that any put() that may be blocked returns + self.clear_queue() + raise e + async def _worker_loop(self, dsn: str, **conn_args: Any) -> None: conn_args['autocommit'] = True aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)