add and extend tests for new postcode handling

This commit is contained in:
Sarah Hoffmann
2021-05-12 23:30:45 +02:00
parent a4aba23a83
commit 4abaf71234
9 changed files with 320 additions and 138 deletions

View File

@@ -38,20 +38,20 @@ class _CountryPostcodesCollector:
with conn.cursor() as cur: with conn.cursor() as cur:
if to_add: if to_add:
execute_values(cur, execute_values(cur,
"""INSERT INTO location_postcodes """INSERT INTO location_postcode
(place_id, indexed_status, countrycode, (place_id, indexed_status, country_code,
postcode, geometry) VALUES %s""", postcode, geometry) VALUES %s""",
to_add, to_add,
template="""(nextval('seq_place'), 1, '{}', template="""(nextval('seq_place'), 1, '{}',
%s, 'SRID=4326;POINT(%s %s)') %s, 'SRID=4326;POINT(%s %s)')
""".format(self.country)) """.format(self.country))
if to_delete: if to_delete:
cur.execute("""DELETE FROM location_postcodes cur.execute("""DELETE FROM location_postcode
WHERE country_code = %s and postcode = any(%s) WHERE country_code = %s and postcode = any(%s)
""", (self.country, to_delete)) """, (self.country, to_delete))
if to_update: if to_update:
execute_values(cur, execute_values(cur,
"""UPDATE location_postcodes """UPDATE location_postcode
SET indexed_status = 2, SET indexed_status = 2,
geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326) geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
FROM (VALUES %s) AS v (pc, x, y) FROM (VALUES %s) AS v (pc, x, y)
@@ -62,22 +62,22 @@ class _CountryPostcodesCollector:
def _compute_changes(self, conn): def _compute_changes(self, conn):
""" Compute which postcodes from the collected postcodes have to be """ Compute which postcodes from the collected postcodes have to be
added or modified and which from the location_postcodes table added or modified and which from the location_postcode table
have to be deleted. have to be deleted.
""" """
to_update = [] to_update = []
to_delete = [] to_delete = []
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry) cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
FROM location_postcodes FROM location_postcode
WHERE country_code = %s""", WHERE country_code = %s""",
(self.country, )) (self.country, ))
for postcode, x, y in cur: for postcode, x, y in cur:
oldx, oldy = self.collected.pop(postcode, (None, None)) newx, newy = self.collected.pop(postcode, (None, None))
if oldx is not None: if newx is not None:
dist = (x - oldx)**2 + (y - oldy)**2 dist = (x - newx)**2 + (y - newy)**2
if dist > 0.000001: if dist > 0.0000001:
to_update.append(postcode, x, y) to_update.append((postcode, newx, newy))
else: else:
to_delete.append(postcode) to_delete.append(postcode)
@@ -105,7 +105,7 @@ class _CountryPostcodesCollector:
postcode = analyzer.normalize_postcode(row['postcode']) postcode = analyzer.normalize_postcode(row['postcode'])
if postcode not in self.collected: if postcode not in self.collected:
try: try:
self.collected[postcode] = (float(row['lon'], float(row['lat']))) self.collected[postcode] = (float(row['lon']), float(row['lat']))
except ValueError: except ValueError:
LOG.warning("Bad coordinates %s, %s in %s country postcode file.", LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
row['lat'], row['lon'], self.country) row['lat'], row['lon'], self.country)
@@ -139,7 +139,7 @@ def update_postcodes(dsn, project_dir, tokenizer):
""" """
with tokenizer.name_analyzer() as analyzer: with tokenizer.name_analyzer() as analyzer:
with connect(dsn) as conn: with connect(dsn) as conn:
with conn.cursor("placex_postcodes") as cur: with conn.cursor(name="placex_postcodes") as cur:
cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid) cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
FROM ( FROM (
SELECT country_code, SELECT country_code,

View File

@@ -19,6 +19,7 @@ from nominatim.db.sql_preprocessor import SQLPreprocessor
from nominatim.db import properties from nominatim.db import properties
import dummy_tokenizer import dummy_tokenizer
import mocks
class _TestingCursor(psycopg2.extras.DictCursor): class _TestingCursor(psycopg2.extras.DictCursor):
""" Extension to the DictCursor class that provides execution """ Extension to the DictCursor class that provides execution
@@ -211,33 +212,7 @@ def place_row(place_table, temp_db_cursor):
def placex_table(temp_db_with_extensions, temp_db_conn): def placex_table(temp_db_with_extensions, temp_db_conn):
""" Create an empty version of the place table. """ Create an empty version of the place table.
""" """
with temp_db_conn.cursor() as cur: return mocks.MockPlacexTable(temp_db_conn)
cur.execute("""CREATE TABLE placex (
place_id BIGINT,
parent_place_id BIGINT,
linked_place_id BIGINT,
importance FLOAT,
indexed_date TIMESTAMP,
geometry_sector INTEGER,
rank_address SMALLINT,
rank_search SMALLINT,
partition SMALLINT,
indexed_status SMALLINT,
osm_id int8,
osm_type char(1),
class text,
type text,
name hstore,
admin_level smallint,
address hstore,
extratags hstore,
geometry Geometry(Geometry,4326),
wikipedia TEXT,
country_code varchar(2),
housenumber TEXT,
postcode TEXT,
centroid GEOMETRY(Geometry, 4326))""")
temp_db_conn.commit()
@pytest.fixture @pytest.fixture
@@ -262,18 +237,8 @@ def osmline_table(temp_db_with_extensions, temp_db_conn):
@pytest.fixture @pytest.fixture
def word_table(temp_db, temp_db_conn): def word_table(temp_db_conn):
with temp_db_conn.cursor() as cur: return mocks.MockWordTable(temp_db_conn)
cur.execute("""CREATE TABLE word (
word_id INTEGER,
word_token text,
word text,
class text,
type text,
country_code varchar(2),
search_name_count INTEGER,
operator TEXT)""")
temp_db_conn.commit()
@pytest.fixture @pytest.fixture

View File

@@ -51,6 +51,9 @@ class DummyNameAnalyzer:
def close(self): def close(self):
pass pass
def normalize_postcode(self, postcode):
return postcode
def add_postcodes_from_db(self): def add_postcodes_from_db(self):
pass pass

View File

@@ -1,7 +1,9 @@
""" """
Custom mocks for testing. Custom mocks for testing.
""" """
import itertools
import psycopg2.extras
class MockParamCapture: class MockParamCapture:
""" Mock that records the parameters with which a function was called """ Mock that records the parameters with which a function was called
@@ -16,3 +18,110 @@ class MockParamCapture:
self.last_args = args self.last_args = args
self.last_kwargs = kwargs self.last_kwargs = kwargs
return self.return_value return self.return_value
class MockWordTable:
""" A word table for testing.
"""
def __init__(self, conn):
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE word (word_id INTEGER,
word_token text,
word text,
class text,
type text,
country_code varchar(2),
search_name_count INTEGER,
operator TEXT)""")
conn.commit()
def add_special(self, word_token, word, cls, typ, op):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO word (word_token, word, class, type, operator)
VALUES (%s, %s, %s, %s, %s)
""", (word_token, word, cls, typ, op))
self.conn.commit()
def add_postcode(self, word_token, postcode):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO word (word_token, word, class, type)
VALUES (%s, %s, 'place', 'postcode')
""", (word_token, postcode))
self.conn.commit()
def count(self):
with self.conn.cursor() as cur:
return cur.scalar("SELECT count(*) FROM word")
def count_special(self):
with self.conn.cursor() as cur:
return cur.scalar("SELECT count(*) FROM word WHERE class != 'place'")
def get_special(self):
with self.conn.cursor() as cur:
cur.execute("""SELECT word_token, word, class, type, operator
FROM word WHERE class != 'place'""")
return set((tuple(row) for row in cur))
def get_postcodes(self):
with self.conn.cursor() as cur:
cur.execute("""SELECT word FROM word
WHERE class = 'place' and type = 'postcode'""")
return set((row[0] for row in cur))
class MockPlacexTable:
""" A placex table for testing.
"""
def __init__(self, conn):
self.idseq = itertools.count(10000)
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE placex (
place_id BIGINT,
parent_place_id BIGINT,
linked_place_id BIGINT,
importance FLOAT,
indexed_date TIMESTAMP,
geometry_sector INTEGER,
rank_address SMALLINT,
rank_search SMALLINT,
partition SMALLINT,
indexed_status SMALLINT,
osm_id int8,
osm_type char(1),
class text,
type text,
name hstore,
admin_level smallint,
address hstore,
extratags hstore,
geometry Geometry(Geometry,4326),
wikipedia TEXT,
country_code varchar(2),
housenumber TEXT,
postcode TEXT,
centroid GEOMETRY(Geometry, 4326))""")
cur.execute("CREATE SEQUENCE IF NOT EXISTS seq_place")
conn.commit()
def add(self, osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
country=None):
with self.conn.cursor() as cur:
psycopg2.extras.register_hstore(cur)
cur.execute("""INSERT INTO placex (place_id, osm_type, osm_id, class,
type, name, admin_level, address,
extratags, geometry, country_code)
VALUES(nextval('seq_place'), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(osm_type, osm_id or next(self.idseq), cls, typ, names,
admin_level, address, extratags, 'SRID=4326;' + geom,
country))
self.conn.commit()

View File

@@ -120,7 +120,7 @@ def test_import_full(temp_db, mock_func_factory, tokenizer_mock):
mock_func_factory(nominatim.tools.database_import, 'create_search_indices'), mock_func_factory(nominatim.tools.database_import, 'create_search_indices'),
mock_func_factory(nominatim.tools.database_import, 'create_country_names'), mock_func_factory(nominatim.tools.database_import, 'create_country_names'),
mock_func_factory(nominatim.tools.refresh, 'load_address_levels_from_file'), mock_func_factory(nominatim.tools.refresh, 'load_address_levels_from_file'),
mock_func_factory(nominatim.tools.postcodes, 'import_postcodes'), mock_func_factory(nominatim.tools.postcodes, 'update_postcodes'),
mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'), mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'),
mock_func_factory(nominatim.tools.refresh, 'setup_website'), mock_func_factory(nominatim.tools.refresh, 'setup_website'),
mock_func_factory(nominatim.db.properties, 'set_property') mock_func_factory(nominatim.db.properties, 'set_property')
@@ -143,7 +143,7 @@ def test_import_continue_load_data(temp_db, mock_func_factory, tokenizer_mock):
mock_func_factory(nominatim.tools.database_import, 'load_data'), mock_func_factory(nominatim.tools.database_import, 'load_data'),
mock_func_factory(nominatim.tools.database_import, 'create_search_indices'), mock_func_factory(nominatim.tools.database_import, 'create_search_indices'),
mock_func_factory(nominatim.tools.database_import, 'create_country_names'), mock_func_factory(nominatim.tools.database_import, 'create_country_names'),
mock_func_factory(nominatim.tools.postcodes, 'import_postcodes'), mock_func_factory(nominatim.tools.postcodes, 'update_postcodes'),
mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'), mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'),
mock_func_factory(nominatim.tools.refresh, 'setup_website'), mock_func_factory(nominatim.tools.refresh, 'setup_website'),
mock_func_factory(nominatim.db.properties, 'set_property') mock_func_factory(nominatim.db.properties, 'set_property')
@@ -263,20 +263,25 @@ def test_special_phrases_command(temp_db, mock_func_factory, tokenizer_mock):
assert func.called == 1 assert func.called == 1
@pytest.mark.parametrize("command,func", [ @pytest.mark.parametrize("command,func", [
('postcodes', 'update_postcodes'),
('word-counts', 'recompute_word_counts'), ('word-counts', 'recompute_word_counts'),
('address-levels', 'load_address_levels_from_file'), ('address-levels', 'load_address_levels_from_file'),
('wiki-data', 'import_wikipedia_articles'), ('wiki-data', 'import_wikipedia_articles'),
('importance', 'recompute_importance'), ('importance', 'recompute_importance'),
('website', 'setup_website'), ('website', 'setup_website'),
]) ])
def test_refresh_command(mock_func_factory, temp_db, command, func): def test_refresh_command(mock_func_factory, temp_db, command, func, tokenizer_mock):
func_mock = mock_func_factory(nominatim.tools.refresh, func) func_mock = mock_func_factory(nominatim.tools.refresh, func)
assert 0 == call_nominatim('refresh', '--' + command) assert 0 == call_nominatim('refresh', '--' + command)
assert func_mock.called == 1 assert func_mock.called == 1
def test_refresh_postcodes(mock_func_factory, temp_db, tokenizer_mock):
func_mock = mock_func_factory(nominatim.tools.postcodes, 'update_postcodes')
assert 0 == call_nominatim('refresh', '--postcodes')
assert func_mock.called == 1
def test_refresh_create_functions(mock_func_factory, temp_db, tokenizer_mock): def test_refresh_create_functions(mock_func_factory, temp_db, tokenizer_mock):
func_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions') func_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')
@@ -285,7 +290,7 @@ def test_refresh_create_functions(mock_func_factory, temp_db, tokenizer_mock):
assert tokenizer_mock.update_sql_functions_called assert tokenizer_mock.update_sql_functions_called
def test_refresh_importance_computed_after_wiki_import(monkeypatch, temp_db): def test_refresh_importance_computed_after_wiki_import(monkeypatch, temp_db, tokenizer_mock):
calls = [] calls = []
monkeypatch.setattr(nominatim.tools.refresh, 'import_wikipedia_articles', monkeypatch.setattr(nominatim.tools.refresh, 'import_wikipedia_articles',
lambda *args, **kwargs: calls.append('import') or 0) lambda *args, **kwargs: calls.append('import') or 0)

View File

@@ -77,12 +77,12 @@ def make_standard_name(temp_db_cursor):
@pytest.fixture @pytest.fixture
def create_postcode_id(table_factory, temp_db_cursor): def create_postcode_id(temp_db_cursor):
table_factory('out_postcode_table', 'postcode TEXT')
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION create_postcode_id(postcode TEXT) temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION create_postcode_id(postcode TEXT)
RETURNS BOOLEAN AS $$ RETURNS BOOLEAN AS $$
INSERT INTO out_postcode_table VALUES (postcode) RETURNING True; INSERT INTO word (word_token, word, class, type)
VALUES (' ' || postcode, postcode, 'place', 'postcode')
RETURNING True;
$$ LANGUAGE SQL""") $$ LANGUAGE SQL""")
@@ -192,27 +192,38 @@ def test_normalize(analyzer):
assert analyzer.normalize('TEsT') == 'test' assert analyzer.normalize('TEsT') == 'test'
def test_add_postcodes_from_db(analyzer, table_factory, temp_db_cursor, def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table,
create_postcode_id): create_postcode_id):
table_factory('location_postcode', 'postcode TEXT', table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('12 34',), ('AB23',), ('1234',))) content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
analyzer.add_postcodes_from_db() analyzer.update_postcodes_from_db()
assert temp_db_cursor.row_set("SELECT * from out_postcode_table") \ assert word_table.count() == 3
== set((('1234', ), ('12 34', ), ('AB23',))) assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor, def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table,
make_standard_name): create_postcode_id):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('45BC', ), ('XX45', )))
word_table.add_postcode(' 1234', '1234')
word_table.add_postcode(' 5678', '5678')
analyzer.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
def test_update_special_phrase_empty_table(analyzer, word_table, make_standard_name):
analyzer.update_special_phrases([ analyzer.update_special_phrases([
("König bei", "amenity", "royal", "near"), ("König bei", "amenity", "royal", "near"),
("Könige", "amenity", "royal", "-"), ("Könige", "amenity", "royal", "-"),
("strasse", "highway", "primary", "in") ("strasse", "highway", "primary", "in")
]) ])
assert temp_db_cursor.row_set("""SELECT word_token, word, class, type, operator assert word_table.get_special() \
FROM word WHERE class != 'place'""") \
== set(((' könig bei', 'könig bei', 'amenity', 'royal', 'near'), == set(((' könig bei', 'könig bei', 'amenity', 'royal', 'near'),
(' könige', 'könige', 'amenity', 'royal', None), (' könige', 'könige', 'amenity', 'royal', None),
(' strasse', 'strasse', 'highway', 'primary', 'in'))) (' strasse', 'strasse', 'highway', 'primary', 'in')))
@@ -220,24 +231,21 @@ def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor,
def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor, def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor,
make_standard_name): make_standard_name):
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator) word_table.add_special(' foo', 'foo', 'amenity', 'prison', 'in')
VALUES (' foo', 'foo', 'amenity', 'prison', 'in'), word_table.add_special(' bar', 'bar', 'highway', 'road', None)
(' bar', 'bar', 'highway', 'road', null)""")
assert 2 == temp_db_cursor.scalar("SELECT count(*) FROM word WHERE class != 'place'""") assert word_table.count_special() == 2
analyzer.update_special_phrases([]) analyzer.update_special_phrases([])
assert 0 == temp_db_cursor.scalar("SELECT count(*) FROM word WHERE class != 'place'""") assert word_table.count_special() == 0
def test_update_special_phrase_modify(analyzer, word_table, temp_db_cursor, def test_update_special_phrase_modify(analyzer, word_table, make_standard_name):
make_standard_name): word_table.add_special(' foo', 'foo', 'amenity', 'prison', 'in')
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator) word_table.add_special(' bar', 'bar', 'highway', 'road', None)
VALUES (' foo', 'foo', 'amenity', 'prison', 'in'),
(' bar', 'bar', 'highway', 'road', null)""")
assert 2 == temp_db_cursor.scalar("SELECT count(*) FROM word WHERE class != 'place'""") assert word_table.count_special() == 2
analyzer.update_special_phrases([ analyzer.update_special_phrases([
('prison', 'amenity', 'prison', 'in'), ('prison', 'amenity', 'prison', 'in'),
@@ -245,8 +253,7 @@ def test_update_special_phrase_modify(analyzer, word_table, temp_db_cursor,
('garden', 'leisure', 'garden', 'near') ('garden', 'leisure', 'garden', 'near')
]) ])
assert temp_db_cursor.row_set("""SELECT word_token, word, class, type, operator assert word_table.get_special() \
FROM word WHERE class != 'place'""") \
== set(((' prison', 'prison', 'amenity', 'prison', 'in'), == set(((' prison', 'prison', 'amenity', 'prison', 'in'),
(' bar', 'bar', 'highway', 'road', None), (' bar', 'bar', 'highway', 'road', None),
(' garden', 'garden', 'leisure', 'garden', 'near'))) (' garden', 'garden', 'leisure', 'garden', 'near')))
@@ -260,21 +267,17 @@ def test_process_place_names(analyzer, make_keywords):
@pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345']) @pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345'])
def test_process_place_postcode(analyzer, temp_db_cursor, create_postcode_id, pc): def test_process_place_postcode(analyzer, create_postcode_id, word_table, pc):
info = analyzer.process_place({'address': {'postcode' : pc}}) info = analyzer.process_place({'address': {'postcode' : pc}})
assert temp_db_cursor.row_set("SELECT * from out_postcode_table") \ assert word_table.get_postcodes() == {pc, }
== set(((pc, ),))
@pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836']) @pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836'])
def test_process_place_bad_postcode(analyzer, temp_db_cursor, create_postcode_id, def test_process_place_bad_postcode(analyzer, create_postcode_id, word_table, pc):
pc):
info = analyzer.process_place({'address': {'postcode' : pc}}) info = analyzer.process_place({'address': {'postcode' : pc}})
assert 0 == temp_db_cursor.scalar("SELECT count(*) from out_postcode_table") assert not word_table.get_postcodes()
@pytest.mark.parametrize('hnr', ['123a', '1', '101']) @pytest.mark.parametrize('hnr', ['123a', '1', '101'])

