generalize filter for sanitizers

This commit is contained in:
biswajit-k
2023-03-11 13:52:07 +05:30
parent a8bedb6ab9
commit 8f03c80ce8
6 changed files with 110 additions and 77 deletions

View File

@@ -25,7 +25,6 @@ Arguments:
expression that must match the full house number value. expression that must match the full house number value.
""" """
from typing import Callable, Iterator, List from typing import Callable, Iterator, List
import re
from nominatim.tokenizer.sanitizers.base import ProcessInfo from nominatim.tokenizer.sanitizers.base import ProcessInfo
from nominatim.data.place_name import PlaceName from nominatim.data.place_name import PlaceName
@@ -34,12 +33,10 @@ from nominatim.tokenizer.sanitizers.config import SanitizerConfig
class _HousenumberSanitizer: class _HousenumberSanitizer:
def __init__(self, config: SanitizerConfig) -> None: def __init__(self, config: SanitizerConfig) -> None:
self.filter_kind = config.get_filter_kind('housenumber') self.filter_kind = config.get_filter('filter-kind', ['housenumber'])
self.split_regexp = config.get_delimiter() self.split_regexp = config.get_delimiter()
nameregexps = config.get_string_list('convert-to-name', []) self.filter_name = config.get_filter('convert-to-name', 'FAIL_ALL')
self.is_name_regexp = [re.compile(r) for r in nameregexps]
def __call__(self, obj: ProcessInfo) -> None: def __call__(self, obj: ProcessInfo) -> None:
@@ -49,7 +46,7 @@ class _HousenumberSanitizer:
new_address: List[PlaceName] = [] new_address: List[PlaceName] = []
for item in obj.address: for item in obj.address:
if self.filter_kind(item.kind): if self.filter_kind(item.kind):
if self._treat_as_name(item.name): if self.filter_name(item.name):
obj.names.append(item.clone(kind='housenumber')) obj.names.append(item.clone(kind='housenumber'))
else: else:
new_address.extend(item.clone(kind='housenumber', name=n) new_address.extend(item.clone(kind='housenumber', name=n)
@@ -76,10 +73,6 @@ class _HousenumberSanitizer:
yield hnr yield hnr
def _treat_as_name(self, housenumber: str) -> bool:
return any(r.fullmatch(housenumber) is not None for r in self.is_name_regexp)
def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]: def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]:
""" Create a housenumber processing function. """ Create a housenumber processing function.
""" """

View File

