mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-25 18:48:15 +00:00
replace behave BDD API tests with pytest-bdd tests
This commit is contained in:
0
test/bdd/utils/__init__.py
Normal file
0
test/bdd/utils/__init__.py
Normal file
133
test/bdd/utils/api_result.py
Normal file
133
test/bdd/utils/api_result.py
Normal file
@@ -0,0 +1,133 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Wrapper for results from the API
|
||||
"""
|
||||
import json
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
|
||||
class APIResult:
    """ Parsed response body from a Nominatim API call.

        The constructor dispatches to a format-specific parser
        ('json', 'xml', 'geojson' or 'geocodejson') which fills in two
        attributes:

        * meta   - format-level metadata (dict)
        * result - a dict for single-result responses or a list of
                   dicts for multi-result responses
    """

    def __init__(self, fmt, endpoint, body):
        # Dispatch to _parse_<fmt>; unknown formats raise AttributeError.
        getattr(self, '_parse_' + fmt)(endpoint, body)

    def is_simple(self):
        """ Return True when the response contains a single result,
            False when it contains a list of results.
        """
        return not isinstance(self.result, list)

    def __len__(self):
        return 1 if self.is_simple() else len(self.result)

    def __str__(self):
        return json.dumps({'meta': self.meta, 'result': self.result}, indent=2)

    def _parse_json(self, _, body):
        # Plain JSON carries no metadata outside the result itself.
        self.meta = {}
        self.result = json.loads(body)

    def _parse_xml(self, endpoint, body):
        xml_tree = ET.fromstring(body)

        # Attributes of the root tag are the response metadata.
        self.meta = dict(xml_tree.attrib)

        if xml_tree.tag == 'reversegeocode':
            self._parse_xml_simple(xml_tree)
        elif xml_tree.tag == 'searchresults':
            self._parse_xml_multi(xml_tree)
        elif xml_tree.tag == 'error':
            self.result = {'error': {sub.tag: sub.text for sub in xml_tree}}

    def _parse_xml_simple(self, xml):
        """ Parse a single-result XML response (reverse geocoding).
        """
        self.result = {}

        for child in xml:
            if child.tag == 'result':
                assert not self.result, "More than one result in reverse result"
                self.result.update(child.attrib)
                assert 'display_name' not in self.result
                self.result['display_name'] = child.text
            elif child.tag == 'addressparts':
                assert 'address' not in self.result
                self.result['address'] = {sub.tag: sub.text for sub in child}
            elif child.tag == 'extratags':
                assert 'extratags' not in self.result
                self.result['extratags'] = {tag.attrib['key']: tag.attrib['value'] for tag in child}
            elif child.tag == 'namedetails':
                assert 'namedetails' not in self.result
                self.result['namedetails'] = {tag.attrib['desc']: tag.text for tag in child}
            elif child.tag == 'geokml':
                assert 'geokml' not in self.result
                self.result['geokml'] = ET.tostring(child, encoding='unicode')
            elif child.tag == 'error':
                assert not self.result
                self.result['error'] = child.text
            else:
                # Fixed: formerly referenced the non-existent attribute
                # 'self.page', which turned a failed check into an
                # AttributeError instead of a useful assertion message.
                assert False, \
                    f"Unknown XML tag {child.tag} on page: {ET.tostring(xml, encoding='unicode')}"

    def _parse_xml_multi(self, xml):
        """ Parse a multi-result XML response (search).
        """
        self.result = []

        for child in xml:
            assert child.tag == "place"
            res = dict(child.attrib)

            address = {}
            for sub in child:
                if sub.tag == 'extratags':
                    assert 'extratags' not in res
                    res['extratags'] = {tag.attrib['key']: tag.attrib['value'] for tag in sub}
                elif sub.tag == 'namedetails':
                    assert 'namedetails' not in res
                    res['namedetails'] = {tag.attrib['desc']: tag.text for tag in sub}
                elif sub.tag == 'geokml':
                    # Fixed: used encoding='utf-8' (returns bytes) while the
                    # simple parser uses 'unicode' (returns str); results
                    # now have a consistent string type for 'geokml'.
                    res['geokml'] = ET.tostring(sub, encoding='unicode')
                else:
                    # Any other sub tag is an address part.
                    address[sub.tag] = sub.text

            if address:
                res['address'] = address

            self.result.append(res)

    def _parse_geojson(self, _, body):
        geojson = json.loads(body)

        assert geojson.get('type') == 'FeatureCollection'
        assert isinstance(geojson.get('features'), list)

        # Everything except the feature list is metadata.
        self.meta = {k: v for k, v in geojson.items() if k not in ('type', 'features')}
        self.result = []

        for obj in geojson['features']:
            assert isinstance(obj, dict)
            assert obj.get('type') == 'Feature'

            assert isinstance(obj.get('properties'), dict)
            result = obj['properties']
            assert 'geojson' not in result
            result['geojson'] = obj['geometry']
            if 'bbox' in obj:
                assert 'boundingbox' not in result
                # bbox is minlon, minlat, maxlon, maxlat
                # boundingbox is minlat, maxlat, minlon, maxlon
                result['boundingbox'] = [obj['bbox'][1], obj['bbox'][3],
                                         obj['bbox'][0], obj['bbox'][2]]
            self.result.append(result)

    def _parse_geocodejson(self, endpoint, body):
        # geocodejson is geojson with the payload nested one level deeper
        # under 'geocoding' keys; parse as geojson, then unwrap.
        self._parse_geojson(endpoint, body)

        assert set(self.meta.keys()) == {'geocoding'}
        assert isinstance(self.meta['geocoding'], dict)
        self.meta = self.meta['geocoding']

        for r in self.result:
            assert set(r.keys()) == {'geocoding', 'geojson'}
            inner = r.pop('geocoding')
            assert isinstance(inner, dict)
            assert 'geojson' not in inner
            r.update(inner)
|
||||
70
test/bdd/utils/api_runner.py
Normal file
70
test/bdd/utils/api_runner.py
Normal file
@@ -0,0 +1,70 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Various helper classes for running Nominatim commands.
|
||||
"""
|
||||
import asyncio
|
||||
from collections import namedtuple
|
||||
|
||||
# Immutable record of a single HTTP response from the API under test:
# the endpoint queried, the HTTP status code, the response body as text
# and the response headers.
APIResponse = namedtuple('APIResponse', ['endpoint', 'status', 'body', 'headers'])
||||
class APIRunner:
    """ Execute a call to an API endpoint.

        The ASGI engine to run requests against is chosen by name at
        construction time via the matching create_engine_* factory
        ('falcon' or 'starlette').
    """
    def __init__(self, environ, api_engine):
        create_func = getattr(self, f"create_engine_{api_engine}")
        self.exec_engine = create_func(environ)

    def run(self, endpoint, params, http_headers):
        """ Send a single GET request to the given endpoint and return
            the APIResponse.
        """
        return asyncio.run(self.exec_engine(endpoint, params, http_headers))

    def run_step(self, endpoint, base_params, datatable, fmt, http_headers):
        """ Send a request assembled from BDD step data.

            'datatable' may either have a ['param', 'value'] header row
            followed by one parameter per row, or consist of two rows:
            parameter names first, values second.
        """
        if fmt:
            base_params['format'] = fmt.strip()

        if datatable:
            if datatable[0] == ['param', 'value']:
                base_params.update(datatable[1:])
            else:
                base_params.update(zip(datatable[0], datatable[1]))

        return self.run(endpoint, base_params, http_headers)

    def create_engine_falcon(self, environ):
        import nominatim_api.server.falcon.server
        import falcon.testing

        async def exec_engine_falcon(endpoint, params, http_headers):
            # A fresh application per request keeps tests independent.
            app = nominatim_api.server.falcon.server.get_application(None, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get("/" + endpoint, params=params,
                                               headers=http_headers)

            return APIResponse(endpoint, response.status_code,
                               response.text, response.headers)

        return exec_engine_falcon

    def create_engine_starlette(self, environ):
        import nominatim_api.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, http_headers):
            app = nominatim_api.server.starlette.server.get_application(None, environ)

            async with LifespanManager(app):
                # The 'app=...' shortcut of AsyncClient is deprecated and
                # was removed in httpx 0.28; use an explicit ASGI
                # transport instead.
                transport = httpx.ASGITransport(app=app)
                async with httpx.AsyncClient(transport=transport,
                                             base_url="http://nominatim.test") as client:
                    response = await client.get("/" + endpoint, params=params,
                                                headers=http_headers)

            return APIResponse(endpoint, response.status_code,
                               response.text, response.headers)

        return _request
