mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-15 02:47:59 +00:00
Return a consistent error response which takes into account the chosen content type. Also adds tests for V1 server glue.
387 lines
11 KiB
Python
387 lines
11 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
# This file is part of Nominatim. (https://nominatim.org)
|
|
#
|
|
# Copyright (C) 2023 by the Nominatim developer community.
|
|
# For a full list of authors see the git log.
|
|
"""
|
|
Tests for the Python web frameworks adaptor, v1 API.
|
|
"""
|
|
from collections import namedtuple
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from nominatim.config import Configuration
|
|
import nominatim.api.v1.server_glue as glue
|
|
import nominatim.api as napi
|
|
import nominatim.api.logging as loglib
|
|
|
|
class FakeError(BaseException):
    """Error object produced by the fake adaptor.

    Stores the message and the status it was created with so that tests
    can inspect both after the error has been raised.
    """

    def __init__(self, msg, status):
        self.status = status
        self.msg = msg

    def __str__(self):
        # Status comes first so tests can anchor a regex on it.
        return f'{self.status} -- {self.msg}'
|
|
|
|
# Plain record of what the fake adaptor returns as a response.
FakeResponse = namedtuple('FakeResponse', ('status', 'output', 'content_type'))
|
|
|
|
class FakeAdaptor(glue.ASGIAdaptor):
    """In-memory adaptor implementation used by the tests in this module.

    Request parameters and headers are served from plain dictionaries.
    Errors are returned as `FakeError` and responses as `FakeResponse`
    so that tests can inspect them directly.
    """

    def __init__(self, params=None, headers=None, config=None):
        """Create an adaptor for a simulated request.

        params  -- dictionary of request parameters (default: none)
        headers -- dictionary of request headers (default: none)
        config  -- configuration object to hand out; when omitted (or
                   falsy) a `Configuration(None)` is created
        """
        # None sentinels instead of `{}` defaults: a dict literal in the
        # signature would be one object shared by every instance.
        self.params = {} if params is None else params
        self.headers = {} if headers is None else headers
        self._config = config or Configuration(None)


    def get(self, name, default=None):
        """Return the request parameter `name` or `default` if unset."""
        return self.params.get(name, default)


    def get_header(self, name, default=None):
        """Return the request header `name` or `default` if unset."""
        return self.headers.get(name, default)


    def error(self, msg, status=400):
        """Build (not raise) a `FakeError` carrying message and status."""
        return FakeError(msg, status)


    def create_response(self, status, output):
        """Wrap status and output into a `FakeResponse`.

        The content type is read from `self.content_type`, which is not
        set in this class -- presumably provided by `glue.ASGIAdaptor`.
        """
        return FakeResponse(status, output, self.content_type)


    def config(self):
        """Return the configuration object given at construction."""
        return self._config
|
|
|
|
|
|
# ASGIAdaptor.get_int/bool()
|
|
|
|
@pytest.mark.parametrize('func', ['get_int', 'get_bool'])
def test_adaptor_get_int_missing_but_required(func):
    """A getter called without default must fail for an absent parameter."""
    getter = getattr(FakeAdaptor(), func)

    with pytest.raises(FakeError, match='^400 -- .*missing'):
        getter('something')
|
|
|
|
|
|
@pytest.mark.parametrize('func, val', [('get_int', 23), ('get_bool', True)])
def test_adaptor_get_int_missing_with_default(func, val):
    """An absent parameter must yield the default handed to the getter."""
    getter = getattr(FakeAdaptor(), func)
    result = getter('something', val)

    assert result == val
|
|
|
|
|
|
@pytest.mark.parametrize('inp', ['0', '234', '-4566953498567934876'])
def test_adaptor_get_int_success(inp):
    """Valid integer strings are converted, with and without a default."""
    expected = int(inp)

    # First without a default, then with a default that must be ignored.
    for extra_args in ((), (4,)):
        adaptor = FakeAdaptor(params={'foo': inp})
        assert adaptor.get_int('foo', *extra_args) == expected
