fix errors reported by pylint

This commit is contained in:
Sarah Hoffmann
2021-01-14 21:36:31 +01:00
parent 565356613a
commit 8e53f63036
2 changed files with 101 additions and 96 deletions

View File

@@ -1,15 +1,19 @@
# SPDX-License-Identifier: GPL-2.0-only # SPDX-License-Identifier: GPL-2.0-only
# #
# This file is part of Nominatim. # This file is part of Nominatim.
# Copyright (C) 2020 Sarah Hoffmann # Copyright (C) 2021 by the Nominatim developer community.
# For a full list of authors see the git log.
""" Database helper functions for the indexer.
"""
import logging import logging
import psycopg2 import psycopg2
from psycopg2.extras import wait_select from psycopg2.extras import wait_select
log = logging.getLogger() LOG = logging.getLogger()
def make_connection(options, asynchronous=False): def make_connection(options, asynchronous=False):
""" Create a psycopg2 connection from the given options.
"""
params = {'dbname' : options.dbname, params = {'dbname' : options.dbname,
'user' : options.user, 'user' : options.user,
'password' : options.password, 'password' : options.password,
@@ -19,7 +23,7 @@ def make_connection(options, asynchronous=False):
return psycopg2.connect(**params) return psycopg2.connect(**params)
class DBConnection(object): class DBConnection:
""" A single non-blocking database connection. """ A single non-blocking database connection.
""" """
@@ -29,6 +33,7 @@ class DBConnection(object):
self.options = options self.options = options
self.conn = None self.conn = None
self.cursor = None
self.connect() self.connect()
def connect(self): def connect(self):
@@ -63,14 +68,14 @@ class DBConnection(object):
wait_select(self.conn) wait_select(self.conn)
self.current_query = None self.current_query = None
return return
except psycopg2.extensions.TransactionRollbackError as e: except psycopg2.extensions.TransactionRollbackError as error:
if e.pgcode == '40P01': if error.pgcode == '40P01':
log.info("Deadlock detected (params = {}), retry." LOG.info("Deadlock detected (params = %s), retry.",
.format(self.current_params)) str(self.current_params))
self.cursor.execute(self.current_query, self.current_params) self.cursor.execute(self.current_query, self.current_params)
else: else:
raise raise
except psycopg2.errors.DeadlockDetected: except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params) self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None): def perform(self, sql, args=None):
@@ -99,14 +104,13 @@ class DBConnection(object):
if self.conn.poll() == psycopg2.extensions.POLL_OK: if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None self.current_query = None
return True return True
except psycopg2.extensions.TransactionRollbackError as e: except psycopg2.extensions.TransactionRollbackError as error:
if e.pgcode == '40P01': if error.pgcode == '40P01':
log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
self.cursor.execute(self.current_query, self.current_params) self.cursor.execute(self.current_query, self.current_params)
else: else:
raise raise
except psycopg2.errors.DeadlockDetected: except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params) self.cursor.execute(self.current_query, self.current_params)
return False return False

View File

