Merge pull request #2539 from lonvia/clean-up-python-tests

Restructure and extend python unit tests
This commit is contained in:
Sarah Hoffmann
2021-12-03 17:08:25 +01:00
committed by GitHub
54 changed files with 1800 additions and 821 deletions

View File

@@ -154,15 +154,13 @@ class APIReverse:
@staticmethod
def run(args):
params = dict(lat=args.lat, lon=args.lon)
params = dict(lat=args.lat, lon=args.lon, format=args.format)
if args.zoom is not None:
params['zoom'] = args.zoom
for param, _ in EXTRADATA_PARAMS:
if getattr(args, param):
params[param] = '1'
if args.format:
params['format'] = args.format
if args.lang:
params['accept-language'] = args.lang
if args.polygon_output:
@@ -195,13 +193,11 @@ class APILookup:
@staticmethod
def run(args):
params = dict(osm_ids=','.join(args.ids))
params = dict(osm_ids=','.join(args.ids), format=args.format)
for param, _ in EXTRADATA_PARAMS:
if getattr(args, param):
params[param] = '1'
if args.format:
params['format'] = args.format
if args.lang:
params['accept-language'] = args.lang
if args.polygon_output:
@@ -258,6 +254,8 @@ class APIDetails:
params['class'] = args.object_class
for name, _ in DETAILS_SWITCHES:
params[name] = '1' if getattr(args, name) else '0'
if args.lang:
params['accept-language'] = args.lang
return _run_api('details', args, params)

View File

@@ -30,7 +30,6 @@ class AbstractAnalyzer(ABC):
def close(self) -> None:
""" Free all resources used by the analyzer.
"""
pass
@abstractmethod
@@ -50,7 +49,6 @@ class AbstractAnalyzer(ABC):
found for the given words. Each list entry is a tuple of
(original word, word token, word id).
"""
pass
@abstractmethod
@@ -66,7 +64,6 @@ class AbstractAnalyzer(ABC):
Returns:
The given postcode after normalization.
"""
pass
@abstractmethod
@@ -74,7 +71,6 @@ class AbstractAnalyzer(ABC):
""" Update the tokenizer's postcode tokens from the current content
of the `location_postcode` table.
"""
pass
@abstractmethod
@@ -90,7 +86,6 @@ class AbstractAnalyzer(ABC):
When false, just add the given phrases to the
ones that already exist.
"""
pass
@abstractmethod
@@ -102,7 +97,6 @@ class AbstractAnalyzer(ABC):
refer to.
names: Dictionary of name type to name.
"""
pass
@abstractmethod
@@ -145,7 +139,6 @@ class AbstractTokenizer(ABC):
TODO: can we move the init_db parameter somewhere else?
"""
pass
@abstractmethod
@@ -158,7 +151,6 @@ class AbstractTokenizer(ABC):
Arguments:
config: Read-only object with configuration options.
"""
pass
@abstractmethod
@@ -171,7 +163,6 @@ class AbstractTokenizer(ABC):
Arguments:
config: Read-only object with configuration options.
"""
pass
@abstractmethod
@@ -186,7 +177,6 @@ class AbstractTokenizer(ABC):
Arguments:
config: Read-only object with configuration options.
"""
pass
@abstractmethod
@@ -202,7 +192,6 @@ class AbstractTokenizer(ABC):
description of the issue as well as hints for the user on
how to resolve the issue. If everything is okay, return `None`.
"""
pass
@abstractmethod
@@ -212,7 +201,6 @@ class AbstractTokenizer(ABC):
to improve performance. However, the tokenizer must not depend on
it to be called in order to work.
"""
pass
@abstractmethod
@@ -229,4 +217,3 @@ class AbstractTokenizer(ABC):
When used outside the with construct, the caller must ensure to
call the close() function before destructing the analyzer.
"""
pass

View File

@@ -513,7 +513,9 @@ class _TokenInfo:
with conn.cursor() as cur:
return cur.scalar("SELECT word_ids_from_name(%s)::text", (name, ))
self.data['street'] = self.cache.streets.get(street, _get_street)
tokens = self.cache.streets.get(street, _get_street)
if tokens:
self.data['street'] = tokens
def add_place(self, conn, place):
@@ -542,9 +544,12 @@ class _TokenInfo:
tokens = {}
for key, value in terms:
tokens[key] = self.cache.address_terms.get(value, _get_address_term)
items = self.cache.address_terms.get(value, _get_address_term)
if items[0] or items[1]:
tokens[key] = items
self.data['addr'] = tokens
if tokens:
self.data['addr'] = tokens
class _LRU:

View File

@@ -26,7 +26,7 @@ def migrate(config, paths):
if db_version_str is not None:
parts = db_version_str.split('.')
db_version = tuple([int(x) for x in parts[:2] + parts[2].split('-')])
db_version = tuple(int(x) for x in parts[:2] + parts[2].split('-'))
if db_version == NOMINATIM_VERSION:
LOG.warning("Database already at latest version (%s)", db_version_str)
@@ -96,6 +96,7 @@ def _migration(major, minor, patch=0, dbpatch=0):
"""
def decorator(func):
_MIGRATION_FUNCTIONS.append(((major, minor, patch, dbpatch), func))
return func
return decorator
@@ -195,7 +196,7 @@ def install_legacy_tokenizer(conn, config, **_):
@_migration(4, 0, 99, 0)
def create_tiger_housenumber_index(conn, _, **_):
def create_tiger_housenumber_index(conn, **_):
""" Create idx_location_property_tiger_parent_place_id with included
house number.

View File

@@ -0,0 +1,82 @@
import pytest
import nominatim.cli
class MockParamCapture:
    """ Callable stand-in for a function. It counts how often it was
        invoked and remembers the positional and keyword arguments of the
        most recent invocation.
    """

    def __init__(self, retval=0):
        # Value handed back on every call.
        self.return_value = retval
        self.called = 0
        self.last_args = None
        self.last_kwargs = None

    def __call__(self, *args, **kwargs):
        self.last_args, self.last_kwargs = args, kwargs
        self.called = self.called + 1
        return self.return_value
class DummyTokenizer:
    """ Tokenizer replacement that only records, through boolean flags,
        which of its methods have been invoked.
    """

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        for method in ('update_sql_functions', 'finalize_import', 'update_statistics'):
            setattr(self, method + '_called', False)

    def update_sql_functions(self, *args):
        """ Note that update_sql_functions() was called.
        """
        self.update_sql_functions_called = True

    def finalize_import(self, *args):
        """ Note that finalize_import() was called.
        """
        self.finalize_import_called = True

    def update_statistics(self):
        """ Note that update_statistics() was called.
        """
        self.update_statistics_called = True
@pytest.fixture
def cli_call(src_dir):
    """ Provide a function that runs the nominatim main function with all
        path parameters preset. The positional arguments of the returned
        function are forwarded as the CLI arguments.
    """
    path_settings = dict(module_dir='MODULE NOT AVAILABLE',
                         osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
                         phplib_dir=str(src_dir / 'lib-php'),
                         data_dir=str(src_dir / 'data'),
                         phpcgi_path='/usr/bin/php-cgi',
                         sqllib_dir=str(src_dir / 'lib-sql'),
                         config_dir=str(src_dir / 'settings'))

    def _call_nominatim(*args):
        return nominatim.cli.nominatim(cli_args=args, **path_settings)

    return _call_nominatim
@pytest.fixture
def mock_run_legacy(monkeypatch):
    """ Replace `nominatim.cli.run_legacy_script` with a parameter-capturing
        mock and return that mock.
    """
    legacy_mock = MockParamCapture()
    monkeypatch.setattr(nominatim.cli, 'run_legacy_script', legacy_mock)

    return legacy_mock
@pytest.fixture
def mock_func_factory(monkeypatch):
    """ Provide a factory that replaces the attribute `func` of `module`
        with a fresh MockParamCapture and returns the mock. The patched
        name is stored on the mock as `func_name`.
    """
    def get_mock(module, func):
        replacement = MockParamCapture()
        replacement.func_name = func
        monkeypatch.setattr(module, func, replacement)
        return replacement

    return get_mock
@pytest.fixture
def cli_tokenizer_mock(monkeypatch):
    """ Make both tokenizer factory functions hand out the same
        DummyTokenizer instance and return that instance.
    """
    tok = DummyTokenizer()

    def _provide_dummy(*args):
        return tok

    for factory_func in ('get_tokenizer_for_db', 'create_tokenizer'):
        monkeypatch.setattr(nominatim.tokenizer.factory, factory_func, _provide_dummy)

    return tok

146
test/python/cli/test_cli.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Tests for command line interface wrapper.
These tests just check that the various command line parameters route to the
correct functionality. They use a lot of monkeypatching to avoid executing
the actual functions.
"""
import pytest
import nominatim.indexer.indexer
import nominatim.tools.add_osm_data
import nominatim.tools.freeze
def test_cli_help(cli_call, capsys):
    """ Running nominatim tool without arguments prints help.
    """
    exit_code = cli_call()
    assert exit_code == 1

    stdout = capsys.readouterr().out
    assert stdout.startswith('usage:')
@pytest.mark.parametrize("name,oid", [('file', 'foo.osm'), ('diff', 'foo.osc')])
def test_cli_add_data_file_command(cli_call, mock_func_factory, name, oid):
    """ The file-based add-data options call add_data_from_file() once.
    """
    file_mock = mock_func_factory(nominatim.tools.add_osm_data, 'add_data_from_file')

    exit_code = cli_call('add-data', '--' + name, str(oid))

    assert exit_code == 0
    assert file_mock.called == 1
@pytest.mark.parametrize("name,oid", [('node', 12), ('way', 8), ('relation', 32)])
def test_cli_add_data_object_command(cli_call, mock_func_factory, name, oid):
    """ The object-based add-data options call add_osm_object() once.
    """
    object_mock = mock_func_factory(nominatim.tools.add_osm_data, 'add_osm_object')

    exit_code = cli_call('add-data', '--' + name, str(oid))

    assert exit_code == 0
    assert object_mock.called == 1
def test_cli_add_data_tiger_data(cli_call, cli_tokenizer_mock, mock_func_factory):
    """ The --tiger-data option calls add_tiger_data() once.
    """
    tiger_mock = mock_func_factory(nominatim.tools.tiger_data, 'add_tiger_data')

    exit_code = cli_call('add-data', '--tiger-data', 'somewhere')

    assert exit_code == 0
    assert tiger_mock.called == 1
def test_cli_serve_command(cli_call, mock_func_factory):
    """ The serve command starts the PHP server exactly once and
        reports success.
    """
    func = mock_func_factory(nominatim.cli, 'run_php_server')

    # The comparison used to be a bare expression whose result was thrown
    # away, so the exit code was never actually checked.
    assert cli_call('serve') == 0

    assert func.called == 1
def test_cli_export_command(cli_call, mock_run_legacy):
    """ The export command hands over to the legacy script export.php.
    """
    exit_code = cli_call('export', '--output-all-postcodes')

    assert exit_code == 0
    assert mock_run_legacy.called == 1

    script_name = mock_run_legacy.last_args[0]
    assert script_name == 'export.php'
@pytest.mark.parametrize("param,value", [('output-type', 'country'),
                                         ('output-format', 'street;city'),
                                         ('language', 'xf'),
                                         ('restrict-to-country', 'us'),
                                         ('restrict-to-osm-node', '536'),
                                         ('restrict-to-osm-way', '727'),
                                         ('restrict-to-osm-relation', '197532')
                                        ])
def test_export_parameters(src_dir, tmp_path, param, value):
    """ Each export option is passed on to the export.php script.

        A stand-in export.php is written into a temporary PHP library
        directory. It exits with 0 only when its command line contains
        '--<param> <value>' and with 10 otherwise.
    """
    (tmp_path / 'admin').mkdir()
    # strpos() returns false when the needle is missing and `false >= 0`
    # is true in PHP, so the result must be compared with `!== false`.
    # Otherwise the script always exits with 0 and the test cannot fail.
    (tmp_path / 'admin' / 'export.php').write_text(f"""<?php
exit(strpos(implode(' ', $_SERVER['argv']), '--{param} {value}') !== false ? 0 : 10);
""")

    assert nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
                                   osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
                                   phplib_dir=str(tmp_path),
                                   data_dir=str(src_dir / 'data'),
                                   phpcgi_path='/usr/bin/php-cgi',
                                   sqllib_dir=str(src_dir / 'lib-sql'),
                                   config_dir=str(src_dir / 'settings'),
                                   cli_args=['export', '--' + param, value]) == 0
class TestCliWithDb:
    """ Routing tests for CLI commands that request the `temp_db` fixture
        and run with the tokenizer factory mocked out.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call, temp_db, cli_tokenizer_mock):
        self.call_nominatim = cli_call
        self.tokenizer_mock = cli_tokenizer_mock


    def test_freeze_command(self, mock_func_factory):
        """ Freezing drops the update tables and the flatnode file.
        """
        mock_drop = mock_func_factory(nominatim.tools.freeze, 'drop_update_tables')
        mock_flatnode = mock_func_factory(nominatim.tools.freeze, 'drop_flatnode_file')

        assert self.call_nominatim('freeze') == 0

        assert mock_drop.called == 1
        assert mock_flatnode.called == 1


    @pytest.mark.parametrize("params,do_bnds,do_ranks", [
        ([], 1, 1),
        (['--boundaries-only'], 1, 0),
        (['--no-boundaries'], 0, 1),
        (['--boundaries-only', '--no-boundaries'], 0, 0)])
    def test_index_command(self, mock_func_factory, table_factory,
                           params, do_bnds, do_ranks):
        """ The boundary switches decide which indexer functions run.
        """
        table_factory('import_status', 'indexed bool')
        bnd_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_boundaries')
        rank_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_by_rank')

        assert self.call_nominatim('index', *params) == 0

        assert bnd_mock.called == do_bnds
        assert rank_mock.called == do_ranks


    def test_special_phrases_wiki_command(self, mock_func_factory):
        """ Importing special phrases from the wiki calls the importer once.
        """
        func = mock_func_factory(nominatim.clicmd.special_phrases.SPImporter, 'import_phrases')

        self.call_nominatim('special-phrases', '--import-from-wiki', '--no-replace')

        assert func.called == 1


    def test_special_phrases_csv_command(self, src_dir, mock_func_factory):
        """ Importing special phrases from a CSV file calls the importer once.
        """
        func = mock_func_factory(nominatim.clicmd.special_phrases.SPImporter, 'import_phrases')
        testdata = src_dir / 'test' / 'testdb'
        csv_path = str((testdata / 'full_en_phrases_test.csv').resolve())

        self.call_nominatim('special-phrases', '--import-from-csv', csv_path)

        assert func.called == 1


    def test_special_phrases_csv_bad_file(self, src_dir):
        """ A non-existing CSV file makes the command fail with exit code 1.
        """
        testdata = src_dir / 'something349053905.csv'

        # The comparison used to be a bare expression without `assert`,
        # so the test passed regardless of the exit code.
        assert self.call_nominatim('special-phrases', '--import-from-csv',
                                   str(testdata.resolve())) == 1

View File

@@ -0,0 +1,54 @@
"""
Test for the command line interface wrapper admin subcommand.
These tests just check that the various command line parameters route to the
correct functionality. They use a lot of monkeypatching to avoid executing
the actual functions.
"""
import pytest
import nominatim.tools.admin
import nominatim.tools.check_database
import nominatim.tools.migration
import nominatim.clicmd.admin
@pytest.mark.parametrize("params", [('--warm', ),
                                    ('--warm', '--reverse-only'),
                                    ('--warm', '--search-only')])
def test_admin_command_legacy(cli_call, mock_func_factory, params):
    """ The warm-up variants of the admin command run a legacy script once.
    """
    legacy_mock = mock_func_factory(nominatim.clicmd.admin, 'run_legacy_script')

    exit_code = cli_call('admin', *params)

    assert exit_code == 0
    assert legacy_mock.called == 1
def test_admin_command_check_database(cli_call, mock_func_factory):
    """ The --check-database option calls check_database() once.
    """
    check_mock = mock_func_factory(nominatim.tools.check_database, 'check_database')

    exit_code = cli_call('admin', '--check-database')

    assert exit_code == 0
    assert check_mock.called == 1
def test_admin_migrate(cli_call, mock_func_factory):
    """ The --migrate option calls migrate() once.
    """
    migrate_mock = mock_func_factory(nominatim.tools.migration, 'migrate')

    exit_code = cli_call('admin', '--migrate')

    assert exit_code == 0
    assert migrate_mock.called == 1
class TestCliAdminWithDb:
    """ Admin subcommand tests that request the `temp_db` fixture and run
        with the tokenizer factory mocked out.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call, temp_db, cli_tokenizer_mock):
        self.tokenizer_mock = cli_tokenizer_mock
        self.call_nominatim = cli_call


    @pytest.mark.parametrize("func, params", [('analyse_indexing', ('--analyse-indexing', ))])
    def test_analyse_indexing(self, mock_func_factory, func, params):
        """ The option calls the matching function in nominatim.tools.admin once.
        """
        admin_mock = mock_func_factory(nominatim.tools.admin, func)

        exit_code = self.call_nominatim('admin', *params)

        assert exit_code == 0
        assert admin_mock.called == 1