|
|
|
|
|
|
@pytest.mark.parametrize('inp', ['rs', '4.5', '6f'])
def test_adaptor_get_int_bad_number(inp):
    """Values that are not plain integers must be rejected with a 400."""
    with pytest.raises(FakeError, match='^400 -- .*must be a number'):
        adaptor = FakeAdaptor(params={'foo': inp})
        adaptor.get_int('foo')
|
|
|
|
|
|
@pytest.mark.parametrize('inp', ['1', 'true', 'whatever', 'false'])
def test_adaptor_get_bool_trueish(inp):
    """All listed inputs, including 'false', must be read as true."""
    adaptor = FakeAdaptor(params={'foo': inp})
    value = adaptor.get_bool('foo')

    assert value
|
|
|
|
|
|
def test_adaptor_get_bool_falsish():
    """The string '0' must be read as false."""
    adaptor = FakeAdaptor(params={'foo': '0'})
    value = adaptor.get_bool('foo')

    assert not value
|
|
|
|
|
|
# ASGIAdaptor.parse_format()
|
|
|
|
def test_adaptor_parse_format_use_default():
    """Without a format parameter the given default format is chosen."""
    adaptor = FakeAdaptor()

    chosen = adaptor.parse_format(napi.StatusResult, 'text')

    assert chosen == 'text'
    assert adaptor.content_type == 'text/plain; charset=utf-8'
|
|
|
|
|
|
def test_adaptor_parse_format_use_configured():
    """A format request parameter takes precedence over the default."""
    adaptor = FakeAdaptor(params={'format': 'json'})

    chosen = adaptor.parse_format(napi.StatusResult, 'text')

    assert chosen == 'json'
    assert adaptor.content_type == 'application/json'
|
|
|
|
|
|
def test_adaptor_parse_format_invalid_value():
    """An unknown format name must be rejected with a 400 error."""
    adaptor = FakeAdaptor(params={'format': '@!#'})
    expected_error = pytest.raises(FakeError, match='^400 -- .*must be one of')

    with expected_error:
        adaptor.parse_format(napi.StatusResult, 'text')
|
|
|
|
|
|
# ASGIAdaptor.get_accepted_languages()
|
|
|
|
def test_accepted_languages_from_param():
    """The 'accept-language' request parameter is used as language."""
    adaptor = FakeAdaptor(params={'accept-language': 'de'})
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
|
|
|
|
|
|
def test_accepted_languages_from_header():
    """The 'http_accept_language' header is used as language."""
    adaptor = FakeAdaptor(headers={'http_accept_language': 'de'})
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
|
|
|
|
|
|
def test_accepted_languages_from_default(monkeypatch):
    """Without parameter or header, NOMINATIM_DEFAULT_LANGUAGE is used."""
    # The variable is set before the adaptor (and its configuration)
    # gets created.
    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')

    adaptor = FakeAdaptor()
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
|
|
|
|
|
|
def test_accepted_languages_param_over_header():
    """The request parameter wins when a header is given as well."""
    request_params = {'accept-language': 'de'}
    request_headers = {'http_accept_language': 'en'}

    adaptor = FakeAdaptor(params=request_params, headers=request_headers)

    assert adaptor.get_accepted_languages() == 'de'
|
|
|
|
|
|
def test_accepted_languages_header_over_default(monkeypatch):
    """The header wins over the NOMINATIM_DEFAULT_LANGUAGE setting."""
    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')

    adaptor = FakeAdaptor(headers={'http_accept_language': 'de'})
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
|
|
|
|
|
|
# ASGIAdaptor.raise_error()
|
|
|
|
class TestAdaptorRaiseError:
    """Checks how raise_error() formats errors for each content type."""

    @pytest.fixture(autouse=True)
    def init_adaptor(self):
        """Give every test a fresh adaptor with debugging set up."""
        adaptor = FakeAdaptor()
        self.adaptor = adaptor
        adaptor.setup_debugging()

    def run_raise_error(self, msg, status):
        """Call raise_error() and hand back the FakeError it raised."""
        with pytest.raises(FakeError) as raised:
            self.adaptor.raise_error(msg, status=status)

        return raised.value


    def test_without_content_set(self):
        """With an untouched content type the message is passed verbatim."""
        error = self.run_raise_error('TEST', 404)

        assert self.adaptor.content_type == 'text/plain; charset=utf-8'
        assert error.msg == 'TEST'
        assert error.status == 404


    def test_json(self):
        """For JSON the message is an object with an 'error' member."""
        self.adaptor.content_type = 'application/json'

        error = self.run_raise_error('TEST', 501)

        payload = json.loads(error.msg)
        details = payload['error']
        assert details['code'] == 501
        assert details['message'] == 'TEST'


    def test_xml(self):
        """For XML the message is a document with an 'error' root."""
        self.adaptor.content_type = 'text/xml; charset=utf-8'

        error = self.run_raise_error('this!', 503)

        root = ET.fromstring(error.msg)

        assert root.tag == 'error'
        assert root.find('code').text == '503'
        assert root.find('message').text == 'this!'
