from __future__ import annotations
from abc import abstractmethod, ABCMeta
from collections.abc import Iterator, Mapping, Sequence, AsyncIterator
from contextlib import contextmanager, asynccontextmanager
from typing import ClassVar, Final, Protocol, Any
from .bytes import BytesFormat, MaybeBytes, Writeable
from .interfaces.message import MessageInterface, LoadedMessageInterface
from .parsing.primitives import Nil, Number, List, LiteralString
from .parsing.specials import DateTime
from .parsing.specials.fetchattr import FetchPartial, FetchRequirement, \
FetchAttribute, FetchValue
from .selected import SelectedMailbox
# Names exported as the public API of this module.
__all__ = ['LoadedMessageProvider', 'DynamicFetchValue',
           'DynamicLoadedFetchValue', 'MessageAttributes']
[docs]
class LoadedMessageProvider(Protocol):
    """Structural interface for objects that can hand out the loaded contents
    of a message whenever those contents are currently available.

    """

    @property
    @abstractmethod
    def loaded_msg(self) -> LoadedMessageInterface | None:
        """The loaded message, or ``None`` when it is not available."""
        ...
[docs]
class DynamicFetchValue(FetchValue, metaclass=ABCMeta):
    """Abstract fetch value whose content is computed on demand from the
    message object and the selected mailbox.

    Args:
        attribute: The fetch attribute.
        message: The message object.
        selected: The selected mailbox.

    """

    __slots__ = ['message', 'selected']

    def __init__(self, attribute: FetchAttribute, *,
                 message: MessageInterface,
                 selected: SelectedMailbox) -> None:
        super().__init__(attribute)
        self.message: Final = message
        self.selected: Final = selected

    @abstractmethod
    def get_value(self) -> MaybeBytes:
        """Computes the value of the fetch attribute for the message."""
        ...

    def __bytes__(self) -> bytes:
        # The response name and the computed value are passed, in that
        # order, through the two-field ``%b %b`` template.
        response_name = self.attribute.for_response
        computed = self.get_value()
        return BytesFormat(b'%b %b') % (response_name, computed)
[docs]
class DynamicLoadedFetchValue(FetchValue, metaclass=ABCMeta):
    """Base class for fetch values that are dynamically read from a message and
    require loading its contents.

    Args:
        attribute: The fetch attribute.
        message: The message object.
        get_loaded: The provider of the loaded message contents, if available.

    """

    __slots__ = ['message', '_get_loaded']

    def __init__(self, attribute: FetchAttribute, *,
                 message: MessageInterface,
                 get_loaded: LoadedMessageProvider) -> None:
        super().__init__(attribute)
        self.message: Final = message
        self._get_loaded = get_loaded

    @abstractmethod
    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        """Computes the value of the fetch attribute for the message.

        Args:
            loaded_msg: The loaded message object.

        """
        ...

    def __bytes__(self) -> bytes:
        loaded_msg = self._get_loaded.loaded_msg
        if loaded_msg is None:
            # Nothing is loaded at the moment, so the placeholder is written
            # in place of the real value.
            value: MaybeBytes = MessageAttributes.placeholder
        else:
            value = self.get_value(loaded_msg)
        return BytesFormat(b'%b %b') % (
            self.attribute.for_response, value)

    @classmethod
    def _get_data(cls, section: FetchAttribute.Section | None,
                  partial: FetchPartial | None,
                  loaded_msg: LoadedMessageInterface, *,
                  binary: bool = False) -> Writeable:
        """Reads the data selected by a section from the loaded message, then
        narrows it to the partial range, if one was given.

        Args:
            section: The section of the fetch attribute.
            partial: The partial range to apply to the data, if any.
            loaded_msg: The loaded message object.
            binary: Passed through to
                :meth:`~LoadedMessageInterface.get_body` when the section has
                no specifier.

        Raises:
            RuntimeError: The section is ``None``.
            ValueError: The section specifier is not recognized, or it is
                ``MIME`` without any parts.

        """
        if section is None:
            # Parsing should not allow this
            raise RuntimeError('fetch attribute is missing its section')
        specifier = section.specifier
        parts = section.parts
        headers = section.headers
        if specifier is None:
            data = loaded_msg.get_body(parts, binary)
        elif specifier == b'MIME' and parts is not None:
            data = loaded_msg.get_headers(parts)
        elif specifier == b'TEXT':
            data = loaded_msg.get_message_text(parts)
        elif specifier == b'HEADER':
            data = loaded_msg.get_message_headers(parts)
        elif specifier == b'HEADER.FIELDS':
            data = loaded_msg.get_message_headers(parts, headers)
        elif specifier == b'HEADER.FIELDS.NOT':
            data = loaded_msg.get_message_headers(parts, headers, True)
        else:
            raise ValueError(specifier)  # Should not happen.
        return cls._get_partial(data, partial)

    @classmethod
    def _get_partial(cls, data: Writeable,
                     partial: FetchPartial | None) -> Writeable:
        """Narrows the data to a partial range.

        Args:
            data: The full data.
            partial: The range to extract. When ``None``, *data* is returned
                unchanged. When its length is ``None``, the range runs to the
                end of the data.

        """
        if partial is None:
            return data
        full = bytes(data)
        start, length = partial.start, partial.length
        # A slice end of ``None`` runs through to the end of the bytes.
        end = None if length is None else start + length
        return Writeable.wrap(full[start:end])