View File

@@ -0,0 +1,154 @@
"""
Tests for API access commands of command-line interface wrapper.
"""
import pytest
import nominatim.clicmd.api
@pytest.mark.parametrize("endpoint", ('search', 'reverse', 'lookup', 'details', 'status'))
def test_no_api_without_phpcgi(src_dir, endpoint):
    """ Without a php-cgi path the API commands end in a SystemExit.
    """
    settings = dict(module_dir='MODULE NOT AVAILABLE',
                    osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
                    phplib_dir=str(src_dir / 'lib-php'),
                    data_dir=str(src_dir / 'data'),
                    phpcgi_path=None,
                    sqllib_dir=str(src_dir / 'lib-sql'),
                    config_dir=str(src_dir / 'settings'),
                    cli_args=[endpoint])

    with pytest.raises(SystemExit):
        nominatim.cli.nominatim(**settings)
@pytest.mark.parametrize("params", [('search', '--query', 'new'),
                                    ('search', '--city', 'Berlin'),
                                    ('reverse', '--lat', '0', '--lon', '0', '--zoom', '13'),
                                    ('lookup', '--id', 'N1'),
                                    ('details', '--node', '1'),
                                    ('details', '--way', '1'),
                                    ('details', '--relation', '1'),
                                    ('details', '--place_id', '10001'),
                                    ('status',)])
class TestCliApiCall:
    """ Check for every API command line that the API script runner is
        reached when the endpoint script exists in the project directory.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call):
        self.call_nominatim = cli_call


    def test_api_commands_simple(self, mock_func_factory, params, tmp_path):
        """ With an endpoint script present, run_api_script() is called
            once with the endpoint name as first argument.
        """
        endpoint = params[0]
        website_dir = tmp_path / 'website'
        website_dir.mkdir()
        (website_dir / (endpoint + '.php')).write_text('')

        mock_run_api = mock_func_factory(nominatim.clicmd.api, 'run_api_script')

        assert self.call_nominatim(*params, '--project-dir', str(tmp_path)) == 0

        assert mock_run_api.called == 1
        assert mock_run_api.last_args[0] == endpoint


    def test_bad_project_idr(self, mock_func_factory, params):
        """ Without a --project-dir argument the command returns 1.
        """
        # Only the patching is needed here, the mock itself is not inspected.
        mock_func_factory(nominatim.clicmd.api, 'run_api_script')

        assert self.call_nominatim(*params) == 1
# Command-line arguments that select what to query, keyed by the name
# of the API endpoint they are used with.
QUERY_PARAMS = {
'search': ('--query', 'somewhere'),
'reverse': ('--lat', '20', '--lon', '30'),
'lookup': ('--id', 'R345345'),
'details': ('--node', '324')
}
@pytest.mark.parametrize("endpoint", ('search', 'reverse', 'lookup'))
class TestCliApiCommonParameters:
    """ Check for the parametrized endpoints that command-line options
        arrive as the expected GET parameters. Each test writes an endpoint
        script that exits with 0 when the parameter has the expected value
        and with 10 otherwise.
    """

    @pytest.fixture(autouse=True)
    def setup_website_dir(self, cli_call, project_env, endpoint):
        self.endpoint = endpoint
        self.cli_call = cli_call
        self.project_dir = project_env.project_dir
        (self.project_dir / 'website').mkdir()


    def expect_param(self, param, expected):
        """ Write the endpoint script that checks GET parameter `param`
            against the value `expected`.
        """
        script = self.project_dir / 'website' / (self.endpoint + '.php')
        script.write_text(f"""<?php
exit($_GET['{param}'] == '{expected}' ? 0 : 10);
""")


    def call_nominatim(self, *params):
        """ Run the endpoint command with its base query arguments, the
            project directory and the additional `params`.
        """
        base_args = QUERY_PARAMS[self.endpoint]
        return self.cli_call(self.endpoint, *base_args,
                             '--project-dir', str(self.project_dir), *params)


    def test_param_output(self):
        self.expect_param('format', 'xml')
        assert self.call_nominatim('--format', 'xml') == 0


    def test_param_lang(self):
        self.expect_param('accept-language', 'de')
        # Both spellings of the option are tried in turn.
        assert self.call_nominatim('--lang', 'de') == 0
        assert self.call_nominatim('--accept-language', 'de') == 0


    @pytest.mark.parametrize("param", ('addressdetails', 'extratags', 'namedetails'))
    def test_param_extradata(self, param):
        self.expect_param(param, '1')
        assert self.call_nominatim('--' + param) == 0


    def test_param_polygon_output(self):
        self.expect_param('polygon_geojson', '1')
        assert self.call_nominatim('--polygon-output', 'geojson') == 0


    def test_param_polygon_threshold(self):
        self.expect_param('polygon_threshold', '0.3452')
        assert self.call_nominatim('--polygon-threshold', '0.3452') == 0
def test_cli_search_param_bounded(cli_call, project_env):
    """ The --bounded option of search arrives as GET parameter bounded=1.
    """
    project_dir = project_env.project_dir
    webdir = project_dir / 'website'
    webdir.mkdir()
    # Plain string: the script text contains nothing to interpolate.
    (webdir / 'search.php').write_text("""<?php
exit($_GET['bounded'] == '1' ? 0 : 10);
""")

    exit_code = cli_call('search', *QUERY_PARAMS['search'],
                         '--project-dir', str(project_dir), '--bounded')

    assert exit_code == 0
def test_cli_search_param_dedupe(cli_call, project_env):
    """ The --no-dedupe option of search arrives as GET parameter dedupe=0.
    """
    project_dir = project_env.project_dir
    webdir = project_dir / 'website'
    webdir.mkdir()
    # Plain string: the script text contains nothing to interpolate.
    (webdir / 'search.php').write_text("""<?php
exit($_GET['dedupe'] == '0' ? 0 : 10);
""")

    exit_code = cli_call('search', *QUERY_PARAMS['search'],
                         '--project-dir', str(project_dir), '--no-dedupe')

    assert exit_code == 0
def test_cli_details_param_class(cli_call, project_env):
    """ The --class option of details arrives as GET parameter class.
    """
    project_dir = project_env.project_dir
    webdir = project_dir / 'website'
    webdir.mkdir()
    # Plain string: the script text contains nothing to interpolate.
    (webdir / 'details.php').write_text("""<?php
exit($_GET['class'] == 'highway' ? 0 : 10);
""")

    exit_code = cli_call('details', *QUERY_PARAMS['details'],
                         '--project-dir', str(project_dir), '--class', 'highway')

    assert exit_code == 0
@pytest.mark.parametrize('param', ('lang', 'accept-language'))
def test_cli_details_param_lang(cli_call, project_env, param):
    """ Both language options of details arrive as GET parameter
        accept-language.
    """
    project_dir = project_env.project_dir
    webdir = project_dir / 'website'
    webdir.mkdir()
    # Plain string: the script text contains nothing to interpolate.
    (webdir / 'details.php').write_text("""<?php
exit($_GET['accept-language'] == 'es' ? 0 : 10);
""")

    exit_code = cli_call('details', *QUERY_PARAMS['details'],
                         '--project-dir', str(project_dir), '--' + param, 'es')

    assert exit_code == 0

View File

@@ -0,0 +1,122 @@
"""
Tests for import command of the command-line interface wrapper.
"""
import pytest
import nominatim.tools.database_import
import nominatim.tools.country_info
import nominatim.tools.refresh
import nominatim.tools.postcodes
import nominatim.indexer.indexer
import nominatim.db.properties
class TestCliImportWithDb:
    """ Tests for the import command. They request the `temp_db` fixture
        and run with the tokenizer factory mocked out.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call, temp_db, cli_tokenizer_mock):
        self.call_nominatim = cli_call
        self.tokenizer_mock = cli_tokenizer_mock


    def test_import_missing_file(self):
        exit_code = self.call_nominatim('import', '--osm-file', 'sfsafegwedgw.reh.erh')

        assert exit_code == 1


    def test_import_bad_file(self):
        exit_code = self.call_nominatim('import', '--osm-file', '.')

        assert exit_code == 1


    @pytest.mark.parametrize('with_updates', [True, False])
    def test_import_full(self, mock_func_factory, with_updates, place_table, property_table):
        """ A full import calls every listed import step exactly once.
        """
        # (module, function) pairs, mocked in exactly this order.
        targets = (
            (nominatim.tools.database_import, 'setup_database_skeleton'),
            (nominatim.tools.country_info, 'setup_country_tables'),
            (nominatim.tools.database_import, 'import_osm_data'),
            (nominatim.tools.refresh, 'import_wikipedia_articles'),
            (nominatim.tools.database_import, 'truncate_data_tables'),
            (nominatim.tools.database_import, 'load_data'),
            (nominatim.tools.database_import, 'create_tables'),
            (nominatim.tools.database_import, 'create_table_triggers'),
            (nominatim.tools.database_import, 'create_partition_tables'),
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.refresh, 'load_address_levels_from_config'),
            (nominatim.tools.postcodes, 'update_postcodes'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
        )
        mocks = [mock_func_factory(module, func) for module, func in targets]

        params = ['import', '--osm-file', __file__]

        if with_updates:
            mocks.append(mock_func_factory(nominatim.tools.freeze, 'drop_update_tables'))
            params.append('--no-updates')

        cf_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')

        assert self.call_nominatim(*params) == 0
        assert self.tokenizer_mock.finalize_import_called

        # create_functions() is expected to run more than once.
        assert cf_mock.called > 1

        for mock in mocks:
            assert mock.called == 1, "Mock '{}' not called".format(mock.func_name)


    def test_import_continue_load_data(self, mock_func_factory):
        """ Continuing at 'load-data' calls each listed step once.
        """
        targets = (
            (nominatim.tools.database_import, 'truncate_data_tables'),
            (nominatim.tools.database_import, 'load_data'),
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.postcodes, 'update_postcodes'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property'),
        )
        mocks = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'load-data') == 0
        assert self.tokenizer_mock.finalize_import_called

        for mock in mocks:
            assert mock.called == 1, "Mock '{}' not called".format(mock.func_name)


    def test_import_continue_indexing(self, mock_func_factory, placex_table,
                                      temp_db_conn):
        """ Continuing at 'indexing' calls each listed step once and can
            be repeated.
        """
        targets = (
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property'),
        )
        mocks = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'indexing') == 0

        for mock in mocks:
            assert mock.called == 1, "Mock '{}' not called".format(mock.func_name)

        assert temp_db_conn.index_exists('idx_placex_pendingsector')

        # Calling it again still works for the index
        assert self.call_nominatim('import', '--continue', 'indexing') == 0
        assert temp_db_conn.index_exists('idx_placex_pendingsector')


    def test_import_continue_postprocess(self, mock_func_factory):
        """ Continuing at 'db-postprocess' calls each listed step once.
        """
        targets = (
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property'),
        )
        mocks = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'db-postprocess') == 0

        assert self.tokenizer_mock.finalize_import_called

        for mock in mocks:
            assert mock.called == 1, "Mock '{}' not called".format(mock.func_name)

View File