@@ -21,21 +21,19 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#----------------------------------------------------------------------------- #-----------------------------------------------------------------------------
# pylint: disable=C0111
from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError from argparse import ArgumentParser, RawDescriptionHelpFormatter
import logging import logging
import sys import sys
import re
import getpass import getpass
from datetime import datetime
import select import select
from indexer.progress import ProgressLogger from indexer.progress import ProgressLogger # pylint: disable=E0401
from indexer.db import DBConnection, make_connection from indexer.db import DBConnection, make_connection # pylint: disable=E0401
log = logging.getLogger() LOG = logging.getLogger()
class RankRunner(object): class RankRunner:
""" Returns SQL commands for indexing one rank within the placex table. """ Returns SQL commands for indexing one rank within the placex table.
""" """
@@ -55,34 +53,39 @@ class RankRunner(object):
WHERE indexed_status > 0 and rank_address = {} WHERE indexed_status > 0 and rank_address = {}
ORDER BY geometry_sector""".format(self.rank) ORDER BY geometry_sector""".format(self.rank)
def sql_index_place(self, ids): @staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids))) .format(','.join((str(i) for i in ids)))
class InterpolationRunner(object): class InterpolationRunner:
""" Returns SQL commands for indexing the address interpolation table """ Returns SQL commands for indexing the address interpolation table
location_property_osmline. location_property_osmline.
""" """
def name(self): @staticmethod
def name():
return "interpolation lines (location_property_osmline)" return "interpolation lines (location_property_osmline)"
def sql_count_objects(self): @staticmethod
def sql_count_objects():
return """SELECT count(*) FROM location_property_osmline return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0""" WHERE indexed_status > 0"""
def sql_get_objects(self): @staticmethod
def sql_get_objects():
return """SELECT place_id FROM location_property_osmline return """SELECT place_id FROM location_property_osmline
WHERE indexed_status > 0 WHERE indexed_status > 0
ORDER BY geometry_sector""" ORDER BY geometry_sector"""
def sql_index_place(self, ids): @staticmethod
def sql_index_place(ids):
return """UPDATE location_property_osmline return """UPDATE location_property_osmline
SET indexed_status = 0 WHERE place_id IN ({})"""\ SET indexed_status = 0 WHERE place_id IN ({})"""\
.format(','.join((str(i) for i in ids))) .format(','.join((str(i) for i in ids)))
class BoundaryRunner(object): class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries """ Returns SQL commands for indexing the administrative boundaries
of a certain rank. of a certain rank.
""" """
@@ -105,23 +108,24 @@ class BoundaryRunner(object):
and class = 'boundary' and type = 'administrative' and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level""".format(self.rank) ORDER BY partition, admin_level""".format(self.rank)
def sql_index_place(self, ids): @staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids))) .format(','.join((str(i) for i in ids)))
class Indexer(object): class Indexer:
""" Main indexing routine. """ Main indexing routine.
""" """
def __init__(self, options): def __init__(self, opts):
self.minrank = max(1, options.minrank) self.minrank = max(1, opts.minrank)
self.maxrank = min(30, options.maxrank) self.maxrank = min(30, opts.maxrank)
self.conn = make_connection(options) self.conn = make_connection(opts)
self.threads = [DBConnection(options) for i in range(options.threads)] self.threads = [DBConnection(opts) for _ in range(opts.threads)]
def index_boundaries(self): def index_boundaries(self):
log.warning("Starting indexing boundaries using {} threads".format( LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads))) len(self.threads))
for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
self.index(BoundaryRunner(rank)) self.index(BoundaryRunner(rank))
@@ -129,8 +133,8 @@ class Indexer(object):
def index_by_rank(self): def index_by_rank(self):
""" Run classic indexing by rank. """ Run classic indexing by rank.
""" """
log.warning("Starting indexing rank ({} to {}) using {} threads".format( LOG.warning("Starting indexing rank (%i to %i) using %i threads",
self.minrank, self.maxrank, len(self.threads))) self.minrank, self.maxrank, len(self.threads))
for rank in range(max(1, self.minrank), self.maxrank): for rank in range(max(1, self.minrank), self.maxrank):
self.index(RankRunner(rank)) self.index(RankRunner(rank))
@@ -147,13 +151,13 @@ class Indexer(object):
for indexing. `batch` describes the number of objects that for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement should be processed with a single SQL statement
""" """
log.warning("Starting %s (using batch size %s)", obj.name(), batch) LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute(obj.sql_count_objects()) cur.execute(obj.sql_count_objects())
total_tuples = cur.fetchone()[0] total_tuples = cur.fetchone()[0]
log.debug("Total number of rows: {}".format(total_tuples)) LOG.debug("Total number of rows: %i", total_tuples)
cur.close() cur.close()
@@ -166,10 +170,10 @@ class Indexer(object):
next_thread = self.find_free_thread() next_thread = self.find_free_thread()
while True: while True:
places = [p[0] for p in cur.fetchmany(batch)] places = [p[0] for p in cur.fetchmany(batch)]
if len(places) == 0: if not places:
break break
log.debug("Processing places: {}".format(places)) LOG.debug("Processing places: %s", str(places))
thread = next(next_thread) thread = next(next_thread)
thread.perform(obj.sql_index_place(places)) thread.perform(obj.sql_index_place(places))
@@ -177,8 +181,8 @@ class Indexer(object):
cur.close() cur.close()
for t in self.threads: for thread in self.threads:
t.wait() thread.wait()
progress.done() progress.done()
@@ -198,10 +202,10 @@ class Indexer(object):
# refresh the connections occasionally to avoid potential # refresh the connections occasionally to avoid potential
# memory leaks in PostgreSQL. # memory leaks in PostgreSQL.
if command_stat > 100000: if command_stat > 100000:
for t in self.threads: for thread in self.threads:
while not t.is_done(): while not thread.is_done():
t.wait() thread.wait()
t.connect() thread.connect()
command_stat = 0 command_stat = 0
ready = self.threads ready = self.threads
else: else:
@@ -213,58 +217,55 @@ class Indexer(object):
def nominatim_arg_parser(): def nominatim_arg_parser():
""" Setup the command-line parser for the tool. """ Setup the command-line parser for the tool.
""" """
def h(s): parser = ArgumentParser(description="Indexing tool for Nominatim.",
return re.sub("\s\s+" , " ", s)
p = ArgumentParser(description="Indexing tool for Nominatim.",
formatter_class=RawDescriptionHelpFormatter) formatter_class=RawDescriptionHelpFormatter)
p.add_argument('-d', '--database', parser.add_argument('-d', '--database',
dest='dbname', action='store', default='nominatim', dest='dbname', action='store', default='nominatim',
help='Name of the PostgreSQL database to connect to.') help='Name of the PostgreSQL database to connect to.')
p.add_argument('-U', '--username', parser.add_argument('-U', '--username',
dest='user', action='store', dest='user', action='store',
help='PostgreSQL user name.') help='PostgreSQL user name.')
p.add_argument('-W', '--password', parser.add_argument('-W', '--password',
dest='password_prompt', action='store_true', dest='password_prompt', action='store_true',
help='Force password prompt.') help='Force password prompt.')
p.add_argument('-H', '--host', parser.add_argument('-H', '--host',
dest='host', action='store', dest='host', action='store',
help='PostgreSQL server hostname or socket location.') help='PostgreSQL server hostname or socket location.')
p.add_argument('-P', '--port', parser.add_argument('-P', '--port',
dest='port', action='store', dest='port', action='store',
help='PostgreSQL server port') help='PostgreSQL server port')
p.add_argument('-b', '--boundary-only', parser.add_argument('-b', '--boundary-only',
dest='boundary_only', action='store_true', dest='boundary_only', action='store_true',
help='Only index administrative boundaries (ignores min/maxrank).') help='Only index administrative boundaries (ignores min/maxrank).')
p.add_argument('-r', '--minrank', parser.add_argument('-r', '--minrank',
dest='minrank', type=int, metavar='RANK', default=0, dest='minrank', type=int, metavar='RANK', default=0,
help='Minimum/starting rank.') help='Minimum/starting rank.')
p.add_argument('-R', '--maxrank', parser.add_argument('-R', '--maxrank',
dest='maxrank', type=int, metavar='RANK', default=30, dest='maxrank', type=int, metavar='RANK', default=30,
help='Maximum/finishing rank.') help='Maximum/finishing rank.')
p.add_argument('-t', '--threads', parser.add_argument('-t', '--threads',
dest='threads', type=int, metavar='NUM', default=1, dest='threads', type=int, metavar='NUM', default=1,
help='Number of threads to create for indexing.') help='Number of threads to create for indexing.')
p.add_argument('-v', '--verbose', parser.add_argument('-v', '--verbose',
dest='loglevel', action='count', default=0, dest='loglevel', action='count', default=0,
help='Increase verbosity') help='Increase verbosity')
return p return parser
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s') logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
options = nominatim_arg_parser().parse_args(sys.argv[1:]) OPTIONS = nominatim_arg_parser().parse_args(sys.argv[1:])
log.setLevel(max(3 - options.loglevel, 0) * 10) LOG.setLevel(max(3 - OPTIONS.loglevel, 0) * 10)
options.password = None OPTIONS.password = None
if options.password_prompt: if OPTIONS.password_prompt:
password = getpass.getpass("Database password: ") PASSWORD = getpass.getpass("Database password: ")
options.password = password OPTIONS.password = PASSWORD
if options.boundary_only: if OPTIONS.boundary_only:
Indexer(options).index_boundaries() Indexer(OPTIONS).index_boundaries()
else: else:
Indexer(options).index_by_rank() Indexer(OPTIONS).index_by_rank()