|
|
|
|
|
|
def test_raise_error_during_debug():
    """With debug on, the error is an HTML page including the debug log."""
    adaptor = FakeAdaptor(params={'debug': '1'})
    adaptor.setup_debugging()
    # Put something into the log that must show up in the error output.
    loglib.log().section('Ongoing')

    with pytest.raises(FakeError) as raised:
        adaptor.raise_error('bad state')

    root = ET.fromstring(raised.value.msg)

    assert root.tag == 'html'

    assert '>Ongoing<' in raised.value.msg
    assert 'bad state' in raised.value.msg
|
|
|
|
|
|
# ASGIAdaptor.build_response()
|
|
|
|
def test_build_response_without_content_type():
    """A response built with no format chosen is plain text, status 200."""
    adaptor = FakeAdaptor()
    response = adaptor.build_response('attention')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == 'attention'
    assert response.content_type == 'text/plain; charset=utf-8'
|
|
|
|
|
|
def test_build_response_with_status():
    """An explicit status is passed through; content type follows format."""
    adaptor = FakeAdaptor(params={'format': 'json'})
    adaptor.parse_format(napi.StatusResult, 'text')

    response = adaptor.build_response('stuff\nmore stuff', status=404)

    assert isinstance(response, FakeResponse)
    assert response.status == 404
    assert response.output == 'stuff\nmore stuff'
    assert response.content_type == 'application/json'
|
|
|
|
|
|
def test_build_response_jsonp_with_json():
    """JSON output with a json_callback is wrapped into a function call."""
    request_params = {'format': 'json', 'json_callback': 'test.func'}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    response = adaptor.build_response('{}')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == 'test.func({})'
    assert response.content_type == 'application/javascript'
|
|
|
|
|
|
def test_build_response_jsonp_without_json():
    """A json_callback has no effect when the format is not JSON."""
    request_params = {'format': 'text', 'json_callback': 'test.func'}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    response = adaptor.build_response('{}')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == '{}'
    assert response.content_type == 'text/plain; charset=utf-8'
|
|
|
|
|
|
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
    """Each listed json_callback value must be rejected with a 400."""
    request_params = {'format': 'json', 'json_callback': param}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    with pytest.raises(FakeError, match='^400 -- .*Invalid'):
        adaptor.build_response('{}')
