port replication update function to python

This commit is contained in:
Sarah Hoffmann
2021-01-30 15:50:34 +01:00
parent 8f0885f6cb
commit 4cb6dc01f3
13 changed files with 527 additions and 224 deletions

View File

@@ -2,8 +2,10 @@
Command-line interface to the Nominatim functions for import, update,
database administration and querying.
"""
import sys
import datetime as dt
import os
import sys
import time
import argparse
import logging
from pathlib import Path
@@ -11,6 +13,7 @@ from pathlib import Path
from .config import Configuration
from .tools.exec_utils import run_legacy_script, run_api_script
from .db.connection import connect
from .db import status
LOG = logging.getLogger()
@@ -88,6 +91,17 @@ class CommandlineParser:
return args.command.run(args)
def _osm2pgsql_options_from_args(args, default_cache, default_threads):
""" Set up the stanadrd osm2pgsql from the command line arguments.
"""
return dict(osm2pgsql=args.osm2pgsql_path,
osm2pgsql_cache=args.osm2pgsql_cache or default_cache,
osm2pgsql_style=args.config.get_import_style_file(),
threads=args.threads or default_threads,
dsn=args.config.get_libpq_dsn(),
flatnode_file=args.config.FLATNODE_FILE)
##### Subcommand classes
#
# Each class needs to implement two functions: add_args() adds the CLI parameters
@@ -231,6 +245,88 @@ class UpdateReplication:
group.add_argument('--no-index', action='store_false', dest='do_index',
help="""Do not index the new data. Only applicable
together with --once""")
group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
help='Size of cache to be used by osm2pgsql (in MB)')
@staticmethod
def _init_replication(args):
from .tools import replication, refresh
LOG.warning("Initialising replication updates")
conn = connect(args.config.get_libpq_dsn())
replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
if args.update_functions:
LOG.warning("Create functions")
refresh.create_functions(conn, args.config, args.data_dir,
True, False)
conn.close()
return 0
@staticmethod
def _check_for_updates(args):
from .tools import replication
conn = connect(args.config.get_libpq_dsn())
ret = replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
conn.close()
return ret
@staticmethod
def _update(args):
from .tools import replication
from .indexer.indexer import Indexer
params = _osm2pgsql_options_from_args(args, 2000, 1)
params.update(base_url=args.config.REPLICATION_URL,
update_interval=args.config.get_int('REPLICATION_UPDATE_INTERVAL'),
import_file=args.project_dir / 'osmosischange.osc',
max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
indexed_only=not args.once)
# Sanity check to not overwhelm the Geofabrik servers.
if 'download.geofabrik.de'in params['base_url']\
and params['update_interval'] < 86400:
LOG.fatal("Update interval too low for download.geofabrik.de.\n"
"Please check install documentation "
"(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
"setting-up-the-update-process).")
raise RuntimeError("Invalid replication update interval setting.")
if not args.once:
if not args.do_index:
LOG.fatal("Indexing cannot be disabled when running updates continuously.")
raise RuntimeError("Bad arguments.")
recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
while True:
conn = connect(args.config.get_libpq_dsn())
start = dt.datetime.now(dt.timezone.utc)
state = replication.update(conn, params)
status.log_status(conn, start, 'import')
conn.close()
if state is not replication.UpdateState.NO_CHANGES and args.do_index:
start = dt.datetime.now(dt.timezone.utc)
indexer = Indexer(args.config.get_libpq_dsn(),
args.threads or 1)
indexer.index_boundaries(0, 30)
indexer.index_by_rank(0, 30)
conn = connect(args.config.get_libpq_dsn())
status.set_indexed(conn, True)
status.log_status(conn, start, 'index')
conn.close()
if args.once:
break
if state is replication.UpdateState.NO_CHANGES:
LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
time.sleep(recheck_interval)
return state.value
@staticmethod
def run(args):
@@ -241,35 +337,13 @@ class UpdateReplication:
"To install pyosmium via pip: pip3 install osmium")
return 1
from .tools import replication, refresh
conn = connect(args.config.get_libpq_dsn())
params = ['update.php']
if args.init:
LOG.warning("Initialising replication updates")
replication.init_replication(conn, args.config.REPLICATION_URL)
if args.update_functions:
LOG.warning("Create functions")
refresh.create_functions(conn, args.config, args.data_dir,
True, False)
conn.close()
return 0
return UpdateReplication._init_replication(args)
if args.check_for_updates:
ret = replication.check_for_updates(conn, args.config.REPLICATION_URL)
conn.close()
return ret
if args.once:
params.append('--import-osmosis')
else:
params.append('--import-osmosis-all')
if not args.do_index:
params.append('--no-index')
return run_legacy_script(*params, nominatim_env=args)
return UpdateReplication._check_for_updates(args)
return UpdateReplication._update(args)
class UpdateAddData:
"""\
@@ -350,8 +424,11 @@ class UpdateIndex:
if not args.boundaries_only:
indexer.index_by_rank(args.minrank, args.maxrank)
if not args.no_boundaries and not args.boundaries_only:
indexer.update_status_table()
if not args.no_boundaries and not args.boundaries_only \
and args.minrank == 0 and args.maxrank == 30:
conn = connect(args.config.get_libpq_dsn())
status.set_indexed(conn, True)
conn.close()
return 0
@@ -390,25 +467,31 @@ class UpdateRefresh:
def run(args):
from .tools import refresh
conn = connect(args.config.get_libpq_dsn())
if args.postcodes:
LOG.warning("Update postcodes centroid")
conn = connect(args.config.get_libpq_dsn())
refresh.update_postcodes(conn, args.data_dir)
conn.close()
if args.word_counts:
LOG.warning('Recompute frequency of full-word search terms')
conn = connect(args.config.get_libpq_dsn())
refresh.recompute_word_counts(conn, args.data_dir)
conn.close()
if args.address_levels:
cfg = Path(args.config.ADDRESS_LEVEL_CONFIG)
LOG.warning('Updating address levels from %s', cfg)
conn = connect(args.config.get_libpq_dsn())
refresh.load_address_levels_from_file(conn, cfg)
conn.close()
if args.functions:
LOG.warning('Create functions')
conn = connect(args.config.get_libpq_dsn())
refresh.create_functions(conn, args.config, args.data_dir,
args.diffs, args.enable_debug_statements)
conn.close()
if args.wiki_data:
run_legacy_script('setup.php', '--import-wikipedia-articles',
@@ -421,8 +504,6 @@ class UpdateRefresh:
run_legacy_script('setup.php', '--setup-website',
nominatim_env=args, throw_on_fail=True)
conn.close()
return 0

View File

@@ -1,10 +1,14 @@
"""
Nominatim configuration accessor.
"""
import logging
import os
from pathlib import Path
from dotenv import dotenv_values
LOG = logging.getLogger()
class Configuration:
""" Load and manage the project configuration.
@@ -21,6 +25,7 @@ class Configuration:
def __init__(self, project_dir, config_dir):
self.project_dir = project_dir
self.config_dir = config_dir
self._config = dotenv_values(str((config_dir / 'env.defaults').resolve()))
if project_dir is not None:
self._config.update(dotenv_values(str((project_dir / '.env').resolve())))
@@ -38,24 +43,56 @@ class Configuration:
return os.environ.get(name) or self._config[name]
def get_bool(self, name):
""" Return the given configuration parameters as a boolean.
""" Return the given configuration parameter as a boolean.
Values of '1', 'yes' and 'true' are accepted as truthy values,
everything else is interpreted as false.
"""
return self.__getattr__(name).lower() in ('1', 'yes', 'true')
def get_int(self, name):
""" Return the given configuration parameter as an int.
"""
try:
return int(self.__getattr__(name))
except ValueError:
LOG.fatal("Invalid setting NOMINATIM_%s. Needs to be a number.", name)
raise
def get_libpq_dsn(self):
""" Get configured database DSN converted into the key/value format
understood by libpq and psycopg.
"""
dsn = self.DATABASE_DSN
def quote_param(param):
key, val = param.split('=')
val = val.replace('\\', '\\\\').replace("'", "\\'")
if ' ' in val:
val = "'" + val + "'"
return key + '=' + val
if dsn.startswith('pgsql:'):
# Old PHP DSN format. Convert before returning.
return dsn[6:].replace(';', ' ')
return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
return dsn
def get_import_style_file(self):
""" Return the import style file as a path object. Translates the
name of the standard styles automatically into a file in the
config style.
"""
style = self.__getattr__('IMPORT_STYLE')
if style in ('admin', 'street', 'address', 'full', 'extratags'):
return self.config_dir / 'import-{}.style'.format(style)
return Path(style)
def get_os_env(self):
""" Return a copy of the OS environment with the Nominatim configuration
merged in.

View File

@@ -1,5 +1,5 @@
"""
Access and helper functions for the status table.
Access and helper functions for the status and status log table.
"""
import datetime as dt
import logging
@@ -61,3 +61,21 @@ def get_status(conn):
row = cur.fetchone()
return row['lastimportdate'], row['sequence_id'], row['indexed']
def set_indexed(conn, state):
""" Set the indexed flag in the status table to the given state.
"""
with conn.cursor() as cur:
cur.execute("UPDATE import_status SET indexed = %s", (state, ))
conn.commit()
def log_status(conn, start, event, batchsize=None):
""" Write a new status line to the `import_osmosis_log` table.
"""
with conn.cursor() as cur:
cur.execute("""INSERT INTO import_osmosis_log
(batchend, batchseq, batchsize, starttime, endtime, event)
SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
(batchsize, start, event))

