Merge pull request #2197 from lonvia/use-jinja-for-sql-preprocessing

Use jinja2 for SQL preprocessing
This commit is contained in:
Sarah Hoffmann
2021-03-04 16:36:18 +01:00
committed by GitHub
29 changed files with 607 additions and 568 deletions

View File

@@ -79,20 +79,22 @@ class SetupAll:
drop=args.no_updates,
ignore_errors=args.ignore_errors)
LOG.warning('Create functions (1st pass)')
with connect(args.config.get_libpq_dsn()) as conn:
LOG.warning('Create functions (1st pass)')
refresh.create_functions(conn, args.config, args.sqllib_dir,
False, False)
LOG.warning('Create tables')
params = ['setup.php', '--create-tables', '--create-partition-tables']
if args.reverse_only:
params.append('--reverse-only')
run_legacy_script(*params, nominatim_env=args,
throw_on_fail=not args.ignore_errors)
LOG.warning('Create functions (2nd pass)')
with connect(args.config.get_libpq_dsn()) as conn:
LOG.warning('Create tables')
database_import.create_tables(conn, args.config, args.sqllib_dir,
reverse_only=args.reverse_only)
refresh.load_address_levels_from_file(conn, Path(args.config.ADDRESS_LEVEL_CONFIG))
LOG.warning('Create functions (2nd pass)')
refresh.create_functions(conn, args.config, args.sqllib_dir,
False, False)
LOG.warning('Create table triggers')
database_import.create_table_triggers(conn, args.config, args.sqllib_dir)
LOG.warning('Create partition tables')
database_import.create_partition_tables(conn, args.config, args.sqllib_dir)
LOG.warning('Create functions (3rd pass)')
refresh.create_functions(conn, args.config, args.sqllib_dir,
False, False)
@@ -124,10 +126,12 @@ class SetupAll:
indexer.index_full(analyse=not args.index_noanalyse)
LOG.warning('Post-process tables')
params = ['setup.php', '--create-search-indices', '--create-country-names']
if args.no_updates:
params.append('--drop')
run_legacy_script(*params, nominatim_env=args, throw_on_fail=not args.ignore_errors)
with connect(args.config.get_libpq_dsn()) as conn:
database_import.create_search_indices(conn, args.config,
args.sqllib_dir,
drop=args.no_updates)
run_legacy_script('setup.php', '--create-country-names',
nominatim_env=args, throw_on_fail=not args.ignore_errors)
webdir = args.project_dir / 'website'
LOG.warning('Setup website at %s', webdir)

View File

@@ -35,8 +35,14 @@ class AdminTransition:
help='Import a osm file')
group.add_argument('--load-data', action='store_true',
help='Copy data to live tables from import table')
group.add_argument('--create-tables', action='store_true',
help='Create main tables')
group.add_argument('--create-partition-tables', action='store_true',
help='Create required partition tables')
group.add_argument('--index', action='store_true',
help='Index the data')
group.add_argument('--create-search-indices', action='store_true',
help='Create additional indices required for search and update')
group = parser.add_argument_group('Options')
group.add_argument('--no-partitions', action='store_true',
help='Do not partition search indices')
@@ -50,10 +56,13 @@ class AdminTransition:
help='Do not perform analyse operations during index')
group.add_argument('--ignore-errors', action='store_true',
help="Ignore certain erros on import.")
group.add_argument('--reverse-only', action='store_true',
help='Do not create search tables and indexes')
@staticmethod
def run(args):
from ..tools import database_import
from ..tools import refresh
if args.create_db:
LOG.warning('Create DB')
@@ -80,6 +89,20 @@ class AdminTransition:
drop=args.drop,
ignore_errors=args.ignore_errors)
if args.create_tables:
LOG.warning('Create Tables')
with connect(args.config.get_libpq_dsn()) as conn:
database_import.create_tables(conn, args.config, args.sqllib_dir, args.reverse_only)
refresh.load_address_levels_from_file(conn, Path(args.config.ADDRESS_LEVEL_CONFIG))
refresh.create_functions(conn, args.config, args.sqllib_dir,
enable_diff_updates=False)
database_import.create_table_triggers(conn, args.config, args.sqllib_dir)
if args.create_partition_tables:
LOG.warning('Create Partition Tables')
with connect(args.config.get_libpq_dsn()) as conn:
database_import.create_partition_tables(conn, args.config, args.sqllib_dir)
if args.load_data:
LOG.warning('Load data')
with connect(args.config.get_libpq_dsn()) as conn:
@@ -99,3 +122,8 @@ class AdminTransition:
from ..indexer.indexer import Indexer
indexer = Indexer(args.config.get_libpq_dsn(), args.threads or 1)
indexer.index_full()
if args.create_search_indices:
LOG.warning('Create Search indices')
with connect(args.config.get_libpq_dsn()) as conn:
database_import.create_search_indices(conn, args.config, args.sqllib_dir, args.drop)

