mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-16 15:47:58 +00:00
574 lines
22 KiB
Python
574 lines
22 KiB
Python
# SPDX-License-Identifier: GPL-2.0-only
|
|
#
|
|
# This file is part of Nominatim. (https://nominatim.org)
|
|
#
|
|
# Copyright (C) 2023 by the Nominatim developer community.
|
|
# For a full list of authors see the git log.
|
|
"""
|
|
Generic part of the server implementation of the v1 API.
|
|
Combine with the scaffolding provided for the various Python ASGI frameworks.
|
|
"""
|
|
from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
|
|
from functools import reduce
|
|
import abc
|
|
import dataclasses
|
|
import math
|
|
from urllib.parse import urlencode
|
|
|
|
import sqlalchemy as sa
|
|
|
|
from nominatim.errors import UsageError
|
|
from nominatim.config import Configuration
|
|
import nominatim.api as napi
|
|
import nominatim.api.logging as loglib
|
|
from nominatim.api.v1.format import dispatch as formatting
|
|
from nominatim.api.v1.format import RawDataList
|
|
from nominatim.api.v1 import helpers
|
|
|
|
# Values for the HTTP Content-Type header, one per kind of response body.
CONTENT_TEXT = 'text/plain; charset=utf-8'
CONTENT_XML = 'text/xml; charset=utf-8'
CONTENT_HTML = 'text/html; charset=utf-8'
CONTENT_JSON = 'application/json; charset=utf-8'

