add exporting to a SQLite database

This commit is contained in:
Sarah Hoffmann
2023-10-12 10:45:12 +02:00
parent 837bdecde8
commit 114cdafe7e
3 changed files with 152 additions and 17 deletions

View File

@@ -81,21 +81,34 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
if self._engine: if self._engine:
return return
dsn = self.config.get_database_params() extra_args: Dict[str, Any] = {'future': True,
pool_size = self.config.get_int('API_POOL_SIZE') 'echo': self.config.get_bool('DEBUG_SQL')}
query = {k: v for k, v in dsn.items() is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
if k not in ('user', 'password', 'dbname', 'host', 'port')}
dburl = sa.engine.URL.create( if is_sqlite:
f'postgresql+{PGCORE_LIB}', params = dict((p.split('=', 1)
database=dsn.get('dbname'), for p in self.config.DATABASE_DSN[7:].split(';')))
username=dsn.get('user'), password=dsn.get('password'), dburl = sa.engine.URL.create('sqlite+aiosqlite',
host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None, database=params.get('dbname'))
query=query)
engine = sa_asyncio.create_async_engine(dburl, future=True, else:
max_overflow=0, pool_size=pool_size, dsn = self.config.get_database_params()
echo=self.config.get_bool('DEBUG_SQL')) query = {k: v for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')}
dburl = sa.engine.URL.create(
f'postgresql+{PGCORE_LIB}',
database=dsn.get('dbname'),
username=dsn.get('user'),
password=dsn.get('password'),
host=dsn.get('host'),
port=int(dsn['port']) if 'port' in dsn else None,
query=query)
extra_args['max_overflow'] = 0
extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
engine = sa_asyncio.create_async_engine(dburl, **extra_args)
try: try:
async with engine.begin() as conn: async with engine.begin() as conn:
@@ -104,7 +117,7 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
except (PGCORE_ERROR, sa.exc.OperationalError): except (PGCORE_ERROR, sa.exc.OperationalError):
server_version = 0 server_version = 0
if server_version >= 110000: if server_version >= 110000 and not is_sqlite:
@sa.event.listens_for(engine.sync_engine, "connect") @sa.event.listens_for(engine.sync_engine, "connect")
def _on_connect(dbapi_con: Any, _: Any) -> None: def _on_connect(dbapi_con: Any, _: Any) -> None:
cursor = dbapi_con.cursor() cursor = dbapi_con.cursor()
@@ -113,6 +126,15 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
# Make sure that all connections get the new settings # Make sure that all connections get the new settings
await self.close() await self.close()
if is_sqlite:
@sa.event.listens_for(engine.sync_engine, "connect")
def _on_sqlite_connect(dbapi_con: Any, _: Any) -> None:
dbapi_con.run_async(lambda conn: conn.enable_load_extension(True))
cursor = dbapi_con.cursor()
cursor.execute("SELECT load_extension('mod_spatialite')")
cursor.execute('SELECT SetDecimalPrecision(7)')
dbapi_con.run_async(lambda conn: conn.enable_load_extension(False))
self._property_cache['DB:server_version'] = server_version self._property_cache['DB:server_version'] = server_version
self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member

View File

@@ -28,7 +28,7 @@ class Geometry(types.UserDefinedType): # type: ignore[type-arg]
def get_col_spec(self) -> str: def get_col_spec(self) -> str:
return f'GEOMETRY({self.subtype}, 4326)' return f'GEOMETRY'
def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]: def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:

View File

@@ -8,12 +8,17 @@
Exporting a Nominatim database to SQLite. Exporting a Nominatim database to SQLite.
""" """
from typing import Set from typing import Set
import logging
from pathlib import Path from pathlib import Path
import sqlalchemy as sa import sqlalchemy as sa
from nominatim.typing import SaSelect
from nominatim.db.sqlalchemy_types import Geometry
import nominatim.api as napi import nominatim.api as napi
LOG = logging.getLogger()
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None: async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
""" Export an existing database to sqlite. The resulting database """ Export an existing database to sqlite. The resulting database
will be usable against the Python frontend of Nominatim. will be usable against the Python frontend of Nominatim.
@@ -24,7 +29,115 @@ async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
outapi = napi.NominatimAPIAsync(project_dir, outapi = napi.NominatimAPIAsync(project_dir,
{'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"}) {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})
async with api.begin() as inconn, outapi.begin() as outconn: async with api.begin() as src, outapi.begin() as dest:
pass writer = SqliteWriter(src, dest, options)
await writer.write()
finally: finally:
await api.close() await api.close()
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        Reads all registered tables from the source (PostgreSQL) connection
        and materialises them, including Spatialite geometry columns and
        the indexes required by the Python frontend, in the destination
        SQLite connection.
    """
    # Number of source rows buffered per INSERT batch while copying.
    COPY_BATCH_SIZE = 10000

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        self.src = src
        self.dest = dest
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        # Spatialite metadata tables must exist before any geometry
        # column can be registered via RecoverGeometryColumn().
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.

            Drops the search_name table from the metadata when the
            'search' option was not requested, then creates all remaining
            tables and registers their geometry columns with Spatialite.
        """
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.

            Streams rows from the source connection in batches of
            COPY_BATCH_SIZE and bulk-inserts them into the destination.
        """
        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(self.COPY_BATCH_SIZE):
                # 'class' is a reserved word in Python; the destination
                # table exposes that column under the key 'class_'.
                data = [{('class_' if k == 'class' else k): getattr(r, k)
                         for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
            'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.
        """
        await self.dest.execute(sa.select(
            sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.
        """
        table = getattr(self.dest.t, table_name)
        # Use the plain table name for the index identifier: interpolating
        # the Table object may yield a schema-qualified string
        # (e.g. 'idx_main.placex_place_id'), which is not a valid index name.
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table_name}_{column}", getattr(table.c, column)).create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            Geometry columns are converted to their WKT representation so
            they survive the transfer as plain text.
        """
        columns = self.src.t.meta.tables[table].c

        return sa.select(*(sa.func.ST_AsText(c).label(c.name)
                           if isinstance(c.type, Geometry) else c
                           for c in columns))