add server fronting for search endpoint

This also implements some of the quirks of free-text search of the
V1 API, in particular, search for categories and coordinates.
This commit is contained in:
Sarah Hoffmann
2023-05-26 11:40:45 +02:00
parent c7db69a30c
commit 371a780ef4
6 changed files with 535 additions and 14 deletions

View File

@@ -228,6 +228,12 @@ class SearchResults(List[SearchResult]):
May be empty when no result was found. May be empty when no result was found.
""" """
def localize(self, locales: Locales) -> None:
""" Apply the given locales to all results.
"""
for result in self:
result.localize(locales)
def _filter_geometries(row: SaRow) -> Dict[str, str]: def _filter_geometries(row: SaRow) -> Dict[str, str]:
return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212 return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212

View File

@@ -168,7 +168,7 @@ def _format_details_json(result: napi.DetailedResult, options: Mapping[str, Any]
def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str: def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
return format_xml.format_base_xml(results, return format_xml.format_base_xml(results,
options, True, 'reversegeocode', options, True, 'reversegeocode',
{'querystring': 'TODO'}) {'querystring': options.get('query', '')})
@dispatch.format_func(napi.ReverseResults, 'geojson') @dispatch.format_func(napi.ReverseResults, 'geojson')
@@ -199,9 +199,13 @@ def _format_reverse_jsonv2(results: napi.ReverseResults,
@dispatch.format_func(napi.SearchResults, 'xml') @dispatch.format_func(napi.SearchResults, 'xml')
def _format_search_xml(results: napi.SearchResults, options: Mapping[str, Any]) -> str: def _format_search_xml(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
return format_xml.format_base_xml(results, extra = {'querystring': options.get('query', '')}
options, False, 'searchresults', for attr in ('more_url', 'exclude_place_ids', 'viewbox'):
{'querystring': 'TODO'}) if options.get(attr):
extra[attr] = options[attr]
return format_xml.format_base_xml(results, options, False, 'searchresults',
extra)
@dispatch.format_func(napi.SearchResults, 'geojson') @dispatch.format_func(napi.SearchResults, 'geojson')

View File

@@ -8,8 +8,12 @@
Helper function for parsing parameters and and outputting data Helper function for parsing parameters and and outputting data
specifically for the v1 version of the API. specifically for the v1 version of the API.
""" """
from typing import Tuple, Optional, Any, Dict, Iterable
from itertools import chain
import re
from nominatim.api.results import SearchResult, SearchResults, SourceTable from nominatim.api.results import SearchResult, SearchResults, SourceTable
from nominatim.api.types import SearchDetails, GeometryFormat
REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea
4, 4, # 3-4 Country 4, 4, # 3-4 Country
@@ -33,6 +37,58 @@ def zoom_to_rank(zoom: int) -> int:
return REVERSE_MAX_RANKS[max(0, min(18, zoom))] return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
FEATURE_TYPE_TO_RANK: Dict[Optional[str], Any] = {
'country': (4, 4),
'state': (8, 8),
'city': (14, 16),
'settlement': (8, 20)
}
def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
""" Convert a feature type parameter to a tuple of
feature type name, minimum rank and maximum rank.
"""
return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
#pylint: disable=too-many-arguments
def extend_query_parts(queryparts: dict[str, Any], details: dict[str, Any],
feature_type: Optional[str],
namedetails: bool, extratags: bool,
excluded: Iterable[str]) -> None:
""" Add parameters from details dictionary to the query parts
dictionary which is suitable as URL parameter dictionary.
"""
parsed = SearchDetails.from_kwargs(details)
if parsed.geometry_output != GeometryFormat.NONE:
for flag in parsed.geometry_output:
assert flag.name
queryparts[f'polygon_{flag.name.lower()}'] = '1'
if parsed.address_details:
queryparts['addressdetails'] = '1'
if namedetails:
queryparts['namedetails'] = '1'
if extratags:
queryparts['extratags'] = '1'
if parsed.geometry_simplification > 0.0:
queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
if parsed.max_results != 10:
queryparts['limit'] = str(parsed.max_results)
if parsed.countries:
queryparts['countrycodes'] = ','.join(parsed.countries)
queryparts['exclude_place_ids'] = \
','.join(chain(excluded, map(str, parsed.excluded)))
if parsed.viewbox:
queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
if parsed.bounded_viewbox:
queryparts['bounded'] = '1'
if not details['dedupe']:
queryparts['dedupe'] = '0'
if feature_type in FEATURE_TYPE_TO_RANK:
queryparts['featureType'] = feature_type
def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults: def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
""" Remove results that look like duplicates. """ Remove results that look like duplicates.
@@ -69,3 +125,69 @@ def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
and result.category == ('boundary', 'postal_code') \ and result.category == ('boundary', 'postal_code') \
and result.names is not None \ and result.names is not None \
and result.names.get('ref') == postcode and result.names.get('ref') == postcode
def _deg(axis:str) -> str:
return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
def _deg_min(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)?[']*"
def _deg_min_sec(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)['\\s]+(?P<{axis}_sec>[\\d.]+)?[\"″]*"
COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
_deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
_deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
_deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
)]
def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
""" Look for something that is formated like a coordinate at the
beginning or end of the query. If found, extract the coordinate and
return the remaining query (or the empty string if the query
consisted of nothing but a coordinate).
Only the first match will be returned.
"""
for regex in COORD_REGEX:
match = regex.fullmatch(query)
if match is None:
continue
groups = match.groupdict()
if not groups['pre'] or not groups['post']:
x = float(groups['lon_deg']) \
+ float(groups.get('lon_min', 0.0)) / 60.0 \
+ float(groups.get('lon_sec', 0.0)) / 3600.0
if groups.get('ew') == 'W':
x = -x
y = float(groups['lat_deg']) \
+ float(groups.get('lat_min', 0.0)) / 60.0 \
+ float(groups.get('lat_sec', 0.0)) / 3600.0
if groups.get('ns') == 'S':
y = -y
return groups['pre'] or groups['post'] or '', x, y
return query, None, None
CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
""" Extract a hidden category specification of the form '[key=value]' from
the query. If found, extract key and value and
return the remaining query (or the empty string if the query
consisted of nothing but a category).
Only the first match will be returned.
"""
match = CATEGORY_REGEX.search(query)
if match is not None:
return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
match.group('cls'), match.group('typ')
return query, None, None

View File

@@ -11,8 +11,11 @@ Combine with the scaffolding provided for the various Python ASGI frameworks.
from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
from functools import reduce from functools import reduce
import abc import abc
import dataclasses
import math import math
from urllib.parse import urlencode
from nominatim.errors import UsageError
from nominatim.config import Configuration from nominatim.config import Configuration
import nominatim.api as napi import nominatim.api as napi
import nominatim.api.logging as loglib import nominatim.api.logging as loglib
@@ -321,7 +324,6 @@ async def reverse_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) ->
fmt = params.parse_format(napi.ReverseResults, 'xml') fmt = params.parse_format(napi.ReverseResults, 'xml')
debug = params.setup_debugging() debug = params.setup_debugging()
coord = napi.Point(params.get_float('lon'), params.get_float('lat')) coord = napi.Point(params.get_float('lon'), params.get_float('lat'))
locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
details = params.parse_geometry_details(fmt) details = params.parse_geometry_details(fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18)) details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
@@ -332,12 +334,22 @@ async def reverse_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) ->
if debug: if debug:
return params.build_response(loglib.get_and_disable()) return params.build_response(loglib.get_and_disable())
fmt_options = {'extratags': params.get_bool('extratags', False), if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
zoom = params.get('zoom', None)
if zoom:
queryparts['zoom'] = zoom
query = urlencode(queryparts)
else:
query = ''
fmt_options = {'query': query,
'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False), 'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)} 'addressdetails': params.get_bool('addressdetails', True)}
if result: if result:
result.localize(locales) result.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
output = formatting.format_result(napi.ReverseResults([result] if result else []), output = formatting.format_result(napi.ReverseResults([result] if result else []),
fmt, fmt_options) fmt, fmt_options)
@@ -350,7 +362,6 @@ async def lookup_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> A
""" """
fmt = params.parse_format(napi.SearchResults, 'xml') fmt = params.parse_format(napi.SearchResults, 'xml')
debug = params.setup_debugging() debug = params.setup_debugging()
locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
details = params.parse_geometry_details(fmt) details = params.parse_geometry_details(fmt)
places = [] places = []
@@ -371,18 +382,119 @@ async def lookup_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> A
'namedetails': params.get_bool('namedetails', False), 'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)} 'addressdetails': params.get_bool('addressdetails', True)}
for result in results: results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
result.localize(locales)
output = formatting.format_result(results, fmt, fmt_options) output = formatting.format_result(results, fmt, fmt_options)
return params.build_response(output) return params.build_response(output)
async def _unstructured_search(query: str, api: napi.NominatimAPIAsync,
details: Dict[str, Any]) -> napi.SearchResults:
if not query:
return napi.SearchResults()
# Extract special format for coordinates from query.
query, x, y = helpers.extract_coords_from_query(query)
if x is not None:
assert y is not None
details['near'] = napi.Point(x, y)
details['near_radius'] = 0.1
# If no query is left, revert to reverse search.
if x is not None and not query:
result = await api.reverse(details['near'], **details)
if not result:
return napi.SearchResults()
return napi.SearchResults(
[napi.SearchResult(**{f.name: getattr(result, f.name)
for f in dataclasses.fields(napi.SearchResult)
if hasattr(result, f.name)})])
query, cls, typ = helpers.extract_category_from_query(query)
if cls is not None:
assert typ is not None
return await api.search_category([(cls, typ)], near_query=query, **details)
return await api.search(query, **details)
async def search_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
fmt = params.parse_format(napi.SearchResults, 'jsonv2')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
details['viewbox'] = params.get('viewbox', None) or params.get('viewboxlbrt', None)
details['bounded_viewbox'] = params.get_bool('bounded', False)
details['dedupe'] = params.get_bool('dedupe', True)
max_results = max(1, min(50, params.get_int('limit', 10)))
details['max_results'] = max_results + min(10, max_results) \
if details['dedupe'] else max_results
details['min_rank'], details['max_rank'] = \
helpers.feature_type_to_rank(params.get('featureType', ''))
query = params.get('q', None)
queryparts = {}
try:
if query is not None:
queryparts['q'] = query
results = await _unstructured_search(query, api, details)
else:
for key in ('amenity', 'street', 'city', 'county', 'state', 'postalcode', 'country'):
details[key] = params.get(key, None)
if details[key]:
queryparts[key] = details[key]
query = ', '.join(queryparts.values())
results = await api.search_address(**details)
except UsageError as err:
params.raise_error(str(err))
results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
if details['dedupe'] and len(results) > 1:
results = helpers.deduplicate_results(results, max_results)
if debug:
return params.build_response(loglib.get_and_disable())
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
params.get('featureType', ''),
params.get_bool('namedetails', False),
params.get_bool('extratags', False),
(str(r.place_id) for r in results if r.place_id))
queryparts['format'] = fmt
moreurl = urlencode(queryparts)
else:
moreurl = ''
fmt_options = {'query': query, 'more_url': moreurl,
'exclude_place_ids': queryparts.get('exclude_place_ids'),
'viewbox': queryparts.get('viewbox'),
'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', False)}
output = formatting.format_result(results, fmt, fmt_options)
return params.build_response(output)
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any] EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
ROUTES = [ ROUTES = [
('status', status_endpoint), ('status', status_endpoint),
('details', details_endpoint), ('details', details_endpoint),
('reverse', reverse_endpoint), ('reverse', reverse_endpoint),
('lookup', lookup_endpoint) ('lookup', lookup_endpoint),
('search', search_endpoint)
] ]