@@ -7,7 +7,7 @@
""" """
Configuration for Sanitizers. Configuration for Sanitizers.
""" """
from typing import Sequence, Optional, Pattern, Callable, Any, TYPE_CHECKING from typing import Sequence, Union, Optional, Pattern, Callable, Any, TYPE_CHECKING
from collections import UserDict from collections import UserDict
import re import re
@@ -33,7 +33,11 @@ class SanitizerConfig(_BaseUserDict):
Arguments: Arguments:
param: Name of the configuration parameter. param: Name of the configuration parameter.
default: Value to return, when the parameter is missing. default: Takes a tuple or list of strings which will
be returned if the parameter is missing in the
sanitizer configuration.
Note that if this default parameter is not
provided then an empty list is returned.
Returns: Returns:
If the parameter value is a simple string, it is returned as a If the parameter value is a simple string, it is returned as a
@@ -44,7 +48,7 @@ class SanitizerConfig(_BaseUserDict):
values = self.data.get(param, None) values = self.data.get(param, None)
if values is None: if values is None:
return None if default is None else list(default) return list(default)
if isinstance(values, str): if isinstance(values, str):
return [values] if values else [] return [values] if values else []
@@ -74,7 +78,7 @@ class SanitizerConfig(_BaseUserDict):
value = self.data.get(param, default) value = self.data.get(param, default)
if not isinstance(value, bool): if not isinstance(value, bool):
raise UsageError(f"Parameter '{param}' must be a boolean value ('yes' or 'no'.") raise UsageError(f"Parameter '{param}' must be a boolean value ('yes' or 'no').")
return value return value
@@ -102,30 +106,46 @@ class SanitizerConfig(_BaseUserDict):
return re.compile('\\s*[{}]+\\s*'.format(''.join('\\' + d for d in delimiter_set))) return re.compile('\\s*[{}]+\\s*'.format(''.join('\\' + d for d in delimiter_set)))
def get_filter_kind(self, *default: str) -> Callable[[str], bool]: def get_filter(self, param: str, default: Union[str, Sequence[str]] = 'PASS_ALL'
""" Return a filter function for the name kind from the 'filter-kind' ) -> Callable[[str], bool]:
config parameter. """ Returns a filter function for the given parameter of the sanitizer
configuration.
If the 'filter-kind' parameter is empty, the filter lets all items The value provided for the parameter in sanitizer configuration
pass. If the parameter is a string, it is interpreted as a single should be a string or list of strings, where each string is a regular
regular expression that must match the full kind string. expression. These regular expressions will later be used by the
If the parameter is a list then filter function to filter strings.
any of the regular expressions in the list must match to pass.
Arguments: Arguments:
default: Filters to be used, when the 'filter-kind' parameter param: The parameter for which the filter function
is not specified. If omitted then the default is to will be created.
let all names pass. default: Defines the behaviour of filter function if
parameter is missing in the sanitizer configuration.
Takes a string(PASS_ALL or FAIL_ALL) or a list of strings.
Any other value of string or an empty list is not allowed,
and will raise a ValueError. If the value is PASS_ALL, the filter
function will let all strings to pass, if the value is FAIL_ALL,
filter function will let no strings to pass.
If value provided is a list of strings each string
is treated as a regular expression. In this case these regular
expressions will be used by the filter function.
By default allow filter function to let all strings pass.
Returns: Returns:
A filter function which takes a name string and returns A filter function that takes a target string as the argument and
True when the item passes the filter. returns True if it fully matches any of the regular expressions
otherwise returns False.
""" """
filters = self.get_string_list('filter-kind', default) filters = self.get_string_list(param) or default
if not filters: if filters == 'PASS_ALL':
return lambda _: True return lambda _: True
if filters == 'FAIL_ALL':
return lambda _: False
regexes = [re.compile(regex) for regex in filters] if filters and isinstance(filters, (list, tuple)):
regexes = [re.compile(regex) for regex in filters]
return lambda target: any(regex.fullmatch(target) for regex in regexes)
return lambda name: any(regex.fullmatch(name) for regex in regexes) raise ValueError("Default parameter must be a non-empty list or a string value \
('PASS_ALL' or 'FAIL_ALL').")

View File

@@ -54,8 +54,7 @@ Arguments:
""" """
from typing import Callable, List, Optional, Pattern, Tuple, Sequence from typing import Callable, List, Tuple, Sequence
import re
from nominatim.tokenizer.sanitizers.base import ProcessInfo from nominatim.tokenizer.sanitizers.base import ProcessInfo
from nominatim.data.place_name import PlaceName from nominatim.data.place_name import PlaceName
@@ -65,37 +64,33 @@ class _TagSanitizer:
def __init__(self, config: SanitizerConfig) -> None: def __init__(self, config: SanitizerConfig) -> None:
self.type = config.get('type', 'name') self.type = config.get('type', 'name')
self.filter_kind = config.get_filter_kind() self.filter_kind = config.get_filter('filter-kind')
self.country_codes = config.get_string_list('country_code', []) self.country_codes = config.get_string_list('country_code', [])
self.allowed_ranks = self._set_allowed_ranks( \ self.filter_suffix = config.get_filter('suffix')
config.get_string_list('rank_address', ['0-30'])) self.filter_name = config.get_filter('name')
self.allowed_ranks = self._set_allowed_ranks(
config.get_string_list("rank_address", ["0-30"])
)
self.has_country_code = config.get('country_code', None) is not None self.has_country_code = config.get('country_code', None) is not None
suffixregexps = config.get_string_list('suffix', [r'[\s\S]*'])
self.suffix_regexp = [re.compile(r) for r in suffixregexps]
nameregexps = config.get_string_list('name', [r'[\s\S]*'])
self.name_regexp = [re.compile(r) for r in nameregexps]
def __call__(self, obj: ProcessInfo) -> None: def __call__(self, obj: ProcessInfo) -> None:
tags = obj.names if self.type == 'name' else obj.address tags = obj.names if self.type == 'name' else obj.address
if (not tags or if not tags \
self.has_country_code and or not self.allowed_ranks[obj.place.rank_address] \
obj.place.country_code not in self.country_codes or or self.has_country_code \
not self.allowed_ranks[obj.place.rank_address]): and obj.place.country_code not in self.country_codes:
return return
filtered_tags: List[PlaceName] = [] filtered_tags: List[PlaceName] = []
for tag in tags: for tag in tags:
if (not self.filter_kind(tag.kind) or if not self.filter_kind(tag.kind) \
not self._matches(tag.suffix, self.suffix_regexp) or or not self.filter_suffix(tag.suffix or '') \
not self._matches(tag.name, self.name_regexp)): or not self.filter_name(tag.name):
filtered_tags.append(tag) filtered_tags.append(tag)
@@ -117,7 +112,7 @@ class _TagSanitizer:
for rank in ranks: for rank in ranks:
intvl = [int(x) for x in rank.split('-')] intvl = [int(x) for x in rank.split('-')]
start, end = (intvl[0], intvl[0]) if len(intvl) == 1 else (intvl[0], intvl[1]) start, end = intvl[0], intvl[0] if len(intvl) == 1 else intvl[1]
for i in range(start, end + 1): for i in range(start, end + 1):
allowed_ranks[i] = True allowed_ranks[i] = True
@@ -126,17 +121,6 @@ class _TagSanitizer:
return tuple(allowed_ranks) return tuple(allowed_ranks)
def _matches(self, value: Optional[str], patterns: List[Pattern[str]]) -> bool:
""" Returns True if the given value fully matches any of the regular
expression pattern in the list. Otherwise, returns False.
Note that if the value is None, it is taken as an empty string.
"""
target = '' if value is None else value
return any(r.fullmatch(target) is not None for r in patterns)
def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]: def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]:
""" Create a function to process removal of certain tags. """ Create a function to process removal of certain tags.
""" """

