Merge pull request #3984 from lonvia/avoid-custom-table-definition-in-tests

Reuse table creation SQL in unittest
This commit is contained in:
Sarah Hoffmann
2026-02-13 09:16:44 +01:00
committed by GitHub
13 changed files with 252 additions and 239 deletions

View File

@@ -38,6 +38,7 @@ class QueryPool:
""" Schedule a query for execution. """ Schedule a query for execution.
""" """
if self.is_cancelled: if self.is_cancelled:
self.clear_queue()
await self.finish() await self.finish()
return return
@@ -47,6 +48,7 @@ class QueryPool:
await asyncio.sleep(0) await asyncio.sleep(0)
if self.is_cancelled: if self.is_cancelled:
self.clear_queue()
await self.finish() await self.finish()
async def finish(self) -> None: async def finish(self) -> None:

View File

@@ -2,7 +2,7 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
import itertools import itertools
import sys import sys
@@ -17,12 +17,11 @@ SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
sys.path.insert(0, str(SRC_DIR / 'src')) sys.path.insert(0, str(SRC_DIR / 'src'))
from nominatim_db.config import Configuration from nominatim_db.config import Configuration
from nominatim_db.db import connection from nominatim_db.db import connection, properties
from nominatim_db.db.sql_preprocessor import SQLPreprocessor from nominatim_db.db.sql_preprocessor import SQLPreprocessor
import nominatim_db.tokenizer.factory import nominatim_db.tokenizer.factory
import dummy_tokenizer import dummy_tokenizer
import mocks
from cursor import CursorForTesting from cursor import CursorForTesting
@@ -132,28 +131,49 @@ def project_env(tmp_path):
@pytest.fixture @pytest.fixture
def property_table(table_factory, temp_db_conn): def country_table(table_factory):
table_factory('nominatim_properties', 'property TEXT, value TEXT') table_factory('country_name', 'partition INT, country_code varchar(2), name hstore')
return mocks.MockPropertyTable(temp_db_conn)
@pytest.fixture @pytest.fixture
def status_table(table_factory): def country_row(country_table, temp_db_cursor):
def _add(partition=None, country=None, names=None):
temp_db_cursor.insert_row('country_name', partition=partition,
country_code=country, name=names)
return _add
@pytest.fixture
def load_sql(temp_db_conn, country_row):
proc = SQLPreprocessor(temp_db_conn, Configuration(None))
def _run(filename, **kwargs):
proc.run_sql_file(temp_db_conn, filename, **kwargs)
return _run
@pytest.fixture
def property_table(load_sql, temp_db_conn):
load_sql('tables/nominatim_properties.sql')
class _PropTable:
def set(self, name, value):
properties.set_property(temp_db_conn, name, value)
def get(self, name):
return properties.get_property(temp_db_conn, name)
return _PropTable()
@pytest.fixture
def status_table(load_sql):
""" Create an empty version of the status table and """ Create an empty version of the status table and
the status logging table. the status logging table.
""" """
table_factory('import_status', load_sql('tables/status.sql')
"""lastimportdate timestamp with time zone NOT NULL,
sequence_id integer,
indexed boolean""")
table_factory('import_osmosis_log',
"""batchend timestamp,
batchseq integer,
batchsize bigint,
starttime timestamp,
endtime timestamp,
event text""")
@pytest.fixture @pytest.fixture
@@ -178,12 +198,14 @@ def place_row(place_table, temp_db_cursor):
prerequisite to the fixture. prerequisite to the fixture.
""" """
idseq = itertools.count(1001) idseq = itertools.count(1001)
def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None, def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
admin_level=None, address=None, extratags=None, geom=None): admin_level=None, address=None, extratags=None, geom='POINT(0 0)'):
temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", args = {'osm_type': osm_type, 'osm_id': osm_id or next(idseq),
(osm_id or next(idseq), osm_type, cls, typ, names, 'class': cls, 'type': typ, 'name': names, 'admin_level': admin_level,
admin_level, address, extratags, 'address': address, 'extratags': extratags,
geom or 'SRID=4326;POINT(0 0)')) 'geometry': _with_srid(geom)}
temp_db_cursor.insert_row('place', **args)
return _insert return _insert
@@ -203,50 +225,66 @@ def place_postcode_table(temp_db_with_extensions, table_factory):
@pytest.fixture @pytest.fixture
def place_postcode_row(place_postcode_table, temp_db_cursor): def place_postcode_row(place_postcode_table, temp_db_cursor):
""" A factory for rows in the place table. The table is created as a """ A factory for rows in the place_postcode table. The table is created as a
prerequisite to the fixture. prerequisite to the fixture.
""" """
idseq = itertools.count(5001) idseq = itertools.count(5001)
def _insert(osm_type='N', osm_id=None, postcode=None, country=None, def _insert(osm_type='N', osm_id=None, postcode=None, country=None,
centroid=None, geom=None): centroid='POINT(12.0 4.0)', geom=None):
temp_db_cursor.execute("INSERT INTO place_postcode VALUES (%s, %s, %s, %s, %s, %s)", temp_db_cursor.insert_row('place_postcode',
(osm_type, osm_id or next(idseq), osm_type=osm_type, osm_id=osm_id or next(idseq),
postcode, country, postcode=postcode, country_code=country,
_with_srid(centroid, 'POINT(12.0 4.0)'), centroid=_with_srid(centroid),
_with_srid(geom))) geometry=_with_srid(geom))
return _insert return _insert
@pytest.fixture @pytest.fixture
def placex_table(temp_db_with_extensions, temp_db_conn): def placex_table(temp_db_with_extensions, temp_db_conn, load_sql, place_table):
""" Create an empty version of the place table. """ Create an empty version of the placex table.
""" """
return mocks.MockPlacexTable(temp_db_conn) load_sql('tables/placex.sql')
temp_db_conn.execute("CREATE SEQUENCE IF NOT EXISTS seq_place START 1")
@pytest.fixture @pytest.fixture
def osmline_table(temp_db_with_extensions, table_factory): def placex_row(placex_table, temp_db_cursor):
table_factory('location_property_osmline', """ A factory for rows in the placex table. The table is created as a
"""place_id BIGINT, prerequisite to the fixture.
osm_id BIGINT, """
parent_place_id BIGINT, idseq = itertools.count(1001)
geometry_sector INTEGER,
indexed_date TIMESTAMP, def _add(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
startnumber INTEGER, admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
endnumber INTEGER, country=None, housenumber=None, rank_search=30, rank_address=30,
partition SMALLINT, centroid='POINT(10 4)', indexed_status=0, indexed_date=None):
indexed_status SMALLINT, args = {'place_id': pysql.SQL("nextval('seq_place')"),
linegeo GEOMETRY, 'osm_type': osm_type, 'osm_id': osm_id or next(idseq),
interpolationtype TEXT, 'class': cls, 'type': typ, 'name': names, 'admin_level': admin_level,
address HSTORE, 'address': address, 'housenumber': housenumber,
postcode TEXT, 'rank_search': rank_search, 'rank_address': rank_address,
country_code VARCHAR(2)""") 'extratags': extratags,
'centroid': _with_srid(centroid), 'geometry': _with_srid(geom),
'country_code': country,
'indexed_status': indexed_status, 'indexed_date': indexed_date,
'partition': pysql.Literal(0), 'geometry_sector': pysql.Literal(1)}
return temp_db_cursor.insert_row('placex', **args)
return _add
@pytest.fixture @pytest.fixture
def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions): def osmline_table(temp_db_with_extensions, load_sql):
table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, ))) load_sql('tables/interpolation.sql')
@pytest.fixture
def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions, country_row):
for part in range(3):
country_row(partition=part)
cfg = Configuration(None) cfg = Configuration(None)
cfg.set_libdirs(sql=tmp_path) cfg.set_libdirs(sql=tmp_path)
return cfg return cfg