# Maps the value of the 'format' request parameter to a content type.
# Formats that are not listed here are looked up with a JSON fallback
# by the code using this table.
CONTENT_TYPE = {
    'text': CONTENT_TEXT,
    'xml': CONTENT_XML,
    'debug': CONTENT_HTML,
}
|
|
|
|
class ASGIAdaptor(abc.ABC):
    """ Adapter class for the different ASGI frameworks.
        Wraps functionality over concrete requests and responses.

        Subclasses implement the abstract accessors for a concrete
        framework. The remaining methods build on those accessors to
        parse and validate request parameters and to create responses.
    """
    # Content type used for responses and for formatting error messages.
    # Updated by parse_format(), setup_debugging() and build_response().
    content_type: str = CONTENT_TEXT

    @abc.abstractmethod
    def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """ Return an input parameter as a string. If the parameter was
            not provided, return the 'default' value.
        """

    @abc.abstractmethod
    def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """ Return a HTTP header parameter as a string. If the parameter was
            not provided, return the 'default' value.
        """

    @abc.abstractmethod
    def error(self, msg: str, status: int = 400) -> Exception:
        """ Construct an appropriate exception from the given error message.
            The exception must result in a HTTP error with the given status.
        """

    @abc.abstractmethod
    def create_response(self, status: int, output: str, num_results: int) -> Any:
        """ Create a response from the given parameters. The result will
            be returned by the endpoint functions. The adaptor may also
            return None when the response is created internally with some
            different means.

            The response must return the HTTP given status code 'status', set
            the HTTP content-type headers to the string provided and the
            body of the response to 'output'.
        """

    @abc.abstractmethod
    def base_uri(self) -> str:
        """ Return the URI of the original request.
        """

    @abc.abstractmethod
    def config(self) -> Configuration:
        """ Return the current configuration object.
        """

    def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
        """ Create a response from the given output. Wraps a JSONP function
            around the response, if necessary.

            JSONP wrapping is only applied to successful (status 200) JSON
            responses when the 'json_callback' parameter is given. The
            callback must be a dot-separated sequence of identifiers,
            otherwise an error is raised via raise_error().
        """
        if self.content_type == CONTENT_JSON and status == 200:
            jsonp = self.get('json_callback')
            if jsonp is not None:
                # Only plain (dotted) identifiers are accepted because the
                # callback name is copied verbatim into the response body.
                if any(not part.isidentifier() for part in jsonp.split('.')):
                    self.raise_error('Invalid json_callback value')
                output = f"{jsonp}({output})"
                self.content_type = 'application/javascript; charset=utf-8'

        return self.create_response(status, output, num_results)

    def raise_error(self, msg: str, status: int = 400) -> NoReturn:
        """ Raise an exception resulting in the given HTTP status and
            message. The message will be formatted according to the
            output format chosen by the request.

            For XML and JSON output the message is escaped, so that the
            error body stays well-formed even when the message contains
            quotes, backslashes or markup characters.
        """
        # Imported locally so that the method does not depend on changes
        # to the module-level import block.
        import json
        from xml.sax.saxutils import escape as xml_escape

        if self.content_type == CONTENT_XML:
            safe_msg = xml_escape(msg)
            msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
                      <error>
                        <code>{status}</code>
                        <message>{safe_msg}</message>
                      </error>
                   """
        elif self.content_type == CONTENT_JSON:
            # json.dumps() adds the surrounding quotes and escapes the content.
            # ensure_ascii=False keeps non-ASCII characters as they are.
            quoted_msg = json.dumps(msg, ensure_ascii=False)
            msg = f"""{{"error":{{"code":{status},"message":{quoted_msg}}}}}"""
        elif self.content_type == CONTENT_HTML:
            # Debug mode: append the error to the debug log and return
            # the collected log as the message.
            loglib.log().section('Execution error')
            loglib.log().var_dump('Status', status)
            loglib.log().var_dump('Message', msg)
            msg = loglib.get_and_disable()

        raise self.error(msg, status)

    def get_int(self, name: str, default: Optional[int] = None) -> int:
        """ Return an input parameter as an int. Raises an exception if
            the parameter is given but not in an integer format.

            If 'default' is given, then it will be returned when the parameter
            is missing completely. When 'default' is None, an error will be
            raised on a missing parameter.
        """
        value = self.get(name)

        if value is None:
            if default is not None:
                return default

            self.raise_error(f"Parameter '{name}' missing.")

        try:
            intval = int(value)
        except ValueError:
            self.raise_error(f"Parameter '{name}' must be a number.")

        return intval

    def get_float(self, name: str, default: Optional[float] = None) -> float:
        """ Return an input parameter as a floating-point number. Raises an
            exception if the parameter is given but not in a float format.
            NaN and infinite values are rejected as well.

            If 'default' is given, then it will be returned when the parameter
            is missing completely. When 'default' is None, an error will be
            raised on a missing parameter.
        """
        value = self.get(name)

        if value is None:
            if default is not None:
                return default

            self.raise_error(f"Parameter '{name}' missing.")

        try:
            fval = float(value)
        except ValueError:
            self.raise_error(f"Parameter '{name}' must be a number.")

        # float() accepts 'nan' and 'inf'; neither is a usable number here.
        if math.isnan(fval) or math.isinf(fval):
            self.raise_error(f"Parameter '{name}' must be a number.")

        return fval

    def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
        """ Return an input parameter as bool. Only '0' is accepted as
            an input for 'false' all other inputs will be interpreted as 'true'.

            If 'default' is given, then it will be returned when the parameter
            is missing completely. When 'default' is None, an error will be
            raised on a missing parameter.
        """
        value = self.get(name)

        if value is None:
            if default is not None:
                return default

            self.raise_error(f"Parameter '{name}' missing.")

        return value != '0'

    def get_accepted_languages(self) -> str:
        """ Return the accepted languages.

            The 'accept-language' request parameter takes precedence over
            the HTTP header of the same name. When neither is set, the
            configured default language is returned.
        """
        return self.get('accept-language')\
               or self.get_header('accept-language')\
               or self.config().DEFAULT_LANGUAGE

    def setup_debugging(self) -> bool:
        """ Set up collection of debug information if requested.

            Return True when debugging was requested.
        """
        if self.get_bool('debug', False):
            loglib.set_log_output('html')
            self.content_type = CONTENT_HTML
            return True

        return False

    def get_layers(self) -> Optional[napi.DataLayer]:
        """ Return a parsed version of the layer parameter.

            The parameter is a comma-separated list of layer names, which
            are matched case-insensitively against the members of
            napi.DataLayer and combined into a single value. Returns None
            when the parameter is absent. An unknown layer name results
            in an error raised via raise_error().
        """
        param = self.get('layer', None)
        if param is None:
            return None

        layers = []
        for name in param.split(','):
            layer = getattr(napi.DataLayer, name.upper(), None)
            # Reject unknown names and attributes that are not layer values
            # instead of failing with an unhandled AttributeError.
            if not isinstance(layer, napi.DataLayer):
                self.raise_error("Parameter 'layer' contains an unknown layer name.")
            layers.append(layer)

        return cast(napi.DataLayer, reduce(napi.DataLayer.__or__, layers))

    def parse_format(self, result_type: Type[Any], default: str) -> str:
        """ Get and check the 'format' parameter and prepare the formatter.
            `result_type` is the type of result to be returned by the function
            and `default` the format value to assume when no parameter is present.

            Sets the content type of the response as a side effect and
            returns the name of the chosen format.
        """
        fmt = self.get('format', default=default)
        assert fmt is not None

        if not formatting.supports_format(result_type, fmt):
            self.raise_error("Parameter 'format' must be one of: " +
                             ', '.join(formatting.list_formats(result_type)))

        # Every format without an explicit content type is sent as JSON.
        self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
        return fmt

    def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
        """ Create details structure from the supplied geometry parameters.

            Raises an error via raise_error() when more geometry output
            types are requested than the POLYGON_OUTPUT_MAX_TYPES
            configuration setting permits.
        """
        numgeoms = 0
        output = napi.GeometryFormat.NONE
        if self.get_bool('polygon_geojson', False):
            output |= napi.GeometryFormat.GEOJSON
            numgeoms += 1
        # The text, KML and SVG geometry parameters are not evaluated
        # for the GeoJSON-based output formats.
        if fmt not in ('geojson', 'geocodejson'):
            if self.get_bool('polygon_text', False):
                output |= napi.GeometryFormat.TEXT
                numgeoms += 1
            if self.get_bool('polygon_kml', False):
                output |= napi.GeometryFormat.KML
                numgeoms += 1
            if self.get_bool('polygon_svg', False):
                output |= napi.GeometryFormat.SVG
                numgeoms += 1

        if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
            self.raise_error('Too many polygon output options selected.')

        return {'address_details': True,
                'geometry_simplification': self.get_float('polygon_threshold', 0.0),
                'geometry_output': output
               }
|
|
|
|
|
|
async def status_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /status endpoint. See API docs for details.

        The status is queried before the output format (default: 'text')
        is checked. In text format a truthy `status` field of the result
        is answered with HTTP 500, everything else with HTTP 200.
    """
    status_result = await api.status()
    fmt = params.parse_format(napi.StatusResult, 'text')

    report_failure = fmt == 'text' and status_result.status
    body = formatting.format_result(status_result, fmt, {})

    return params.build_response(body, status=500 if report_failure else 200)
