add a function for the initial indexing run

Also moves postcodes to fully parallel indexing.
This commit is contained in:
Sarah Hoffmann
2021-02-25 11:25:01 +01:00
parent db5e78c879
commit 3c186f8030
7 changed files with 184 additions and 60 deletions

View File

@@ -64,6 +64,16 @@ if ($aCMDResult['verbose']) {
$oNominatimCmd->addParams('--verbose'); $oNominatimCmd->addParams('--verbose');
} }
// By default, use all but one processor, but never more than 15.
// An explicit 'threads' setting takes precedence; the result is always >= 1.
$iInstances = max(1, $aCMDResult['threads'] ?? (min(16, getProcessorCount()) - 1));

/**
 * Run a nominatim command with the globally configured thread count.
 *
 * Appends '--threads <$iInstances>' to the command and executes it with
 * $oCmd->run(true).
 * NOTE(review): the 'true' argument presumably makes a failing command
 * abort the script - confirm against the command class.
 *
 * @param object $oCmd Command object providing addParams() and run().
 *
 * @return void
 */
function run($oCmd)
{
    global $iInstances;
    $oCmd->addParams('--threads', $iInstances);
    $oCmd->run(true);
}
//******************************************************* //*******************************************************
// Making some sanity check: // Making some sanity check:
@@ -81,7 +91,7 @@ $oSetup = new SetupFunctions($aCMDResult);
// go through complete process if 'all' is selected or start selected functions // go through complete process if 'all' is selected or start selected functions
if ($aCMDResult['create-db'] || $aCMDResult['all']) { if ($aCMDResult['create-db'] || $aCMDResult['all']) {
$bDidSomething = true; $bDidSomething = true;
(clone($oNominatimCmd))->addParams('transition', '--create-db')->run(true); run((clone($oNominatimCmd))->addParams('transition', '--create-db'));
} }
if ($aCMDResult['setup-db'] || $aCMDResult['all']) { if ($aCMDResult['setup-db'] || $aCMDResult['all']) {
@@ -92,7 +102,7 @@ if ($aCMDResult['setup-db'] || $aCMDResult['all']) {
$oCmd->addParams('--no-partitions'); $oCmd->addParams('--no-partitions');
} }
$oCmd->run(true); run($oCmd);
} }
if ($aCMDResult['import-data'] || $aCMDResult['all']) { if ($aCMDResult['import-data'] || $aCMDResult['all']) {
@@ -104,7 +114,7 @@ if ($aCMDResult['import-data'] || $aCMDResult['all']) {
$oCmd->addParams('--drop'); $oCmd->addParams('--drop');
} }
$oCmd->run(true); run($oCmd);
} }
if ($aCMDResult['create-functions'] || $aCMDResult['all']) { if ($aCMDResult['create-functions'] || $aCMDResult['all']) {
@@ -131,6 +141,7 @@ if ($aCMDResult['create-partition-functions'] || $aCMDResult['all']) {
if ($aCMDResult['import-wikipedia-articles'] || $aCMDResult['all']) { if ($aCMDResult['import-wikipedia-articles'] || $aCMDResult['all']) {
$bDidSomething = true; $bDidSomething = true;
// ignore errors!
(clone($oNominatimCmd))->addParams('refresh', '--wiki-data')->run(); (clone($oNominatimCmd))->addParams('refresh', '--wiki-data')->run();
} }
@@ -152,12 +163,17 @@ if ($aCMDResult['calculate-postcodes'] || $aCMDResult['all']) {
if ($aCMDResult['index'] || $aCMDResult['all']) { if ($aCMDResult['index'] || $aCMDResult['all']) {
$bDidSomething = true; $bDidSomething = true;
$oSetup->index($aCMDResult['index-noanalyse']); $oCmd = (clone($oNominatimCmd))->addParams('transition', '--index');
if ($aCMDResult['index-noanalyse'] ?? false) {
$oCmd->addParams('--no-analyse');
}
run($oCmd);
} }
if ($aCMDResult['drop']) { if ($aCMDResult['drop']) {
$bDidSomething = true; $bDidSomething = true;
(clone($oNominatimCmd))->addParams('freeze')->run(true); run((clone($oNominatimCmd))->addParams('freeze'));
} }
if ($aCMDResult['create-search-indices'] || $aCMDResult['all']) { if ($aCMDResult['create-search-indices'] || $aCMDResult['all']) {
@@ -172,7 +188,7 @@ if ($aCMDResult['create-country-names'] || $aCMDResult['all']) {
if ($aCMDResult['setup-website'] || $aCMDResult['all']) { if ($aCMDResult['setup-website'] || $aCMDResult['all']) {
$bDidSomething = true; $bDidSomething = true;
(clone($oNominatimCmd))->addParams('refresh', '--website')->run(true); run((clone($oNominatimCmd))->addParams('refresh', '--website'));
} }
// ****************************************************** // ******************************************************

View File

@@ -71,7 +71,6 @@ class SetupFunctions
if ($this->bVerbose) { if ($this->bVerbose) {
$this->oNominatimCmd->addParams('--verbose'); $this->oNominatimCmd->addParams('--verbose');
} }
$this->oNominatimCmd->addParams('--threads', $this->iInstances);
} }
public function createFunctions() public function createFunctions()
@@ -380,49 +379,6 @@ class SetupFunctions
$this->db()->exec($sSQL); $this->db()->exec($sSQL);
} }
/**
 * Index the imported data in several stages by calling 'nominatim index'.
 *
 * Stages, in order: ranks 0-4, administrative boundaries only, ranks 5-25
 * without boundaries, ranks 26 and above without boundaries. Each stage
 * aborts via fail() when the command returns a non-zero status.
 * Finally all rows of location_postcode are marked as indexed with a
 * single UPDATE statement.
 *
 * @param bool $bIndexNoanalyse When true, the ANALYSE runs after the
 *                              rank 0-4 and rank 5-25 stages are skipped.
 *
 * @return void
 */
public function index($bIndexNoanalyse)
{
$this->checkModulePresence(); // raises exception on failure
// Every stage starts from a fresh clone of this base command.
$oBaseCmd = (clone $this->oNominatimCmd)->addParams('index');
info('Index ranks 0 - 4');
$oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4);
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
info('Index administrative boundaries');
$oCmd = (clone $oBaseCmd)->addParams('--boundaries-only');
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
info('Index ranks 5 - 25');
$oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 5, '--maxrank', 25);
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
info('Index ranks 26 - 30');
$oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 26);
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
info('Index postcodes');
// Postcodes are not passed to the indexer command; one statement
// sets indexed_status = 0 on the whole table.
$sSQL = 'UPDATE location_postcode SET indexed_status = 0';
$this->db()->exec($sSQL);
}
public function createSearchIndices() public function createSearchIndices()
{ {
info('Create Search indices'); info('Create Search indices');

View File

@@ -171,6 +171,8 @@ class SetupAll:
params.append('--ignore-errors') params.append('--ignore-errors')
if args.index_noanalyse: if args.index_noanalyse:
params.append('--index-noanalyse') params.append('--index-noanalyse')
if args.threads:
params.extend(('--threads', args.threads))
return run_legacy_script(*params, nominatim_env=args) return run_legacy_script(*params, nominatim_env=args)

View File

@@ -32,6 +32,8 @@ class AdminTransition:
help='Build a blank nominatim db') help='Build a blank nominatim db')
group.add_argument('--import-data', action='store_true', group.add_argument('--import-data', action='store_true',
help='Import a osm file') help='Import a osm file')
group.add_argument('--index', action='store_true',
help='Index the data')
group = parser.add_argument_group('Options') group = parser.add_argument_group('Options')
group.add_argument('--no-partitions', action='store_true', group.add_argument('--no-partitions', action='store_true',
help='Do not partition search indices') help='Do not partition search indices')
@@ -41,6 +43,8 @@ class AdminTransition:
help='Drop tables needed for updates, making the database readonly') help='Drop tables needed for updates, making the database readonly')
group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int, group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
help='Size of cache to be used by osm2pgsql (in MB)') help='Size of cache to be used by osm2pgsql (in MB)')
group.add_argument('--no-analyse', action='store_true',
help='Do not perform analyse operations during index')
@staticmethod @staticmethod
def run(args): def run(args):
@@ -69,3 +73,9 @@ class AdminTransition:
database_import.import_osm_data(Path(args.osm_file), database_import.import_osm_data(Path(args.osm_file),
args.osm2pgsql_options(0, 1), args.osm2pgsql_options(0, 1),
drop=args.drop) drop=args.drop)
if args.index:
    LOG.warning('Indexing')
    # Imported locally, only when indexing was actually requested.
    from ..indexer.indexer import Indexer
    # Fall back to a single thread when no thread count was given.
    indexer = Indexer(args.config.get_libpq_dsn(), args.threads or 1)
    # Honour --no-analyse: without forwarding it, index_full() would
    # always run with its default analyse=True and the option would
    # have no effect.
    indexer.index_full(analyse=not args.no_analyse)

View File

@@ -61,8 +61,8 @@ class InterpolationRunner:
@staticmethod @staticmethod
def sql_index_place(ids): def sql_index_place(ids):
return """UPDATE location_property_osmline return """UPDATE location_property_osmline
SET indexed_status = 0 WHERE place_id IN ({})"""\ SET indexed_status = 0 WHERE place_id IN ({})
.format(','.join((str(i) for i in ids))) """.format(','.join((str(i) for i in ids)))
class BoundaryRunner: class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries """ Returns SQL commands for indexing the administrative boundaries
@@ -79,19 +79,46 @@ class BoundaryRunner:
return """SELECT count(*) FROM placex return """SELECT count(*) FROM placex
WHERE indexed_status > 0 WHERE indexed_status > 0
AND rank_search = {} AND rank_search = {}
AND class = 'boundary' and type = 'administrative'""".format(self.rank) AND class = 'boundary' and type = 'administrative'
""".format(self.rank)
def sql_get_objects(self): def sql_get_objects(self):
return """SELECT place_id FROM placex return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {} WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative' and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level""".format(self.rank) ORDER BY partition, admin_level
""".format(self.rank)
@staticmethod @staticmethod
def sql_index_place(ids): def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids))) .format(','.join((str(i) for i in ids)))
class PostcodeRunner:
""" Provides the SQL commands for indexing the location_postcode table.
"""
@staticmethod
def name():
""" Return a human-readable label for this runner.
"""
return "postcodes (location_postcode)"
@staticmethod
def sql_count_objects():
""" Return the SQL that counts the rows with indexed_status > 0.
"""
return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
@staticmethod
def sql_get_objects():
""" Return the SQL that selects the place_ids of all rows with
indexed_status > 0, ordered by country code and postcode.
"""
return """SELECT place_id FROM location_postcode
WHERE indexed_status > 0
ORDER BY country_code, postcode"""
@staticmethod
def sql_index_place(ids):
""" Return the SQL that sets indexed_status to 0 for the rows whose
place_id is in `ids`. Each id is converted with str() and the
results are joined with commas into the IN list.
"""
return """UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({})
""".format(','.join((str(i) for i in ids)))
class Indexer: class Indexer:
""" Main indexing routine. """ Main indexing routine.
""" """
@@ -100,7 +127,36 @@ class Indexer:
self.conn = psycopg2.connect(dsn) self.conn = psycopg2.connect(dsn)
self.threads = [DBConnection(dsn) for _ in range(num_threads)] self.threads = [DBConnection(dsn) for _ in range(num_threads)]
def index_full(self, analyse=True):
""" Index the complete database. Ranks 0 to 4 are indexed first,
followed by the administrative boundaries of ranks 0 to 30,
then ranks 5 to 25, ranks 26 to 30 and finally the postcodes.
When `analyse` is True, the database is analysed after each of
these steps to ensure that database statistics are updated.
"""
self.index_by_rank(0, 4)
self._analyse_db_if(analyse)
self.index_boundaries(0, 30)
self._analyse_db_if(analyse)
self.index_by_rank(5, 25)
self._analyse_db_if(analyse)
self.index_by_rank(26, 30)
self._analyse_db_if(analyse)
self.index_postcodes()
self._analyse_db_if(analyse)
def _analyse_db_if(self, condition):
if condition:
with self.conn.cursor() as cur:
cur.execute('ANALYSE')
def index_boundaries(self, minrank, maxrank): def index_boundaries(self, minrank, maxrank):
""" Index only administrative boundaries within the given rank range.
"""
LOG.warning("Starting indexing boundaries using %s threads", LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads)) len(self.threads))
@@ -108,7 +164,11 @@ class Indexer:
self.index(BoundaryRunner(rank)) self.index(BoundaryRunner(rank))
def index_by_rank(self, minrank, maxrank): def index_by_rank(self, minrank, maxrank):
""" Run classic indexing by rank. """ Index all entries of placex in the given rank range (inclusive)
in order of their address rank.
When rank 30 is requested then also interpolations and
places with address rank 0 will be indexed.
""" """
maxrank = min(maxrank, 30) maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads", LOG.warning("Starting indexing rank (%i to %i) using %i threads",
@@ -124,6 +184,12 @@ class Indexer:
else: else:
self.index(RankRunner(maxrank)) self.index(RankRunner(maxrank))
def index_postcodes(self):
"""Index the entries of the location_postcode table.
"""
# NOTE(review): 20 is presumably the number of rows handled per
# statement by index() - confirm against its signature.
self.index(PostcodeRunner(), 20)
def update_status_table(self): def update_status_table(self):
""" Update the status in the status table to 'indexed'. """ Update the status in the status table to 'indexed'.
""" """

View File

@@ -12,6 +12,7 @@ class IndexerTestDB:
def __init__(self, conn): def __init__(self, conn):
self.placex_id = itertools.count(100000) self.placex_id = itertools.count(100000)
self.osmline_id = itertools.count(500000) self.osmline_id = itertools.count(500000)
self.postcode_id = itertools.count(700000)
self.conn = conn self.conn = conn
self.conn.set_isolation_level(0) self.conn.set_isolation_level(0)
@@ -31,6 +32,12 @@ class IndexerTestDB:
indexed_status SMALLINT, indexed_status SMALLINT,
indexed_date TIMESTAMP, indexed_date TIMESTAMP,
geometry_sector INTEGER)""") geometry_sector INTEGER)""")
cur.execute("""CREATE TABLE location_postcode (
place_id BIGINT,
indexed_status SMALLINT,
indexed_date TIMESTAMP,
country_code varchar(2),
postcode TEXT)""")
cur.execute("""CREATE OR REPLACE FUNCTION date_update() RETURNS TRIGGER cur.execute("""CREATE OR REPLACE FUNCTION date_update() RETURNS TRIGGER
AS $$ AS $$
BEGIN BEGIN
@@ -39,10 +46,10 @@ class IndexerTestDB:
END IF; END IF;
RETURN NEW; RETURN NEW;
END; $$ LANGUAGE plpgsql;""") END; $$ LANGUAGE plpgsql;""")
cur.execute("""CREATE TRIGGER placex_update BEFORE UPDATE ON placex for table in ('placex', 'location_property_osmline', 'location_postcode'):
FOR EACH ROW EXECUTE PROCEDURE date_update()""") cur.execute("""CREATE TRIGGER {0}_update BEFORE UPDATE ON {0}
cur.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline FOR EACH ROW EXECUTE PROCEDURE date_update()
FOR EACH ROW EXECUTE PROCEDURE date_update()""") """.format(table))
def scalar(self, query): def scalar(self, query):
with self.conn.cursor() as cur: with self.conn.cursor() as cur:
@@ -74,6 +81,15 @@ class IndexerTestDB:
(next_id, sector)) (next_id, sector))
return next_id return next_id
def add_postcode(self, country, postcode):
""" Insert a row into location_postcode with indexed_status 1 and
the given country code and postcode. The place_id is taken from
the `postcode_id` counter and returned.
"""
next_id = next(self.postcode_id)
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO location_postcode
(place_id, indexed_status, country_code, postcode)
VALUES (%s, 1, %s, %s)""",
(next_id, country, postcode))
return next_id
def placex_unindexed(self): def placex_unindexed(self):
return self.scalar('SELECT count(*) from placex where indexed_status > 0') return self.scalar('SELECT count(*) from placex where indexed_status > 0')
@@ -87,7 +103,7 @@ def test_db(temp_db_conn):
@pytest.mark.parametrize("threads", [1, 15]) @pytest.mark.parametrize("threads", [1, 15])
def test_index_full(test_db, threads): def test_index_all_by_rank(test_db, threads):
for rank in range(31): for rank in range(31):
test_db.add_place(rank_address=rank, rank_search=rank) test_db.add_place(rank_address=rank, rank_search=rank)
test_db.add_osmline() test_db.add_osmline()
@@ -184,3 +200,35 @@ def test_index_boundaries(test_db, threads):
assert 0 == test_db.scalar(""" assert 0 == test_db.scalar("""
SELECT count(*) FROM placex SELECT count(*) FROM placex
WHERE indexed_status = 0 AND class != 'boundary'""") WHERE indexed_status = 0 AND class != 'boundary'""")
@pytest.mark.parametrize("threads", [1, 15])
def test_index_postcodes(test_db, threads):
""" After index_postcodes() no row of location_postcode may be left
with a non-zero indexed_status. Runs with 1 and with 15 threads.
"""
# 1000 postcodes for 'de' and another 1000 for 'us'.
for postcode in range(1000):
test_db.add_postcode('de', postcode)
for postcode in range(32000, 33000):
test_db.add_postcode('us', postcode)
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_postcodes()
assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
WHERE indexed_status != 0""")
def test_index_full(test_db):
""" After index_full() no rows of placex, the interpolation table or
location_postcode may be left unindexed. Uses 4 threads.
"""
# Fixture data: admin entries for ranks 4-9, one place per rank 0-30,
# a single interpolation line and 1000 postcodes.
for rank in range(4, 10):
test_db.add_admin(rank_address=rank, rank_search=rank)
for rank in range(31):
test_db.add_place(rank_address=rank, rank_search=rank)
test_db.add_osmline()
for postcode in range(1000):
test_db.add_postcode('de', postcode)
idx = Indexer('dbname=test_nominatim_python_unittest', 4)
idx.index_full()
assert 0 == test_db.placex_unindexed()
assert 0 == test_db.osmline_unindexed()
assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
WHERE indexed_status != 0""")

View File

@@ -0,0 +1,26 @@
"""
Test for various refresh functions.
"""
from pathlib import Path
import pytest
from nominatim.tools import refresh
TEST_DIR = (Path(__file__) / '..' / '..').resolve()
def test_refresh_import_wikipedia_not_existing(dsn):
""" import_wikipedia_articles() must return 1 for the directory '.'.
"""
# NOTE(review): presumably '.' contains no wikipedia data file, as the
# test name suggests - confirm.
assert 1 == refresh.import_wikipedia_articles(dsn, Path('.'))
@pytest.mark.parametrize("replace", (True, False))
def test_refresh_import_wikipedia(dsn, table_factory, temp_db_cursor, replace):
""" import_wikipedia_articles() must return 0 for the 'testdb'
directory and leave rows in both wikipedia tables. With `replace`
set, both tables are created via table_factory before the import.
"""
if replace:
table_factory('wikipedia_article')
table_factory('wikipedia_redirect')
# use the small wikipedia file for the API testdb
assert 0 == refresh.import_wikipedia_articles(dsn, TEST_DIR / 'testdb')
assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_article') > 0
assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_redirect') > 0