View File

@@ -2,12 +2,13 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Specialised psycopg cursor with shortcut functions useful for testing. Specialised psycopg cursor with shortcut functions useful for testing.
""" """
import psycopg import psycopg
from psycopg import sql as pysql
class CursorForTesting(psycopg.Cursor): class CursorForTesting(psycopg.Cursor):
@@ -52,7 +53,49 @@ class CursorForTesting(psycopg.Cursor):
def table_rows(self, table, where=None): def table_rows(self, table, where=None):
""" Return the number of rows in the given table. """ Return the number of rows in the given table.
""" """
if where is None: sql = pysql.SQL('SELECT count(*) FROM') + pysql.Identifier(table)
return self.scalar('SELECT count(*) FROM ' + table) if where is not None:
sql += pysql.SQL('WHERE') + pysql.SQL(where)
return self.scalar('SELECT count(*) FROM {} WHERE {}'.format(table, where)) return self.scalar(sql)
def insert_row(self, table, **data):
""" Insert a row into the given table.
'data' is a dictionary of column names and associated values.
When the value is a pysql.Literal or pysql.SQL, then the expression
will be inserted as is instead of loading the value. When the
value is a tuple, then the first element will be added as an
SQL expression for the value and the second element is treated
as the actual value to insert. The SQL expression must contain
a %s placeholder in that case.
If data contains a 'place_id' column, then the value of the
place_id column after insert is returned. Otherwise the function
returns nothing.
"""
columns = []
placeholders = []
values = []
for k, v in data.items():
columns.append(pysql.Identifier(k))
if isinstance(v, tuple):
placeholders.append(pysql.SQL(v[0]))
values.append(v[1])
elif isinstance(v, (pysql.Literal, pysql.SQL)):
placeholders.append(v)
else:
placeholders.append(pysql.Placeholder())
values.append(v)
sql = pysql.SQL("INSERT INTO {table} ({columns}) VALUES({values})")\
.format(table=pysql.Identifier(table),
columns=pysql.SQL(',').join(columns),
values=pysql.SQL(',').join(placeholders))
if 'place_id' in data:
sql += pysql.SQL('RETURNING place_id')
self.execute(sql, values)
return self.fetchone()[0] if 'place_id' in data else None

