port load-data function to python

This commit is contained in:
Sarah Hoffmann
2021-02-25 21:32:40 +01:00
parent 3c186f8030
commit 57db5819ef
4 changed files with 101 additions and 129 deletions

View File

@@ -65,7 +65,6 @@ if ($aCMDResult['verbose']) {
} }
// by default, use all but one processor, but never more than 15. // by default, use all but one processor, but never more than 15.
var_dump($aCMDResult);
$iInstances = max(1, $aCMDResult['threads'] ?? (min(16, getProcessorCount()) - 1)); $iInstances = max(1, $aCMDResult['threads'] ?? (min(16, getProcessorCount()) - 1));
function run($oCmd) { function run($oCmd) {
@@ -147,7 +146,7 @@ if ($aCMDResult['import-wikipedia-articles'] || $aCMDResult['all']) {
if ($aCMDResult['load-data'] || $aCMDResult['all']) { if ($aCMDResult['load-data'] || $aCMDResult['all']) {
$bDidSomething = true; $bDidSomething = true;
$oSetup->loadData($aCMDResult['disable-token-precalc']); run((clone($oNominatimCmd))->addParams('transition', '--load-data'));
} }
if ($aCMDResult['import-tiger-data']) { if ($aCMDResult['import-tiger-data']) {

View File

@@ -119,133 +119,6 @@ class SetupFunctions
$this->pgsqlRunPartitionScript($sTemplate); $this->pgsqlRunPartitionScript($sTemplate);
} }
public function loadData($bDisableTokenPrecalc)
{
info('Drop old Data');
$oDB = $this->db();
$oDB->exec('TRUNCATE word');
echo '.';
$oDB->exec('TRUNCATE placex');
echo '.';
$oDB->exec('TRUNCATE location_property_osmline');
echo '.';
$oDB->exec('TRUNCATE place_addressline');
echo '.';
$oDB->exec('TRUNCATE location_area');
echo '.';
if (!$this->dbReverseOnly()) {
$oDB->exec('TRUNCATE search_name');
echo '.';
}
$oDB->exec('TRUNCATE search_name_blank');
echo '.';
$oDB->exec('DROP SEQUENCE seq_place');
echo '.';
$oDB->exec('CREATE SEQUENCE seq_place start 100000');
echo '.';
$sSQL = 'select distinct partition from country_name';
$aPartitions = $oDB->getCol($sSQL);
if (!$this->bNoPartitions) $aPartitions[] = 0;
foreach ($aPartitions as $sPartition) {
$oDB->exec('TRUNCATE location_road_'.$sPartition);
echo '.';
}
// used by getorcreate_word_id to ignore frequent partial words
$sSQL = 'CREATE OR REPLACE FUNCTION get_maxwordfreq() RETURNS integer AS ';
$sSQL .= '$$ SELECT '.getSetting('MAX_WORD_FREQUENCY').' as maxwordfreq; $$ LANGUAGE SQL IMMUTABLE';
$oDB->exec($sSQL);
echo ".\n";
// pre-create the word list
if (!$bDisableTokenPrecalc) {
info('Loading word list');
$this->pgsqlRunScriptFile(CONST_DataDir.'/words.sql');
}
info('Load Data');
$sColumns = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry';
$aDBInstances = array();
$iLoadThreads = max(1, $this->iInstances - 1);
for ($i = 0; $i < $iLoadThreads; $i++) {
// https://secure.php.net/manual/en/function.pg-connect.php
$DSN = getSetting('DATABASE_DSN');
$DSN = preg_replace('/^pgsql:/', '', $DSN);
$DSN = preg_replace('/;/', ' ', $DSN);
$aDBInstances[$i] = pg_connect($DSN, PGSQL_CONNECT_FORCE_NEW);
pg_ping($aDBInstances[$i]);
}
for ($i = 0; $i < $iLoadThreads; $i++) {
$sSQL = "INSERT INTO placex ($sColumns) SELECT $sColumns FROM place WHERE osm_id % $iLoadThreads = $i";
$sSQL .= " and not (class='place' and type='houses' and osm_type='W'";
$sSQL .= " and ST_GeometryType(geometry) = 'ST_LineString')";
$sSQL .= ' and ST_IsValid(geometry)';
if ($this->bVerbose) echo "$sSQL\n";
if (!pg_send_query($aDBInstances[$i], $sSQL)) {
fail(pg_last_error($aDBInstances[$i]));
}
}
// last thread for interpolation lines
// https://secure.php.net/manual/en/function.pg-connect.php
$DSN = getSetting('DATABASE_DSN');
$DSN = preg_replace('/^pgsql:/', '', $DSN);
$DSN = preg_replace('/;/', ' ', $DSN);
$aDBInstances[$iLoadThreads] = pg_connect($DSN, PGSQL_CONNECT_FORCE_NEW);
pg_ping($aDBInstances[$iLoadThreads]);
$sSQL = 'insert into location_property_osmline';
$sSQL .= ' (osm_id, address, linegeo)';
$sSQL .= ' SELECT osm_id, address, geometry from place where ';
$sSQL .= "class='place' and type='houses' and osm_type='W' and ST_GeometryType(geometry) = 'ST_LineString'";
if ($this->bVerbose) echo "$sSQL\n";
if (!pg_send_query($aDBInstances[$iLoadThreads], $sSQL)) {
fail(pg_last_error($aDBInstances[$iLoadThreads]));
}
$bFailed = false;
for ($i = 0; $i <= $iLoadThreads; $i++) {
while (($hPGresult = pg_get_result($aDBInstances[$i])) !== false) {
$resultStatus = pg_result_status($hPGresult);
// PGSQL_EMPTY_QUERY, PGSQL_COMMAND_OK, PGSQL_TUPLES_OK,
// PGSQL_COPY_OUT, PGSQL_COPY_IN, PGSQL_BAD_RESPONSE,
// PGSQL_NONFATAL_ERROR and PGSQL_FATAL_ERROR
// echo 'Query result ' . $i . ' is: ' . $resultStatus . "\n";
if ($resultStatus != PGSQL_COMMAND_OK && $resultStatus != PGSQL_TUPLES_OK) {
$resultError = pg_result_error($hPGresult);
echo '-- error text ' . $i . ': ' . $resultError . "\n";
$bFailed = true;
}
}
}
if ($bFailed) {
fail('SQL errors loading placex and/or location_property_osmline tables');
}
for ($i = 0; $i < $this->iInstances; $i++) {
pg_close($aDBInstances[$i]);
}
echo "\n";
info('Reanalysing database');
$this->pgsqlRunScript('ANALYSE');
$sDatabaseDate = getDatabaseDate($oDB);
$oDB->exec('TRUNCATE import_status');
if (!$sDatabaseDate) {
warn('could not determine database date.');
} else {
$sSQL = "INSERT INTO import_status (lastimportdate) VALUES('".$sDatabaseDate."')";
$oDB->exec($sSQL);
echo "Latest data imported from $sDatabaseDate.\n";
}
}
public function importTigerData($sTigerPath) public function importTigerData($sTigerPath)
{ {
info('Import Tiger data'); info('Import Tiger data');

View File

@@ -9,6 +9,7 @@ import logging
from pathlib import Path from pathlib import Path
from ..db.connection import connect from ..db.connection import connect
from ..db import status
from ..errors import UsageError from ..errors import UsageError
# Do not repeat documentation of subcommand classes. # Do not repeat documentation of subcommand classes.
@@ -32,6 +33,8 @@ class AdminTransition:
help='Build a blank nominatim db') help='Build a blank nominatim db')
group.add_argument('--import-data', action='store_true', group.add_argument('--import-data', action='store_true',
help='Import a osm file') help='Import a osm file')
group.add_argument('--load-data', action='store_true',
help='Copy data to live tables from import table')
group.add_argument('--index', action='store_true', group.add_argument('--index', action='store_true',
help='Index the data') help='Index the data')
group = parser.add_argument_group('Options') group = parser.add_argument_group('Options')
@@ -74,6 +77,20 @@ class AdminTransition:
args.osm2pgsql_options(0, 1), args.osm2pgsql_options(0, 1),
drop=args.drop) drop=args.drop)
if args.load_data:
LOG.warning('Load data')
with connect(args.config.get_libpq_dsn()) as conn:
database_import.truncate_data_tables(conn, args.config.MAX_WORD_FREQUENCY)
database_import.load_data(args.config.get_libpq_dsn(),
args.data_dir,
args.threads or 1)
with connect(args.config.get_libpq_dsn()) as conn:
try:
status.set_status(conn, status.compute_database_date(conn))
except Exception as exc: # pylint: disable=bare-except
LOG.error('Cannot determine date of database: %s', exc)
if args.index: if args.index:
LOG.warning('Indexing') LOG.warning('Indexing')
from ..indexer.indexer import Indexer from ..indexer.indexer import Indexer

View File

@@ -3,6 +3,7 @@ Functions for setting up and importing a new Nominatim database.
""" """
import logging import logging
import os import os
import selectors
import subprocess import subprocess
import shutil import shutil
from pathlib import Path from pathlib import Path
@@ -11,6 +12,7 @@ import psutil
from ..db.connection import connect, get_pg_env from ..db.connection import connect, get_pg_env
from ..db import utils as db_utils from ..db import utils as db_utils
from ..db.async_connection import DBConnection
from .exec_utils import run_osm2pgsql from .exec_utils import run_osm2pgsql
from ..errors import UsageError from ..errors import UsageError
from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
@@ -156,3 +158,84 @@ def import_osm_data(osm_file, options, drop=False):
if drop: if drop:
if options['flatnode_file']: if options['flatnode_file']:
Path(options['flatnode_file']).unlink() Path(options['flatnode_file']).unlink()
def truncate_data_tables(conn, max_word_frequency=None):
""" Truncate all data tables to prepare for a fresh load.
"""
with conn.cursor() as cur:
cur.execute('TRUNCATE word')
cur.execute('TRUNCATE placex')
cur.execute('TRUNCATE place_addressline')
cur.execute('TRUNCATE location_area')
cur.execute('TRUNCATE location_area_country')
cur.execute('TRUNCATE location_property')
cur.execute('TRUNCATE location_property_tiger')
cur.execute('TRUNCATE location_property_osmline')
cur.execute('TRUNCATE location_postcode')
cur.execute('TRUNCATE search_name')
cur.execute('DROP SEQUENCE seq_place')
cur.execute('CREATE SEQUENCE seq_place start 100000')
cur.execute("""SELECT tablename FROM pg_tables
WHERE tablename LIKE 'location_road_%'""")
for table in [r[0] for r in list(cur)]:
cur.execute('TRUNCATE ' + table)
if max_word_frequency is not None:
# Used by getorcreate_word_id to ignore frequent partial words.
cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
RETURNS integer AS $$
SELECT {} as maxwordfreq;
$$ LANGUAGE SQL IMMUTABLE
""".format(max_word_frequency))
conn.commit()
_COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'

def load_data(dsn, data_dir, threads):
    """ Copy data into the word and placex table.

        Runs the copy on `threads` parallel asynchronous connections:
        <threads - 1> connections shard the place->placex insert by
        osm_id modulo, one extra connection handles address
        interpolation lines. Finally the database is re-ANALYSEd.

        Parameters:
            dsn: libpq connection string of the target database.
            data_dir: Path of the directory containing words.sql.
            threads: total number of connections to use (minimum 2
                     effective: one chunk worker plus the osmline one).
    """
    # Pre-calculate the most important terms in the word list.
    db_utils.execute_file(dsn, data_dir / 'words.sql')

    sel = selectors.DefaultSelector()
    # Fix: ensure the selector (and its fds) is always released, even when
    # connecting or dispatching raises; previously it was never closed.
    try:
        # Then copy data from place to placex in <threads - 1> chunks.
        place_threads = max(1, threads - 1)
        for imod in range(place_threads):
            conn = DBConnection(dsn)
            conn.connect()
            conn.perform("""INSERT INTO placex ({0})
                             SELECT {0} FROM place
                             WHERE osm_id % {1} = {2}
                               AND NOT (class='place' and type='houses')
                               AND ST_IsValid(geometry)
                         """.format(_COPY_COLUMNS, place_threads, imod))
            sel.register(conn, selectors.EVENT_READ, conn)

        # Address interpolations go into another table.
        conn = DBConnection(dsn)
        conn.connect()
        conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                         SELECT osm_id, address, geometry FROM place
                         WHERE class='place' and type='houses' and osm_type='W'
                               and ST_GeometryType(geometry) = 'ST_LineString'
                     """)
        sel.register(conn, selectors.EVENT_READ, conn)

        # Now wait for all of them to finish.
        todo = place_threads + 1
        while todo > 0:
            for key, _ in sel.select(1):
                conn = key.data
                sel.unregister(conn)
                conn.wait()
                conn.close()
                todo -= 1
                print('.', end='', flush=True)
    finally:
        sel.close()
    print('\n')

    with connect(dsn) as conn:
        with conn.cursor() as cur:
            # Refresh planner statistics after the bulk load.
            cur.execute('ANALYSE')