mirror of
https://github.com/osm-search/Nominatim.git
synced 2026-02-14 18:37:58 +00:00
make NominatimAPI[Async] a context manager
If close() isn't properly called, it can lead to odd error messages about uncaught exceptions.
This commit is contained in:
@@ -9,45 +9,34 @@ Tests for enhanced connection class for API functions.
|
||||
"""
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from nominatim_api import NominatimAPIAsync
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def apiobj(temp_db):
|
||||
""" Create an asynchronous SQLAlchemy engine for the test DB.
|
||||
"""
|
||||
api = NominatimAPIAsync(Path('/invalid'), {})
|
||||
yield api
|
||||
await api.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_scalar(apiobj, table_factory):
|
||||
async def test_run_scalar(api, table_factory):
|
||||
table_factory('foo', definition='that TEXT', content=(('a', ),))
|
||||
|
||||
async with apiobj.begin() as conn:
|
||||
async with api.begin() as conn:
|
||||
assert await conn.scalar(sa.text('SELECT * FROM foo')) == 'a'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_execute(apiobj, table_factory):
|
||||
async def test_run_execute(api, table_factory):
|
||||
table_factory('foo', definition='that TEXT', content=(('a', ),))
|
||||
|
||||
async with apiobj.begin() as conn:
|
||||
async with api.begin() as conn:
|
||||
result = await conn.execute(sa.text('SELECT * FROM foo'))
|
||||
assert result.fetchone()[0] == 'a'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_property_existing_cached(apiobj, table_factory):
|
||||
async def test_get_property_existing_cached(api, table_factory):
|
||||
table_factory('nominatim_properties',
|
||||
definition='property TEXT, value TEXT',
|
||||
content=(('dbv', '96723'), ))
|
||||
|
||||
async with apiobj.begin() as conn:
|
||||
async with api.begin() as conn:
|
||||
assert await conn.get_property('dbv') == '96723'
|
||||
|
||||
await conn.execute(sa.text('TRUNCATE nominatim_properties'))
|
||||
@@ -56,12 +45,12 @@ async def test_get_property_existing_cached(apiobj, table_factory):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_property_existing_uncached(apiobj, table_factory):
|
||||
async def test_get_property_existing_uncached(api, table_factory):
|
||||
table_factory('nominatim_properties',
|
||||
definition='property TEXT, value TEXT',
|
||||
content=(('dbv', '96723'), ))
|
||||
|
||||
async with apiobj.begin() as conn:
|
||||
async with api.begin() as conn:
|
||||
assert await conn.get_property('dbv') == '96723'
|
||||
|
||||
await conn.execute(sa.text("UPDATE nominatim_properties SET value = '1'"))
|
||||
@@ -71,23 +60,23 @@ async def test_get_property_existing_uncached(apiobj, table_factory):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('param', ['foo', 'DB:server_version'])
|
||||
async def test_get_property_missing(apiobj, table_factory, param):
|
||||
async def test_get_property_missing(api, table_factory, param):
|
||||
table_factory('nominatim_properties',
|
||||
definition='property TEXT, value TEXT')
|
||||
|
||||
async with apiobj.begin() as conn:
|
||||
async with api.begin() as conn:
|
||||
with pytest.raises(ValueError):
|
||||
await conn.get_property(param)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_db_property_existing(apiobj):
|
||||
async with apiobj.begin() as conn:
|
||||
async def test_get_db_property_existing(api):
|
||||
async with api.begin() as conn:
|
||||
assert await conn.get_db_property('server_version') > 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_db_property_existing(apiobj):
|
||||
async with apiobj.begin() as conn:
|
||||
async def test_get_db_property_existing(api):
|
||||
async with api.begin() as conn:
|
||||
with pytest.raises(ValueError):
|
||||
await conn.get_db_property('dfkgjd.rijg')
|
||||
|
||||
Reference in New Issue
Block a user