mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-03-11 05:14:07 +00:00
cli indexer tests need a fake database
The Indexer constructor opens a connection to the given database.
This commit is contained in:
@@ -1,6 +1,26 @@
|
|||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
import pytest
|
||||||
|
|
||||||
# always test against the source
|
# always test against the source
|
||||||
sys.path.insert(0, str((Path(__file__) / '..' / '..' / '..').resolve()))
|
sys.path.insert(0, str((Path(__file__) / '..' / '..' / '..').resolve()))
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_db(monkeypatch):
|
||||||
|
name = 'test_nominatim_python_unittest'
|
||||||
|
with psycopg2.connect(database='postgres') as conn:
|
||||||
|
conn.set_isolation_level(0)
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
|
||||||
|
cur.execute('CREATE DATABASE {}'.format(name))
|
||||||
|
|
||||||
|
monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
|
||||||
|
|
||||||
|
yield name
|
||||||
|
|
||||||
|
with psycopg2.connect(database='postgres') as conn:
|
||||||
|
conn.set_isolation_level(0)
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
Tests for command line interface wrapper.
|
Tests for command line interface wrapper.
|
||||||
"""
|
"""
|
||||||
|
import psycopg2
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import nominatim.cli
|
import nominatim.cli
|
||||||
@@ -81,7 +82,10 @@ def test_add_data_command(mock_run_legacy, name, oid):
|
|||||||
(['--boundaries-only'], 1, 0),
|
(['--boundaries-only'], 1, 0),
|
||||||
(['--no-boundaries'], 0, 1),
|
(['--no-boundaries'], 0, 1),
|
||||||
(['--boundaries-only', '--no-boundaries'], 0, 0)])
|
(['--boundaries-only', '--no-boundaries'], 0, 0)])
|
||||||
def test_index_command(monkeypatch, params, do_bnds, do_ranks):
|
def test_index_command(monkeypatch, temp_db, params, do_bnds, do_ranks):
|
||||||
|
with psycopg2.connect(database=temp_db) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("CREATE TABLE import_status (indexed bool)")
|
||||||
bnd_mock = MockParamCapture()
|
bnd_mock = MockParamCapture()
|
||||||
monkeypatch.setattr(nominatim.cli.Indexer, 'index_boundaries', bnd_mock)
|
monkeypatch.setattr(nominatim.cli.Indexer, 'index_boundaries', bnd_mock)
|
||||||
rank_mock = MockParamCapture()
|
rank_mock = MockParamCapture()
|
||||||
|
|||||||
@@ -9,19 +9,11 @@ from nominatim.indexer.indexer import Indexer
|
|||||||
|
|
||||||
class IndexerTestDB:
|
class IndexerTestDB:
|
||||||
|
|
||||||
def __init__(self, name):
|
def __init__(self, conn):
|
||||||
self.name = name
|
|
||||||
self.conn = None
|
|
||||||
self.placex_id = itertools.count(100000)
|
self.placex_id = itertools.count(100000)
|
||||||
self.osmline_id = itertools.count(500000)
|
self.osmline_id = itertools.count(500000)
|
||||||
|
|
||||||
def setup(self):
|
self.conn = conn
|
||||||
with psycopg2.connect(database='postgres') as conn:
|
|
||||||
conn.set_isolation_level(0)
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute('DROP DATABASE IF EXISTS {}'.format(self.name))
|
|
||||||
cur.execute('CREATE DATABASE {}'.format(self.name))
|
|
||||||
self.conn = psycopg2.connect(database=self.name)
|
|
||||||
self.conn.set_isolation_level(0)
|
self.conn.set_isolation_level(0)
|
||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute("""CREATE TABLE placex (place_id BIGINT,
|
cur.execute("""CREATE TABLE placex (place_id BIGINT,
|
||||||
@@ -52,16 +44,6 @@ class IndexerTestDB:
|
|||||||
cur.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline
|
cur.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline
|
||||||
FOR EACH ROW EXECUTE PROCEDURE date_update()""")
|
FOR EACH ROW EXECUTE PROCEDURE date_update()""")
|
||||||
|
|
||||||
|
|
||||||
def drop(self):
|
|
||||||
if self.conn:
|
|
||||||
self.conn.close()
|
|
||||||
self.conn = None
|
|
||||||
with psycopg2.connect(database='postgres') as conn:
|
|
||||||
conn.set_isolation_level(0)
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute('DROP DATABASE IF EXISTS {}'.format(self.name))
|
|
||||||
|
|
||||||
def scalar(self, query):
|
def scalar(self, query):
|
||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute(query)
|
cur.execute(query)
|
||||||
@@ -100,11 +82,10 @@ class IndexerTestDB:
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def test_db():
|
def test_db(temp_db):
|
||||||
db = IndexerTestDB('test_nominatim_python_unittest')
|
conn = psycopg2.connect(database=temp_db)
|
||||||
db.setup()
|
yield IndexerTestDB(conn)
|
||||||
yield db
|
conn.close()
|
||||||
db.drop()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("threads", [1, 15])
|
@pytest.mark.parametrize("threads", [1, 15])
|
||||||
|
|||||||
Reference in New Issue
Block a user