extend sqlite converter for search tables

This commit is contained in:
Sarah Hoffmann
2023-12-06 13:42:58 +01:00
parent 381bd0b576
commit 0d840c8d4e
2 changed files with 110 additions and 1 deletions

View File

@@ -10,6 +10,7 @@ Custom type for an array of integers.
from typing import Any, List, cast, Optional
import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.dialects.postgresql import ARRAY
from nominatim.typing import SaDialect, SaColumn
@@ -71,3 +72,16 @@ class IntArray(sa.types.TypeDecorator[Any]):
in the array.
"""
return self.op('&&', is_comparison=True)(other)
class ArrayAgg(sa.sql.functions.GenericFunction[Any]):
""" Aggregate function to collect elements in an array.
"""
type = IntArray()
identifier = 'ArrayAgg'
name = 'array_agg'
inherit_cache = True
@compiles(ArrayAgg, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_array_agg(element: ArrayAgg, compiler: 'sa.Compiled', **kw: Any) -> str:
return "group_concat(%s, ',')" % compiler.process(element.clauses, **kw)

View File

@@ -14,7 +14,8 @@ from pathlib import Path
import sqlalchemy as sa
from nominatim.typing import SaSelect
from nominatim.db.sqlalchemy_types import Geometry
from nominatim.db.sqlalchemy_types import Geometry, IntArray
from nominatim.api.search.query_analyzer_factory import make_query_analyzer
import nominatim.api as napi
LOG = logging.getLogger()
@@ -54,18 +55,24 @@ class SqliteWriter:
""" Create the database structure and copy the data from
the source database to the destination.
"""
LOG.warning('Setting up spatialite')
await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
await self.create_tables()
await self.copy_data()
if 'search' in self.options:
await self.create_word_table()
await self.create_indexes()
async def create_tables(self) -> None:
""" Set up the database tables.
"""
LOG.warning('Setting up tables')
if 'search' not in self.options:
self.dest.t.meta.remove(self.dest.t.search_name)
else:
await self.create_class_tables()
await self.dest.connection.run_sync(self.dest.t.meta.create_all)
@@ -78,6 +85,41 @@ class SqliteWriter:
col.type.subtype.upper(), 'XY')))
async def create_class_tables(self) -> None:
""" Set up the table that serve class/type-specific geometries.
"""
sql = sa.text("""SELECT tablename FROM pg_tables
WHERE tablename LIKE 'place_classtype_%'""")
for res in await self.src.execute(sql):
for db in (self.src, self.dest):
sa.Table(res[0], db.t.meta,
sa.Column('place_id', sa.BigInteger),
sa.Column('centroid', Geometry))
async def create_word_table(self) -> None:
""" Create the word table.
This table needs the property information to determine the
correct format. Therefore needs to be done after all other
data has been copied.
"""
await make_query_analyzer(self.src)
await make_query_analyzer(self.dest)
src = self.src.t.meta.tables['word']
dest = self.dest.t.meta.tables['word']
await self.dest.connection.run_sync(dest.create)
LOG.warning("Copying word table")
async_result = await self.src.connection.stream(sa.select(src))
async for partition in async_result.partitions(10000):
data = [{k: getattr(r, k) for k in r._fields} for r in partition]
await self.dest.execute(dest.insert(), data)
await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
async def copy_data(self) -> None:
""" Copy data for all registered tables.
"""
@@ -90,6 +132,14 @@ class SqliteWriter:
for r in partition]
await self.dest.execute(table.insert(), data)
# Set up a minimal copy of pg_tables used to look up the class tables later.
pg_tables = sa.Table('pg_tables', self.dest.t.meta,
sa.Column('schemaname', sa.Text, default='public'),
sa.Column('tablename', sa.Text))
await self.dest.connection.run_sync(pg_tables.create)
data = [{'tablename': t} for t in self.dest.t.meta.tables]
await self.dest.execute(pg_tables.insert().values(data))
async def create_indexes(self) -> None:
""" Add indexes necessary for the frontend.
@@ -119,6 +169,22 @@ class SqliteWriter:
await self.create_index('placex', 'parent_place_id')
await self.create_index('placex', 'rank_address')
await self.create_index('addressline', 'place_id')
await self.create_index('postcode', 'place_id')
await self.create_index('osmline', 'place_id')
await self.create_index('tiger', 'place_id')
if 'search' in self.options:
await self.create_spatial_index('postcode', 'geometry')
await self.create_spatial_index('search_name', 'centroid')
await self.create_index('search_name', 'place_id')
await self.create_index('osmline', 'parent_place_id')
await self.create_index('tiger', 'parent_place_id')
await self.create_search_index()
for t in self.dest.t.meta.tables:
if t.startswith('place_classtype_'):
await self.dest.execute(sa.select(
sa.func.CreateSpatialIndex(t, 'centroid')))
async def create_spatial_index(self, table: str, column: str) -> None:
@@ -136,6 +202,35 @@ class SqliteWriter:
sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
async def create_search_index(self) -> None:
""" Create the tables and indexes needed for word lookup.
"""
tsrc = self.src.t.search_name
for column in ('name_vector', 'nameaddress_vector'):
table_name = f'reverse_search_{column}'
LOG.warning("Creating reverse search %s", table_name)
rsn = sa.Table(table_name, self.dest.t.meta,
sa.Column('word', sa.Integer()),
sa.Column('places', IntArray))
await self.dest.connection.run_sync(rsn.create)
sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
.group_by('word')
async_result = await self.src.connection.stream(sql)
async for partition in async_result.partitions(100):
data = []
for row in partition:
row.places.sort()
data.append({'word': row.word,
'places': row.places})
await self.dest.execute(rsn.insert(), data)
await self.dest.connection.run_sync(
sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create)
def select_from(self, table: str) -> SaSelect:
""" Create the SQL statement to select the source columns and rows.
"""