Merge pull request #2871 from lonvia/fix-timeout-for-updates

Fix timeout for updates
This commit is contained in:
Sarah Hoffmann
2022-11-09 14:26:39 +01:00
committed by GitHub
3 changed files with 50 additions and 15 deletions

View File

@@ -99,7 +99,7 @@ jobs:
if: matrix.ubuntu == 22
- name: Install latest pylint/mypy
run: pip3 install -U pylint mypy types-PyYAML types-jinja2 types-psycopg2 types-psutil typing-extensions
run: pip3 install -U pylint mypy types-PyYAML types-jinja2 types-psycopg2 types-psutil types-requests typing-extensions
- name: PHP linting
run: phpcs --report-width=120 .

View File

@@ -76,7 +76,8 @@ class UpdateReplication:
LOG.warning("Initialising replication updates")
with connect(args.config.get_libpq_dsn()) as conn:
replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
socket_timeout=args.socket_timeout)
if args.update_functions:
LOG.warning("Create functions")
refresh.create_functions(conn, args.config, True, False)
@@ -87,7 +88,8 @@ class UpdateReplication:
from ..tools import replication
with connect(args.config.get_libpq_dsn()) as conn:
return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
socket_timeout=args.socket_timeout)
def _report_update(self, batchdate: dt.datetime,
@@ -148,7 +150,7 @@ class UpdateReplication:
while True:
with connect(args.config.get_libpq_dsn()) as conn:
start = dt.datetime.now(dt.timezone.utc)
state = replication.update(conn, params)
state = replication.update(conn, params, socket_timeout=args.socket_timeout)
if state is not replication.UpdateState.NO_CHANGES:
status.log_status(conn, start, 'import')
batchdate, _, _ = status.get_status(conn)

View File

@@ -7,13 +7,16 @@
"""
Functions for updating a database from a replication source.
"""
from typing import ContextManager, MutableMapping, Any, Generator, cast
from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
from contextlib import contextmanager
import datetime as dt
from enum import Enum
import logging
import time
import types
import urllib.request as urlrequest
import requests
from nominatim.db import status
from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import run_osm2pgsql
@@ -22,6 +25,7 @@ from nominatim.errors import UsageError
try:
from osmium.replication.server import ReplicationServer
from osmium import WriteHandler
from osmium import version as pyo_version
except ImportError as exc:
logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
"To install pyosmium via pip: pip3 install osmium")
@@ -29,7 +33,8 @@ except ImportError as exc:
LOG = logging.getLogger()
def init_replication(conn: Connection, base_url: str) -> None:
def init_replication(conn: Connection, base_url: str,
socket_timeout: int = 60) -> None:
""" Set up replication for the server at the given base URL.
"""
LOG.info("Using replication source: %s", base_url)
@@ -38,9 +43,8 @@ def init_replication(conn: Connection, base_url: str) -> None:
# margin of error to make sure we get all data
date -= dt.timedelta(hours=3)
repl = ReplicationServer(base_url)
seq = repl.timestamp_to_sequence(date)
with _make_replication_server(base_url, socket_timeout) as repl:
seq = repl.timestamp_to_sequence(date)
if seq is None:
LOG.fatal("Cannot reach the configured replication service '%s'.\n"
@@ -53,7 +57,8 @@ def init_replication(conn: Connection, base_url: str) -> None:
LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
def check_for_updates(conn: Connection, base_url: str) -> int:
def check_for_updates(conn: Connection, base_url: str,
socket_timeout: int = 60) -> int:
""" Check if new data is available from the replication service at the
given base URL.
"""
@@ -64,7 +69,8 @@ def check_for_updates(conn: Connection, base_url: str) -> int:
"Please run 'nominatim replication --init' first.")
return 254
state = ReplicationServer(base_url).get_state_info()
with _make_replication_server(base_url, socket_timeout) as repl:
state = repl.get_state_info()
if state is None:
LOG.error("Cannot get state for URL %s.", base_url)
@@ -86,7 +92,8 @@ class UpdateState(Enum):
NO_CHANGES = 3
def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
def update(conn: Connection, options: MutableMapping[str, Any],
socket_timeout: int = 60) -> UpdateState:
""" Update database from the next batch of data. Returns the state of
updates according to `UpdateState`.
"""
@@ -114,7 +121,7 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
options['import_file'].unlink()
# Read updates into file.
with _make_replication_server(options['base_url']) as repl:
with _make_replication_server(options['base_url'], socket_timeout) as repl:
outhandler = WriteHandler(str(options['import_file']))
endseq = repl.apply_diffs(outhandler, startseq + 1,
max_size=options['max_diff_size'] * 1024)
@@ -136,14 +143,40 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
return UpdateState.UP_TO_DATE
def _make_replication_server(url: str) -> ContextManager[ReplicationServer]:
def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
""" Returns a ReplicationServer in form of a context manager.
Creates a light wrapper around older versions of pyosmium that did
not support the context manager interface.
"""
if hasattr(ReplicationServer, '__enter__'):
return cast(ContextManager[ReplicationServer], ReplicationServer(url))
# Patches the open_url function for pyosmium >= 3.2
# where the socket timeout is no longer respected.
def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
""" Download a resource from the given URL and return a byte sequence
of the content.
"""
get_params = {
'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
'timeout': timeout or None,
'stream': True
}
if self.session is not None:
return self.session.get(url.get_full_url(), **get_params)
@contextmanager
def _get_url_with_session() -> Iterator[requests.Response]:
with requests.Session() as session:
request = session.get(url.get_full_url(), **get_params) # type: ignore
yield request
return _get_url_with_session()
repl = ReplicationServer(url)
repl.open_url = types.MethodType(patched_open_url, repl)
return cast(ContextManager[ReplicationServer], repl)
@contextmanager
def get_cm() -> Generator[ReplicationServer, None, None]: