mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-16 15:47:58 +00:00
more API search tests
also move directory name back to api
This commit is contained in:
@@ -94,11 +94,29 @@ class SearchResponse(object):
|
||||
|
||||
self.header = dict(et.attrib)
|
||||
|
||||
|
||||
for child in et:
|
||||
assert_equal(child.tag, "place")
|
||||
self.result.append(dict(child.attrib))
|
||||
|
||||
address = {}
|
||||
for sub in child:
|
||||
if sub.tag == 'extratags':
|
||||
self.result[-1]['extratags'] = {}
|
||||
for tag in sub:
|
||||
self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
|
||||
elif sub.tag == 'namedetails':
|
||||
self.result[-1]['namedetails'] = {}
|
||||
for tag in sub:
|
||||
self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
|
||||
elif sub.tag in ('geokml'):
|
||||
self.result[-1][sub.tag] = True
|
||||
else:
|
||||
address[sub.tag] = sub.text
|
||||
|
||||
if len(address) > 0:
|
||||
self.result[-1]['address'] = address
|
||||
|
||||
|
||||
def match_row(self, row):
|
||||
if 'ID' in row.headings:
|
||||
todo = [int(row['ID'])]
|
||||
@@ -117,10 +135,18 @@ class SearchResponse(object):
|
||||
x, y = row[h].split(' ')
|
||||
assert_almost_equal(float(y), float(res['lat']))
|
||||
assert_almost_equal(float(x), float(res['lon']))
|
||||
elif row[h].startswith("^"):
|
||||
assert_in(h, res)
|
||||
assert_is_not_none(re.fullmatch(row[h], res[h]),
|
||||
"attribute '%s': expected: '%s', got '%s'"
|
||||
% (h, row[h], res[h]))
|
||||
else:
|
||||
assert_in(h, res)
|
||||
assert_equal(str(res[h]), str(row[h]))
|
||||
|
||||
def property_list(self, prop):
|
||||
return [ x[prop] for x in self.result ]
|
||||
|
||||
|
||||
@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
|
||||
def query_cmd(context, query, dups):
|
||||
@@ -147,13 +173,15 @@ def query_cmd(context, query, dups):
|
||||
context.response = SearchResponse(outp.decode('utf-8'), 'json')
|
||||
|
||||
|
||||
@when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"')
|
||||
def website_search_request(context, fmt, query):
|
||||
@when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
|
||||
def website_search_request(context, fmt, query, addr):
|
||||
env = BASE_SERVER_ENV
|
||||
|
||||
params = { 'q' : query }
|
||||
if fmt is not None:
|
||||
params['format'] = fmt.strip()
|
||||
if addr is not None:
|
||||
params['addressdetails'] = '1'
|
||||
if context.table:
|
||||
if context.table.headings[0] == 'param':
|
||||
for line in context.table:
|
||||
@@ -241,13 +269,101 @@ def step_impl(context):
|
||||
for line in context.table:
|
||||
context.response.match_row(line)
|
||||
|
||||
@then(u'result (?P<lid>\d+) has (?P<neg>not )?attributes (?P<attrs>.*)')
|
||||
@then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
|
||||
def validate_attributes(context, lid, neg, attrs):
|
||||
context.execute_steps("then at least %s result is returned" % lid)
|
||||
if lid is None:
|
||||
idx = range(len(context.response.result))
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
else:
|
||||
idx = [int(lid.strip())]
|
||||
context.execute_steps("then more than %sresults are returned" % lid)
|
||||
|
||||
for i in idx:
|
||||
for attr in attrs.split(','):
|
||||
if neg:
|
||||
assert_not_in(attr, context.response.result[i])
|
||||
else:
|
||||
assert_in(attr, context.response.result[i])
|
||||
|
||||
@then(u'result addresses contain')
|
||||
def step_impl(context):
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
|
||||
if 'ID' not in context.table.headings:
|
||||
addr_parts = context.response.property_list('address')
|
||||
|
||||
for line in context.table:
|
||||
if 'ID' in context.table.headings:
|
||||
addr_parts = [dict(context.response.result[int(line['ID'])]['address'])]
|
||||
|
||||
for h in context.table.headings:
|
||||
if h != 'ID':
|
||||
for p in addr_parts:
|
||||
assert_in(h, p)
|
||||
assert_equal(p[h], line[h], "Bad address value for %s" % h)
|
||||
|
||||
@then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
|
||||
def check_address(context, lid, neg, attrs):
|
||||
context.execute_steps("then more than %s results are returned" % lid)
|
||||
|
||||
addr_parts = context.response.result[int(lid)]['address']
|
||||
|
||||
for attr in attrs.split(','):
|
||||
if neg:
|
||||
assert_not_in(attr, context.response.result[int(lid)])
|
||||
assert_not_in(attr, addr_parts)
|
||||
else:
|
||||
assert_in(attr, context.response.result[int(lid)])
|
||||
assert_in(attr, addr_parts)
|
||||
|
||||
@then(u'address of result (?P<lid>\d+) is')
|
||||
def check_address(context, lid):
|
||||
context.execute_steps("then more than %s results are returned" % lid)
|
||||
|
||||
addr_parts = dict(context.response.result[int(lid)]['address'])
|
||||
|
||||
for line in context.table:
|
||||
assert_in(line['type'], addr_parts)
|
||||
assert_equal(addr_parts[line['type']], line['value'],
|
||||
"Bad address value for %s" % line['type'])
|
||||
del addr_parts[line['type']]
|
||||
|
||||
eq_(0, len(addr_parts), "Additional address parts found: %s" % str(addr_parts))
|
||||
|
||||
@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
|
||||
def step_impl(context, lid, coords):
|
||||
if lid is None:
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
bboxes = context.response.property_list('boundingbox')
|
||||
else:
|
||||
context.execute_steps("then more than %sresults are returned" % lid)
|
||||
bboxes = [ context.response.result[int(lid)]['boundingbox']]
|
||||
|
||||
coord = [ float(x) for x in coords.split(',') ]
|
||||
|
||||
for bbox in bboxes:
|
||||
if isinstance(bbox, str):
|
||||
bbox = bbox.split(',')
|
||||
bbox = [ float(x) for x in bbox ]
|
||||
|
||||
assert_greater_equal(bbox[0], coord[0])
|
||||
assert_less_equal(bbox[1], coord[1])
|
||||
assert_greater_equal(bbox[2], coord[2])
|
||||
assert_less_equal(bbox[3], coord[3])
|
||||
|
||||
@then(u'there are(?P<neg> no)? duplicates')
|
||||
def check_for_duplicates(context, neg):
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
|
||||
resarr = set()
|
||||
has_dupe = False
|
||||
|
||||
for res in context.response.result:
|
||||
dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
|
||||
if dup in resarr:
|
||||
has_dupe = True
|
||||
break
|
||||
resarr.add(dup)
|
||||
|
||||
if neg:
|
||||
assert not has_dupe, "Found duplicate for %s" % (dup, )
|
||||
else:
|
||||
assert has_dupe, "No duplicates found"
|
||||
|
||||
Reference in New Issue
Block a user