Source code for pymap.search

"""Utilities for managing IMAP searches."""

from __future__ import annotations

import re
from abc import abstractmethod, ABCMeta
from collections.abc import Iterable
from datetime import datetime
from typing import AnyStr, Final

from .exceptions import SearchNotAllowed
from .interfaces.message import MessageInterface, LoadedMessageInterface
from .parsing.specials import ObjectId, SearchKey, SequenceSet
from .parsing.specials.flag import Flag, Answered, Deleted, Draft, Flagged, \
    Recent, Seen
from .selected import SelectedMailbox

__all__ = ['SearchParams', 'SearchCriteria', 'SearchCriteriaSet']


[docs] class SearchParams: """Defines certain parameters and routines necessary to process any kind of search criteria. If a parameter is not supplied, or a method not implemented, any search keys that require it will fail. Args: selected: The active mailbox session. max_seq: The highest message sequence ID in the mailbox. max_uid: The highest message UID in the mailbox. disabled: Search keys that should be disabled. """ __slots__ = ['selected', 'disabled', 'max_seq', 'max_uid', 'session_flags'] def __init__(self, selected: SelectedMailbox, *, disabled: Iterable[bytes] | None = None) -> None: self.selected: Final = selected self.disabled: Final = frozenset(disabled or []) self.max_seq: Final = selected.messages.exists self.max_uid: Final = selected.messages.max_uid self.session_flags: Final = selected.session_flags
[docs] class SearchCriteria(metaclass=ABCMeta): """Base class for different types of search criteria. Args: params: The parameters that may be used by some searches. """ def __init__(self, params: SearchParams) -> None: self.params = params @classmethod def _in(cls, substr: AnyStr, data: AnyStr) -> bool: re_flags = re.I | re.A escaped_substr = re.escape(substr) return re.search(escaped_substr, data, re_flags) is not None
[docs] @abstractmethod def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: """Implemented by sub-classes to define the search criteria. Args: msg_seq: The message sequence ID. msg: The message object. loaded_msg: The message content. """ ...
[docs] @classmethod def of(cls, key: SearchKey, params: SearchParams) -> SearchCriteria: """Factory method for producing a search criteria sub-class from a search key. Args: key: The search key defining the criteria. params: The parameters that may be used by some searches. """ key_name = key.value if key_name in params.disabled: raise SearchNotAllowed(key) elif key.inverse: return InverseSearchCriteria(key.not_inverse, params) elif key_name == b'SEQSET': return SequenceSetSearchCriteria(key.filter_sequence_set, params) elif key_name == b'KEYSET': return SearchCriteriaSet(key.filter_key_set, params) elif key_name == b'ALL': return AllSearchCriteria(params) elif key_name == b'OR': left_key, right_key = key.filter_key_or return OrSearchCriteria(left_key, right_key, params) elif key_name == b'EMAILID': return HasEmailIdSearchCriteria(key.filter_object_id, params) elif key_name == b'THREADID': return HasThreadIdSearchCriteria(key.filter_object_id, params) elif key_name == b'ANSWERED': return HasFlagSearchCriteria(Answered, True, params) elif key_name == b'UNANSWERED': return HasFlagSearchCriteria(Answered, False, params) elif key_name == b'DELETED': return HasFlagSearchCriteria(Deleted, True, params) elif key_name == b'UNDELETED': return HasFlagSearchCriteria(Deleted, False, params) elif key_name == b'DRAFT': return HasFlagSearchCriteria(Draft, True, params) elif key_name == b'UNDRAFT': return HasFlagSearchCriteria(Draft, False, params) elif key_name == b'FLAGGED': return HasFlagSearchCriteria(Flagged, True, params) elif key_name == b'UNFLAGGED': return HasFlagSearchCriteria(Flagged, False, params) elif key_name == b'RECENT': return HasFlagSearchCriteria(Recent, True, params) elif key_name == b'OLD': return HasFlagSearchCriteria(Recent, False, params) elif key_name == b'SEEN': return HasFlagSearchCriteria(Seen, True, params) elif key_name == b'UNSEEN': return HasFlagSearchCriteria(Seen, False, params) elif key_name == b'KEYWORD': return HasFlagSearchCriteria(key.filter_flag, True, params) elif key_name == b'UNKEYWORD': return HasFlagSearchCriteria(key.filter_flag, False, params) elif key_name == b'NEW': return NewSearchCriteria(params) elif key_name == b'BEFORE': return DateSearchCriteria(key.filter_datetime, '<', params) elif key_name == b'ON': return DateSearchCriteria(key.filter_datetime, '=', params) elif key_name == b'SINCE': return DateSearchCriteria(key.filter_datetime, '>=', params) elif key_name == b'SENTBEFORE': return HeaderDateSearchCriteria(key.filter_datetime, '<', params) elif key_name == b'SENTON': return HeaderDateSearchCriteria(key.filter_datetime, '=', params) elif key_name == b'SENTSINCE': return HeaderDateSearchCriteria(key.filter_datetime, '>=', params) elif key_name == b'SMALLER': return SizeSearchCriteria(key.filter_int, '<', params) elif key_name == b'LARGER': return SizeSearchCriteria(key.filter_int, '>', params) elif key_name in (b'BCC', b'CC', b'FROM', b'SUBJECT', b'TO'): return EnvelopeSearchCriteria(key_name, key.filter_str, params) elif key_name == b'HEADER': name, value = key.filter_header return HeaderSearchCriteria(name, value, params) elif key_name in (b'BODY', b'TEXT'): return BodySearchCriteria(key.filter_str, params) raise SearchNotAllowed(key)
[docs] class SearchCriteriaSet(SearchCriteria): """Search criteria composed of a set of search criteria that must all match. If the set is empty, nothing will match. Args: keys: The set of search keys that must match. params: The parameters that may be used by some searches. """ def __init__(self, keys: Iterable[SearchKey], params: SearchParams) -> None: super().__init__(params) self.all_criteria = [SearchCriteria.of(key, params) for key in keys] @property def sequence_set(self) -> SequenceSet: """The sequence set to use when finding the messages to match against. This will default to all messages unless the search criteria set contains a sequence set. """ try: seqset_crit = next(crit for crit in self.all_criteria if isinstance(crit, SequenceSetSearchCriteria)) except StopIteration: return SequenceSet.all() else: return seqset_crit.seq_set
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: """The message matches if all the defined search key criteria match. Args: msg_seq: The message sequence ID. msg: The message object. """ return all(crit.matches(msg_seq, msg, loaded_msg) for crit in self.all_criteria)
[docs] class InverseSearchCriteria(SearchCriteria): """Matches only if the given search criteria does not match.""" def __init__(self, key: SearchKey, params: SearchParams) -> None: super().__init__(params) self.key = SearchCriteria.of(key, params)
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: return not self.key.matches(msg_seq, msg, loaded_msg)
[docs] class AllSearchCriteria(SearchCriteria): """Always matches anything."""
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: return True
[docs] class OrSearchCriteria(SearchCriteria): """Matches if either of the search criteria match.""" def __init__(self, left: SearchKey, right: SearchKey, params: SearchParams) -> None: super().__init__(params) self.left = SearchCriteria.of(left, self.params) self.right = SearchCriteria.of(right, self.params)
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: return (self.left.matches(msg_seq, msg, loaded_msg) or self.right.matches(msg_seq, msg, loaded_msg))
[docs] class SequenceSetSearchCriteria(SearchCriteria): """Matches if the message is contained in the sequence set.""" def __init__(self, seq_set: SequenceSet, params: SearchParams) -> None: super().__init__(params) self.seq_set = seq_set if seq_set.uid: self.flat = seq_set.flatten(params.max_uid) else: self.flat = seq_set.flatten(params.max_seq)
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: if self.seq_set.uid: return msg.uid in self.flat else: return msg_seq in self.flat
[docs] class HasEmailIdSearchCriteria(SearchCriteria): """Matches if the message has the same :attr:`~pymap.interfaces.message.MessageInterface.email_id` value. """ def __init__(self, email_id: ObjectId, params: SearchParams) -> None: super().__init__(params) self.email_id = email_id
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: try: email_id = msg.email_id except ValueError: return False else: return self.email_id == email_id
[docs] class HasThreadIdSearchCriteria(SearchCriteria): """Matches if the message has the same :attr:`~pymap.interfaces.message.MessageInterface.thread_id` value. """ def __init__(self, thread_id: ObjectId, params: SearchParams) -> None: super().__init__(params) self.thread_id = thread_id
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: return self.thread_id == msg.thread_id
[docs] class HasFlagSearchCriteria(SearchCriteria): """Matches if the message has the given flag in their permanent or session flag sets. """ def __init__(self, flag: Flag, expected: bool, params: SearchParams) -> None: super().__init__(params) self.flag = flag self.expected = expected
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: has_flag = self.flag in msg.get_flags(self.params.session_flags) expected = self.expected return (has_flag and expected) or (not expected and not has_flag)
[docs] class NewSearchCriteria(SearchCriteria): """Matches if the message is considered "new", i.e. recent and unseen."""
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: flags = msg.get_flags(self.params.session_flags) return Recent in flags and Seen not in flags
[docs] class DateSearchCriteria(SearchCriteria): """Matches by comparing against the internal date of the message.""" def __init__(self, when: datetime, op: str, params: SearchParams) -> None: super().__init__(params) self.when = when.date() self.op = op @classmethod def _get_msg_date(cls, msg: MessageInterface, loaded_msg: LoadedMessageInterface) \ -> datetime | None: return msg.internal_date
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: msg_datetime = self._get_msg_date(msg, loaded_msg) if msg_datetime is None: return False msg_date = msg_datetime.date() if self.op == '<': # BEFORE return msg_date < self.when elif self.op == '=': # ON return msg_date == self.when elif self.op == '>=': # SINCE return msg_date >= self.when raise ValueError(self.op)
[docs] class HeaderDateSearchCriteria(DateSearchCriteria): """Matches by comparing against the ``Date:`` header of the message.""" @classmethod def _get_msg_date(cls, msg: MessageInterface, loaded_msg: LoadedMessageInterface) \ -> datetime | None: envelope = loaded_msg.get_envelope_structure() return envelope.date.datetime if envelope.date else None
[docs] class SizeSearchCriteria(SearchCriteria): """Matches by comparing against the size of the message.""" def __init__(self, size: int, op: str, params: SearchParams) -> None: super().__init__(params) self.size = size self.op = op
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: size = loaded_msg.get_size() if self.op == '<': return size < self.size elif self.op == '>': return size > self.size raise ValueError(self.op)
[docs] class EnvelopeSearchCriteria(SearchCriteria): """Matches by checking for strings within various fields of the envelope structure. """ def __init__(self, key: bytes, value: str, params: SearchParams) -> None: super().__init__(params) self.key = key self.value = value
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: envelope = loaded_msg.get_envelope_structure() if self.key == b'BCC': if not envelope.bcc: return False return any(self._in(self.value, str(bcc)) for bcc in envelope.bcc) elif self.key == b'CC': if not envelope.cc: return False return any(self._in(self.value, str(cc)) for cc in envelope.cc) elif self.key == b'FROM': if not envelope.from_: return False return any(self._in(self.value, str(from_)) for from_ in envelope.from_) elif self.key == b'SUBJECT': if not envelope.subject: return False return self._in(self.value, str(envelope.subject)) elif self.key == b'TO': if not envelope.to: return False return any(self._in(self.value, str(to)) for to in envelope.to) raise ValueError(self.key)
[docs] class HeaderSearchCriteria(SearchCriteria): """Matches if the message has a header containing a value.""" def __init__(self, name: str, value: str, params: SearchParams) -> None: super().__init__(params) self.name = name.encode('ascii') self.value = value
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: values = loaded_msg.get_header(self.name) return any(self._in(self.value, value) for value in values)
[docs] class BodySearchCriteria(SearchCriteria): """Matches if the message body contains a value.""" def __init__(self, value: str, params: SearchParams) -> None: super().__init__(params) self.value = bytes(value, 'utf-8', 'replace')
[docs] def matches(self, msg_seq: int, msg: MessageInterface, loaded_msg: LoadedMessageInterface) -> bool: return loaded_msg.contains(self.value)