add typing annotations for DB status module

Requires TypedDict which is only available from Python 3.8. Require
therefore typing_extensions to make the functions available for
earlier Python versions.
This commit is contained in:
Sarah Hoffmann
2022-07-04 11:29:12 +02:00
parent fc254fc744
commit 69f9122bef
6 changed files with 36 additions and 12 deletions

View File

@@ -31,7 +31,7 @@ class _Cursor(psycopg2.extras.DictCursor):
""" Query execution that logs the SQL query when debugging is enabled.
"""
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore
LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore[no-untyped-call]
super().execute(query, args)
@@ -55,7 +55,7 @@ class _Cursor(psycopg2.extras.DictCursor):
if self.rowcount != 1:
raise RuntimeError("Query did not return a single row.")
result = self.fetchone() # type: ignore
result = self.fetchone() # type: ignore[no-untyped-call]
assert result is not None
return result[0]
@@ -74,7 +74,7 @@ class _Cursor(psycopg2.extras.DictCursor):
if cascade:
sql += ' CASCADE'
self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore
self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore[no-untyped-call]
class Connection(psycopg2.extensions.connection):
@@ -131,7 +131,7 @@ class Connection(psycopg2.extensions.connection):
return False
if table is not None:
row = cur.fetchone() # type: ignore
row = cur.fetchone() # type: ignore[no-untyped-call]
if row is None or not isinstance(row[0], str):
return False
return row[0] == table

View File

@@ -7,17 +7,30 @@
"""
Access and helper functions for the status and status log table.
"""
from typing import Optional, Tuple, cast
import datetime as dt
import logging
import re
from typing_extensions import TypedDict
from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import get_url
from nominatim.errors import UsageError
LOG = logging.getLogger()
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
def compute_database_date(conn):
class StatusRow(TypedDict):
""" Dictionary of columns of the import_status table.
"""
lastimportdate: dt.datetime
sequence_id: Optional[int]
indexed: Optional[bool]
def compute_database_date(conn: Connection) -> dt.datetime:
""" Determine the date of the database from the newest object in the
data base.
"""
@@ -49,10 +62,12 @@ def compute_database_date(conn):
return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
def set_status(conn, date, seq=None, indexed=True):
def set_status(conn: Connection, date: Optional[dt.datetime],
seq: Optional[int] = None, indexed: bool = True) -> None:
""" Replace the current status with the given status. If date is `None`
then only sequence and indexed will be updated as given. Otherwise
the whole status is replaced.
The change will be committed to the database.
"""
assert date is None or date.tzinfo == dt.timezone.utc
with conn.cursor() as cur:
@@ -67,7 +82,7 @@ def set_status(conn, date, seq=None, indexed=True):
conn.commit()
def get_status(conn):
def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
""" Return the current status as a triple of (date, sequence, indexed).
If status has not been set up yet, a triple of None is returned.
"""
@@ -76,11 +91,11 @@ def get_status(conn):
if cur.rowcount < 1:
return None, None, None
row = cur.fetchone()
row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
return row['lastimportdate'], row['sequence_id'], row['indexed']
def set_indexed(conn, state):
def set_indexed(conn: Connection, state: bool) -> None:
""" Set the indexed flag in the status table to the given state.
"""
with conn.cursor() as cur:
@@ -88,7 +103,8 @@ def set_indexed(conn, state):
conn.commit()
def log_status(conn, start, event, batchsize=None):
def log_status(conn: Connection, start: dt.datetime,
event: str, batchsize: Optional[int] = None) -> None:
""" Write a new status line to the `import_osmosis_log` table.
"""
with conn.cursor() as cur:
@@ -96,3 +112,4 @@ def log_status(conn, start, event, batchsize=None):
(batchend, batchseq, batchsize, starttime, endtime, event)
SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
(batchsize, start, event))
conn.commit()