View File

@@ -0,0 +1,94 @@
"""
Preprocessing of SQL files.
"""
import jinja2
def _get_partitions(conn):
""" Get the set of partitions currently in use.
"""
with conn.cursor() as cur:
cur.execute('SELECT DISTINCT partition FROM country_name')
partitions = set([0])
for row in cur:
partitions.add(row[0])
return partitions
def _get_tables(conn):
""" Return the set of tables currently in use.
Only includes non-partitioned
"""
with conn.cursor() as cur:
cur.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
return set((row[0] for row in list(cur)))
def _setup_tablespace_sql(config):
""" Returns a dict with tablespace expressions for the different tablespace
kinds depending on whether a tablespace is configured or not.
"""
out = {}
for subset in ('ADDRESS', 'SEARCH', 'AUX'):
for kind in ('DATA', 'INDEX'):
tspace = getattr(config, 'TABLESPACE_{}_{}'.format(subset, kind))
if tspace:
tspace = 'TABLESPACE "{}"'.format(tspace)
out['{}_{}'.format(subset.lower, kind.lower())] = tspace
return out
def _setup_postgres_sql(conn):
""" Set up a dictionary with various Postgresql/Postgis SQL terms which
are dependent on the database version in use.
"""
out = {}
pg_version = conn.server_version_tuple()
# CREATE INDEX IF NOT EXISTS was introduced in PG9.5.
# Note that you need to ignore failures on older versions when
# unsing this construct.
out['if_index_not_exists'] = ' IF NOT EXISTS ' if pg_version >= (9, 5, 0) else ''
return out
class SQLPreprocessor: # pylint: disable=too-few-public-methods
    """ An environment for preprocessing SQL files from the
        lib-sql directory.

        A set of default variables is made available to every SQL file:
        'config', 'db', 'sql' and 'modulepath'. Keyword arguments given
        when running a file are handed to the template as well.

        The preprocessing is currently based on the jinja2 templating
        library and follows its syntax.
    """

    def __init__(self, conn, config, sqllib_dir):
        template_loader = jinja2.FileSystemLoader(str(sqllib_dir))
        self.env = jinja2.Environment(autoescape=False, loader=template_loader)

        # Collect what is known about the current state of the database.
        partitions = _get_partitions(conn)
        tables = _get_tables(conn)
        db_info = {
            'partitions': partitions,
            'tables': tables,
            # Without the search_name table only reverse search is set up.
            'reverse_only': 'search_name' not in tables,
            'tablespace': _setup_tablespace_sql(config)
        }

        template_globals = self.env.globals
        template_globals['config'] = config
        template_globals['db'] = db_info
        template_globals['sql'] = _setup_postgres_sql(conn)

        # An explicitly configured module path wins over the default
        # location inside the project directory.
        module_path = config.DATABASE_MODULE_PATH
        if not module_path:
            module_path = str((config.project_dir / 'module').resolve())
        template_globals['modulepath'] = module_path


    def run_sql_file(self, conn, name, **kwargs):
        """ Render the SQL file `name` and execute the result on the
            given connection, followed by a commit. The keyword arguments
            are passed to the template as additional parameters.
        """
        template = self.env.get_template(name)
        statement = template.render(**kwargs)

        with conn.cursor() as cur:
            cur.execute(statement)
        conn.commit()

