from __future__ import annotations
import re
from collections.abc import Iterable, Sequence
from typing import ClassVar
from . import CommandSelect, CommandNoArgs
from .. import Params, Space, EndLine
from ..exceptions import NotParseable
from ..primitives import Atom, List
from ..specials import AString, Mailbox, SequenceSet, Flag, FetchAttribute, \
SearchKey, ExtensionOptions
from ...flags import FlagOp
__all__ = ['CheckCommand', 'CloseCommand', 'ExpungeCommand', 'CopyCommand',
'MoveCommand', 'FetchCommand', 'StoreCommand', 'SearchCommand',
'UidCommand', 'UidCopyCommand', 'UidMoveCommand',
'UidExpungeCommand', 'UidFetchCommand', 'UidSearchCommand',
'UidStoreCommand', 'IdleCommand']
[docs]
class CheckCommand(CommandNoArgs, CommandSelect):
"""The ``CHECK`` command initiates an implementation-specific backend
synchronization for the selected mailbox.
See Also:
`RFC 3501 6.4.1. <https://tools.ietf.org/html/rfc3501#section-6.4.1>`_
"""
command = b'CHECK'
[docs]
class CloseCommand(CommandNoArgs, CommandSelect):
"""The ``CLOSE`` command closes a selected mailbox.
See Also:
`RFC 3501 6.4.2. <https://tools.ietf.org/html/rfc3501#section-6.4.2>`_
"""
command = b'CLOSE'
[docs]
class ExpungeCommand(CommandSelect):
"""The ``EXPUNGE`` command permanently erases all messages in the selected
mailbox that contain the ``\\Deleted`` flag.
See Also:
`RFC 3501 6.4.3 <https://tools.ietf.org/html/rfc3501#section-6.4.3>`_
`RFC 4315 2.1 <https://tools.ietf.org/html/rfc4315#section-2.1>`_
Args:
tag: The command tag.
uid_set: Only the messages in the given UID set should be expunged.
"""
command = b'EXPUNGE'
uid: ClassVar[bool] = False
def __init__(self, tag: bytes, uid_set: SequenceSet | None = None) -> None:
super().__init__(tag)
self.uid_set = uid_set
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[ExpungeCommand, memoryview]:
uid_set: SequenceSet | None = None
if params.uid:
_, buf = Space.parse(buf, params)
uid_set, buf = SequenceSet.parse(buf, params)
_, buf = EndLine.parse(buf, params)
return cls(params.tag, uid_set), buf
[docs]
class CopyCommand(CommandSelect):
"""The ``COPY`` command copies messages from the selected mailbox to the
end of the destination mailbox.
See Also:
`RFC 3501 6.4.7. <https://tools.ietf.org/html/rfc3501#section-6.4.7>`_
Args:
tag: The command tag.
seq_set: The sequence set of the messages to copy.
mailbox: The destination mailbox.
"""
command = b'COPY'
uid: ClassVar[bool] = False
def __init__(self, tag: bytes, seq_set: SequenceSet,
mailbox: Mailbox) -> None:
super().__init__(tag)
self.sequence_set = seq_set
self.mailbox_obj = mailbox
@property
def mailbox(self) -> str:
return str(self.mailbox_obj)
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[CopyCommand, memoryview]:
_, buf = Space.parse(buf, params)
seq_set, buf = SequenceSet.parse(buf, params)
_, buf = Space.parse(buf, params)
mailbox, buf = Mailbox.parse(buf, params)
_, buf = EndLine.parse(buf, params)
return cls(params.tag, seq_set, mailbox), buf
[docs]
class MoveCommand(CommandSelect):
"""The ``MOVE`` command moves messages from the selected mailbox to the
end of the destination mailbox.
See Also:
`RFC 6851 <https://tools.ietf.org/html/rfc6851>`_
Args:
tag: The command tag.
seq_set: The sequence set of the messages to move.
mailbox: The destination mailbox.
"""
command = b'MOVE'
uid: ClassVar[bool] = False
def __init__(self, tag: bytes, seq_set: SequenceSet,
mailbox: Mailbox) -> None:
super().__init__(tag)
self.sequence_set = seq_set
self.mailbox_obj = mailbox
@property
def mailbox(self) -> str:
return str(self.mailbox_obj)
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[MoveCommand, memoryview]:
_, buf = Space.parse(buf, params)
seq_set, buf = SequenceSet.parse(buf, params)
_, buf = Space.parse(buf, params)
mailbox, buf = Mailbox.parse(buf, params)
_, buf = EndLine.parse(buf, params)
return cls(params.tag, seq_set, mailbox), buf
[docs]
class FetchCommand(CommandSelect):
"""The ``FETCH`` command fetches message data from the selected mailbox.
What data is fetched can be controlled in depth by a set of fetch
attributes given in the command.
See Also:
`RFC 3501 6.4.5. <https://tools.ietf.org/html/rfc3501#section-6.4.5>`_
Args:
tag: The command tag.
seq_set: The sequence set of the messages to fetch.
attr_list: The message attributes to fetch.
"""
command = b'FETCH'
uid: ClassVar[bool] = False
def __init__(self, tag: bytes, seq_set: SequenceSet,
attr_list: Sequence[FetchAttribute],
options: ExtensionOptions | None = None) -> None:
super().__init__(tag)
self.sequence_set = seq_set
self.attributes = attr_list
self.options = options or ExtensionOptions.empty()
@classmethod
def _check_macros(cls, buf: memoryview, params: Params) \
-> tuple[Sequence[FetchAttribute], memoryview]:
atom, after = Atom.parse(buf, params)
macro = atom.value.upper()
if macro == b'ALL':
attrs = [FetchAttribute(b'FLAGS'),
FetchAttribute(b'INTERNALDATE'),
FetchAttribute(b'RFC822.SIZE'),
FetchAttribute(b'ENVELOPE')]
return attrs, after
elif macro == b'FULL':
attrs = [FetchAttribute(b'FLAGS'),
FetchAttribute(b'INTERNALDATE'),
FetchAttribute(b'RFC822.SIZE'),
FetchAttribute(b'ENVELOPE'),
FetchAttribute(b'BODY')]
return attrs, after
elif macro == b'FAST':
attrs = [FetchAttribute(b'FLAGS'),
FetchAttribute(b'INTERNALDATE'),
FetchAttribute(b'RFC822.SIZE')]
return attrs, after
raise NotParseable(buf)
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[FetchCommand, memoryview]:
_, buf = Space.parse(buf, params)
seq_set, buf = SequenceSet.parse(buf, params)
_, buf = Space.parse(buf, params)
attr_list: Sequence[FetchAttribute] = []
try:
attr_list, buf = cls._check_macros(buf, params)
except NotParseable:
pass
try:
attr, buf = FetchAttribute.parse(buf, params)
attr_list = [attr]
except NotParseable:
pass
if not attr_list:
params_copy = params.copy(expected=[FetchAttribute])
attr_list_p, buf = List.parse(buf, params_copy)
attr_list = attr_list_p.get_as(FetchAttribute)
if params.uid:
attr_list = list(attr_list) + [FetchAttribute(b'UID')]
options, buf = ExtensionOptions.parse(buf, params)
_, buf = EndLine.parse(buf, params)
return cls(params.tag, seq_set, attr_list, options), buf
[docs]
class StoreCommand(CommandSelect):
"""The ``STORE`` command updates the flags of a set of messages in the
selected mailbox.
See Also:
`RFC 3501 6.4.6. <https://tools.ietf.org/html/rfc3501#section-6.4.6>`_
Args:
tag: The command tag.
seq_set: The sequence set of the messages to fetch.
flags: The flag set operand.
mode: The type of update operation.
"""
command = b'STORE'
uid: ClassVar[bool] = False
_info_pattern = re.compile(br'^([+-]?)FLAGS(\.SILENT)?$', re.I)
_modes = {b'': FlagOp.REPLACE, b'+': FlagOp.ADD, b'-': FlagOp.DELETE}
def __init__(self, tag: bytes, seq_set: SequenceSet,
flags: Iterable[Flag], mode: FlagOp,
silent: bool,
options: ExtensionOptions | None = None) -> None:
super().__init__(tag)
self.sequence_set = seq_set
self.flag_set = frozenset(flags)
self.mode = mode
self.silent = silent
self.options = options or ExtensionOptions.empty()
@classmethod
def _parse_store_info(cls, buf: memoryview, params: Params) \
-> tuple[FlagOp, bool, memoryview]:
info, after = Atom.parse(buf, params)
match = cls._info_pattern.match(info.value)
if not match:
raise NotParseable(buf)
mode = cls._modes[match.group(1)]
silent = bool(match.group(2))
return mode, silent, after
@classmethod
def _parse_flag_list(cls, buf: memoryview, params: Params) \
-> tuple[Sequence[Flag], memoryview]:
try:
params_copy = params.copy(expected=[Flag])
flag_list_p, buf = List.parse(buf, params_copy)
except NotParseable:
pass
else:
return flag_list_p.get_as(Flag), buf
flag_list: list[Flag] = []
while True:
try:
flag, buf = Flag.parse(buf, params)
flag_list.append(flag)
_, buf = Space.parse(buf, params)
except NotParseable:
return flag_list, buf
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[StoreCommand, memoryview]:
_, buf = Space.parse(buf, params)
seq_set, buf = SequenceSet.parse(buf, params)
options, buf = ExtensionOptions.parse(buf, params)
_, buf = Space.parse(buf, params)
mode, silent, buf = cls._parse_store_info(buf, params)
_, buf = Space.parse(buf, params)
flag_list, buf = cls._parse_flag_list(buf, params)
_, buf = EndLine.parse(buf, params)
return cls(params.tag, seq_set, flag_list, mode, silent, options), buf
[docs]
class SearchCommand(CommandSelect):
"""The ``SEARCH`` command searches the messages in the selected mailbox
based on a set of search criteria.
See Also:
`RFC 3501 6.4.4. <https://tools.ietf.org/html/rfc3501#section-6.4.4>`_
Args:
tag: The command tag.
keys: The search keys.
charset: The charset in use by the search keys.
"""
command = b'SEARCH'
uid: ClassVar[bool] = False
def __init__(self, tag: bytes, keys: Iterable[SearchKey],
charset: str | None,
options: ExtensionOptions | None = None) -> None:
super().__init__(tag)
self.keys = frozenset(keys)
self.charset = charset
self.options = options or ExtensionOptions.empty()
@classmethod
def _parse_charset(cls, buf: memoryview, params: Params) \
-> tuple[str | None, memoryview]:
try:
_, after = Space.parse(buf, params)
atom, after = Atom.parse(after, params)
except NotParseable:
pass
else:
if atom.value.upper() == b'CHARSET':
_, after = Space.parse(after, params)
string, after = AString.parse(after, params)
charset = str(string.value, 'ascii')
try:
b' '.decode(charset)
except LookupError as exc:
raise NotParseable(buf, b'BADCHARSET') from exc
return charset, after
return None, buf
@classmethod
def _parse_options(cls, buf: memoryview, params: Params) \
-> tuple[ExtensionOptions, memoryview]:
start = cls._whitespace_length(buf)
if buf[start:start + 6] == b'RETURN':
return ExtensionOptions.parse(buf[start + 6:], params)
else:
options, _ = ExtensionOptions.parse(memoryview(b''), params)
return options, buf
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[SearchCommand, memoryview]:
options, buf = cls._parse_options(buf, params)
charset, buf = cls._parse_charset(buf, params)
search_keys = []
while True:
try:
_, buf = Space.parse(buf, params)
key, buf = SearchKey.parse(buf, params.copy(charset=charset))
search_keys.append(key)
except NotParseable:
if not search_keys:
raise
break
_, buf = EndLine.parse(buf, params)
return cls(params.tag, search_keys, charset, options), buf
[docs]
class UidCommand(CommandSelect):
"""The ``UID`` command precedes one of the ``COPY``, ``EXPUNGE``,
``FETCH``, ``SEARCH``, or ``STORE`` commands and indicates that the
command interacts with message UIDs instead of sequence numbers. Refer
to the RFC section for a complete description.
See Also:
`RFC 3501 6.4.8 <https://tools.ietf.org/html/rfc3501#section-6.4.8>`_
`RFC 4315 2.1 <https://tools.ietf.org/html/rfc4315#section-2.1>`_
"""
command = b'UID'
compound = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidCommand, memoryview]:
raise TypeError(cls) # never parsed directly
[docs]
class UidCopyCommand(CopyCommand):
"""The ``UID COPY`` variant of the ``COPY`` command, which uses message
UIDs instead of sequence numbers.
"""
command = b'UID COPY'
delegate = CopyCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidCopyCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidCopyCommand):
raise TypeError(ret)
return ret, buf
[docs]
class UidMoveCommand(MoveCommand):
"""The ``UID MOVE`` variant of the ``MOVE`` command, which uses message
UIDs instead of sequence numbers.
"""
command = b'UID MOVE'
delegate = MoveCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidMoveCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidMoveCommand):
raise TypeError(ret)
return ret, buf
[docs]
class UidExpungeCommand(ExpungeCommand):
"""The ``UID EXPUNGE`` variant of the ``EXPUNGE`` command, which uses
message UIDs instead of sequence numbers.
"""
command = b'UID EXPUNGE'
delegate = ExpungeCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidExpungeCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidExpungeCommand):
raise TypeError(ret)
return ret, buf
[docs]
class UidFetchCommand(FetchCommand):
"""The ``UID FETCH`` variant of the ``FETCH`` command, which uses message
UIDs instead of sequence numbers.
"""
command = b'UID FETCH'
delegate = FetchCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidFetchCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidFetchCommand):
raise TypeError(ret)
return ret, buf
[docs]
class UidSearchCommand(SearchCommand):
"""The ``UID SEARCH`` variant of the ``SEARCH`` command, which uses message
UIDs instead of sequence numbers.
"""
command = b'UID SEARCH'
delegate = SearchCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidSearchCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidSearchCommand):
raise TypeError(ret)
return ret, buf
[docs]
class UidStoreCommand(StoreCommand):
"""The ``UID STORE`` variant of the ``STORE`` command, which uses message
UIDs instead of sequence numbers.
"""
command = b'UID STORE'
delegate = StoreCommand
uid = True
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[UidStoreCommand, memoryview]:
ret, buf = super().parse(buf, params.copy(uid=True))
if not isinstance(ret, UidStoreCommand):
raise TypeError(ret)
return ret, buf
[docs]
class IdleCommand(CommandSelect):
"""The ``IDLE`` command waits for the continuation string ``DONE`` from the
client. During this wait, the server may be sending untagged responses
indicating concurrent updates to the mailbox.
Parsing this command is a special case. The continuation string ``DONE`` is
actually parsed by the :meth:`.parse_done` method. The :meth:`.parse`
only parses the ``IDLE <CRLF>`` portion, and does not raise a
:exc:`~pymap.exceptions.RequiresContinuation` exception.
See Also:
`RFC 2177 <https://tools.ietf.org/html/rfc2177>`_
"""
command = b'IDLE'
#: The string used to end the command.
continuation = b'DONE'
_pattern = re.compile(br'^(.*?)\r?\n')
[docs]
@classmethod
def parse(cls, buf: memoryview, params: Params) \
-> tuple[IdleCommand, memoryview]:
_, buf = EndLine.parse(buf, params)
return cls(params.tag), buf
[docs]
def parse_done(self, buf: memoryview) -> tuple[bool, memoryview]:
"""Parse the continuation line sent by the client to end the ``IDLE``
command.
Args:
buf: The continuation line to parse.
"""
match = self._pattern.match(buf)
if not match:
raise NotParseable(buf)
done = match.group(1).upper() == self.continuation
buf = buf[match.end(0):]
return done, buf