@@ -0,0 +1,73 @@
"""
Tests for command line interface wrapper for refresh command.
"""
import pytest
import nominatim.tools.refresh
import nominatim.tools.postcodes
import nominatim.indexer.indexer
class TestRefresh:
    """ Tests for the refresh command. They request the `temp_db` fixture
        and run with the tokenizer factory mocked out.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call, temp_db, cli_tokenizer_mock):
        self.call_nominatim = cli_call
        self.tokenizer_mock = cli_tokenizer_mock


    @pytest.mark.parametrize("command,func", [
        ('address-levels', 'load_address_levels_from_config'),
        ('wiki-data', 'import_wikipedia_articles'),
        ('importance', 'recompute_importance'),
        ('website', 'setup_website'),
        ])
    def test_refresh_command(self, mock_func_factory, command, func):
        """ Each option calls its function in nominatim.tools.refresh once.
        """
        func_mock = mock_func_factory(nominatim.tools.refresh, func)

        exit_code = self.call_nominatim('refresh', '--' + command)

        assert exit_code == 0
        assert func_mock.called == 1


    def test_refresh_word_count(self):
        exit_code = self.call_nominatim('refresh', '--word-count')

        assert exit_code == 0
        assert self.tokenizer_mock.update_statistics_called


    def test_refresh_postcodes(self, mock_func_factory, place_table):
        """ With the place table present, postcodes are updated and indexed.
        """
        func_mock = mock_func_factory(nominatim.tools.postcodes, 'update_postcodes')
        idx_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_postcodes')

        exit_code = self.call_nominatim('refresh', '--postcodes')

        assert exit_code == 0
        assert func_mock.called == 1
        assert idx_mock.called == 1


    def test_refresh_postcodes_no_place_table(self):
        # Do nothing without the place table
        exit_code = self.call_nominatim('refresh', '--postcodes')

        assert exit_code == 0


    def test_refresh_create_functions(self, mock_func_factory):
        """ Refreshing functions also updates the tokenizer's SQL functions.
        """
        func_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')

        exit_code = self.call_nominatim('refresh', '--functions')

        assert exit_code == 0
        assert func_mock.called == 1
        assert self.tokenizer_mock.update_sql_functions_called


    def test_refresh_wikidata_file_not_found(self, monkeypatch):
        monkeypatch.setenv('NOMINATIM_WIKIPEDIA_DATA_PATH', 'gjoiergjeroi345Q')

        exit_code = self.call_nominatim('refresh', '--wiki-data')

        assert exit_code == 1


    def test_refresh_importance_computed_after_wiki_import(self, monkeypatch):
        """ The wiki import runs before the importance computation, whatever
            the order of the options on the command line.
        """
        calls = []

        def _record_import(*args, **kwargs):
            calls.append('import')
            return 0

        def _record_update(*args, **kwargs):
            calls.append('update')

        monkeypatch.setattr(nominatim.tools.refresh, 'import_wikipedia_articles',
                            _record_import)
        monkeypatch.setattr(nominatim.tools.refresh, 'recompute_importance',
                            _record_update)

        exit_code = self.call_nominatim('refresh', '--importance', '--wiki-data')

        assert exit_code == 0
        assert calls == ['import', 'update']

View File

@@ -11,8 +11,6 @@ import nominatim.indexer.indexer
import nominatim.tools.replication
from nominatim.db import status
from mocks import MockParamCapture
@pytest.fixture
def tokenizer_mock(monkeypatch):
class DummyTokenizer:
@@ -35,15 +33,6 @@ def tokenizer_mock(monkeypatch):
return tok
@pytest.fixture
def mock_func_factory(monkeypatch):
def get_mock(module, func):
mock = MockParamCapture()
monkeypatch.setattr(module, func, mock)
return mock
return get_mock
@pytest.fixture
def init_status(temp_db_conn, status_table):
@@ -51,11 +40,8 @@ def init_status(temp_db_conn, status_table):
@pytest.fixture
def index_mock(monkeypatch, tokenizer_mock, init_status):
mock = MockParamCapture()
monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_full', mock)
return mock
def index_mock(mock_func_factory, tokenizer_mock, init_status):
return mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full')
@pytest.fixture
@@ -69,15 +55,31 @@ class TestCliReplication:
def setup_cli_call(self, cli_call, temp_db):
self.call_nominatim = lambda *args: cli_call('replication', *args)
@pytest.fixture(autouse=True)
def setup_update_function(self, monkeypatch):
def _mock_updates(states):
monkeypatch.setattr(nominatim.tools.replication, 'update',
lambda *args, **kwargs: states.pop())
self.update_states = _mock_updates
@pytest.mark.parametrize("params,func", [
(('--init',), 'init_replication'),
(('--init', '--no-update-functions'), 'init_replication'),
(('--check-for-updates',), 'check_for_updates')
])
def test_replication_command(self, mock_func_factory, params, func):
func_mock = mock_func_factory(nominatim.tools.replication, func)
if params == ('--init',):
umock = mock_func_factory(nominatim.tools.refresh, 'create_functions')
assert self.call_nominatim(*params) == 0
assert func_mock.called == 1
if params == ('--init',):
assert umock.called == 1
def test_replication_update_bad_interval(self, monkeypatch):
@@ -93,6 +95,9 @@ class TestCliReplication:
assert self.call_nominatim() == 1
def test_replication_update_continuous_no_index(self):
assert self.call_nominatim('--no-index') == 1
def test_replication_update_once_no_index(self, update_mock):
assert self.call_nominatim('--once', '--no-index') == 0
@@ -107,11 +112,9 @@ class TestCliReplication:
@pytest.mark.parametrize("update_interval", [60, 3600])
def test_replication_catchup(self, monkeypatch, index_mock, update_interval, placex_table):
def test_replication_catchup(self, placex_table, monkeypatch, index_mock, update_interval):
monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', str(update_interval))
states = [nominatim.tools.replication.UpdateState.NO_CHANGES]
monkeypatch.setattr(nominatim.tools.replication, 'update',
lambda *args, **kwargs: states.pop())
self.update_states([nominatim.tools.replication.UpdateState.NO_CHANGES])
assert self.call_nominatim('--catch-up') == 0
@@ -122,11 +125,9 @@ class TestCliReplication:
assert update_mock.last_args[1]['threads'] == 4
def test_replication_update_continuous(self, monkeypatch, index_mock):
states = [nominatim.tools.replication.UpdateState.UP_TO_DATE,
nominatim.tools.replication.UpdateState.UP_TO_DATE]
monkeypatch.setattr(nominatim.tools.replication, 'update',
lambda *args, **kwargs: states.pop())
def test_replication_update_continuous(self, index_mock):
self.update_states([nominatim.tools.replication.UpdateState.UP_TO_DATE,
nominatim.tools.replication.UpdateState.UP_TO_DATE])
with pytest.raises(IndexError):
self.call_nominatim()
@@ -134,14 +135,12 @@ class TestCliReplication:
assert index_mock.called == 2
def test_replication_update_continuous_no_change(self, monkeypatch, index_mock):
states = [nominatim.tools.replication.UpdateState.NO_CHANGES,
nominatim.tools.replication.UpdateState.UP_TO_DATE]
monkeypatch.setattr(nominatim.tools.replication, 'update',
lambda *args, **kwargs: states.pop())
def test_replication_update_continuous_no_change(self, mock_func_factory,
index_mock):
self.update_states([nominatim.tools.replication.UpdateState.NO_CHANGES,
nominatim.tools.replication.UpdateState.UP_TO_DATE])
sleep_mock = MockParamCapture()
monkeypatch.setattr(time, 'sleep', sleep_mock)
sleep_mock = mock_func_factory(time, 'sleep')
with pytest.raises(IndexError):
self.call_nominatim()

View File

@@ -4,7 +4,7 @@ Test for loading dotenv configuration.
from pathlib import Path
import pytest
from nominatim.config import Configuration
from nominatim.config import Configuration, flatten_config_list
from nominatim.errors import UsageError
@pytest.fixture
@@ -306,11 +306,29 @@ def test_load_subconf_env_relative_not_found(make_config_path, monkeypatch):
rules = config.load_sub_configuration('test.yaml', config='MY_CONFIG')
def test_load_subconf_json(make_config_path):
config = make_config_path()
(config.project_dir / 'test.json').write_text('{"cow": "muh", "cat": "miau"}')
rules = config.load_sub_configuration('test.json')
assert rules == dict(cow='muh', cat='miau')
def test_load_subconf_not_found(make_config_path):
config = make_config_path()
with pytest.raises(UsageError, match='Config file not found.'):
rules = config.load_sub_configuration('test.yaml')
config.load_sub_configuration('test.yaml')
def test_load_subconf_env_unknown_format(make_config_path):
config = make_config_path()
(config.project_dir / 'test.xml').write_text('<html></html>')
with pytest.raises(UsageError, match='unknown format'):
config.load_sub_configuration('test.xml')
def test_load_subconf_include_absolute(make_config_path, tmp_path):
@@ -370,3 +388,30 @@ def test_load_subconf_include_recursive(make_config_path):
rules = config.load_sub_configuration('test.yaml')
assert rules == dict(base=[['the end'], 'upper'])
@pytest.mark.parametrize("content", [[], None])
def test_flatten_config_list_empty(content):
assert flatten_config_list(content) == []
@pytest.mark.parametrize("content", [{'foo': 'bar'}, 'hello world', 3])
def test_flatten_config_list_no_list(content):
with pytest.raises(UsageError):
flatten_config_list(content)
def test_flatten_config_list_allready_flat():
assert flatten_config_list([1, 2, 456]) == [1, 2, 456]
def test_flatten_config_list_nested():
content = [
34,
[{'first': '1st', 'second': '2nd'}, {}],
[[2, 3], [45, [56, 78], 66]],
'end'
]
assert flatten_config_list(content) == \
[34, {'first': '1st', 'second': '2nd'}, {},
2, 3, 45, 56, 78, 66, 'end']

View File

@@ -5,22 +5,25 @@ from pathlib import Path
import psycopg2
import pytest
SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
# always test against the source
sys.path.insert(0, str(SRC_DIR.resolve()))
SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
sys.path.insert(0, str(SRC_DIR))
from nominatim.config import Configuration
from nominatim.db import connection
from nominatim.db.sql_preprocessor import SQLPreprocessor
import nominatim.tokenizer.factory
import nominatim.cli
import dummy_tokenizer
import mocks
from cursor import CursorForTesting
@pytest.fixture
def src_dir():
return SRC_DIR
@pytest.fixture
def temp_db(monkeypatch):
""" Create an empty database for the test. The database name is also
@@ -98,33 +101,25 @@ def table_factory(temp_db_cursor):
@pytest.fixture
def def_config():
cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
def def_config(src_dir):
cfg = Configuration(None, src_dir / 'settings')
cfg.set_libdirs(module='.', osm2pgsql='.',
php=SRC_DIR / 'lib-php',
sql=SRC_DIR / 'lib-sql',
data=SRC_DIR / 'data')
php=src_dir / 'lib-php',
sql=src_dir / 'lib-sql',
data=src_dir / 'data')
return cfg
@pytest.fixture
def src_dir():
return SRC_DIR.resolve()
@pytest.fixture
def cli_call():
def _call_nominatim(*args):
return nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
phplib_dir=str(SRC_DIR / 'lib-php'),
data_dir=str(SRC_DIR / 'data'),
phpcgi_path='/usr/bin/php-cgi',
sqllib_dir=str(SRC_DIR / 'lib-sql'),
config_dir=str(SRC_DIR / 'settings'),
cli_args=args)
return _call_nominatim
def project_env(src_dir, tmp_path):
projdir = tmp_path / 'project'
projdir.mkdir()
cfg = Configuration(projdir, src_dir / 'settings')
cfg.set_libdirs(module='.', osm2pgsql='.',
php=src_dir / 'lib-php',
sql=src_dir / 'lib-sql',
data=src_dir / 'data')
return cfg
@pytest.fixture
@@ -215,18 +210,6 @@ def word_table(temp_db_conn):
return mocks.MockWordTable(temp_db_conn)
@pytest.fixture
def osm2pgsql_options(temp_db):
    """ Provide an option dictionary for osm2pgsql that uses 'echo' as
        the binary and the test database as import target.
    """
    # No tablespace is configured for any of the four table groups.
    tablespaces = {key: '' for key in ('slim_data', 'slim_index',
                                       'main_data', 'main_index')}

    return {'osm2pgsql': 'echo',
            'osm2pgsql_cache': 10,
            'osm2pgsql_style': 'style.file',
            'threads': 1,
            'dsn': 'dbname=' + temp_db,
            'flatnode_file': '',
            'tablespaces': tablespaces}
@pytest.fixture
def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions):
table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))

View File

@@ -37,6 +37,15 @@ class CursorForTesting(psycopg2.extras.DictCursor):
return num == 1
def index_exists(self, table, index):
    """ Check that an index with the given name exists on the given table.
    """
    matches = self.scalar("""SELECT count(*) FROM pg_indexes
                             WHERE tablename = %s and indexname = %s""",
                          (table, index))

    return matches == 1
def table_rows(self, table, where=None):
""" Return the number of rows in the given table.
"""

View File

@@ -145,9 +145,8 @@ def test_db(temp_db_conn):
@pytest.fixture
def test_tokenizer(tokenizer_mock, def_config, tmp_path):
def_config.project_dir = tmp_path
return factory.create_tokenizer(def_config)
def test_tokenizer(tokenizer_mock, project_env):
return factory.create_tokenizer(project_env)
@pytest.mark.parametrize("threads", [1, 15])

View File

@@ -17,6 +17,14 @@ class MockIcuWordTable:
conn.commit()
def add_full_word(self, word_id, word, word_token=None):
    """ Add a full word (type 'W') with an empty info field to the
        word table.

        When no `word_token` is given, the word itself is used as token.
    """
    with self.conn.cursor() as cur:
        # An explicitly given token takes precedence over the word,
        # same as in the mock for the legacy word table.
        cur.execute("""INSERT INTO word (word_id, word_token, type, word, info)
                       VALUES(%s, %s, 'W', %s, '{}'::jsonb)""",
                    (word_id, word_token or word, word))
    self.conn.commit()
def add_special(self, word_token, word, cls, typ, oper):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO word (word_token, type, word, info)

View File

@@ -20,6 +20,14 @@ class MockLegacyWordTable:
conn.commit()
def add_full_word(self, word_id, word, word_token=None):
    """ Insert a full word into the legacy word table. The token is
        written with a leading blank. When no explicit `word_token` is
        given, the word itself serves as token.
    """
    with self.conn.cursor() as cur:
        token = ' ' + (word_token or word)
        row = (word_id, token, word)
        cur.execute("""INSERT INTO word (word_id, word_token, word)
                       VALUES (%s, %s, %s)
                    """, row)
    self.conn.commit()
def add_special(self, word_token, word, cls, typ, oper):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO word (word_token, word, class, type, operator)

View File

@@ -10,23 +10,6 @@ from nominatim.db import properties
# This must always point to the mock word table for the default tokenizer.
from mock_legacy_word_table import MockLegacyWordTable as MockWordTable
class MockParamCapture:
    """ Callable stand-in for a function. It counts its invocations,
        remembers the arguments of the most recent one and always
        answers with the return value set on construction.
    """

    def __init__(self, retval=0):
        self.return_value = retval
        self.called = 0
        # Positional and keyword arguments of the latest call.
        self.last_args = self.last_kwargs = None


    def __call__(self, *args, **kwargs):
        self.last_args, self.last_kwargs = args, kwargs
        self.called += 1

        return self.return_value
