From fc50eb8688754b8eae4abf5e22aa921593b97935 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Fri, 14 Aug 2020 10:33:58 +0200 Subject: [PATCH 1/7] nominatim: move DBConnection class into its own file --- nominatim/indexer/db.py | 112 ++++++++++++++++++++++++++++++++++++++++ nominatim/nominatim.py | 100 +---------------------------------- 2 files changed, 113 insertions(+), 99 deletions(-) create mode 100644 nominatim/indexer/db.py diff --git a/nominatim/indexer/db.py b/nominatim/indexer/db.py new file mode 100644 index 00000000..037c3fb2 --- /dev/null +++ b/nominatim/indexer/db.py @@ -0,0 +1,112 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. +# Copyright (C) 2020 Sarah Hoffmann + +import logging +import psycopg2 +from psycopg2.extras import wait_select + +log = logging.getLogger() + +def make_connection(options, asynchronous=False): + params = {'dbname' : options.dbname, + 'user' : options.user, + 'password' : options.password, + 'host' : options.host, + 'port' : options.port, + 'async' : asynchronous} + + return psycopg2.connect(**params) + +class DBConnection(object): + """ A single non-blocking database connection. + """ + + def __init__(self, options): + self.current_query = None + self.current_params = None + self.options = options + + self.conn = None + self.connect() + + def connect(self): + """ (Re)connect to the database. Creates an asynchronous connection + with JIT and parallel processing disabled. If a connection was + already open, it is closed and a new connection established. + The caller must ensure that no query is pending before reconnecting. + """ + if self.conn is not None: + self.cursor.close() + self.conn.close() + + self.conn = make_connection(self.options, asynchronous=True) + self.wait() + + self.cursor = self.conn.cursor() + # Disable JIT and parallel workers as they are known to cause problems. 
+ # Update pg_settings instead of using SET because it does not yield + # errors on older versions of Postgres where the settings are not + # implemented. + self.perform( + """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; + UPDATE pg_settings SET setting = 0 + WHERE name = 'max_parallel_workers_per_gather';""") + self.wait() + + def wait(self): + """ Block until any pending operation is done. + """ + while True: + try: + wait_select(self.conn) + self.current_query = None + return + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry." + .format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) + + def perform(self, sql, args=None): + """ Send SQL query to the server. Returns immediately without + blocking. + """ + self.current_query = sql + self.current_params = args + self.cursor.execute(sql, args) + + def fileno(self): + """ File descriptor to wait for. (Makes this class select()able.) + """ + return self.conn.fileno() + + def is_done(self): + """ Check if the connection is available for a new query. + + Also checks if the previous query has run into a deadlock. + If so, then the previous query is repeated. 
+ """ + if self.current_query is None: + return True + + try: + if self.conn.poll() == psycopg2.extensions.POLL_OK: + self.current_query = None + return True + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) + + return False + diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index e8600ca8..67cd42ee 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -28,25 +28,13 @@ import sys import re import getpass from datetime import datetime -import psycopg2 -from psycopg2.extras import wait_select import select from indexer.progress import ProgressLogger +from indexer.db import DBConnection, make_connection log = logging.getLogger() -def make_connection(options, asynchronous=False): - params = {'dbname' : options.dbname, - 'user' : options.user, - 'password' : options.password, - 'host' : options.host, - 'port' : options.port, - 'async' : asynchronous} - - return psycopg2.connect(**params) - - class RankRunner(object): """ Returns SQL commands for indexing one rank within the placex table. """ @@ -95,92 +83,6 @@ class InterpolationRunner(object): .format(','.join((str(i) for i in ids))) -class DBConnection(object): - """ A single non-blocking database connection. - """ - - def __init__(self, options): - self.current_query = None - self.current_params = None - - self.conn = None - self.connect() - - def connect(self): - if self.conn is not None: - self.cursor.close() - self.conn.close() - - self.conn = make_connection(options, asynchronous=True) - self.wait() - - self.cursor = self.conn.cursor() - # Disable JIT and parallel workers as they are known to cause problems. 
- # Update pg_settings instead of using SET because it does not yield - # errors on older versions of Postgres where the settings are not - # implemented. - self.perform( - """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; - UPDATE pg_settings SET setting = 0 - WHERE name = 'max_parallel_workers_per_gather';""") - self.wait() - - def wait(self): - """ Block until any pending operation is done. - """ - while True: - try: - wait_select(self.conn) - self.current_query = None - return - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry." - .format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) - - def perform(self, sql, args=None): - """ Send SQL query to the server. Returns immediately without - blocking. - """ - self.current_query = sql - self.current_params = args - self.cursor.execute(sql, args) - - def fileno(self): - """ File descriptor to wait for. (Makes this class select()able.) - """ - return self.conn.fileno() - - def is_done(self): - """ Check if the connection is available for a new query. - - Also checks if the previous query has run into a deadlock. - If so, then the previous query is repeated. - """ - if self.current_query is None: - return True - - try: - if self.conn.poll() == psycopg2.extensions.POLL_OK: - self.current_query = None - return True - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) - - return False - - class Indexer(object): """ Main indexing routine. 
""" From a4b30fc64962565469765521586b6736db99c0f0 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Fri, 14 Aug 2020 16:13:06 +0200 Subject: [PATCH 2/7] index admin boundaries before everything else Avoids irregularities that might happen because the address rank of a boundary is changed through linking. --- lib/setup/SetupClass.php | 10 +++++++++- nominatim/nominatim.py | 41 +++++++++++++++++++++++++++++++++++++--- utils/update.php | 17 ++++++++++++++--- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/lib/setup/SetupClass.php b/lib/setup/SetupClass.php index ba700a2c..2815f8c4 100755 --- a/lib/setup/SetupClass.php +++ b/lib/setup/SetupClass.php @@ -566,19 +566,27 @@ class SetupFunctions info('Index ranks 0 - 4'); $oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4); echo $oCmd->escapedCmd(); - + $iStatus = $oCmd->run(); if ($iStatus != 0) { fail('error status ' . $iStatus . ' running nominatim!'); } if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); + info('Index administrative boundaries'); + $oCmd = (clone $oBaseCmd)->addParams('-b'); + $iStatus = $oCmd->run(); + if ($iStatus != 0) { + fail('error status ' . $iStatus . ' running nominatim!'); + } + info('Index ranks 5 - 25'); $oCmd = (clone $oBaseCmd)->addParams('--minrank', 5, '--maxrank', 25); $iStatus = $oCmd->run(); if ($iStatus != 0) { fail('error status ' . $iStatus . ' running nominatim!'); } + if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); info('Index ranks 26 - 30'); diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index 67cd42ee..a4f4a62e 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -82,6 +82,29 @@ class InterpolationRunner(object): SET indexed_status = 0 WHERE place_id IN ({})"""\ .format(','.join((str(i) for i in ids))) +class BoundaryRunner(object): + """ Returns SQL commands for indexing the administrative boundaries + by partition. 
+ """ + + def name(self): + return "boundaries" + + def sql_count_objects(self): + return """SELECT count(*) FROM placex + WHERE indexed_status > 0 + AND rank_search < 26 + AND class = 'boundary' and type = 'administrative'""" + + def sql_get_objects(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search < 26 + and class = 'boundary' and type = 'administrative' + ORDER BY partition, admin_level""" + + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) class Indexer(object): """ Main indexing routine. @@ -93,8 +116,14 @@ class Indexer(object): self.conn = make_connection(options) self.threads = [DBConnection(options) for i in range(options.threads)] - def run(self): - """ Run indexing over the entire database. + def index_boundaries(self): + log.warning("Starting indexing boundaries using {} threads".format( + len(self.threads))) + + self.index(BoundaryRunner()) + + def index_by_rank(self): + """ Run classic indexing by rank. 
""" log.warning("Starting indexing rank ({} to {}) using {} threads".format( self.minrank, self.maxrank, len(self.threads))) @@ -198,6 +227,9 @@ def nominatim_arg_parser(): p.add_argument('-P', '--port', dest='port', action='store', help='PostgreSQL server port') + p.add_argument('-b', '--boundary-only', + dest='boundary_only', action='store_true', + help='Only index administrative boundaries (ignores min/maxrank).') p.add_argument('-r', '--minrank', dest='minrank', type=int, metavar='RANK', default=0, help='Minimum/starting rank.') @@ -225,4 +257,7 @@ if __name__ == '__main__': password = getpass.getpass("Database password: ") options.password = password - Indexer(options).run() + if options.boundary_only: + Indexer(options).index_boundaries() + else: + Indexer(options).index_by_rank() diff --git a/utils/update.php b/utils/update.php index 9e74b4ba..f0b45b42 100644 --- a/utils/update.php +++ b/utils/update.php @@ -278,9 +278,11 @@ if ($aResult['recompute-word-counts']) { if ($aResult['index']) { $oCmd = (clone $oIndexCmd) - ->addParams('--minrank', $aResult['index-rank']); + ->addParams('--minrank', $aResult['index-rank'], '-b'); + $oCmd->run(); - // echo $oCmd->escapedCmd()."\n"; + $oCmd = (clone $oIndexCmd) + ->addParams('--minrank', $aResult['index-rank']); $oCmd->run(); $oDB->exec('update import_status set indexed = true'); @@ -421,9 +423,18 @@ if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) { // Index file if (!$aResult['no-index']) { - $oThisIndexCmd = clone($oIndexCmd); $fCMDStartTime = time(); + $oThisIndexCmd = clone($oIndexCmd); + $oThisIndexCmd->addParams('-b'); + echo $oThisIndexCmd->escapedCmd()."\n"; + $iErrorLevel = $oThisIndexCmd->run(); + if ($iErrorLevel) { + echo "Error: $iErrorLevel\n"; + exit($iErrorLevel); + } + + $oThisIndexCmd = clone($oIndexCmd); echo $oThisIndexCmd->escapedCmd()."\n"; $iErrorLevel = $oThisIndexCmd->run(); if ($iErrorLevel) { From 3816b86a9e084483b3ad73661cbed063123fd8f0 Mon Sep 17 00:00:00 2001 From: 
Sarah Hoffmann Date: Sun, 16 Aug 2020 09:38:20 +0200 Subject: [PATCH 3/7] nominatim: also index boundaries by rank We need to make sure that the entry in search_name from a lower rank is indeed available. --- nominatim/nominatim.py | 47 +++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index a4f4a62e..9e720609 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -84,23 +84,26 @@ class InterpolationRunner(object): class BoundaryRunner(object): """ Returns SQL commands for indexing the administrative boundaries - by partition. + of a certain rank. """ + def __init__(self, rank): + self.rank = rank + def name(self): - return "boundaries" + return "boundaries rank {}".format(self.rank) def sql_count_objects(self): return """SELECT count(*) FROM placex WHERE indexed_status > 0 - AND rank_search < 26 - AND class = 'boundary' and type = 'administrative'""" + AND rank_search = {} + AND class = 'boundary' and type = 'administrative'""".format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search < 26 + WHERE indexed_status > 0 and rank_search = {} and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level""" + ORDER BY partition, admin_level""".format(self.rank) def sql_index_place(self, ids): return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ @@ -120,7 +123,8 @@ class Indexer(object): log.warning("Starting indexing boundaries using {} threads".format( len(self.threads))) - self.index(BoundaryRunner()) + for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): + self.index(BoundaryRunner(rank)) def index_by_rank(self): """ Run classic indexing by rank. 
@@ -151,27 +155,28 @@ class Indexer(object): cur.close() - next_thread = self.find_free_thread() progress = ProgressLogger(obj.name(), total_tuples) - cur = self.conn.cursor(name='places') - cur.execute(obj.sql_get_objects()) + if total_tuples > 0: + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - while True: - places = [p[0] for p in cur.fetchmany(batch)] - if len(places) == 0: - break + next_thread = self.find_free_thread() + while True: + places = [p[0] for p in cur.fetchmany(batch)] + if len(places) == 0: + break - log.debug("Processing places: {}".format(places)) - thread = next(next_thread) + log.debug("Processing places: {}".format(places)) + thread = next(next_thread) - thread.perform(obj.sql_index_place(places)) - progress.add(len(places)) + thread.perform(obj.sql_index_place(places)) + progress.add(len(places)) - cur.close() + cur.close() - for t in self.threads: - t.wait() + for t in self.threads: + t.wait() progress.done() From 1529666232c5d72813280c081a4dc086cd0f9910 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Sun, 16 Aug 2020 22:23:23 +0200 Subject: [PATCH 4/7] use only centroid to get parent admin boundaries Using the full geometry is far too expensive. 
--- sql/functions/placex_triggers.sql | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/functions/placex_triggers.sql b/sql/functions/placex_triggers.sql index ccee3c82..38b3421f 100644 --- a/sql/functions/placex_triggers.sql +++ b/sql/functions/placex_triggers.sql @@ -581,6 +581,11 @@ BEGIN RETURN NEW; END IF; + -- Speed up searches - just use the centroid of the feature + -- cheaper but less acurate + NEW.centroid := ST_PointOnSurface(NEW.geometry); + --DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid); + -- recompute the ranks, they might change when linking changes SELECT * INTO NEW.rank_search, NEW.rank_address FROM compute_place_rank(NEW.country_code, @@ -591,8 +596,8 @@ BEGIN (NEW.extratags->'capital') = 'yes', NEW.address->'postcode'); -- We must always increase the address level relative to the admin boundary. - IF NEW.class = 'boundary' and NEW.type = 'administrative' THEN - parent_address_level := get_parent_address_level(NEW.geometry, NEW.admin_level); + IF NEW.class = 'boundary' and NEW.type = 'administrative' and NEW.osm_type = 'R' THEN + parent_address_level := get_parent_address_level(NEW.centroid, NEW.admin_level); IF parent_address_level >= NEW.rank_address THEN IF parent_address_level >= 24 THEN NEW.rank_address := 25; @@ -632,11 +637,6 @@ BEGIN END IF; END IF; - -- Speed up searches - just use the centroid of the feature - -- cheaper but less acurate - NEW.centroid := ST_PointOnSurface(NEW.geometry); - --DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid); - NEW.postcode := null; -- recalculate country and partition From 73c449b97b70e5d5f28db81993b09d86829d0fa5 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Tue, 18 Aug 2020 16:58:58 +0200 Subject: [PATCH 5/7] switch indexing to address rank A place needs all lower address rank objects indexed to make up the address. 
The search rank no longer ensures that as it can have a different ordering than the address rank. This switches indexing rank order to address ranks. Non-address objects (with address rank 0) are indexed together with POIs. --- nominatim/nominatim.py | 10 ++++++---- sql/indices_updates.src.sql | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index 9e720609..f46af9ff 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -47,12 +47,12 @@ class RankRunner(object): def sql_count_objects(self): return """SELECT count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 + WHERE rank_address = {} and indexed_status > 0 """.format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} + WHERE indexed_status > 0 and rank_address = {} ORDER BY geometry_sector""".format(self.rank) def sql_index_place(self, ids): @@ -114,7 +114,7 @@ class Indexer(object): """ def __init__(self, options): - self.minrank = max(0, options.minrank) + self.minrank = max(1, options.minrank) self.maxrank = min(30, options.maxrank) self.conn = make_connection(options) self.threads = [DBConnection(options) for i in range(options.threads)] @@ -132,10 +132,12 @@ class Indexer(object): log.warning("Starting indexing rank ({} to {}) using {} threads".format( self.minrank, self.maxrank, len(self.threads))) - for rank in range(self.minrank, self.maxrank): + for rank in range(max(1, self.minrank), self.maxrank): self.index(RankRunner(rank)) + if self.maxrank == 30: + self.index(RankRunner(0), 20) self.index(InterpolationRunner(), 20) self.index(RankRunner(self.maxrank), 20) diff --git a/sql/indices_updates.src.sql b/sql/indices_updates.src.sql index 175bfba2..6d4c968e 100644 --- a/sql/indices_updates.src.sql +++ b/sql/indices_updates.src.sql @@ -1,7 +1,7 @@ -- Indices used only during search and update. 
-- These indices are created only after the indexing process is done. -CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_search,geometry_sector) {ts:address-index} where indexed_status > 0; +CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_address,geometry_sector) {ts:address-index} where indexed_status > 0; CREATE INDEX CONCURRENTLY idx_location_area_country_place_id ON location_area_country USING BTREE (place_id) {ts:address-index}; From 984979d9bf448cf8dfab9be763c5e0ba6cf2f399 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Tue, 18 Aug 2020 21:40:53 +0200 Subject: [PATCH 6/7] add migration for new indexing schema --- docs/admin/Migration.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/admin/Migration.md b/docs/admin/Migration.md index 0400ed43..917d3785 100644 --- a/docs/admin/Migration.md +++ b/docs/admin/Migration.md @@ -8,6 +8,23 @@ SQL statements should be executed from the PostgreSQL commandline. Execute ## 3.5.0 -> master +### Change order during indexing + +When reindexing places during updates, there is now a different order used +which needs a different database index. Create it with the following SQL command: + +```sql +CREATE INDEX idx_placex_pendingsector_rank_address + ON placex USING BTREE (rank_address, geometry_sector) where indexed_status > 0; +``` + +You can then drop the old index with: + +```sql +DROP INDEX idx_placex_pendingsector; +``` + + ### Switching to dotenv As part of the work changing the configuration format, the configuration for From d6ff7475f1777f66cfc94847f6ba279fc0d10c2e Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Wed, 19 Aug 2020 11:37:21 +0200 Subject: [PATCH 7/7] make sure that addr:* tags can always be searched for Always add contents of addr:* tags into address part of the search table, even when there is no corresponding other name. 
This keeps search tolerant to the kind of tagging where parts show up in the address that have no corresponding object in the database or where it is only an unaddressable object. --- sql/functions/normalization.sql | 8 +++++++- test/bdd/db/import/search_name.feature | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/functions/normalization.sql b/sql/functions/normalization.sql index 66d0214a..1a8bbd84 100644 --- a/sql/functions/normalization.sql +++ b/sql/functions/normalization.sql @@ -207,16 +207,22 @@ CREATE OR REPLACE FUNCTION addr_ids_from_name(lookup_word TEXT) AS $$ DECLARE lookup_token TEXT; + id INTEGER; return_word_id INTEGER[]; BEGIN lookup_token := make_standard_name(lookup_word); SELECT array_agg(word_id) FROM word WHERE word_token = lookup_token and class is null and type is null INTO return_word_id; + IF return_word_id IS NULL THEN + id := nextval('seq_word'); + INSERT INTO word VALUES (id, lookup_token, null, null, null, null, 0); + return_word_id = ARRAY[id]; + END IF; RETURN return_word_id; END; $$ -LANGUAGE plpgsql STABLE; +LANGUAGE plpgsql; -- Normalize a string and look up its name ids (full words). diff --git a/test/bdd/db/import/search_name.feature b/test/bdd/db/import/search_name.feature index c4e5bbce..8006045f 100644 --- a/test/bdd/db/import/search_name.feature +++ b/test/bdd/db/import/search_name.feature @@ -39,13 +39,13 @@ Feature: Creation of search terms | object | nameaddress_vector | | W1 | bonn, new york, smalltown | - Scenario: A known addr:* tag is not added if the name is unknown + Scenario: A known addr:* tag is added even if the name is unknown Given the scene roads-with-pois And the places | osm | class | type | name | addr+city | geometry | | W1 | highway | residential | Road | Nandu | :w-north | When importing - Then search_name contains not + Then search_name contains | object | nameaddress_vector | | W1 | nandu |