Merge pull request #3515 from lonvia/custom-result-formatting

Add the capability to define custom formatting functions for API output
This commit is contained in:
Sarah Hoffmann
2024-08-15 09:26:27 +02:00
committed by GitHub
14 changed files with 617 additions and 301 deletions

View File

@@ -7,6 +7,8 @@ the following configurable parts:
can be set in your local `.env` configuration
* [Import styles](Import-Styles.md) explains how to write your own import style
in order to control what kind of OSM data will be imported
* [API Result Formatting](Result-Formatting.md) shows how to change the
output of the Nominatim API
* [Place ranking](Ranking.md) describes the configuration around classifing
places in terms of their importance and their role in an address
* [Tokenizers](Tokenizers.md) describes the configuration of the module

View File

@@ -0,0 +1,176 @@
# Changing the Appearance of Results in the Server API
The Nominatim Server API offers a number of formatting options that
present search results in [different output formats](../api/Output.md).
These results only contain a subset of all the information that Nominatim
has about the result. This page explains how to adapt the result output
or add additional result formatting.
## Defining custom result formatting
To change the result output, you need to place a file `api/v1/format.py`
into your project directory. This file needs to define a single variable
`dispatch` containing a [FormatDispatcher](#formatdispatcher). This class
serves to collect the functions for formatting the different result types
and offers helper functions to apply the formatters.
There are two ways to define the `dispatch` variable. If you want to reuse
the default output formatting and just make some changes or add an additional
format type, then import the dispatch object from the default API:
``` python
from nominatim_api.v1.format import dispatch as dispatch
```
If you prefer to define a completely new result output, then you can
create an empty dispatcher object:
``` python
from nominatim_api import FormatDispatcher
dispatch = FormatDispatcher()
```
## The formatting function
The dispatcher organises the formatting functions by format and result type.
The format corresponds to the `format` parameter of the API. It can contain
one of the predefined format names or you can invent your own new format.
API calls return data classes or an array of a data class which represent
the result. You need to make sure there are formatters defined for the
following result types:
* StatusResult (single object, returned by `/status`)
* DetailedResult (single object, returned by `/details`)
* SearchResults (list of objects, returned by `/search`)
* ReverseResults (list of objects, returned by `/reverse` and `/lookup`)
* RawDataList (simple object, returned by `/deletable` and `/polygons`)
A formatter function has the following signature:
``` python
def format_func(result: ResultType, options: Mapping[str, Any]) -> str
```
The options dictionary contains additional information about the original
query. See the [reference below](#options-for-different-result-types)
about the possible options.
To set the result formatter for a certain result type and format, you need
to write the format function and decorate it with the
[`format_func`](#nominatim_api.FormatDispatcher.format_func)
decorator.
For example, let us extend the result for the status call in text format
and add the server URL. Such a formatter would look like this:
``` python
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result, _):
header = 'Status for server nominatim.openstreetmap.org'
if result.status:
return f"{header}\n\nERROR: {result.message}"
return f"{header}\n\nOK"
```
If your dispatcher is derived from the default one, then this definition
will overwrite the original formatter function. This way it is possible
to customize the output of selected results.
## Adding new formats
You may also define a completely different output format. This is as simple
as adding formatting functions for all result types using the custom
format name:
``` python
@dispatch.format_func(StatusResult, 'chatty')
def _format_status_text(result, _):
if result.status:
return f"The server is currently not running. {result.message}"
return f"Good news! The server is running just fine."
```
That's all. Nominatim will automatically pick up the new format name and
will allow the user to use it. Make sure to really define formatters for
**all** result types. If they are for endpoints that you do not intend to
use, you can simply return some static string but the function needs to be
there.
All responses will be returned with the content type application/json by
default. If your format produces a different content type, you need
to configure the content type with the `set_content_type()` function.
For example, the 'chatty' format above returns just simple text. So the
content type should be set up as:
``` python
from nominatim_api.server.content_types import CONTENT_TEXT
dispatch.set_content_type('chatty', CONTENT_TEXT)
```
The `content_types` module used above provides constants for the most
frequent content types. You set the content type to an arbitrary string,
if the content type you need is not available.
## Reference
### FormatDispatcher
::: nominatim_api.FormatDispatcher
options:
heading_level: 6
group_by_category: False
### JsonWriter
::: nominatim_api.utils.json_writer.JsonWriter
options:
heading_level: 6
group_by_category: False
### Options for different result types
This section lists the options that may be handed in with the different result
types in the v1 version of the Nominatim API.
#### StatusResult
_None._
#### DetailedResult
| Option | Description |
|-----------------|-------------|
| locales | [Locale](../library/Result-Handling.md#locale) object for the requested language(s) |
| group_hierarchy | Setting of [group_hierarchy](../api/Details.md#output-details) parameter |
| icon_base_url | (optional) URL pointing to icons as set in [NOMINATIM_MAPICON_URL](Settings.md#nominatim_mapicon_url) |
#### SearchResults
| Option | Description |
|-----------------|-------------|
| query | Original query string |
| more_url | URL for requesting additional results for the same query |
| exclude_place_ids | List of place IDs already returned |
| viewbox | Setting of [viewbox](../api/Search.md#result-restriction) parameter |
| extratags | Setting of [extratags](../api/Search.md#output-details) parameter |
| namedetails | Setting of [namedetails](../api/Search.md#output-details) parameter |
| addressdetails | Setting of [addressdetails](../api/Search.md#output-details) parameter |
#### ReverseResults
| Option | Description |
|-----------------|-------------|
| query | Original query string |
| extratags | Setting of [extratags](../api/Search.md#output-details) parameter |
| namedetails | Setting of [namedetails](../api/Search.md#output-details) parameter |
| addressdetails | Setting of [addressdetails](../api/Search.md#output-details) parameter |
#### RawDataList
_None._

View File

@@ -35,6 +35,7 @@ nav:
- 'Overview': 'customize/Overview.md'
- 'Import Styles': 'customize/Import-Styles.md'
- 'Configuration Settings': 'customize/Settings.md'
- 'API Result Formatting': 'customize/Result-Formatting.md'
- 'Per-Country Data': 'customize/Country-Settings.md'
- 'Place Ranking' : 'customize/Ranking.md'
- 'Importance' : 'customize/Importance.md'

View File

@@ -39,5 +39,6 @@ from .results import (SourceTable as SourceTable,
SearchResult as SearchResult,
SearchResults as SearchResults)
from .localization import (Locales as Locales)
from .result_formatting import (FormatDispatcher as FormatDispatcher)
from .version import NOMINATIM_API_VERSION as __version__

View File

@@ -7,19 +7,28 @@
"""
Helper classes and functions for formatting results into API responses.
"""
from typing import Type, TypeVar, Dict, List, Callable, Any, Mapping
from typing import Type, TypeVar, Dict, List, Callable, Any, Mapping, Optional, cast
from collections import defaultdict
from pathlib import Path
import importlib
from .server.content_types import CONTENT_JSON
T = TypeVar('T') # pylint: disable=invalid-name
FormatFunc = Callable[[T, Mapping[str, Any]], str]
ErrorFormatFunc = Callable[[str, str, int], str]
class FormatDispatcher:
""" Helper class to conveniently create formatting functions in
a module using decorators.
""" Container for formatting functions for results.
Functions can conveniently be added by using decorated functions.
"""
def __init__(self) -> None:
def __init__(self, content_types: Optional[Mapping[str, str]] = None) -> None:
self.error_handler: ErrorFormatFunc = lambda ct, msg, status: f"ERROR {status}: {msg}"
self.content_types: Dict[str, str] = {}
if content_types:
self.content_types.update(content_types)
self.format_functions: Dict[Type[Any], Dict[str, FormatFunc[Any]]] = defaultdict(dict)
@@ -35,6 +44,15 @@ class FormatDispatcher:
return decorator
def error_format_func(self, func: ErrorFormatFunc) -> ErrorFormatFunc:
""" Decorator for a function that formats error messges.
There is only one error formatter per dispatcher. Using
the decorator repeatedly will overwrite previous functions.
"""
self.error_handler = func
return func
def list_formats(self, result_type: Type[Any]) -> List[str]:
""" Return a list of formats supported by this formatter.
"""
@@ -54,3 +72,56 @@ class FormatDispatcher:
`list_formats()`.
"""
return self.format_functions[type(result)][fmt](result, options)
def format_error(self, content_type: str, msg: str, status: int) -> str:
""" Convert the given error message into a response string
taking the requested content_type into account.
Change the format using the error_format_func decorator.
"""
return self.error_handler(content_type, msg, status)
def set_content_type(self, fmt: str, content_type: str) -> None:
""" Set the content type for the given format. This is the string
that will be returned in the Content-Type header of the HTML
response, when the given format is choosen.
"""
self.content_types[fmt] = content_type
def get_content_type(self, fmt: str) -> str:
""" Return the content type for the given format.
If no explicit content type has been defined, then
JSON format is assumed.
"""
return self.content_types.get(fmt, CONTENT_JSON)
def load_format_dispatcher(api_name: str, project_dir: Optional[Path]) -> FormatDispatcher:
""" Load the dispatcher for the given API.
The function first tries to find a module api/<api_name>/format.py
in the project directory. This file must export a single variable
`dispatcher`.
If the function does not exist, the default formatter is loaded.
"""
if project_dir is not None:
priv_module = project_dir / 'api' / api_name / 'format.py'
if priv_module.is_file():
spec = importlib.util.spec_from_file_location(f'api.{api_name},format',
str(priv_module))
if spec:
module = importlib.util.module_from_spec(spec)
# Do not add to global modules because there is no standard
# module name that Python can resolve.
assert spec.loader is not None
spec.loader.exec_module(module)
return cast(FormatDispatcher, module.dispatch)
return cast(FormatDispatcher,
importlib.import_module(f'nominatim_api.{api_name}.format').dispatch)

View File

@@ -0,0 +1,156 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Base abstraction for implementing based on different ASGI frameworks.
"""
from typing import Optional, Any, NoReturn, Callable
import abc
import math
from ..config import Configuration
from ..core import NominatimAPIAsync
from ..result_formatting import FormatDispatcher
from .content_types import CONTENT_TEXT
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
"""
content_type: str = CONTENT_TEXT
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
""" Return an input parameter as a string. If the parameter was
not provided, return the 'default' value.
"""
@abc.abstractmethod
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
""" Return a HTTP header parameter as a string. If the parameter was
not provided, return the 'default' value.
"""
@abc.abstractmethod
def error(self, msg: str, status: int = 400) -> Exception:
""" Construct an appropriate exception from the given error message.
The exception must result in a HTTP error with the given status.
"""
@abc.abstractmethod
def create_response(self, status: int, output: str, num_results: int) -> Any:
""" Create a response from the given parameters. The result will
be returned by the endpoint functions. The adaptor may also
return None when the response is created internally with some
different means.
The response must return the HTTP given status code 'status', set
the HTTP content-type headers to the string provided and the
body of the response to 'output'.
"""
@abc.abstractmethod
def base_uri(self) -> str:
""" Return the URI of the original request.
"""
@abc.abstractmethod
def config(self) -> Configuration:
""" Return the current configuration object.
"""
@abc.abstractmethod
def formatting(self) -> FormatDispatcher:
""" Return the formatting object to use.
"""
def get_int(self, name: str, default: Optional[int] = None) -> int:
""" Return an input parameter as an int. Raises an exception if
the parameter is given but not in an integer format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
try:
intval = int(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
return intval
def get_float(self, name: str, default: Optional[float] = None) -> float:
""" Return an input parameter as a flaoting-point number. Raises an
exception if the parameter is given but not in an float format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
try:
fval = float(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
if math.isnan(fval) or math.isinf(fval):
self.raise_error(f"Parameter '{name}' must be a number.")
return fval
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
""" Return an input parameter as bool. Only '0' is accepted as
an input for 'false' all other inputs will be interpreted as 'true'.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
return value != '0'
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
output format chosen by the request.
"""
raise self.error(self.formatting().format_error(self.content_type, msg, status),
status)
EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]

View File

@@ -0,0 +1,14 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Constants for various content types for server responses.
"""
CONTENT_TEXT = 'text/plain; charset=utf-8'
CONTENT_XML = 'text/xml; charset=utf-8'
CONTENT_HTML = 'text/html; charset=utf-8'
CONTENT_JSON = 'application/json; charset=utf-8'

View File

@@ -17,7 +17,9 @@ from falcon.asgi import App, Request, Response
from ...config import Configuration
from ...core import NominatimAPIAsync
from ... import v1 as api_impl
from ...result_formatting import FormatDispatcher, load_format_dispatcher
from ... import logging as loglib
from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
class HTTPNominatimError(Exception):
""" A special exception class for errors raised during processing.
@@ -57,15 +59,16 @@ async def timeout_error_handler(req: Request, resp: Response, #pylint: disable=u
resp.content_type = 'text/plain; charset=utf-8'
class ParamWrapper(api_impl.ASGIAdaptor):
class ParamWrapper(ASGIAdaptor):
""" Adaptor class for server glue to Falcon framework.
"""
def __init__(self, req: Request, resp: Response,
config: Configuration) -> None:
config: Configuration, formatter: FormatDispatcher) -> None:
self.request = req
self.response = resp
self._config = config
self._formatter = formatter
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
@@ -93,21 +96,27 @@ class ParamWrapper(api_impl.ASGIAdaptor):
def config(self) -> Configuration:
return self._config
def formatting(self) -> FormatDispatcher:
return self._formatter
class EndpointWrapper:
""" Converter for server glue endpoint functions to Falcon request handlers.
"""
def __init__(self, name: str, func: api_impl.EndpointFunc, api: NominatimAPIAsync) -> None:
def __init__(self, name: str, func: EndpointFunc, api: NominatimAPIAsync,
formatter: FormatDispatcher) -> None:
self.name = name
self.func = func
self.api = api
self.formatter = formatter
async def on_get(self, req: Request, resp: Response) -> None:
""" Implementation of the endpoint.
"""
await self.func(self.api, ParamWrapper(req, resp, self.api.config))
await self.func(self.api, ParamWrapper(req, resp, self.api.config,
self.formatter))
class FileLoggingMiddleware:
@@ -177,8 +186,9 @@ def get_application(project_dir: Path,
app.add_error_handler(asyncio.TimeoutError, timeout_error_handler)
legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
formatter = load_format_dispatcher('v1', project_dir)
for name, func in api_impl.ROUTES:
endpoint = EndpointWrapper(name, func, api)
endpoint = EndpointWrapper(name, func, api, formatter)
app.add_route(f"/{name}", endpoint)
if legacy_urls:
app.add_route(f"/{name}.php", endpoint)

View File

@@ -24,9 +24,11 @@ from starlette.middleware.cors import CORSMiddleware
from ...config import Configuration
from ...core import NominatimAPIAsync
from ... import v1 as api_impl
from ...result_formatting import FormatDispatcher, load_format_dispatcher
from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
from ... import logging as loglib
class ParamWrapper(api_impl.ASGIAdaptor):
class ParamWrapper(ASGIAdaptor):
""" Adaptor class for server glue to Starlette framework.
"""
@@ -69,7 +71,11 @@ class ParamWrapper(api_impl.ASGIAdaptor):
return cast(Configuration, self.request.app.state.API.config)
def _wrap_endpoint(func: api_impl.EndpointFunc)\
def formatting(self) -> FormatDispatcher:
return cast(FormatDispatcher, self.request.app.state.API.formatter)
def _wrap_endpoint(func: EndpointFunc)\
-> Callable[[Request], Coroutine[Any, Any, Response]]:
async def _callback(request: Request) -> Response:
return cast(Response, await func(request.app.state.API, ParamWrapper(request)))
@@ -164,6 +170,7 @@ def get_application(project_dir: Path,
on_shutdown=[_shutdown])
app.state.API = NominatimAPIAsync(project_dir, environ)
app.state.formatter = load_format_dispatcher('v1', project_dir)
return app

View File

@@ -10,9 +10,7 @@ Implementation of API version v1 (aka the legacy version).
#pylint: disable=useless-import-alias
from .server_glue import (ASGIAdaptor as ASGIAdaptor,
EndpointFunc as EndpointFunc,
ROUTES as ROUTES)
from .server_glue import ROUTES as ROUTES
from . import format as _format

View File

@@ -19,12 +19,38 @@ from ..localization import Locales
from ..result_formatting import FormatDispatcher
from .classtypes import ICONS
from . import format_json, format_xml
from .. import logging as loglib
from ..server import content_types as ct
class RawDataList(List[Dict[str, Any]]):
""" Data type for formatting raw data lists 'as is' in json.
"""
dispatch = FormatDispatcher()
dispatch = FormatDispatcher({'text': ct.CONTENT_TEXT,
'xml': ct.CONTENT_XML,
'debug': ct.CONTENT_HTML})
@dispatch.error_format_func
def _format_error(content_type: str, msg: str, status: int) -> str:
if content_type == ct.CONTENT_XML:
return f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
if content_type == ct.CONTENT_JSON:
return f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
if content_type == ct.CONTENT_HTML:
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
return loglib.get_and_disable()
return f"ERROR {status}: {msg}"
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:

View File

@@ -8,267 +8,118 @@
Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
from typing import Optional, Any, Type, Dict, cast
from functools import reduce
import abc
import dataclasses
import math
from urllib.parse import urlencode
import sqlalchemy as sa
from ..errors import UsageError
from ..config import Configuration
from .. import logging as loglib
from ..core import NominatimAPIAsync
from .format import dispatch as formatting
from .format import RawDataList
from ..types import DataLayer, GeometryFormat, PlaceRef, PlaceID, OsmID, Point
from ..status import StatusResult
from ..results import DetailedResult, ReverseResults, SearchResult, SearchResults
from ..localization import Locales
from . import helpers
from ..server import content_types as ct
from ..server.asgi_adaptor import ASGIAdaptor
CONTENT_TEXT = 'text/plain; charset=utf-8'
CONTENT_XML = 'text/xml; charset=utf-8'
CONTENT_HTML = 'text/html; charset=utf-8'
CONTENT_JSON = 'application/json; charset=utf-8'
CONTENT_TYPE = {'text': CONTENT_TEXT, 'xml': CONTENT_XML, 'debug': CONTENT_HTML}
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
content_type: str = CONTENT_TEXT
if adaptor.content_type == ct.CONTENT_JSON and status == 200:
jsonp = adaptor.get('json_callback')
if jsonp is not None:
if any(not part.isidentifier() for part in jsonp.split('.')):
adaptor.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
adaptor.content_type = 'application/javascript; charset=utf-8'
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
""" Return an input parameter as a string. If the parameter was
not provided, return the 'default' value.
"""
@abc.abstractmethod
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
""" Return a HTTP header parameter as a string. If the parameter was
not provided, return the 'default' value.
"""
return adaptor.create_response(status, output, num_results)
@abc.abstractmethod
def error(self, msg: str, status: int = 400) -> Exception:
""" Construct an appropriate exception from the given error message.
The exception must result in a HTTP error with the given status.
"""
def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
""" Return the accepted languages.
"""
return adaptor.get('accept-language')\
or adaptor.get_header('accept-language')\
or adaptor.config().DEFAULT_LANGUAGE
@abc.abstractmethod
def create_response(self, status: int, output: str, num_results: int) -> Any:
""" Create a response from the given parameters. The result will
be returned by the endpoint functions. The adaptor may also
return None when the response is created internally with some
different means.
def setup_debugging(adaptor: ASGIAdaptor) -> bool:
""" Set up collection of debug information if requested.
The response must return the HTTP given status code 'status', set
the HTTP content-type headers to the string provided and the
body of the response to 'output'.
"""
Return True when debugging was requested.
"""
if adaptor.get_bool('debug', False):
loglib.set_log_output('html')
adaptor.content_type = ct.CONTENT_HTML
return True
@abc.abstractmethod
def base_uri(self) -> str:
""" Return the URI of the original request.
"""
return False
@abc.abstractmethod
def config(self) -> Configuration:
""" Return the current configuration object.
"""
def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = adaptor.get('layer', None)
if param is None:
return None
return cast(DataLayer,
reduce(DataLayer.__or__,
(getattr(DataLayer, s.upper()) for s in param.split(','))))
def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
if self.content_type == CONTENT_JSON and status == 200:
jsonp = self.get('json_callback')
if jsonp is not None:
if any(not part.isidentifier() for part in jsonp.split('.')):
self.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
self.content_type = 'application/javascript; charset=utf-8'
def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`result_type` is the type of result to be returned by the function
and `default` the format value to assume when no parameter is present.
"""
fmt = adaptor.get('format', default=default)
assert fmt is not None
return self.create_response(status, output, num_results)
formatting = adaptor.formatting()
if not formatting.supports_format(result_type, fmt):
adaptor.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
adaptor.content_type = formatting.get_content_type(fmt)
return fmt
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
output format chosen by the request.
"""
if self.content_type == CONTENT_XML:
msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
elif self.content_type == CONTENT_JSON:
msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
elif self.content_type == CONTENT_HTML:
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
msg = loglib.get_and_disable()
raise self.error(msg, status)
def get_int(self, name: str, default: Optional[int] = None) -> int:
""" Return an input parameter as an int. Raises an exception if
the parameter is given but not in an integer format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
try:
intval = int(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
return intval
def get_float(self, name: str, default: Optional[float] = None) -> float:
""" Return an input parameter as a flaoting-point number. Raises an
exception if the parameter is given but not in an float format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
try:
fval = float(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
if math.isnan(fval) or math.isinf(fval):
self.raise_error(f"Parameter '{name}' must be a number.")
return fval
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
""" Return an input parameter as bool. Only '0' is accepted as
an input for 'false' all other inputs will be interpreted as 'true'.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
return value != '0'
def get_accepted_languages(self) -> str:
""" Return the accepted languages.
"""
return self.get('accept-language')\
or self.get_header('accept-language')\
or self.config().DEFAULT_LANGUAGE
def setup_debugging(self) -> bool:
""" Set up collection of debug information if requested.
Return True when debugging was requested.
"""
if self.get_bool('debug', False):
loglib.set_log_output('html')
self.content_type = CONTENT_HTML
return True
return False
def get_layers(self) -> Optional[DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = self.get('layer', None)
if param is None:
return None
return cast(DataLayer,
reduce(DataLayer.__or__,
(getattr(DataLayer, s.upper()) for s in param.split(','))))
def parse_format(self, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`result_type` is the type of result to be returned by the function
and `default` the format value to assume when no parameter is present.
"""
fmt = self.get('format', default=default)
assert fmt is not None
if not formatting.supports_format(result_type, fmt):
self.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
return fmt
def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
""" Create details structure from the supplied geometry parameters.
"""
numgeoms = 0
output = GeometryFormat.NONE
if self.get_bool('polygon_geojson', False):
output |= GeometryFormat.GEOJSON
def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
""" Create details structure from the supplied geometry parameters.
"""
numgeoms = 0
output = GeometryFormat.NONE
if adaptor.get_bool('polygon_geojson', False):
output |= GeometryFormat.GEOJSON
numgeoms += 1
if fmt not in ('geojson', 'geocodejson'):
if adaptor.get_bool('polygon_text', False):
output |= GeometryFormat.TEXT
numgeoms += 1
if adaptor.get_bool('polygon_kml', False):
output |= GeometryFormat.KML
numgeoms += 1
if adaptor.get_bool('polygon_svg', False):
output |= GeometryFormat.SVG
numgeoms += 1
if fmt not in ('geojson', 'geocodejson'):
if self.get_bool('polygon_text', False):
output |= GeometryFormat.TEXT
numgeoms += 1
if self.get_bool('polygon_kml', False):
output |= GeometryFormat.KML
numgeoms += 1
if self.get_bool('polygon_svg', False):
output |= GeometryFormat.SVG
numgeoms += 1
if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
self.raise_error('Too many polygon output options selected.')
if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
adaptor.raise_error('Too many polygon output options selected.')
return {'address_details': True,
'geometry_simplification': self.get_float('polygon_threshold', 0.0),
'geometry_output': output
}
return {'address_details': True,
'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
'geometry_output': output
}
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -276,21 +127,21 @@ async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
fmt = params.parse_format(StatusResult, 'text')
fmt = parse_format(params, StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
return params.build_response(formatting.format_result(result, fmt, {}),
return build_response(params, params.formatting().format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
fmt = params.parse_format(DetailedResult, 'json')
fmt = parse_format(params, DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: PlaceRef
if place_id:
@@ -301,9 +152,9 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
debug = params.setup_debugging()
debug = setup_debugging(params)
locales = Locales.from_accept_languages(params.get_accepted_languages())
locales = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
@@ -317,35 +168,35 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
)
if debug:
return params.build_response(loglib.get_and_disable())
return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
output = formatting.format_result(result, fmt,
output = params.formatting().format_result(result, fmt,
{'locales': locales,
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
return params.build_response(output, num_results=1)
return build_response(params, output, num_results=1)
async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
fmt = params.parse_format(ReverseResults, 'xml')
debug = params.setup_debugging()
fmt = parse_format(params, ReverseResults, 'xml')
debug = setup_debugging(params)
coord = Point(params.get_float('lon'), params.get_float('lat'))
details = params.parse_geometry_details(fmt)
details = parse_geometry_details(params, fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
details['layers'] = params.get_layers()
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
details['layers'] = get_layers(params)
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.reverse(coord, **details)
if debug:
return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
@@ -361,19 +212,19 @@ async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
output = formatting.format_result(ReverseResults([result] if result else []),
fmt, fmt_options)
output = params.formatting().format_result(ReverseResults([result] if result else []),
fmt, fmt_options)
return params.build_response(output, num_results=1 if result else 0)
return build_response(params, output, num_results=1 if result else 0)
async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /lookup endpoint. See API docs for details.
"""
fmt = params.parse_format(SearchResults, 'xml')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
fmt = parse_format(params, SearchResults, 'xml')
debug = setup_debugging(params)
details = parse_geometry_details(params, fmt)
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
places = []
for oid in (params.get('osm_ids') or '').split(','):
@@ -390,15 +241,15 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = SearchResults()
if debug:
return params.build_response(loglib.get_and_disable(), num_results=len(results))
return build_response(params, loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
output = formatting.format_result(results, fmt, fmt_options)
output = params.formatting().format_result(results, fmt, fmt_options)
return params.build_response(output, num_results=len(results))
return build_response(params, output, num_results=len(results))
async def _unstructured_search(query: str, api: NominatimAPIAsync,
@@ -435,9 +286,9 @@ async def _unstructured_search(query: str, api: NominatimAPIAsync,
async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
fmt = params.parse_format(SearchResults, 'jsonv2')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
fmt = parse_format(params, SearchResults, 'jsonv2')
debug = setup_debugging(params)
details = parse_geometry_details(params, fmt)
details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
@@ -454,9 +305,9 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
if params.get('featureType', None) is not None:
details['layers'] = DataLayer.ADDRESS
else:
details['layers'] = params.get_layers()
details['layers'] = get_layers(params)
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
# unstructured query parameters
query = params.get('q', None)
@@ -486,7 +337,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = helpers.deduplicate_results(results, max_results)
if debug:
return params.build_response(loglib.get_and_disable(), num_results=len(results))
return build_response(params, loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
@@ -507,9 +358,9 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', False)}
output = formatting.format_result(results, fmt, fmt_options)
output = params.formatting().format_result(results, fmt, fmt_options)
return params.build_response(output, num_results=len(results))
return build_response(params, output, num_results=len(results))
async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -518,7 +369,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
deleted or are broken in the OSM data but are kept in the
Nominatim database to minimize disruption.
"""
fmt = params.parse_format(RawDataList, 'json')
fmt = parse_format(params, RawDataList, 'json')
async with api.begin() as conn:
sql = sa.text(""" SELECT p.place_id, country_code,
@@ -529,7 +380,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
""")
results = RawDataList(r._asdict() for r in await conn.execute(sql))
return params.build_response(formatting.format_result(results, fmt, {}))
return build_response(params, params.formatting().format_result(results, fmt, {}))
async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -538,7 +389,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
their size but are kept in the Nominatim database with their
old area to minimize disruption.
"""
fmt = params.parse_format(RawDataList, 'json')
fmt = parse_format(params, RawDataList, 'json')
sql_params: Dict[str, Any] = {
'days': params.get_int('days', -1),
'cls': params.get('class')
@@ -561,11 +412,9 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
return params.build_response(formatting.format_result(results, fmt, {}))
return build_response(params, params.formatting().format_result(results, fmt, {}))
EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]
ROUTES = [
('status', status_endpoint),
('details', details_endpoint),

View File

@@ -10,6 +10,7 @@ Provides dummy implementations of ASGIAdaptor for testing.
from collections import namedtuple
import nominatim_api.v1.server_glue as glue
from nominatim_api.v1.format import dispatch as formatting
from nominatim_api.config import Configuration
class FakeError(BaseException):
@@ -47,9 +48,13 @@ class FakeAdaptor(glue.ASGIAdaptor):
return FakeResponse(status, output, self.content_type)
def base_uri(self) -> str:
def base_uri(self):
return 'http://test'
def config(self):
return self._config
def formatting(self):
return formatting

View File

@@ -59,14 +59,14 @@ def test_adaptor_get_bool_falsish():
def test_adaptor_parse_format_use_default():
adaptor = FakeAdaptor()
assert adaptor.parse_format(napi.StatusResult, 'text') == 'text'
assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'text'
assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
adaptor = FakeAdaptor(params={'format': 'json'})
assert adaptor.parse_format(napi.StatusResult, 'text') == 'json'
assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'json'
assert adaptor.content_type == 'application/json; charset=utf-8'
@@ -74,37 +74,37 @@ def test_adaptor_parse_format_invalid_value():
adaptor = FakeAdaptor(params={'format': '@!#'})
with pytest.raises(FakeError, match='^400 -- .*must be one of'):
adaptor.parse_format(napi.StatusResult, 'text')
glue.parse_format(adaptor, napi.StatusResult, 'text')
# ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
a = FakeAdaptor(params={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_header():
a = FakeAdaptor(headers={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
a = FakeAdaptor()
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_param_over_header():
a = FakeAdaptor(params={'accept-language': 'de'},
headers={'accept-language': 'en'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
a = FakeAdaptor(headers={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
# ASGIAdaptor.raise_error()
@@ -114,7 +114,7 @@ class TestAdaptorRaiseError:
@pytest.fixture(autouse=True)
def init_adaptor(self):
self.adaptor = FakeAdaptor()
self.adaptor.setup_debugging()
glue.setup_debugging(self.adaptor)
def run_raise_error(self, msg, status):
with pytest.raises(FakeError) as excinfo:
@@ -127,7 +127,7 @@ class TestAdaptorRaiseError:
err = self.run_raise_error('TEST', 404)
assert self.adaptor.content_type == 'text/plain; charset=utf-8'
assert err.msg == 'TEST'
assert err.msg == 'ERROR 404: TEST'
assert err.status == 404
@@ -155,7 +155,7 @@ class TestAdaptorRaiseError:
def test_raise_error_during_debug():
a = FakeAdaptor(params={'debug': '1'})
a.setup_debugging()
glue.setup_debugging(a)
loglib.log().section('Ongoing')
with pytest.raises(FakeError) as excinfo:
@@ -172,7 +172,7 @@ def test_raise_error_during_debug():
# ASGIAdaptor.build_response
def test_build_response_without_content_type():
resp = FakeAdaptor().build_response('attention')
resp = glue.build_response(FakeAdaptor(), 'attention')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -182,9 +182,9 @@ def test_build_response_without_content_type():
def test_build_response_with_status():
a = FakeAdaptor(params={'format': 'json'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('stuff\nmore stuff', status=404)
resp = glue.build_response(a, 'stuff\nmore stuff', status=404)
assert isinstance(resp, FakeResponse)
assert resp.status == 404
@@ -194,9 +194,9 @@ def test_build_response_with_status():
def test_build_response_jsonp_with_json():
a = FakeAdaptor(params={'format': 'json', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('{}')
resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -206,9 +206,9 @@ def test_build_response_jsonp_with_json():
def test_build_response_jsonp_without_json():
a = FakeAdaptor(params={'format': 'text', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('{}')
resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -219,10 +219,10 @@ def test_build_response_jsonp_without_json():
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
a = FakeAdaptor(params={'format': 'json', 'json_callback': param})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
with pytest.raises(FakeError, match='^400 -- .*Invalid'):
a.build_response('{}')
glue.build_response(a, '{}')
# status_endpoint()