pylint: avoid explicit use of format() function

Use psycopg2 SQL formatters for SQL and formatted string literals
everywhere else.
This commit is contained in:
Sarah Hoffmann
2022-05-11 08:59:28 +02:00
parent 4e1e166c6a
commit bb2bd76f91
17 changed files with 66 additions and 59 deletions

View File

@@ -234,7 +234,7 @@ def create_search_indices(conn, config, drop=False):
bad_indices = [row[0] for row in list(cur)]
for idx in bad_indices:
LOG.info("Drop invalid index %s.", idx)
cur.execute('DROP INDEX "{}"'.format(idx))
cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
conn.commit()
sql = SQLPreprocessor(conn, config)

View File

@@ -55,10 +55,10 @@ def run_api_script(endpoint, project_dir, extra_env=None, phpcgi_bin=None,
query_string = urlencode(params or {})
env = dict(QUERY_STRING=query_string,
SCRIPT_NAME='/{}.php'.format(endpoint),
REQUEST_URI='/{}.php?{}'.format(endpoint, query_string),
SCRIPT_NAME=f'/{endpoint}.php',
REQUEST_URI=f'/{endpoint}.php?{query_string}',
CONTEXT_DOCUMENT_ROOT=webdir,
SCRIPT_FILENAME='{}/{}.php'.format(webdir, endpoint),
SCRIPT_FILENAME=f'{webdir}/{endpoint}.php',
HTTP_HOST='localhost',
HTTP_USER_AGENT='nominatim-tool',
REMOTE_ADDR='0.0.0.0',

View File

@@ -9,6 +9,8 @@ Functions for database migration to newer software versions.
"""
import logging
from psycopg2 import sql as pysql
from nominatim.db import properties
from nominatim.db.connection import connect
from nominatim.version import NOMINATIM_VERSION, version_str
@@ -47,7 +49,7 @@ def migrate(config, paths):
for version, func in _MIGRATION_FUNCTIONS:
if db_version <= version:
LOG.warning("Runnning: %s (%s)", func.__doc__.split('\n', 1)[0],
'{}.{}.{}-{}'.format(*version))
version_str(version))
kwargs = dict(conn=conn, config=config, paths=paths)
func(**kwargs)
conn.commit()
@@ -124,11 +126,12 @@ def add_nominatim_property_table(conn, config, **_):
"""
if not conn.table_exists('nominatim_properties'):
with conn.cursor() as cur:
cur.execute("""CREATE TABLE nominatim_properties (
property TEXT,
value TEXT);
GRANT SELECT ON TABLE nominatim_properties TO "{}";
""".format(config.DATABASE_WEBUSER))
cur.execute(pysql.SQL("""CREATE TABLE nominatim_properties (
property TEXT,
value TEXT);
GRANT SELECT ON TABLE nominatim_properties TO {};
""")
.format(pysql.Identifier(config.DATABASE_WEBUSER)))
@_migration(3, 6, 0, 0)
def change_housenumber_transliteration(conn, **_):
@@ -193,7 +196,8 @@ def install_legacy_tokenizer(conn, config, **_):
and column_name = 'token_info'""",
(table, ))
if has_column == 0:
cur.execute('ALTER TABLE {} ADD COLUMN token_info JSONB'.format(table))
cur.execute(pysql.SQL('ALTER TABLE {} ADD COLUMN token_info JSONB')
.format(pysql.Identifier(table)))
tokenizer = tokenizer_factory.create_tokenizer(config, init_db=False,
module_name='legacy')

View File

@@ -136,13 +136,13 @@ class _CountryPostcodesCollector:
def _open_external(self, project_dir):
fname = project_dir / '{}_postcodes.csv'.format(self.country)
fname = project_dir / f'{self.country}_postcodes.csv'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return open(fname, 'r')
fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
fname = project_dir / f'{self.country}_postcodes.csv.gz'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)

View File

@@ -52,16 +52,19 @@ def load_address_levels(conn, table, levels):
with conn.cursor() as cur:
cur.drop_table(table)
cur.execute("""CREATE TABLE {} (country_code varchar(2),
cur.execute(pysql.SQL("""CREATE TABLE {} (
country_code varchar(2),
class TEXT,
type TEXT,
rank_search SMALLINT,
rank_address SMALLINT)""".format(table))
rank_address SMALLINT)""")
.format(pysql.Identifier(table)))
cur.execute_values(pysql.SQL("INSERT INTO {} VALUES %s")
.format(pysql.Identifier(table)), rows)
cur.execute('CREATE UNIQUE INDEX ON {} (country_code, class, type)'.format(table))
cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
.format(pysql.Identifier(table)))
conn.commit()

View File

@@ -54,4 +54,4 @@ class SPCsvLoader(Iterator):
_, extension = os.path.splitext(self.csv_path)
if extension != '.csv':
raise UsageError('The file {} is not a csv file.'.format(self.csv_path))
raise UsageError(f'The file {self.csv_path} is not a csv file.')

View File

@@ -16,7 +16,7 @@
import logging
import re
from psycopg2.sql import Identifier, Literal, SQL
from psycopg2.sql import Identifier, SQL
from nominatim.tools.special_phrases.importer_statistics import SpecialPhrasesImporterStatistics
LOG = logging.getLogger()
@@ -195,35 +195,36 @@ class SPImporter():
"""
table_name = _classtype_table(phrase_class, phrase_type)
with self.db_connection.cursor() as db_cursor:
db_cursor.execute(SQL("""
CREATE TABLE IF NOT EXISTS {{}} {}
AS SELECT place_id AS place_id,st_centroid(geometry) AS centroid FROM placex
WHERE class = {{}} AND type = {{}}""".format(sql_tablespace))
.format(Identifier(table_name), Literal(phrase_class),
Literal(phrase_type)))
db_cursor.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
SELECT place_id AS place_id,
st_centroid(geometry) AS centroid
FROM placex
WHERE class = %s AND type = %s""")
.format(Identifier(table_name), SQL(sql_tablespace)),
(phrase_class, phrase_type))
def _create_place_classtype_indexes(self, sql_tablespace, phrase_class, phrase_type):
"""
Create indexes on centroid and place_id for the place_classtype table.
"""
index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
base_table = _classtype_table(phrase_class, phrase_type)
# Index on centroid
if not self.db_connection.index_exists(index_prefix + 'centroid'):
with self.db_connection.cursor() as db_cursor:
db_cursor.execute(SQL("""
CREATE INDEX {{}} ON {{}} USING GIST (centroid) {}""".format(sql_tablespace))
.format(Identifier(index_prefix + 'centroid'),
Identifier(base_table)), sql_tablespace)
db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
.format(Identifier(index_prefix + 'centroid'),
Identifier(base_table),
SQL(sql_tablespace)))
# Index on place_id
if not self.db_connection.index_exists(index_prefix + 'place_id'):
with self.db_connection.cursor() as db_cursor:
db_cursor.execute(SQL(
"""CREATE INDEX {{}} ON {{}} USING btree(place_id) {}""".format(sql_tablespace))
db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
.format(Identifier(index_prefix + 'place_id'),
Identifier(base_table)))
Identifier(base_table),
SQL(sql_tablespace)))
def _grant_access_to_webuser(self, phrase_class, phrase_type):