class MockPlacexTable:
""" A placex table for testing.
"""
@@ -64,15 +47,16 @@ class MockPlacexTable:
def add(self, osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
country=None):
country=None, housenumber=None):
with self.conn.cursor() as cur:
psycopg2.extras.register_hstore(cur)
cur.execute("""INSERT INTO placex (place_id, osm_type, osm_id, class,
type, name, admin_level, address,
housenumber,
extratags, geometry, country_code)
VALUES(nextval('seq_place'), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
VALUES(nextval('seq_place'), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(osm_type, osm_id or next(self.idseq), cls, typ, names,
admin_level, address, extratags, 'SRID=4326;' + geom,
admin_level, address, housenumber, extratags, 'SRID=4326;' + geom,
country))
self.conn.commit()
@@ -88,3 +72,9 @@ class MockPropertyTable:
""" Set a property in the table to the given value.
"""
properties.set_property(self.conn, name, value)
def get(self, name):
    """ Return the value of the given property from the table.
    """
    return properties.get_property(self.conn, name)

Binary file not shown.

View File

@@ -1,379 +0,0 @@
"""
Tests for command line interface wrapper.
These tests just check that the various command line parameters route to the
correct functionality. They use a lot of monkeypatching to avoid executing
the actual functions.
"""
import pytest
import nominatim.db.properties
import nominatim.cli
import nominatim.clicmd.api
import nominatim.clicmd.refresh
import nominatim.clicmd.admin
import nominatim.clicmd.setup
import nominatim.indexer.indexer
import nominatim.tools.admin
import nominatim.tools.add_osm_data
import nominatim.tools.check_database
import nominatim.tools.database_import
import nominatim.tools.country_info
import nominatim.tools.freeze
import nominatim.tools.refresh
import nominatim.tools.postcodes
import nominatim.tokenizer.factory
from mocks import MockParamCapture
@pytest.fixture
def mock_run_legacy(monkeypatch):
    """ Replace `run_legacy_script` in nominatim.cli with a call recorder
        and hand the recorder to the test.
    """
    recorder = MockParamCapture()
    monkeypatch.setattr(nominatim.cli, 'run_legacy_script', recorder)

    return recorder
@pytest.fixture
def mock_func_factory(monkeypatch):
    """ Provide a function that replaces the attribute `func` of `module`
        with a call recorder and returns the recorder. The name of the
        replaced function is saved in the recorder's `func_name`.
    """
    def get_mock(module, func):
        recorder = MockParamCapture()
        recorder.func_name = func
        monkeypatch.setattr(module, func, recorder)

        return recorder

    return get_mock
class TestCli:
    """ Routing tests for commands that are called without setting up
        a test database.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call):
        self.call_nominatim = cli_call


    def test_cli_help(self, capsys):
        """ Running the nominatim tool without arguments returns 1 and
            prints the usage information.
        """
        assert self.call_nominatim() == 1

        stdout = capsys.readouterr().out
        assert stdout.startswith('usage:')


    @pytest.mark.parametrize("command,script", [
        (('export',), 'export')
    ])
    def test_legacy_commands_simple(self, mock_run_legacy, command, script):
        assert self.call_nominatim(*command) == 0

        assert mock_run_legacy.called == 1
        # The first positional argument is the name of the PHP script.
        assert mock_run_legacy.last_args[0] == f'{script}.php'


    @pytest.mark.parametrize("params", [('--warm', ),
                                        ('--warm', '--reverse-only'),
                                        ('--warm', '--search-only')])
    def test_admin_command_legacy(self, mock_func_factory, params):
        legacy_mock = mock_func_factory(nominatim.clicmd.admin, 'run_legacy_script')

        assert self.call_nominatim('admin', *params) == 0

        assert legacy_mock.called == 1


    def test_admin_command_check_database(self, mock_func_factory):
        check_mock = mock_func_factory(nominatim.tools.check_database, 'check_database')

        assert self.call_nominatim('admin', '--check-database') == 0

        assert check_mock.called == 1


    @pytest.mark.parametrize("name,oid", [('file', 'foo.osm'), ('diff', 'foo.osc')])
    def test_add_data_file_command(self, mock_func_factory, name, oid):
        file_mock = mock_func_factory(nominatim.tools.add_osm_data, 'add_data_from_file')

        assert self.call_nominatim('add-data', '--' + name, str(oid)) == 0

        assert file_mock.called == 1


    @pytest.mark.parametrize("name,oid", [('node', 12), ('way', 8), ('relation', 32)])
    def test_add_data_object_command(self, mock_func_factory, name, oid):
        object_mock = mock_func_factory(nominatim.tools.add_osm_data, 'add_osm_object')

        assert self.call_nominatim('add-data', '--' + name, str(oid)) == 0

        assert object_mock.called == 1


    def test_serve_command(self, mock_func_factory):
        server_mock = mock_func_factory(nominatim.cli, 'run_php_server')

        self.call_nominatim('serve')

        assert server_mock.called == 1
@pytest.mark.parametrize("params", [('search', '--query', 'new'),
                                    ('reverse', '--lat', '0', '--lon', '0'),
                                    ('lookup', '--id', 'N1'),
                                    ('details', '--node', '1'),
                                    ('details', '--way', '1'),
                                    ('details', '--relation', '1'),
                                    ('details', '--place_id', '10001'),
                                    ('status',)])
class TestCliApiCall:
    """ Routing tests for the API commands. Each test is run once for
        every command line in the parameter list above.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call):
        self.call_nominatim = cli_call


    def test_api_commands_simple(self, mock_func_factory, params, tmp_path):
        endpoint = params[0]

        # Create an empty PHP script for the endpoint in the project directory.
        website_dir = tmp_path / 'website'
        website_dir.mkdir()
        (website_dir / (endpoint + '.php')).write_text('')

        api_mock = mock_func_factory(nominatim.clicmd.api, 'run_api_script')

        assert self.call_nominatim(*params, '--project-dir', str(tmp_path)) == 0

        assert api_mock.called == 1
        assert api_mock.last_args[0] == endpoint


    def test_bad_project_idr(self, mock_func_factory, params):
        # No project directory is given here, the call must return 1.
        mock_func_factory(nominatim.clicmd.api, 'run_api_script')

        assert self.call_nominatim(*params) == 1
class TestCliWithDb:
    """ Routing tests for commands that are run with the test database
        set up. The tokenizer factory is patched to hand out a dummy
        tokenizer which only records which of its functions were called.
    """

    @pytest.fixture(autouse=True)
    def setup_cli_call(self, cli_call, temp_db):
        self.call_nominatim = cli_call


    @pytest.fixture(autouse=True)
    def setup_tokenizer_mock(self, monkeypatch):
        class DummyTokenizer:
            def __init__(self, *args, **kwargs):
                self.update_sql_functions_called = False
                self.finalize_import_called = False
                self.update_statistics_called = False

            def update_sql_functions(self, *args):
                self.update_sql_functions_called = True

            def finalize_import(self, *args):
                self.finalize_import_called = True

            def update_statistics(self):
                self.update_statistics_called = True


        tok = DummyTokenizer()
        # Loading and creating a tokenizer both yield the same dummy.
        for factory_func in ('get_tokenizer_for_db', 'create_tokenizer'):
            monkeypatch.setattr(nominatim.tokenizer.factory, factory_func,
                                lambda *args: tok)

        self.tokenizer_mock = tok


    @staticmethod
    def _assert_called_once(recorders):
        """ Check that each of the given call recorders was called
            exactly once.
        """
        for recorder in recorders:
            assert recorder.called == 1, f"Mock '{recorder.func_name}' not called"


    def test_import_missing_file(self):
        assert self.call_nominatim('import', '--osm-file', 'sfsafegwedgw.reh.erh') == 1


    def test_import_bad_file(self):
        assert self.call_nominatim('import', '--osm-file', '.') == 1


    def test_import_full(self, mock_func_factory):
        targets = [
            (nominatim.tools.database_import, 'setup_database_skeleton'),
            (nominatim.tools.country_info, 'setup_country_tables'),
            (nominatim.tools.database_import, 'import_osm_data'),
            (nominatim.tools.refresh, 'import_wikipedia_articles'),
            (nominatim.tools.database_import, 'truncate_data_tables'),
            (nominatim.tools.database_import, 'load_data'),
            (nominatim.tools.database_import, 'create_tables'),
            (nominatim.tools.database_import, 'create_table_triggers'),
            (nominatim.tools.database_import, 'create_partition_tables'),
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.refresh, 'load_address_levels_from_config'),
            (nominatim.tools.postcodes, 'update_postcodes'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property')
        ]
        recorders = [mock_func_factory(module, func) for module, func in targets]

        # Expected to run more than once, so it is checked separately.
        cf_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')

        assert self.call_nominatim('import', '--osm-file', __file__) == 0
        assert self.tokenizer_mock.finalize_import_called

        assert cf_mock.called > 1

        self._assert_called_once(recorders)


    def test_import_continue_load_data(self, mock_func_factory):
        targets = [
            (nominatim.tools.database_import, 'truncate_data_tables'),
            (nominatim.tools.database_import, 'load_data'),
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.postcodes, 'update_postcodes'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property')
        ]
        recorders = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'load-data') == 0
        assert self.tokenizer_mock.finalize_import_called

        self._assert_called_once(recorders)


    def test_import_continue_indexing(self, mock_func_factory, placex_table,
                                      temp_db_conn):
        targets = [
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.indexer.indexer.Indexer, 'index_full'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property')
        ]
        recorders = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'indexing') == 0

        self._assert_called_once(recorders)

        assert temp_db_conn.index_exists('idx_placex_pendingsector')

        # Calling it again still works for the index
        assert self.call_nominatim('import', '--continue', 'indexing') == 0
        assert temp_db_conn.index_exists('idx_placex_pendingsector')


    def test_import_continue_postprocess(self, mock_func_factory):
        targets = [
            (nominatim.tools.database_import, 'create_search_indices'),
            (nominatim.tools.country_info, 'create_country_names'),
            (nominatim.tools.refresh, 'setup_website'),
            (nominatim.db.properties, 'set_property')
        ]
        recorders = [mock_func_factory(module, func) for module, func in targets]

        assert self.call_nominatim('import', '--continue', 'db-postprocess') == 0

        assert self.tokenizer_mock.finalize_import_called

        self._assert_called_once(recorders)


    def test_freeze_command(self, mock_func_factory):
        drop_mock = mock_func_factory(nominatim.tools.freeze, 'drop_update_tables')
        flatnode_mock = mock_func_factory(nominatim.tools.freeze, 'drop_flatnode_file')

        assert self.call_nominatim('freeze') == 0

        assert drop_mock.called == 1
        assert flatnode_mock.called == 1


    @pytest.mark.parametrize("func, params", [('analyse_indexing', ('--analyse-indexing', ))])
    def test_admin_command_tool(self, mock_func_factory, func, params):
        tool_mock = mock_func_factory(nominatim.tools.admin, func)

        assert self.call_nominatim('admin', *params) == 0
        assert tool_mock.called == 1


    @pytest.mark.parametrize("params,do_bnds,do_ranks", [
                              ([], 1, 1),
                              (['--boundaries-only'], 1, 0),
                              (['--no-boundaries'], 0, 1),
                              (['--boundaries-only', '--no-boundaries'], 0, 0)])
    def test_index_command(self, mock_func_factory, table_factory,
                           params, do_bnds, do_ranks):
        table_factory('import_status', 'indexed bool')
        indexer = nominatim.indexer.indexer.Indexer
        bnd_mock = mock_func_factory(indexer, 'index_boundaries')
        rank_mock = mock_func_factory(indexer, 'index_by_rank')

        assert self.call_nominatim('index', *params) == 0

        assert bnd_mock.called == do_bnds
        assert rank_mock.called == do_ranks


    @pytest.mark.parametrize("no_replace", [True, False])
    def test_special_phrases_wiki_command(self, mock_func_factory, no_replace):
        import_mock = mock_func_factory(nominatim.clicmd.special_phrases.SPImporter,
                                        'import_phrases')

        cmdline = ['special-phrases', '--import-from-wiki']
        if no_replace:
            cmdline.append('--no-replace')
        self.call_nominatim(*cmdline)

        assert import_mock.called == 1


    @pytest.mark.parametrize("no_replace", [True, False])
    def test_special_phrases_csv_command(self, src_dir, mock_func_factory, no_replace):
        import_mock = mock_func_factory(nominatim.clicmd.special_phrases.SPImporter,
                                        'import_phrases')
        testdata = src_dir / 'test' / 'testdb'
        csv_path = str((testdata / 'full_en_phrases_test.csv').resolve())

        cmdline = ['special-phrases', '--import-from-csv', csv_path]
        if no_replace:
            cmdline.append('--no-replace')
        self.call_nominatim(*cmdline)

        assert import_mock.called == 1


    @pytest.mark.parametrize("command,func", [
                             ('address-levels', 'load_address_levels_from_config'),
                             ('wiki-data', 'import_wikipedia_articles'),
                             ('importance', 'recompute_importance'),
                             ('website', 'setup_website'),
                             ])
    def test_refresh_command(self, mock_func_factory, command, func):
        refresh_mock = mock_func_factory(nominatim.tools.refresh, func)

        assert self.call_nominatim('refresh', '--' + command) == 0
        assert refresh_mock.called == 1


    def test_refresh_word_count(self):
        assert self.call_nominatim('refresh', '--word-count') == 0
        assert self.tokenizer_mock.update_statistics_called


    def test_refresh_postcodes(self, mock_func_factory, place_table):
        update_mock = mock_func_factory(nominatim.tools.postcodes, 'update_postcodes')
        index_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_postcodes')

        assert self.call_nominatim('refresh', '--postcodes') == 0

        assert update_mock.called == 1
        assert index_mock.called == 1


    def test_refresh_create_functions(self, mock_func_factory):
        create_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')

        assert self.call_nominatim('refresh', '--functions') == 0

        assert create_mock.called == 1
        assert self.tokenizer_mock.update_sql_functions_called


    def test_refresh_importance_computed_after_wiki_import(self, monkeypatch):
        calls = []

        def fake_import(*args, **kwargs):
            calls.append('import')
            return 0

        def fake_update(*args, **kwargs):
            calls.append('update')

        monkeypatch.setattr(nominatim.tools.refresh, 'import_wikipedia_articles',
                            fake_import)
        monkeypatch.setattr(nominatim.tools.refresh, 'recompute_importance',
                            fake_update)

        # The order of the command line switches is deliberately reversed.
        assert self.call_nominatim('refresh', '--importance', '--wiki-data') == 0

        assert calls == ['import', 'update']

View File

@@ -1,75 +0,0 @@
"""
Tests for creating new tokenizers.
"""
import pytest
from nominatim.db import properties
from nominatim.tokenizer import factory
from nominatim.errors import UsageError
from dummy_tokenizer import DummyTokenizer
@pytest.fixture
def test_config(def_config, tmp_path, property_table, tokenizer_mock):
    """ Provide the default configuration with the project directory
        pointing to the temporary directory of the test.
    """
    config = def_config
    config.project_dir = tmp_path

    return config
def test_setup_dummy_tokenizer(temp_db_conn, test_config):
    """ Creating a tokenizer yields a new dummy tokenizer, creates the
        tokenizer directory and saves the tokenizer name as a property.
    """
    tok = factory.create_tokenizer(test_config)

    assert isinstance(tok, DummyTokenizer)
    assert tok.init_state == "new"

    tokenizer_dir = test_config.project_dir / 'tokenizer'
    assert tokenizer_dir.is_dir()

    assert properties.get_property(temp_db_conn, 'tokenizer') == 'dummy'
def test_setup_tokenizer_dir_exists(test_config):
    """ An already existing tokenizer directory does not prevent the
        creation of a new tokenizer.
    """
    tokenizer_dir = test_config.project_dir / 'tokenizer'
    tokenizer_dir.mkdir()

    tok = factory.create_tokenizer(test_config)

    assert isinstance(tok, DummyTokenizer)
    assert tok.init_state == "new"
def test_setup_tokenizer_dir_failure(test_config):
    """ Creating a tokenizer must fail with a UsageError when a regular
        file sits in the place of the tokenizer directory.
    """
    blocker = test_config.project_dir / 'tokenizer'
    blocker.write_text("foo")

    with pytest.raises(UsageError):
        factory.create_tokenizer(test_config)
def test_setup_bad_tokenizer_name(def_config, tmp_path, monkeypatch):
    """ Expect a UsageError when NOMINATIM_TOKENIZER is set to 'dummy'
        while the tokenizer_mock fixture is not in use.
    """
    config = def_config
    config.project_dir = tmp_path
    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')

    with pytest.raises(UsageError):
        factory.create_tokenizer(config)
def test_load_tokenizer(test_config):
    """ A tokenizer that was created before can be loaded again and
        reports the state 'loaded'.
    """
    factory.create_tokenizer(test_config)

    loaded = factory.get_tokenizer_for_db(test_config)

    assert isinstance(loaded, DummyTokenizer)
    assert loaded.init_state == "loaded"
def test_load_no_tokenizer_dir(test_config):
    """ Loading the tokenizer must fail with a UsageError after the
        project directory was changed to the subdirectory 'foo'.
    """
    factory.create_tokenizer(test_config)

    moved_dir = test_config.project_dir / 'foo'
    test_config.project_dir = moved_dir

    with pytest.raises(UsageError):
        factory.get_tokenizer_for_db(test_config)
def test_load_missing_propoerty(temp_db_cursor, test_config):
    """ Loading the tokenizer must fail with a UsageError when the
        property table has been emptied after creation.
    """
    factory.create_tokenizer(test_config)

    wipe_sql = "TRUNCATE TABLE nominatim_properties"
    temp_db_cursor.execute(wipe_sql)

    with pytest.raises(UsageError):
        factory.get_tokenizer_for_db(test_config)

View File

@@ -1,51 +0,0 @@
"""
Tests for creating PL/pgSQL functions for Nominatim.
"""
import pytest
from nominatim.tools.refresh import create_functions
@pytest.fixture
def sql_tmp_path(tmp_path, def_config):
    """ Redirect the SQL library directory of the default configuration
        to the temporary directory of the test and return that directory.
    """
    sql_dir = tmp_path
    def_config.lib_dir.sql = sql_dir

    return sql_dir
@pytest.fixture
def conn(sql_preprocessor, temp_db_conn):
    """ Provide the test database connection. Requesting this fixture
        also pulls in the sql_preprocessor fixture.
    """
    connection = temp_db_conn

    return connection
def test_create_functions(temp_db_cursor, conn, def_config, sql_tmp_path):
    """ The function defined in 'functions.sql' of the SQL directory must
        be callable in the database after create_functions() has run.
    """
    function_sql = """CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
                      AS $$
                      BEGIN
                        RETURN 43;
                      END;
                      $$ LANGUAGE plpgsql IMMUTABLE;
                      """
    (sql_tmp_path / 'functions.sql').write_text(function_sql)

    create_functions(conn, def_config)

    result = temp_db_cursor.scalar('SELECT test()')
    assert result == 43
@pytest.mark.parametrize("dbg,ret", ((True, 43), (False, 22)))
def test_create_functions_with_template(temp_db_cursor, conn, def_config, sql_tmp_path,
                                        dbg, ret):
    """ The `debug` variable of the SQL template must follow the
        `enable_debug` parameter of create_functions().
    """
    function_sql = """CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
                      AS $$
                      BEGIN
                        {% if debug %}
                        RETURN 43;
                        {% else %}
                        RETURN 22;
                        {% endif %}
                      END;
                      $$ LANGUAGE plpgsql IMMUTABLE;
                      """
    (sql_tmp_path / 'functions.sql').write_text(function_sql)

    create_functions(conn, def_config, enable_debug=dbg)

    result = temp_db_cursor.scalar('SELECT test()')
    assert result == ret

View File

@@ -0,0 +1,75 @@
"""
Tests for creating new tokenizers.
"""
import pytest
from nominatim.db import properties
from nominatim.tokenizer import factory
from nominatim.errors import UsageError
from dummy_tokenizer import DummyTokenizer
def test_setup_bad_tokenizer_name(project_env, monkeypatch):
    """ Expect a UsageError when NOMINATIM_TOKENIZER is set to 'dummy'
        while the tokenizer_mock fixture is not in use.
    """
    tokenizer_name = 'dummy'
    monkeypatch.setenv('NOMINATIM_TOKENIZER', tokenizer_name)

    with pytest.raises(UsageError):
        factory.create_tokenizer(project_env)
class TestFactory:
    """ Tests for creating and loading tokenizers. All tests run in a
        project environment with the property table and the tokenizer
        mock fixtures set up.
    """

    @pytest.fixture(autouse=True)
    def init_env(self, project_env, property_table, tokenizer_mock):
        self.config = project_env


    def test_setup_dummy_tokenizer(self, temp_db_conn):
        tok = factory.create_tokenizer(self.config)

        assert isinstance(tok, DummyTokenizer)
        assert tok.init_state == "new"

        tokenizer_dir = self.config.project_dir / 'tokenizer'
        assert tokenizer_dir.is_dir()

        assert properties.get_property(temp_db_conn, 'tokenizer') == 'dummy'


    def test_setup_tokenizer_dir_exists(self):
        tokenizer_dir = self.config.project_dir / 'tokenizer'
        tokenizer_dir.mkdir()

        tok = factory.create_tokenizer(self.config)

        assert isinstance(tok, DummyTokenizer)
        assert tok.init_state == "new"


    def test_setup_tokenizer_dir_failure(self):
        # A regular file takes the place of the tokenizer directory.
        blocker = self.config.project_dir / 'tokenizer'
        blocker.write_text("foo")

        with pytest.raises(UsageError):
            factory.create_tokenizer(self.config)


    def test_load_tokenizer(self):
        factory.create_tokenizer(self.config)

        loaded = factory.get_tokenizer_for_db(self.config)

        assert isinstance(loaded, DummyTokenizer)
        assert loaded.init_state == "loaded"


    def test_load_no_tokenizer_dir(self):
        factory.create_tokenizer(self.config)

        moved_dir = self.config.project_dir / 'foo'
        self.config.project_dir = moved_dir

        with pytest.raises(UsageError):
            factory.get_tokenizer_for_db(self.config)


    def test_load_missing_property(self, temp_db_cursor):
        factory.create_tokenizer(self.config)

        wipe_sql = "TRUNCATE TABLE nominatim_properties"
        temp_db_cursor.execute(wipe_sql)

        with pytest.raises(UsageError):
            factory.get_tokenizer_for_db(self.config)

View File

@@ -1,5 +1,5 @@
"""
Tests for Legacy ICU tokenizer.
Tests for ICU tokenizer.
"""
import shutil
import yaml
@@ -20,20 +20,17 @@ def word_table(temp_db_conn):
@pytest.fixture
def test_config(def_config, tmp_path):
def_config.project_dir = tmp_path / 'project'
def_config.project_dir.mkdir()
def test_config(project_env, tmp_path):
sqldir = tmp_path / 'sql'
sqldir.mkdir()
(sqldir / 'tokenizer').mkdir()
(sqldir / 'tokenizer' / 'icu_tokenizer.sql').write_text("SELECT 'a'")
shutil.copy(str(def_config.lib_dir.sql / 'tokenizer' / 'icu_tokenizer_tables.sql'),
shutil.copy(str(project_env.lib_dir.sql / 'tokenizer' / 'icu_tokenizer_tables.sql'),
str(sqldir / 'tokenizer' / 'icu_tokenizer_tables.sql'))
def_config.lib_dir.sql = sqldir
project_env.lib_dir.sql = sqldir
return def_config
return project_env
@pytest.fixture
@@ -144,12 +141,6 @@ LANGUAGE plpgsql;
""")
@pytest.fixture
def getorcreate_hnr_id(temp_db_cursor):
    """ Install a simplified SQL function getorcreate_hnr_id() that
        returns the negated next value of the sequence 'seq_word'.
    """
    function_sql = """CREATE OR REPLACE FUNCTION getorcreate_hnr_id(lookup_term TEXT)
                      RETURNS INTEGER AS $$
                        SELECT -nextval('seq_word')::INTEGER; $$ LANGUAGE SQL"""

    temp_db_cursor.execute(function_sql)
