# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for functions to maintain the artificial postcode table.
"""
import subprocess

import pytest

from nominatim.tools import postcodes, country_info
import dummy_tokenizer


class MockPostcodeTable:
    """ A location_postcode table for testing.
    """

    def __init__(self, conn):
        """ Create the location_postcode table and the SQL helper functions.

            `conn` is the database connection used for all later operations.
            Two stub SQL functions are installed: token_normalized_postcode()
            returns its argument unchanged and get_country_code() always
            returns NULL. The changes are committed before returning.
        """
        self.conn = conn
        with conn.cursor() as cur:
            cur.execute("""CREATE TABLE location_postcode (
                               place_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               rank_address SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326))""")
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
        conn.commit()

    def add(self, country, postcode, x, y):
        """ Insert a postcode row for `country` located at point (`x`, `y`).

            The row gets its place_id from the sequence 'seq_place' and
            an indexed_status of 1. The insert is committed immediately.
        """
        with self.conn.cursor() as cur:
            # Build the point with SQL functions so that the coordinates are
            # passed as real query parameters. Placeholders must not be put
            # inside a quoted SQL string literal.
            cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
                                                          country_code, postcode,
                                                          geometry)
                           VALUES (nextval('seq_place'), 1, %s, %s,
                                   ST_SetSRID(ST_MakePoint(%s, %s), 4326))""",
                        (country, postcode, x, y))
        self.conn.commit()

    @property
    def row_set(self):
        """ Return the table content as a set of
            (country_code, postcode, x, y) tuples.
        """
        with self.conn.cursor() as cur:
            cur.execute("""SELECT country_code, postcode,
                                  ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode""")
            return {tuple(row) for row in cur}


@pytest.fixture
def tokenizer():
    """ Provide a DummyTokenizer created without configuration.
    """
    return dummy_tokenizer.DummyTokenizer(None, None)


@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table):
    """ Set up the country configuration and provide a fresh
        MockPostcodeTable in the temporary test database.
    """
    country_info.setup_country_config(def_config)

    return MockPostcodeTable(temp_db_conn)


@pytest.fixture
def insert_implicit_postcode(placex_table, place_row):
    """ Inserts data into the placex and place table
        which can then be used to compute one postcode.
    """
    def _insert_implicit_postcode(osm_id, country, geometry, address):
        # `geometry` is given as WKT without SRID; the place table
        # gets the same geometry with an explicit SRID prefix.
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address)

    return _insert_implicit_postcode


def test_postcodes_empty(dsn, postcode_table, place_table, tmp_path, tokenizer):
    """ Updating without any source data leaves the postcode table empty.
    """
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set


def test_postcodes_add_new(dsn, postcode_table, tmp_path,
                           insert_implicit_postcode, tokenizer):
    """ A postcode from the place data is added. The previously existing
        entry for another country is no longer present afterwards.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486'))
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', '9486', 10, 12), }


def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
                                       insert_implicit_postcode, tokenizer):
    """ An existing postcode with a different position gets
        the position computed from the place data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'AB 4511', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
                                             insert_implicit_postcode, tokenizer):
    """ An existing postcode keeps its position when the newly
        computed position differs only minimally.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'AB 4511', 10, 11.99999)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}


def test_postcodes_remove(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer):
    """ An existing postcode without matching place data is removed
        while the postcode from the place data is added.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
                                        insert_implicit_postcode, tokenizer):
    """ Place data without a country code does not produce a postcode.
        (The mocked get_country_code() function returns NULL as well.)
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511'))

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set


def test_postcodes_remove_all(dsn, postcode_table, place_table, tmp_path, tokenizer):
    """ Existing postcodes are removed when there is no source data at all.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set


def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
                                 insert_implicit_postcode, tokenizer):
    """ Postcodes are computed per country: the same postcode
        in two countries results in two separate entries.
    """
    insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451'))
    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T'))
    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452'))
    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452'))

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('de', '54451', 10, 12),
                                      ('de', '54452', 10.3, 11.0),
                                      ('cc', '54452', 10.3, 11.0),
                                      ('cc', 'DD23 T', 100, 56)}


@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    """ Postcodes from an external CSV file (plain or gzipped) are added.
        For a postcode that also appears in the place data, the position
        from the place data is the one found in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        # Fail right here when the compression does not succeed.
        subprocess.run(['gzip', str(extfile)], check=True)
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}


def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
                                     insert_implicit_postcode, tokenizer):
    """ Nothing is imported from an external file whose header
        lacks the 'postcode' column (here misspelled as 'postode').
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
                                     postcode_table, tmp_path, tokenizer):
    """ Rows of an external file with unusable coordinates (a NaN
        longitude, a latitude of 200) are skipped; valid rows are imported.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}


def test_can_compute(dsn, table_factory):
    """ can_compute() is false without a place table
        and true once the table exists.
    """
    assert not postcodes.can_compute(dsn)
    table_factory('place')
    assert postcodes.can_compute(dsn)


def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row,
                         postcode_table, tokenizer):
    """ For a place without placex entry the country code
        is taken from the get_country_code() SQL function.
    """
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('yy', 'AB 4511', 10, 12)}


def test_discard_badly_formatted_postcodes(dsn, tmp_path, temp_db_cursor, place_row,
                                           postcode_table, tokenizer):
    """ The postcode 'AB 4511' is not imported when the place is in 'fr'.

        NOTE(review): presumably because it does not match the postcode
        pattern of the country configuration for 'fr' - confirm.
    """
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'fr';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set