View File

@@ -14,6 +14,7 @@ import psycopg2
from ..db.connection import connect, get_pg_env
from ..db import utils as db_utils
from ..db.async_connection import DBConnection
from ..db.sql_preprocessor import SQLPreprocessor
from .exec_utils import run_osm2pgsql
from ..errors import UsageError
from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
@@ -178,6 +179,32 @@ def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
Path(options['flatnode_file']).unlink()
def create_tables(conn, config, sqllib_dir, reverse_only=False):
    """ Set up the basic tables by running 'tables.sql'.

        `reverse_only` overrides the 'reverse_only' entry of the
        preprocessor's database information. When it is True, the main
        table for searching is skipped and only reverse search is possible.
    """
    preprocessor = SQLPreprocessor(conn, config, sqllib_dir)

    db_info = preprocessor.env.globals['db']
    db_info['reverse_only'] = reverse_only

    preprocessor.run_sql_file(conn, 'tables.sql')
def create_table_triggers(conn, config, sqllib_dir):
    """ Install the table triggers by running 'table-triggers.sql'.

        The trigger functions must already have been imported
        with refresh.create_functions().
    """
    preprocessor = SQLPreprocessor(conn, config, sqllib_dir)
    preprocessor.run_sql_file(conn, 'table-triggers.sql')
def create_partition_tables(conn, config, sqllib_dir):
    """ Set up the tables that have explicit partitioning by running
        'partition-tables.src.sql'.
    """
    preprocessor = SQLPreprocessor(conn, config, sqllib_dir)
    preprocessor.run_sql_file(conn, 'partition-tables.src.sql')
def truncate_data_tables(conn, max_word_frequency=None):
""" Truncate all data tables to prepare for a fresh load.
"""
@@ -258,3 +285,24 @@ def load_data(dsn, data_dir, threads):
with connect(dsn) as conn:
with conn.cursor() as cur:
cur.execute('ANALYSE')
def create_search_indices(conn, config, sqllib_dir, drop=False):
    """ Create the indices defined in 'indices.sql'.

        Indices that are flagged as invalid in pg_index are dropped
        beforehand, so that the script recreates them. `drop` is handed
        to the SQL preprocessor as the template parameter of the same name.
    """
    # If index creation failed and left an index invalid, they need to be
    # cleaned out first, so that the script recreates them.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                             AND pg_index.indexrelid = pg_class.oid""")
        # Read all names before the cursor is reused for the DROP statements.
        bad_indices = [row[0] for row in cur]

        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            cur.execute('DROP INDEX "{}"'.format(idx))
    conn.commit()

    sql = SQLPreprocessor(conn, config, sqllib_dir)

    sql.run_sql_file(conn, 'indices.sql', drop=drop)

View File