def test_init_new(tokenizer_factory, test_config, db_prop):
tok = tokenizer_factory()
@@ -197,6 +188,47 @@ def test_update_sql_functions(db_prop, temp_db_cursor,
assert test_content == set((('1133', ), ))
def test_finalize_import(tokenizer_factory, temp_db_conn,
                         temp_db_cursor, test_config, sql_preprocessor_cfg):
    """ Finalizing the import must execute the SQL file
        'tokenizer/legacy_tokenizer_indices.sql'. The file is replaced
        here with the definition of a function test() whose result is
        then checked.
    """
    func_file = test_config.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer_indices.sql'
    func_file.write_text("""CREATE FUNCTION test() RETURNS TEXT
                            AS $$ SELECT 'b'::text $$ LANGUAGE SQL""")

    tok = tokenizer_factory()
    tok.init_new_db(test_config)

    tok.finalize_import(test_config)

    # The comparison needs an `assert`, otherwise its result is thrown away.
    assert temp_db_cursor.scalar('SELECT test()') == 'b'
def test_check_database(test_config, tokenizer_factory,
                        temp_db_cursor, sql_preprocessor_cfg):
    """ check_database() returns None for a freshly initialised database.
    """
    tok = tokenizer_factory()
    tok.init_new_db(test_config)

    check_result = tok.check_database(test_config)

    assert check_result is None
def test_update_statistics_reverse_only(word_table, tokenizer_factory):
    """ Updating the statistics must run without error when only the
        word table fixture has been set up.
    """
    tokenizer = tokenizer_factory()

    tokenizer.update_statistics()
def test_update_statistics(word_table, table_factory, temp_db_cursor, tokenizer_factory):
    """ After updating the statistics, at least one full word must have
        a positive count in its info field.
    """
    word_table.add_full_word(1000, 'hello')
    # One place that references the word through its name vector.
    table_factory('search_name',
                  'place_id BIGINT, name_vector INT[]',
                  [(12, [1000])])
    tokenizer = tokenizer_factory()

    tokenizer.update_statistics()

    counted_words = temp_db_cursor.scalar("""SELECT count(*) FROM word
                                             WHERE type = 'W' and
                                                   (info->>'count')::int > 0""")
    assert counted_words > 0
def test_normalize_postcode(analyzer):
with analyzer() as anl:
anl.normalize_postcode('123') == '123'
@@ -367,6 +399,13 @@ class TestPlaceAddress:
yield anl
@pytest.fixture
def getorcreate_hnr_id(self, temp_db_cursor):
    """ Install a simplified SQL function getorcreate_hnr_id() that
        returns the negated next value of the sequence 'seq_word'.
    """
    function_sql = """CREATE OR REPLACE FUNCTION getorcreate_hnr_id(lookup_term TEXT)
                      RETURNS INTEGER AS $$
                        SELECT -nextval('seq_word')::INTEGER; $$ LANGUAGE SQL"""

    temp_db_cursor.execute(function_sql)
def process_address(self, **kwargs):
    """ Wrap the keyword arguments as address of a PlaceInfo and return
        what the analyzer's process_place() makes of it.
    """
    place = PlaceInfo({'address': kwargs})

    return self.analyzer.process_place(place)

View File

@@ -11,18 +11,20 @@ from nominatim.errors import UsageError
from icu import Transliterator
@pytest.fixture
def test_config(def_config, tmp_path):
project_dir = tmp_path / 'project_dir'
project_dir.mkdir()
def_config.project_dir = project_dir
CONFIG_SECTIONS = ('normalization', 'transliteration', 'token-analysis')
return def_config
class TestIcuRuleLoader:
@pytest.fixture(autouse=True)
def init_env(self, project_env):
self.project_env = project_env
@pytest.fixture
def cfgrules(test_config):
def _create_config(*variants, **kwargs):
def write_config(self, content):
(self.project_env.project_dir / 'icu_tokenizer.yaml').write_text(dedent(content))
def config_rules(self, *variants):
content = dedent("""\
normalization:
- ":: NFD ()"
@@ -33,122 +35,116 @@ def cfgrules(test_config):
transliteration:
- ":: Latin ()"
- "[[:Punctuation:][:Space:]]+ > ' '"
""")
content += "token-analysis:\n - analyzer: generic\n variants:\n - words:\n"
content += '\n'.join((" - " + s for s in variants)) + '\n'
for k, v in kwargs:
content += " {}: {}\n".format(k, v)
(test_config.project_dir / 'icu_tokenizer.yaml').write_text(content)
return test_config
return _create_config
def test_empty_rule_set(test_config):
(test_config.project_dir / 'icu_tokenizer.yaml').write_text(dedent("""\
normalization:
transliteration:
token-analysis:
- analyzer: generic
variants:
"""))
rules = ICURuleLoader(test_config)
assert rules.get_search_rules() == ''
assert rules.get_normalization_rules() == ''
assert rules.get_transliteration_rules() == ''
CONFIG_SECTIONS = ('normalization', 'transliteration', 'token-analysis')
@pytest.mark.parametrize("section", CONFIG_SECTIONS)
def test_missing_section(section, test_config):
rule_cfg = { s: [] for s in CONFIG_SECTIONS if s != section}
(test_config.project_dir / 'icu_tokenizer.yaml').write_text(yaml.dump(rule_cfg))
with pytest.raises(UsageError):
ICURuleLoader(test_config)
def test_get_search_rules(cfgrules):
loader = ICURuleLoader(cfgrules())
rules = loader.get_search_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" Baum straße ") == " baum straße "
assert trans.transliterate(" Baumstraße ") == " baumstraße "
assert trans.transliterate(" Baumstrasse ") == " baumstrasse "
assert trans.transliterate(" Baumstr ") == " baumstr "
assert trans.transliterate(" Baumwegstr ") == " baumwegstr "
assert trans.transliterate(" Αθήνα ") == " athēna "
assert trans.transliterate(" проспект ") == " prospekt "
def test_get_normalization_rules(cfgrules):
loader = ICURuleLoader(cfgrules())
rules = loader.get_normalization_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" проспект-Prospekt ") == " проспект prospekt "
def test_get_transliteration_rules(cfgrules):
loader = ICURuleLoader(cfgrules())
rules = loader.get_transliteration_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" проспект-Prospekt ") == " prospekt Prospekt "
def test_transliteration_rules_from_file(test_config):
cfgpath = test_config.project_dir / ('icu_tokenizer.yaml')
cfgpath.write_text(dedent("""\
normalization:
transliteration:
- "'ax' > 'b'"
- !include transliteration.yaml
token-analysis:
- analyzer: generic
variants:
"""))
transpath = test_config.project_dir / ('transliteration.yaml')
transpath.write_text('- "x > y"')
- words:
""")
content += '\n'.join((" - " + s for s in variants)) + '\n'
self.write_config(content)
loader = ICURuleLoader(test_config)
rules = loader.get_transliteration_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" axxt ") == " byt "
def test_search_rules(cfgrules):
config = cfgrules('~street => s,st', 'master => mstr')
proc = ICURuleLoader(config).make_token_analysis()
assert proc.search.transliterate('Master Street').strip() == 'master street'
assert proc.search.transliterate('Earnes St').strip() == 'earnes st'
assert proc.search.transliterate('Nostreet').strip() == 'nostreet'
class TestGetReplacements:
@pytest.fixture(autouse=True)
def setup_cfg(self, cfgrules):
self.cfgrules = cfgrules
def get_replacements(self, *variants):
loader = ICURuleLoader(self.cfgrules(*variants))
self.config_rules(*variants)
loader = ICURuleLoader(self.project_env)
rules = loader.analysis[None].config['replacements']
return sorted((k, sorted(v)) for k,v in rules)
def test_empty_rule_set(self):
self.write_config("""\
normalization:
transliteration:
token-analysis:
- analyzer: generic
variants:
""")
rules = ICURuleLoader(self.project_env)
assert rules.get_search_rules() == ''
assert rules.get_normalization_rules() == ''
assert rules.get_transliteration_rules() == ''
@pytest.mark.parametrize("section", CONFIG_SECTIONS)
def test_missing_section(self, section):
rule_cfg = { s: [] for s in CONFIG_SECTIONS if s != section}
self.write_config(yaml.dump(rule_cfg))
with pytest.raises(UsageError):
ICURuleLoader(self.project_env)
def test_get_search_rules(self):
self.config_rules()
loader = ICURuleLoader(self.project_env)
rules = loader.get_search_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" Baum straße ") == " baum straße "
assert trans.transliterate(" Baumstraße ") == " baumstraße "
assert trans.transliterate(" Baumstrasse ") == " baumstrasse "
assert trans.transliterate(" Baumstr ") == " baumstr "
assert trans.transliterate(" Baumwegstr ") == " baumwegstr "
assert trans.transliterate(" Αθήνα ") == " athēna "
assert trans.transliterate(" проспект ") == " prospekt "
def test_get_normalization_rules(self):
self.config_rules()
loader = ICURuleLoader(self.project_env)
rules = loader.get_normalization_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" проспект-Prospekt ") == " проспект prospekt "
def test_get_transliteration_rules(self):
self.config_rules()
loader = ICURuleLoader(self.project_env)
rules = loader.get_transliteration_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" проспект-Prospekt ") == " prospekt Prospekt "
def test_transliteration_rules_from_file(self):
self.write_config("""\
normalization:
transliteration:
- "'ax' > 'b'"
- !include transliteration.yaml
token-analysis:
- analyzer: generic
variants:
""")
transpath = self.project_env.project_dir / ('transliteration.yaml')
transpath.write_text('- "x > y"')
loader = ICURuleLoader(self.project_env)
rules = loader.get_transliteration_rules()
trans = Transliterator.createFromRules("test", rules)
assert trans.transliterate(" axxt ") == " byt "
def test_search_rules(self):
    """ Check the output of the search transliterator of the token
        analysis when variants are configured.
    """
    self.config_rules('~street => s,st', 'master => mstr')
    analysis = ICURuleLoader(self.project_env).make_token_analysis()

    for phrase, expected in (('Master Street', 'master street'),
                             ('Earnes St', 'earnes st'),
                             ('Nostreet', 'nostreet')):
        assert analysis.search.transliterate(phrase).strip() == expected
@pytest.mark.parametrize("variant", ['foo > bar', 'foo -> bar -> bar',
                                     '~foo~ -> bar', 'fo~ o -> bar'])
def test_invalid_variant_description(self, variant):
    """ Malformed variant descriptions must make the loader raise
        a UsageError.
    """
    # The variant goes into the project configuration; the loader is
    # then created from the project environment only.
    self.config_rules(variant)
    with pytest.raises(UsageError):
        ICURuleLoader(self.project_env)
def test_add_full(self):
repl = self.get_replacements("foo -> bar")

View File

@@ -2,6 +2,7 @@
Test for legacy tokenizer.
"""
import shutil
import re
import pytest
@@ -10,29 +11,49 @@ from nominatim.tokenizer import legacy_tokenizer
from nominatim.db import properties
from nominatim.errors import UsageError
@pytest.fixture
def test_config(def_config, tmp_path):
def_config.project_dir = tmp_path / 'project'
def_config.project_dir.mkdir()
from mock_legacy_word_table import MockLegacyWordTable
# Force use of legacy word table
@pytest.fixture
def word_table(temp_db_conn):
    """ Provide a MockLegacyWordTable on the temporary database connection.
    """
    legacy_table = MockLegacyWordTable(temp_db_conn)
    return legacy_table
