implementation of 'nominatim index'

This commit is contained in:
Sarah Hoffmann
2021-01-17 20:05:41 +01:00
parent 27977411e9
commit c77877a934
3 changed files with 49 additions and 31 deletions

View File

@@ -11,6 +11,17 @@ from pathlib import Path
from .config import Configuration from .config import Configuration
from .admin.exec_utils import run_legacy_script from .admin.exec_utils import run_legacy_script
from .indexer.indexer import Indexer
def _num_system_cpus():
    """ Return the number of CPUs available to this process.

        The size of the scheduler affinity set of the current process is
        preferred. When it cannot be determined or is empty, the result
        of os.cpu_count() is returned instead, which may be None.
    """
    try:
        cpus = len(os.sched_getaffinity(0))
    except (AttributeError, NotImplementedError):
        # os.sched_getaffinity() is only provided on some Unix platforms;
        # where it is missing the attribute lookup itself fails.
        cpus = None

    return cpus or os.cpu_count()
class CommandlineParser: class CommandlineParser:
""" Wraps some of the common functions for parsing the command line """ Wraps some of the common functions for parsing the command line
and setting up subcommands. and setting up subcommands.
@@ -297,11 +308,27 @@ class UpdateIndex:
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
pass group = parser.add_argument_group('Filter arguments')
group.add_argument('--boundaries-only', action='store_true',
help="""Index only administrative boundaries.""")
group.add_argument('--no-boundaries', action='store_true',
help="""Index everything except administrative boundaries.""")
group.add_argument('--minrank', '-r', type=int, metavar='RANK', default=0,
help='Minimum/starting rank')
group.add_argument('--maxrank', '-R', type=int, metavar='RANK', default=30,
help='Maximum/finishing rank')
@staticmethod @staticmethod
def run(args): def run(args):
return run_legacy_script('update.php', '--index', nominatim_env=args) indexer = Indexer(args.config.get_libpq_dsn(),
args.threads or _num_system_cpus() or 1)
if not args.no_boundaries:
indexer.index_boundaries(args.minrank, args.maxrank)
if not args.boundaries_only:
indexer.index_by_rank(args.minrank, args.maxrank)
return 0
class UpdateRefresh: class UpdateRefresh:

View File

@@ -11,26 +11,14 @@ from psycopg2.extras import wait_select
LOG = logging.getLogger() LOG = logging.getLogger()
def make_connection(options, asynchronous=False):
""" Create a psycopg2 connection from the given options.
"""
params = {'dbname' : options.dbname,
'user' : options.user,
'password' : options.password,
'host' : options.host,
'port' : options.port,
'async' : asynchronous}
return psycopg2.connect(**params)
class DBConnection: class DBConnection:
""" A single non-blocking database connection. """ A single non-blocking database connection.
""" """
def __init__(self, options): def __init__(self, dsn):
self.current_query = None self.current_query = None
self.current_params = None self.current_params = None
self.options = options self.dsn = dsn
self.conn = None self.conn = None
self.cursor = None self.cursor = None
@@ -46,7 +34,9 @@ class DBConnection:
self.cursor.close() self.cursor.close()
self.conn.close() self.conn.close()
self.conn = make_connection(self.options, asynchronous=True) # Use a dict to hand in the parameters because async is a reserved
# word in Python3.
self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
self.wait() self.wait()
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()

View File

@@ -5,8 +5,10 @@ Main work horse for indexing (computing addresses) the database.
import logging import logging
import select import select
import psycopg2
from .progress import ProgressLogger from .progress import ProgressLogger
from db.async_connection import DBConnection, make_connection from ..db.async_connection import DBConnection
LOG = logging.getLogger() LOG = logging.getLogger()
@@ -94,34 +96,33 @@ class Indexer:
""" Main indexing routine. """ Main indexing routine.
""" """
def __init__(self, opts): def __init__(self, dsn, num_threads):
self.minrank = max(1, opts.minrank) self.conn = psycopg2.connect(dsn)
self.maxrank = min(30, opts.maxrank) self.threads = [DBConnection(dsn) for _ in range(num_threads)]
self.conn = make_connection(opts)
self.threads = [DBConnection(opts) for _ in range(opts.threads)]
def index_boundaries(self): def index_boundaries(self, minrank, maxrank):
LOG.warning("Starting indexing boundaries using %s threads", LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads)) len(self.threads))
for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): for rank in range(max(minrank, 5), min(maxrank, 26)):
self.index(BoundaryRunner(rank)) self.index(BoundaryRunner(rank))
def index_by_rank(self): def index_by_rank(self, minrank, maxrank):
""" Run classic indexing by rank. """ Run classic indexing by rank.
""" """
maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads", LOG.warning("Starting indexing rank (%i to %i) using %i threads",
self.minrank, self.maxrank, len(self.threads)) minrank, maxrank, len(self.threads))
for rank in range(max(1, self.minrank), self.maxrank): for rank in range(max(1, minrank), maxrank):
self.index(RankRunner(rank)) self.index(RankRunner(rank))
if self.maxrank == 30: if maxrank == 30:
self.index(RankRunner(0)) self.index(RankRunner(0))
self.index(InterpolationRunner(), 20) self.index(InterpolationRunner(), 20)
self.index(RankRunner(self.maxrank), 20) self.index(RankRunner(30), 20)
else: else:
self.index(RankRunner(self.maxrank)) self.index(RankRunner(maxrank))
def index(self, obj, batch=1): def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use """ Index a single rank or table. `obj` describes the SQL to use