mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-03-10 03:54:06 +00:00
immediately terminate indexing when a task catches an exception
This commit is contained in:
@@ -2,7 +2,7 @@
|
|||||||
#
|
#
|
||||||
# This file is part of Nominatim. (https://nominatim.org)
|
# This file is part of Nominatim. (https://nominatim.org)
|
||||||
#
|
#
|
||||||
# Copyright (C) 2024 by the Nominatim developer community.
|
# Copyright (C) 2026 by the Nominatim developer community.
|
||||||
# For a full list of authors see the git log.
|
# For a full list of authors see the git log.
|
||||||
"""
|
"""
|
||||||
A connection pool that executes incoming queries in parallel.
|
A connection pool that executes incoming queries in parallel.
|
||||||
@@ -27,20 +27,28 @@ class QueryPool:
|
|||||||
The results of the queries is discarded.
|
The results of the queries is discarded.
|
||||||
"""
|
"""
|
||||||
def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
|
def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
|
||||||
|
self.is_cancelled = False
|
||||||
self.wait_time = 0.0
|
self.wait_time = 0.0
|
||||||
self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size)
|
self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size)
|
||||||
|
|
||||||
self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
|
self.pool = [asyncio.create_task(self._worker_loop_cancellable(dsn, **conn_args))
|
||||||
for _ in range(pool_size)]
|
for _ in range(pool_size)]
|
||||||
|
|
||||||
async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
|
async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
|
||||||
""" Schedule a query for execution.
|
""" Schedule a query for execution.
|
||||||
"""
|
"""
|
||||||
|
if self.is_cancelled:
|
||||||
|
await self.finish()
|
||||||
|
return
|
||||||
|
|
||||||
tstart = time.time()
|
tstart = time.time()
|
||||||
await self.query_queue.put((query, params))
|
await self.query_queue.put((query, params))
|
||||||
self.wait_time += time.time() - tstart
|
self.wait_time += time.time() - tstart
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
if self.is_cancelled:
|
||||||
|
await self.finish()
|
||||||
|
|
||||||
async def finish(self) -> None:
|
async def finish(self) -> None:
|
||||||
""" Wait for all queries to finish and close the pool.
|
""" Wait for all queries to finish and close the pool.
|
||||||
"""
|
"""
|
||||||
@@ -56,6 +64,25 @@ class QueryPool:
|
|||||||
if excp is not None:
|
if excp is not None:
|
||||||
raise excp
|
raise excp
|
||||||
|
|
||||||
|
def clear_queue(self) -> None:
|
||||||
|
""" Drop all items silently that might still be queued.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
self.query_queue.get_nowait()
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
pass # expected
|
||||||
|
|
||||||
|
async def _worker_loop_cancellable(self, dsn: str, **conn_args: Any) -> None:
|
||||||
|
try:
|
||||||
|
await self._worker_loop(dsn, **conn_args)
|
||||||
|
except Exception as e:
|
||||||
|
# Make sure the exception is forwarded to the main function
|
||||||
|
self.is_cancelled = True
|
||||||
|
# clear the queue here to ensure that any put() that may be blocked returns
|
||||||
|
self.clear_queue()
|
||||||
|
raise e
|
||||||
|
|
||||||
async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
|
async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
|
||||||
conn_args['autocommit'] = True
|
conn_args['autocommit'] = True
|
||||||
aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
|
aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
|
||||||
|
|||||||
Reference in New Issue
Block a user