|
||||
109
test/bdd/utils/checks.py
Normal file
109
test/bdd/utils/checks.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Helper functions to compare expected values.
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
|
||||
# Maps the comparison wording used in BDD step text to the corresponding
# predicate. Each predicate takes (expected, actual) and returns a bool.
COMPARATOR_TERMS = {
    'exactly': lambda exp, act: exp == act,
    'more than': lambda exp, act: act > exp,
    'less than': lambda exp, act: act < exp,
}
|
||||
|
||||
|
||||
def _pretty(obj):
|
||||
return json.dumps(obj, sort_keys=True, indent=2)
|
||||
|
||||
|
||||
def within_box(value, expect):
    """ Check that *value* lies within the bounding box *expect*.

        *expect* is a comma-separated string of four numbers.
        *value* may be a point (two coordinates) or a bounding box
        (four coordinates), given either as a comma-separated string
        or as a sequence of numbers.

        Raises ValueError for any other number of coordinates.
    """
    box = [float(part) for part in expect.split(',')]

    if isinstance(value, str):
        value = value.split(',')
    coords = [float(part) for part in value]

    if len(coords) == 2:
        x, y = coords
        return box[0] <= x <= box[2] and box[1] <= y <= box[3]

    if len(coords) == 4:
        return coords[0] >= box[0] and coords[1] <= box[1] \
               and coords[2] >= box[2] and coords[3] <= box[3]

    raise ValueError("Not a coordinate or bbox.")
