mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-16 15:47:58 +00:00
adapt to new type annotations from typeshed
Some more functions frrom psycopg are now properly annotated. No ignoring necessary anymore.
This commit is contained in:
@@ -55,7 +55,7 @@ class Cursor(psycopg2.extras.DictCursor):
|
|||||||
if self.rowcount != 1:
|
if self.rowcount != 1:
|
||||||
raise RuntimeError("Query did not return a single row.")
|
raise RuntimeError("Query did not return a single row.")
|
||||||
|
|
||||||
result = self.fetchone() # type: ignore[no-untyped-call]
|
result = self.fetchone()
|
||||||
assert result is not None
|
assert result is not None
|
||||||
|
|
||||||
return result[0]
|
return result[0]
|
||||||
@@ -131,7 +131,7 @@ class Connection(psycopg2.extensions.connection):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if table is not None:
|
if table is not None:
|
||||||
row = cur.fetchone() # type: ignore[no-untyped-call]
|
row = cur.fetchone()
|
||||||
if row is None or not isinstance(row[0], str):
|
if row is None or not isinstance(row[0], str):
|
||||||
return False
|
return False
|
||||||
return row[0] == table
|
return row[0] == table
|
||||||
@@ -236,7 +236,7 @@ def get_pg_env(dsn: str,
|
|||||||
"""
|
"""
|
||||||
env = dict(base_env if base_env is not None else os.environ)
|
env = dict(base_env if base_env is not None else os.environ)
|
||||||
|
|
||||||
for param, value in psycopg2.extensions.parse_dsn(dsn).items(): # type: ignore
|
for param, value in psycopg2.extensions.parse_dsn(dsn).items():
|
||||||
if param in _PG_CONNECTION_STRINGS:
|
if param in _PG_CONNECTION_STRINGS:
|
||||||
env[_PG_CONNECTION_STRINGS[param]] = value
|
env[_PG_CONNECTION_STRINGS[param]] = value
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -41,4 +41,7 @@ def get_property(conn: Connection, name: str) -> Optional[str]:
|
|||||||
if cur.rowcount == 0:
|
if cur.rowcount == 0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return cast(Optional[str], cur.fetchone()[0]) # type: ignore[no-untyped-call]
|
result = cur.fetchone()
|
||||||
|
assert result is not None
|
||||||
|
|
||||||
|
return cast(Optional[str], result[0])
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int],
|
|||||||
if cur.rowcount < 1:
|
if cur.rowcount < 1:
|
||||||
return None, None, None
|
return None, None, None
|
||||||
|
|
||||||
row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
|
row = cast(StatusRow, cur.fetchone())
|
||||||
return row['lastimportdate'], row['sequence_id'], row['indexed']
|
return row['lastimportdate'], row['sequence_id'], row['indexed']
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -566,8 +566,9 @@ class ICUNameAnalyzer(AbstractAnalyzer):
|
|||||||
result = self._cache.housenumbers.get(norm_name, result)
|
result = self._cache.housenumbers.get(norm_name, result)
|
||||||
if result[0] is None:
|
if result[0] is None:
|
||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
|
hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
|
||||||
result = cur.fetchone()[0], norm_name # type: ignore[no-untyped-call]
|
|
||||||
|
result = hid, norm_name
|
||||||
self._cache.housenumbers[norm_name] = result
|
self._cache.housenumbers[norm_name] = result
|
||||||
else:
|
else:
|
||||||
# Otherwise use the analyzer to determine the canonical name.
|
# Otherwise use the analyzer to determine the canonical name.
|
||||||
@@ -580,9 +581,9 @@ class ICUNameAnalyzer(AbstractAnalyzer):
|
|||||||
variants = analyzer.compute_variants(word_id)
|
variants = analyzer.compute_variants(word_id)
|
||||||
if variants:
|
if variants:
|
||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute("SELECT create_analyzed_hnr_id(%s, %s)",
|
hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
|
||||||
(word_id, list(variants)))
|
(word_id, list(variants)))
|
||||||
result = cur.fetchone()[0], variants[0] # type: ignore[no-untyped-call]
|
result = hid, variants[0]
|
||||||
self._cache.housenumbers[word_id] = result
|
self._cache.housenumbers[word_id] = result
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@@ -665,8 +666,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
|
|||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
|
cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
|
||||||
(token_id, variants))
|
(token_id, variants))
|
||||||
full, part = cast(Tuple[int, List[int]],
|
full, part = cast(Tuple[int, List[int]], cur.fetchone())
|
||||||
cur.fetchone()) # type: ignore[no-untyped-call]
|
|
||||||
|
|
||||||
self._cache.names[token_id] = (full, part)
|
self._cache.names[token_id] = (full, part)
|
||||||
|
|
||||||
|
|||||||
@@ -544,8 +544,9 @@ class _TokenInfo:
|
|||||||
|
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
|
cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
|
||||||
self.data['hnr_tokens'], self.data['hnr'] = \
|
result = cur.fetchone()
|
||||||
cur.fetchone() # type: ignore[no-untyped-call]
|
assert result is not None
|
||||||
|
self.data['hnr_tokens'], self.data['hnr'] = result
|
||||||
|
|
||||||
|
|
||||||
def set_postcode(self, postcode: str) -> None:
|
def set_postcode(self, postcode: str) -> None:
|
||||||
@@ -574,8 +575,7 @@ class _TokenInfo:
|
|||||||
cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
|
cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
|
||||||
word_ids_from_name(%s)::text""",
|
word_ids_from_name(%s)::text""",
|
||||||
(name, name))
|
(name, name))
|
||||||
return cast(Tuple[List[int], List[int]],
|
return cast(Tuple[List[int], List[int]], cur.fetchone())
|
||||||
cur.fetchone()) # type: ignore[no-untyped-call]
|
|
||||||
|
|
||||||
self.data['place_search'], self.data['place_match'] = \
|
self.data['place_search'], self.data['place_match'] = \
|
||||||
self.cache.places.get(place, _get_place)
|
self.cache.places.get(place, _get_place)
|
||||||
@@ -589,8 +589,7 @@ class _TokenInfo:
|
|||||||
cur.execute("""SELECT addr_ids_from_name(%s)::text,
|
cur.execute("""SELECT addr_ids_from_name(%s)::text,
|
||||||
word_ids_from_name(%s)::text""",
|
word_ids_from_name(%s)::text""",
|
||||||
(name, name))
|
(name, name))
|
||||||
return cast(Tuple[List[int], List[int]],
|
return cast(Tuple[List[int], List[int]], cur.fetchone())
|
||||||
cur.fetchone()) # type: ignore[no-untyped-call]
|
|
||||||
|
|
||||||
tokens = {}
|
tokens = {}
|
||||||
for key, value in terms:
|
for key, value in terms:
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ def _get_place_info(cursor: Cursor, osm_id: Optional[str],
|
|||||||
LOG.fatal("OSM object %s not found in database.", osm_id)
|
LOG.fatal("OSM object %s not found in database.", osm_id)
|
||||||
raise UsageError("OSM object not found")
|
raise UsageError("OSM object not found")
|
||||||
|
|
||||||
return cast(DictCursorResult, cursor.fetchone()) # type: ignore[no-untyped-call]
|
return cast(DictCursorResult, cursor.fetchone())
|
||||||
|
|
||||||
|
|
||||||
def analyse_indexing(config: Configuration, osm_id: Optional[str] = None,
|
def analyse_indexing(config: Configuration, osm_id: Optional[str] = None,
|
||||||
|
|||||||
Reference in New Issue
Block a user