|
|
|
|
|
|
async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /details endpoint. See API docs for details.

        The place is identified either by 'place_id' or by the
        combination of 'osmtype', 'osmid' and optionally 'class'.
        Responds with a 404 error when no matching place is found.
    """
    fmt = params.parse_format(napi.DetailedResult, 'json')

    place: napi.PlaceRef
    place_id = params.get_int('place_id', 0)
    if not place_id:
        # No (or a zero) place ID: fall back to the OSM object reference.
        osmtype = params.get('osmtype')
        if osmtype is None:
            params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
        place = napi.OsmID(osmtype, params.get_int('osmid'), params.get('class'))
    else:
        place = napi.PlaceID(place_id)

    debug = params.setup_debugging()

    locales = napi.Locales.from_accept_languages(params.get_accepted_languages())

    lookup_options: Dict[str, Any] = {
        'address_details': params.get_bool('addressdetails', False),
        'linked_places': params.get_bool('linkedplaces', True),
        'parented_places': params.get_bool('hierarchy', False),
        'keywords': params.get_bool('keywords', False),
        'geometry_output': (napi.GeometryFormat.GEOJSON
                            if params.get_bool('polygon_geojson', False)
                            else napi.GeometryFormat.NONE),
        'locales': locales,
    }
    result = await api.details(place, **lookup_options)

    if debug:
        # In debug mode the collected log replaces the regular output.
        return params.build_response(loglib.get_and_disable())

    if result is None:
        params.raise_error('No place with that OSM ID found.', status=404)

    format_options = {
        'locales': locales,
        'group_hierarchy': params.get_bool('group_hierarchy', False),
        'icon_base_url': params.config().MAPICON_URL,
    }

    return params.build_response(formatting.format_result(result, fmt, format_options),
                                 num_results=1)
|
|
|
|
|
|
async def reverse_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /reverse endpoint. See API docs for details.

        Requires the 'lon' and 'lat' parameters. The response contains
        at most one result.
    """
    fmt = params.parse_format(napi.ReverseResults, 'xml')
    debug = params.setup_debugging()

    lon = params.get_float('lon')
    lat = params.get_float('lat')
    coord = napi.Point(lon, lat)

    details = params.parse_geometry_details(fmt)
    details.update(
        max_rank=helpers.zoom_to_rank(params.get_int('zoom', 18)),
        layers=params.get_layers(),
        locales=napi.Locales.from_accept_languages(params.get_accepted_languages()))

    result = await api.reverse(coord, **details)
    num_found = 1 if result else 0

    if debug:
        return params.build_response(loglib.get_and_disable(), num_results=num_found)

    # Only the XML output gets a query string handed to the formatter.
    query = ''
    if fmt == 'xml':
        queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
        zoom = params.get('zoom', None)
        if zoom:
            queryparts['zoom'] = zoom
        query = urlencode(queryparts)

    fmt_options = {
        'query': query,
        'extratags': params.get_bool('extratags', False),
        'namedetails': params.get_bool('namedetails', False),
        'addressdetails': params.get_bool('addressdetails', True),
    }

    found = napi.ReverseResults([result] if result else [])
    output = formatting.format_result(found, fmt, fmt_options)

    return params.build_response(output, num_results=num_found)
