from __future__ import annotations
import enum
import re
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from functools import total_ordering, reduce
from typing import Final, Any
from . import AString
from .. import Params, Parseable
from ..exceptions import NotParseable
from ..primitives import Atom, List
from ...bytes import BytesFormat, MaybeBytes, Writeable
__all__ = ['FetchPartial', 'FetchRequirement', 'FetchAttribute', 'FetchValue']
[docs]
@dataclass(frozen=True)
class FetchPartial:
"""Used to indicate that only a substring of the desired fetch is being
requested.
Args:
start: The first octet of the requested substring.
length: The maximum length of the requested substring, or ``None``.
"""
start: int
length: int | None = None
[docs]
class FetchRequirement(enum.Flag):
"""Indicates the data required to fulfill a message fetch.
Attributes:
NONE: No data is required.
METADATA: The IMAP metadata is required.
HEADER: The message header is required.
BODY: The parsed MIME message body is required.
"""
NONE = 0
METADATA = enum.auto()
HEADER = enum.auto()
BODY = enum.auto()
CONTENT = HEADER | BODY
[docs]
def has_all(self, expected: FetchRequirement) -> bool:
"""Returns true if all of the expected fetch requirements are met by
this fetch requirement.
Args:
expected: The expected fetch requirements.
"""
return self & expected == expected
[docs]
def has_none(self, expected: FetchRequirement) -> bool:
"""Returns true if none of the expected fetch requirements are met by
this fetch requirement.
Args:
expected: The expected fetch requirements.
"""
return self & expected == FetchRequirement.NONE
[docs]
@classmethod
def get_all(cls) -> FetchRequirement:
"""Return all possible fetch requirements reduced into a single
requirement.
"""
return cls.reduce(FetchRequirement)
[docs]
@classmethod
def reduce(cls, requirements: Iterable[FetchRequirement]) \
-> FetchRequirement:
"""Reduce a set of fetch requirements into a single requirement.
Args:
requirements: The set of fetch requirements.
"""
return reduce(lambda x, y: x | y, requirements, cls.NONE)
[docs]
@total_ordering
class FetchAttribute(Parseable[bytes]):
"""Represents an attribute that should be fetched for each message in the
sequence set of a FETCH command on an IMAP stream.
Args:
attribute: The attribute name.
section: The attribute section.
partial: The attribute partial range.
Attributes:
section: The attribute section.
partial: The attribute partial range.
"""
[docs]
class Section:
"""Represents a fetch attribute section, which typically comes after
the attribute name within square brackets, e.g. ``BODY[1.TEXT]``.
Args:
parts: The nested MIME part identifiers.
specifier: The MIME part specifier.
headers: The MIME part specifier headers.
"""
def __init__(self, parts: Sequence[int],
specifier: bytes | None = None,
headers: frozenset[bytes] | None = None) -> None:
self.parts = parts
self.specifier = specifier
self.headers = frozenset(hdr.upper() for hdr in headers) \
if headers else None
def __hash__(self) -> int:
return hash((tuple(self.parts), self.specifier, self.headers))
_attrname_pattern = re.compile(br' *([^\s\[<()]+)')
_section_start_pattern = re.compile(br' *\[ *')
_section_end_pattern = re.compile(br' *\]')
_partial_pattern = re.compile(br'< *(\d+) *\. *(\d+) *>')
_sec_part_pattern = re.compile(br'([1-9]\d* *(?:\. *[1-9]\d*)*) *(\.)? *')
def __init__(self, attribute: bytes,
section: FetchAttribute.Section | None = None,
partial: FetchPartial | None = None) -> None:
super().__init__()
self.attribute = attribute.upper()
self.section = section
self.partial = partial
self._raw: bytes | None = None
self._for_response: FetchAttribute | None = None
@property
def value(self) -> bytes:
"""The attribute name."""
return self.attribute
@property
def for_response(self) -> FetchAttribute:
if self._for_response is None:
if self.partial is None or self.partial.length is None:
self._for_response = self
else:
new_partial = FetchPartial(self.partial.start, None)
self._for_response = FetchAttribute(
self.value, self.section, new_partial)
return self._for_response
@property
def set_seen(self) -> bool:
if self.value == b'BODY' and self.section:
return True
elif self.value == b'BINARY':
return True
elif self.value in (b'RFC822', b'RFC822.TEXT'):
return True
return False
@property
def requirement(self) -> FetchRequirement:
"""Indicates the data required to fulfill this fetch attribute."""
attr_name = self.attribute
if attr_name in (b'UID', b'FLAGS', b'INTERNALDATE'):
return FetchRequirement.METADATA
elif attr_name in (b'ENVELOPE', b'RFC822.HEADER'):
return FetchRequirement.HEADER
elif attr_name == b'BODY' and self.section is not None:
return self._get_body_requirement(self.section)
return FetchRequirement.CONTENT
@classmethod
def _get_body_requirement(cls, section: FetchAttribute.Section) \
-> FetchRequirement:
if not section.parts and section.specifier in (
b'HEADER', b'HEADER.FIELDS', b'HEADER.FIELDS.NOT'):
return FetchRequirement.HEADER
else:
return FetchRequirement.CONTENT
@property
def raw(self) -> bytes:
if self._raw is not None:
return self._raw
if self.value == b'BODY.PEEK':
parts = [b'BODY']
else:
parts = [self.value]
if self.section and not self.value.startswith(b'RFC822'):
parts.append(b'[')
if self.section.parts:
part_raw = BytesFormat(b'.').join(
[b'%i' % num for num in self.section.parts])
parts.append(part_raw)
if self.section.specifier:
parts.append(b'.')
if self.section.specifier:
parts.append(self.section.specifier)
if self.section.headers:
headers = self.section.headers
parts.append(b' ')
parts.append(bytes(List(headers, sort=True)))
parts.append(b']')
if self.partial:
start, length = (self.partial.start, self.partial.length)
if length is None:
parts.append(b'<%i>' % start)
else:
parts.append(b'<%i.%i>' % (start, length))
self._raw = raw = b''.join(parts)
return raw
def __hash__(self) -> int:
return hash((self.value, self.section, self.partial))
def __eq__(self, other: Any) -> bool:
if isinstance(other, FetchAttribute):
return hash(self) == hash(other)
return super().__eq__(other)
def __ne__(self, other: Any) -> bool:
if isinstance(other, FetchAttribute):
return hash(self) != hash(other)
return super().__ne__(other)
def __lt__(self, other: Any) -> bool:
if not isinstance(other, FetchAttribute):
return NotImplemented
return bytes(self.for_response) < bytes(self.for_response)
@classmethod
def _parse_section(cls, buf: memoryview, params: Params) \
-> tuple[Section, memoryview]:
match = cls._sec_part_pattern.match(buf)
if match:
section_parts = [int(num) for num in match.group(1).split(b'.')]
buf = buf[match.end(0):]
else:
section_parts = []
try:
atom, after = Atom.parse(buf, params)
except NotParseable:
return cls.Section(section_parts), buf
specifier = atom.value.upper()
if section_parts and specifier == b'MIME':
return cls.Section(section_parts, specifier), after
elif specifier in (b'HEADER', b'TEXT'):
return cls.Section(section_parts, specifier), after
elif specifier in (b'HEADER.FIELDS', b'HEADER.FIELDS.NOT'):
params = params.copy(expected=[AString])
header_list_p, buf = List.parse(after, params)
header_list = frozenset([bytes(hdr)
for hdr in header_list_p.value])
if not header_list:
raise NotParseable(after)
return cls.Section(section_parts, specifier, header_list), buf
raise NotParseable(buf)
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[FetchAttribute, memoryview]:
match = cls._attrname_pattern.match(buf)
if not match:
raise NotParseable(buf)
attr = match.group(1).upper()
after = buf[match.end(0):]
if attr in (b'ENVELOPE', b'FLAGS', b'INTERNALDATE', b'UID',
b'RFC822.SIZE', b'BODYSTRUCTURE', b'EMAILID',
b'THREADID'):
return cls(attr), after
elif attr == b'RFC822':
section = cls.Section([])
return cls(attr, section), after
elif attr == b'RFC822.HEADER':
section = cls.Section([], b'HEADER')
return cls(attr, section), after
elif attr == b'RFC822.TEXT':
section = cls.Section([], b'TEXT')
return cls(attr, section), after
elif attr not in (b'BODY', b'BODY.PEEK',
b'BINARY', b'BINARY.PEEK', b'BINARY.SIZE'):
raise NotParseable(buf)
buf = after
match = cls._section_start_pattern.match(buf)
if not match:
if attr == b'BODY':
return cls(attr), buf
else:
raise NotParseable(buf)
buf = buf[match.end(0):]
section, buf_s = cls._parse_section(buf, params)
if section.specifier and attr.startswith(b'BINARY'):
raise NotParseable(buf)
match = cls._section_end_pattern.match(buf_s)
if not match:
raise NotParseable(buf_s)
buf = buf_s[match.end(0):]
match = cls._partial_pattern.match(buf)
if match:
if attr == b'BINARY.SIZE':
raise NotParseable(buf)
start, length = int(match.group(1)), int(match.group(2))
if start < 0 or length <= 0:
raise NotParseable(buf)
partial = FetchPartial(start, length)
return cls(attr, section, partial), buf[match.end(0):]
return cls(attr, section), buf
def __bytes__(self) -> bytes:
return self.raw
[docs]
class FetchValue(Writeable):
"""A value fetched from a message for a single fetch attribute.
Args:
attribute: The fetch attribute.
"""
__slots__ = ['attribute']
def __init__(self, attribute: FetchAttribute) -> None:
super().__init__()
self.attribute: Final = attribute
@classmethod
def of(cls, attribute: FetchAttribute, value: MaybeBytes) \
-> FetchValue:
return _StaticFetchValue(attribute, value)
class _StaticFetchValue(FetchValue):
__slots__ = ['_value']
def __init__(self, attribute: FetchAttribute, value: MaybeBytes) -> None:
super().__init__(attribute)
self._value: Final = value
def __bytes__(self) -> bytes:
attr = self.attribute.for_response
return BytesFormat(b'%b %b') % (attr, self._value)