mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-15 02:47:59 +00:00
port address level computation to Python
Also adds simple tests for correct table creation.
This commit is contained in:
@@ -2,13 +2,43 @@ import sys
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import pytest
|
||||
|
||||
SRC_DIR = Path(__file__) / '..' / '..' / '..'
|
||||
|
||||
# always test against the source
|
||||
sys.path.insert(0, str((Path(__file__) / '..' / '..' / '..').resolve()))
|
||||
sys.path.insert(0, str(SRC_DIR.resolve()))
|
||||
|
||||
from nominatim.config import Configuration
|
||||
|
||||
class _TestingCursor(psycopg2.extras.DictCursor):
|
||||
""" Extension to the DictCursor class that provides execution
|
||||
short-cuts that simplify writing assertions.
|
||||
"""
|
||||
|
||||
def scalar(self, sql, params=None):
|
||||
""" Execute a query with a single return value and return this value.
|
||||
Raises an assertion when not exactly one row is returned.
|
||||
"""
|
||||
self.execute(sql, params)
|
||||
assert self.rowcount == 1
|
||||
return self.fetchone()[0]
|
||||
|
||||
def row_set(self, sql, params=None):
|
||||
""" Execute a query and return the result as a set of tuples.
|
||||
"""
|
||||
self.execute(sql, params)
|
||||
if self.rowcount == 1:
|
||||
return set(tuple(self.fetchone()))
|
||||
|
||||
return set((tuple(row) for row in self))
|
||||
|
||||
@pytest.fixture
|
||||
def temp_db(monkeypatch):
|
||||
""" Create an empty database for the test. The database name is also
|
||||
exported into NOMINATIM_DATABASE_DSN.
|
||||
"""
|
||||
name = 'test_nominatim_python_unittest'
|
||||
with psycopg2.connect(database='postgres') as conn:
|
||||
conn.set_isolation_level(0)
|
||||
@@ -24,3 +54,29 @@ def temp_db(monkeypatch):
|
||||
conn.set_isolation_level(0)
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_db_conn(temp_db):
|
||||
""" Connection to the test database.
|
||||
"""
|
||||
conn = psycopg2.connect(database=temp_db)
|
||||
yield conn
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_db_cursor(temp_db):
|
||||
""" Connection and cursor towards the test database. The connection will
|
||||
be in auto-commit mode.
|
||||
"""
|
||||
conn = psycopg2.connect('dbname=' + temp_db)
|
||||
conn.set_isolation_level(0)
|
||||
with conn.cursor(cursor_factory=_TestingCursor) as cur:
|
||||
yield cur
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def def_config():
|
||||
return Configuration(None, SRC_DIR.resolve() / 'settings')
|
||||
|
||||
@@ -84,10 +84,8 @@ def test_add_data_command(mock_run_legacy, name, oid):
|
||||
(['--boundaries-only'], 1, 0),
|
||||
(['--no-boundaries'], 0, 1),
|
||||
(['--boundaries-only', '--no-boundaries'], 0, 0)])
|
||||
def test_index_command(monkeypatch, temp_db, params, do_bnds, do_ranks):
|
||||
with psycopg2.connect(database=temp_db) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE import_status (indexed bool)")
|
||||
def test_index_command(monkeypatch, temp_db_cursor, params, do_bnds, do_ranks):
|
||||
temp_db_cursor.execute("CREATE TABLE import_status (indexed bool)")
|
||||
bnd_mock = MockParamCapture()
|
||||
monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', bnd_mock)
|
||||
rank_mock = MockParamCapture()
|
||||
@@ -100,7 +98,6 @@ def test_index_command(monkeypatch, temp_db, params, do_bnds, do_ranks):
|
||||
|
||||
|
||||
@pytest.mark.parametrize("command,params", [
|
||||
('address-levels', ('update.php', '--update-address-levels')),
|
||||
('functions', ('setup.php',)),
|
||||
('wiki-data', ('setup.php', '--import-wikipedia-articles')),
|
||||
('importance', ('update.php', '--recompute-importance')),
|
||||
@@ -116,6 +113,7 @@ def test_refresh_legacy_command(mock_run_legacy, command, params):
|
||||
@pytest.mark.parametrize("command,func", [
|
||||
('postcodes', 'update_postcodes'),
|
||||
('word-counts', 'recompute_word_counts'),
|
||||
('address-levels', 'load_address_levels_from_file'),
|
||||
])
|
||||
def test_refresh_command(monkeypatch, command, func):
|
||||
func_mock = MockParamCapture()
|
||||
|
||||
@@ -6,28 +6,25 @@ import pytest
|
||||
|
||||
import nominatim.db.utils as db_utils
|
||||
|
||||
def test_execute_file_success(temp_db, tmp_path):
|
||||
def test_execute_file_success(temp_db_conn, tmp_path):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')
|
||||
|
||||
with psycopg2.connect('dbname=' + temp_db) as conn:
|
||||
db_utils.execute_file(conn, tmpfile)
|
||||
db_utils.execute_file(temp_db_conn, tmpfile)
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('SELECT * FROM test')
|
||||
with temp_db_conn.cursor() as cur:
|
||||
cur.execute('SELECT * FROM test')
|
||||
|
||||
assert cur.rowcount == 1
|
||||
assert cur.fetchone()[0] == 56
|
||||
assert cur.rowcount == 1
|
||||
assert cur.fetchone()[0] == 56
|
||||
|
||||
def test_execute_file_bad_file(temp_db, tmp_path):
|
||||
with psycopg2.connect('dbname=' + temp_db) as conn:
|
||||
with pytest.raises(FileNotFoundError):
|
||||
db_utils.execute_file(conn, tmp_path / 'test2.sql')
|
||||
def test_execute_file_bad_file(temp_db_conn, tmp_path):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
db_utils.execute_file(temp_db_conn, tmp_path / 'test2.sql')
|
||||
|
||||
def test_execute_file_bad_sql(temp_db, tmp_path):
|
||||
def test_execute_file_bad_sql(temp_db_conn, tmp_path):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE STABLE test (id INT)')
|
||||
|
||||
with psycopg2.connect('dbname=' + temp_db) as conn:
|
||||
with pytest.raises(psycopg2.ProgrammingError):
|
||||
db_utils.execute_file(conn, tmpfile)
|
||||
with pytest.raises(psycopg2.ProgrammingError):
|
||||
db_utils.execute_file(temp_db_conn, tmpfile)
|
||||
|
||||
@@ -82,10 +82,8 @@ class IndexerTestDB:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_db(temp_db):
|
||||
conn = psycopg2.connect(database=temp_db)
|
||||
yield IndexerTestDB(conn)
|
||||
conn.close()
|
||||
def test_db(temp_db_conn):
|
||||
yield IndexerTestDB(temp_db_conn)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("threads", [1, 15])
|
||||
|
||||
@@ -7,7 +7,6 @@ import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from nominatim.config import Configuration
|
||||
import nominatim.tools.exec_utils as exec_utils
|
||||
|
||||
@pytest.fixture
|
||||
@@ -18,9 +17,9 @@ def tmp_phplib_dir():
|
||||
yield Path(phpdir)
|
||||
|
||||
@pytest.fixture
|
||||
def nominatim_env(tmp_phplib_dir):
|
||||
def nominatim_env(tmp_phplib_dir, def_config):
|
||||
class _NominatimEnv:
|
||||
config = Configuration(None, Path(__file__) / '..' / '..' / '..' / 'settings')
|
||||
config = def_config
|
||||
phplib_dir = tmp_phplib_dir
|
||||
data_dir = Path('data')
|
||||
project_dir = Path('.')
|
||||
|
||||
85
test/python/test_tools_refresh_address_levels.py
Normal file
85
test/python/test_tools_refresh_address_levels.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""
|
||||
Tests for function for importing address ranks.
|
||||
"""
|
||||
import json
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
from nominatim.tools.refresh import load_address_levels, load_address_levels_from_file
|
||||
|
||||
def test_load_ranks_def_config(temp_db_conn, temp_db_cursor, def_config):
|
||||
load_address_levels_from_file(temp_db_conn, Path(def_config.ADDRESS_LEVEL_CONFIG))
|
||||
|
||||
assert temp_db_cursor.scalar('SELECT count(*) FROM address_levels') > 0
|
||||
|
||||
def test_load_ranks_from_file(temp_db_conn, temp_db_cursor, tmp_path):
|
||||
test_file = tmp_path / 'test_levels.json'
|
||||
test_file.write_text('[{"tags":{"place":{"sea":2}}}]')
|
||||
|
||||
load_address_levels_from_file(temp_db_conn, test_file)
|
||||
|
||||
assert temp_db_cursor.scalar('SELECT count(*) FROM address_levels') > 0
|
||||
|
||||
|
||||
def test_load_ranks_from_broken_file(temp_db_conn, tmp_path):
|
||||
test_file = tmp_path / 'test_levels.json'
|
||||
test_file.write_text('[{"tags":"place":{"sea":2}}}]')
|
||||
|
||||
with pytest.raises(json.decoder.JSONDecodeError):
|
||||
load_address_levels_from_file(temp_db_conn, test_file)
|
||||
|
||||
|
||||
def test_load_ranks_country(temp_db_conn, temp_db_cursor):
|
||||
load_address_levels(temp_db_conn, 'levels',
|
||||
[{"tags": {"place": {"village": 14}}},
|
||||
{"countries": ['de'],
|
||||
"tags": {"place": {"village": 15}}},
|
||||
{"countries": ['uk', 'us' ],
|
||||
"tags": {"place": {"village": 16}}}
|
||||
])
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM levels') == \
|
||||
set([(None, 'place', 'village', 14, 14),
|
||||
('de', 'place', 'village', 15, 15),
|
||||
('uk', 'place', 'village', 16, 16),
|
||||
('us', 'place', 'village', 16, 16),
|
||||
])
|
||||
|
||||
|
||||
def test_load_ranks_default_value(temp_db_conn, temp_db_cursor):
|
||||
load_address_levels(temp_db_conn, 'levels',
|
||||
[{"tags": {"boundary": {"": 28}}},
|
||||
{"countries": ['hu'],
|
||||
"tags": {"boundary": {"": 29}}}
|
||||
])
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM levels') == \
|
||||
set([(None, 'boundary', None, 28, 28),
|
||||
('hu', 'boundary', None, 29, 29),
|
||||
])
|
||||
|
||||
|
||||
def test_load_ranks_multiple_keys(temp_db_conn, temp_db_cursor):
|
||||
load_address_levels(temp_db_conn, 'levels',
|
||||
[{"tags":
|
||||
{"place": {"city": 14},
|
||||
"boundary": {"administrative2" : 4}}
|
||||
}])
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM levels') == \
|
||||
set([(None, 'place', 'city', 14, 14),
|
||||
(None, 'boundary', 'administrative2', 4, 4),
|
||||
])
|
||||
|
||||
|
||||
def test_load_ranks_address(temp_db_conn, temp_db_cursor):
|
||||
load_address_levels(temp_db_conn, 'levels',
|
||||
[{"tags":
|
||||
{"place": {"city": 14,
|
||||
"town" : [14, 13]}}
|
||||
}])
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM levels') == \
|
||||
set([(None, 'place', 'city', 14, 14),
|
||||
(None, 'place', 'town', 14, 13),
|
||||
])
|
||||
Reference in New Issue
Block a user