@pytest.fixture
def test_config(project_env, tmp_path):
    """ Prepare the project environment for legacy tokenizer tests.

        Creates a dummy module directory and a private SQL directory
        below `tmp_path`, points `lib_dir.module`, `lib_dir.sql` and
        `lib_dir.data` at them and returns the modified environment.
    """
    module_dir = tmp_path / 'module_src'
    module_dir.mkdir()
    (module_dir / 'nominatim.so').write_text('TEST nomiantim.so')

    project_env.lib_dir.module = module_dir

    sqldir = tmp_path / 'sql'
    sqldir.mkdir()
    (sqldir / 'tokenizer').mkdir()

    # Get the original SQL but replace make_standard_name to avoid module use.
    init_sql = (project_env.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer.sql').read_text()
    for fn in ('transliteration', 'gettokenstring'):
        # The flag must be passed by keyword: the fourth positional
        # argument of re.sub() is `count`, not `flags`.
        init_sql = re.sub(f'CREATE OR REPLACE FUNCTION {fn}[^;]*;',
                          '', init_sql, flags=re.DOTALL)
    init_sql += """
CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
RETURNS TEXT AS $$ SELECT lower(name); $$ LANGUAGE SQL;
"""
    # Also load util functions. Some are needed by the tokenizer.
    init_sql += (project_env.lib_dir.sql / 'functions' / 'utils.sql').read_text()
    (sqldir / 'tokenizer' / 'legacy_tokenizer.sql').write_text(init_sql)

    (sqldir / 'words.sql').write_text("SELECT 'a'")
    # Copy before lib_dir.sql is redirected to the temporary directory.
    shutil.copy(str(project_env.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer_tables.sql'),
                str(sqldir / 'tokenizer' / 'legacy_tokenizer_tables.sql'))

    project_env.lib_dir.sql = sqldir
    project_env.lib_dir.data = sqldir

    return project_env
@pytest.fixture
@@ -55,12 +76,6 @@ def tokenizer_setup(tokenizer_factory, test_config, monkeypatch, sql_preprocesso
@pytest.fixture
def analyzer(tokenizer_factory, test_config, monkeypatch, sql_preprocessor,
word_table, temp_db_with_extensions, tmp_path):
sql = tmp_path / 'sql' / 'tokenizer' / 'legacy_tokenizer.sql'
sql.write_text("""
CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
RETURNS INTEGER AS $$ SELECT 342; $$ LANGUAGE SQL;
""")
monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
monkeypatch.setenv('NOMINATIM_TERM_NORMALIZATION', ':: lower();')
tok = tokenizer_factory()
@@ -87,12 +102,6 @@ def create_postcode_id(temp_db_cursor):
$$ LANGUAGE SQL""")
@pytest.fixture
def make_keywords(temp_db_cursor, temp_db_with_extensions):
temp_db_cursor.execute(
"""CREATE OR REPLACE FUNCTION make_keywords(names HSTORE)
RETURNS INTEGER[] AS $$ SELECT ARRAY[1, 2, 3] $$ LANGUAGE SQL""")
def test_init_new(tokenizer_factory, test_config, monkeypatch,
temp_db_conn, sql_preprocessor):
monkeypatch.setenv('NOMINATIM_TERM_NORMALIZATION', 'xxvv')
@@ -163,6 +172,23 @@ def test_update_sql_functions(sql_preprocessor, temp_db_conn,
assert test_content == set((('1133', ), (str(test_config.project_dir / 'module'), )))
def test_finalize_import(tokenizer_factory, temp_db_conn,
                         temp_db_cursor, test_config, monkeypatch,
                         sql_preprocessor_cfg):
    """ Check that finalize_import() runs the SQL from
        'legacy_tokenizer_indices.sql' by installing a marker function
        through that file and calling it afterwards.
    """
    monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)

    func_file = test_config.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer_indices.sql'
    func_file.write_text("""CREATE FUNCTION test() RETURNS TEXT
AS $$ SELECT 'b'::text $$ LANGUAGE SQL""")

    tok = tokenizer_factory()
    tok.init_new_db(test_config)

    tok.finalize_import(test_config)

    # Without `assert` the comparison result is discarded and the test
    # can never fail on a wrong value.
    assert temp_db_cursor.scalar('SELECT test()') == 'b'
def test_migrate_database(tokenizer_factory, test_config, temp_db_conn, monkeypatch):
monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
tok = tokenizer_factory()
@@ -178,6 +204,53 @@ def test_migrate_database(tokenizer_factory, test_config, temp_db_conn, monkeypa
assert outfile.stat().st_mode == 33261
def test_check_database(test_config, tokenizer_factory, monkeypatch,
                        temp_db_cursor, sql_preprocessor_cfg):
    """ After init_new_db() the database check must return None.
    """
    def _skip_module_check(m, c):
        return None

    monkeypatch.setattr(legacy_tokenizer, '_check_module', _skip_module_check)
    tokenizer = tokenizer_factory()
    tokenizer.init_new_db(test_config)

    outcome = tokenizer.check_database(False)
    assert outcome is None
def test_check_database_no_tokenizer(test_config, tokenizer_factory):
    """ Without init_new_db() the database check must return something
        other than None.
    """
    outcome = tokenizer_factory().check_database(False)

    assert outcome is not None
def test_check_database_bad_setup(test_config, tokenizer_factory, monkeypatch,
                                  temp_db_cursor, sql_preprocessor_cfg):
    """ The database check must return something other than None when
        make_standard_name() has been replaced after initialisation.
    """
    def _skip_module_check(m, c):
        return None

    monkeypatch.setattr(legacy_tokenizer, '_check_module', _skip_module_check)
    tokenizer = tokenizer_factory()
    tokenizer.init_new_db(test_config)

    # Inject a bad transliteration.
    broken_function = """CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
RETURNS TEXT AS $$ SELECT 'garbage'::text; $$ LANGUAGE SQL"""
    temp_db_cursor.execute(broken_function)

    outcome = tokenizer.check_database(False)
    assert outcome is not None
def test_update_statistics_reverse_only(word_table, tokenizer_factory):
    """ update_statistics() must run when only the word table fixture
        has been set up (no search_name table is created here).
    """
    tokenizer = tokenizer_factory()

    tokenizer.update_statistics()
def test_update_statistics(word_table, table_factory, temp_db_cursor, tokenizer_factory):
    """ After update_statistics() at least one full word must carry
        a positive search_name_count.
    """
    word_table.add_full_word(1000, 'hello')
    table_factory('search_name',
                  'place_id BIGINT, name_vector INT[]',
                  [(12, [1000])])

    tokenizer = tokenizer_factory()
    tokenizer.update_statistics()

    counted_words = temp_db_cursor.scalar("""SELECT count(*) FROM word
WHERE word_token like ' %' and
search_name_count > 0""")
    assert counted_words > 0
def test_normalize(analyzer):
assert analyzer.normalize('TEsT') == 'test'
@@ -189,7 +262,6 @@ def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table,
analyzer.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
@@ -202,7 +274,6 @@ def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_t
analyzer.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
@@ -284,12 +355,6 @@ def test_add_more_country_names(analyzer, word_table, make_standard_name):
('it', ' #it#')}
def test_process_place_names(analyzer, make_keywords):
info = analyzer.process_place(PlaceInfo({'name' : {'name' : 'Soft bAr', 'ref': '34'}}))
assert info['names'] == '{1,2,3}'
@pytest.mark.parametrize('pcode', ['12345', 'AB 123', '34-345'])
def test_process_place_postcode(analyzer, create_postcode_id, word_table, pcode):
analyzer.process_place(PlaceInfo({'address': {'postcode' : pcode}}))
@@ -340,3 +405,174 @@ class TestHousenumberName:
'streetnumber' : '99a'}}))
assert set(info['hnr'].split(';')) == set(('134', '99a'))
class TestPlaceNames:
    """ Tests for the name tokens returned by process_place().
    """

    @pytest.fixture(autouse=True)
    def setup(self, analyzer):
        self.analyzer = analyzer


    def expect_name_terms(self, info, *expected_terms):
        """ Assert that `info['names']` holds exactly the word ids of
            the given terms and that every term has a word id.
        """
        word_infos = self.analyzer.get_word_token_info(list(expected_terms))
        for word_info in word_infos:
            assert word_info[2] is not None, f"No token for {word_info}"

        expected_ids = {word_info[2] for word_info in word_infos}
        assert eval(info['names']) == expected_ids,\
               f"Expected: {word_infos}\nGot: {info['names']}"


    def process_named_place(self, names):
        """ Run process_place() on a place that only has the given names.
        """
        place = PlaceInfo({'name': names})
        return self.analyzer.process_place(place)


    def test_simple_names(self):
        names = {'name': 'Soft bAr', 'ref': '34'}
        info = self.process_named_place(names)

        self.expect_name_terms(info, '#Soft bAr', '#34', 'Soft', 'bAr', '34')


    @pytest.mark.parametrize('sep', [',' , ';'])
    def test_names_with_separator(self, sep):
        joined = sep.join(('New York', 'Big Apple'))
        info = self.process_named_place({'name': joined})

        self.expect_name_terms(info, '#New York', '#Big Apple',
                                     'new', 'york', 'big', 'apple')


    def test_full_names_with_bracket(self):
        names = {'name': 'Houseboat (left)'}
        info = self.process_named_place(names)

        self.expect_name_terms(info, '#Houseboat (left)', '#Houseboat',
                                     'houseboat', '(left)')


    def test_country_name(self, word_table):
        country = {'name' : {'name': 'Norge'},
                   'country_code': 'no',
                   'rank_address': 4,
                   'class': 'boundary',
                   'type': 'administrative'}

        info = self.analyzer.process_place(PlaceInfo(country))

        self.expect_name_terms(info, '#norge', 'norge')
        assert word_table.get_country() == {('no', ' norge')}
class TestPlaceAddress:
    """ Tests for the address tokens returned by process_place().
    """

    @pytest.fixture(autouse=True)
    def setup(self, analyzer):
        self.analyzer = analyzer


    @pytest.fixture
    def getorcreate_hnr_id(self, temp_db_cursor):
        """ Install a getorcreate_housenumber_id() replacement that hands
            out -1, -2, ... from a fresh sequence.
        """
        temp_db_cursor.execute("""CREATE SEQUENCE seq_hnr start 1;
CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
RETURNS INTEGER AS $$
SELECT -nextval('seq_hnr')::INTEGER; $$ LANGUAGE SQL""")


    def process_address(self, **kwargs):
        """ Run process_place() on a place that only has the given
            address parts.
        """
        return self.analyzer.process_place(PlaceInfo({'address': kwargs}))


    def name_token_set(self, *expected_terms):
        """ Return the set of word ids for the given terms, asserting
            that every term has one.
        """
        tokens = self.analyzer.get_word_token_info(list(expected_terms))
        for token in tokens:
            assert token[2] is not None, f"No token for {token}"

        return {t[2] for t in tokens}


    @pytest.mark.parametrize('pcode', ['12345', 'AB 123', '34-345'])
    def test_process_place_postcode(self, word_table, pcode):
        self.process_address(postcode=pcode)

        assert word_table.get_postcodes() == {pcode, }


    @pytest.mark.parametrize('pcode', ['12:23', 'ab;cd;f', '123;836'])
    def test_process_place_bad_postcode(self, word_table, pcode):
        self.process_address(postcode=pcode)

        assert not word_table.get_postcodes()


    @pytest.mark.parametrize('hnr', ['123a', '0', '101'])
    def test_process_place_housenumbers_simple(self, hnr, getorcreate_hnr_id):
        info = self.process_address(housenumber=hnr)

        assert info['hnr'] == hnr.lower()
        assert info['hnr_tokens'] == "{-1}"


    def test_process_place_housenumbers_lists(self, getorcreate_hnr_id):
        info = self.process_address(conscriptionnumber='1; 2;3')

        assert set(info['hnr'].split(';')) == set(('1', '2', '3'))
        assert info['hnr_tokens'] == "{-1,-2,-3}"


    def test_process_place_housenumbers_duplicates(self, getorcreate_hnr_id):
        info = self.process_address(housenumber='134',
                                    conscriptionnumber='134',
                                    streetnumber='99A')

        assert set(info['hnr'].split(';')) == set(('134', '99a'))
        assert info['hnr_tokens'] == "{-1,-2}"


    def test_process_place_street(self):
        # legacy tokenizer only indexes known names
        self.analyzer.process_place(PlaceInfo({'name': {'name' : 'Grand Road'}}))
        info = self.process_address(street='Grand Road')

        assert eval(info['street']) == self.name_token_set('#Grand Road')


    def test_process_place_street_empty(self):
        info = self.process_address(street='🜵')

        assert 'street' not in info


    def test_process_place_place(self):
        self.analyzer.process_place(PlaceInfo({'name': {'name' : 'Honu Lulu'}}))
        info = self.process_address(place='Honu Lulu')

        assert eval(info['place_search']) == self.name_token_set('#Honu Lulu',
                                                                 'Honu', 'Lulu')
        assert eval(info['place_match']) == self.name_token_set('#Honu Lulu')


    def test_process_place_place_empty(self):
        info = self.process_address(place='🜵')

        # NOTE(review): test_process_place_place reads the keys
        # 'place_search' and 'place_match' - confirm that 'place' is
        # the key that should be checked here.
        assert 'place' not in info


    def test_process_place_address_terms(self):
        for name in ('Zwickau', 'Haupstraße', 'Sachsen'):
            self.analyzer.process_place(PlaceInfo({'name': {'name' : name}}))
        info = self.process_address(country='de', city='Zwickau', state='Sachsen',
                                    suburb='Zwickau', street='Hauptstr',
                                    full='right behind the church')

        city = self.name_token_set('ZWICKAU')
        state = self.name_token_set('SACHSEN')

        # Only the first element of each address entry is compared.
        result = {k: eval(v[0]) for k, v in info['addr'].items()}

        assert result == {'city': city, 'suburb': city, 'state': state}


    def test_process_place_address_terms_empty(self):
        info = self.process_address(country='de', city=' ', street='Hauptstr',
                                    full='right behind the church')

        assert 'addr' not in info

View File

@@ -0,0 +1,14 @@
import pytest
@pytest.fixture
def osm2pgsql_options(temp_db):
    """ A standard set of options for osm2pgsql.
    """
    empty_tablespaces = {'slim_data': '', 'slim_index': '',
                         'main_data': '', 'main_index': ''}

    return {'osm2pgsql': 'echo',
            'osm2pgsql_cache': 10,
            'osm2pgsql_style': 'style.file',
            'threads': 1,
            'dsn': 'dbname=' + temp_db,
            'flatnode_file': '',
            'tablespaces': empty_tablespaces}

View File

@@ -0,0 +1,52 @@
"""
Tests for functions to add additional data to the database.
"""
from pathlib import Path
import pytest
from nominatim.tools import add_osm_data
class CaptureGetUrl:
    """ Replacement for `add_osm_data.get_url` that remembers the URL
        it was called with and returns an empty XML document.
    """

    def __init__(self, monkeypatch):
        monkeypatch.setattr(add_osm_data, 'get_url', self)
        self.url = None

    def __call__(self, url):
        self.url = url
        return '<xml></xml>'
def test_import_osm_file_simple(table_factory, osm2pgsql_options, capfd):
    """ Adding data from a file must return 0 and print the expected
        osm2pgsql parameters.
    """
    table_factory('place', content=((1, ), ))

    retcode = add_osm_data.add_data_from_file(Path('change.osm'), osm2pgsql_options)
    assert retcode == 0

    output = capfd.readouterr().out

    expected_fragments = (
        '--append',
        '--output gazetteer',
        f'--style {osm2pgsql_options["osm2pgsql_style"]}',
        f'--number-processes {osm2pgsql_options["threads"]}',
        f'--cache {osm2pgsql_options["osm2pgsql_cache"]}',
        'change.osm',
    )
    for fragment in expected_fragments:
        assert fragment in output
@pytest.mark.parametrize("osm_type", ['node', 'way', 'relation'])
@pytest.mark.parametrize("main_api,url", [(True, 'https://www.openstreetmap.org/api'),
                                          (False, 'https://overpass-api.de/api/interpreter?')])
def test_import_osm_object_main_api(osm2pgsql_options, monkeypatch, capfd,
                                    osm_type, main_api, url):
    """ Adding a single OSM object must request a URL with the expected
        prefix and print the expected osm2pgsql parameters.
    """
    url_capture = CaptureGetUrl(monkeypatch)

    add_osm_data.add_osm_object(osm_type, 4536, main_api, osm2pgsql_options)
    output = capfd.readouterr().out

    assert url_capture.url.startswith(url)

    expected_fragments = (
        '--append',
        '--output gazetteer',
        f'--style {osm2pgsql_options["osm2pgsql_style"]}',
        f'--number-processes {osm2pgsql_options["threads"]}',
        f'--cache {osm2pgsql_options["osm2pgsql_cache"]}',
    )
    for fragment in expected_fragments:
        assert fragment in output
    assert output.endswith(' -\n')

View File

@@ -60,6 +60,11 @@ class TestDatabaseSetup:
database_import.setup_database_skeleton(f'dbname={self.DBNAME}')
def test_create_db_explicit_ro_user(self):
    """ Set up the database skeleton with an explicit `rouser`.
    """
    dsn = f'dbname={self.DBNAME}'

    database_import.setup_database_skeleton(dsn, rouser='postgres')
def test_create_db_missing_ro_user(self):
with pytest.raises(UsageError, match='Missing read-only user.'):
database_import.setup_database_skeleton(f'dbname={self.DBNAME}',
@@ -78,13 +83,21 @@ def test_setup_skeleton_already_exists(temp_db):
database_import.setup_database_skeleton(f'dbname={temp_db}')
def test_import_osm_data_simple(table_factory, osm2pgsql_options, capfd):
    """ Import a single file and check the printed osm2pgsql parameters.
    """
    table_factory('place', content=((1, ), ))

    database_import.import_osm_data(Path('file.pbf'), osm2pgsql_options)
    captured = capfd.readouterr()

    assert '--create' in captured.out
    assert '--output gazetteer' in captured.out
    assert f'--style {osm2pgsql_options["osm2pgsql_style"]}' in captured.out
    assert f'--number-processes {osm2pgsql_options["threads"]}' in captured.out
    assert f'--cache {osm2pgsql_options["osm2pgsql_cache"]}' in captured.out
    assert 'file.pbf' in captured.out
def test_import_osm_data_multifile(table_factory, tmp_path, osm2pgsql_options):
def test_import_osm_data_multifile(table_factory, tmp_path, osm2pgsql_options, capfd):
table_factory('place', content=((1, ), ))
osm2pgsql_options['osm2pgsql_cache'] = 0
@@ -93,15 +106,26 @@ def test_import_osm_data_multifile(table_factory, tmp_path, osm2pgsql_options):
f.write_text('test')
database_import.import_osm_data(files, osm2pgsql_options)
captured = capfd.readouterr()
assert 'file1.osm' in captured.out
assert 'file2.osm' in captured.out
def test_import_osm_data_simple_no_data(table_factory, osm2pgsql_options):
    """ Importing into an empty place table must raise a UsageError
        matching 'No data imported'.
    """
    table_factory('place')

    with pytest.raises(UsageError, match='No data imported'):
        database_import.import_osm_data(Path('file.pbf'), osm2pgsql_options)
def test_import_osm_data_simple_ignore_no_data(table_factory, osm2pgsql_options):
    """ With `ignore_errors=True` the import must run through on an
        empty place table.
    """
    table_factory('place')
    import_file = Path('file.pbf')

    database_import.import_osm_data(import_file, osm2pgsql_options,
                                    ignore_errors=True)
def test_import_osm_data_drop(table_factory, temp_db_conn, tmp_path, osm2pgsql_options):
table_factory('place', content=((1, ), ))
table_factory('planet_osm_nodes')
@@ -117,19 +141,26 @@ def test_import_osm_data_drop(table_factory, temp_db_conn, tmp_path, osm2pgsql_o
assert not temp_db_conn.table_exists('planet_osm_nodes')
def test_import_osm_data_default_cache(table_factory, osm2pgsql_options, capfd):
    """ Import with a cache setting of 0 and check the printed
        `--cache` parameter against the options afterwards.
    """
    table_factory('place', content=((1, ), ))
    osm2pgsql_options['osm2pgsql_cache'] = 0

    database_import.import_osm_data(Path(__file__), osm2pgsql_options)
    captured = capfd.readouterr()

    # The option is read after the call on purpose: the comparison
    # uses whatever value the options dict holds at that point.
    assert f'--cache {osm2pgsql_options["osm2pgsql_cache"]}' in captured.out
def test_truncate_database_tables(temp_db_conn, temp_db_cursor, table_factory):
tables = ('placex', 'place_addressline', 'location_area',
@pytest.mark.parametrize("with_search", (True, False))
def test_truncate_database_tables(temp_db_conn, temp_db_cursor, table_factory, with_search):
tables = ['placex', 'place_addressline', 'location_area',
'location_area_country',
'location_property_tiger', 'location_property_osmline',
'location_postcode', 'search_name', 'location_road_23')
'location_postcode', 'location_road_23']
if with_search:
tables.append('search_name')
for table in tables:
table_factory(table, content=((1, ), (2, ), (3, )))
assert temp_db_cursor.table_rows(table) == 3
@@ -144,9 +175,9 @@ def test_truncate_database_tables(temp_db_conn, temp_db_cursor, table_factory):
def test_load_data(dsn, place_row, placex_table, osmline_table,
word_table, temp_db_cursor, threads):
for func in ('precompute_words', 'getorcreate_housenumber_id', 'make_standard_name'):
temp_db_cursor.execute("""CREATE FUNCTION {} (src TEXT)
temp_db_cursor.execute(f"""CREATE FUNCTION {func} (src TEXT)
RETURNS TEXT AS $$ SELECT 'a'::TEXT $$ LANGUAGE SQL
""".format(func))
""")
for oid in range(100, 130):
place_row(osm_id=oid)
place_row(osm_type='W', osm_id=342, cls='place', typ='houses',
@@ -156,3 +187,59 @@ def test_load_data(dsn, place_row, placex_table, osmline_table,
assert temp_db_cursor.table_rows('placex') == 30
assert temp_db_cursor.table_rows('location_property_osmline') == 1
class TestSetupSQL:
    """ Tests for the database_import functions that run SQL files.

        Each test writes a SQL file that creates a function `test()`
        and checks the function's return value afterwards.
    """

    @pytest.fixture(autouse=True)
    def init_env(self, temp_db, tmp_path, def_config, sql_preprocessor_cfg):
        """ Point the SQL library directory at a fresh temporary directory.
        """
        def_config.lib_dir.sql = tmp_path / 'sql'
        def_config.lib_dir.sql.mkdir()

        self.config = def_config


    def write_sql(self, fname, content):
        """ Write `content` into the file `fname` of the SQL directory.
        """
        (self.config.lib_dir.sql / fname).write_text(content)


    @pytest.mark.parametrize("reverse", [True, False])
    def test_create_tables(self, temp_db_conn, temp_db_cursor, reverse):
        self.write_sql('tables.sql',
                       """CREATE FUNCTION test() RETURNS bool
AS $$ SELECT {{db.reverse_only}} $$ LANGUAGE SQL""")

        database_import.create_tables(temp_db_conn, self.config, reverse)

        # Assert the comparison; a bare expression is silently discarded.
        assert temp_db_cursor.scalar('SELECT test()') == reverse


    def test_create_table_triggers(self, temp_db_conn, temp_db_cursor):
        self.write_sql('table-triggers.sql',
                       """CREATE FUNCTION test() RETURNS TEXT
AS $$ SELECT 'a'::text $$ LANGUAGE SQL""")

        database_import.create_table_triggers(temp_db_conn, self.config)

        assert temp_db_cursor.scalar('SELECT test()') == 'a'


    def test_create_partition_tables(self, temp_db_conn, temp_db_cursor):
        self.write_sql('partition-tables.src.sql',
                       """CREATE FUNCTION test() RETURNS TEXT
AS $$ SELECT 'b'::text $$ LANGUAGE SQL""")

        database_import.create_partition_tables(temp_db_conn, self.config)

        assert temp_db_cursor.scalar('SELECT test()') == 'b'


    @pytest.mark.parametrize("drop", [True, False])
    def test_create_search_indices(self, temp_db_conn, temp_db_cursor, drop):
        self.write_sql('indices.sql',
                       """CREATE FUNCTION test() RETURNS bool
AS $$ SELECT {{drop}} $$ LANGUAGE SQL""")

        database_import.create_search_indices(temp_db_conn, self.config, drop)

        assert temp_db_cursor.scalar('SELECT test()') == drop

View File

@@ -69,6 +69,20 @@ class TestRunLegacyScript:
assert exec_utils.run_legacy_script(fname, nominatim_env=self.testenv) == 0
def test_run_legacy_default_osm2pgsql_binary(self, monkeypatch):
    """ The script exits with 0 only when NOMINATIM_OSM2PGSQL_BINARY
        is 'osm2pgsql' in its environment.
    """
    script = self.mk_script("exit($_SERVER['NOMINATIM_OSM2PGSQL_BINARY'] == 'osm2pgsql' ? 0 : 23);")

    retcode = exec_utils.run_legacy_script(script, nominatim_env=self.testenv)
    assert retcode == 0
def test_run_legacy_override_osm2pgsql_binary(self, monkeypatch):
    """ The script exits with 0 only when it sees the value of
        NOMINATIM_OSM2PGSQL_BINARY that was set in the environment.
    """
    monkeypatch.setenv('NOMINATIM_OSM2PGSQL_BINARY', 'somethingelse')

    script = self.mk_script("exit($_SERVER['NOMINATIM_OSM2PGSQL_BINARY'] == 'somethingelse' ? 0 : 23);")

    retcode = exec_utils.run_legacy_script(script, nominatim_env=self.testenv)
    assert retcode == 0
class TestRunApiScript:
@staticmethod
@@ -92,13 +106,26 @@ class TestRunApiScript:
extra_env = dict(SCRIPT_FILENAME=str(tmp_path / 'website' / 'test.php'))
assert exec_utils.run_api_script('badname', tmp_path, extra_env=extra_env) == 0
@staticmethod
def test_custom_phpcgi(tmp_path, capfd):
    """ Run the API script with `env` as php-cgi binary and look for
        the query string in the captured output.
    """
    retcode = exec_utils.run_api_script('test', tmp_path, phpcgi_bin='env',
                                        params={'q' : 'Berlin'})
    assert retcode == 0

    output = capfd.readouterr().out

    assert '?q=Berlin' in output
@staticmethod
def test_fail_on_error_output(tmp_path):
    """ A script that writes to STDERR must make run_api_script() return 1.
    """
    bad_script = tmp_path / 'website' / 'bad.php'
    bad_script.write_text("<?php\nfwrite(STDERR, 'WARNING'.PHP_EOL);")

    retcode = exec_utils.run_api_script('bad', tmp_path)
    assert retcode == 1
### run_osm2pgsql
def test_run_osm2pgsql(osm2pgsql_options):
    """ Call run_osm2pgsql() with an import file and extra tablespaces set.
    """
    osm2pgsql_options['append'] = False
    osm2pgsql_options['import_file'] = 'foo.bar'

    tablespaces = osm2pgsql_options['tablespaces']
    tablespaces['osm_data'] = 'extra'
    tablespaces['slim_data'] = 'extra'

    exec_utils.run_osm2pgsql(osm2pgsql_options)

View File

@@ -0,0 +1,237 @@
"""
Tests for migration functions
"""
import pytest
import psycopg2.extras
from nominatim.tools import migration
from nominatim.errors import UsageError
import nominatim.version
class DummyTokenizer:
    """ Tokenizer stand-in whose update_sql_functions() does nothing.
    """

    def update_sql_functions(self, config):
        return None
@pytest.fixture
def postprocess_mock(monkeypatch):
    """ Replace function creation and tokenizer lookup in the migration
        module with simple stand-ins.
    """
    def _echo_args(*args):
        return args

    def _dummy_tokenizer(*args):
        return DummyTokenizer()

    monkeypatch.setattr(migration.refresh, 'create_functions', _echo_args)
    monkeypatch.setattr(migration.tokenizer_factory, 'get_tokenizer_for_db',
                        _dummy_tokenizer)
def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
    """ With only an empty country_name table present, migrate() must
        raise a UsageError matching 'Migration not possible'.
    """
    country_columns = 'name HSTORE, country_code TEXT'
    table_factory('country_name', country_columns)

    with pytest.raises(UsageError, match='Migration not possible'):
        migration.migrate(def_config, {})
def test_set_up_migration_for_36(temp_db_with_extensions, temp_db_cursor,
                                 table_factory, def_config, monkeypatch,
                                 postprocess_mock):
    """ migrate() must create the property table and store exactly one
        database_version entry when the country table is well filled.
    """
    psycopg2.extras.register_hstore(temp_db_cursor)
    # don't actually run any migration, except the property table creation
    only_property_table = [((3, 5, 0, 99), migration.add_nominatim_property_table)]
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS', only_property_table)
    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')

    names = dict.fromkeys((str(x) for x in range(200)), 'a')
    table_factory('country_name', 'name HSTORE, country_code TEXT',
                  ((names, 'gb'),))

    assert not temp_db_cursor.table_exists('nominatim_properties')

    retcode = migration.migrate(def_config, {})
    assert retcode == 0

    assert temp_db_cursor.table_exists('nominatim_properties')

    version_rows = temp_db_cursor.scalar(""" SELECT count(*) FROM nominatim_properties
WHERE property = 'database_version'""")
    assert 1 == version_rows
def test_already_at_version(def_config, property_table):
    """ migrate() must return 0 when the stored database version
        equals the software version.
    """
    version = nominatim.version.NOMINATIM_VERSION
    property_table.set('database_version',
                       f'{version[0]}.{version[1]}.{version[2]}-{version[3]}')

    retcode = migration.migrate(def_config, {})
    assert retcode == 0
def test_no_migrations_necessary(def_config, temp_db_cursor, property_table,
                                 monkeypatch):
    """ migrate() must return 0 when the only registered migration has
        a version below the stored database version.
    """
    version = list(nominatim.version.NOMINATIM_VERSION)
    version[0] -= 1
    property_table.set('database_version',
                       f'{version[0]}.{version[1]}.{version[2]}-{version[3]}')

    version[0] = 0
    outdated_only = [(tuple(version), lambda **attr: True)]
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS', outdated_only)

    retcode = migration.migrate(def_config, {})
    assert retcode == 0
def test_run_single_migration(def_config, temp_db_cursor, property_table,
                              monkeypatch, postprocess_mock):
    """ Of two registered migrations only the one for the current
        version must run; the stored version is updated afterwards.
    """
    current = nominatim.version.NOMINATIM_VERSION
    version = list(current)
    version[0] -= 1
    property_table.set('database_version',
                       f'{version[0]}.{version[1]}.{version[2]}-{version[3]}')

    ran = {'old': False, 'new': False}

    def _migration(**_):
        """ Dummy migration"""
        ran['new'] = True

    def _old_migration(**_):
        """ Dummy migration"""
        ran['old'] = True

    version[0] = 0
    registered = [(tuple(version), _old_migration),
                  (current, _migration)]
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS', registered)

    retcode = migration.migrate(def_config, {})
    assert retcode == 0

    assert ran['new']
    assert not ran['old']
    expected_version = f'{current[0]}.{current[1]}.{current[2]}-{current[3]}'
    assert property_table.get('database_version') == expected_version
###### Tests for specific migrations
#
# Each migration should come with two tests:
# 1. Test that migration from old to new state works as expected.
# 2. Test that the migration can be rerun on the new state without side effects.
@pytest.mark.parametrize('in_attr', ('', 'with time zone'))
def test_import_status_timestamp_change(temp_db_conn, temp_db_cursor,
                                        table_factory, in_attr):
    """ After the migration `lastimportdate` must have the type
        'timestamp with time zone', whatever it was before.
    """
    table_factory('import_status',
                  f"""lastimportdate timestamp {in_attr},
sequence_id integer,
indexed boolean""")

    migration.import_status_timestamp_change(temp_db_conn)
    temp_db_conn.commit()

    column_type = temp_db_cursor.scalar("""SELECT data_type FROM information_schema.columns
WHERE table_name = 'import_status'
and column_name = 'lastimportdate'""")
    assert column_type == 'timestamp with time zone'
def test_add_nominatim_property_table(temp_db_conn, temp_db_cursor,
                                      def_config, monkeypatch):
    """ The migration must create the missing nominatim_properties table.
    """
    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')

    table_name = 'nominatim_properties'
    assert not temp_db_cursor.table_exists(table_name)

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    assert temp_db_cursor.table_exists(table_name)
def test_add_nominatim_property_table_repeat(temp_db_conn, temp_db_cursor,
                                             def_config, property_table):
    """ Running the migration on an existing nominatim_properties
        table must leave the table in place.
    """
    table_name = 'nominatim_properties'
    assert temp_db_cursor.table_exists(table_name)

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    assert temp_db_cursor.table_exists(table_name)
def test_change_housenumber_transliteration(temp_db_conn, temp_db_cursor,
                                            word_table, placex_table):
    """ The migration must lower-case the stored housenumber; a second
        run must leave the result unchanged.
    """
    placex_table.add(housenumber='3A')

    helper_functions = (
        """CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
RETURNS TEXT AS $$ SELECT lower(name) $$ LANGUAGE SQL """,
        """CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
RETURNS INTEGER AS $$ SELECT 4325 $$ LANGUAGE SQL """,
    )
    for function_sql in helper_functions:
        temp_db_cursor.execute(function_sql)

    # First pass migrates, second pass reruns on the migrated state.
    for _ in range(2):
        migration.change_housenumber_transliteration(temp_db_conn)
        temp_db_conn.commit()

        assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
    """ The migration must create idx_placex_geometry_placenode and
        remove idx_placex_adminname.
    """
    old_index_sql = """CREATE INDEX idx_placex_adminname
ON placex (place_id)"""
    temp_db_cursor.execute(old_index_sql)

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    has_new_index = temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
    assert has_new_index
    has_old_index = temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
    assert not has_old_index
def test_switch_placenode_geometry_index_repeat(temp_db_conn, temp_db_cursor, placex_table):
    """ When idx_placex_geometry_placenode already exists, the migration
        must keep its definition as it was.
    """
    existing_index_sql = """CREATE INDEX idx_placex_geometry_placenode
ON placex (place_id)"""
    temp_db_cursor.execute(existing_index_sql)

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    has_new_index = temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
    assert has_new_index
    has_old_index = temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
    assert not has_old_index

    index_definition = temp_db_cursor.scalar("""SELECT indexdef from pg_indexes
WHERE tablename = 'placex'
and indexname = 'idx_placex_geometry_placenode'
""")
    assert index_definition.endswith('(place_id)')
def test_install_legacy_tokenizer(temp_db_conn, temp_db_cursor, project_env,
                                  property_table, table_factory, monkeypatch,
                                  tmp_path):
    """ Run the legacy tokenizer installation with tokenizer creation
        replaced by a minimal stand-in.
    """
    for table_name in ('placex', 'location_property_osmline'):
        table_factory(table_name, 'place_id BIGINT')

    # Setting up the tokenizer is problematic
    class MiniTokenizer:
        def migrate_database(self, config):
            pass

    def _create_mini_tokenizer(cfg, **kwargs):
        return MiniTokenizer()

    monkeypatch.setattr(migration.tokenizer_factory, 'create_tokenizer',
                        _create_mini_tokenizer)

    migration.install_legacy_tokenizer(temp_db_conn, project_env)
    temp_db_conn.commit()
def test_install_legacy_tokenizer_repeat(temp_db_conn, temp_db_cursor,
                                         def_config, property_table):
    """ Run the legacy tokenizer installation when a 'tokenizer'
        property is already set.
    """
    property_table.set('tokenizer', 'dummy')

    migration.install_legacy_tokenizer(temp_db_conn, def_config)

    temp_db_conn.commit()
def test_create_tiger_housenumber_index(temp_db_conn, temp_db_cursor, table_factory):
    """ Create the tiger housenumber index and run the migration a
        second time on the result.
    """
    tiger_columns = 'parent_place_id BIGINT, startnumber INT, endnumber INT'
    table_factory('location_property_tiger', tiger_columns)

    migration.create_tiger_housenumber_index(temp_db_conn)
    temp_db_conn.commit()

    # The index is only checked for on server versions from 11 on.
    server_version = temp_db_conn.server_version_tuple()
    if server_version >= (11, 0, 0):
        assert temp_db_cursor.index_exists('location_property_tiger',
                                           'idx_location_property_tiger_housenumber_migrated')

    migration.create_tiger_housenumber_index(temp_db_conn)
    temp_db_conn.commit()

View File

@@ -22,3 +22,14 @@ def test_refresh_import_wikipedia(dsn, src_dir, table_factory, temp_db_cursor, r
assert temp_db_cursor.table_rows('wikipedia_article') > 0
assert temp_db_cursor.table_rows('wikipedia_redirect') > 0
def test_recompute_importance(placex_table, table_factory, temp_db_conn, temp_db_cursor):
    """ Run recompute_importance() against a compute_importance()
        function that returns fixed values.
    """
    fixed_importance_sql = """CREATE OR REPLACE FUNCTION compute_importance(extratags HSTORE,
country_code varchar(2),
osm_type varchar(1), osm_id BIGINT,
OUT importance FLOAT,
OUT wikipedia TEXT)
AS $$ SELECT 0.1::float, 'foo'::text $$ LANGUAGE SQL"""
    temp_db_cursor.execute(fixed_importance_sql)

    refresh.recompute_importance(temp_db_conn)

View File

@@ -13,24 +13,21 @@ def test_load_ranks_def_config(temp_db_conn, temp_db_cursor, def_config):
assert temp_db_cursor.table_rows('address_levels') > 0
def test_load_ranks_from_project_dir(project_env, temp_db_conn, temp_db_cursor):
    """ An address-levels.json in the project directory with a single
        entry must produce exactly one row in address_levels.
    """
    test_file = project_env.project_dir / 'address-levels.json'
    test_file.write_text('[{"tags":{"place":{"sea":2}}}]')

    load_address_levels_from_config(temp_db_conn, project_env)

    assert temp_db_cursor.table_rows('address_levels') == 1
def test_load_ranks_from_broken_file(project_env, temp_db_conn):
    """ A syntactically broken address-levels.json in the project
        directory must raise a JSONDecodeError.
    """
    test_file = project_env.project_dir / 'address-levels.json'
    # The content is deliberately invalid JSON.
    test_file.write_text('[{"tags":"place":{"sea":2}}}]')

    with pytest.raises(json.decoder.JSONDecodeError):
        load_address_levels_from_config(temp_db_conn, project_env)
def test_load_ranks_country(temp_db_conn, temp_db_cursor):

View File

@@ -0,0 +1,51 @@
"""
Tests for creating PL/pgSQL functions for Nominatim.
"""
import pytest
from nominatim.tools.refresh import create_functions
class TestCreateFunctions:
    """ Tests for create_functions() using a functions.sql written
        into a temporary SQL directory.
    """

    @pytest.fixture(autouse=True)
    def init_env(self, sql_preprocessor, temp_db_conn, def_config, tmp_path):
        def_config.lib_dir.sql = tmp_path
        self.config = def_config
        self.conn = temp_db_conn


    def write_functions(self, content):
        """ Write `content` into functions.sql of the SQL directory.
        """
        (self.config.lib_dir.sql / 'functions.sql').write_text(content)


    def test_create_functions(self, temp_db_cursor):
        self.write_functions("""CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
AS $$
BEGIN
RETURN 43;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
""")

        create_functions(self.conn, self.config)

        returned = temp_db_cursor.scalar('SELECT test()')
        assert returned == 43


    @pytest.mark.parametrize("dbg,ret", ((True, 43), (False, 22)))
    def test_create_functions_with_template(self, temp_db_cursor, dbg, ret):
        self.write_functions("""CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
AS $$
BEGIN
{% if debug %}
RETURN 43;
{% else %}
RETURN 22;
{% endif %}
END;
$$ LANGUAGE plpgsql IMMUTABLE;
""")

        create_functions(self.conn, self.config, enable_debug=dbg)

        returned = temp_db_cursor.scalar('SELECT test()')
        assert returned == ret

View File

@@ -22,12 +22,11 @@ def test_script(tmp_path):
@pytest.fixture
def run_website_script(tmp_path, def_config, temp_db_conn):
def_config.lib_dir.php = tmp_path / 'php'
def_config.project_dir = tmp_path
def run_website_script(tmp_path, project_env, temp_db_conn):
project_env.lib_dir.php = tmp_path / 'php'
def _runner():
refresh.setup_website(tmp_path, def_config, temp_db_conn)
refresh.setup_website(tmp_path, project_env, temp_db_conn)
proc = subprocess.run(['/usr/bin/env', 'php', '-Cq',
tmp_path / 'search.php'], check=False)
@@ -37,6 +36,16 @@ def run_website_script(tmp_path, def_config, temp_db_conn):
return _runner
def test_basedir_created(tmp_path, project_env, temp_db_conn):
    """ `setup_website` creates the target directory when it is missing.
    """
    target = tmp_path / 'website'
    # Precondition: the directory must not exist before the call.
    existed_before = target.exists()
    assert not existed_before

    refresh.setup_website(target, project_env, temp_db_conn)

    existed_after = target.exists()
    assert existed_after
@pytest.mark.parametrize("setting,retval", (('yes', 10), ('no', 20)))
def test_setup_website_check_bool(monkeypatch, test_script, run_website_script,
setting, retval):
@@ -70,3 +79,13 @@ def test_setup_website_check_str(monkeypatch, test_script, run_website_script):
test_script('exit(CONST_Default_Language === "ffde 2" ? 10 : 20);')
assert run_website_script() == 10
def test_relative_log_file(project_env, monkeypatch, test_script, run_website_script):
    """ A relative NOMINATIM_LOG_FILE shows up in the website scripts as
        a path below the project directory.
    """
    monkeypatch.setenv('NOMINATIM_LOG_FILE', 'access.log')

    log_path = project_env.project_dir / 'access.log'
    expected_file = str(log_path)
    # The PHP script exits with 10 when the constant matches, 20 otherwise.
    test_script(f'exit(CONST_Log_File === "{expected_file}" ? 10 : 20);')

    exit_code = run_website_script()
    assert exit_code == 10

View File

@@ -1,20 +0,0 @@
<?php
// These settings control the import of special phrases from the wiki.
// Both arrays below map a class to a list of types of that class.
// Class/type combinations to exclude from the import.
$aTagsBlacklist
= array(
'boundary' => array('administrative'),
'place' => array('house', 'houses'),
);
// If a class is in the whitelist, then all of its types are ignored
// except for the ones given in the list.
// Also use this list to exclude an entire class from the special
// phrases, by giving it an empty list of types.
$aTagsWhitelist
= array(
'highway' => array('bus_stop', 'rest_area', 'raceway'),
'building' => array(),
);