class _UidFetchValue(DynamicFetchValue):
    """Fetch value that writes the message UID as a number."""

    def get_value(self) -> MaybeBytes:
        uid = self.message.uid
        return Number(uid)
class _FlagsFetchValue(DynamicFetchValue):
    """Fetch value that writes the message flags as a sorted list."""

    def get_value(self) -> MaybeBytes:
        # The flags are looked up using the selected mailbox's session flags.
        flags = self.message.get_flags(self.selected.session_flags)
        return List(flags, sort=True)
class _InternalDateFetchValue(DynamicFetchValue):
    """Fetch value that writes the internal date of the message."""

    def get_value(self) -> MaybeBytes:
        when = self.message.internal_date
        return DateTime(when)
class _EmailIdFetchValue(DynamicFetchValue):
    """Fetch value that writes the ``parens`` form of the message email ID."""

    def get_value(self) -> MaybeBytes:
        email_id = self.message.email_id
        return email_id.parens
class _ThreadIdFetchValue(DynamicFetchValue):
    """Fetch value that writes the ``parens`` form of the message thread ID,
    or ``NIL`` when looking it up raises :exc:`ValueError`.

    """

    def get_value(self) -> MaybeBytes:
        try:
            in_parens = self.message.thread_id.parens
        except ValueError:
            return Nil()
        else:
            return in_parens
class _LoadedMessageProvider(LoadedMessageProvider):
    """Mutable holder for the loaded message, which is only populated while
    the :meth:`.apply` context manager is active.

    """

    __slots__ = ['loaded_msg']

    def __init__(self) -> None:
        super().__init__()
        # Starts out empty until a loaded message is applied.
        self.loaded_msg: LoadedMessageInterface | None = None

    @contextmanager
    def apply(self, loaded_msg: LoadedMessageInterface) -> Iterator[None]:
        """Exposes the loaded message for the duration of the ``with`` block.

        Args:
            loaded_msg: The loaded message object.

        """
        self.loaded_msg = loaded_msg
        try:
            yield
        finally:
            # Always cleared on exit, even if the block raised.
            self.loaded_msg = None
class _EnvelopeFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes the envelope structure of the message."""

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        envelope = loaded_msg.get_envelope_structure()
        return envelope
class _BodyStructureFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes the ``extended`` form of the body structure."""

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        structure = loaded_msg.get_body_structure()
        return structure.extended
class _BodyFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes either the body structure, when the attribute
    has no section, or the section data as a literal string.

    """

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        attr = self.attribute
        if attr.section is not None:
            section_data = self._get_data(
                attr.section, attr.partial, loaded_msg)
            return LiteralString(section_data)
        # Without a section, the attribute resolves to the body structure.
        return loaded_msg.get_body_structure()
class _BodyPeekFetchValue(_BodyFetchValue):
    """Distinct type for ``BODY.PEEK`` that adds nothing to
    :class:`_BodyFetchValue`.

    """
class _RFC822FetchValue(_BodyFetchValue):
    """Distinct type for ``RFC822`` that adds nothing to
    :class:`_BodyFetchValue`.

    """
class _RFC822HeaderFetchValue(_BodyFetchValue):
    """Distinct type for ``RFC822.HEADER`` that adds nothing to
    :class:`_BodyFetchValue`.

    """
class _RFC822TextFetchValue(_BodyFetchValue):
    """Distinct type for ``RFC822.TEXT`` that adds nothing to
    :class:`_BodyFetchValue`.

    """
class _RFC822SizeFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes the size of the loaded message as a number."""

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        size = loaded_msg.get_size()
        return Number(size)
