from __future__ import annotations
import re
from collections.abc import Sequence
from datetime import datetime
from typing import TypeAlias, TypeVar, Any, Union
from .astring import AString
from .fetchattr import FetchRequirement
from .flag import Flag
from .objectid import ObjectId
from .sequenceset import SequenceSet
from .. import Params, Parseable, ExpectedParseable, Space
from ..exceptions import NotParseable, UnexpectedType
from ..primitives import Atom, Number, QuotedString, List
from ...frozen import frozenlist
__all__ = ['SearchKey']
_FilterType: TypeAlias = Union[
tuple['SearchKey', 'SearchKey'], tuple[str, str],
Sequence['SearchKey'], SequenceSet, Flag, ObjectId,
datetime, int, str, frozenlist['SearchKey']]
_FilterT = TypeVar('_FilterT', bound=_FilterType)
[docs]
class SearchKey(Parseable[bytes]):
"""Represents a search key given to the SEARCH command on an IMAP stream.
Args:
key: The search key.
filter_: The filter object, used by most search keys.
inverse: If the search key was inverted with ``NOT``.
Args:
filter_: The filter object, used by most search keys.
inverse: If the search key was inverted with ``NOT``.
"""
_not_pattern = re.compile(br'NOT +', re.I)
def __init__(self, key: bytes,
filter_: _FilterType | None = None,
inverse: bool = False) -> None:
super().__init__()
self.key = key
self.filter = filter_
self.inverse = inverse
@property
def value(self) -> bytes:
"""The search key."""
return self.key
@property
def value_str(self) -> str:
"""The search key, as a string."""
return self.key.decode('ascii')
@property
def requirement(self) -> FetchRequirement:
"""Indicates the data required to fulfill this search key."""
key_name = self.key
if key_name == b'ALL':
return FetchRequirement.NONE
elif key_name == b'KEYSET':
keyset_reqs = {key.requirement for key in self.filter_key_set}
return FetchRequirement.reduce(keyset_reqs)
elif key_name == b'OR':
left, right = self.filter_key_or
key_or_reqs = {left.requirement, right.requirement}
return FetchRequirement.reduce(key_or_reqs)
elif key_name in (b'SENTBEFORE', b'SENTON', b'SENTSINCE', b'BCC',
b'CC', b'FROM', b'SUBJECT', b'TO', b'HEADER'):
return FetchRequirement.HEADER
elif key_name in (b'BODY', b'TEXT', b'LARGER', b'SMALLER'):
return FetchRequirement.CONTENT
else:
return FetchRequirement.METADATA
@property
def not_inverse(self) -> SearchKey:
"""Return a copy of the search key with :attr:`.inverse` flipped."""
return SearchKey(self.value, self.filter, not self.inverse)
def _get_filter(self, cls: type[_FilterT]) -> _FilterT:
if not isinstance(self.filter, cls):
raise TypeError(self.filter)
return self.filter
@property
def filter_sequence_set(self) -> SequenceSet:
return self._get_filter(SequenceSet)
@property
def filter_key_set(self) -> frozenlist[SearchKey]:
return self._get_filter(frozenlist) # type: ignore
@property
def filter_key_or(self) -> tuple[SearchKey, SearchKey]:
return self._get_filter(tuple)
@property
def filter_flag(self) -> Flag:
return self._get_filter(Flag)
@property
def filter_datetime(self) -> datetime:
return self._get_filter(datetime)
@property
def filter_int(self) -> int:
return self._get_filter(int)
@property
def filter_str(self) -> str:
return self._get_filter(str)
@property
def filter_header(self) -> tuple[str, str]:
return self._get_filter(tuple)
@property
def filter_object_id(self) -> ObjectId:
return self._get_filter(ObjectId)
def __bytes__(self) -> bytes:
raise NotImplementedError
def __hash__(self) -> int:
return hash((self.value, self.filter, self.inverse))
def __eq__(self, other: Any) -> bool:
if isinstance(other, SearchKey):
return hash(self) == hash(other)
return super().__eq__(other)
def __ne__(self, other: Any) -> bool:
if isinstance(other, SearchKey):
return hash(self) != hash(other)
return super().__ne__(other)
@classmethod
def _parse_astring_filter(cls, buf: memoryview, params: Params) \
-> tuple[str, memoryview]:
ret, after = AString.parse(buf, params)
return ret.value.decode(params.charset or 'ascii'), after
@classmethod
def _parse_date_filter(cls, buf: memoryview, params: Params) \
-> tuple[datetime, memoryview]:
params_copy = params.copy(expected=[Atom, QuotedString])
atom, after = ExpectedParseable.parse(buf, params_copy)
date_str = str(atom.value, 'ascii', 'ignore')
try:
date = datetime.strptime(date_str, '%d-%b-%Y')
except ValueError as exc:
raise NotParseable(buf) from exc
return date, after
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[SearchKey, memoryview]:
try:
_, buf = Space.parse(buf, params)
except NotParseable:
pass
inverse = False
match = cls._not_pattern.match(buf)
if match:
inverse = True
buf = buf[match.end(0):]
try:
seq_set, buf = SequenceSet.parse(buf, params)
except NotParseable:
pass
else:
return cls(b'SEQSET', seq_set, inverse), buf
try:
params_copy = params.copy(expected=[SearchKey])
key_list_p, buf = List.parse(buf, params_copy)
except UnexpectedType:
raise
except NotParseable:
pass
else:
key_set = key_list_p.get_as(SearchKey)
return cls(b'KEYSET', key_set, inverse), buf
atom, after = Atom.parse(buf, params)
key = atom.value.upper()
if key in (b'ALL', b'ANSWERED', b'DELETED', b'FLAGGED', b'NEW', b'OLD',
b'RECENT', b'SEEN', b'UNANSWERED', b'UNDELETED',
b'UNFLAGGED', b'UNSEEN', b'DRAFT', b'UNDRAFT'):
return cls(key, inverse=inverse), after
elif key in (
b'BCC', b'BODY', b'CC', b'FROM', b'SUBJECT', b'TEXT', b'TO'):
_, buf = Space.parse(after, params)
filter_str, buf = cls._parse_astring_filter(buf, params)
return cls(key, filter_str, inverse), buf
elif key in (b'EMAILID', b'THREADID'):
_, buf = Space.parse(after, params)
filter_id, buf = ObjectId.parse(buf, params)
return cls(key, filter_id, inverse), buf
elif key in (b'BEFORE', b'ON', b'SINCE', b'SENTBEFORE', b'SENTON',
b'SENTSINCE'):
_, buf = Space.parse(after, params)
filter_date, buf = cls._parse_date_filter(buf, params)
return cls(key, filter_date, inverse), buf
elif key in (b'KEYWORD', b'UNKEYWORD'):
_, buf = Space.parse(after, params)
flag, after_buf = Flag.parse(buf, params)
if flag.is_system:
raise NotParseable(buf)
return cls(key, flag, inverse), after_buf
elif key in (b'LARGER', b'SMALLER'):
_, buf = Space.parse(after, params)
num, buf = Number.parse(buf, params)
return cls(key, num.value, inverse), buf
elif key == b'UID':
_, buf = Space.parse(after, params)
seq_set, buf = SequenceSet.parse(buf, params.copy(uid=True))
return cls(b'SEQSET', seq_set, inverse), buf
elif key == b'HEADER':
_, buf = Space.parse(after, params)
header_field, buf = cls._parse_astring_filter(buf, params)
_, buf = Space.parse(buf, params)
header_value, buf = cls._parse_astring_filter(buf, params)
return cls(key, (header_field, header_value), inverse), buf
elif key == b'OR':
_, buf = Space.parse(after, params)
or1, buf = SearchKey.parse(buf, params)
_, buf = Space.parse(buf, params)
or2, buf = SearchKey.parse(buf, params)
return cls(key, (or1, or2), inverse), buf
raise NotParseable(buf)