Compare commits

..

22 Commits

Author SHA1 Message Date
Sarah Hoffmann
6c00169666 Merge pull request #3997 from lonvia/fix-postcode-index
Reenable index on centroid column for location_postcodes
2026-02-22 17:20:15 +01:00
Sarah Hoffmann
f0d32501e4 location_postcodes does geometry lookups on centroid 2026-02-22 15:51:38 +01:00
Sarah Hoffmann
3e35d7fe26 Merge pull request #3996 from lonvia/improved-postcode-import
Avoid updates on initial filling of postcode table
2026-02-22 13:12:49 +01:00
Sarah Hoffmann
fff5858b53 add option to force a postcode reimport 2026-02-21 13:03:04 +01:00
Sarah Hoffmann
2507d5a298 avoid updates on initial filling of postcode table 2026-02-20 18:53:48 +01:00
Sarah Hoffmann
af9458a601 Merge pull request #3981 from Itz-Agasta/test
Implement Lazy Loading Search Endpoint
2026-02-18 19:38:05 +01:00
Itz-Agasta
855f451a5f Adds lazy loading for search endpoint availability
Introduces a mechanism to defer the search endpoint's availability check until the first request, improving startup robustness. If the search table is unavailable due to DB issues, the endpoint now responds with a 503 or 404 as appropriate, and retries the check on subsequent requests. This ensures that downtime or partial DB failures no longer prevent the API from initializing or serving reverse-only mode.
2026-02-18 21:46:55 +05:30
Sarah Hoffmann
bf17f1d01a Merge pull request #3991 from lonvia/interpolation-on-addresses
Add support for addr:interpolation on housenumbers
2026-02-18 14:25:38 +01:00
Sarah Hoffmann
9ac56c2078 add support for expanding interpolations on housenumbers 2026-02-18 11:52:21 +01:00
Sarah Hoffmann
fbe0be9301 Merge pull request #3923 from kad-link/ci/windows-smoke
CI: add dev-only Windows smoke workflow
2026-02-16 09:27:23 +01:00
Sarah Hoffmann
0249cd54da Merge pull request #3989 from lonvia/rework-misc-tests
More production SQL use in unit tests
2026-02-16 09:20:37 +01:00
Sarah Hoffmann
52b5337f36 Merge pull request #3988 from jayaddison/pr-3957-followup/empty-name-field
Indexing: fixup: add presence check for hstore 'name' field
2026-02-16 09:17:36 +01:00
James Addison
53e8334206 Indexing: fixup: add presence check for hstore name field 2026-02-16 00:01:57 +00:00
Sarah Hoffmann
c31abf58d0 make database import unit tests against real SQL 2026-02-15 21:43:17 +01:00
Sarah Hoffmann
d0bd42298e use original tables for database check tests 2026-02-15 21:43:17 +01:00
Sarah Hoffmann
d1b0bcaea7 Merge pull request #3987 from lonvia/rework-postcode-tests
Rework postcode unit tests
2026-02-15 21:42:54 +01:00
Sarah Hoffmann
c3e8fa8c43 replace postcode mock with standard postcode table fixtures 2026-02-15 16:48:31 +01:00
Sri CHaRan
24ba9651ba ci/windows: install osm2pgsql binary and enable full unit tests suite 2026-02-13 22:01:39 +05:30
Sri CHaRan
bf5ef0140a ci/windows: enable full python unit test setup for windows 2026-02-13 21:47:14 +05:30
Sri CHaRan
238f3dd1d9 ci/windows: add Postgresql setup action to tests 2026-02-13 21:47:14 +05:30
Sri Charan Chittineni
abd7c302f8 implement stage 1 : python unit tests 2026-02-13 21:47:14 +05:30
Sri CHaRan
2197236872 Add experimental Windows CI workflow 2026-02-13 21:47:14 +05:30
20 changed files with 699 additions and 328 deletions

View File

@@ -0,0 +1,95 @@
name: 'Setup Postgresql and Postgis on Windows'
description: 'Installs PostgreSQL and PostGIS for Windows and configures it for CI tests'

inputs:
    postgresql-version:
        description: 'Version of PostgreSQL to install'
        required: true

