Source code for pymap.selected


from __future__ import annotations

from bisect import bisect_right
from collections.abc import Iterable, MutableSet, Sequence, Set
from itertools import chain, groupby, islice
from typing import Any
from weakref import WeakSet

from .flags import FlagOp, PermanentFlags, SessionFlags
from .interfaces.message import CachedMessage, FlagsKey
from .parsing.command import Command
from .parsing.primitives import List, Number
from .parsing.response import UntaggedResponse, ResponseBye
from .parsing.response.specials import ExistsResponse, RecentResponse, \
    ExpungeResponse, FetchResponse
from .parsing.specials import ObjectId, FetchAttribute, FetchValue, \
    Flag, SequenceSet

__all__ = ['SelectedSet', 'SynchronizedMessages', 'SelectedMailbox']

_flags_attr = FetchAttribute(b'FLAGS')
_uid_attr = FetchAttribute(b'UID')


[docs] class SelectedSet: """Maintains a weak set of :class:`SelectedMailbox` objects that exist for a mailbox, across all sessions. This is useful for assigning the ``\\Recent`` flag, as well as notifying other sessions about updates. """ __slots__ = ['_set'] def __init__(self) -> None: super().__init__() self._set: MutableSet[SelectedMailbox] = WeakSet()
[docs] def add(self, selected: SelectedMailbox, *, replace: SelectedMailbox | None = None) -> None: """Add a new selected mailbox object to the set, which may then be returned by :meth:`.any_selected`. Args: selected: The new selected mailbox object. replace: An existing selected mailbox object that should be removed from the weak set. """ if replace is not None: self._set.discard(replace) self._set.add(selected)
@property def any_selected(self) -> SelectedMailbox | None: """A single, random object in the set of selected mailbox objects. Selected mailbox object's marked :attr:`~SelectedMailbox.readonly` will not be chosen. """ for selected in self._set: if not selected.readonly: return selected return None
class _Frozen: def __init__(self, selected: SelectedMailbox) -> None: super().__init__() messages = selected.messages session_flags = selected.session_flags self.is_deleted = selected._is_deleted self.uids = messages._uids.copy() self.seqs_cache = messages._seqs_cache.copy() self.flags = messages._flags_key_set.copy() self.recent = session_flags.recent_uids & self.uids self.sflags = frozenset(session_flags.flags.items())
[docs] class SynchronizedMessages: """Manages the message data that has been synchronized with the client.""" def __init__(self) -> None: super().__init__() self._uids: set[int] = set() self._sorted: list[int] = [] self._seqs_cache: dict[int, int] = {} self._cache: dict[int, CachedMessage] = {} self._flags_key_map: dict[int, FlagsKey] = {} self._flags_key_set: set[FlagsKey] = set() self._pending_remove: set[int] = set() @property def exists(self) -> int: """The total number of messages in the mailbox.""" return len(self._uids) @property def max_uid(self) -> int: """The highest message UID value of the mailbox.""" try: return self._sorted[-1] except IndexError: return 0 def _update(self, messages: Iterable[CachedMessage]) -> None: lowest_idx: int | None = None for msg in messages: msg_uid = msg.uid if msg_uid not in self._uids: self._uids.add(msg_uid) idx = bisect_right(self._sorted, msg_uid) if lowest_idx is None or lowest_idx > idx: lowest_idx = idx self._sorted.insert(idx, msg_uid) self._cache[msg_uid] = msg new_flags_key = msg.flags_key old_flags_key = self._flags_key_map.get(msg_uid) if old_flags_key is not None: self._flags_key_set.discard(old_flags_key) self._flags_key_map[msg_uid] = new_flags_key self._flags_key_set.add(new_flags_key) if lowest_idx is not None: needs_reset = islice(self._sorted, lowest_idx, len(self._sorted)) for seq, uid in enumerate(needs_reset, lowest_idx + 1): self._seqs_cache[uid] = seq def _remove(self, uids: Iterable[int], pending: bool) -> None: if pending: self._pending_remove.update(uids) else: any_removed = False for msg_uid in chain(uids, self._pending_remove): try: self._uids.remove(msg_uid) except KeyError: pass else: flags_key = self._flags_key_map[msg_uid] self._flags_key_set.remove(flags_key) del self._flags_key_map[msg_uid] del self._cache[msg_uid] any_removed = True self._pending_remove.clear() if any_removed: self._sorted = sorted_uids = sorted(self._uids) self._seqs_cache = {uid: seq for seq, uid in enumerate(sorted_uids, 1)}
[docs] def get(self, uid: int) -> CachedMessage | None: """Return the given cached message. Args: uid: The message UID. """ return self._cache.get(uid)
[docs] def get_uids(self, seq_set: SequenceSet) -> Sequence[tuple[int, int]]: """Return the message sequence numbers and their UIDs for the given sequence set. Args: seq_set: The message sequence set. """ if seq_set.uid: all_uids = seq_set.flatten(self.max_uid) & self._uids return [(seq, uid) for seq, uid in enumerate(self._sorted, 1) if uid in all_uids] else: all_seqs = seq_set.flatten(self.exists) return [(seq, uid) for seq, uid in enumerate(self._sorted, 1) if seq in all_seqs]
[docs] def get_all(self, seq_set: SequenceSet) \ -> Sequence[tuple[int, CachedMessage]]: """Return the cached messages, and their sequence numbers, for the given sequence set. Args: seq_set: The message sequence set. """ if seq_set.uid: all_uids = seq_set.flatten(self.max_uid) & self._uids return [(seq, self._cache[uid]) for seq, uid in enumerate(self._sorted, 1) if uid in all_uids] else: all_seqs = seq_set.flatten(self.exists) return [(seq, self._cache[uid]) for seq, uid in enumerate(self._sorted, 1) if seq in all_seqs]
[docs] class SelectedMailbox: """Manages the updates to the current selected mailbox from other concurrent sessions. The current state of the selected mailbox will be written to this object by the backend implementation during each operation. Then, when the operation completes, call :meth:`.fork` to make a fresh copy of the object and any untagged responses that should be added to the response. Args: mailbox_id: The globally unique identifier of the selected mailbox. readonly: Indicates the mailbox is selected as read-only. permanent_flags (~pymap.flags.PermanentFlags): The defined permanent flags for the mailbox. session_flags: Session-only flags for the mailbox. selected_set: The ``self`` object and subsequent forked copies will be kept in in this set. """ __slots__ = ['_mailbox_id', '_readonly', '_permanent_flags', '_session_flags', '_selected_set', '_kwargs', '_lookup', '_mod_sequence', '_is_deleted', '_hide_expunged', '_silenced_flags', '_silenced_sflags', '_prev', '_messages', '__weakref__'] def __init__(self, mailbox_id: ObjectId, readonly: bool, permanent_flags: PermanentFlags, session_flags: SessionFlags, selected_set: SelectedSet | None = None, lookup: Any = None, **kwargs: Any) -> None: super().__init__() self._mailbox_id = mailbox_id self._readonly = readonly self._permanent_flags = permanent_flags self._session_flags = session_flags self._selected_set = selected_set self._kwargs = kwargs self._lookup: Any = lookup self._mod_sequence = kwargs.get('_mod_sequence') self._is_deleted = False self._hide_expunged = False self._silenced_flags: set[tuple[int, frozenset[Flag]]] = set() self._silenced_sflags: set[tuple[int, frozenset[Flag]]] = set() self._prev: _Frozen | None = kwargs.get('_prev') try: self._messages: SynchronizedMessages = kwargs['_messages'] except KeyError: self._messages = SynchronizedMessages() if selected_set is not None: selected_set.add(self) @property def mailbox_id(self) -> ObjectId: """The selected mailbox object ID. See Also: :attr:`~pymap.interfaces.mailbox.MailboxInterface.mailbox_id` """ return self._mailbox_id @property def lookup(self) -> Any: """The lookup value, if any, needed by backends that cannot lookup mailboxes by :attr:`.mailbox_id`. A typical lookup value might be the name of the mailbox. """ return self._lookup @lookup.setter def lookup(self, lookup: Any) -> None: self._lookup = lookup @property def mod_sequence(self) -> Any: """The highest modification sequence of the mailbox.""" return self._mod_sequence @mod_sequence.setter def mod_sequence(self, mod_sequence: Any) -> None: self._mod_sequence = mod_sequence @property def hide_expunged(self) -> bool: """If True, no untagged ``EXPUNGE`` responses will be generated, and message sequence numbers will not be adjusted, until the next :meth:`.fork`. """ return self._hide_expunged @hide_expunged.setter def hide_expunged(self, hide_expunged: bool) -> None: self._hide_expunged = hide_expunged
[docs] def add_updates(self, messages: Iterable[CachedMessage], expunged: Iterable[int]) -> None: """Update the messages in the selected mailboxes. The ``messages`` should include non-expunged messages in the mailbox that should be checked for updates. The ``expunged`` argument is the set of UIDs that have been expunged from the mailbox. In an optimized implementation, ``messages`` only includes new messages or messages with metadata updates. This minimizes the comparison needed to determine what untagged responses are necessary. The :attr:`.mod_sequence` attribute may be used to support this optimization. If a backend implementation lacks the ability to determine the subset of messages that have been updated, it should instead use :meth:`.set_messages`. Args: messages: The cached message objects to add. expunged: The set of message UIDs that have been expunged. """ self._messages._update(messages) self._messages._remove(expunged, self._hide_expunged) if not self._hide_expunged: self._session_flags.remove(expunged)
[docs] def set_messages(self, messages: Sequence[CachedMessage]) -> None: """This is the non-optimized alternative to :meth:`.add_updates` for backend implementations that cannot detect their own updates and must instead compare the entire state of the mailbox. The ``messages`` list should contain the entire set of messages in the mailbox, ordered by UID. Any UID that previously existed and is not included in ``messages`` will be expunged. Args: messages: The entire set of cached message objects. """ uids = {msg.uid for msg in messages} expunged = self._messages._uids - uids return self.add_updates(messages, expunged)
@property def readonly(self) -> bool: """Indicates the mailbox is selected as read-only.""" return self._readonly @property def messages(self) -> SynchronizedMessages: """The messages in the mailbox, as synchronized with the client.""" return self._messages @property def permanent_flags(self) -> PermanentFlags: """The defined permanent flags for the mailbox.""" return self._permanent_flags @property def session_flags(self) -> SessionFlags: """Session-only flags for the mailbox.""" return self._session_flags
[docs] def set_deleted(self) -> None: """Marks the selected mailbox as having been deleted.""" self._is_deleted = True
[docs] def silence(self, seq_set: SequenceSet, flag_set: Set[Flag], flag_op: FlagOp) -> None: """Runs the flags update against the cached flags, to prevent untagged FETCH responses unless other updates have occurred. For example, if a session adds ``\\Deleted`` and calls this method, the FETCH response will be silenced. But if another added ``\\Seen`` at the same time, the FETCH response will be sent. Args: seq_set: The sequence set of messages. flag_set: The set of flags for the update operation. flag_op: The mode to change the flags. """ session_flags = self.session_flags permanent_flag_set = self.permanent_flags & flag_set session_flag_set = session_flags & flag_set for _, msg in self._messages.get_all(seq_set): msg_flags = msg.permanent_flags msg_sflags = session_flags.get(msg.uid) updated_flags = flag_op.apply(msg_flags, permanent_flag_set) updated_sflags = flag_op.apply(msg_sflags, session_flag_set) if msg_flags != updated_flags: self._silenced_flags.add((msg.uid, updated_flags)) if msg_sflags != updated_sflags: self._silenced_sflags.add((msg.uid, updated_sflags))
[docs] def fork(self, command: Command) \ -> tuple[SelectedMailbox, Iterable[UntaggedResponse]]: """Compares the state of the current object to that of the last fork, returning the untagged responses that reflect any changes. A new copy of the object is also returned, ready for the next command. Args: command: The command that was finished. """ frozen = _Frozen(self) cls = type(self) copy = cls(self._mailbox_id, self._readonly, self._permanent_flags, self._session_flags, self._selected_set, self._lookup, _mod_sequence=self._mod_sequence, _prev=frozen, _messages=self._messages) if self._prev is not None: with_uid: bool = getattr(command, 'uid', False) untagged = self._compare(self._prev, frozen, with_uid) else: untagged = [] return copy, untagged
def _compare(self, before: _Frozen, after: _Frozen, with_uid: bool) -> Iterable[UntaggedResponse]: if after.is_deleted: yield ResponseBye(b'Selected mailbox no longer exists.') return cache = self._messages._cache session_flags = self._session_flags expunged_uids = before.uids - after.uids new_uids = after.uids - before.uids if not self._hide_expunged and expunged_uids: for uid in sorted(expunged_uids, reverse=True): yield ExpungeResponse(before.seqs_cache[uid]) if new_uids: yield ExistsResponse(len(after.uids)) if len(after.recent) != len(before.recent): yield RecentResponse(len(after.recent)) new_recent = (after.recent - before.recent) new_flags = (after.flags - before.flags - self._silenced_flags) new_sflags = (after.sflags - before.sflags - self._silenced_sflags) fetch_uids = chain(new_recent, (uid for uid, _ in new_flags), (uid for uid, _ in new_sflags)) for uid, _ in groupby(sorted(fetch_uids)): seq = after.seqs_cache[uid] msg_flags = cache[uid].get_flags(session_flags) fetch_data: list[FetchValue] = [ FetchValue.of(_flags_attr, List(msg_flags, sort=True))] if with_uid: fetch_data.append(FetchValue.of(_uid_attr, Number(uid))) yield FetchResponse(seq, fetch_data)