View File

@@ -141,16 +141,28 @@ def test_make_standard_hnr(analyzer):
assert a._make_standard_hnr('iv') == 'IV' assert a._make_standard_hnr('iv') == 'IV'
def test_add_postcodes_from_db(analyzer, word_table, table_factory, temp_db_cursor): def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table):
table_factory('location_postcode', 'postcode TEXT', table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('12 34',), ('AB23',), ('1234',))) content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
with analyzer() as a: with analyzer() as a:
a.add_postcodes_from_db() a.update_postcodes_from_db()
assert temp_db_cursor.row_set("""SELECT word, word_token from word assert word_table.count() == 3
""") \ assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
== set((('1234', ' 1234'), ('12 34', ' 12 34'), ('AB23', ' AB23')))
def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('45BC', ), ('XX45', )))
word_table.add_postcode(' 1234', '1234')
word_table.add_postcode(' 5678', '5678')
with analyzer() as a:
a.update_postcodes_from_db()
assert word_table.count() == 3
assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor): def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor):
@@ -211,22 +223,19 @@ def test_process_place_names(analyzer, getorcreate_term_id):
@pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345']) @pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345'])
def test_process_place_postcode(analyzer, temp_db_cursor, pc): def test_process_place_postcode(analyzer, word_table, pc):
with analyzer() as a: with analyzer() as a:
info = a.process_place({'address': {'postcode' : pc}}) info = a.process_place({'address': {'postcode' : pc}})
assert temp_db_cursor.row_set("""SELECT word FROM word assert word_table.get_postcodes() == {pc, }
WHERE class = 'place' and type = 'postcode'""") \
== set(((pc, ),))
@pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836']) @pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836'])
def test_process_place_bad_postcode(analyzer, temp_db_cursor, pc): def test_process_place_bad_postcode(analyzer, word_table, pc):
with analyzer() as a: with analyzer() as a:
info = a.process_place({'address': {'postcode' : pc}}) info = a.process_place({'address': {'postcode' : pc}})
assert 0 == temp_db_cursor.scalar("""SELECT count(*) FROM word assert not word_table.get_postcodes()
WHERE class = 'place' and type = 'postcode'""")
@pytest.mark.parametrize('hnr', ['123a', '1', '101']) @pytest.mark.parametrize('hnr', ['123a', '1', '101'])