View File

@@ -41,7 +41,7 @@ class _AnalyzerByLanguage:
""" """
def __init__(self, config: SanitizerConfig) -> None: def __init__(self, config: SanitizerConfig) -> None:
self.filter_kind = config.get_filter_kind() self.filter_kind = config.get_filter('filter-kind')
self.replace = config.get('mode', 'replace') != 'append' self.replace = config.get('mode', 'replace') != 'append'
self.whitelist = config.get('whitelist') self.whitelist = config.get('whitelist')

View File

@@ -302,7 +302,7 @@ class TestAllParameters:
def test_list_arguments_pass(self): def test_list_arguments_pass(self):
res = self.run_sanitizer_on(['de', 'in'], ['20-28', '30'], [r'abc.*', r'[\s\S]*'], res = self.run_sanitizer_on(['de', 'in'], ['20-28', '30'], [r'abc.*', r'[\s\S]*'],
name='foo', ref_abc='foo', name_abcxx='bar', ref_pqr='baz') name='foo', ref='foo', name_abcxx='bar', ref_pqr='baz')
assert res == [] assert res == []
@@ -315,7 +315,7 @@ class TestAllParameters:
def test_mix_arguments_pass(self): def test_mix_arguments_pass(self):
res = self.run_sanitizer_on('de', ['10', '20-28', '30'], r'[\s\S]*', res = self.run_sanitizer_on('de', ['10', '20-28', '30'], r'[\s\S]*',
name='foo', ref_abc='foo', name_abcxx='bar', ref_pqr='baz') name_abc='foo', ref_abc='foo', name_abcxx='bar', ref_pqr='baz')
assert res == [] assert res == []

View File

