from __future__ import annotations
from abc import abstractmethod
from asyncio import shield
from collections.abc import Iterable, Sequence
from typing import Generic, Any
from pymap.concurrent import Event
from pymap.config import IMAPConfig
from pymap.exceptions import MailboxNotFound, MailboxConflict, MailboxReadOnly
from pymap.flags import FlagOp, SessionFlags, PermanentFlags
from pymap.interfaces.filter import FilterSetInterface
from pymap.interfaces.message import MessageT
from pymap.interfaces.session import SessionInterface
from pymap.mailbox import MailboxSnapshot
from pymap.parsing.message import AppendMessage
from pymap.parsing.specials import SequenceSet, SearchKey, ObjectId, \
FetchRequirement
from pymap.parsing.specials.flag import Flag, Seen
from pymap.parsing.response.code import AppendUid, CopyUid
from pymap.search import SearchParams, SearchCriteriaSet
from pymap.selected import SelectedMailbox
from .mailbox import MailboxDataInterface, MailboxSetInterface
__all__ = ['BaseSession']
[docs]
class BaseSession(SessionInterface, Generic[MessageT]):
"""Base implementation of
:class:`~pymap.interfaces.session.SessionInterface` intended for use by
most backends.
Args:
owner: The logged-in user name.
"""
def __init__(self, owner: str) -> None:
super().__init__()
self._owner = owner
@property
def owner(self) -> str:
return self._owner
@property
@abstractmethod
def config(self) -> IMAPConfig:
...
@property
@abstractmethod
def mailbox_set(self) \
-> MailboxSetInterface[MailboxDataInterface[MessageT]]:
...
@property
def filter_set(self) -> FilterSetInterface[Any] | None:
return None
[docs]
def close(self) -> None:
pass
[docs]
async def cleanup(self) -> None:
pass
async def _load_updates(self, selected: SelectedMailbox | None,
mbx: MailboxDataInterface[MessageT] | None) \
-> SelectedMailbox | None:
if selected:
if not mbx or selected.mailbox_id != mbx.mailbox_id:
try:
mbx = await self._get_selected(selected)
except MailboxNotFound:
selected.set_deleted()
return selected
return await mbx.update_selected(selected)
return selected
@classmethod
def _pick_selected(cls, selected: SelectedMailbox | None,
mbx: MailboxDataInterface[MessageT]) \
-> SelectedMailbox | None:
if selected and selected.mailbox_id == mbx.mailbox_id:
return selected
return mbx.selected_set.any_selected
async def _get_mailbox(self, name: str, *, try_create: bool = False) \
-> MailboxDataInterface[MessageT]:
try:
return await self.mailbox_set.get_mailbox(name)
except KeyError as exc:
raise MailboxNotFound(name, try_create=try_create) from exc
async def _get_selected(self, selected: SelectedMailbox) \
-> MailboxDataInterface[MessageT]:
return await self._get_mailbox(selected.lookup)
[docs]
async def list_mailboxes(self, ref_name: str, filter_: str,
subscribed: bool = False,
selected: SelectedMailbox | None = None) \
-> tuple[Iterable[tuple[str, str | None, Sequence[bytes]]],
SelectedMailbox | None]:
delimiter = self.mailbox_set.delimiter
if filter_:
if subscribed:
list_tree = await self.mailbox_set.list_subscribed()
else:
list_tree = await self.mailbox_set.list_mailboxes()
ret = [(entry.name, delimiter, entry.attributes)
for entry in list_tree.list_matching(ref_name, filter_)]
else:
ret = [("", delimiter, [b'Noselect'])]
return ret, await self._load_updates(selected, None)
[docs]
async def get_mailbox(self, name: str,
selected: SelectedMailbox | None = None) \
-> tuple[MailboxSnapshot, SelectedMailbox | None]:
try:
mbx = await self.mailbox_set.get_mailbox(name)
except KeyError as exc:
raise MailboxNotFound(name) from exc
snapshot = await mbx.snapshot()
return snapshot, await self._load_updates(selected, mbx)
[docs]
async def create_mailbox(self, name: str,
selected: SelectedMailbox | None = None) \
-> tuple[ObjectId, SelectedMailbox | None]:
try:
mailbox_id = await self.mailbox_set.add_mailbox(name)
except ValueError as exc:
raise MailboxConflict(name) from exc
return mailbox_id, await self._load_updates(selected, None)
[docs]
async def delete_mailbox(self, name: str,
selected: SelectedMailbox | None = None) \
-> SelectedMailbox | None:
try:
await self.mailbox_set.delete_mailbox(name)
except KeyError as exc:
raise MailboxNotFound(name) from exc
return await self._load_updates(selected, None)
[docs]
async def rename_mailbox(self, before_name: str, after_name: str,
selected: SelectedMailbox | None = None) \
-> SelectedMailbox | None:
try:
await self.mailbox_set.rename_mailbox(before_name, after_name)
except KeyError as exc:
raise MailboxNotFound(before_name) from exc
except ValueError as exc:
raise MailboxConflict(after_name) from exc
return await self._load_updates(selected, None)
[docs]
async def subscribe(self, name: str,
selected: SelectedMailbox | None = None) \
-> SelectedMailbox | None:
await self.mailbox_set.set_subscribed(name, True)
return await self._load_updates(selected, None)
[docs]
async def unsubscribe(self, name: str,
selected: SelectedMailbox | None = None) \
-> SelectedMailbox | None:
await self.mailbox_set.set_subscribed(name, False)
return await self._load_updates(selected, None)
[docs]
async def append_messages(self, name: str,
messages: Sequence[AppendMessage],
selected: SelectedMailbox | None = None) \
-> tuple[AppendUid, SelectedMailbox | None]:
mbx = await self._get_mailbox(name, try_create=True)
if mbx.readonly:
raise MailboxReadOnly(name)
dest_selected = self._pick_selected(selected, mbx)
uids: list[int] = []
for append_msg in messages:
msg = await mbx.append(append_msg, recent=not dest_selected)
if dest_selected:
dest_selected.session_flags.add_recent(msg.uid)
uids.append(msg.uid)
return (AppendUid(mbx.uid_validity, uids),
await self._load_updates(selected, mbx))
[docs]
async def select_mailbox(self, name: str, readonly: bool = False) \
-> tuple[MailboxSnapshot, SelectedMailbox]:
mbx = await self._get_mailbox(name)
selected = SelectedMailbox(mbx.mailbox_id, readonly or mbx.readonly,
PermanentFlags(mbx.permanent_flags),
SessionFlags(mbx.session_flags),
selected_set=mbx.selected_set,
lookup=name)
if not selected.readonly:
await mbx.claim_recent(selected)
snapshot = await mbx.snapshot()
return snapshot, await mbx.update_selected(selected)
[docs]
async def check_mailbox(self, selected: SelectedMailbox, *,
wait_on: Event | None = None,
housekeeping: bool = False) -> SelectedMailbox:
mbx = await self._get_selected(selected)
if housekeeping:
await shield(mbx.cleanup())
return await mbx.update_selected(selected, wait_on=wait_on)
[docs]
async def fetch_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet, set_seen: bool) \
-> tuple[Iterable[tuple[int, MessageT]],
SelectedMailbox]:
mbx = await self._get_selected(selected)
ret: list[tuple[int, MessageT]] = []
for seq, cached_msg in selected.messages.get_all(sequence_set):
if set_seen:
msg = await mbx.update(cached_msg.uid, cached_msg,
frozenset({Seen}), FlagOp.ADD)
else:
msg = await mbx.get(cached_msg.uid, cached_msg)
if msg is not None:
ret.append((seq, msg))
return ret, await mbx.update_selected(selected)
[docs]
async def search_mailbox(self, selected: SelectedMailbox,
keys: frozenset[SearchKey]) \
-> tuple[Iterable[tuple[int, MessageT]],
SelectedMailbox]:
mbx = await self._get_selected(selected)
req = FetchRequirement.reduce(key.requirement for key in keys)
ret: list[tuple[int, MessageT]] = []
params = SearchParams(selected,
disabled=self.config.disable_search_keys)
search = SearchCriteriaSet(keys, params)
async for seq, msg in mbx.find(search.sequence_set, selected):
msg_content = await msg.load_content(req)
if search.matches(seq, msg, msg_content):
ret.append((seq, msg))
return ret, await mbx.update_selected(selected)
[docs]
async def expunge_mailbox(self, selected: SelectedMailbox,
uid_set: SequenceSet | None = None) \
-> SelectedMailbox:
if selected.readonly:
raise MailboxReadOnly()
mbx = await self._get_selected(selected)
if uid_set is None:
uid_set = SequenceSet.all(uid=True)
expunge_uids = await mbx.find_deleted(uid_set, selected)
await mbx.delete(expunge_uids)
return await mbx.update_selected(selected)
[docs]
async def copy_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
mailbox: str) \
-> tuple[CopyUid | None, SelectedMailbox]:
mbx = await self._get_selected(selected)
dest = await self._get_mailbox(mailbox, try_create=True)
if dest.readonly:
raise MailboxReadOnly(mailbox)
dest_selected = self._pick_selected(selected, dest)
uids: list[tuple[int, int]] = []
for _, source_uid in selected.messages.get_uids(sequence_set):
dest_uid = await mbx.copy(source_uid, dest,
recent=not dest_selected)
if dest_uid is not None:
if dest_selected:
dest_selected.session_flags.add_recent(dest_uid)
uids.append((source_uid, dest_uid))
if not uids:
copy_uid: CopyUid | None = None
else:
copy_uid = CopyUid(dest.uid_validity, uids)
return (copy_uid, await mbx.update_selected(selected))
[docs]
async def move_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
mailbox: str) \
-> tuple[CopyUid | None, SelectedMailbox]:
mbx = await self._get_selected(selected)
dest = await self._get_mailbox(mailbox, try_create=True)
if dest.readonly:
raise MailboxReadOnly(mailbox)
dest_selected = self._pick_selected(selected, dest)
uids: list[tuple[int, int]] = []
for _, source_uid in selected.messages.get_uids(sequence_set):
dest_uid = await mbx.move(source_uid, dest,
recent=not dest_selected)
if dest_uid is not None:
if dest_selected:
dest_selected.session_flags.add_recent(dest_uid)
uids.append((source_uid, dest_uid))
if not uids:
copy_uid: CopyUid | None = None
else:
copy_uid = CopyUid(dest.uid_validity, uids)
return (copy_uid, await mbx.update_selected(selected))
[docs]
async def update_flags(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
flag_set: frozenset[Flag],
mode: FlagOp = FlagOp.REPLACE) \
-> tuple[Iterable[tuple[int, MessageT]],
SelectedMailbox]:
if selected.readonly:
raise MailboxReadOnly()
mbx = await self._get_selected(selected)
permanent_flags = selected.permanent_flags & flag_set
messages: list[tuple[int, MessageT]] = []
for seq, cached_msg in selected.messages.get_all(sequence_set):
uid = cached_msg.uid
msg = await mbx.update(uid, cached_msg, permanent_flags, mode)
if not msg.expunged:
selected.session_flags.update(uid, flag_set, mode)
messages.append((seq, msg))
return messages, await mbx.update_selected(selected)