|
||||
|
||||
|
||||
# Maps a format suffix (the part after '!' in a result attribute key) to
# the comparison function used by ResultAttr.__eq__. Each function takes
# (actual value, expected string) and returns a bool. The None entry is
# the default plain string comparison.
COMPARISON_FUNCS = {
    None: lambda val, exp: str(val) == exp,
    'i': lambda val, exp: str(val).lower() == exp.lower(),
    'fm': lambda val, exp: re.fullmatch(exp, val) is not None,
    'in_box': within_box
}

# Single-letter prefixes used for the compact object notation
# (e.g. osm_type 'node' + osm_id 1234 -> 'n1234').
OSM_TYPE = {'node': 'n', 'way': 'w', 'relation': 'r'}
||||
|
||||
class ResultAttr:
    """ Comparison wrapper around a single attribute of a result object.

        The key names the attribute to compare. Sub attributes are
        addressed by joining keys with '+' (so 'name+ref' looks up
        obj['name']['ref']). An optional suffix after '!' selects how the
        value is compared; without a suffix the value is compared as its
        plain str() representation.

        Supported suffixes:

        !:... - apply a Python Mini Format Spec expression to the value
        !i - compare case-insensitively
        !fm - treat the expected string as a regex and match the full value
    """

    def __init__(self, obj, key):
        self.obj = obj
        if '!' in key:
            self.key, _, self.fmt = key.rpartition('!')
        else:
            self.key = key
            self.fmt = None

        if self.key == 'object':
            # Compact notation combining OSM type and id, e.g. 'n1234'.
            assert 'osm_id' in obj
            assert 'osm_type' in obj
            self.subobj = OSM_TYPE[obj['osm_type']] + str(obj['osm_id'])
            self.fmt = 'i'
        else:
            path = ''
            current = self.obj
            for part in self.key.split('+'):
                path += f"[{part}]"
                assert part in current, \
                    f"Missing attribute {path}. Full object:\n{_pretty(self.obj)}"
                current = current[part]
            self.subobj = current

    def __eq__(self, other):
        if not isinstance(other, str):
            raise NotImplementedError()

        # work around bad quoting by pytest-bdd
        other = other.replace(r'\\', '\\')

        compare = COMPARISON_FUNCS.get(self.fmt)
        if compare is not None:
            return compare(self.subobj, other)

        if self.fmt.startswith(':'):
            return other == f"{{{self.fmt}}}".format(self.subobj)

        raise RuntimeError(f"Unknown format string '{self.fmt}'.")

    def __repr__(self):
        shown = self.key.replace('+', '][')
        if self.fmt:
            shown += '!' + self.fmt
        return f"result[{shown}]({self.subobj})"
|
||||
44
test/bdd/utils/db.py
Normal file
44
test/bdd/utils/db.py
Normal file
@@ -0,0 +1,44 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Helper functions for managing test databases.
|
||||
"""
|
||||
import psycopg
|
||||
from psycopg import sql as pysql
|
||||
|
||||
|
||||
class DBManager:
    """ Manage creation-time checks and deletion of test databases.
    """

    def __init__(self, purge=False):
        # When True, check_for_db() always drops an existing database
        # so that tests start from a clean state.
        self.purge = purge

    def check_for_db(self, dbname):
        """ Check if the given DB already exists.
            When the purge option is set, then an existing database will
            be deleted and the function returns that it does not exist.
        """
        if self.purge:
            self.drop_db(dbname)
            return False

        return self.exists_db(dbname)

    def drop_db(self, dbname):
        """ Drop the given database if it exists.
        """
        with psycopg.connect(dbname='postgres') as conn:
            # DROP DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            # Fixed: composing with '+' concatenated keyword and quoted
            # identifier without a separator; format() makes the
            # placement explicit and is the idiomatic psycopg pattern.
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                         .format(pysql.Identifier(dbname)))

    def exists_db(self, dbname):
        """ Check if a database with the given name exists already.
        """
        with psycopg.connect(dbname='postgres') as conn:
            cur = conn.execute('select count(*) from pg_database where datname = %s',
                               (dbname,))
            return cur.fetchone()[0] == 1
|
||||
Reference in New Issue
Block a user