replace PhraseType enum with simple int constants

Sarah Hoffmann
2025-02-21 16:44:12 +01:00
parent 31412e0674
commit 49bd18b048
10 changed files with 186 additions and 179 deletions
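The pattern of the refactoring, reduced to a self-contained sketch (the names `OldPhraseType` and `phrase_compatible_with` are illustrative stand-ins, and the one-line compatibility rule is simplified; the real names and rules appear in the diff below):

import enum

# Before: a phrase designation enum that carries its own behaviour.
class OldPhraseType(enum.Enum):
    NONE = 0
    AMENITY = enum.auto()

    def compatible_with(self, ttype: str) -> bool:
        return self is not OldPhraseType.NONE or ttype != 'Q'

# After: a plain type alias plus module-level int constants,
# with the behaviour moved into a free function.
PhraseType = int
PHRASE_ANY = 0
PHRASE_AMENITY = 1

def phrase_compatible_with(ptype: PhraseType, ttype: str) -> bool:
    return ptype != PHRASE_ANY or ttype != 'Q'

# Both spellings give the same answer for the same inputs.
assert OldPhraseType.NONE.compatible_with('W') == phrase_compatible_with(PHRASE_ANY, 'W')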

View File

@@ -26,7 +26,7 @@ from .connection import SearchConnection
 from .status import get_status, StatusResult
 from .lookup import get_places, get_detailed_place
 from .reverse import ReverseGeocoder
-from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
+from . import search as nsearch
 from . import types as ntyp
 from .results import DetailedResult, ReverseResult, SearchResults
@@ -207,7 +207,7 @@ class NominatimAPIAsync:
         async with self.begin() as conn:
             conn.set_query_timeout(self.query_timeout)
             if details.keywords:
-                await make_query_analyzer(conn)
+                await nsearch.make_query_analyzer(conn)
             return await get_detailed_place(conn, place, details)
     async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
@@ -219,7 +219,7 @@ class NominatimAPIAsync:
         async with self.begin() as conn:
             conn.set_query_timeout(self.query_timeout)
             if details.keywords:
-                await make_query_analyzer(conn)
+                await nsearch.make_query_analyzer(conn)
             return await get_places(conn, places, details)
     async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
@@ -237,7 +237,7 @@ class NominatimAPIAsync:
         async with self.begin() as conn:
             conn.set_query_timeout(self.query_timeout)
             if details.keywords:
-                await make_query_analyzer(conn)
+                await nsearch.make_query_analyzer(conn)
             geocoder = ReverseGeocoder(conn, details,
                                        self.reverse_restrict_to_country_area)
             return await geocoder.lookup(coord)
@@ -251,10 +251,10 @@ class NominatimAPIAsync:
         async with self.begin() as conn:
             conn.set_query_timeout(self.query_timeout)
-            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
-                                       self.config.get_int('REQUEST_TIMEOUT')
-                                       if self.config.REQUEST_TIMEOUT else None)
-            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
+            geocoder = nsearch.ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
+                                               self.config.get_int('REQUEST_TIMEOUT')
+                                               if self.config.REQUEST_TIMEOUT else None)
+            phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p.strip()) for p in query.split(',')]
             return await geocoder.lookup(phrases)
     async def search_address(self, amenity: Optional[str] = None,
@@ -271,22 +271,22 @@ class NominatimAPIAsync:
             conn.set_query_timeout(self.query_timeout)
             details = ntyp.SearchDetails.from_kwargs(params)
-            phrases: List[Phrase] = []
+            phrases: List[nsearch.Phrase] = []
             if amenity:
-                phrases.append(Phrase(PhraseType.AMENITY, amenity))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_AMENITY, amenity))
             if street:
-                phrases.append(Phrase(PhraseType.STREET, street))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_STREET, street))
             if city:
-                phrases.append(Phrase(PhraseType.CITY, city))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_CITY, city))
             if county:
-                phrases.append(Phrase(PhraseType.COUNTY, county))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTY, county))
             if state:
-                phrases.append(Phrase(PhraseType.STATE, state))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_STATE, state))
             if postalcode:
-                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_POSTCODE, postalcode))
             if country:
-                phrases.append(Phrase(PhraseType.COUNTRY, country))
+                phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTRY, country))
             if not phrases:
                 raise UsageError('Nothing to search for.')
@@ -309,9 +309,9 @@ class NominatimAPIAsync:
             if amenity:
                 details.layers |= ntyp.DataLayer.POI
-            geocoder = ForwardGeocoder(conn, details,
-                                       self.config.get_int('REQUEST_TIMEOUT')
-                                       if self.config.REQUEST_TIMEOUT else None)
+            geocoder = nsearch.ForwardGeocoder(conn, details,
+                                               self.config.get_int('REQUEST_TIMEOUT')
+                                               if self.config.REQUEST_TIMEOUT else None)
             return await geocoder.lookup(phrases)
     async def search_category(self, categories: List[Tuple[str, str]],
@@ -328,15 +328,15 @@ class NominatimAPIAsync:
         async with self.begin() as conn:
             conn.set_query_timeout(self.query_timeout)
             if near_query:
-                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
+                phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p) for p in near_query.split(',')]
             else:
                 phrases = []
             if details.keywords:
-                await make_query_analyzer(conn)
+                await nsearch.make_query_analyzer(conn)
-            geocoder = ForwardGeocoder(conn, details,
-                                       self.config.get_int('REQUEST_TIMEOUT')
-                                       if self.config.REQUEST_TIMEOUT else None)
+            geocoder = nsearch.ForwardGeocoder(conn, details,
+                                               self.config.get_int('REQUEST_TIMEOUT')
+                                               if self.config.REQUEST_TIMEOUT else None)
             return await geocoder.lookup_pois(categories, phrases)
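The public API of NominatimAPIAsync is unchanged; only the internal phrase construction now goes through the `nsearch` module alias and the flat constants. A condensed sketch of that pattern (the absolute package name `nominatim_api` is an assumption, since the diff only shows relative imports, and `build_address_phrases` is a made-up helper for illustration):

from nominatim_api import search as nsearch

def build_address_phrases(street=None, city=None, country=None):
    # Collect the non-empty parts into typed phrases, mirroring search_address() above.
    phrases = []
    if street:
        phrases.append(nsearch.Phrase(nsearch.PHRASE_STREET, street))
    if city:
        phrases.append(nsearch.Phrase(nsearch.PHRASE_CITY, city))
    if country:
        phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTRY, country))
    return phrases

print(build_address_phrases(street='Hauptstr. 5', city='Berlin'))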

View File

@@ -9,5 +9,12 @@ Module for forward search.
"""
from .geocoder import (ForwardGeocoder as ForwardGeocoder)
from .query import (Phrase as Phrase,
PhraseType as PhraseType)
PHRASE_ANY as PHRASE_ANY,
PHRASE_AMENITY as PHRASE_AMENITY,
PHRASE_STREET as PHRASE_STREET,
PHRASE_CITY as PHRASE_CITY,
PHRASE_COUNTY as PHRASE_COUNTY,
PHRASE_STATE as PHRASE_STATE,
PHRASE_POSTCODE as PHRASE_POSTCODE,
PHRASE_COUNTRY as PHRASE_COUNTRY)
from .query_analyzer_factory import (make_query_analyzer as make_query_analyzer)
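Because the constants are re-exported from the search subpackage, code that previously imported the enum switches to the flat names with a one-line change (a sketch; `nominatim_api` as the installed package name is an assumption):

# before this commit:
#   from nominatim_api.search import Phrase, PhraseType
#   phrase = Phrase(PhraseType.CITY, 'Berlin')

# after this commit:
from nominatim_api.search import Phrase, PHRASE_CITY

phrase = Phrase(PHRASE_CITY, 'Berlin')
print(phrase)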

View File

@@ -10,7 +10,6 @@ Datastructures for a tokenized query.
 from typing import List, Tuple, Optional, Iterator
 from abc import ABC, abstractmethod
 import dataclasses
-import enum
 BreakType = str
@@ -57,44 +56,45 @@ TOKEN_NEAR_ITEM = 'N'
""" Special term used as searchable object(e.g. supermarket in ...). """
class PhraseType(enum.Enum):
""" Designation of a phrase.
PhraseType = int
""" Designation of a phrase.
"""
PHRASE_ANY = 0
""" No specific designation (i.e. source is free-form query). """
PHRASE_AMENITY = 1
""" Contains name or type of a POI. """
PHRASE_STREET = 2
""" Contains a street name optionally with a housenumber. """
PHRASE_CITY = 3
""" Contains the postal city. """
PHRASE_COUNTY = 4
""" Contains the equivalent of a county. """
PHRASE_STATE = 5
""" Contains a state or province. """
PHRASE_POSTCODE = 6
""" Contains a postal code. """
PHRASE_COUNTRY = 7
""" Contains the country name or code. """
def _phrase_compatible_with(ptype: PhraseType, ttype: TokenType,
is_full_phrase: bool) -> bool:
""" Check if the given token type can be used with the phrase type.
"""
NONE = 0
""" No specific designation (i.e. source is free-form query). """
AMENITY = enum.auto()
""" Contains name or type of a POI. """
STREET = enum.auto()
""" Contains a street name optionally with a housenumber. """
CITY = enum.auto()
""" Contains the postal city. """
COUNTY = enum.auto()
""" Contains the equivalent of a county. """
STATE = enum.auto()
""" Contains a state or province. """
POSTCODE = enum.auto()
""" Contains a postal code. """
COUNTRY = enum.auto()
""" Contains the country name or code. """
if ptype == PHRASE_ANY:
return not is_full_phrase or ttype != TOKEN_QUALIFIER
if ptype == PHRASE_AMENITY:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)\
or (is_full_phrase and ttype == TOKEN_NEAR_ITEM)\
or (not is_full_phrase and ttype == TOKEN_QUALIFIER)
if ptype == PHRASE_STREET:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL, TOKEN_HOUSENUMBER)
if ptype == PHRASE_POSTCODE:
return ttype == TOKEN_POSTCODE
if ptype == PHRASE_COUNTRY:
return ttype == TOKEN_COUNTRY
def compatible_with(self, ttype: TokenType,
is_full_phrase: bool) -> bool:
""" Check if the given token type can be used with the phrase type.
"""
if self == PhraseType.NONE:
return not is_full_phrase or ttype != TOKEN_QUALIFIER
if self == PhraseType.AMENITY:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)\
or (is_full_phrase and ttype == TOKEN_NEAR_ITEM)\
or (not is_full_phrase and ttype == TOKEN_QUALIFIER)
if self == PhraseType.STREET:
return ttype in (TOKEN_WORD, TOKEN_PARTIAL, TOKEN_HOUSENUMBER)
if self == PhraseType.POSTCODE:
return ttype == TOKEN_POSTCODE
if self == PhraseType.COUNTRY:
return ttype == TOKEN_COUNTRY
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)
return ttype in (TOKEN_WORD, TOKEN_PARTIAL)
@dataclasses.dataclass
@@ -218,7 +218,7 @@ class QueryStruct:
     def __init__(self, source: List[Phrase]) -> None:
         self.source = source
         self.nodes: List[QueryNode] = \
-            [QueryNode(BREAK_START, source[0].ptype if source else PhraseType.NONE)]
+            [QueryNode(BREAK_START, source[0].ptype if source else PHRASE_ANY)]
     def num_token_slots(self) -> int:
         """ Return the length of the query in vertice steps.
@@ -245,7 +245,7 @@ class QueryStruct:
         snode = self.nodes[trange.start]
         full_phrase = snode.btype in (BREAK_START, BREAK_PHRASE)\
                       and self.nodes[trange.end].btype in (BREAK_PHRASE, BREAK_END)
-        if snode.ptype.compatible_with(ttype, full_phrase):
+        if _phrase_compatible_with(snode.ptype, ttype, full_phrase):
             tlist = snode.get_tokens(trange.end, ttype)
             if tlist is None:
                 snode.starting.append(TokenList(trange.end, ttype, [token]))
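The rules encoded by the old `PhraseType.compatible_with()` method carry over unchanged to the module-level helper; a few spot checks of the behaviour visible in the diff (a sketch that calls the private helper directly and assumes the module path `nominatim_api.search.query`):

from nominatim_api.search import query as qmod

# Housenumber tokens are accepted in street phrases but not in city phrases.
assert qmod._phrase_compatible_with(qmod.PHRASE_STREET, qmod.TOKEN_HOUSENUMBER, False)
assert not qmod._phrase_compatible_with(qmod.PHRASE_CITY, qmod.TOKEN_HOUSENUMBER, False)

# A qualifier token may not make up a complete free-form phrase on its own.
assert not qmod._phrase_compatible_with(qmod.PHRASE_ANY, qmod.TOKEN_QUALIFIER, True)
assert qmod._phrase_compatible_with(qmod.PHRASE_ANY, qmod.TOKEN_QUALIFIER, False)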

View File

@@ -293,7 +293,7 @@ class _TokenSequence:
         # * the containing phrase is strictly typed
         if (base.housenumber and first.end < base.housenumber.start)\
            or (base.qualifier and base.qualifier > first)\
-           or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
+           or (query.nodes[first.start].ptype != qmod.PHRASE_ANY):
             return
         penalty = self.penalty
@@ -329,7 +329,7 @@ class _TokenSequence:
         # * the containing phrase is strictly typed
         if (base.housenumber and last.start > base.housenumber.end)\
            or (base.qualifier and base.qualifier < last)\
-           or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
+           or (query.nodes[last.start].ptype != qmod.PHRASE_ANY):
             return
         penalty = self.penalty
@@ -393,7 +393,7 @@ def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment
         another. It does not include penalties for transitions within a
         type.
     """
-    todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PhraseType.NONE else 1)]
+    todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PHRASE_ANY else 1)]
     while todo:
         state = todo.pop()
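The direction seed in the last hunk keeps its old meaning, only spelled with the new constant: a free-form first phrase leaves the assignment direction open, while a structured first phrase pins it. A toy restatement of just that condition (my reading of the code above, not a call into the library's internals; `initial_direction` is a hypothetical name):

from nominatim_api.search import query as qmod

def initial_direction(first_phrase_type: qmod.PhraseType) -> int:
    # 0 leaves the direction undecided, a non-zero value fixes it (see _TokenSequence above).
    return 0 if first_phrase_type == qmod.PHRASE_ANY else 1

assert initial_direction(qmod.PHRASE_ANY) == 0
assert initial_direction(qmod.PHRASE_STREET) == 1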