View File

@@ -53,11 +53,10 @@ def test_setup_country_tables(src_dir, temp_db_with_extensions, dsn, temp_db_cur
@pytest.mark.parametrize("languages", (None, ['fr', 'en'])) @pytest.mark.parametrize("languages", (None, ['fr', 'en']))
def test_create_country_names(temp_db_with_extensions, temp_db_conn, temp_db_cursor, def test_create_country_names(temp_db_with_extensions, temp_db_conn, temp_db_cursor,
table_factory, tokenizer_mock, languages, loaded_country): country_row, tokenizer_mock, languages, loaded_country):
temp_db_cursor.execute('TRUNCATE country_name')
table_factory('country_name', 'country_code varchar(2), name hstore', country_row(country='us', names={"name": "us1", "name:af": "us2"})
content=(('us', '"name"=>"us1","name:af"=>"us2"'), country_row(country='fr', names={"name": "Fra", "name:en": "Fren"})
('fr', '"name"=>"Fra", "name:en"=>"Fren"')))
assert temp_db_cursor.scalar("SELECT count(*) FROM country_name") == 2 assert temp_db_cursor.scalar("SELECT count(*) FROM country_name") == 2

View File

@@ -1,85 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Custom mocks for testing.
"""
import itertools
from nominatim_db.db import properties
class MockPlacexTable:
""" A placex table for testing.
"""
def __init__(self, conn):
self.idseq = itertools.count(10000)
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE placex (
place_id BIGINT,
parent_place_id BIGINT,
linked_place_id BIGINT,
importance FLOAT,
indexed_date TIMESTAMP,
geometry_sector INTEGER,
rank_address SMALLINT,
rank_search SMALLINT,
partition SMALLINT,
indexed_status SMALLINT,
osm_id int8,
osm_type char(1),
class text,
type text,
name hstore,
admin_level smallint,
address hstore,
extratags hstore,
token_info jsonb,
geometry Geometry(Geometry,4326),
wikipedia TEXT,
country_code varchar(2),
housenumber TEXT,
postcode TEXT,
centroid GEOMETRY(Geometry, 4326))""")
cur.execute("CREATE SEQUENCE IF NOT EXISTS seq_place")
conn.commit()
def add(self, osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
country=None, housenumber=None, rank_search=30, centroid=None):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO placex (place_id, osm_type, osm_id, class,
type, name, admin_level, address,
housenumber, rank_search,
extratags, centroid, geometry, country_code)
VALUES(nextval('seq_place'), %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s)
RETURNING place_id""",
(osm_type, osm_id or next(self.idseq), cls, typ, names,
admin_level, address, housenumber, rank_search,
extratags, centroid, 'SRID=4326;' + geom,
country))
place_id = cur.fetchone()[0]
self.conn.commit()
return place_id
class MockPropertyTable:
""" A property table for testing.
"""
def __init__(self, conn):
self.conn = conn
def set(self, name, value):
""" Set a property in the table to the given value.
"""
properties.set_property(self.conn, name, value)
def get(self, name):
""" Set a property in the table to the given value.
"""
return properties.get_property(self.conn, name)

View File

@@ -2,7 +2,7 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Tests for ICU tokenizer. Tests for ICU tokenizer.
@@ -15,7 +15,6 @@ import pytest
from nominatim_db.tokenizer import icu_tokenizer from nominatim_db.tokenizer import icu_tokenizer
import nominatim_db.tokenizer.icu_rule_loader import nominatim_db.tokenizer.icu_rule_loader
from nominatim_db.db import properties from nominatim_db.db import properties
from nominatim_db.db.sql_preprocessor import SQLPreprocessor
from nominatim_db.data.place_info import PlaceInfo from nominatim_db.data.place_info import PlaceInfo
from mock_icu_word_table import MockIcuWordTable from mock_icu_word_table import MockIcuWordTable
@@ -90,13 +89,9 @@ def analyzer(tokenizer_factory, test_config, monkeypatch,
@pytest.fixture @pytest.fixture
def sql_functions(temp_db_conn, def_config, src_dir): def sql_functions(load_sql):
orig_sql = def_config.lib_dir.sql load_sql('functions/utils.sql')
def_config.lib_dir.sql = src_dir / 'lib-sql' load_sql('tokenizer/icu_tokenizer.sql')
sqlproc = SQLPreprocessor(temp_db_conn, def_config)
sqlproc.run_sql_file(temp_db_conn, 'functions/utils.sql')
sqlproc.run_sql_file(temp_db_conn, 'tokenizer/icu_tokenizer.sql')
def_config.lib_dir.sql = orig_sql
@pytest.fixture @pytest.fixture
@@ -653,22 +648,21 @@ class TestUpdateWordTokens:
self.tok.update_word_tokens() self.tok.update_word_tokens()
assert word_table.count_housenumbers() == 1 assert word_table.count_housenumbers() == 1
def test_keep_housenumbers_from_placex_table(self, add_housenumber, word_table, def test_keep_housenumbers_from_placex_table(self, add_housenumber, word_table, placex_row):
placex_table):
add_housenumber(9999, '5432a') add_housenumber(9999, '5432a')
add_housenumber(9990, '34z') add_housenumber(9990, '34z')
placex_table.add(housenumber='34z') placex_row(housenumber='34z')
placex_table.add(housenumber='25432a') placex_row(housenumber='25432a')
assert word_table.count_housenumbers() == 2 assert word_table.count_housenumbers() == 2
self.tok.update_word_tokens() self.tok.update_word_tokens()
assert word_table.count_housenumbers() == 1 assert word_table.count_housenumbers() == 1
def test_keep_housenumbers_from_placex_table_hnr_list(self, add_housenumber, def test_keep_housenumbers_from_placex_table_hnr_list(self, add_housenumber,
word_table, placex_table): word_table, placex_row):
add_housenumber(9991, '9 b') add_housenumber(9991, '9 b')
add_housenumber(9990, '34z') add_housenumber(9990, '34z')
placex_table.add(housenumber='9 a;9 b;9 c') placex_row(housenumber='9 a;9 b;9 c')
assert word_table.count_housenumbers() == 2 assert word_table.count_housenumbers() == 2
self.tok.update_word_tokens() self.tok.update_word_tokens()

View File

@@ -2,17 +2,17 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Tests for maintenance and analysis functions. Tests for maintenance and analysis functions.
""" """
import pytest import pytest
import datetime as dt
from nominatim_db.errors import UsageError from nominatim_db.errors import UsageError
from nominatim_db.tools import admin from nominatim_db.tools import admin
from nominatim_db.tokenizer import factory from nominatim_db.tokenizer import factory
from nominatim_db.db.sql_preprocessor import SQLPreprocessor
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -61,15 +61,14 @@ def test_analyse_indexing_unknown_osmid(project_env):
admin.analyse_indexing(project_env, osm_id='W12345674') admin.analyse_indexing(project_env, osm_id='W12345674')
def test_analyse_indexing_with_place_id(project_env, temp_db_cursor): def test_analyse_indexing_with_place_id(project_env, placex_row):
temp_db_cursor.execute("INSERT INTO placex (place_id) VALUES(12345)") place_id = placex_row()
admin.analyse_indexing(project_env, place_id=12345) admin.analyse_indexing(project_env, place_id=place_id)
def test_analyse_indexing_with_osm_id(project_env, temp_db_cursor): def test_analyse_indexing_with_osm_id(project_env, placex_row):
temp_db_cursor.execute("""INSERT INTO placex (place_id, osm_type, osm_id) placex_row(osm_type='N', osm_id=10000)
VALUES(9988, 'N', 10000)""")
admin.analyse_indexing(project_env, osm_id='N10000') admin.analyse_indexing(project_env, osm_id='N10000')
@@ -77,8 +76,8 @@ def test_analyse_indexing_with_osm_id(project_env, temp_db_cursor):
class TestAdminCleanDeleted: class TestAdminCleanDeleted:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_polygon_delete(self, project_env, table_factory, place_table, def setup_polygon_delete(self, project_env, table_factory, place_table, placex_row,
osmline_table, temp_db_cursor, temp_db_conn, def_config, src_dir): osmline_table, temp_db_cursor, load_sql):
""" Set up place_force_delete function and related tables """ Set up place_force_delete function and related tables
""" """
self.project_env = project_env self.project_env = project_env
@@ -91,12 +90,15 @@ class TestAdminCleanDeleted:
((100, 'N', 'boundary', 'administrative'), ((100, 'N', 'boundary', 'administrative'),
(145, 'N', 'boundary', 'administrative'), (145, 'N', 'boundary', 'administrative'),
(175, 'R', 'landcover', 'grass'))) (175, 'R', 'landcover', 'grass')))
temp_db_cursor.execute("""
INSERT INTO placex (place_id, osm_id, osm_type, class, type, now = dt.datetime.now()
indexed_date, indexed_status) placex_row(osm_type='N', osm_id=100, cls='boundary', typ='administrative',
VALUES(1, 100, 'N', 'boundary', 'administrative', current_date - INTERVAL '1 month', 1), indexed_status=1, indexed_date=now - dt.timedelta(days=30))
(2, 145, 'N', 'boundary', 'administrative', current_date - INTERVAL '3 month', 1), placex_row(osm_type='N', osm_id=145, cls='boundary', typ='administrative',
(3, 175, 'R', 'landcover', 'grass', current_date - INTERVAL '3 months', 1)""") indexed_status=1, indexed_date=now - dt.timedelta(days=90))
placex_row(osm_type='R', osm_id=175, cls='landcover', typ='grass',
indexed_status=1, indexed_date=now - dt.timedelta(days=90))
# set up tables and triggers for utils function # set up tables and triggers for utils function
table_factory('place_to_be_deleted', table_factory('place_to_be_deleted',
"""osm_id BIGINT, """osm_id BIGINT,
@@ -104,7 +106,6 @@ class TestAdminCleanDeleted:
class TEXT NOT NULL, class TEXT NOT NULL,
type TEXT NOT NULL, type TEXT NOT NULL,
deferred BOOLEAN""") deferred BOOLEAN""")
table_factory('country_name', 'partition INT')
table_factory('import_polygon_error', """osm_id BIGINT, table_factory('import_polygon_error', """osm_id BIGINT,
osm_type CHAR(1), osm_type CHAR(1),
class TEXT NOT NULL, class TEXT NOT NULL,
@@ -115,11 +116,7 @@ class TestAdminCleanDeleted:
$$ LANGUAGE plpgsql;""") $$ LANGUAGE plpgsql;""")
temp_db_cursor.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place temp_db_cursor.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place
FOR EACH ROW EXECUTE PROCEDURE place_delete();""") FOR EACH ROW EXECUTE PROCEDURE place_delete();""")
orig_sql = def_config.lib_dir.sql load_sql('functions/utils.sql')
def_config.lib_dir.sql = src_dir / 'lib-sql'
sqlproc = SQLPreprocessor(temp_db_conn, def_config)
sqlproc.run_sql_file(temp_db_conn, 'functions/utils.sql')
def_config.lib_dir.sql = orig_sql
def test_admin_clean_deleted_no_records(self): def test_admin_clean_deleted_no_records(self):
admin.clean_deleted_relations(self.project_env, age='1 year') admin.clean_deleted_relations(self.project_env, age='1 year')

View File

@@ -170,14 +170,41 @@ def test_truncate_database_tables(temp_db_conn, temp_db_cursor, table_factory, w
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_load_data(dsn, place_row, placex_table, osmline_table, async def test_load_data(dsn, place_row, placex_table, osmline_table,
temp_db_cursor, threads): temp_db_cursor, threads):
for func in ('precompute_words', 'getorcreate_housenumber_id', 'make_standard_name'):
temp_db_cursor.execute(pysql.SQL("""CREATE FUNCTION {} (src TEXT)
RETURNS TEXT AS $$ SELECT 'a'::TEXT $$ LANGUAGE SQL
""").format(pysql.Identifier(func)))
for oid in range(100, 130): for oid in range(100, 130):
place_row(osm_id=oid) place_row(osm_id=oid)
place_row(osm_type='W', osm_id=342, cls='place', typ='houses', place_row(osm_type='W', osm_id=342, cls='place', typ='houses',
geom='SRID=4326;LINESTRING(0 0, 10 10)') geom='LINESTRING(0 0, 10 10)')
temp_db_cursor.execute("""
CREATE OR REPLACE FUNCTION placex_insert() RETURNS TRIGGER AS $$
BEGIN
NEW.place_id := nextval('seq_place');
NEW.indexed_status := 1;
NEW.centroid := ST_Centroid(NEW.geometry);
NEW.partition := 0;
NEW.geometry_sector := 2424;
NEW.rank_address := 30;
NEW.rank_search := 30;
RETURN NEW;
END; $$ LANGUAGE plpgsql STABLE PARALLEL SAFE;
CREATE OR REPLACE FUNCTION osmline_insert() RETURNS TRIGGER AS $$
BEGIN
NEW.place_id := nextval('seq_place');
IF NEW.indexed_status IS NULL THEN
NEW.indexed_status := 1;
NEW.partition := 0;
NEW.geometry_sector := 2424;
END IF;
RETURN NEW;
END; $$ LANGUAGE plpgsql STABLE PARALLEL SAFE;
CREATE TRIGGER placex_before_insert BEFORE INSERT ON placex
FOR EACH ROW EXECUTE PROCEDURE placex_insert();
CREATE TRIGGER osmline_before_insert BEFORE INSERT ON location_property_osmline
FOR EACH ROW EXECUTE PROCEDURE osmline_insert();
""")
await database_import.load_data(dsn, threads) await database_import.load_data(dsn, threads)

View File

@@ -2,7 +2,7 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Tests for import special phrases methods Tests for import special phrases methods
@@ -125,9 +125,8 @@ def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
phrase_class, phrase_type) phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes( def test_create_place_classtype_table_and_indexes(temp_db_cursor, def_config, placex_row,
temp_db_cursor, def_config, placex_table, sp_importer, temp_db_conn, monkeypatch):
sp_importer, temp_db_conn, monkeypatch):
""" """
Test that _create_place_classtype_table_and_indexes() Test that _create_place_classtype_table_and_indexes()
create the right place_classtype tables and place_id indexes create the right place_classtype tables and place_id indexes
@@ -136,7 +135,7 @@ def test_create_place_classtype_table_and_indexes(
""" """
pairs = set([('class1', 'type1'), ('class2', 'type2')]) pairs = set([('class1', 'type1'), ('class2', 'type2')])
for pair in pairs: for pair in pairs:
placex_table.add(cls=pair[0], typ=pair[1]) # adding to db placex_row(cls=pair[0], typ=pair[1]) # adding to db
sp_importer._create_classtype_table_and_indexes(pairs) sp_importer._create_classtype_table_and_indexes(pairs)
temp_db_conn.commit() temp_db_conn.commit()
@@ -178,7 +177,7 @@ def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
@pytest.mark.parametrize("should_replace", [(True), (False)]) @pytest.mark.parametrize("should_replace", [(True), (False)])
def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer, def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
placex_table, table_factory, tokenizer_mock, placex_row, table_factory, tokenizer_mock,
xml_wiki_content, should_replace): xml_wiki_content, should_replace):
""" """
Check that the main import_phrases() method is well executed. Check that the main import_phrases() method is well executed.
@@ -199,8 +198,8 @@ def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
type_test = 'zip_line' type_test = 'zip_line'
tokenizer = tokenizer_mock() tokenizer = tokenizer_mock()
placex_table.add(cls=class_test, typ=type_test) # in db for special phrase filtering placex_row(cls=class_test, typ=type_test) # in db for special phrase filtering
placex_table.add(cls='amenity', typ='animal_shelter') # in db for special phrase filtering placex_row(cls='amenity', typ='animal_shelter') # in db for special phrase filtering
sp_importer.import_phrases(tokenizer, should_replace) sp_importer.import_phrases(tokenizer, should_replace)
assert len(tokenizer.analyser_cache['special_phrases']) == 19 assert len(tokenizer.analyser_cache['special_phrases']) == 19
@@ -257,7 +256,7 @@ def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type
@pytest.mark.parametrize("should_replace", [(True), (False)]) @pytest.mark.parametrize("should_replace", [(True), (False)])
def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, def_config, def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, def_config,
sp_importer, placex_table, tokenizer_mock, sp_importer, placex_row, tokenizer_mock,
xml_wiki_content, should_replace): xml_wiki_content, should_replace):
monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content', monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
@@ -266,7 +265,7 @@ def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, de
class_test = 'aerialway' class_test = 'aerialway'
type_test = 'zip_line' type_test = 'zip_line'
placex_table.add(cls=class_test, typ=type_test) # add to the database to make valid placex_row(cls=class_test, typ=type_test) # add to the database to make valid
tokenizer = tokenizer_mock() tokenizer = tokenizer_mock()
sp_importer.import_phrases(tokenizer, should_replace) sp_importer.import_phrases(tokenizer, should_replace)
@@ -276,11 +275,11 @@ def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, de
assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test) assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
def test_get_classtype_pairs_directly(placex_table, temp_db_conn, sp_importer): def test_get_classtype_pairs_directly(placex_row, temp_db_conn, sp_importer):
for _ in range(101): for _ in range(101):
placex_table.add(cls='highway', typ='residential') placex_row(cls='highway', typ='residential')
for _ in range(99): for _ in range(99):
placex_table.add(cls='amenity', typ='toilet') placex_row(cls='amenity', typ='toilet')
temp_db_conn.commit() temp_db_conn.commit()

View File

@@ -2,7 +2,7 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Tests for functions to maintain the artificial postcode table. Tests for functions to maintain the artificial postcode table.
@@ -75,20 +75,18 @@ class MockPostcodeTable:
@pytest.fixture @pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table, table_factory): def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
country_info.setup_country_config(def_config) country_info.setup_country_config(def_config)
table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
return MockPostcodeTable(temp_db_conn, def_config) return MockPostcodeTable(temp_db_conn, def_config)
@pytest.fixture @pytest.fixture
def insert_implicit_postcode(placex_table, place_postcode_row): def insert_implicit_postcode(placex_row, place_postcode_row):
""" Insert data into the placex and place table """ Insert data into the placex and place table
which can then be used to compute one postcode. which can then be used to compute one postcode.
""" """
def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False): def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
if in_placex: if in_placex:
placex_table.add(osm_id=osm_id, country=country, geom=geometry, placex_row(osm_id=osm_id, country=country, geom=geometry,
centroid=f'SRID=4326;{geometry}', centroid=geometry, address={'postcode': postcode})
address={'postcode': postcode})
else: else:
place_postcode_row(osm_id=osm_id, centroid=geometry, place_postcode_row(osm_id=osm_id, centroid=geometry,
country=country, postcode=postcode) country=country, postcode=postcode)

View File

@@ -42,8 +42,8 @@ def test_refresh_import_wikipedia(dsn, src_dir, table_factory, temp_db_cursor, r
@pytest.mark.parametrize('osm_type', ('N', 'W', 'R')) @pytest.mark.parametrize('osm_type', ('N', 'W', 'R'))
def test_invalidate_osm_object_simple(placex_table, osm_type, temp_db_conn, temp_db_cursor): def test_invalidate_osm_object_simple(placex_row, osm_type, temp_db_conn, temp_db_cursor):
placex_table.add(osm_type=osm_type, osm_id=57283) placex_row(osm_type=osm_type, osm_id=57283)
refresh.invalidate_osm_object(osm_type, 57283, temp_db_conn, recursive=False) refresh.invalidate_osm_object(osm_type, 57283, temp_db_conn, recursive=False)
temp_db_conn.commit() temp_db_conn.commit()
@@ -53,8 +53,8 @@ def test_invalidate_osm_object_simple(placex_table, osm_type, temp_db_conn, temp
(osm_type, 57283)) (osm_type, 57283))
def test_invalidate_osm_object_nonexisting_simple(placex_table, temp_db_conn, temp_db_cursor): def test_invalidate_osm_object_nonexisting_simple(placex_row, temp_db_conn, temp_db_cursor):
placex_table.add(osm_type='W', osm_id=57283) placex_row(osm_type='W', osm_id=57283)
refresh.invalidate_osm_object('N', 57283, temp_db_conn, recursive=False) refresh.invalidate_osm_object('N', 57283, temp_db_conn, recursive=False)
temp_db_conn.commit() temp_db_conn.commit()
@@ -64,8 +64,8 @@ def test_invalidate_osm_object_nonexisting_simple(placex_table, temp_db_conn, te
@pytest.mark.parametrize('osm_type', ('N', 'W', 'R')) @pytest.mark.parametrize('osm_type', ('N', 'W', 'R'))
def test_invalidate_osm_object_recursive(placex_table, osm_type, temp_db_conn, temp_db_cursor): def test_invalidate_osm_object_recursive(placex_row, osm_type, temp_db_conn, temp_db_cursor):
placex_table.add(osm_type=osm_type, osm_id=57283) placex_row(osm_type=osm_type, osm_id=57283)
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION place_force_update(placeid BIGINT) temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION place_force_update(placeid BIGINT)
RETURNS BOOLEAN AS $$ RETURNS BOOLEAN AS $$

View File

@@ -2,7 +2,7 @@
# #
# This file is part of Nominatim. (https://nominatim.org) # This file is part of Nominatim. (https://nominatim.org)
# #
# Copyright (C) 2025 by the Nominatim developer community. # Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log. # For a full list of authors see the git log.
""" """
Tests for correctly assigning wikipedia pages to places. Tests for correctly assigning wikipedia pages to places.
@@ -38,7 +38,7 @@ def wiki_csv(tmp_path, sql_preprocessor):
{'wikipedia': 'en:Test'}, {'wikipedia': 'en:Test'},
{'wikidata': 'Q123'}]) {'wikidata': 'Q123'}])
def test_wikipedia(dsn, temp_db_conn, temp_db_cursor, table_factory, def test_wikipedia(dsn, temp_db_conn, temp_db_cursor, table_factory,
def_config, wiki_csv, placex_table, extra): def_config, wiki_csv, placex_row, extra):
import_wikipedia_articles(dsn, wiki_csv([('en', 'Test', 0.3, 'Q123')])) import_wikipedia_articles(dsn, wiki_csv([('en', 'Test', 0.3, 'Q123')]))
create_functions(temp_db_conn, def_config) create_functions(temp_db_conn, def_config)
@@ -46,7 +46,7 @@ def test_wikipedia(dsn, temp_db_conn, temp_db_cursor, table_factory,
'SELECT language, title, importance, wikidata FROM wikimedia_importance') 'SELECT language, title, importance, wikidata FROM wikimedia_importance')
assert content == set([('en', 'Test', 0.3, 'Q123')]) assert content == set([('en', 'Test', 0.3, 'Q123')])
place_id = placex_table.add(osm_id=12, extratags=extra) place_id = placex_row(osm_id=12, extratags=extra)
table_factory('search_name', table_factory('search_name',
'place_id BIGINT, importance FLOAT', 'place_id BIGINT, importance FLOAT',
[(place_id, 0.2)]) [(place_id, 0.2)])
@@ -61,11 +61,11 @@ def test_wikipedia(dsn, temp_db_conn, temp_db_cursor, table_factory,
def test_wikipedia_no_match(dsn, temp_db_conn, temp_db_cursor, def_config, wiki_csv, def test_wikipedia_no_match(dsn, temp_db_conn, temp_db_cursor, def_config, wiki_csv,
placex_table, table_factory): placex_row, table_factory):
import_wikipedia_articles(dsn, wiki_csv([('de', 'Test', 0.3, 'Q123')])) import_wikipedia_articles(dsn, wiki_csv([('de', 'Test', 0.3, 'Q123')]))
create_functions(temp_db_conn, def_config) create_functions(temp_db_conn, def_config)
place_id = placex_table.add(osm_id=12, extratags={'wikipedia': 'en:Test'}, rank_search=10) place_id = placex_row(osm_id=12, extratags={'wikipedia': 'en:Test'}, rank_search=10)
table_factory('search_name', table_factory('search_name',
'place_id BIGINT, importance FLOAT', 'place_id BIGINT, importance FLOAT',
[(place_id, 0.2)]) [(place_id, 0.2)])

View File

@@ -1,69 +1,70 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2026 by the Nominatim developer community.
# For a full list of authors see the git log.
from nominatim_db.tools.special_phrases.sp_importer import SPImporter from nominatim_db.tools.special_phrases.sp_importer import SPImporter
# Testing Database Class Pair Retrival using Conftest.py and placex # Testing Database Class Pair Retrival using Conftest.py and placex
def test_get_classtype_pair_data(placex_table, def_config, temp_db_conn): def test_get_classtype_pair_data(placex_row, def_config, temp_db_conn):
for _ in range(100): for _ in range(100):
placex_table.add(cls='highway', typ='motorway') # edge case 100 placex_row(cls='highway', typ='motorway') # edge case 100
for _ in range(99): for _ in range(99):
placex_table.add(cls='amenity', typ='prison') # edge case 99 placex_row(cls='amenity', typ='prison') # edge case 99
for _ in range(150): for _ in range(150):
placex_table.add(cls='tourism', typ='hotel') placex_row(cls='tourism', typ='hotel')
importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None) importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None)
result = importer.get_classtype_pairs(min=100) result = importer.get_classtype_pairs(min=100)
expected = { assert result == {
("highway", "motorway"), ("highway", "motorway"),
("tourism", "hotel") ("tourism", "hotel")
} }
assert result == expected, f"Expected {expected}, got {result}"
def test_get_classtype_pair_data_more(placex_row, def_config, temp_db_conn):
def test_get_classtype_pair_data_more(placex_table, def_config, temp_db_conn):
for _ in range(99): for _ in range(99):
placex_table.add(cls='emergency', typ='firehydrant') # edge case 99, not included placex_row(cls='emergency', typ='firehydrant') # edge case 99, not included
for _ in range(199): for _ in range(199):
placex_table.add(cls='amenity', typ='prison') placex_row(cls='amenity', typ='prison')
for _ in range(3478): for _ in range(3478):
placex_table.add(cls='tourism', typ='hotel') placex_row(cls='tourism', typ='hotel')
importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None) importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None)
result = importer.get_classtype_pairs(min=100) result = importer.get_classtype_pairs(min=100)
expected = { assert result == {
("amenity", "prison"), ("amenity", "prison"),
("tourism", "hotel") ("tourism", "hotel")
} }
assert result == expected, f"Expected {expected}, got {result}"
def test_get_classtype_pair_data_default(placex_row, def_config, temp_db_conn):
def test_get_classtype_pair_data_default(placex_table, def_config, temp_db_conn):
for _ in range(1): for _ in range(1):
placex_table.add(cls='emergency', typ='firehydrant') placex_row(cls='emergency', typ='firehydrant')
for _ in range(199): for _ in range(199):
placex_table.add(cls='amenity', typ='prison') placex_row(cls='amenity', typ='prison')
for _ in range(3478): for _ in range(3478):
placex_table.add(cls='tourism', typ='hotel') placex_row(cls='tourism', typ='hotel')
importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None) importer = SPImporter(config=def_config, conn=temp_db_conn, sp_loader=None)
result = importer.get_classtype_pairs() result = importer.get_classtype_pairs()
expected = { assert result == {
("amenity", "prison"), ("amenity", "prison"),
("tourism", "hotel"), ("tourism", "hotel"),
("emergency", "firehydrant") ("emergency", "firehydrant")
} }
assert result == expected, f"Expected {expected}, got {result}"