Files
Nominatim/nominatim/indexer/indexer.py
Sarah Hoffmann 504922ffbe remove old nominatim.py in favour of 'nominatim index'
The PHP scripts need to know the position of the nominatim
tool in order to call it. This is handed in as environment
variable, so it can be set by the Python script.
2021-01-18 15:43:27 +01:00

200 lines
6.1 KiB
Python

"""
Main work horse for indexing (computing addresses) the database.
"""
# pylint: disable=C0111
import logging
import select
import psycopg2
from .progress import ProgressLogger
from ..db.async_connection import DBConnection
LOG = logging.getLogger()
class RankRunner:
""" Returns SQL commands for indexing one rank within the placex table.
"""
def __init__(self, rank):
self.rank = rank
def name(self):
return "rank {}".format(self.rank)
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE rank_address = {} and indexed_status > 0
""".format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_address = {}
ORDER BY geometry_sector""".format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
class InterpolationRunner:
""" Returns SQL commands for indexing the address interpolation table
location_property_osmline.
"""
@staticmethod
def name():
return "interpolation lines (location_property_osmline)"
@staticmethod
def sql_count_objects():
return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0"""
@staticmethod
def sql_get_objects():
return """SELECT place_id FROM location_property_osmline
WHERE indexed_status > 0
ORDER BY geometry_sector"""
@staticmethod
def sql_index_place(ids):
return """UPDATE location_property_osmline
SET indexed_status = 0 WHERE place_id IN ({})"""\
.format(','.join((str(i) for i in ids)))
class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries
of a certain rank.
"""
def __init__(self, rank):
self.rank = rank
def name(self):
return "boundaries rank {}".format(self.rank)
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {}
AND class = 'boundary' and type = 'administrative'""".format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level""".format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
class Indexer:
""" Main indexing routine.
"""
def __init__(self, dsn, num_threads):
self.conn = psycopg2.connect(dsn)
self.threads = [DBConnection(dsn) for _ in range(num_threads)]
def index_boundaries(self, minrank, maxrank):
LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads))
for rank in range(max(minrank, 5), min(maxrank, 26)):
self.index(BoundaryRunner(rank))
def index_by_rank(self, minrank, maxrank):
""" Run classic indexing by rank.
"""
maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, len(self.threads))
for rank in range(max(1, minrank), maxrank):
self.index(RankRunner(rank))
if maxrank == 30:
self.index(RankRunner(0))
self.index(InterpolationRunner(), 20)
self.index(RankRunner(30), 20)
else:
self.index(RankRunner(maxrank))
def update_status_table(self):
""" Update the status in the status table to 'indexed'.
"""
with self.conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true')
self.conn.commit()
def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use
for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement
"""
LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
cur = self.conn.cursor()
cur.execute(obj.sql_count_objects())
total_tuples = cur.fetchone()[0]
LOG.debug("Total number of rows: %i", total_tuples)
cur.close()
progress = ProgressLogger(obj.name(), total_tuples)
if total_tuples > 0:
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
next_thread = self.find_free_thread()
while True:
places = [p[0] for p in cur.fetchmany(batch)]
if not places:
break
LOG.debug("Processing places: %s", str(places))
thread = next(next_thread)
thread.perform(obj.sql_index_place(places))
progress.add(len(places))
cur.close()
for thread in self.threads:
thread.wait()
progress.done()
def find_free_thread(self):
""" Generator that returns the next connection that is free for
sending a query.
"""
ready = self.threads
command_stat = 0
while True:
for thread in ready:
if thread.is_done():
command_stat += 1
yield thread
# refresh the connections occasionaly to avoid potential
# memory leaks in Postgresql.
if command_stat > 100000:
for thread in self.threads:
while not thread.is_done():
thread.wait()
thread.connect()
command_stat = 0
ready = self.threads
else:
ready, _, _ = select.select(self.threads, [], [])
assert False, "Unreachable code"