runs:
    using: "composite"

    steps:
        - name: Set up PostgreSQL variables
          shell: pwsh
          run: |
              $version = "${{ inputs.postgresql-version }}"
              $root = "C:\Program Files\PostgreSQL\$version"
              $bin = "$root\bin"
              echo "PGROOT=$root" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
              echo "PGBIN=$bin" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
              echo "$bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append

        - name: Decide Postgis version (Windows)
          id: postgis-ver
          shell: pwsh
          run: |
              echo "PowerShell version: ${PSVersionTable.PSVersion}"
              $PG_VERSION = Split-Path $env:PGROOT -Leaf
              $postgis_page = "https://download.osgeo.org/postgis/windows/pg$PG_VERSION"
              echo "Detecting PostGIS version from $postgis_page for PostgreSQL $PG_VERSION"
              $pgis_bundle = (Invoke-WebRequest -Uri $postgis_page -ErrorAction Stop).Links.Where({$_.href -match "^postgis.*zip$"}).href
              if (!$pgis_bundle) {
                  Write-Error "Could not find latest PostGIS version in $postgis_page that would match ^postgis.*zip$ pattern"
                  exit 1
              }
              $pgis_bundle = [IO.Path]::ChangeExtension($pgis_bundle, [NullString]::Value)
              $pgis_bundle_url = "$postgis_page/$pgis_bundle.zip"
              Add-Content $env:GITHUB_OUTPUT "postgis_file=$pgis_bundle"
              Add-Content $env:GITHUB_OUTPUT "postgis_bundle_url=$pgis_bundle_url"

        - uses: actions/cache@v4
          with:
              path: |
                  C:/postgis.zip
              key: postgis-cache-${{ steps.postgis-ver.outputs.postgis_file }}

        - name: Download postgis
          shell: pwsh
          run: |
              if (!(Test-Path "C:\postgis.zip")){(new-object net.webclient).DownloadFile($env:PGIS_BUNDLE_URL, "c:\postgis.zip")}
              if (Test-path "c:\postgis_archive"){Remove-Item "c:\postgis_archive" -Recurse -Force}
              7z x c:\postgis.zip -oc:\postgis_archive
          env:
              PGIS_BUNDLE_URL: ${{ steps.postgis-ver.outputs.postgis_bundle_url }}

        - name: Install postgis
          shell: bash
          run: |
              echo "Root: $PGROOT, Bin: $PGBIN"
              cp -r c:/postgis_archive/postgis-bundle-*/* "$PGROOT"

        - name: Start PostgreSQL on Windows
          run: |
              $pgService = Get-Service -Name postgresql*
              Set-Service -InputObject $pgService -Status running -StartupType automatic
              Start-Process -FilePath "$env:PGBIN\pg_isready" -Wait -PassThru
          shell: pwsh

        - name: Adapt postgresql configuration
          shell: pwsh
          env:
              PGPASSWORD: root
          run: |
              & "$env:PGBIN\psql" -U postgres -d postgres -c "ALTER SYSTEM SET fsync = 'off';"
              & "$env:PGBIN\psql" -U postgres -d postgres -c "ALTER SYSTEM SET synchronous_commit = 'off';"
              & "$env:PGBIN\psql" -U postgres -d postgres -c "ALTER SYSTEM SET full_page_writes = 'off';"
              & "$env:PGBIN\psql" -U postgres -d postgres -c "ALTER SYSTEM SET shared_buffers = '1GB';"
              & "$env:PGBIN\psql" -U postgres -d postgres -c "ALTER SYSTEM SET port = 5432;"
              Restart-Service -Name postgresql*
              Start-Process -FilePath "$env:PGBIN\pg_isready" -Wait -PassThru

        - name: Setup database users
          shell: pwsh
          env:
              PGPASSWORD: root
          run: |
              & "$env:PGBIN\createuser" -U postgres -S www-data
              & "$env:PGBIN\createuser" -U postgres -s runner

View File

@@ -1,5 +1,7 @@
 name: 'Setup Postgresql and Postgis'
+description: 'Installs PostgreSQL and PostGIS and configures it for CI tests'
+
 inputs:
     postgresql-version:
         description: 'Version of PostgreSQL to install'

View File

@@ -140,6 +140,65 @@ jobs:
                 ../venv/bin/python -m pytest test/bdd --nominatim-purge
              working-directory: Nominatim

+    tests-windows:
+        needs: create-archive
+        runs-on: windows-latest
+
+        steps:
+            - uses: actions/download-artifact@v4
+              with:
+                  name: full-source
+            - name: Unpack Nominatim
+              run: tar xf nominatim-src.tar.bz2
+            - uses: ./Nominatim/.github/actions/setup-postgresql-windows
+              with:
+                  postgresql-version: 17
+            - name: Set up Python
+              uses: actions/setup-python@v5
+              with:
+                  python-version: '3.14'
+            - name: Install Spatialite
+              run: |
+                  Invoke-WebRequest -Uri "https://www.gaia-gis.it/gaia-sins/windows-bin-amd64/mod_spatialite-5.1.0-win-amd64.7z" -OutFile "spatialite.7z"
+                  7z x spatialite.7z -o"C:\spatialite"
+                  echo "C:\spatialite\mod_spatialite-5.1.0-win-amd64" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+            - name: Install osm2pgsql
+              run: |
+                  Invoke-WebRequest -Uri "https://osm2pgsql.org/download/windows/osm2pgsql-latest-x64.zip" -OutFile "osm2pgsql.zip"
+                  Expand-Archive -Path "osm2pgsql.zip" -DestinationPath "C:\osm2pgsql"
+                  $BinDir = Get-ChildItem -Path "C:\osm2pgsql" -Recurse -Filter "osm2pgsql.exe" | Select-Object -ExpandProperty DirectoryName | Select-Object -First 1
+                  if (-not $BinDir) {
+                      Write-Error "Could not find osm2pgsql.exe"
+                      exit 1
+                  }
+                  echo "$BinDir" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+                  $FullExePath = Join-Path $BinDir "osm2pgsql.exe"
+                  echo "NOMINATIM_OSM2PGSQL_BINARY=$FullExePath" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
+            - name: Set UTF-8 encoding
+              run: |
+                  echo "PYTHONUTF8=1" >> $env:GITHUB_ENV
+                  [System.Console]::OutputEncoding = [System.Text.Encoding]::UTF8
+            - name: Install PyICU from wheel
+              run: |
+                  python -m pip install https://github.com/cgohlke/pyicu-build/releases/download/v2.16.0/pyicu-2.16-cp314-cp314-win_amd64.whl
+            - name: Install test prerequisites
+              run: |
+                  python -m pip install -U pip
+                  python -m pip install pytest pytest-asyncio "psycopg[binary]!=3.3.0" python-dotenv pyyaml jinja2 psutil sqlalchemy pytest-bdd falcon starlette uvicorn asgi_lifespan aiosqlite osmium mwparserfromhell
+            - name: Python unit tests
+              run: |
+                  python -m pytest test/python -k "not (import_osm or run_osm2pgsql)"
+              working-directory: Nominatim
+
     install:
         runs-on: ubuntu-latest
         needs: create-archive

View File

@@ -89,7 +89,7 @@ BEGIN
   -- Add the linked-place (e.g. city) name as a searchable placename in the default language (if any)
   default_language := get_country_language_code(location.country_code);
-  IF default_language is not NULL AND NOT location.name ? ('name:' || default_language) THEN
+  IF default_language is not NULL AND location.name ? 'name' AND NOT location.name ? ('name:' || default_language) THEN
     location.name := location.name || hstore('name:' || default_language, location.name->'name');
   END IF;

View File

@@ -153,8 +153,7 @@ BEGIN
   IF ST_GeometryType(geom) in ('ST_Polygon','ST_MultiPolygon') THEN
     SELECT min(postcode), count(*) FROM
       (SELECT postcode FROM location_postcodes
-         WHERE geom && location_postcodes.geometry -- want to use the index
-               AND ST_Contains(geom, location_postcodes.centroid)
+         WHERE ST_Contains(geom, location_postcodes.centroid)
               AND country_code = country
         LIMIT 2) sub
     INTO outcode, cnt;

View File

@@ -23,6 +23,8 @@ CREATE UNIQUE INDEX idx_location_postcodes_id ON location_postcodes
   USING BTREE (place_id) {{db.tablespace.search_index}};
 CREATE INDEX idx_location_postcodes_geometry ON location_postcodes
   USING GIST (geometry) {{db.tablespace.search_index}};
+CREATE INDEX idx_location_postcodes_centroid ON location_postcodes
+  USING GIST (centroid) {{db.tablespace.search_index}};
 CREATE INDEX IF NOT EXISTS idx_location_postcodes_postcode ON location_postcodes
   USING BTREE (postcode, country_code) {{db.tablespace.search_index}};
 CREATE INDEX IF NOT EXISTS idx_location_postcodes_osmid ON location_postcodes

View File

@@ -184,6 +184,10 @@ class APIMiddleware:
         formatter = load_format_dispatcher('v1', self.api.config.project_dir)
         for name, func in await api_impl.get_routes(self.api):
             endpoint = EndpointWrapper(name, func, self.api, formatter)
+            # If func is a LazySearchEndpoint, give it a reference to wrapper
+            # so it can replace wrapper.func dynamically
+            if hasattr(func, 'set_wrapper'):
+                func.set_wrapper(endpoint)
             self.app.add_route(f"/{name}", endpoint)
             if legacy_urls:
                 self.app.add_route(f"/{name}.php", endpoint)

View File

@@ -12,6 +12,7 @@ from typing import Optional, Any, Type, Dict, cast, Sequence, Tuple
 from functools import reduce
 import dataclasses
 from urllib.parse import urlencode
+import asyncio

 import sqlalchemy as sa
@@ -124,6 +125,12 @@ def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
     }


+def has_search_name(conn: sa.engine.Connection) -> bool:
+    """ Check if the search_name table exists in the database.
+    """
+    return sa.inspect(conn).has_table('search_name')
+
+
 async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """ Server glue for /status endpoint. See API docs for details.
     """
@@ -441,6 +448,61 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     return build_response(params, params.formatting().format_result(results, fmt, {}))


+async def search_unavailable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+    """ Server glue for /search endpoint in reverse-only mode.
+        Returns 404 when search functionality is not available.
+    """
+    params.raise_error('Search not available (reverse-only mode)', 404)
+
+
+class LazySearchEndpoint:
+    """ Lazy-loading search endpoint that replaces itself after first successful check.
+
+        - Falcon: EndpointWrapper stores this instance in wrapper.func.
+          On first request, replace wrapper.func directly with real endpoint.
+        - Starlette: _wrap_endpoint wraps this instance in a callback,
+          so store a delegate function and call it on subsequent requests.
+    """
+
+    def __init__(self, api: NominatimAPIAsync, real_endpoint: EndpointFunc):
+        self.api = api
+        self.real_endpoint = real_endpoint
+        self._lock = asyncio.Lock()
+        self._wrapper: Any = None  # Store reference to Falcon's EndpointWrapper
+        self._delegate: Optional[EndpointFunc] = None
+
+    def set_wrapper(self, wrapper: Any) -> None:
+        self._wrapper = wrapper
+
+    async def __call__(self, api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+        if self._delegate is None:
+            async with self._lock:
+                # Double-check after acquiring lock (thread safety)
+                if self._delegate is None:
+                    try:
+                        async with api.begin() as conn:
+                            has_table = await conn.connection.run_sync(has_search_name)
+                        if has_table:
+                            # For Starlette
+                            self._delegate = self.real_endpoint
+                            # For Falcon
+                            if self._wrapper is not None:
+                                self._wrapper.func = self.real_endpoint
+                        else:
+                            self._delegate = search_unavailable_endpoint
+                            if self._wrapper is not None:
+                                self._wrapper.func = search_unavailable_endpoint
+                    except (PGCORE_ERROR, sa.exc.OperationalError, OSError):
+                        # No _delegate set, so retry on next request
+                        params.raise_error('Search temporarily unavailable', 503)
+
+        return await self._delegate(api, params)
+
+
 async def get_routes(api: NominatimAPIAsync) -> Sequence[Tuple[str, EndpointFunc]]:
     routes = [
         ('status', status_endpoint),
@@ -451,15 +513,13 @@ async def get_routes(api: NominatimAPIAsync) -> Sequence[Tuple[str, EndpointFunc
         ('polygons', polygons_endpoint),
     ]

-    def has_search_name(conn: sa.engine.Connection) -> bool:
-        insp = sa.inspect(conn)
-        return insp.has_table('search_name')
     try:
         async with api.begin() as conn:
             if await conn.connection.run_sync(has_search_name):
                 routes.append(('search', search_endpoint))
-    except (PGCORE_ERROR, sa.exc.OperationalError):
-        pass  # ignored
+            else:
+                routes.append(('search', search_unavailable_endpoint))
+    except (PGCORE_ERROR, sa.exc.OperationalError, OSError):
+        routes.append(('search', LazySearchEndpoint(api, search_endpoint)))

     return routes

View File

@@ -120,6 +120,7 @@ class NominatimArgs:
     data_object: Sequence[Tuple[str, int]]
     data_area: Sequence[Tuple[str, int]]
     ro_access: bool
+    postcode_force_reimport: bool

     # Arguments to 'replication'
     init: bool

View File

@@ -84,6 +84,10 @@ class UpdateRefresh:
                            help='Do not enable code for propagating updates')
         group.add_argument('--enable-debug-statements', action='store_true',
                            help='Enable debug warning statements in functions')
+        group = parser.add_argument_group('Arguments for postcode refresh')
+        group.add_argument('--force-reimport', action='store_true',
+                           dest='postcode_force_reimport',
+                           help='Recompute the postcodes from scratch instead of updating')

     def run(self, args: NominatimArgs) -> int:
         from ..tools import refresh, postcodes
@@ -96,7 +100,8 @@ class UpdateRefresh:
             LOG.warning("Update postcodes centroid")
             tokenizer = self._get_tokenizer(args.config)
             postcodes.update_postcodes(args.config.get_libpq_dsn(),
-                                       args.project_dir, tokenizer)
+                                       args.project_dir, tokenizer,
+                                       force_reimport=args.postcode_force_reimport)
             indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
                               args.threads or 1)
             asyncio.run(indexer.index_postcodes())

View File

@@ -2,7 +2,7 @@
 #
 # This file is part of Nominatim. (https://nominatim.org)
 #
-# Copyright (C) 2024 by the Nominatim developer community.
+# Copyright (C) 2026 by the Nominatim developer community.
 # For a full list of authors see the git log.
 """
 Sanitizer that preprocesses address tags for house numbers. The sanitizer
@@ -10,6 +10,7 @@ allows to
 * define which tags are to be considered house numbers (see 'filter-kind')
 * split house number lists into individual numbers (see 'delimiters')
+* expand interpolated house numbers

 Arguments:
     delimiters: Define the set of characters to be used for
@@ -23,13 +24,19 @@ Arguments:
                      instead of a house number. Either takes a single string
                      or a list of strings, where each string is a regular
                      expression that must match the full house number value.
+    expand-interpolations: When true, expand house number ranges to separate numbers
+                           when an 'interpolation' is present. (default: true)
 """
-from typing import Callable, Iterator, List
+from typing import Callable, Iterator, Iterable, Union
+import re

 from ...data.place_name import PlaceName
 from .base import ProcessInfo
 from .config import SanitizerConfig

+RANGE_REGEX = re.compile(r'\d+-\d+')
+

 class _HousenumberSanitizer:
@@ -38,21 +45,40 @@ class _HousenumberSanitizer:
         self.split_regexp = config.get_delimiter()
         self.filter_name = config.get_filter('convert-to-name', 'FAIL_ALL')
+        self.expand_interpolations = config.get_bool('expand-interpolations', True)

     def __call__(self, obj: ProcessInfo) -> None:
         if not obj.address:
             return

-        new_address: List[PlaceName] = []
+        itype: Union[int, str, None] = None
+        if self.expand_interpolations:
+            itype = next((i.name for i in obj.address if i.kind == 'interpolation'), None)
+            if itype is not None:
+                if itype == 'all':
+                    itype = 1
+                elif len(itype) == 1 and itype.isdigit():
+                    itype = int(itype)
+                elif itype not in ('odd', 'even'):
+                    itype = None
+
+        new_address: list[PlaceName] = []
         for item in obj.address:
             if self.filter_kind(item.kind):
+                if itype is not None and RANGE_REGEX.fullmatch(item.name):
+                    hnrs = self._expand_range(itype, item.name)
+                    if hnrs:
+                        new_address.extend(item.clone(kind='housenumber', name=str(hnr))
+                                           for hnr in hnrs)
+                        continue
                 if self.filter_name(item.name):
                     obj.names.append(item.clone(kind='housenumber'))
                 else:
                     new_address.extend(item.clone(kind='housenumber', name=n)
                                        for n in self.sanitize(item.name))
-            else:
-                # Don't touch other address items.
+            elif item.kind != 'interpolation':
+                # Ignore interpolation, otherwise don't touch other address items.
                 new_address.append(item)

         obj.address = new_address
@@ -70,6 +96,22 @@ class _HousenumberSanitizer:
     def _regularize(self, hnr: str) -> Iterator[str]:
         yield hnr

+    def _expand_range(self, itype: Union[str, int], hnr: str) -> Iterable[int]:
+        first, last = (int(i) for i in hnr.split('-'))
+
+        if isinstance(itype, int):
+            step = itype
+        else:
+            step = 2
+            if (itype == 'even' and first % 2 == 1)\
+               or (itype == 'odd' and first % 2 == 0):
+                first += 1
+
+        if (last + 1 - first) / step < 10:
+            return range(first, last + 1, step)
+
+        return []
+

 def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]:
     """ Create a housenumber processing function.

View File

@@ -78,7 +78,7 @@ class _PostcodeCollector:
         self.collected[normalized] += (x, y)

     def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
-               project_dir: Optional[Path]) -> None:
+               project_dir: Optional[Path], is_initial: bool) -> None:
         """ Update postcodes for the country from the postcodes selected so far.

             When 'project_dir' is set, then any postcode files found in this
@@ -87,11 +87,14 @@
         if project_dir is not None:
             self._update_from_external(analyzer, project_dir)

-        with conn.cursor() as cur:
-            cur.execute("""SELECT postcode FROM location_postcodes
-                           WHERE country_code = %s AND osm_id is null""",
-                        (self.country, ))
-            to_delete = [row[0] for row in cur if row[0] not in self.collected]
+        if is_initial:
+            to_delete = []
+        else:
+            with conn.cursor() as cur:
+                cur.execute("""SELECT postcode FROM location_postcodes
+                               WHERE country_code = %s AND osm_id is null""",
+                            (self.country, ))
+                to_delete = [row[0] for row in cur if row[0] not in self.collected]

         to_add = [dict(zip(('pc', 'x', 'y'), (k, *v.centroid())))
                   for k, v in self.collected.items()]
@@ -102,22 +105,32 @@
         with conn.cursor() as cur:
             if to_add:
-                cur.executemany(pysql.SQL(
-                    """INSERT INTO location_postcodes
-                         (country_code, rank_search, postcode, centroid, geometry)
-                       VALUES ({}, {}, %(pc)s,
-                               ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
-                               expand_by_meters(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {}))
-                    """).format(pysql.Literal(self.country),
-                                pysql.Literal(_extent_to_rank(self.extent)),
-                                pysql.Literal(self.extent)),
-                    to_add)
+                columns = ['country_code',
+                           'rank_search',
+                           'postcode',
+                           'centroid',
+                           'geometry']
+                values = [pysql.Literal(self.country),
+                          pysql.Literal(_extent_to_rank(self.extent)),
+                          pysql.Placeholder('pc'),
+                          pysql.SQL('ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326)'),
+                          pysql.SQL("""expand_by_meters(
+                                         ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {})""")
+                               .format(pysql.Literal(self.extent))]
+                if is_initial:
+                    columns.extend(('place_id', 'indexed_status'))
+                    values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
+
+                cur.executemany(pysql.SQL("INSERT INTO location_postcodes ({}) VALUES ({})")
+                                .format(pysql.SQL(',')
+                                             .join(pysql.Identifier(c) for c in columns),
+                                        pysql.SQL(',').join(values)),
+                                to_add)
             if to_delete:
                 cur.execute("""DELETE FROM location_postcodes
                                WHERE country_code = %s and postcode = any(%s)
                                      AND osm_id is null
                             """, (self.country, to_delete))
+            cur.execute("ANALYSE location_postcodes")

     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
         """ Look for an external postcode file for the active country in
@@ -164,7 +177,8 @@
         return None


-def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
+def update_postcodes(dsn: str, project_dir: Optional[Path],
+                     tokenizer: AbstractTokenizer, force_reimport: bool = False) -> None:
     """ Update the table of postcodes from the input tables
         placex and place_postcode.
     """
@@ -176,45 +190,76 @@ def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractT
                         SET country_code = get_country_code(centroid)
                         WHERE country_code is null
                      """)

+        if force_reimport:
+            conn.execute("TRUNCATE location_postcodes")
+            is_initial = True
+        else:
+            is_initial = _is_postcode_table_empty(conn)
+
+        if is_initial:
+            conn.execute("""ALTER TABLE location_postcodes
+                            DISABLE TRIGGER location_postcodes_before_insert""")
+
         # Now update first postcode areas
-        _update_postcode_areas(conn, analyzer, matcher)
+        _update_postcode_areas(conn, analyzer, matcher, is_initial)
         # Then fill with estimated postcode centroids from other info
-        _update_guessed_postcode(conn, analyzer, matcher, project_dir)
+        _update_guessed_postcode(conn, analyzer, matcher, project_dir, is_initial)
+
+        if is_initial:
+            conn.execute("""ALTER TABLE location_postcodes
+                            ENABLE TRIGGER location_postcodes_before_insert""")

         conn.commit()

     analyzer.update_postcodes_from_db()


+def _is_postcode_table_empty(conn: Connection) -> bool:
+    """ Check if there are any entries in the location_postcodes table yet.
+    """
+    with conn.cursor() as cur:
+        cur.execute("SELECT place_id FROM location_postcodes LIMIT 1")
+        return cur.fetchone() is None
+
+
 def _insert_postcode_areas(conn: Connection, country_code: str,
-                           extent: int, pcs: list[dict[str, str]]) -> None:
+                           extent: int, pcs: list[dict[str, str]],
+                           is_initial: bool) -> None:
     if pcs:
         with conn.cursor() as cur:
+            columns = ['osm_id', 'country_code',
+                       'rank_search', 'postcode',
+                       'centroid', 'geometry']
+            values = [pysql.Identifier('osm_id'), pysql.Identifier('country_code'),
+                      pysql.Literal(_extent_to_rank(extent)), pysql.Placeholder('out'),
+                      pysql.Identifier('centroid'), pysql.Identifier('geometry')]
+            if is_initial:
+                columns.extend(('place_id', 'indexed_status'))
+                values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
+
             cur.executemany(
                 pysql.SQL(
-                    """ INSERT INTO location_postcodes
-                          (osm_id, country_code, rank_search, postcode, centroid, geometry)
-                        SELECT osm_id, country_code, {}, %(out)s, centroid, geometry
-                          FROM place_postcode
+                    """ INSERT INTO location_postcodes ({})
+                        SELECT {} FROM place_postcode
                         WHERE osm_type = 'R'
                               and country_code = {} and postcode = %(in)s
                               and geometry is not null
-                    """).format(pysql.Literal(_extent_to_rank(extent)),
+                    """).format(pysql.SQL(',')
+                                     .join(pysql.Identifier(c) for c in columns),
+                                pysql.SQL(',').join(values),
                                 pysql.Literal(country_code)),
                 pcs)


 def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
-                           matcher: PostcodeFormatter) -> None:
+                           matcher: PostcodeFormatter, is_initial: bool) -> None:
     """ Update the postcode areas made from postcode boundaries.
     """
     # first delete all areas that have gone
-    conn.execute(""" DELETE FROM location_postcodes pc
-                     WHERE pc.osm_id is not null
-                       AND NOT EXISTS(
-                             SELECT * FROM place_postcode pp
-                             WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
-                                   and geometry is not null)
-                 """)
+    if not is_initial:
+        conn.execute(""" DELETE FROM location_postcodes pc
+                         WHERE pc.osm_id is not null
+                           AND NOT EXISTS(
+                                 SELECT * FROM place_postcode pp
+                                 WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
+                                       and geometry is not null)
+                     """)

     # now insert all in country batches, triggers will ensure proper updates
     with conn.cursor() as cur:
         cur.execute(""" SELECT country_code, postcode FROM place_postcode
@@ -230,7 +275,8 @@ def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
                 fmt = matcher.get_matcher(country_code)
             elif country_code != cc:
                 _insert_postcode_areas(conn, country_code,
-                                       matcher.get_postcode_extent(country_code), pcs)
+                                       matcher.get_postcode_extent(country_code), pcs,
+                                       is_initial)
                 country_code = cc
                 fmt = matcher.get_matcher(country_code)
                 pcs = []
@@ -241,21 +287,26 @@
     if country_code is not None and pcs:
         _insert_postcode_areas(conn, country_code,
-                               matcher.get_postcode_extent(country_code), pcs)
+                               matcher.get_postcode_extent(country_code), pcs,
+                               is_initial)


 def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
-                             matcher: PostcodeFormatter, project_dir: Optional[Path]) -> None:
+                             matcher: PostcodeFormatter, project_dir: Optional[Path],
+                             is_initial: bool) -> None:
     """ Computes artificial postcode centroids from the placex table,
         potentially enhances it with external data and then updates the
        postcodes in the table 'location_postcodes'.
     """
     # First get the list of countries that currently have postcodes.
     # (Doing this before starting to insert, so it is fast on import.)
-    with conn.cursor() as cur:
-        cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
-                       WHERE osm_id is null""")
-        todo_countries = {row[0] for row in cur}
+    if is_initial:
+        todo_countries: set[str] = set()
+    else:
+        with conn.cursor() as cur:
+            cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
+                           WHERE osm_id is null""")
+            todo_countries = {row[0] for row in cur}

     # Next, get the list of postcodes that are already covered by areas.
     area_pcs = defaultdict(set)
@@ -275,6 +326,7 @@ def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
                                 FROM place_postcode WHERE geometry is not null)
                     """)
         cur.execute("CREATE INDEX ON _global_postcode_area USING gist(geometry)")
+
     # Recompute the list of valid postcodes from placex.
     with conn.cursor(name="placex_postcodes") as cur:
         cur.execute("""
@@ -296,7 +348,7 @@ def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
         for country, postcode, x, y in cur:
             if collector is None or country != collector.country:
                 if collector is not None:
-                    collector.commit(conn, analyzer, project_dir)
+                    collector.commit(conn, analyzer, project_dir, is_initial)
                 collector = _PostcodeCollector(country, matcher.get_matcher(country),
                                                matcher.get_postcode_extent(country),
                                                exclude=area_pcs[country])
@@ -304,14 +356,14 @@
             collector.add(postcode, x, y)

     if collector is not None:
-        collector.commit(conn, analyzer, project_dir)
+        collector.commit(conn, analyzer, project_dir, is_initial)

     # Now handle any countries that are only in the postcode table.
     for country in todo_countries:
         fmt = matcher.get_matcher(country)
         ext = matcher.get_postcode_extent(country)
         _PostcodeCollector(country, fmt, ext,
-                           exclude=area_pcs[country]).commit(conn, analyzer, project_dir)
+                           exclude=area_pcs[country]).commit(conn, analyzer, project_dir, False)

     conn.execute("DROP TABLE IF EXISTS _global_postcode_area")

View File

@@ -299,18 +299,23 @@ Feature: Linking of places
     Scenario: Linked places expand default language names
         Given the grid with origin CO
-            | 1 |   | 2 |
-            |   | 9 |   |
-            | 4 |   | 3 |
-        Given the places
+            | 1 |   | 2 |   | 5 |    | 6 |
+            |   | 9 |   |   |   | 10 |   |
+            | 4 |   | 3 |   | 8 |    | 7 |
+        And the places
             | osm | class | type | name+name | geometry |
             | N9  | place | city | Popayán   | 9        |
-        Given the places
+        And the places
+            | osm | class | type | name+name:en | geometry |
+            | N10 | place | city | Open         | 10       |
+        And the places
             | osm | class    | type           | name+name                | geometry    | admin |
             | R1  | boundary | administrative | Perímetro Urbano Popayán | (1,2,3,4,1) | 8     |
+            | R2  | boundary | administrative | Abre                     | (5,6,7,8,5) | 8     |
         And the relations
             | id | members  |
             | 1  | N9:label |
+            | 2  | N10:label |
         When importing
         Then placex contains
             | object | linked_place_id |

View File

@@ -318,3 +318,64 @@ Feature: Searching of house numbers
         Then the result set contains
             | object |
             | W20    |
+
+    Scenario: A housenumber with interpolation is found
+        Given the places
+            | osm | class    | type | housenr | addr+interpolation | geometry |
+            | N1  | building | yes  | 1-5     | odd                | 9        |
+        And the places
+            | osm | class   | type | name      | geometry |
+            | W10 | highway | path | Rue Paris | 1,2,3    |
+        When importing
+        When geocoding "Rue Paris 1"
+        Then the result set contains
+            | object | address+house_number |
+            | N1     | 1-5                  |
+        When geocoding "Rue Paris 3"
+        Then the result set contains
+            | object | address+house_number |
+            | N1     | 1-5                  |
+        When geocoding "Rue Paris 5"
+        Then the result set contains
+            | object | address+house_number |
+            | N1     | 1-5                  |
+        When geocoding "Rue Paris 2"
+        Then the result set contains
+            | object |
+            | W10    |
+
+    Scenario: A housenumber with bad interpolation is ignored
+        Given the places
+            | osm | class    | type | housenr | addr+interpolation | geometry |
+            | N1  | building | yes  | 1-5     | bad                | 9        |
+        And the places
+            | osm | class   | type | name      | geometry |
+            | W10 | highway | path | Rue Paris | 1,2,3    |
+        When importing
+        When geocoding "Rue Paris 1-5"
+        Then the result set contains
+            | object | address+house_number |
+            | N1     | 1-5                  |
+        When geocoding "Rue Paris 3"
+        Then the result set contains
+            | object |
+            | W10    |
+
+    Scenario: A bad housenumber with a good interpolation is just a housenumber
+        Given the places
+            | osm | class    | type | housenr | addr+interpolation | geometry |
+            | N1  | building | yes  | 1-100   | all                | 9        |
+        And the places
+            | osm | class   | type | name      | geometry |
+            | W10 | highway | path | Rue Paris | 1,2,3    |
+        When importing
+        When geocoding "Rue Paris 1-100"
+        Then the result set contains
+            | object | address+house_number |
+            | N1     | 1-100                |
+        When geocoding "Rue Paris 3"
+        Then the result set contains
+            | object |
+            | W10    |

View File

@@ -200,14 +200,15 @@ def test_get_path_empty(make_config):
     assert not config.get_path('TOKENIZER_CONFIG')


-def test_get_path_absolute(make_config, monkeypatch):
+def test_get_path_absolute(make_config, monkeypatch, tmp_path):
     config = make_config()
-    monkeypatch.setenv('NOMINATIM_FOOBAR', '/dont/care')
+    p = (tmp_path / "does_not_exist").resolve()
+    monkeypatch.setenv('NOMINATIM_FOOBAR', str(p))

     result = config.get_path('FOOBAR')

     assert isinstance(result, Path)
-    assert str(result) == '/dont/care'
+    assert str(result) == str(p)


 def test_get_path_relative(make_config, monkeypatch, tmp_path):

View File

@@ -6,8 +6,12 @@
 # For a full list of authors see the git log.
 import itertools
 import sys
+import asyncio
 from pathlib import Path

+if sys.platform == 'win32':
+    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
 import psycopg
 from psycopg import sql as pysql
 import pytest
@@ -145,11 +149,12 @@ def country_row(country_table, temp_db_cursor):

 @pytest.fixture
-def load_sql(temp_db_conn, country_row):
-    proc = SQLPreprocessor(temp_db_conn, Configuration(None))
+def load_sql(temp_db_conn, country_table):
+    conf = Configuration(None)

-    def _run(filename, **kwargs):
-        proc.run_sql_file(temp_db_conn, filename, **kwargs)
+    def _run(*filename, **kwargs):
+        for fn in filename:
+            SQLPreprocessor(temp_db_conn, conf).run_sql_file(temp_db_conn, fn, **kwargs)

     return _run

View File

@@ -2,7 +2,7 @@
 #
 # This file is part of Nominatim. (https://nominatim.org)
 #
-# Copyright (C) 2025 by the Nominatim developer community.
+# Copyright (C) 2026 by the Nominatim developer community.
 # For a full list of authors see the git log.
 """
 Tests for the sanitizer that normalizes housenumbers.
@@ -67,3 +67,25 @@ def test_convert_to_name_unconverted(def_config, number):
     assert 'housenumber' not in set(p.kind for p in names)
     assert ('housenumber', number) in set((p.kind, p.name) for p in address)
+
+
+@pytest.mark.parametrize('hnr,itype,out', [
+    ('1-5', 'all', (1, 2, 3, 4, 5)),
+    ('1-5', 'odd', (1, 3, 5)),
+    ('1-5', 'even', (2, 4)),
+    ('6-9', '1', (6, 7, 8, 9)),
+    ('6-9', '2', (6, 8)),
+    ('6-9', '3', (6, 9)),
+    ('6-9', '5', (6,)),
+    ('6-9', 'odd', (7, 9)),
+    ('6-9', 'even', (6, 8)),
+    ('6-22', 'even', (6, 8, 10, 12, 14, 16, 18, 20, 22))
+    ])
+def test_convert_interpolations(sanitize, hnr, itype, out):
+    assert set(sanitize(housenumber=hnr, interpolation=itype)) \
+           == {('housenumber', str(i)) for i in out}
+
+
+@pytest.mark.parametrize('hnr', ('23', '23-', '3z-f', '1-10', '5-1', '1-4-5'))
+def test_ignore_interpolation_with_bad_housenumber(sanitize, hnr):
+    assert sanitize(housenumber=hnr, interpolation='all') == [('housenumber', hnr)]

View File

@@ -2,7 +2,7 @@
 #
 # This file is part of Nominatim. (https://nominatim.org)
 #
-# Copyright (C) 2025 by the Nominatim developer community.
+# Copyright (C) 2026 by the Nominatim developer community.
 # For a full list of authors see the git log.
 """
 Tests for database integrity checks.
@@ -46,8 +46,7 @@ def test_check_database_version_bad(property_table, temp_db_conn, def_config):
     assert chkdb.check_database_version(temp_db_conn, def_config) == chkdb.CheckState.FATAL


-def test_check_placex_table_good(table_factory, temp_db_conn, def_config):
-    table_factory('placex')
+def test_check_placex_table_good(placex_table, temp_db_conn, def_config):
     assert chkdb.check_placex_table(temp_db_conn, def_config) == chkdb.CheckState.OK
@@ -55,13 +54,13 @@
     assert chkdb.check_placex_table(temp_db_conn, def_config) == chkdb.CheckState.FATAL


-def test_check_placex_table_size_good(table_factory, temp_db_conn, def_config):
-    table_factory('placex', content=((1, ), (2, )))
+def test_check_placex_table_size_good(placex_row, temp_db_conn, def_config):
+    for _ in range(2):
+        placex_row()
     assert chkdb.check_placex_size(temp_db_conn, def_config) == chkdb.CheckState.OK


-def test_check_placex_table_size_bad(table_factory, temp_db_conn, def_config):
-    table_factory('placex')
+def test_check_placex_table_size_bad(placex_table, temp_db_conn, def_config):
     assert chkdb.check_placex_size(temp_db_conn, def_config) == chkdb.CheckState.FATAL
@@ -84,15 +83,22 @@ def test_check_tokenizer(temp_db_conn, def_config, monkeypatch,
     assert chkdb.check_tokenizer(temp_db_conn, def_config) == state


-def test_check_indexing_good(table_factory, temp_db_conn, def_config):
-    table_factory('placex', 'place_id int, indexed_status smallint',
-                  content=((1, 0), (2, 0)))
+def test_check_indexing_good(placex_row, temp_db_conn, def_config):
+    for _ in range(2):
+        placex_row(indexed_status=0)
     assert chkdb.check_indexing(temp_db_conn, def_config) == chkdb.CheckState.OK


-def test_check_indexing_bad(table_factory, temp_db_conn, def_config):
-    table_factory('placex', 'place_id int, indexed_status smallint',
-                  content=((1, 0), (2, 2)))
+def test_check_indexing_bad(placex_row, temp_db_conn, def_config):
+    for status in (0, 2):
+        placex_row(indexed_status=status)
+    assert chkdb.check_indexing(temp_db_conn, def_config) == chkdb.CheckState.FAIL
+
+
+def test_check_indexing_bad_frozen(placex_row, temp_db_conn, def_config):
+    for status in (0, 2):
+        placex_row(indexed_status=status)
+    temp_db_conn.execute('DROP TABLE place')
     assert chkdb.check_indexing(temp_db_conn, def_config) == chkdb.CheckState.WARN

View File

@@ -78,8 +78,8 @@ def test_setup_skeleton_already_exists(temp_db):
         database_import.setup_database_skeleton(f'dbname={temp_db}')


-def test_import_osm_data_simple(table_factory, osm2pgsql_options, capfd):
-    table_factory('place', content=((1, ), ))
+def test_import_osm_data_simple(place_row, osm2pgsql_options, capfd):
+    place_row()

     database_import.import_osm_data(Path('file.pbf'), osm2pgsql_options)
     captured = capfd.readouterr()
@@ -92,8 +92,8 @@ def test_import_osm_data_simple(table_factory, osm2pgsql_options, capfd):
     assert 'file.pbf' in captured.out


-def test_import_osm_data_multifile(table_factory, tmp_path, osm2pgsql_options, capfd):
-    table_factory('place', content=((1, ), ))
+def test_import_osm_data_multifile(place_row, tmp_path, osm2pgsql_options, capfd):
+    place_row()
     osm2pgsql_options['osm2pgsql_cache'] = 0

     files = [tmp_path / 'file1.osm', tmp_path / 'file2.osm']
@@ -107,22 +107,19 @@ def test_import_osm_data_multifile(table_factory, tmp_path, osm2pgsql_options, c
     assert 'file2.osm' in captured.out


-def test_import_osm_data_simple_no_data(table_factory, osm2pgsql_options):
-    table_factory('place')
+def test_import_osm_data_simple_no_data(place_row, osm2pgsql_options):
     with pytest.raises(UsageError, match='No data imported'):
         database_import.import_osm_data(Path('file.pbf'), osm2pgsql_options)


-def test_import_osm_data_simple_ignore_no_data(table_factory, osm2pgsql_options):
-    table_factory('place')
+def test_import_osm_data_simple_ignore_no_data(place_table, osm2pgsql_options):
     database_import.import_osm_data(Path('file.pbf'), osm2pgsql_options,
                                     ignore_errors=True)


-def test_import_osm_data_drop(table_factory, temp_db_cursor, tmp_path, osm2pgsql_options):
-    table_factory('place', content=((1, ), ))
+def test_import_osm_data_drop(place_row, table_factory, temp_db_cursor,
+                              tmp_path, osm2pgsql_options):
+    place_row()
     table_factory('planet_osm_nodes')

     flatfile = tmp_path / 'flatfile'
@@ -136,8 +133,8 @@ def test_import_osm_data_drop(table_factory, temp_db_cursor, tmp_path, osm2pgsql
     assert not temp_db_cursor.table_exists('planet_osm_nodes')


-def test_import_osm_data_default_cache(table_factory, osm2pgsql_options, capfd):
-    table_factory('place', content=((1, ), ))
+def test_import_osm_data_default_cache(place_row, osm2pgsql_options, capfd):
+    place_row()
     osm2pgsql_options['osm2pgsql_cache'] = 0
@@ -215,52 +212,53 @@ async def test_load_data(dsn, place_row, placex_table, osmline_table,
 class TestSetupSQL:

     @pytest.fixture(autouse=True)
-    def init_env(self, temp_db, tmp_path, def_config, sql_preprocessor_cfg):
-        def_config.lib_dir.sql = tmp_path / 'sql'
-        def_config.lib_dir.sql.mkdir()
+    def osm2ppsql_skel(self, def_config, temp_db_with_extensions, place_row,
+                       country_table, table_factory, temp_db_conn):
         self.config = def_config
+        place_row()
+        table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
+                      (('db_format', 2),))
+        table_factory('planet_osm_rels', 'id BIGINT, members JSONB, tags JSONB')
+        temp_db_conn.execute("""
+            CREATE OR REPLACE FUNCTION planet_osm_member_ids(jsonb, character)
+            RETURNS bigint[] AS $$
+              SELECT array_agg((el->>'ref')::int8)
+                FROM jsonb_array_elements($1) AS el WHERE el->>'type' = $2
+            $$ LANGUAGE sql IMMUTABLE;
+            """)

-    def write_sql(self, fname, content):
-        (self.config.lib_dir.sql / fname).write_text(content, encoding='utf-8')

     @pytest.mark.parametrize("reverse", [True, False])
-    def test_create_tables(self, temp_db_conn, temp_db_cursor, reverse):
-        self.write_sql('tables.sql',
-                       """CREATE FUNCTION test() RETURNS bool
-                          AS $$ SELECT {{db.reverse_only}} $$ LANGUAGE SQL""")
-        self.write_sql('grants.sql', "-- Mock grants file for testing\n")
+    def test_create_tables(self, table_factory, temp_db_conn, temp_db_cursor, reverse):
+        table_factory('country_osm_grid')

         database_import.create_tables(temp_db_conn, self.config, reverse)

-        temp_db_cursor.scalar('SELECT test()') == reverse
+        assert temp_db_cursor.table_exists('placex')
+        assert not reverse == temp_db_cursor.table_exists('search_name')

-    def test_create_table_triggers(self, temp_db_conn, temp_db_cursor):
-        self.write_sql('table-triggers.sql',
-                       """CREATE FUNCTION test() RETURNS TEXT
-                          AS $$ SELECT 'a'::text $$ LANGUAGE SQL""")
+    def test_create_table_triggers(self, temp_db_conn, placex_table, osmline_table,
+                                   postcode_table, load_sql):
+        load_sql('functions.sql')

         database_import.create_table_triggers(temp_db_conn, self.config)

-        temp_db_cursor.scalar('SELECT test()') == 'a'
-
-    def test_create_partition_tables(self, temp_db_conn, temp_db_cursor):
-        self.write_sql('partition-tables.src.sql',
-                       """CREATE FUNCTION test() RETURNS TEXT
-                          AS $$ SELECT 'b'::text $$ LANGUAGE SQL""")
+    def test_create_partition_tables(self, country_row, temp_db_conn, temp_db_cursor, load_sql):
+        for i in range(3):
+            country_row(partition=i)
+        load_sql('tables/location_area.sql')

         database_import.create_partition_tables(temp_db_conn, self.config)

-        temp_db_cursor.scalar('SELECT test()') == 'b'
+        for i in range(3):
+            assert temp_db_cursor.table_exists(f"location_area_large_{i}")
+            assert temp_db_cursor.table_exists(f"search_name_{i}")

     @pytest.mark.parametrize("drop", [True, False])
     @pytest.mark.asyncio
-    async def test_create_search_indices(self, temp_db_conn, temp_db_cursor, drop):
-        self.write_sql('indices.sql',
-                       """CREATE FUNCTION test() RETURNS bool
-                          AS $$ SELECT {{drop}} $$ LANGUAGE SQL""")
+    async def test_create_search_indices(self, temp_db_conn, temp_db_cursor, drop, load_sql):
+        load_sql('tables.sql', 'functions/ranking.sql')

         await database_import.create_search_indices(temp_db_conn, self.config, drop)

-        temp_db_cursor.scalar('SELECT test()') == drop
+        assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry')
+        assert not drop == temp_db_cursor.index_exists('placex', 'idx_placex_geometry_buildings')

View File

@@ -11,73 +11,14 @@ import subprocess
import pytest import pytest
from psycopg.rows import tuple_row
from nominatim_db.tools import postcodes from nominatim_db.tools import postcodes
from nominatim_db.data import country_info from nominatim_db.data import country_info
from nominatim_db.db.sql_preprocessor import SQLPreprocessor
import dummy_tokenizer import dummy_tokenizer
class MockPostcodeTable:
""" A location_postcodes table for testing.
"""
def __init__(self, conn, config):
self.conn = conn
SQLPreprocessor(conn, config).run_sql_file(conn, 'functions/postcode_triggers.sql')
with conn.cursor() as cur:
cur.execute("""CREATE TABLE location_postcodes (
place_id BIGINT,
osm_id BIGINT,
parent_place_id BIGINT,
rank_search SMALLINT,
indexed_status SMALLINT,
indexed_date TIMESTAMP,
country_code varchar(2),
postcode TEXT,
geometry GEOMETRY(Geometry, 4326),
centroid GEOMETRY(Point, 4326))""")
cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION get_country_code(place geometry)
RETURNS TEXT AS $$ BEGIN
RETURN null;
END; $$ LANGUAGE plpgsql;
""")
cur.execute("""CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
RETURNS GEOMETRY AS $$
SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
$$ LANGUAGE sql;""")
conn.commit()
def add(self, country, postcode, x, y):
with self.conn.cursor() as cur:
cur.execute(
"""INSERT INTO location_postcodes
(place_id, indexed_status, country_code, postcode, centroid, geometry)
VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
{'cc': country, 'pc': postcode, 'x': x, 'y': y})
self.conn.commit()
@property
def row_set(self):
with self.conn.cursor() as cur:
cur.execute("""SELECT osm_id, country_code, postcode,
ST_X(centroid), ST_Y(centroid)
FROM location_postcodes""")
return set((tuple(row) for row in cur))
@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
country_info.setup_country_config(def_config)
return MockPostcodeTable(temp_db_conn, def_config)
@pytest.fixture @pytest.fixture
def insert_implicit_postcode(placex_row, place_postcode_row): def insert_implicit_postcode(placex_row, place_postcode_row):
""" Insert data into the placex and place table """ Insert data into the placex and place table
@@ -86,11 +27,11 @@ def insert_implicit_postcode(placex_row, place_postcode_row):
def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False): def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
if in_placex: if in_placex:
placex_row(osm_id=osm_id, country=country, geom=geometry, placex_row(osm_id=osm_id, country=country, geom=geometry,
centroid=geometry, address={'postcode': postcode}) centroid=geometry,
address={'postcode': postcode})
else: else:
place_postcode_row(osm_id=osm_id, centroid=geometry, place_postcode_row(osm_id=osm_id, centroid=geometry,
country=country, postcode=postcode) country=country, postcode=postcode)
return _insert_implicit_postcode return _insert_implicit_postcode
@@ -103,7 +44,6 @@ def insert_postcode_area(place_postcode_row):
place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country, place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
centroid=f"POINT({x} {y})", centroid=f"POINT({x} {y})",
geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))") geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")
return _do return _do
@@ -123,186 +63,198 @@ def postcode_update(dsn, temp_db_conn):
BEFORE INSERT ON location_postcodes BEFORE INSERT ON location_postcodes
FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""") FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""")
temp_db_conn.commit() temp_db_conn.commit()
postcodes.update_postcodes(dsn, data_path, tokenizer) postcodes.update_postcodes(dsn, data_path, tokenizer)
return _do return _do
-def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
-    postcode_update()
-
-    assert not postcode_table.row_set
-
-
-@pytest.mark.parametrize('in_placex', [True, False])
-def test_postcodes_add_new_point(postcode_update, postcode_table,
-                                 insert_implicit_postcode, in_placex):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
-    postcode_table.add('yy', '9486', 99, 34)
-    postcode_update()
-
-    assert postcode_table.row_set == {(None, 'xx', '9486', 10, 12), }
-
-
-def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
-    insert_postcode_area(345, 'de', '10445', 23.5, 46.2)
-    postcode_update()
-
-    assert postcode_table.row_set == {(345, 'de', '10445', 23.5, 46.2)}
-
-
-@pytest.mark.parametrize('in_placex', [True, False])
-def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
-                                      insert_implicit_postcode, postcode_table, in_placex):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
-    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
-    postcode_update()
-
-    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
-
-
-@pytest.mark.parametrize('in_placex', [True, False])
-def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
-                                         insert_implicit_postcode, postcode_table, in_placex):
-    insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
-    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
-    postcode_update()
-
-    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
-
-
-@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
-                                    (9, 34), (9, 11), (23, 11)])
-def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
-                                       insert_implicit_postcode, coords):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-    postcode_table.add('xx', 'AB 4511', *coords)
-    postcode_update(tmp_path)
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
-
-
-def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
-                                             insert_implicit_postcode):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)
-    postcode_update()
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
-
-
-def test_postcodes_remove_point(postcode_update, postcode_table,
-                                insert_implicit_postcode):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-    postcode_table.add('xx', 'badname', 10, 12)
-    postcode_update()
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
-
-
-def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
-                                        insert_implicit_postcode):
-    insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
-    postcode_update()
-
-    assert not postcode_table.row_set
-
-
-def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
-    postcode_table.add('ch', '5613', 10, 12)
-    postcode_update()
-
-    assert not postcode_table.row_set
-
-
-def test_postcodes_multi_country(postcode_update, postcode_table,
-                                 insert_implicit_postcode):
-    insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
-    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
-    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
-    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')
-    postcode_update()
-
-    assert postcode_table.row_set == {(None, 'de', '54451', 10, 12),
-                                      (None, 'de', '54452', 10.3, 11.0),
-                                      (None, 'cc', '54452', 10.3, 11.0),
-                                      (None, 'cc', 'DD23 T', 100, 56)}
-
-
-@pytest.mark.parametrize("gzipped", [True, False])
-def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
-                          insert_implicit_postcode, gzipped):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-    extfile = tmp_path / 'xx_postcodes.csv'
-    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
-
-    if gzipped:
-        subprocess.run(['gzip', str(extfile)])
-        assert not extfile.is_file()
-
-    postcode_update(tmp_path)
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
-                                      (None, 'xx', 'CD 4511', -10, -5)}
-
-
-def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
-                                     insert_implicit_postcode):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-
-    extfile = tmp_path / 'xx_postcodes.csv'
-    extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
-
-    postcode_update(tmp_path)
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
-
-
-def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
-                                     postcode_table, tmp_path):
-    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
-
-    extfile = tmp_path / 'xx_postcodes.csv'
-    extfile.write_text(
-        "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')
-
-    postcode_update(tmp_path)
-
-    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
-                                      (None, 'xx', 'CD 4511', -10, -5)}
+class TestPostcodes:
+
+    @pytest.fixture(autouse=True)
+    def setup(self, def_config, postcode_table, placex_table, place_postcode_table,
+              load_sql, temp_db_conn):
+        self.conn = temp_db_conn
+        country_info.setup_country_config(def_config)
+        load_sql('functions/postcode_triggers.sql')
+        temp_db_conn.execute("""
+            CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
+            RETURNS TEXT AS $$
+              SELECT postcode
+            $$ LANGUAGE sql;
+            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
+            RETURNS TEXT AS $$
+              SELECT NULL
+            $$ LANGUAGE sql;
+            CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
+            RETURNS GEOMETRY AS $$
+              SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
+            $$ LANGUAGE sql;
+            """)
+
+    @property
+    def row_set(self):
+        with self.conn.cursor(row_factory=tuple_row) as cur:
+            cur.execute("""SELECT osm_id, country_code, postcode,
+                                  ST_X(centroid), ST_Y(centroid)
+                           FROM location_postcodes""")
+            return {r for r in cur}
+
+    def test_postcodes_empty(self, postcode_update):
+        postcode_update()
+
+        assert not self.row_set
+
+    @pytest.mark.parametrize('in_placex', [True, False])
+    def test_postcodes_add_new_point(self, postcode_update, postcode_row,
+                                     insert_implicit_postcode, in_placex):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
+        postcode_row('yy', '9486', 99, 34)
+        postcode_update()
+
+        assert self.row_set == {(None, 'xx', '9486', 10, 12), }
+
+    def test_postcodes_add_new_area(self, postcode_update, insert_postcode_area):
+        insert_postcode_area(345, 'de', '10445', 23.5, 46.2)
+        postcode_update()
+
+        assert self.row_set == {(345, 'de', '10445', 23.5, 46.2)}
+
+    @pytest.mark.parametrize('in_placex', [True, False])
+    def test_postcodes_add_area_and_point(self, postcode_update, insert_postcode_area,
+                                          insert_implicit_postcode, in_placex):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
+        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
+        postcode_update()
+
+        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
+
+    @pytest.mark.parametrize('in_placex', [True, False])
+    def test_postcodes_add_point_within_area(self, postcode_update, insert_postcode_area,
+                                             insert_implicit_postcode, in_placex):
+        insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
+        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
+        postcode_update()
+
+        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
+
+    @pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
+                                        (9, 34), (9, 11), (23, 11)])
+    def test_postcodes_replace_coordinates(self, postcode_update, postcode_row, tmp_path,
+                                           insert_implicit_postcode, coords):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+        postcode_row('xx', 'AB 4511', *coords)
+        postcode_update(tmp_path)
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
+
+    def test_postcodes_replace_coordinates_close(self, postcode_update, postcode_row,
+                                                 insert_implicit_postcode):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+        postcode_row('xx', 'AB 4511', 10, 11.99999999)
+        postcode_update()
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
+
+    def test_postcodes_remove_point(self, postcode_update, postcode_row,
+                                    insert_implicit_postcode):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+        postcode_row('xx', 'badname', 10, 12)
+        postcode_update()
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
+
+    def test_postcodes_ignore_empty_country(self, postcode_update, insert_implicit_postcode):
+        insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
+        postcode_update()
+
+        assert not self.row_set
+
+    def test_postcodes_remove_all(self, postcode_update, postcode_row, place_postcode_table):
+        postcode_row('ch', '5613', 10, 12)
+        postcode_update()
+
+        assert not self.row_set
+
+    def test_postcodes_multi_country(self, postcode_update,
+                                     insert_implicit_postcode):
+        insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
+        insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
+        insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
+        insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')
+        postcode_update()
+
+        assert self.row_set == {(None, 'de', '54451', 10, 12),
+                                (None, 'de', '54452', 10.3, 11.0),
+                                (None, 'cc', '54452', 10.3, 11.0),
+                                (None, 'cc', 'DD23 T', 100, 56)}
+
+    @pytest.mark.parametrize("gzipped", [True, False])
+    def test_postcodes_extern(self, postcode_update, tmp_path,
+                              insert_implicit_postcode, gzipped):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+        extfile = tmp_path / 'xx_postcodes.csv'
+        extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
+
+        if gzipped:
+            subprocess.run(['gzip', str(extfile)])
+            assert not extfile.is_file()
+
+        postcode_update(tmp_path)
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
+                                (None, 'xx', 'CD 4511', -10, -5)}
+
+    def test_postcodes_extern_bad_column(self, postcode_update, tmp_path,
+                                         insert_implicit_postcode):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+
+        extfile = tmp_path / 'xx_postcodes.csv'
+        extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
+
+        postcode_update(tmp_path)
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
+
+    def test_postcodes_extern_bad_number(self, postcode_update, insert_implicit_postcode,
+                                         tmp_path):
+        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
+
+        extfile = tmp_path / 'xx_postcodes.csv'
+        extfile.write_text(
+            "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')
+
+        postcode_update(tmp_path)
+
+        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
+                                (None, 'xx', 'CD 4511', -10, -5)}
+
+    def test_no_placex_entry(self, postcode_update, temp_db_cursor, place_postcode_row):
+        # Rewrite the get_country_code function to verify its execution.
+        temp_db_cursor.execute("""
+            CREATE OR REPLACE FUNCTION get_country_code(place geometry) RETURNS TEXT AS $$
+              SELECT 'yy' $$ LANGUAGE sql""")
+        place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
+        postcode_update()
+
+        assert self.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
+
+    def test_discard_badly_formatted_postcodes(self, postcode_update, place_postcode_row):
+        place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
+        postcode_update()
+
+        assert not self.row_set
 
 
 def test_can_compute(dsn, table_factory):
     assert not postcodes.can_compute(dsn)
     table_factory('place_postcode')
     assert postcodes.can_compute(dsn)
-
-
-def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
-    # Rewrite the get_country_code function to verify its execution.
-    temp_db_cursor.execute("""
-        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
-        RETURNS TEXT AS $$ BEGIN
-            RETURN 'yy';
-        END; $$ LANGUAGE plpgsql;
-        """)
-    place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
-    postcode_update()
-
-    assert postcode_table.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
-
-
-def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
-    place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
-    postcode_update()
-
-    assert not postcode_table.row_set