add tests for new admin and export Python functions

This commit is contained in:
Sarah Hoffmann
2023-07-26 11:09:52 +02:00
parent d545c6d73c
commit 9448c5e16f
5 changed files with 163 additions and 47 deletions

View File

@@ -89,19 +89,22 @@ class AdminFuncs:
api = napi.NominatimAPI(args.project_dir)
if args.target != 'reverse':
for _ in range(1000):
api.reverse((random.uniform(-90, 90), random.uniform(-180, 180)),
address_details=True)
try:
if args.target != 'reverse':
for _ in range(1000):
api.reverse((random.uniform(-90, 90), random.uniform(-180, 180)),
address_details=True)
if args.target != 'search':
from ..tokenizer import factory as tokenizer_factory
if args.target != 'search':
from ..tokenizer import factory as tokenizer_factory
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
with connect(args.config.get_libpq_dsn()) as conn:
words = tokenizer.most_frequent_words(conn, 1000)
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
with connect(args.config.get_libpq_dsn()) as conn:
words = tokenizer.most_frequent_words(conn, 1000)
for word in words:
api.search(word)
for word in words:
api.search(word)
finally:
api.close()
return 0

View File

@@ -52,6 +52,8 @@ RANK_TO_OUTPUT_MAP = {
class QueryExport:
"""\
Export places as CSV file from the database.
"""
def add_args(self, parser: argparse.ArgumentParser) -> None:
@@ -63,7 +65,8 @@ class QueryExport:
group.add_argument('--output-format',
default='street;suburb;city;county;state;country',
help=("Semicolon-separated list of address types "
"(see --output-type)."))
"(see --output-type). Additionally accepts:"
"placeid,postcode"))
group.add_argument('--language',
help=("Preferred language for output "
"(use local name, if omitted)"))
@@ -91,46 +94,49 @@ async def export(args: NominatimArgs) -> int:
api = napi.NominatimAPIAsync(args.project_dir)
output_range = RANK_RANGE_MAP[args.output_type]
try:
output_range = RANK_RANGE_MAP[args.output_type]
writer = init_csv_writer(args.output_format)
writer = init_csv_writer(args.output_format)
async with api.begin() as conn, api.begin() as detail_conn:
t = conn.t.placex
async with api.begin() as conn, api.begin() as detail_conn:
t = conn.t.placex
sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.admin_level,
t.c.address, t.c.extratags,
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.importance, t.c.wikipedia, t.c.indexed_date,
t.c.rank_address, t.c.rank_search,
t.c.centroid)\
.where(t.c.linked_place_id == None)\
.where(t.c.rank_address.between(*output_range))
sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.admin_level,
t.c.address, t.c.extratags,
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.importance, t.c.wikipedia, t.c.indexed_date,
t.c.rank_address, t.c.rank_search,
t.c.centroid)\
.where(t.c.linked_place_id == None)\
.where(t.c.rank_address.between(*output_range))
parent_place_id = await get_parent_id(conn, args.node, args.way, args.relation)
if parent_place_id:
taddr = conn.t.addressline
parent_place_id = await get_parent_id(conn, args.node, args.way, args.relation)
if parent_place_id:
taddr = conn.t.addressline
sql = sql.join(taddr, taddr.c.place_id == t.c.place_id)\
.where(taddr.c.address_place_id == parent_place_id)\
.where(taddr.c.isaddress)
sql = sql.join(taddr, taddr.c.place_id == t.c.place_id)\
.where(taddr.c.address_place_id == parent_place_id)\
.where(taddr.c.isaddress)
if args.restrict_to_country:
sql = sql.where(t.c.country_code == args.restrict_to_country.lower())
if args.restrict_to_country:
sql = sql.where(t.c.country_code == args.restrict_to_country.lower())
results = []
for row in await conn.execute(sql):
result = create_from_placex_row(row, ReverseResult)
if result is not None:
results.append(result)
results = []
for row in await conn.execute(sql):
result = create_from_placex_row(row, ReverseResult)
if result is not None:
results.append(result)
if len(results) == 1000:
if len(results) == 1000:
await dump_results(detail_conn, results, writer, args.language)
results = []
if results:
await dump_results(detail_conn, results, writer, args.language)
results = []
if results:
await dump_results(detail_conn, results, writer, args.language)
finally:
await api.close()
return 0
@@ -159,9 +165,11 @@ async def dump_results(conn: napi.SearchConnection,
result.localize(locale)
for line in (result.address_rows or []):
if line.isaddress and line.local_name\
and line.rank_address in RANK_TO_OUTPUT_MAP:
data[RANK_TO_OUTPUT_MAP[line.rank_address]] = line.local_name
if line.isaddress and line.local_name:
if line.category[1] == 'postcode':
data['postcode'] = line.local_name
elif line.rank_address in RANK_TO_OUTPUT_MAP:
data[RANK_TO_OUTPUT_MAP[line.rank_address]] = line.local_name
writer.writerow(data)