forked from hans/Nominatim
Code cleaning, tests simplification and use of python3-icu package
This commit is contained in:
@@ -4,75 +4,27 @@
|
||||
import pytest
|
||||
from nominatim.tools.special_phrases import _create_place_classtype_indexes, _create_place_classtype_table, _get_wiki_content, _grant_access_to_webuser, _process_amenity
|
||||
|
||||
def test_get_wiki_content():
|
||||
assert _get_wiki_content('fr')
|
||||
def test_process_amenity_with_operator(temp_db_conn, getorcreate_amenityoperator_funcs):
|
||||
_process_amenity(temp_db_conn, '', '', '', '', 'near')
|
||||
_process_amenity(temp_db_conn, '', '', '', '', 'in')
|
||||
|
||||
def execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type):
|
||||
_process_amenity(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type, '')
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute(f"""
|
||||
SELECT * FROM word
|
||||
WHERE word_token=' {normalized_label}'
|
||||
AND word='{normalized_label}'
|
||||
AND class='{phrase_class}'
|
||||
AND type='{phrase_type}'
|
||||
AND type='{phrase_type}'""")
|
||||
return temp_db_cursor.fetchone()
|
||||
|
||||
def execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type, phrase_operator):
|
||||
_process_amenity(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type, phrase_operator)
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute(f"""
|
||||
SELECT * FROM word
|
||||
WHERE word_token=' {normalized_label}'
|
||||
AND word='{normalized_label}'
|
||||
AND class='{phrase_class}'
|
||||
AND type='{phrase_type}'
|
||||
AND operator='{phrase_operator}'""")
|
||||
return temp_db_cursor.fetchone()
|
||||
|
||||
def test_process_amenity_with_near_operator(temp_db_conn, word_table, amenity_operator_funcs):
|
||||
phrase_label = ' label '
|
||||
normalized_label = 'label'
|
||||
phrase_class = 'class'
|
||||
phrase_type = 'type'
|
||||
|
||||
assert execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type)
|
||||
assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type, 'near')
|
||||
assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
|
||||
phrase_class, phrase_type, 'in')
|
||||
|
||||
def index_exists(db_connect, index):
|
||||
""" Check that an index with the given name exists in the database.
|
||||
"""
|
||||
with db_connect.cursor() as cur:
|
||||
cur.execute("""SELECT tablename FROM pg_indexes
|
||||
WHERE indexname = %s and schemaname = 'public'""", (index, ))
|
||||
if cur.rowcount == 0:
|
||||
return False
|
||||
return True
|
||||
def test_process_amenity_without_operator(temp_db_conn, getorcreate_amenity_funcs):
|
||||
_process_amenity(temp_db_conn, '', '', '', '', '')
|
||||
|
||||
def test_create_place_classtype_indexes(temp_db_conn):
|
||||
phrase_class = 'class'
|
||||
phrase_type = 'type'
|
||||
table_name = f'place_classtype_{phrase_class}_{phrase_type}'
|
||||
table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
|
||||
index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute("CREATE EXTENSION postgis;")
|
||||
temp_db_cursor.execute(f'CREATE TABLE {table_name}(place_id BIGINT, centroid GEOMETRY)')
|
||||
temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
|
||||
|
||||
_create_place_classtype_indexes(temp_db_conn, '', phrase_class, phrase_type)
|
||||
|
||||
centroid_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_centroid')
|
||||
place_id_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_place_id')
|
||||
centroid_index_exists = temp_db_conn.index_exists(index_prefix + 'centroid')
|
||||
place_id_index_exists = temp_db_conn.index_exists(index_prefix + 'place_id')
|
||||
|
||||
assert centroid_index_exists and place_id_index_exists
|
||||
|
||||
@@ -93,10 +45,10 @@ def test_create_place_classtype_table(temp_db_conn, placex_table):
|
||||
def test_grant_access_to_web_user(temp_db_conn, def_config):
|
||||
phrase_class = 'class'
|
||||
phrase_type = 'type'
|
||||
table_name = f'place_classtype_{phrase_class}_{phrase_type}'
|
||||
table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute(f'CREATE TABLE {table_name}()')
|
||||
temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
|
||||
|
||||
_grant_access_to_webuser(temp_db_conn, def_config, phrase_class, phrase_type)
|
||||
|
||||
@@ -110,65 +62,28 @@ def test_grant_access_to_web_user(temp_db_conn, def_config):
|
||||
assert result
|
||||
|
||||
@pytest.fixture
|
||||
def amenity_operator_funcs(temp_db_cursor):
|
||||
def make_strandard_name_func(temp_db_cursor):
|
||||
temp_db_cursor.execute(f"""
|
||||
CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT
|
||||
AS $$
|
||||
DECLARE
|
||||
o TEXT;
|
||||
CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT AS $$
|
||||
BEGIN
|
||||
RETURN trim(name); --Basically return only the trimed name for the tests
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql IMMUTABLE;
|
||||
|
||||
CREATE SEQUENCE seq_word start 1;
|
||||
|
||||
$$ LANGUAGE plpgsql IMMUTABLE;""")
|
||||
|
||||
@pytest.fixture
|
||||
def getorcreate_amenity_funcs(temp_db_cursor, make_strandard_name_func):
|
||||
temp_db_cursor.execute(f"""
|
||||
CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
|
||||
lookup_class text, lookup_type text)
|
||||
RETURNS INTEGER
|
||||
AS $$
|
||||
DECLARE
|
||||
lookup_token TEXT;
|
||||
return_word_id INTEGER;
|
||||
BEGIN
|
||||
lookup_token := ' '||trim(lookup_word);
|
||||
SELECT min(word_id) FROM word
|
||||
WHERE word_token = lookup_token and word = normalized_word
|
||||
and class = lookup_class and type = lookup_type
|
||||
INTO return_word_id;
|
||||
IF return_word_id IS NULL THEN
|
||||
return_word_id := nextval('seq_word');
|
||||
INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
|
||||
lookup_class, lookup_type, null, 0);
|
||||
END IF;
|
||||
RETURN return_word_id;
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
RETURNS void as $$
|
||||
BEGIN END;
|
||||
$$ LANGUAGE plpgsql""")
|
||||
|
||||
CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT,
|
||||
normalized_word TEXT,
|
||||
lookup_class text,
|
||||
lookup_type text,
|
||||
op text)
|
||||
RETURNS INTEGER
|
||||
AS $$
|
||||
DECLARE
|
||||
lookup_token TEXT;
|
||||
return_word_id INTEGER;
|
||||
BEGIN
|
||||
lookup_token := ' '||trim(lookup_word);
|
||||
SELECT min(word_id) FROM word
|
||||
WHERE word_token = lookup_token and word = normalized_word
|
||||
and class = lookup_class and type = lookup_type and operator = op
|
||||
INTO return_word_id;
|
||||
IF return_word_id IS NULL THEN
|
||||
return_word_id := nextval('seq_word');
|
||||
INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
|
||||
lookup_class, lookup_type, null, 0, op);
|
||||
END IF;
|
||||
RETURN return_word_id;
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql;""")
|
||||
@pytest.fixture
|
||||
def getorcreate_amenityoperator_funcs(temp_db_cursor, make_strandard_name_func):
|
||||
temp_db_cursor.execute(f"""
|
||||
CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT, normalized_word TEXT,
|
||||
lookup_class text, lookup_type text, op text)
|
||||
RETURNS void as $$
|
||||
BEGIN END;
|
||||
$$ LANGUAGE plpgsql""")
|
||||
Reference in New Issue
Block a user