@@ -10,17 +10,12 @@ Tests for sanitizer configuration helper functions.
import pytest import pytest
from nominatim.errors import UsageError from nominatim.errors import UsageError
from nominatim.tokenizer.place_sanitizer import PlaceName
from nominatim.tokenizer.sanitizers.config import SanitizerConfig from nominatim.tokenizer.sanitizers.config import SanitizerConfig
def test_string_list_default_empty(): def test_string_list_default_empty():
assert SanitizerConfig().get_string_list('op') == [] assert SanitizerConfig().get_string_list('op') == []
def test_string_list_default_none():
assert SanitizerConfig().get_string_list('op', default=None) is None
def test_string_list_default_something(): def test_string_list_default_something():
assert SanitizerConfig().get_string_list('op', default=['a', 'b']) == ['a', 'b'] assert SanitizerConfig().get_string_list('op', default=['a', 'b']) == ['a', 'b']
@@ -78,36 +73,77 @@ def test_create_split_regex_empty_delimiter():
regex = SanitizerConfig({'delimiters': ''}).get_delimiter() regex = SanitizerConfig({'delimiters': ''}).get_delimiter()
@pytest.mark.parametrize('inp', ('name', 'name:de', 'na\\me', '.*')) @pytest.mark.parametrize('inp', ('name', 'name:de', 'na\\me', '.*', ''))
def test_create_kind_filter_no_params(inp): def test_create_name_filter_no_param_no_default(inp):
filt = SanitizerConfig().get_filter_kind() filt = SanitizerConfig({'filter-kind': 'place'}).get_filter('name')
assert filt(inp) assert filt(inp)
@pytest.mark.parametrize('inp', ('name', 'name:de', 'na\\me', '.*', ''))
def test_create_name_filter_no_param_default_pass_all(inp):
filt = SanitizerConfig().get_filter('name', 'PASS_ALL')
assert filt(inp)
@pytest.mark.parametrize('inp', ('name', 'name:de', 'na\\me', '.*', ''))
def test_create_name_filter_no_param_default_fail_all(inp):
filt = SanitizerConfig().get_filter('name', 'FAIL_ALL')
assert not filt(inp)
def test_create_name_filter_no_param_default_invalid_string():
with pytest.raises(ValueError):
filt = SanitizerConfig().get_filter('name', 'abc')
def test_create_name_filter_no_param_default_empty_list():
with pytest.raises(ValueError):
filt = SanitizerConfig().get_filter('name', [])
@pytest.mark.parametrize('kind', ('de', 'name:de', 'ende')) @pytest.mark.parametrize('kind', ('de', 'name:de', 'ende'))
def test_create_kind_filter_default_positive(kind):
filt = SanitizerConfig().get_filter('filter-kind', ['.*de'])
assert filt(kind)
@pytest.mark.parametrize('kind', ('de', 'name:de', 'ende'))
def test_create_kind_filter_default_negetive(kind):
filt = SanitizerConfig().get_filter('filter-kind', ['.*fr'])
assert not filt(kind)
@pytest.mark.parametrize('kind', ('lang', 'lang:de', 'langxx'))
def test_create_kind_filter_custom_regex_positive(kind): def test_create_kind_filter_custom_regex_positive(kind):
filt = SanitizerConfig({'filter-kind': '.*de'}).get_filter_kind() filt = SanitizerConfig({'filter-kind': 'lang.*'}
).get_filter('filter-kind', ['.*fr'])
assert filt(kind) assert filt(kind)
@pytest.mark.parametrize('kind', ('de ', '123', '', 'bedece')) @pytest.mark.parametrize('kind', ('de ', '123', '', 'bedece'))
def test_create_kind_filter_custom_regex_negative(kind): def test_create_kind_filter_custom_regex_negative(kind):
filt = SanitizerConfig({'filter-kind': '.*de'}).get_filter_kind() filt = SanitizerConfig({'filter-kind': '.*de'}).get_filter('filter-kind')
assert not filt(kind) assert not filt(kind)
@pytest.mark.parametrize('kind', ('name', 'fr', 'name:fr', 'frfr', '34')) @pytest.mark.parametrize('kind', ('name', 'fr', 'name:fr', 'frfr', '34'))
def test_create_kind_filter_many_positive(kind): def test_create_kind_filter_many_positive(kind):
filt = SanitizerConfig({'filter-kind': ['.*fr', 'name', r'\d+']}).get_filter_kind() filt = SanitizerConfig({'filter-kind': ['.*fr', 'name', r'\d+']}
).get_filter('filter-kind')
assert filt(kind) assert filt(kind)
@pytest.mark.parametrize('kind', ('name:de', 'fridge', 'a34', '.*', '\\')) @pytest.mark.parametrize('kind', ('name:de', 'fridge', 'a34', '.*', '\\'))
def test_create_kind_filter_many_negative(kind): def test_create_kind_filter_many_negative(kind):
filt = SanitizerConfig({'filter-kind': ['.*fr', 'name', r'\d+']}).get_filter_kind() filt = SanitizerConfig({'filter-kind': ['.*fr', 'name', r'\d+']}
).get_filter('filter-kind')
assert not filt(kind) assert not filt(kind)