|
|
|
|
|
|
# status_endpoint()
|
|
|
|
class TestStatusEndpoint:
    """Tests for status_endpoint() with the API status call stubbed out."""

    @pytest.fixture(autouse=True)
    def patch_status_func(self, monkeypatch):
        """Replace NominatimAPIAsync.status with a stub.

        The stub returns whatever the test has stored in `self.status`.
        """
        async def fake_status(*args, **kwargs):
            return self.status

        monkeypatch.setattr(napi.NominatimAPIAsync, 'status', fake_status)


    @pytest.mark.asyncio
    async def test_status_without_params(self):
        """An OK status with no parameters gives a 200 plain-text response."""
        adaptor = FakeAdaptor()
        self.status = napi.StatusResult(0, 'foo')
        api = napi.NominatimAPIAsync(Path('/invalid'))

        response = await glue.status_endpoint(api, adaptor)

        assert isinstance(response, FakeResponse)
        assert response.status == 200
        assert response.content_type == 'text/plain; charset=utf-8'


    @pytest.mark.asyncio
    async def test_status_with_error(self):
        """A non-zero status in text format gives a 500 response."""
        adaptor = FakeAdaptor()
        self.status = napi.StatusResult(405, 'foo')
        api = napi.NominatimAPIAsync(Path('/invalid'))

        response = await glue.status_endpoint(api, adaptor)

        assert isinstance(response, FakeResponse)
        assert response.status == 500
        assert response.content_type == 'text/plain; charset=utf-8'


    @pytest.mark.asyncio
    async def test_status_json_with_error(self):
        """A non-zero status in JSON format still gives a 200 response."""
        adaptor = FakeAdaptor(params={'format': 'json'})
        self.status = napi.StatusResult(405, 'foo')
        api = napi.NominatimAPIAsync(Path('/invalid'))

        response = await glue.status_endpoint(api, adaptor)

        assert isinstance(response, FakeResponse)
        assert response.status == 200
        assert response.content_type == 'application/json'


    @pytest.mark.asyncio
    async def test_status_bad_format(self):
        """An unknown format parameter makes the endpoint raise an error."""
        adaptor = FakeAdaptor(params={'format': 'foo'})
        self.status = napi.StatusResult(0, 'foo')

        with pytest.raises(FakeError):
            api = napi.NominatimAPIAsync(Path('/invalid'))
            await glue.status_endpoint(api, adaptor)
|
|
|
|
|
|
# details_endpoint()
|
|
|
|
class TestDetailsEndpoint:
    """Tests for details_endpoint() with the API lookup call stubbed out."""

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        """Replace NominatimAPIAsync.lookup with a recording stub.

        The stub returns `self.result` and collects the arguments it was
        called with in `self.lookup_args`.
        """
        self.lookup_args = []
        self.result = napi.DetailedResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))

        async def fake_lookup(*args, **kwargs):
            # Skip the first positional argument: the stub is installed on
            # the class, so that one is the API object itself.
            self.lookup_args.extend(args[1:])
            return self.result

        monkeypatch.setattr(napi.NominatimAPIAsync, 'lookup', fake_lookup)


    @pytest.mark.asyncio
    async def test_details_no_params(self):
        """Calling the endpoint with no parameters gives a 400 error."""
        adaptor = FakeAdaptor()

        with pytest.raises(FakeError, match='^400 -- .*Missing'):
            api = napi.NominatimAPIAsync(Path('/invalid'))
            await glue.details_endpoint(api, adaptor)


    @pytest.mark.asyncio
    async def test_details_by_place_id(self):
        """A place_id parameter is handed to lookup as an integer."""
        adaptor = FakeAdaptor(params={'place_id': '4573'})
        api = napi.NominatimAPIAsync(Path('/invalid'))

        await glue.details_endpoint(api, adaptor)

        place = self.lookup_args[0]
        assert place.place_id == 4573


    @pytest.mark.asyncio
    async def test_details_by_osm_id(self):
        """osmtype/osmid parameters are handed to lookup; no class is set."""
        adaptor = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45'})
        api = napi.NominatimAPIAsync(Path('/invalid'))

        await glue.details_endpoint(api, adaptor)

        assert self.lookup_args[0].osm_type == 'N'
        assert self.lookup_args[0].osm_id == 45
        assert self.lookup_args[0].osm_class is None


    @pytest.mark.asyncio
    async def test_details_with_debugging(self):
        """With debug=1 the endpoint responds with an HTML document."""
        request_params = {'osmtype': 'N', 'osmid': '45', 'debug': '1'}
        adaptor = FakeAdaptor(params=request_params)
        api = napi.NominatimAPIAsync(Path('/invalid'))

        response = await glue.details_endpoint(api, adaptor)
        root = ET.fromstring(response.output)

        assert response.content_type == 'text/html; charset=utf-8'
        assert root.tag == 'html'


    @pytest.mark.asyncio
    async def test_details_no_result(self):
        """When lookup returns None the endpoint raises a 404 error."""
        adaptor = FakeAdaptor(params={'place_id': '4573'})
        self.result = None

        with pytest.raises(FakeError, match='^404 -- .*found'):
            api = napi.NominatimAPIAsync(Path('/invalid'))
            await glue.details_endpoint(api, adaptor)
|