rename and add basic tests

This commit is contained in:
Sarah Hoffmann
2016-11-07 21:16:28 +01:00
parent 65bf6dbff7
commit c56c09e2c0
5 changed files with 149 additions and 3 deletions

View File

@@ -0,0 +1,20 @@
@DB
Feature: Import of simple objects
Testing simple stuff
@wip
Scenario: Import place node
Given the places
| osm | class | type | name | geometry |
| N1 | place | village | 'name' : 'Foo' | 10.0 -10.0 |
And the named places
| osm | class | type | housenumber |
| N2 | place | village | |
When importing
Then table placex contains
| object | class | type | name | centroid |
| N1 | place | village | 'name' : 'Foo' | 10.0,-10.0 +- 1m |
When sending query "Foo"
Then results contain
| ID | osm_type | osm_id |
| 0 | N | 1 |

1
test/bdd/db/test.feature Normal file
View File

@@ -0,0 +1 @@
Feature: Test

176
test/bdd/environment.py Normal file
View File

@@ -0,0 +1,176 @@
from behave import *
import logging
import os
import psycopg2
import psycopg2.extras
import subprocess
from sys import version_info as python_version
logger = logging.getLogger(__name__)
userconfig = {
'BASEURL' : 'http://localhost/nominatim',
'BUILDDIR' : '../build',
'REMOVE_TEMPLATE' : False,
'KEEP_TEST_DB' : False,
'TEMPLATE_DB' : 'test_template_nominatim',
'TEST_DB' : 'test_nominatim',
'TEST_SETTINGS_FILE' : '/tmp/nominatim_settings.php'
}
use_step_matcher("re")
class NominatimEnvironment(object):
""" Collects all functions for the execution of Nominatim functions.
"""
def __init__(self, config):
self.build_dir = os.path.abspath(config['BUILDDIR'])
self.template_db = config['TEMPLATE_DB']
self.test_db = config['TEST_DB']
self.local_settings_file = config['TEST_SETTINGS_FILE']
self.reuse_template = not config['REMOVE_TEMPLATE']
self.keep_scenario_db = config['KEEP_TEST_DB']
os.environ['NOMINATIM_SETTINGS'] = self.local_settings_file
self.template_db_done = False
def write_nominatim_config(self, dbname):
f = open(self.local_settings_file, 'w')
f.write("<?php\n @define('CONST_Database_DSN', 'pgsql://@/%s');\n" % dbname)
f.close()
def cleanup(self):
try:
os.remove(self.local_settings_file)
except OSError:
pass # ignore missing file
def db_drop_database(self, name):
conn = psycopg2.connect(database='postgres')
conn.set_isolation_level(0)
cur = conn.cursor()
cur.execute('DROP DATABASE IF EXISTS %s' % (name, ))
conn.close()
def setup_template_db(self):
if self.template_db_done:
return
self.template_db_done = True
if self.reuse_template:
# check that the template is there
conn = psycopg2.connect(database='postgres')
cur = conn.cursor()
cur.execute('select count(*) from pg_database where datname = %s',
(self.template_db,))
if cur.fetchone()[0] == 1:
return
conn.close()
else:
# just in case... make sure a previous table has been dropped
self.db_drop_database(self.template_db)
# call the first part of database setup
self.write_nominatim_config(self.template_db)
self.run_setup_script('create-db', 'setup-db')
# remove external data to speed up indexing for tests
conn = psycopg2.connect(database=self.template_db)
cur = conn.cursor()
cur.execute("""select tablename from pg_tables
where tablename in ('gb_postcode', 'us_postcode')""")
for t in cur:
conn.cursor().execute('TRUNCATE TABLE %s' % (t[0],))
conn.commit()
conn.close()
# execute osm2pgsql on an empty file to get the right tables
osm2pgsql = os.path.join(self.build_dir, 'osm2pgsql', 'osm2pgsql')
proc = subprocess.Popen([osm2pgsql, '-lsc', '-r', 'xml',
'-O', 'gazetteer', '-d', self.template_db, '-'],
cwd=self.build_dir, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
[outstr, errstr] = proc.communicate(input=b'<osm version="0.6"></osm>')
logger.debug("running osm2pgsql for template: %s\n%s\n%s" % (osm2pgsql, outstr, errstr))
self.run_setup_script('create-functions', 'create-tables',
'create-partition-tables', 'create-partition-functions',
'load-data', 'create-search-indices')
def setup_db(self, context):
self.setup_template_db()
self.write_nominatim_config(self.test_db)
conn = psycopg2.connect(database=self.template_db)
conn.set_isolation_level(0)
cur = conn.cursor()
cur.execute('DROP DATABASE IF EXISTS %s' % (self.test_db, ))
cur.execute('CREATE DATABASE %s TEMPLATE = %s' % (self.test_db, self.template_db))
conn.close()
context.db = psycopg2.connect(database=self.test_db)
if python_version[0] < 3:
psycopg2.extras.register_hstore(context.db, globally=False, unicode=True)
else:
psycopg2.extras.register_hstore(context.db, globally=False)
def teardown_db(self, context):
if 'db' in context:
context.db.close()
if not self.keep_scenario_db:
self.db_drop_database(self.test_db)
def run_setup_script(self, *args):
self.run_nominatim_script('setup', *args)
def run_nominatim_script(self, script, *args):
cmd = [os.path.join(self.build_dir, 'utils', '%s.php' % script)]
cmd.extend(['--%s' % x for x in args])
proc = subprocess.Popen(cmd, cwd=self.build_dir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(outp, outerr) = proc.communicate()
logger.debug("run_nominatim_script: %s\n%s\n%s" % (cmd, outp, outerr))
assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr)
class OSMDataFactory(object):
def __init__(self):
scriptpath = os.path.dirname(os.path.abspath(__file__))
self.scene_path = os.environ.get('SCENE_PATH',
os.path.join(scriptpath, '..', 'scenes', 'data'))
def make_geometry(self, geom):
if geom.find(',') < 0:
return 'POINT(%s)' % geom
if geom.find('(') < 0:
return 'LINESTRING(%s)' % geom
return 'POLYGON(%s)' % geom
def before_all(context):
# logging setup
context.config.setup_logging()
# set up -D options
for k,v in userconfig.items():
context.config.userdata.setdefault(k, v)
logging.debug('User config: %s' %(str(context.config.userdata)))
# Nominatim test setup
context.nominatim = NominatimEnvironment(context.config.userdata)
context.osm = OSMDataFactory()
def after_all(context):
context.nominatim.cleanup()
def before_scenario(context, scenario):
if 'DB' in context.tags:
context.nominatim.setup_db(context)
def after_scenario(context, scenario):
if 'DB' in context.tags:
context.nominatim.teardown_db(context)

75
test/bdd/steps/db_ops.py Normal file
View File

@@ -0,0 +1,75 @@
import base64
import random
import string
def _format_placex_columns(row, force_name):
out = {
'osm_type' : row['osm'][0],
'osm_id' : row['osm'][1:],
'admin_level' : row.get('admin_level', 100)
}
for k in ('class', 'type', 'housenumber', 'street',
'addr_place', 'isin', 'postcode', 'country_code'):
if k in row.headings and row[k]:
out[k] = row[k]
if 'name' in row.headings:
if row['name'].startswith("'"):
out['name'] = eval('{' + row['name'] + '}')
else:
out['name'] = { 'name' : row['name'] }
elif force_name:
out['name'] = { 'name' : ''.join(random.choice(string.printable) for _ in range(int(random.random()*30))) }
if 'extratags' in row.headings:
out['extratags'] = eval('{%s}' % row['extratags'])
return out
@given("the (?P<named>named )?places")
def add_data_to_place_table(context, named):
cur = context.db.cursor()
cur.execute('ALTER TABLE place DISABLE TRIGGER place_before_insert')
for r in context.table:
cols = _format_placex_columns(r, named is not None)
if 'geometry' in r.headings:
geometry = "'%s'::geometry" % context.osm.make_geometry(r['geometry'])
elif cols['osm_type'] == 'N':
geometry = "ST_Point(%f, %f)" % (random.random()*360 - 180, random.random()*180 - 90)
else:
raise RuntimeError("Missing geometry for place")
query = 'INSERT INTO place (%s, geometry) values(%s, ST_SetSRID(%s, 4326))' % (
','.join(cols.keys()),
','.join(['%s' for x in range(len(cols))]),
geometry
)
cur.execute(query, list(cols.values()))
cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
cur.close()
context.db.commit()
@when("importing")
def import_and_index_data_from_place_table(context):
context.nominatim.run_setup_script('create-functions', 'create-partition-functions')
cur = context.db.cursor()
cur.execute(
"""insert into placex (osm_type, osm_id, class, type, name, admin_level,
housenumber, street, addr_place, isin, postcode, country_code, extratags,
geometry)
select * from place where not (class='place' and type='houses' and osm_type='W')""")
cur.execute(
"""select insert_osmline (osm_id, housenumber, street, addr_place,
postcode, country_code, geometry)
from place where class='place' and type='houses' and osm_type='W'""")
context.db.commit()
context.nominatim.run_setup_script('index', 'index-noanalyse')
@then("table (?P<table>\w+) contains(?P<exact> exactly)?")
def check_table_contents(context, table, exact):
pass

38
test/bdd/steps/queries.py Normal file
View File

@@ -0,0 +1,38 @@
""" Steps that run search queries.
Queries may either be run directly via PHP using the query script
or via the HTTP interface.
"""
import os
import subprocess
class SearchResponse(object):
def __init__(response,
@when(u'searching for "(?P<query>.*)"( with params)?$')
def query_cmd(context, query):
""" Query directly via PHP script.
"""
cmd = [os.path.join(context.nominatim.build_dir, 'utils', 'query.php'),
'--search', query]
# add more parameters in table form
if context.table:
for h in context.table.headings:
value = context.table[0][h].strip()
if value:
cmd.extend(('--' + h, value))
proc = subprocess.Popen(cmd, cwd=context.nominatim.build_dir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(outp, err) = proc.communicate()
assert_equals (0, proc.returncode), "query.php failed with message: %s" % err
context.
world.page = outp
world.response_format = 'json'
world.request_type = 'search'
world.returncode = 200