mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-16 05:18:00 +00:00
The only allowable difference is precision of coordinates. Python uses a precision of 7 digits where possible, which corresponds to the precision of OSM data. Also fixes some smaller bugs found by the BDD tests.
260 lines
9.9 KiB
Python
260 lines
9.9 KiB
Python
# SPDX-License-Identifier: GPL-2.0-only
|
|
#
|
|
# This file is part of Nominatim. (https://nominatim.org)
|
|
#
|
|
# Copyright (C) 2023 by the Nominatim developer community.
|
|
# For a full list of authors see the git log.
|
|
"""
|
|
Classes wrapping HTTP responses from the Nominatim API.
|
|
"""
|
|
import re
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from check_functions import Almost, OsmType, Field, check_for_attributes
|
|
|
|
|
|
class GenericResponse:
|
|
""" Common base class for all API responses.
|
|
"""
|
|
def __init__(self, page, fmt, errorcode=200):
|
|
fmt = fmt.strip()
|
|
if fmt == 'jsonv2':
|
|
fmt = 'json'
|
|
|
|
self.page = page
|
|
self.format = fmt
|
|
self.errorcode = errorcode
|
|
self.result = []
|
|
self.header = dict()
|
|
|
|
if errorcode == 200 and fmt != 'debug':
|
|
getattr(self, '_parse_' + fmt)()
|
|
|
|
def _parse_json(self):
|
|
m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
|
|
if m is None:
|
|
code = self.page
|
|
else:
|
|
code = m.group(2)
|
|
self.header['json_func'] = m.group(1)
|
|
self.result = json.JSONDecoder().decode(code)
|
|
if isinstance(self.result, dict):
|
|
if 'error' in self.result:
|
|
self.result = []
|
|
else:
|
|
self.result = [self.result]
|
|
|
|
|
|
def _parse_geojson(self):
|
|
self._parse_json()
|
|
if self.result:
|
|
geojson = self.result[0]
|
|
# check for valid geojson
|
|
check_for_attributes(geojson, 'type,features')
|
|
assert geojson['type'] == 'FeatureCollection'
|
|
assert isinstance(geojson['features'], list)
|
|
|
|
self.result = []
|
|
for result in geojson['features']:
|
|
check_for_attributes(result, 'type,properties,geometry')
|
|
assert result['type'] == 'Feature'
|
|
new = result['properties']
|
|
check_for_attributes(new, 'geojson', 'absent')
|
|
new['geojson'] = result['geometry']
|
|
if 'bbox' in result:
|
|
check_for_attributes(new, 'boundingbox', 'absent')
|
|
# bbox is minlon, minlat, maxlon, maxlat
|
|
# boundingbox is minlat, maxlat, minlon, maxlon
|
|
new['boundingbox'] = [result['bbox'][1],
|
|
result['bbox'][3],
|
|
result['bbox'][0],
|
|
result['bbox'][2]]
|
|
for k, v in geojson.items():
|
|
if k not in ('type', 'features'):
|
|
check_for_attributes(new, '__' + k, 'absent')
|
|
new['__' + k] = v
|
|
self.result.append(new)
|
|
|
|
|
|
def _parse_geocodejson(self):
|
|
self._parse_geojson()
|
|
if self.result:
|
|
for r in self.result:
|
|
assert set(r.keys()) == {'geocoding', 'geojson', '__geocoding'}, \
|
|
f"Unexpected keys in result: {r.keys()}"
|
|
check_for_attributes(r['geocoding'], 'geojson', 'absent')
|
|
inner = r.pop('geocoding')
|
|
r.update(inner)
|
|
|
|
|
|
def assert_address_field(self, idx, field, value):
|
|
""" Check that result rows`idx` has a field `field` with value `value`
|
|
in its address. If idx is None, then all results are checked.
|
|
"""
|
|
if idx is None:
|
|
todo = range(len(self.result))
|
|
else:
|
|
todo = [int(idx)]
|
|
|
|
for idx in todo:
|
|
self.check_row(idx, 'address' in self.result[idx], "No field 'address'")
|
|
|
|
address = self.result[idx]['address']
|
|
self.check_row_field(idx, field, value, base=address)
|
|
|
|
|
|
def match_row(self, row, context=None, field=None):
|
|
""" Match the result fields against the given behave table row.
|
|
"""
|
|
if 'ID' in row.headings:
|
|
todo = [int(row['ID'])]
|
|
else:
|
|
todo = range(len(self.result))
|
|
|
|
for i in todo:
|
|
subdict = self.result[i]
|
|
if field is not None:
|
|
for key in field.split('.'):
|
|
self.check_row(i, key in subdict, f"Missing subfield {key}")
|
|
subdict = subdict[key]
|
|
self.check_row(i, isinstance(subdict, dict),
|
|
f"Subfield {key} not a dict")
|
|
|
|
for name, value in zip(row.headings, row.cells):
|
|
if name == 'ID':
|
|
pass
|
|
elif name == 'osm':
|
|
self.check_row_field(i, 'osm_type', OsmType(value[0]), base=subdict)
|
|
self.check_row_field(i, 'osm_id', Field(value[1:]), base=subdict)
|
|
elif name == 'centroid':
|
|
if ' ' in value:
|
|
lon, lat = value.split(' ')
|
|
elif context is not None:
|
|
lon, lat = context.osm.grid_node(int(value))
|
|
else:
|
|
raise RuntimeError("Context needed when using grid coordinates")
|
|
self.check_row_field(i, 'lat', Field(float(lat), abs_tol=1e-07), base=subdict)
|
|
self.check_row_field(i, 'lon', Field(float(lon), abs_tol=1e-07), base=subdict)
|
|
else:
|
|
self.check_row_field(i, name, Field(value), base=subdict)
|
|
|
|
|
|
def check_row(self, idx, check, msg):
|
|
""" Assert for the condition 'check' and print 'msg' on fail together
|
|
with the contents of the failing result.
|
|
"""
|
|
class _RowError:
|
|
def __init__(self, row):
|
|
self.row = row
|
|
|
|
def __str__(self):
|
|
return f"{msg}. Full row {idx}:\n" \
|
|
+ json.dumps(self.row, indent=4, ensure_ascii=False)
|
|
|
|
assert check, _RowError(self.result[idx])
|
|
|
|
|
|
def check_row_field(self, idx, field, expected, base=None):
|
|
""" Check field 'field' of result 'idx' for the expected value
|
|
and print a meaningful error if the condition fails.
|
|
When 'base' is set to a dictionary, then the field is checked
|
|
in that base. The error message will still report the contents
|
|
of the full result.
|
|
"""
|
|
if base is None:
|
|
base = self.result[idx]
|
|
|
|
self.check_row(idx, field in base, f"No field '{field}'")
|
|
value = base[field]
|
|
|
|
self.check_row(idx, expected == value,
|
|
f"\nBad value for field '{field}'. Expected: {expected}, got: {value}")
|
|
|
|
|
|
|
|
class SearchResponse(GenericResponse):
|
|
""" Specialised class for search and lookup responses.
|
|
Transforms the xml response in a format similar to json.
|
|
"""
|
|
|
|
def _parse_xml(self):
|
|
xml_tree = ET.fromstring(self.page)
|
|
|
|
self.header = dict(xml_tree.attrib)
|
|
|
|
for child in xml_tree:
|
|
assert child.tag == "place"
|
|
self.result.append(dict(child.attrib))
|
|
|
|
address = {}
|
|
for sub in child:
|
|
if sub.tag == 'extratags':
|
|
self.result[-1]['extratags'] = {}
|
|
for tag in sub:
|
|
self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
|
|
elif sub.tag == 'namedetails':
|
|
self.result[-1]['namedetails'] = {}
|
|
for tag in sub:
|
|
self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
|
|
elif sub.tag == 'geokml':
|
|
self.result[-1][sub.tag] = True
|
|
else:
|
|
address[sub.tag] = sub.text
|
|
|
|
if address:
|
|
self.result[-1]['address'] = address
|
|
|
|
|
|
class ReverseResponse(GenericResponse):
|
|
""" Specialised class for reverse responses.
|
|
Transforms the xml response in a format similar to json.
|
|
"""
|
|
|
|
def _parse_xml(self):
|
|
xml_tree = ET.fromstring(self.page)
|
|
|
|
self.header = dict(xml_tree.attrib)
|
|
self.result = []
|
|
|
|
for child in xml_tree:
|
|
if child.tag == 'result':
|
|
assert not self.result, "More than one result in reverse result"
|
|
self.result.append(dict(child.attrib))
|
|
check_for_attributes(self.result[0], 'display_name', 'absent')
|
|
self.result[0]['display_name'] = child.text
|
|
elif child.tag == 'addressparts':
|
|
assert 'address' not in self.result[0], "More than one address in result"
|
|
address = {}
|
|
for sub in child:
|
|
assert len(sub) == 0, f"Address element '{sub.tag}' has subelements"
|
|
address[sub.tag] = sub.text
|
|
self.result[0]['address'] = address
|
|
elif child.tag == 'extratags':
|
|
assert 'extratags' not in self.result[0], "More than one extratags in result"
|
|
self.result[0]['extratags'] = {}
|
|
for tag in child:
|
|
assert len(tag) == 0, f"Extratags element '{tag.attrib['key']}' has subelements"
|
|
self.result[0]['extratags'][tag.attrib['key']] = tag.attrib['value']
|
|
elif child.tag == 'namedetails':
|
|
assert 'namedetails' not in self.result[0], "More than one namedetails in result"
|
|
self.result[0]['namedetails'] = {}
|
|
for tag in child:
|
|
assert len(tag) == 0, f"Namedetails element '{tag.attrib['desc']}' has subelements"
|
|
self.result[0]['namedetails'][tag.attrib['desc']] = tag.text
|
|
elif child.tag == 'geokml':
|
|
assert 'geokml' not in self.result[0], "More than one geokml in result"
|
|
self.result[0]['geokml'] = ET.tostring(child, encoding='unicode')
|
|
else:
|
|
assert child.tag == 'error', \
|
|
f"Unknown XML tag {child.tag} on page: {self.page}"
|
|
|
|
|
|
class StatusResponse(GenericResponse):
|
|
""" Specialised class for status responses.
|
|
Can also parse text responses.
|
|
"""
|
|
|
|
def _parse_text(self):
|
|
pass
|