View File

@@ -153,8 +153,8 @@ def test_truncate_database_tables(temp_db_conn, temp_db_cursor, table_factory):
@pytest.mark.parametrize("threads", (1, 5)) @pytest.mark.parametrize("threads", (1, 5))
def test_load_data(dsn, src_dir, place_row, placex_table, osmline_table, word_table, def test_load_data(dsn, src_dir, place_row, placex_table, osmline_table,
temp_db_cursor, threads): word_table, temp_db_cursor, threads):
for func in ('precompute_words', 'getorcreate_housenumber_id', 'make_standard_name'): for func in ('precompute_words', 'getorcreate_housenumber_id', 'make_standard_name'):
temp_db_cursor.execute("""CREATE FUNCTION {} (src TEXT) temp_db_cursor.execute("""CREATE FUNCTION {} (src TEXT)
RETURNS TEXT AS $$ SELECT 'a'::TEXT $$ LANGUAGE SQL RETURNS TEXT AS $$ SELECT 'a'::TEXT $$ LANGUAGE SQL

View File

@@ -1,55 +1,143 @@
""" """
Tests for functions to maintain the artificial postcode table. Tests for functions to maintain the artificial postcode table.
""" """
import subprocess
import pytest import pytest
from nominatim.tools import postcodes from nominatim.tools import postcodes
import dummy_tokenizer import dummy_tokenizer
class MockPostcodeTable:
""" A location_postcode table for testing.
"""
def __init__(self, conn):
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE location_postcode (
place_id BIGINT,
parent_place_id BIGINT,
rank_search SMALLINT,
rank_address SMALLINT,
indexed_status SMALLINT,
indexed_date TIMESTAMP,
country_code varchar(2),
postcode TEXT,
geometry GEOMETRY(Geometry, 4326))""")
cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
""")
conn.commit()
def add(self, country, postcode, x, y):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
country_code, postcode,
geometry)
VALUES (nextval('seq_place'), 1, %s, %s,
'SRID=4326;POINT(%s %s)')""",
(country, postcode, x, y))
self.conn.commit()
@property
def row_set(self):
with self.conn.cursor() as cur:
cur.execute("""SELECT country_code, postcode,
ST_X(geometry), ST_Y(geometry)
FROM location_postcode""")
return set((tuple(row) for row in cur))
@pytest.fixture @pytest.fixture
def tokenizer(): def tokenizer():
return dummy_tokenizer.DummyTokenizer(None, None) return dummy_tokenizer.DummyTokenizer(None, None)
@pytest.fixture @pytest.fixture
def postcode_table(temp_db_with_extensions, temp_db_cursor, table_factory, def postcode_table(temp_db_conn, placex_table, word_table):
placex_table, word_table): return MockPostcodeTable(temp_db_conn)
table_factory('location_postcode',
""" place_id BIGINT,
parent_place_id BIGINT,
rank_search SMALLINT,
rank_address SMALLINT,
indexed_status SMALLINT,
indexed_date TIMESTAMP,
country_code varchar(2),
postcode TEXT,
geometry GEOMETRY(Geometry, 4326)""")
temp_db_cursor.execute('CREATE SEQUENCE seq_place')
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
""")
def test_import_postcodes_empty(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer): def test_import_postcodes_empty(dsn, postcode_table, tmp_path, tokenizer):
postcodes.import_postcodes(dsn, tmp_path, tokenizer) postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert temp_db_cursor.table_exists('gb_postcode') assert not postcode_table.row_set
assert temp_db_cursor.table_exists('us_postcode')
assert temp_db_cursor.table_rows('location_postcode') == 0
def test_import_postcodes_from_placex(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer): def test_import_postcodes_add_new(dsn, placex_table, postcode_table, tmp_path, tokenizer):
temp_db_cursor.execute(""" placex_table.add(country='xx', geom='POINT(10 12)',
INSERT INTO placex (place_id, country_code, address, geometry) address=dict(postcode='9486'))
VALUES (1, 'xx', '"postcode"=>"9486"', 'SRID=4326;POINT(10 12)') postcode_table.add('yy', '9486', 99, 34)
""")
postcodes.import_postcodes(dsn, tmp_path, tokenizer) postcodes.update_postcodes(dsn, tmp_path, tokenizer)
rows = temp_db_cursor.row_set(""" SELECT postcode, country_code, assert postcode_table.row_set == {('xx', '9486', 10, 12),
ST_X(geometry), ST_Y(geometry) ('yy', '9486', 99, 34)}
FROM location_postcode""")
print(rows)
assert len(rows) == 1 def test_import_postcodes_replace_coordinates(dsn, placex_table, postcode_table, tmp_path, tokenizer):
assert rows == set((('9486', 'xx', 10, 12), )) placex_table.add(country='xx', geom='POINT(10 12)',
address=dict(postcode='AB 4511'))
postcode_table.add('xx', 'AB 4511', 99, 34)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
def test_import_postcodes_replace_coordinates_close(dsn, placex_table, postcode_table, tmp_path, tokenizer):
placex_table.add(country='xx', geom='POINT(10 12)',
address=dict(postcode='AB 4511'))
postcode_table.add('xx', 'AB 4511', 10, 11.99999)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}
def test_import_postcodes_remove(dsn, placex_table, postcode_table, tmp_path, tokenizer):
placex_table.add(country='xx', geom='POINT(10 12)',
address=dict(postcode='AB 4511'))
postcode_table.add('xx', 'badname', 10, 12)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
def test_import_postcodes_multi_country(dsn, placex_table, postcode_table, tmp_path, tokenizer):
placex_table.add(country='de', geom='POINT(10 12)',
address=dict(postcode='54451'))
placex_table.add(country='cc', geom='POINT(100 56)',
address=dict(postcode='DD23 T'))
placex_table.add(country='de', geom='POINT(10.3 11.0)',
address=dict(postcode='54452'))
placex_table.add(country='cc', geom='POINT(10.3 11.0)',
address=dict(postcode='54452'))
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('de', '54451', 10, 12),
('de', '54452', 10.3, 11.0),
('cc', '54452', 10.3, 11.0),
('cc', 'DD23 T', 100, 56)}
@pytest.mark.parametrize("gzipped", [True, False])
def test_import_postcodes_extern(dsn, placex_table, postcode_table, tmp_path,
tokenizer, gzipped):
placex_table.add(country='xx', geom='POINT(10 12)',
address=dict(postcode='AB 4511'))
extfile = tmp_path / 'xx_postcodes.csv'
extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
if gzipped:
subprocess.run(['gzip', str(extfile)])
assert not extfile.is_file()
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
('xx', 'CD 4511', -10, -5)}