make details API work with sqlite incl. unit tests

This commit is contained in:
Sarah Hoffmann
2023-10-12 15:31:20 +02:00
parent d0c91e4acf
commit 07e6c5cf69
5 changed files with 172 additions and 77 deletions

View File

@@ -10,7 +10,6 @@ Custom functions and expressions for SQLAlchemy.
from typing import Any
import sqlalchemy as sa
from sqlalchemy.sql.expression import FunctionElement
from sqlalchemy.ext.compiler import compiles
from nominatim.typing import SaColumn
@@ -41,10 +40,11 @@ def select_index_placex_geometry_reverse_lookupplacenode(table: str) -> 'sa.Text
f" AND {table}.osm_type = 'N'")
class CrosscheckNames(FunctionElement[Any]):
class CrosscheckNames(sa.sql.functions.GenericFunction[bool]):
""" Check if in the given list of names in parameters 1 any of the names
from the JSON array in parameter 2 are contained.
"""
type = sa.Boolean()
name = 'CrosscheckNames'
inherit_cache = True
@@ -54,3 +54,42 @@ def compile_crosscheck_names(element: SaColumn,
arg1, arg2 = list(element.clauses)
return "coalesce(avals(%s) && ARRAY(SELECT * FROM json_array_elements_text(%s)), false)" % (
compiler.process(arg1, **kw), compiler.process(arg2, **kw))
@compiles(CrosscheckNames, 'sqlite') # type: ignore[no-untyped-call, misc]
def compile_sqlite_crosscheck_names(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
return "EXISTS(SELECT *"\
" FROM json_each(%s) as name, json_each(%s) as match_name"\
" WHERE name.value = match_name.value)"\
% (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
class JsonArrayEach(sa.sql.functions.GenericFunction[Any]):
""" Return elements of a json array as a set.
"""
name = 'JsonArrayEach'
inherit_cache = True
@compiles(JsonArrayEach) # type: ignore[no-untyped-call, misc]
def default_json_array_each(element: SaColumn, compiler: 'sa.Compiled', **kw: Any) -> str:
return "json_array_elements(%s)" % compiler.process(element.clauses, **kw)
@compiles(JsonArrayEach, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_json_array_each(element: SaColumn, compiler: 'sa.Compiled', **kw: Any) -> str:
return "json_each(%s)" % compiler.process(element.clauses, **kw)
class Greatest(sa.sql.functions.GenericFunction[Any]):
""" Function to compute maximum of all its input parameters.
"""
name = 'greatest'
inherit_cache = True
@compiles(Greatest, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_greatest(element: SaColumn, compiler: 'sa.Compiled', **kw: Any) -> str:
return "max(%s)" % compiler.process(element.clauses, **kw)

View File

@@ -18,29 +18,26 @@ from nominatim.typing import SaColumn, SaBind
#pylint: disable=all
SQLITE_FUNCTION_ALIAS = (
('ST_AsEWKB', sa.Text, 'AsEWKB'),
('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
('ST_AsKML', sa.Text, 'AsKML'),
('ST_AsSVG', sa.Text, 'AsSVG'),
)
class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
""" Function to compute the spherical distance in meters.
"""
type = sa.Float()
name = 'Geometry_DistanceSpheroid'
inherit_cache = True
def _add_function_alias(func: str, ftype: type, alias: str) -> None:
_FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
"type": ftype,
"name": func,
"identifier": func,
"inherit_cache": True})
func_templ = f"{alias}(%s)"
@compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
def _default_distance_spheroid(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
return "ST_DistanceSpheroid(%s,"\
" 'SPHEROID[\"WGS 84\",6378137,298.257223563, AUTHORITY[\"EPSG\",\"7030\"]]')"\
% compiler.process(element.clauses, **kw)
def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
return func_templ % compiler.process(element.clauses, **kw)
compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
for alias in SQLITE_FUNCTION_ALIAS:
_add_function_alias(*alias)
@compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
def _spatialite_distance_spheroid(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
return "Distance(%s, true)" % compiler.process(element.clauses, **kw)
class Geometry(types.UserDefinedType): # type: ignore[type-arg]
@@ -148,6 +145,39 @@ class Geometry(types.UserDefinedType): # type: ignore[type-arg]
return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
def distance_spheroid(self, other: SaColumn) -> SaColumn:
return Geometry_DistanceSpheroid(self, other)
@compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
return 'GEOMETRY'
SQLITE_FUNCTION_ALIAS = (
('ST_AsEWKB', sa.Text, 'AsEWKB'),
('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
('ST_AsKML', sa.Text, 'AsKML'),
('ST_AsSVG', sa.Text, 'AsSVG'),
)
def _add_function_alias(func: str, ftype: type, alias: str) -> None:
_FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
"type": ftype,
"name": func,
"identifier": func,
"inherit_cache": True})
func_templ = f"{alias}(%s)"
def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
return func_templ % compiler.process(element.clauses, **kw)
compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
for alias in SQLITE_FUNCTION_ALIAS:
_add_function_alias(*alias)