View File

@@ -2,10 +2,13 @@
Helper functions for executing external programs.
"""
import logging
import os
import subprocess
import urllib.request as urlrequest
from urllib.parse import urlencode
from psycopg2.extensions import parse_dsn
from ..version import NOMINATIM_VERSION
LOG = logging.getLogger()
@@ -87,6 +90,41 @@ def run_api_script(endpoint, project_dir, extra_env=None, phpcgi_bin=None,
return 0
def run_osm2pgsql(options):
""" Run osm2pgsql with the given options.
"""
env = os.environ
cmd = [options['osm2pgsql'],
'--hstore', '--latlon', '--slim',
'--with-forward-dependencies', 'false',
'--log-progress', 'true',
'--number-processes', str(options['threads']),
'--cache', str(options['osm2pgsql_cache']),
'--output', 'gazetteer',
'--style', str(options['osm2pgsql_style'])
]
if options['append']:
cmd.append('--append')
if options['flatnode_file']:
cmd.extend(('--flat-nodes', options['flatnode_file']))
dsn = parse_dsn(options['dsn'])
if 'password' in dsn:
env['PGPASSWORD'] = dsn['password']
if 'dbname' in dsn:
cmd.extend(('-d', dsn['dbname']))
if 'user' in dsn:
cmd.extend(('--username', dsn['user']))
for param in ('host', 'port'):
if param in dsn:
cmd.extend(('--' + param, dsn[param]))
cmd.append(str(options['import_file']))
subprocess.run(cmd, cwd=options.get('cwd', '.'), env=env, check=True)
def get_url(url):
""" Get the contents from the given URL and return it as a UTF-8 string.
"""

View File

@@ -1,12 +1,16 @@
"""
Functions for updating a database from a replication source.
"""
import datetime
import datetime as dt
from enum import Enum
import logging
import time
from osmium.replication.server import ReplicationServer
from osmium import WriteHandler
from ..db import status
from .exec_utils import run_osm2pgsql
LOG = logging.getLogger()
@@ -17,7 +21,7 @@ def init_replication(conn, base_url):
date = status.compute_database_date(conn)
# margin of error to make sure we get all data
date -= datetime.timedelta(hours=3)
date -= dt.timedelta(hours=3)
repl = ReplicationServer(base_url)
@@ -53,7 +57,62 @@ def check_for_updates(conn, base_url):
if state.sequence <= seq:
LOG.warning("Database is up to date.")
return 1
return 2
LOG.warning("New data available (%i => %i).", seq, state.sequence)
return 0
class UpdateState(Enum):
""" Possible states after an update has run.
"""
UP_TO_DATE = 0
MORE_PENDING = 2
NO_CHANGES = 3
def update(conn, options):
""" Update database from the next batch of data. Returns the state of
updates according to `UpdateState`.
"""
startdate, startseq, indexed = status.get_status(conn)
if startseq is None:
LOG.error("Replication not set up. "
"Please run 'nominatim replication --init' first.")
raise RuntimeError("Replication not set up.")
if not indexed and options['indexed_only']:
LOG.info("Skipping update. There is data that needs indexing.")
return UpdateState.MORE_PENDING
last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
update_interval = dt.timedelta(seconds=options['update_interval'])
if last_since_update < update_interval:
duration = (update_interval - last_since_update).seconds
LOG.warning("Sleeping for %s sec before next update.", duration)
time.sleep(duration)
if options['import_file'].exists():
options['import_file'].unlink()
# Read updates into file.
repl = ReplicationServer(options['base_url'])
outhandler = WriteHandler(str(options['import_file']))
endseq = repl.apply_diffs(outhandler, startseq,
max_size=options['max_diff_size'] * 1024)
outhandler.close()
if endseq is None:
return UpdateState.NO_CHANGES
# Consume updates with osm2pgsql.
options['append'] = True
run_osm2pgsql(options)
# Write the current status to the file
endstate = repl.get_state_info(endseq)
status.set_status(conn, endstate.timestamp, seq=endseq, indexed=False)
return UpdateState.UP_TO_DATE