|
|
|
|
|
|
async def lookup_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /lookup endpoint. See API docs for details.

        The 'osm_ids' parameter is a comma-separated list of OSM object
        references: one of the letters N, W or R (in any case) followed
        by a numeric ID. Entries of any other form are ignored.
    """
    fmt = params.parse_format(napi.SearchResults, 'xml')
    debug = params.setup_debugging()
    details = params.parse_geometry_details(fmt)
    details['locales'] = napi.Locales.from_accept_languages(params.get_accepted_languages())

    candidates = (part.strip() for part in (params.get('osm_ids') or '').split(','))
    places = [napi.OsmID(ref[0].upper(), int(ref[1:]))
              for ref in candidates
              if len(ref) > 1 and ref[0] in 'RNWrnw' and ref[1:].isdigit()]

    if len(places) > params.config().get_int('LOOKUP_MAX_COUNT'):
        params.raise_error('Too many object IDs.')

    # The database is not queried when no valid reference is left.
    results = await api.lookup(places, **details) if places else napi.SearchResults()

    if debug:
        return params.build_response(loglib.get_and_disable(), num_results=len(results))

    fmt_options = {
        'extratags': params.get_bool('extratags', False),
        'namedetails': params.get_bool('namedetails', False),
        'addressdetails': params.get_bool('addressdetails', True),
    }

    return params.build_response(formatting.format_result(results, fmt, fmt_options),
                                 num_results=len(results))
|
|
|
|
|
|
async def _unstructured_search(query: str, api: napi.NominatimAPIAsync,
                               details: Dict[str, Any]) -> napi.SearchResults:
    """ Run a free-form search for `query`.

        Coordinates found in the query are removed from it and used as
        the 'near' point of the search. When nothing else remains of the
        query, a reverse search on the coordinates is done instead. A
        category term found in the query leads to a category search.
        Note that `details` may be extended with 'near' and
        'near_radius' entries in the process.
    """
    if not query:
        return napi.SearchResults()

    # Extract special format for coordinates from query.
    query, x, y = helpers.extract_coords_from_query(query)
    if x is not None:
        assert y is not None
        details['near'] = napi.Point(x, y)
        details['near_radius'] = 0.1

        # If no query is left, revert to reverse search.
        if not query:
            result = await api.reverse(details['near'], **details)
            if not result:
                return napi.SearchResults()

            # Copy over all the fields the reverse result shares
            # with a search result.
            shared_fields = {field.name: getattr(result, field.name)
                             for field in dataclasses.fields(napi.SearchResult)
                             if hasattr(result, field.name)}
            return napi.SearchResults([napi.SearchResult(**shared_fields)])

    query, cls, typ = helpers.extract_category_from_query(query)
    if cls is None:
        return await api.search(query, **details)

    assert typ is not None
    return await api.search_category([(cls, typ)], near_query=query, **details)
|
|
|
|
|
|
async def search_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /search endpoint. See API docs for details.

        Handles the free-form search (parameter 'q') as well as the
        structured search (amenity, street, city, county, state,
        postalcode, country). The two kinds cannot be combined.
    """
    fmt = params.parse_format(napi.SearchResults, 'jsonv2')
    debug = params.setup_debugging()
    details = params.parse_geometry_details(fmt)

    details.update(
        countries=params.get('countrycodes', None),
        excluded=params.get('exclude_place_ids', None),
        viewbox=params.get('viewbox', None) or params.get('viewboxlbrt', None),
        bounded_viewbox=params.get_bool('bounded', False),
        dedupe=params.get_bool('dedupe', True))

    # The requested limit is clamped to the range 1 to 50.
    max_results = max(1, min(50, params.get_int('limit', 10)))
    if details['dedupe']:
        # Ask for some spare results because deduplication may drop some.
        details['max_results'] = max_results + min(10, max_results)
    else:
        details['max_results'] = max_results

    min_rank, max_rank = helpers.feature_type_to_rank(params.get('featureType', ''))
    details['min_rank'] = min_rank
    details['max_rank'] = max_rank

    details['layers'] = (napi.DataLayer.ADDRESS
                         if params.get('featureType', None) is not None
                         else params.get_layers())

    details['locales'] = napi.Locales.from_accept_languages(params.get_accepted_languages())

    # unstructured query parameters
    query = params.get('q', None)
    # structured query parameters
    queryparts = {}
    for key in ('amenity', 'street', 'city', 'county', 'state', 'postalcode', 'country'):
        value = params.get(key, None)
        details[key] = value
        if value:
            queryparts[key] = value

    try:
        if query is None:
            query = ', '.join(queryparts.values())

            results = await api.search_address(**details)
        else:
            if queryparts:
                params.raise_error("Structured query parameters"
                                   "(amenity, street, city, county, state, postalcode, country)"
                                   " cannot be used together with 'q' parameter.")
            queryparts['q'] = query
            results = await _unstructured_search(query, api, details)
    except UsageError as err:
        params.raise_error(str(err))

    if details['dedupe'] and len(results) > 1:
        results = helpers.deduplicate_results(results, max_results)

    if debug:
        return params.build_response(loglib.get_and_disable(), num_results=len(results))

    # Only the XML output carries a URL for requesting more results.
    moreurl = ''
    if fmt == 'xml':
        found_place_ids = (str(r.place_id) for r in results if r.place_id)
        helpers.extend_query_parts(queryparts, details,
                                   params.get('featureType', ''),
                                   params.get_bool('namedetails', False),
                                   params.get_bool('extratags', False),
                                   found_place_ids)
        queryparts['format'] = fmt

        moreurl = params.base_uri() + '/search?' + urlencode(queryparts)

    fmt_options = {
        'query': query,
        'more_url': moreurl,
        'exclude_place_ids': queryparts.get('exclude_place_ids'),
        'viewbox': queryparts.get('viewbox'),
        'extratags': params.get_bool('extratags', False),
        'namedetails': params.get_bool('namedetails', False),
        'addressdetails': params.get_bool('addressdetails', False),
    }

    return params.build_response(formatting.format_result(results, fmt, fmt_options),
                                 num_results=len(results))
