enable flake for bdd test code

This commit is contained in:
Sarah Hoffmann
2025-03-09 17:34:04 +01:00
parent c70dfccaca
commit 78f839fbd3
15 changed files with 396 additions and 393 deletions

View File

@@ -2,20 +2,16 @@
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
""" Steps that run queries against the API.
"""
from pathlib import Path
import json
import os
import re
import logging
import asyncio
import xml.etree.ElementTree as ET
from urllib.parse import urlencode
from utils import run_script
from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
from check_functions import Bbox, check_for_attributes
from table_compare import NominatimID
@@ -68,7 +64,7 @@ def send_api_query(endpoint, params, fmt, context):
getattr(context, 'http_headers', {})))
@given(u'the HTTP header')
@given('the HTTP header')
def add_http_header(context):
if not hasattr(context, 'http_headers'):
context.http_headers = {}
@@ -77,7 +73,7 @@ def add_http_header(context):
context.http_headers[h] = context.table[0][h]
@when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
@when(r'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
def website_search_request(context, fmt, query, addr):
params = {}
if query:
@@ -90,7 +86,7 @@ def website_search_request(context, fmt, query, addr):
context.response = SearchResponse(outp, fmt or 'json', status)
@when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
@when(r'sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
def api_endpoint_v1_reverse(context, lat, lon, fmt):
params = {}
if lat is not None:
@@ -106,7 +102,7 @@ def api_endpoint_v1_reverse(context, lat, lon, fmt):
context.response = ReverseResponse(outp, fmt or 'xml', status)
@when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
@when(r'sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
params = {}
params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
@@ -115,7 +111,7 @@ def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
context.response = ReverseResponse(outp, fmt or 'xml', status)
@when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
@when(r'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
def website_details_request(context, fmt, query):
params = {}
if query[0] in 'NWR':
@@ -130,38 +126,45 @@ def website_details_request(context, fmt, query):
context.response = GenericResponse(outp, fmt or 'json', status)
@when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
@when(r'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
def website_lookup_request(context, fmt, query):
params = { 'osm_ids' : query }
params = {'osm_ids': query}
outp, status = send_api_query('lookup', params, fmt, context)
context.response = SearchResponse(outp, fmt or 'xml', status)
@when(u'sending (?P<fmt>\S+ )?status query')
@when(r'sending (?P<fmt>\S+ )?status query')
def website_status_request(context, fmt):
params = {}
outp, status = send_api_query('status', params, fmt, context)
context.response = StatusResponse(outp, fmt or 'text', status)
@step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
@step(r'(?P<operator>less than|more than|exactly|at least|at most) '
r'(?P<number>\d+) results? (?:is|are) returned')
def validate_result_number(context, operator, number):
context.execute_steps("Then a HTTP 200 is returned")
numres = len(context.response.result)
assert compare(operator, numres, int(number)), \
f"Bad number of results: expected {operator} {number}, got {numres}."
@then(u'a HTTP (?P<status>\d+) is returned')
@then(r'a HTTP (?P<status>\d+) is returned')
def check_http_return_status(context, status):
assert context.response.errorcode == int(status), \
f"Return HTTP status is {context.response.errorcode}."\
f" Full response:\n{context.response.page}"
@then(u'the page contents equals "(?P<text>.+)"')
@then(r'the page contents equals "(?P<text>.+)"')
def check_page_content_equals(context, text):
assert context.response.page == text
@then(u'the result is valid (?P<fmt>\w+)')
@then(r'the result is valid (?P<fmt>\w+)')
def step_impl(context, fmt):
context.execute_steps("Then a HTTP 200 is returned")
if fmt.strip() == 'html':
@@ -178,7 +181,7 @@ def step_impl(context, fmt):
assert context.response.format == fmt
@then(u'a (?P<fmt>\w+) user error is returned')
@then(r'a (?P<fmt>\w+) user error is returned')
def check_page_error(context, fmt):
context.execute_steps("Then a HTTP 400 is returned")
assert context.response.format == fmt
@@ -188,32 +191,34 @@ def check_page_error(context, fmt):
else:
assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
@then(u'result header contains')
@then('result header contains')
def check_header_attr(context):
context.execute_steps("Then a HTTP 200 is returned")
for line in context.table:
assert line['attr'] in context.response.header, \
f"Field '{line['attr']}' missing in header. Full header:\n{context.response.header}"
f"Field '{line['attr']}' missing in header. " \
f"Full header:\n{context.response.header}"
value = context.response.header[line['attr']]
assert re.fullmatch(line['value'], value) is not None, \
f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
@then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
@then('result header has (?P<neg>not )?attributes (?P<attrs>.*)')
def check_header_no_attr(context, neg, attrs):
check_for_attributes(context.response.header, attrs,
'absent' if neg else 'present')
@then(u'results contain(?: in field (?P<field>.*))?')
def step_impl(context, field):
@then(r'results contain(?: in field (?P<field>.*))?')
def results_contain_in_field(context, field):
context.execute_steps("then at least 1 result is returned")
for line in context.table:
context.response.match_row(line, context=context, field=field)
@then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
@then(r'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
def validate_attributes(context, lid, neg, attrs):
for i in make_todo_list(context, lid):
check_for_attributes(context.response.result[i], attrs,
@@ -221,7 +226,7 @@ def validate_attributes(context, lid, neg, attrs):
@then(u'result addresses contain')
def step_impl(context):
def result_addresses_contain(context):
context.execute_steps("then at least 1 result is returned")
for line in context.table:
@@ -231,8 +236,9 @@ def step_impl(context):
if name != 'ID':
context.response.assert_address_field(idx, name, value)
@then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
def check_address(context, lid, neg, attrs):
@then(r'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
def check_address_has_types(context, lid, neg, attrs):
context.execute_steps(f"then more than {lid} results are returned")
addr_parts = context.response.result[int(lid)]['address']
@@ -243,7 +249,8 @@ def check_address(context, lid, neg, attrs):
else:
assert attr in addr_parts
@then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
@then(r'address of result (?P<lid>\d+) (?P<complete>is|contains)')
def check_address(context, lid, complete):
context.execute_steps(f"then more than {lid} results are returned")
@@ -258,7 +265,7 @@ def check_address(context, lid, complete):
assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
@then(r'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
def check_bounding_box_in_area(context, lid, coords):
expected = Bbox(coords)
@@ -269,7 +276,7 @@ def check_bounding_box_in_area(context, lid, coords):
f"Bbox is not contained in {expected}")
@then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
@then(r'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
def check_centroid_in_area(context, lid, coords):
expected = Bbox(coords)
@@ -280,7 +287,7 @@ def check_centroid_in_area(context, lid, coords):
f"Centroid is not inside {expected}")
@then(u'there are(?P<neg> no)? duplicates')
@then('there are(?P<neg> no)? duplicates')
def check_for_duplicates(context, neg):
context.execute_steps("then at least 1 result is returned")
@@ -298,4 +305,3 @@ def check_for_duplicates(context, neg):
assert not has_dupe, f"Found duplicate for {dup}"
else:
assert has_dupe, "No duplicates found"