forked from hans/Nominatim
add postcode parser
This commit is contained in:
@@ -25,6 +25,7 @@ from ..logging import log
|
||||
from . import query as qmod
|
||||
from ..query_preprocessing.config import QueryConfig
|
||||
from .query_analyzer_factory import AbstractQueryAnalyzer
|
||||
from .postcode_parser import PostcodeParser
|
||||
|
||||
|
||||
DB_TO_TOKEN_TYPE = {
|
||||
@@ -117,6 +118,7 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
|
||||
"""
|
||||
    def __init__(self, conn: SearchConnection) -> None:
        """ Create a new analyzer bound to the given search connection.

            The postcode parser is built from the country configuration
            found on the connection's config.
        """
        self.conn = conn
        # Pattern-based postcode recognition; used in analyze_query to add
        # TOKEN_POSTCODE tokens that are not stored in the word table.
        self.postcode_parser = PostcodeParser(conn.config)
||||
async def setup(self) -> None:
|
||||
""" Set up static data structures needed for the analysis.
|
||||
@@ -199,6 +201,11 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
|
||||
query.add_token(trange, DB_TO_TOKEN_TYPE[row.type], token)
|
||||
|
||||
self.add_extra_tokens(query)
|
||||
for start, end, pc in self.postcode_parser.parse(query):
|
||||
query.add_token(qmod.TokenRange(start, end),
|
||||
qmod.TOKEN_POSTCODE,
|
||||
ICUToken(penalty=0.1, token=0, count=1, addr_count=1,
|
||||
lookup_word=pc, word_token=pc, info=None))
|
||||
self.rerank_tokens(query)
|
||||
|
||||
log().table_dump('Word tokens', _dump_word_tokens(query))
|
||||
@@ -240,9 +247,13 @@ class ICUQueryAnalyzer(AbstractQueryAnalyzer):
|
||||
async def lookup_in_db(self, words: List[str]) -> 'sa.Result[Any]':
|
||||
""" Return the token information from the database for the
|
||||
given word tokens.
|
||||
|
||||
This function excludes postcode tokens
|
||||
"""
|
||||
t = self.conn.t.meta.tables['word']
|
||||
return await self.conn.execute(t.select().where(t.c.word_token.in_(words)))
|
||||
return await self.conn.execute(t.select()
|
||||
.where(t.c.word_token.in_(words))
|
||||
.where(t.c.type != 'P'))
|
||||
|
||||
def add_extra_tokens(self, query: qmod.QueryStruct) -> None:
|
||||
""" Add tokens to query that are not saved in the database.
|
||||
|
||||
81
src/nominatim_api/search/postcode_parser.py
Normal file
81
src/nominatim_api/search/postcode_parser.py
Normal file
@@ -0,0 +1,81 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
#
|
||||
# This file is part of Nominatim. (https://nominatim.org)
|
||||
#
|
||||
# Copyright (C) 2025 by the Nominatim developer community.
|
||||
# For a full list of authors see the git log.
|
||||
"""
|
||||
Handling of arbitrary postcode tokens in tokenized query string.
|
||||
"""
|
||||
from typing import Tuple, Set
|
||||
import re
|
||||
from collections import defaultdict
|
||||
|
||||
import yaml
|
||||
|
||||
from ..config import Configuration
|
||||
from . import query as qmod
|
||||
|
||||
|
||||
class PostcodeParser:
    """ Pattern-based parser for postcodes in tokenized queries.

        The postcode patterns are read from the country configuration.
        The parser does currently not return country restrictions.
    """

    def __init__(self, config: Configuration) -> None:
        # skip over includes here to avoid loading the complete country name data
        yaml.add_constructor('!include', lambda loader, node: [],
                             Loader=yaml.SafeLoader)
        cdata = yaml.safe_load(config.find_config_file('country_settings.yaml')
                                     .read_text(encoding='utf-8'))

        # Map each distinct regex pattern to the set of output formats
        # configured for it. In the configured pattern, 'd' stands for a
        # digit and 'l' for a lower-case letter; expand both to the
        # corresponding character classes. An 'output' of None means the
        # matched text itself is used as the postcode.
        unique_patterns = defaultdict(set)
        for cc, data in cdata.items():
            if data.get('postcode'):
                pat = data['postcode']['pattern']
                out = data['postcode'].get('output')
                unique_patterns[pat.replace('d', '[0-9]').replace('l', '[a-z]')].add(out)

        # Union of all patterns, used as a cheap pre-filter before trying
        # the per-pattern matches. The trailing class matches the break
        # character that must follow a complete postcode.
        self.global_pattern = re.compile(
                '(?:' +
                '|'.join(f"(?:{k})" for k in unique_patterns)
                + ')[:, >]')

        # One compiled regex per distinct pattern together with its set of
        # output formats; consulted only when global_pattern matched.
        self.local_patterns = [(re.compile(f"(?:{k})[:, >]"), v)
                               for k, v in unique_patterns.items()]

    def parse(self, query: qmod.QueryStruct) -> Set[Tuple[int, int, str]]:
        """ Parse postcodes in the given list of query tokens taking into
            account the list of breaks from the nodes.

            The result is a sequence of tuples with
            [start node id, end node id, postcode token]
        """
        nodes = query.nodes
        outcodes = set()

        for i in range(query.num_token_slots()):
            # A postcode may only start after a phrase/word break.
            # NOTE(review): btype '`' appears to mark a break inside a
            # partial token where a postcode must not start or continue —
            # confirm against the query module's break-type definitions.
            if nodes[i].btype in '<,: ' and nodes[i + 1].btype != '`':
                # Build a candidate string of up to three consecutive terms,
                # each followed by its break character, so the patterns can
                # match postcodes that are split over multiple tokens
                # (e.g. "AB1 2CD").
                word = nodes[i + 1].term_normalized + nodes[i + 1].btype
                if word[-1] in ' -' and nodes[i + 2].btype != '`':
                    word += nodes[i + 2].term_normalized + nodes[i + 2].btype
                    if word[-1] in ' -' and nodes[i + 3].btype != '`':
                        word += nodes[i + 3].term_normalized + nodes[i + 3].btype

                # Use global pattern to check for presence of any postcode.
                m = self.global_pattern.match(word)
                if m:
                    # If there was a match, check against each pattern separately
                    # because multiple patterns might be matching at the end.
                    for pattern, info in self.local_patterns:
                        lm = pattern.match(word)
                        if lm:
                            # End node = start + number of break characters
                            # consumed by the match (one per token covered).
                            trange = (i, i + sum(c in ' ,-:>' for c in lm.group(0)))
                            for out in info:
                                if out:
                                    # Reformat via the configured output
                                    # template (may reference match groups).
                                    outcodes.add((*trange, lm.expand(out).upper()))
                                else:
                                    # No template: use the matched text minus
                                    # the trailing break character.
                                    outcodes.add((*trange, lm.group(0)[:-1].upper()))
        return outcodes
|
||||
Reference in New Issue
Block a user