@@ -3,12 +3,12 @@ Functions for bringing auxiliary data in the database up-to-date.
"""
import json
import logging
import re
from textwrap import dedent
from psycopg2.extras import execute_values
from ..db.utils import execute_file
from ..db.sql_preprocessor import SQLPreprocessor
from ..version import NOMINATIM_VERSION
LOG = logging.getLogger()
@@ -76,100 +76,17 @@ def load_address_levels_from_file(conn, config_file):
with config_file.open('r') as fdesc:
load_address_levels(conn, 'address_levels', json.load(fdesc))
# SQL files with the basic set of functions that is always imported.
# The names are looked up in the 'functions' subdirectory of the SQL directory.
PLPGSQL_BASE_MODULES = (
'utils.sql',
'normalization.sql',
'ranking.sql',
'importance.sql',
'address_lookup.sql',
'interpolation.sql'
)
# Pairs of (table name, SQL file). A file is only imported when the
# table it requires is present in the database.
PLPGSQL_TABLE_MODULES = (
('place', 'place_triggers.sql'),
('placex', 'placex_triggers.sql'),
('location_postcode', 'postcode_triggers.sql')
)
def _get_standard_function_sql(conn, config, sql_dir, enable_diff_updates, enable_debug):
    """ Assemble the SQL source of the standard PL/pgSQL functions.

        The applicable files are read from the 'functions' subdirectory
        of `sql_dir` and concatenated. The placeholders in the combined
        source are then replaced according to the configuration, the
        state of the database and the given flags. The resulting SQL is
        returned as a string.
    """
    func_dir = sql_dir / 'functions'
    chunks = []

    # The base modules are needed unconditionally.
    for fname in PLPGSQL_BASE_MODULES:
        with (func_dir / fname).open('r') as fdesc:
            chunks.append(fdesc.read())

    # Table-specific modules are skipped as long as their table is missing.
    for table, fname in PLPGSQL_TABLE_MODULES:
        if not conn.table_exists(table):
            continue
        with (func_dir / fname).open('r') as fdesc:
            chunks.append(fdesc.read())

    source = ''.join(chunks)

    # An explicitly configured module path wins over the project default.
    module_path = config.DATABASE_MODULE_PATH
    if not module_path:
        module_path = str((config.project_dir / 'module').resolve())
    source = source.replace('{modulepath}', module_path)

    # Each entry: (apply?, text to look for, replacement text).
    substitutions = (
        (enable_diff_updates, 'RETURN NEW; -- %DIFFUPDATES%', '--'),
        (enable_debug, '--DEBUG:', ''),
        (config.get_bool('LIMIT_REINDEXING'), '--LIMIT INDEXING:', ''),
        (not config.get_bool('USE_US_TIGER_DATA'), '-- %NOTIGERDATA% ', ''),
        (not config.get_bool('USE_AUX_LOCATION_DATA'), '-- %NOAUXDATA% ', ''),
    )
    for wanted, marker, replacement in substitutions:
        if wanted:
            source = source.replace(marker, replacement)

    # Without a search_name table the functions run in reverse-only mode.
    if conn.table_exists('search_name'):
        reverse_only = 'false'
    else:
        reverse_only = 'true'

    return source.replace('%REVERSE-ONLY%', reverse_only)
def replace_partition_string(sql, partitions):
    """ Expand the partition templates in `sql`.

        A template is the text between a line starting with '-- start'
        and a line starting with '-- end'. It is replaced by one copy
        per entry in `partitions`, where '-partition-' is substituted
        with the partition. The expanded SQL is returned.
    """
    templates = re.findall('^-- start(.*?)^-- end', sql, re.M | re.S)

    for template in templates:
        expanded = ''.join(template.replace('-partition-', str(part))
                           for part in partitions)
        sql = sql.replace(template, expanded)

    return sql
def _get_partition_function_sql(conn, sql_dir):
    """ Return the SQL for the functions that work on partition tables.

        The template in 'partition-functions.src.sql' is expanded for
        partition 0 and every distinct partition found in the
        country_name table, in ascending order.
    """
    with conn.cursor() as cur:
        cur.execute('SELECT distinct partition FROM country_name')
        in_use = {0}
        in_use.update(row[0] for row in cur)

    with (sql_dir / 'partition-functions.src.sql').open('r') as fdesc:
        template = fdesc.read()

    return replace_partition_string(template, sorted(in_use))
def create_functions(conn, config, sql_dir,
def create_functions(conn, config, sqllib_dir,
enable_diff_updates=True, enable_debug=False):
""" (Re)create the PL/pgSQL functions.
"""
sql = _get_standard_function_sql(conn, config, sql_dir,
enable_diff_updates, enable_debug)
sql += _get_partition_function_sql(conn, sql_dir)
sql = SQLPreprocessor(conn, config, sqllib_dir)
with conn.cursor() as cur:
cur.execute(sql)
sql.run_sql_file(conn, 'functions.sql',
disable_diff_update=not enable_diff_updates,
debug=enable_debug)
conn.commit()
WEBSITE_SCRIPTS = (