View File

@@ -0,0 +1,112 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for the helper functions for v1 API.
"""
import pytest
import nominatim.api.v1.helpers as helper
@pytest.mark.parametrize('inp', ['', 'abc', '12 23', 'abc -78.90, 12.456 def'])
def test_extract_coords_no_coords(inp):
query, x, y = helper.extract_coords_from_query(inp)
assert query == inp
assert x is None
assert y is None
def test_extract_coords_null_island():
assert ('', 0.0, 0.0) == helper.extract_coords_from_query('0.0 -0.0')
def test_extract_coords_with_text_before():
assert ('abc', 12.456, -78.90) == helper.extract_coords_from_query('abc -78.90, 12.456')
def test_extract_coords_with_text_after():
assert ('abc', 12.456, -78.90) == helper.extract_coords_from_query('-78.90, 12.456 abc')
@pytest.mark.parametrize('inp', [' [12.456,-78.90] ', ' 12.456,-78.90 '])
def test_extract_coords_with_spaces(inp):
assert ('', -78.90, 12.456) == helper.extract_coords_from_query(inp)
@pytest.mark.parametrize('inp', ['40 26.767 N 79 58.933 W',
'40° 26.767 N 79° 58.933 W',
"40° 26.767' N 79° 58.933' W",
"40° 26.767'\n"
" N 79° 58.933' W",
'N 40 26.767, W 79 58.933',
'N 40°26.767, W 79°58.933',
' N 40°26.767, W 79°58.933',
"N 40°26.767', W 79°58.933'",
'40 26 46 N 79 58 56 W',
'40° 26 46″ N 79° 58 56″ W',
'40° 26 46.00″ N 79° 58 56.00″ W',
'40°2646″N 79°5856″W',
'N 40 26 46 W 79 58 56',
'N 40° 26 46″, W 79° 58 56″',
'N 40° 26\' 46", W 79° 58\' 56"',
'N 40° 26\' 46", W 79° 58\' 56"',
'40.446 -79.982',
'40.446,-79.982',
'40.446° N 79.982° W',
'N 40.446° W 79.982°',
'[40.446 -79.982]',
'[40.446, -79.982]',
' 40.446 , -79.982 ',
' 40.446 , -79.982 ',
' 40.446 , -79.982 ',
' 40.446 , -79.982 '])
def test_extract_coords_formats(inp):
query, x, y = helper.extract_coords_from_query(inp)
assert query == ''
assert pytest.approx(x, abs=0.001) == -79.982
assert pytest.approx(y, abs=0.001) == 40.446
query, x, y = helper.extract_coords_from_query('foo bar ' + inp)
assert query == 'foo bar'
assert pytest.approx(x, abs=0.001) == -79.982
assert pytest.approx(y, abs=0.001) == 40.446
query, x, y = helper.extract_coords_from_query(inp + ' x')
assert query == 'x'
assert pytest.approx(x, abs=0.001) == -79.982
assert pytest.approx(y, abs=0.001) == 40.446
def test_extract_coords_formats_southeast():
query, x, y = helper.extract_coords_from_query('S 40 26.767, E 79 58.933')
assert query == ''
assert pytest.approx(x, abs=0.001) == 79.982
assert pytest.approx(y, abs=0.001) == -40.446
@pytest.mark.parametrize('inp', ['[shop=fish] foo bar',
'foo [shop=fish] bar',
'foo [shop=fish]bar',
'foo bar [shop=fish]'])
def test_extract_category_good(inp):
query, cls, typ = helper.extract_category_from_query(inp)
assert query == 'foo bar'
assert cls == 'shop'
assert typ == 'fish'
def test_extract_category_only():
assert helper.extract_category_from_query('[shop=market]') == ('', 'shop', 'market')
@pytest.mark.parametrize('inp', ['house []', 'nothing', '[352]'])
def test_extract_category_no_match(inp):
assert helper.extract_category_from_query(inp) == (inp, None, None)

View File

@@ -32,9 +32,9 @@ FakeResponse = namedtuple('FakeResponse', ['status', 'output', 'content_type'])
class FakeAdaptor(glue.ASGIAdaptor): class FakeAdaptor(glue.ASGIAdaptor):
def __init__(self, params={}, headers={}, config=None): def __init__(self, params=None, headers=None, config=None):
self.params = params self.params = params or {}
self.headers = headers self.headers = headers or {}
self._config = config or Configuration(None) self._config = config or Configuration(None)
@@ -386,6 +386,63 @@ class TestDetailsEndpoint:
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a) await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
# reverse_endpoint()
class TestReverseEndPoint:
@pytest.fixture(autouse=True)
def patch_reverse_func(self, monkeypatch):
self.result = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
async def _reverse(*args, **kwargs):
return self.result
monkeypatch.setattr(napi.NominatimAPIAsync, 'reverse', _reverse)
@pytest.mark.asyncio
@pytest.mark.parametrize('params', [{}, {'lat': '3.4'}, {'lon': '6.7'}])
async def test_reverse_no_params(self, params):
a = FakeAdaptor()
a.params = params
a.params['format'] = 'xml'
with pytest.raises(FakeError, match='^400 -- (?s:.*)missing'):
await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
@pytest.mark.asyncio
@pytest.mark.parametrize('params', [{'lat': '45.6', 'lon': '4563'}])
async def test_reverse_success(self, params):
a = FakeAdaptor()
a.params = params
a.params['format'] = 'json'
res = await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert res == ''
@pytest.mark.asyncio
async def test_reverse_success(self):
a = FakeAdaptor()
a.params['lat'] = '56.3'
a.params['lon'] = '6.8'
assert await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
@pytest.mark.asyncio
async def test_reverse_from_search(self):
a = FakeAdaptor()
a.params['q'] = '34.6 2.56'
a.params['format'] = 'json'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1
# lookup_endpoint() # lookup_endpoint()
class TestLookupEndpoint: class TestLookupEndpoint:
@@ -444,3 +501,111 @@ class TestLookupEndpoint:
res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a) res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1 assert len(json.loads(res.output)) == 1
# search_endpoint()
class TestSearchEndPointSearch:
@pytest.fixture(autouse=True)
def patch_lookup_func(self, monkeypatch):
self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))]
async def _search(*args, **kwargs):
return napi.SearchResults(self.results)
monkeypatch.setattr(napi.NominatimAPIAsync, 'search', _search)
@pytest.mark.asyncio
async def test_search_free_text(self):
a = FakeAdaptor()
a.params['q'] = 'something'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1
@pytest.mark.asyncio
async def test_search_free_text_xml(self):
a = FakeAdaptor()
a.params['q'] = 'something'
a.params['format'] = 'xml'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert res.status == 200
assert res.output.index('something') > 0
@pytest.mark.asyncio
async def test_search_free_and_structured(self):
a = FakeAdaptor()
a.params['q'] = 'something'
a.params['city'] = 'ignored'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1
@pytest.mark.asyncio
@pytest.mark.parametrize('dedupe,numres', [(True, 1), (False, 2)])
async def test_search_dedupe(self, dedupe, numres):
self.results = self.results * 2
a = FakeAdaptor()
a.params['q'] = 'something'
if not dedupe:
a.params['dedupe'] = '0'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == numres
class TestSearchEndPointSearchAddress:
@pytest.fixture(autouse=True)
def patch_lookup_func(self, monkeypatch):
self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))]
async def _search(*args, **kwargs):
return napi.SearchResults(self.results)
monkeypatch.setattr(napi.NominatimAPIAsync, 'search_address', _search)
@pytest.mark.asyncio
async def test_search_structured(self):
a = FakeAdaptor()
a.params['street'] = 'something'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1
class TestSearchEndPointSearchCategory:
@pytest.fixture(autouse=True)
def patch_lookup_func(self, monkeypatch):
self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))]
async def _search(*args, **kwargs):
return napi.SearchResults(self.results)
monkeypatch.setattr(napi.NominatimAPIAsync, 'search_category', _search)
@pytest.mark.asyncio
async def test_search_category(self):
a = FakeAdaptor()
a.params['q'] = '[shop=fog]'
res = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1