diff --git a/docs/admin/Migration.md b/docs/admin/Migration.md index 0400ed43..917d3785 100644 --- a/docs/admin/Migration.md +++ b/docs/admin/Migration.md @@ -8,6 +8,23 @@ SQL statements should be executed from the PostgreSQL commandline. Execute ## 3.5.0 -> master +### Change order during indexing + +When reindexing places during updates, there is now a different order used +which needs a different database index. Create it with the following SQL command: + +```sql +CREATE INDEX idx_placex_pendingsector_rank_address + ON placex USING BTREE (rank_address, geometry_sector) where indexed_status > 0; +``` + +You can then drop the old index with: + +```sql +DROP INDEX idx_placex_pendingsector +``` + + ### Switching to dotenv As part of the work changing the configuration format, the configuration for diff --git a/lib/setup/SetupClass.php b/lib/setup/SetupClass.php index ba700a2c..2815f8c4 100755 --- a/lib/setup/SetupClass.php +++ b/lib/setup/SetupClass.php @@ -566,19 +566,27 @@ class SetupFunctions info('Index ranks 0 - 4'); $oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4); echo $oCmd->escapedCmd(); - + $iStatus = $oCmd->run(); if ($iStatus != 0) { fail('error status ' . $iStatus . ' running nominatim!'); } if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); + info('Index administrative boundaries'); + $oCmd = (clone $oBaseCmd)->addParams('-b'); + $iStatus = $oCmd->run(); + if ($iStatus != 0) { + fail('error status ' . $iStatus . ' running nominatim!'); + } + info('Index ranks 5 - 25'); $oCmd = (clone $oBaseCmd)->addParams('--minrank', 5, '--maxrank', 25); $iStatus = $oCmd->run(); if ($iStatus != 0) { fail('error status ' . $iStatus . ' running nominatim!'); } + if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); info('Index ranks 26 - 30'); diff --git a/nominatim/indexer/db.py b/nominatim/indexer/db.py new file mode 100644 index 00000000..037c3fb2 --- /dev/null +++ b/nominatim/indexer/db.py @@ -0,0 +1,112 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. +# Copyright (C) 2020 Sarah Hoffmann + +import logging +import psycopg2 +from psycopg2.extras import wait_select + +log = logging.getLogger() + +def make_connection(options, asynchronous=False): + params = {'dbname' : options.dbname, + 'user' : options.user, + 'password' : options.password, + 'host' : options.host, + 'port' : options.port, + 'async' : asynchronous} + + return psycopg2.connect(**params) + +class DBConnection(object): + """ A single non-blocking database connection. + """ + + def __init__(self, options): + self.current_query = None + self.current_params = None + self.options = options + + self.conn = None + self.connect() + + def connect(self): + """ (Re)connect to the database. Creates an asynchronous connection + with JIT and parallel processing disabled. If a connection was + already open, it is closed and a new connection established. + The caller must ensure that no query is pending before reconnecting. + """ + if self.conn is not None: + self.cursor.close() + self.conn.close() + + self.conn = make_connection(self.options, asynchronous=True) + self.wait() + + self.cursor = self.conn.cursor() + # Disable JIT and parallel workers as they are known to cause problems. + # Update pg_settings instead of using SET because it does not yield + # errors on older versions of Postgres where the settings are not + # implemented. 
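+        # The settings changed through pg_settings only apply to the current
+        # session; other connections to the database are not affected.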
+ self.perform( + """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; + UPDATE pg_settings SET setting = 0 + WHERE name = 'max_parallel_workers_per_gather';""") + self.wait() + + def wait(self): + """ Block until any pending operation is done. + """ + while True: + try: + wait_select(self.conn) + self.current_query = None + return + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry." + .format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) + + def perform(self, sql, args=None): + """ Send SQL query to the server. Returns immediately without + blocking. + """ + self.current_query = sql + self.current_params = args + self.cursor.execute(sql, args) + + def fileno(self): + """ File descriptor to wait for. (Makes this class select()able.) + """ + return self.conn.fileno() + + def is_done(self): + """ Check if the connection is available for a new query. + + Also checks if the previous query has run into a deadlock. + If so, then the previous query is repeated. + """ + if self.current_query is None: + return True + + try: + if self.conn.poll() == psycopg2.extensions.POLL_OK: + self.current_query = None + return True + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) + + return False + diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index e8600ca8..f46af9ff 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -28,25 +28,13 @@ import sys import re import getpass from datetime import datetime -import psycopg2 -from psycopg2.extras import wait_select import select from indexer.progress import ProgressLogger +from indexer.db import DBConnection, make_connection log = logging.getLogger() -def make_connection(options, asynchronous=False): - params = {'dbname' : options.dbname, - 'user' : options.user, - 'password' : options.password, - 'host' : options.host, - 'port' : options.port, - 'async' : asynchronous} - - return psycopg2.connect(**params) - - class RankRunner(object): """ Returns SQL commands for indexing one rank within the placex table. """ @@ -59,12 +47,12 @@ class RankRunner(object): def sql_count_objects(self): return """SELECT count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 + WHERE rank_address = {} and indexed_status > 0 """.format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} + WHERE indexed_status > 0 and rank_address = {} ORDER BY geometry_sector""".format(self.rank) def sql_index_place(self, ids): @@ -94,113 +82,62 @@ class InterpolationRunner(object): SET indexed_status = 0 WHERE place_id IN ({})"""\ .format(','.join((str(i) for i in ids))) - -class DBConnection(object): - """ A single non-blocking database connection. +class BoundaryRunner(object): + """ Returns SQL commands for indexing the administrative boundaries + of a certain rank. 
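+        Only places tagged boundary=administrative with the given search rank
+        are selected; they are processed ordered by partition and admin level.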
""" - def __init__(self, options): - self.current_query = None - self.current_params = None + def __init__(self, rank): + self.rank = rank - self.conn = None - self.connect() + def name(self): + return "boundaries rank {}".format(self.rank) - def connect(self): - if self.conn is not None: - self.cursor.close() - self.conn.close() + def sql_count_objects(self): + return """SELECT count(*) FROM placex + WHERE indexed_status > 0 + AND rank_search = {} + AND class = 'boundary' and type = 'administrative'""".format(self.rank) - self.conn = make_connection(options, asynchronous=True) - self.wait() - - self.cursor = self.conn.cursor() - # Disable JIT and parallel workers as they are known to cause problems. - # Update pg_settings instead of using SET because it does not yield - # errors on older versions of Postgres where the settings are not - # implemented. - self.perform( - """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; - UPDATE pg_settings SET setting = 0 - WHERE name = 'max_parallel_workers_per_gather';""") - self.wait() - - def wait(self): - """ Block until any pending operation is done. - """ - while True: - try: - wait_select(self.conn) - self.current_query = None - return - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry." - .format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) - - def perform(self, sql, args=None): - """ Send SQL query to the server. Returns immediately without - blocking. - """ - self.current_query = sql - self.current_params = args - self.cursor.execute(sql, args) - - def fileno(self): - """ File descriptor to wait for. (Makes this class select()able.) - """ - return self.conn.fileno() - - def is_done(self): - """ Check if the connection is available for a new query. - - Also checks if the previous query has run into a deadlock. - If so, then the previous query is repeated. - """ - if self.current_query is None: - return True - - try: - if self.conn.poll() == psycopg2.extensions.POLL_OK: - self.current_query = None - return True - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) - - return False + def sql_get_objects(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + and class = 'boundary' and type = 'administrative' + ORDER BY partition, admin_level""".format(self.rank) + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) class Indexer(object): """ Main indexing routine. """ def __init__(self, options): - self.minrank = max(0, options.minrank) + self.minrank = max(1, options.minrank) self.maxrank = min(30, options.maxrank) self.conn = make_connection(options) self.threads = [DBConnection(options) for i in range(options.threads)] - def run(self): - """ Run indexing over the entire database. 
+ def index_boundaries(self): + log.warning("Starting indexing boundaries using {} threads".format( + len(self.threads))) + + for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): + self.index(BoundaryRunner(rank)) + + def index_by_rank(self): + """ Run classic indexing by rank. """ log.warning("Starting indexing rank ({} to {}) using {} threads".format( self.minrank, self.maxrank, len(self.threads))) - for rank in range(self.minrank, self.maxrank): + for rank in range(max(1, self.minrank), self.maxrank): self.index(RankRunner(rank)) + if self.maxrank == 30: + self.index(RankRunner(0), 20) self.index(InterpolationRunner(), 20) self.index(RankRunner(self.maxrank), 20) @@ -220,27 +157,28 @@ class Indexer(object): cur.close() - next_thread = self.find_free_thread() progress = ProgressLogger(obj.name(), total_tuples) - cur = self.conn.cursor(name='places') - cur.execute(obj.sql_get_objects()) + if total_tuples > 0: + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - while True: - places = [p[0] for p in cur.fetchmany(batch)] - if len(places) == 0: - break + next_thread = self.find_free_thread() + while True: + places = [p[0] for p in cur.fetchmany(batch)] + if len(places) == 0: + break - log.debug("Processing places: {}".format(places)) - thread = next(next_thread) + log.debug("Processing places: {}".format(places)) + thread = next(next_thread) - thread.perform(obj.sql_index_place(places)) - progress.add(len(places)) + thread.perform(obj.sql_index_place(places)) + progress.add(len(places)) - cur.close() + cur.close() - for t in self.threads: - t.wait() + for t in self.threads: + t.wait() progress.done() @@ -296,6 +234,9 @@ def nominatim_arg_parser(): p.add_argument('-P', '--port', dest='port', action='store', help='PostgreSQL server port') + p.add_argument('-b', '--boundary-only', + dest='boundary_only', action='store_true', + help='Only index administrative boundaries (ignores min/maxrank).') p.add_argument('-r', '--minrank', dest='minrank', type=int, metavar='RANK', default=0, help='Minimum/starting rank.') @@ -323,4 +264,7 @@ if __name__ == '__main__': password = getpass.getpass("Database password: ") options.password = password - Indexer(options).run() + if options.boundary_only: + Indexer(options).index_boundaries() + else: + Indexer(options).index_by_rank() diff --git a/sql/functions/normalization.sql b/sql/functions/normalization.sql index 66d0214a..1a8bbd84 100644 --- a/sql/functions/normalization.sql +++ b/sql/functions/normalization.sql @@ -207,16 +207,22 @@ CREATE OR REPLACE FUNCTION addr_ids_from_name(lookup_word TEXT) AS $$ DECLARE lookup_token TEXT; + id INTEGER; return_word_id INTEGER[]; BEGIN lookup_token := make_standard_name(lookup_word); SELECT array_agg(word_id) FROM word WHERE word_token = lookup_token and class is null and type is null INTO return_word_id; + IF return_word_id IS NULL THEN + id := nextval('seq_word'); + INSERT INTO word VALUES (id, lookup_token, null, null, null, null, 0); + return_word_id = ARRAY[id]; + END IF; RETURN return_word_id; END; $$ -LANGUAGE plpgsql STABLE; +LANGUAGE plpgsql; -- Normalize a string and look up its name ids (full words). 
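The effect of this change to `addr_ids_from_name()` is that an unknown address term now gets a `word` entry created on the fly instead of being silently dropped, which is also why the function loses its `STABLE` marker: it may now write to the `word` table. A minimal before/after sketch, reusing the term `nandu` from the adjusted BDD scenario later in this patch; the concrete id returned is only illustrative:

```sql
-- Old behaviour: no matching full word exists, NULL is returned and the term
-- never reaches the nameaddress_vector of the parent object.
SELECT addr_ids_from_name('nandu');   --> NULL

-- New behaviour: a fresh entry is inserted into the word table and its id is
-- returned, so the term becomes searchable as an address part.
SELECT addr_ids_from_name('nandu');   --> {1052}   (id value is made up)
```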
diff --git a/sql/functions/placex_triggers.sql b/sql/functions/placex_triggers.sql index ccee3c82..38b3421f 100644 --- a/sql/functions/placex_triggers.sql +++ b/sql/functions/placex_triggers.sql @@ -581,6 +581,11 @@ BEGIN RETURN NEW; END IF; + -- Speed up searches - just use the centroid of the feature + -- cheaper but less acurate + NEW.centroid := ST_PointOnSurface(NEW.geometry); + --DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid); + -- recompute the ranks, they might change when linking changes SELECT * INTO NEW.rank_search, NEW.rank_address FROM compute_place_rank(NEW.country_code, @@ -591,8 +596,8 @@ BEGIN (NEW.extratags->'capital') = 'yes', NEW.address->'postcode'); -- We must always increase the address level relative to the admin boundary. - IF NEW.class = 'boundary' and NEW.type = 'administrative' THEN - parent_address_level := get_parent_address_level(NEW.geometry, NEW.admin_level); + IF NEW.class = 'boundary' and NEW.type = 'administrative' and NEW.osm_type = 'R' THEN + parent_address_level := get_parent_address_level(NEW.centroid, NEW.admin_level); IF parent_address_level >= NEW.rank_address THEN IF parent_address_level >= 24 THEN NEW.rank_address := 25; @@ -632,11 +637,6 @@ BEGIN END IF; END IF; - -- Speed up searches - just use the centroid of the feature - -- cheaper but less acurate - NEW.centroid := ST_PointOnSurface(NEW.geometry); - --DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid); - NEW.postcode := null; -- recalculate country and partition diff --git a/sql/indices_updates.src.sql b/sql/indices_updates.src.sql index 175bfba2..6d4c968e 100644 --- a/sql/indices_updates.src.sql +++ b/sql/indices_updates.src.sql @@ -1,7 +1,7 @@ -- Indices used only during search and update. -- These indices are created only after the indexing process is done. 
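+-- The column change below mirrors the indexer's switch from rank_search to
+-- rank_address in RankRunner; existing databases need the manual migration
+-- step documented at the top of this patch.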
-CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_search,geometry_sector) {ts:address-index} where indexed_status > 0; +CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_address,geometry_sector) {ts:address-index} where indexed_status > 0; CREATE INDEX CONCURRENTLY idx_location_area_country_place_id ON location_area_country USING BTREE (place_id) {ts:address-index}; diff --git a/test/bdd/db/import/search_name.feature b/test/bdd/db/import/search_name.feature index c4e5bbce..8006045f 100644 --- a/test/bdd/db/import/search_name.feature +++ b/test/bdd/db/import/search_name.feature @@ -39,13 +39,13 @@ Feature: Creation of search terms | object | nameaddress_vector | | W1 | bonn, new york, smalltown | - Scenario: A known addr:* tag is not added if the name is unknown + Scenario: A known addr:* tag is added even if the name is unknown Given the scene roads-with-pois And the places | osm | class | type | name | addr+city | geometry | | W1 | highway | residential | Road | Nandu | :w-north | When importing - Then search_name contains not + Then search_name contains | object | nameaddress_vector | | W1 | nandu | diff --git a/utils/update.php b/utils/update.php index 9e74b4ba..f0b45b42 100644 --- a/utils/update.php +++ b/utils/update.php @@ -278,9 +278,11 @@ if ($aResult['recompute-word-counts']) { if ($aResult['index']) { $oCmd = (clone $oIndexCmd) - ->addParams('--minrank', $aResult['index-rank']); + ->addParams('--minrank', $aResult['index-rank'], '-b'); + $oCmd->run(); - // echo $oCmd->escapedCmd()."\n"; + $oCmd = (clone $oIndexCmd) + ->addParams('--minrank', $aResult['index-rank']); $oCmd->run(); $oDB->exec('update import_status set indexed = true'); @@ -421,9 +423,18 @@ if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) { // Index file if (!$aResult['no-index']) { - $oThisIndexCmd = clone($oIndexCmd); $fCMDStartTime = time(); + $oThisIndexCmd = clone($oIndexCmd); + $oThisIndexCmd->addParams('-b'); + echo $oThisIndexCmd->escapedCmd()."\n"; + $iErrorLevel = $oThisIndexCmd->run(); + if ($iErrorLevel) { + echo "Error: $iErrorLevel\n"; + exit($iErrorLevel); + } + + $oThisIndexCmd = clone($oIndexCmd); echo $oThisIndexCmd->escapedCmd()."\n"; $iErrorLevel = $oThisIndexCmd->run(); if ($iErrorLevel) {