class _BinaryFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes section data, read with ``binary=True``, as a
    literal string.

    """

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        attr = self.attribute
        section_data = self._get_data(
            attr.section, attr.partial, loaded_msg, binary=True)
        return LiteralString(section_data, True)
class _BinaryPeekFetchValue(_BinaryFetchValue):
    """Distinct type for ``BINARY.PEEK`` that adds nothing to
    :class:`_BinaryFetchValue`.

    """
class _BinarySizeFetchValue(DynamicLoadedFetchValue):
    """Fetch value that writes the length of the section data, read with
    ``binary=True``, as a number.

    """

    def get_value(self, loaded_msg: LoadedMessageInterface) -> MaybeBytes:
        attr = self.attribute
        section_data = self._get_data(
            attr.section, attr.partial, loaded_msg, binary=True)
        size = len(section_data)
        return Number(size)
[docs]
class MessageAttributes(Sequence[FetchValue]):
    """Resolves a sequence of fetch attributes against a single message,
    producing one fetch value per attribute.

    Args:
        message: The message object.
        selected: The selected mailbox.
        attributes: The fetch attributes to resolve.

    """

    #: Placeholder value for fetch values requiring loaded message contents.
    placeholder: ClassVar[bytes] = b'...'

    # Attribute names resolved from the message and the selected mailbox.
    _simple_attrs: Mapping[bytes, type[DynamicFetchValue]] = {
        b'UID': _UidFetchValue,
        b'FLAGS': _FlagsFetchValue,
        b'INTERNALDATE': _InternalDateFetchValue,
        b'EMAILID': _EmailIdFetchValue,
        b'THREADID': _ThreadIdFetchValue}

    # Attribute names resolved from the loaded message contents.
    _loaded_attrs: Mapping[bytes, type[DynamicLoadedFetchValue]] = {
        b'ENVELOPE': _EnvelopeFetchValue,
        b'BODYSTRUCTURE': _BodyStructureFetchValue,
        b'BODY': _BodyFetchValue,
        b'BODY.PEEK': _BodyPeekFetchValue,
        b'RFC822': _RFC822FetchValue,
        b'RFC822.HEADER': _RFC822HeaderFetchValue,
        b'RFC822.TEXT': _RFC822TextFetchValue,
        b'RFC822.SIZE': _RFC822SizeFetchValue,
        b'BINARY': _BinaryFetchValue,
        b'BINARY.PEEK': _BinaryPeekFetchValue,
        b'BINARY.SIZE': _BinarySizeFetchValue}

    __slots__ = ['message', 'selected', 'attributes', 'requirement',
                 '_get_loaded', '_values']

    def __init__(self, message: MessageInterface,
                 selected: SelectedMailbox,
                 attributes: Sequence[FetchAttribute]) -> None:
        super().__init__()
        self.message: Final = message
        self.selected: Final = selected
        self.attributes: Final = attributes
        # The individual attribute requirements are reduced to a single one.
        self.requirement: Final = FetchRequirement.reduce(
            attr.requirement for attr in attributes)
        self._get_loaded = _LoadedMessageProvider()
        # Fetch values are built on first access, see _get_values().
        self._values: Sequence[FetchValue] | None = None

    @asynccontextmanager
    async def load_hook(self) -> AsyncIterator[None]:
        """An async context manager that loads the message content on entry
        and releases it again on exit. Outside of this context manager, fetch
        values that need loaded message content write :attr:`.placeholder`
        instead, for console or log output.

        """
        content = await self.message.load_content(self.requirement)
        with self._get_loaded.apply(content):
            yield

    def __iter__(self) -> Iterator[FetchValue]:
        values = self._get_values()
        return iter(values)

    def __getitem__(self, index: Any) -> Any:
        return self._get_values()[index]

    def __len__(self) -> int:
        return len(self.attributes)

    def _get_values(self) -> Sequence[FetchValue]:
        """Returns the fetch values, building them on the first call."""
        values = self._values
        if values is None:
            values = [self._get(attr) for attr in self.attributes]
            self._values = values
        return values

    def _get(self, attr: FetchAttribute) -> FetchValue:
        """Builds the fetch value object for a single fetch attribute.

        Args:
            attr: The fetch attribute.

        Raises:
            KeyError: The attribute name is in neither lookup table.

        """
        name = attr.value
        simple_cls = self._simple_attrs.get(name)
        if simple_cls is not None:
            return simple_cls(attr, message=self.message,
                              selected=self.selected)
        loaded_cls = self._loaded_attrs.get(name)
        if loaded_cls is None:
            raise KeyError(name)
        return loaded_cls(attr, message=self.message,
                          get_loaded=self._get_loaded)