|
|
|
|
|
|
async def deletable_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /deletable endpoint.
        This is a special endpoint that shows polygons that have been
        deleted or are broken in the OSM data but are kept in the
        Nominatim database to minimize disruption.
    """
    fmt = params.parse_format(RawDataList, 'json')

    # Join the deletion candidates with their entries in the placex table.
    query = sa.text(""" SELECT p.place_id, country_code,
                               name->'name' as name, i.*
                        FROM placex p, import_polygon_delete i
                        WHERE p.osm_id = i.osm_id AND p.osm_type = i.osm_type
                              AND p.class = i.class AND p.type = i.type
                    """)

    async with api.begin() as conn:
        rows = await conn.execute(query)
        results = RawDataList(row._asdict() for row in rows)

    output = formatting.format_result(results, fmt, {})

    return params.build_response(output)
|
|
|
|
|
|
async def polygons_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /polygons endpoint.
        This is a special endpoint that shows polygons that have changed
        their size but are kept in the Nominatim database with their
        old area to minimize disruption.

        The optional parameters 'days', 'reduced' and 'class' restrict
        the output. At most 1000 entries are returned, newest first.
    """
    fmt = params.parse_format(RawDataList, 'json')

    days = params.get_int('days', -1)
    cls = params.get('class')
    sql_params: Dict[str, Any] = {'days': days, 'cls': cls}
    reduced = params.get_bool('reduced', False)

    async with api.begin() as conn:
        sql = sa.select(sa.text("""osm_type, osm_id, class, type,
                                   name->'name' as name,
                                   country_code, errormessage, updated"""))\
                .select_from(sa.text('import_polygon_error'))

        # Collect the optional filters first, then apply them in order.
        conditions = []
        if days > 0:
            conditions.append("updated > 'now'::timestamp - make_interval(days => :days)")
        if reduced:
            conditions.append("errormessage like 'Area reduced%'")
        if cls is not None:
            conditions.append("class = :cls")

        for condition in conditions:
            sql = sql.where(sa.text(condition))

        sql = sql.order_by(sa.literal_column('updated').desc()).limit(1000)

        rows = await conn.execute(sql, sql_params)
        results = RawDataList(row._asdict() for row in rows)

    output = formatting.format_result(results, fmt, {})

    return params.build_response(output)
|
|
|
|
|
|
# Signature shared by all endpoint functions: they receive the API object
# and the request adaptor and return the response created by the adaptor.
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]

# Pairs of endpoint name and the function that implements the endpoint.
ROUTES = [
    ('status', status_endpoint),
    ('details', details_endpoint),
    ('reverse', reverse_endpoint),
    ('lookup', lookup_endpoint),
    ('search', search_endpoint),
    ('deletable', deletable_endpoint),
    ('polygons', polygons_endpoint),
]
|