respect socket timeout also in other replication functions

This commit is contained in:
Sarah Hoffmann
2022-11-08 22:24:19 +01:00
parent 1fdcec985a
commit 6ddb39fda3
2 changed files with 12 additions and 8 deletions

View File

@@ -76,7 +76,8 @@ class UpdateReplication:
         LOG.warning("Initialising replication updates")
         with connect(args.config.get_libpq_dsn()) as conn:
-            replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
+            replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
+                                         socket_timeout=args.socket_timeout)
             if args.update_functions:
                 LOG.warning("Create functions")
                 refresh.create_functions(conn, args.config, True, False)
@@ -87,7 +88,8 @@ class UpdateReplication:
         from ..tools import replication
         with connect(args.config.get_libpq_dsn()) as conn:
-            return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
+            return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
+                                                 socket_timeout=args.socket_timeout)

     def _report_update(self, batchdate: dt.datetime,

View File

@@ -33,7 +33,8 @@ except ImportError as exc:
 LOG = logging.getLogger()

-def init_replication(conn: Connection, base_url: str) -> None:
+def init_replication(conn: Connection, base_url: str,
+                     socket_timeout: int = 60) -> None:
     """ Set up replication for the server at the given base URL.
     """
     LOG.info("Using replication source: %s", base_url)
@@ -42,9 +43,8 @@ def init_replication(conn: Connection, base_url: str) -> None:
     # margin of error to make sure we get all data
     date -= dt.timedelta(hours=3)

-    repl = ReplicationServer(base_url)
-
-    seq = repl.timestamp_to_sequence(date)
+    with _make_replication_server(base_url, socket_timeout) as repl:
+        seq = repl.timestamp_to_sequence(date)

     if seq is None:
         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
@@ -57,7 +57,8 @@ def init_replication(conn: Connection, base_url: str) -> None:
     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)

-def check_for_updates(conn: Connection, base_url: str) -> int:
+def check_for_updates(conn: Connection, base_url: str,
+                      socket_timeout: int = 60) -> int:
     """ Check if new data is available from the replication service at the
         given base URL.
     """
@@ -68,7 +69,8 @@ def check_for_updates(conn: Connection, base_url: str) -> int:
             "Please run 'nominatim replication --init' first.")
         return 254

-    state = ReplicationServer(base_url).get_state_info()
+    with _make_replication_server(base_url, socket_timeout) as repl:
+        state = repl.get_state_info()

     if state is None:
         LOG.error("Cannot get state for URL %s.", base_url)