add BDD tests for DB

This commit is contained in:
Sarah Hoffmann
2025-04-09 14:52:34 +02:00
parent 5f44aa2873
commit b34991d85f
33 changed files with 6095 additions and 46 deletions

View File

@@ -12,9 +12,10 @@ import re
import math
from psycopg import sql as pysql
from psycopg.rows import dict_row, tuple_row
from psycopg.rows import dict_row
from .geometry_alias import ALIASES
COMPARATOR_TERMS = {
'exactly': lambda exp, act: exp == act,
'more than': lambda exp, act: act > exp,
@@ -26,11 +27,19 @@ def _pretty(obj):
return json.dumps(obj, sort_keys=True, indent=2)
def _pt_close(p1, p2):
return math.isclose(p1[0], p2[0], abs_tol=1e-07) \
and math.isclose(p1[1], p2[1], abs_tol=1e-07)
def within_box(value, expect):
coord = [float(x) for x in expect.split(',')]
if isinstance(value, str):
value = value.split(',')
if value.startswith('POINT'):
value = value[6:-1].split(' ')
else:
value = value.split(',')
value = list(map(float, value))
if len(value) == 2:
@@ -98,10 +107,10 @@ class ResultAttr:
self.subobj = self.subobj[sub]
def __eq__(self, other):
if not isinstance(other, str):
raise NotImplementedError()
# work around bad quoting by pytest-bdd
if not isinstance(other, str):
return self.subobj == other
other = other.replace(r'\\', '\\')
if self.fmt in COMPARISON_FUNCS:
@@ -148,18 +157,16 @@ class ResultAttr:
for pt in map(str.strip, m[2].split(','))]
if expected.startswith('country:'):
ccode = geom[8:].upper()
ccode = expected[8:].upper()
assert ccode in ALIASES, f"Geometry error: unknown country {ccode}"
return m[1] == 'POINT' and \
all(math.isclose(p1, p2) for p1, p2 in zip(converted[0], ALIASES[ccode]))
return m[1] == 'POINT' and _pt_close(converted[0], ALIASES[ccode])
if ',' not in expected:
return m[1] == 'POINT' and \
all(math.isclose(p1, p2) for p1, p2 in zip(converted[0], self.get_point(expected)))
return m[1] == 'POINT' and _pt_close(converted[0], self.get_point(expected))
if '(' not in expected:
return m[1] == 'LINESTRING' and \
all(math.isclose(p1[0], p2[0]) and math.isclose(p1[1], p2[1]) for p1, p2 in
all(_pt_close(p1, p2) for p1, p2 in
zip(converted, (self.get_point(p) for p in expected.split(','))))
if m[1] != 'POLYGON':
@@ -174,7 +181,7 @@ class ResultAttr:
"First and last point need to be the same")
for line in (exp_coords[:-1], exp_coords[-1:0:-1]):
for i in range(len(line)):
if all(math.isclose(p1[0], p2[0]) and math.isclose(p1[1], p2[1]) for p1, p2 in
if all(_pt_close(p1, p2) for p1, p2 in
zip(converted, line[i:] + line[:i])):
return True
@@ -199,7 +206,7 @@ def check_table_content(conn, tablename, data, grid=None, exact=False):
cols.extend(('osm_id', 'osm_type'))
elif '!' in col:
name, fmt = col.rsplit('!', 1)
if fmt == 'wkt':
if fmt in ('wkt', 'in_box'):
cols.append(f"ST_AsText({name}) as {name}")
else:
cols.append(name.split('+')[0])
@@ -215,7 +222,7 @@ def check_table_content(conn, tablename, data, grid=None, exact=False):
table_content += '\n' + str(row)
for i in lines:
for col, value in zip(data[0], data[i]):
if ResultAttr(row, col, grid=grid) != value:
if ResultAttr(row, col, grid=grid) != (None if value == '-' else value):
break
else:
lines.remove(i)
@@ -228,15 +235,3 @@ def check_table_content(conn, tablename, data, grid=None, exact=False):
+ '\n'.join(str(data[i]) for i in lines) \
+ "\nTable content:\n" \
+ table_content
def check_table_has_lines(conn, tablename, osm_type, osm_id, osm_class):
sql = pysql.SQL("""SELECT count(*) FROM {}
WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(tablename))
params = [osm_type, int(osm_id)]
if osm_class:
sql += pysql.SQL(' AND class = %s')
params.append(osm_class)
with conn.cursor(row_factory=tuple_row) as cur:
assert cur.execute(sql, params).fetchone()[0] == 0