fix use of the socket timeout for replication downloads

The timeout parameter has been ignored ever since pyosmium switched
to the requests library. This change passes the socket timeout through
again by patching the replication server's open_url function.
This commit is contained in:
Sarah Hoffmann
2022-11-08 21:45:36 +01:00
parent 951f92f665
commit 2dd8433ab6
2 changed files with 37 additions and 6 deletions

View File

@@ -148,7 +148,7 @@ class UpdateReplication:
while True: while True:
with connect(args.config.get_libpq_dsn()) as conn: with connect(args.config.get_libpq_dsn()) as conn:
start = dt.datetime.now(dt.timezone.utc) start = dt.datetime.now(dt.timezone.utc)
state = replication.update(conn, params) state = replication.update(conn, params, socket_timeout=args.socket_timeout)
if state is not replication.UpdateState.NO_CHANGES: if state is not replication.UpdateState.NO_CHANGES:
status.log_status(conn, start, 'import') status.log_status(conn, start, 'import')
batchdate, _, _ = status.get_status(conn) batchdate, _, _ = status.get_status(conn)

View File

@@ -7,13 +7,16 @@
""" """
Functions for updating a database from a replication source. Functions for updating a database from a replication source.
""" """
from typing import ContextManager, MutableMapping, Any, Generator, cast from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
from contextlib import contextmanager from contextlib import contextmanager
import datetime as dt import datetime as dt
from enum import Enum from enum import Enum
import logging import logging
import time import time
import types
import urllib.request as urlrequest
import requests
from nominatim.db import status from nominatim.db import status
from nominatim.db.connection import Connection from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import run_osm2pgsql from nominatim.tools.exec_utils import run_osm2pgsql
@@ -22,6 +25,7 @@ from nominatim.errors import UsageError
try: try:
from osmium.replication.server import ReplicationServer from osmium.replication.server import ReplicationServer
from osmium import WriteHandler from osmium import WriteHandler
from osmium import version as pyo_version
except ImportError as exc: except ImportError as exc:
logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n" logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
"To install pyosmium via pip: pip3 install osmium") "To install pyosmium via pip: pip3 install osmium")
@@ -86,7 +90,8 @@ class UpdateState(Enum):
NO_CHANGES = 3 NO_CHANGES = 3
def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState: def update(conn: Connection, options: MutableMapping[str, Any],
socket_timeout: int = 60) -> UpdateState:
""" Update database from the next batch of data. Returns the state of """ Update database from the next batch of data. Returns the state of
updates according to `UpdateState`. updates according to `UpdateState`.
""" """
@@ -114,7 +119,7 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
options['import_file'].unlink() options['import_file'].unlink()
# Read updates into file. # Read updates into file.
with _make_replication_server(options['base_url']) as repl: with _make_replication_server(options['base_url'], socket_timeout) as repl:
outhandler = WriteHandler(str(options['import_file'])) outhandler = WriteHandler(str(options['import_file']))
endseq = repl.apply_diffs(outhandler, startseq + 1, endseq = repl.apply_diffs(outhandler, startseq + 1,
max_size=options['max_diff_size'] * 1024) max_size=options['max_diff_size'] * 1024)
@@ -136,14 +141,40 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
return UpdateState.UP_TO_DATE return UpdateState.UP_TO_DATE
def _make_replication_server(url: str) -> ContextManager[ReplicationServer]: def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
""" Returns a ReplicationServer in form of a context manager. """ Returns a ReplicationServer in form of a context manager.
Creates a light wrapper around older versions of pyosmium that did Creates a light wrapper around older versions of pyosmium that did
not support the context manager interface. not support the context manager interface.
""" """
if hasattr(ReplicationServer, '__enter__'): if hasattr(ReplicationServer, '__enter__'):
return cast(ContextManager[ReplicationServer], ReplicationServer(url)) # Patches the open_url function for pyosmium >= 3.2
# where the socket timeout is no longer respected.
def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
""" Download a resource from the given URL and return a byte sequence
of the content.
"""
get_params = {
'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
'timeout': timeout or None,
'stream': True
}
if self.session is not None:
return self.session.get(url.get_full_url(), **get_params)
@contextmanager
def _get_url_with_session() -> Iterator[requests.Response]:
with requests.Session() as session:
request = session.get(url.get_full_url(), **get_params) # type: ignore
yield request
return _get_url_with_session()
repl = ReplicationServer(url)
repl.open_url = types.MethodType(patched_open_url, repl)
return cast(ContextManager[ReplicationServer], repl)
@contextmanager @contextmanager
def get_cm() -> Generator[ReplicationServer, None, None]: def get_cm() -> Generator[ReplicationServer, None, None]: