clean up and document script

This commit is contained in:
Sarah Hoffmann
2020-01-20 22:19:33 +01:00
parent 4a9502bf88
commit 11c0dd235b

View File

@@ -39,38 +39,104 @@ def make_connection(options, asynchronous=False):
password=options.password, host=options.host,
port=options.port, async_=asynchronous)
class IndexingThread(object):
def __init__(self, thread_num, options):
log.debug("Creating thread {}".format(thread_num))
self.thread_num = thread_num
class RankRunner(object):
    """ Supplies the SQL statements used to index one search rank of the
        placex table.
    """

    def __init__(self, rank):
        # The rank is substituted into the statements with str.format().
        self.rank = rank

    def name(self):
        """ Label for this indexing step, as used in log output.
        """
        return "rank {}".format(self.rank)

    def sql_index_sectors(self):
        """ Statement returning every geometry sector that still has
            places of this rank with indexed_status > 0, together with
            the number of such places, ordered by sector.
        """
        template = ("SELECT geometry_sector, count(*) FROM placex\n"
                    "WHERE rank_search = {} and indexed_status > 0\n"
                    "GROUP BY geometry_sector\n"
                    "ORDER BY geometry_sector")
        return template.format(self.rank)

    def sql_nosector_places(self):
        """ Statement returning the ids of all places of this rank with
            indexed_status > 0, ordered by geometry sector.
        """
        template = ("SELECT place_id FROM placex\n"
                    "WHERE indexed_status > 0 and rank_search = {}\n"
                    "ORDER BY geometry_sector")
        return template.format(self.rank)

    def sql_sector_places(self):
        """ Statement returning the ids of places of this rank with
            indexed_status > 0 within one geometry sector. The sector is
            left as a `%s` query parameter.
        """
        template = ("SELECT place_id FROM placex\n"
                    "WHERE indexed_status > 0 and rank_search = {}\n"
                    "and geometry_sector = %s")
        return template.format(self.rank)

    def sql_index_place(self):
        """ Statement setting indexed_status to 0 for a single place.
            The place id is left as a `%s` query parameter.
        """
        return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
class InterpolationRunner(object):
    """ Supplies the SQL statements used to index the address
        interpolation table location_property_osmline.
    """

    def name(self):
        """ Label for this indexing step, as used in log output.
        """
        return "interpolation lines (location_property_osmline)"

    def sql_index_sectors(self):
        """ Statement returning every geometry sector that still has rows
            with indexed_status > 0, together with the number of such
            rows, ordered by sector.
        """
        statement = (
            "SELECT geometry_sector, count(*) FROM location_property_osmline\n"
            "WHERE indexed_status > 0\n"
            "GROUP BY geometry_sector\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_nosector_places(self):
        """ Statement returning the ids of all rows with
            indexed_status > 0, ordered by geometry sector.
        """
        statement = (
            "SELECT place_id FROM location_property_osmline\n"
            "WHERE indexed_status > 0\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_sector_places(self):
        """ Statement returning the ids of rows with indexed_status > 0
            within one geometry sector. The sector is left as a `%s`
            query parameter.
        """
        statement = (
            "SELECT place_id FROM location_property_osmline\n"
            "WHERE indexed_status > 0 and geometry_sector = %s\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_index_place(self):
        """ Statement setting indexed_status to 0 for a single row.
            The place id is left as a `%s` query parameter.
        """
        statement = (
            "UPDATE location_property_osmline\n"
            "SET indexed_status = 0 WHERE place_id = %s")
        return statement
class DBConnection(object):
""" A single non-blocking database connection.
"""
def __init__(self, options):
    """ Open an asynchronous connection using `options` and run the
        session setup: set lc_messages to 'C' and send the prepare
        statements of InterpolationRunner and RankRunner.
    """
    self.conn = make_connection(options, asynchronous=True)
    self.wait()

    self.cursor = self.conn.cursor()

    # perform() returns without blocking, so each setup statement is
    # followed by wait() before the next one is sent.
    self.perform("SET lc_messages TO 'C'")
    self.wait()
    self.perform(InterpolationRunner.prepare())
    self.wait()
    self.perform(RankRunner.prepare())
    self.wait()

    # Setup is complete: no query is in flight.
    self.current_query = self.current_params = None
def wait(self):
    """ Block in wait_select() until the connection's pending operation
        has completed, then mark the connection as having no query in
        flight.
    """
    wait_select(self.conn)
    self.current_query = None
def perform(self, sql, args=None):
""" Send SQL query to the server. Returns immediately without
blocking.
"""
self.current_query = sql
self.current_params = args
self.cursor.execute(sql, args)
def fileno(self):
""" File descriptor to wait for. (Makes this class select()able.)
"""
return self.conn.fileno()
def is_done(self):
""" Check if the connection is available for a new query.
Also checks if the previous query has run into a deadlock.
If so, then the previous query is repeated.
"""
if self.current_query is None:
return True
@@ -79,40 +145,42 @@ class IndexingThread(object):
self.current_query = None
return True
except psycopg2.extensions.TransactionRollbackError as e:
if e.pgcode is None:
raise RuntimeError("Postgres exception has no error code")
if e.pgcode == '40P01':
log.info("Deadlock detected, retry.")
self.cursor.execute(self.current_query, self.current_params)
else:
raise
return False
class Indexer(object):
""" Main indexing routine.
"""
def __init__(self, options):
    """ Set up the indexer from the parsed `options`.

        The requested rank range is clamped to 0..30. One connection is
        opened with make_connection() in its default (synchronous) mode
        and `options.threads` DBConnection objects are created for
        sending the indexing queries.
    """
    self.options = options
    self.minrank = max(0, options.minrank)
    self.maxrank = min(30, options.maxrank)
    self.conn = make_connection(options)
    # Build the worker connections exactly once. The former
    # IndexingThread loop only created objects that were thrown away
    # when self.threads was reassigned.
    self.threads = [DBConnection(options) for _ in range(options.threads)]
def run(self):
    """ Run indexing over the entire database.

        Ranks from `self.minrank` up to, but not including,
        `self.maxrank` are indexed in ascending order. When
        `self.maxrank` is 30, the interpolation lines and then rank 30
        are indexed afterwards.
    """
    log.info("Starting indexing rank ({} to {}) using {} threads".format(
        self.minrank, self.maxrank, len(self.threads)))

    # NOTE(review): a maxrank below 30 is exclusive here - confirm that
    # this is the intended meaning of the option.
    for rank in range(self.minrank, self.maxrank):
        self.index(RankRunner(rank))

    if self.maxrank == 30:
        self.index(InterpolationRunner())
        self.index(RankRunner(30))
def index(self, obj):
""" Index a single rank or table. `obj` describes the SQL to use
for indexing.
"""
log.info("Starting {}".format(obj.name()))
cur = self.conn.cursor(name='main')
@@ -171,6 +239,9 @@ class Indexer(object):
done_tuples/diff_seconds, obj.name()))
def find_free_thread(self):
""" Generator that returns the next connection that is free for
sending a query.
"""
ready = self.threads
while True:
@@ -182,70 +253,6 @@ class Indexer(object):
# Statement form: asserting the tuple (False, "...") would always pass.
assert False, "Unreachable code"
class RankRunner(object):
    """ Supplies the SQL statements used to index one search rank of the
        placex table, using the prepared statement `rnk_index`.
    """

    def __init__(self, rank):
        # The rank is substituted into the statements with str.format().
        self.rank = rank

    def name(self):
        """ Label for this indexing step, as used in log output.
        """
        return "rank {}".format(self.rank)

    @classmethod
    def prepare(cls):
        """ Statement that defines the prepared statement `rnk_index`,
            which sets indexed_status to 0 for the place given as $1.
        """
        statement = ("PREPARE rnk_index AS\n"
                     "UPDATE placex\n"
                     "SET indexed_status = 0 WHERE place_id = $1")
        return statement

    def sql_index_sectors(self):
        """ Statement returning every geometry sector that still has
            places of this rank with indexed_status > 0, together with
            the number of such places, ordered by sector.
        """
        template = ("SELECT geometry_sector, count(*) FROM placex\n"
                    "WHERE rank_search = {} and indexed_status > 0\n"
                    "GROUP BY geometry_sector\n"
                    "ORDER BY geometry_sector")
        return template.format(self.rank)

    def sql_nosector_places(self):
        """ Statement returning the ids of all places of this rank with
            indexed_status > 0, ordered by geometry sector.
        """
        template = ("SELECT place_id FROM placex\n"
                    "WHERE indexed_status > 0 and rank_search = {}\n"
                    "ORDER BY geometry_sector")
        return template.format(self.rank)

    def sql_sector_places(self):
        """ Statement returning the ids of places of this rank with
            indexed_status > 0 within one geometry sector. The sector is
            left as a `%s` query parameter.
        """
        template = ("SELECT place_id FROM placex\n"
                    "WHERE indexed_status > 0 and rank_search = {}\n"
                    "and geometry_sector = %s")
        return template.format(self.rank)

    def sql_index_place(self):
        """ Statement executing the prepared `rnk_index` for one place.
            The place id is left as a `%s` query parameter.
        """
        return "EXECUTE rnk_index(%s)"
class InterpolationRunner(object):
    """ Supplies the SQL statements used to index the address
        interpolation table location_property_osmline, using the
        prepared statement `ipl_index`.
    """

    def name(self):
        """ Label for this indexing step, as used in log output.
        """
        return "interpolation lines (location_property_osmline)"

    @classmethod
    def prepare(cls):
        """ Statement that defines the prepared statement `ipl_index`,
            which sets indexed_status to 0 for the row given as $1.
        """
        statement = ("PREPARE ipl_index AS\n"
                     "UPDATE location_property_osmline\n"
                     "SET indexed_status = 0 WHERE place_id = $1")
        return statement

    def sql_index_sectors(self):
        """ Statement returning every geometry sector that still has rows
            with indexed_status > 0, together with the number of such
            rows, ordered by sector.
        """
        statement = (
            "SELECT geometry_sector, count(*) FROM location_property_osmline\n"
            "WHERE indexed_status > 0\n"
            "GROUP BY geometry_sector\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_nosector_places(self):
        """ Statement returning the ids of all rows with
            indexed_status > 0, ordered by geometry sector.
        """
        statement = (
            "SELECT place_id FROM location_property_osmline\n"
            "WHERE indexed_status > 0\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_sector_places(self):
        """ Statement returning the ids of rows with indexed_status > 0
            within one geometry sector. The sector is left as a `%s`
            query parameter.
        """
        statement = (
            "SELECT place_id FROM location_property_osmline\n"
            "WHERE indexed_status > 0 and geometry_sector = %s\n"
            "ORDER BY geometry_sector")
        return statement

    def sql_index_place(self):
        """ Statement executing the prepared `ipl_index` for one row.
            The place id is left as a `%s` query parameter.
        """
        return "EXECUTE ipl_index(%s)"
def nominatim_arg_parser():
""" Setup the command-line parser for the tool.
@@ -253,7 +260,7 @@ def nominatim_arg_parser():
def h(s):
    """ Replace every run of two or more whitespace characters in `s`
        with a single space. A lone whitespace character is kept as is.
    """
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    return re.sub(r"\s\s+", " ", s)
p = ArgumentParser(description=__doc__,
p = ArgumentParser(description="Indexing tool for Nominatim.",
formatter_class=RawDescriptionHelpFormatter)
p.add_argument('-d', '--database',