add simple osm2pgsql tests

This commit is contained in:
Sarah Hoffmann
2016-11-27 16:51:05 +01:00
parent 21a3fc4b0f
commit f2debbef19
4 changed files with 147 additions and 8 deletions

View File

@@ -118,14 +118,15 @@ def assert_db_column(row, column, value, context):
if column.startswith('centroid'):
fac = float(column[9:]) if column.startswith('centroid*') else 1.0
x, y = value.split(' ')
assert_almost_equal(float(x) * fac, row['cx'])
assert_almost_equal(float(y) * fac, row['cy'])
assert_almost_equal(float(x) * fac, row['cx'], "Bad x coordinate")
assert_almost_equal(float(y) * fac, row['cy'], "Bad y coordinate")
elif column == 'geometry':
geom = context.osm.parse_geometry(value, context.scene)
cur = context.db.cursor()
cur.execute("SELECT ST_Equals(%s, ST_SetSRID(%%s::geometry, 4326))" % geom,
(row['geomtxt'],))
eq_(cur.fetchone()[0], True)
query = "SELECT ST_Equals(ST_SnapToGrid(%s, 0.00001, 0.00001), ST_SnapToGrid(ST_SetSRID('%s'::geometry, 4326), 0.00001, 0.00001))" % (
geom, row['geomtxt'],)
cur.execute(query)
eq_(cur.fetchone()[0], True, "(Row %s failed: %s)" % (column, query))
else:
eq_(value, str(row[column]),
"Row '%s': expected: %s, got: %s"
@@ -258,6 +259,7 @@ def check_placex_contents(context, exact):
ST_X(centroid) as cx, ST_Y(centroid) as cy
FROM placex where %s""" % where,
params)
assert_less(0, cur.rowcount, "No rows found for " + row['object'])
for res in cur:
if exact:
@@ -286,6 +288,48 @@ def check_placex_contents(context, exact):
context.db.commit()
@then("place contains(?P<exact> exactly)?")
def check_placex_contents(context, exact):
cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
expected_content = set()
for row in context.table:
nid = NominatimID(row['object'])
where, params = nid.table_select()
cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt
FROM place where %s""" % where,
params)
assert_less(0, cur.rowcount, "No rows found for " + row['object'])
for res in cur:
if exact:
expected_content.add((res['osm_type'], res['osm_id'], res['class']))
for h in row.headings:
msg = "%s: %s" % (row['object'], h)
if h in ('name', 'extratags'):
vdict = eval('{' + row[h] + '}')
assert_equals(vdict, res[h], msg)
elif h.startswith('name+'):
assert_equals(res['name'][h[5:]], row[h], msg)
elif h.startswith('extratags+'):
assert_equals(res['extratags'][h[10:]], row[h], msg)
elif h in ('linked_place_id', 'parent_place_id'):
if row[h] == '0':
assert_equals(0, res[h], msg)
elif row[h] == '-':
assert_is_none(res[h], msg)
else:
assert_equals(NominatimID(row[h]).get_place_id(context.db.cursor()),
res[h], msg)
else:
assert_db_column(res, h, row[h], context)
if exact:
cur.execute('SELECT osm_type, osm_id, class from place')
eq_(expected_content, set([(r[0], r[1], r[2]) for r in cur]))
context.db.commit()
@then("search_name contains")
def check_search_name_contents(context):
cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
@@ -294,6 +338,7 @@ def check_search_name_contents(context):
pid = NominatimID(row['object']).get_place_id(cur)
cur.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
FROM search_name WHERE place_id = %s""", (pid, ))
assert_less(0, cur.rowcount, "No rows found for " + row['object'])
for res in cur:
for h in row.headings:

View File

@@ -0,0 +1,33 @@
import subprocess
import tempfile
import random
import os
from nose.tools import * # for assert functions
@when(u'loading osm data')
def load_osm_file(context):
# create a OSM file in /tmp and import it
with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.opl', delete=False) as fd:
fname = fd.name
for line in context.text.splitlines():
if line.startswith('n') and line.find(' x') < 0:
line += " x%d y%d" % (random.random()*360 - 180,
random.random()*180 - 90)
fd.write(line.encode('utf-8'))
fd.write(b'\n')
context.nominatim.run_setup_script('import-data', osm_file=fname,
osm2pgsql_cache=300)
### reintroduce the triggers/indexes we've lost by having osm2pgsql set up place again
cur = context.db.cursor()
cur.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place
FOR EACH ROW EXECUTE PROCEDURE place_delete()""")
cur.execute("""CREATE TRIGGER place_before_insert BEFORE INSERT ON place
FOR EACH ROW EXECUTE PROCEDURE place_insert()""")
cur.execute("""CREATE UNIQUE INDEX idx_place_osm_unique on place using btree(osm_id,osm_type,class,type)""")
context.db.commit()
os.remove(fname)