Source code for pymap.parsing.commands


from __future__ import annotations

from collections.abc import Collection

from . import Space, Params
from .command import Command
from .command.any import CapabilityCommand, LogoutCommand, NoOpCommand, \
    IdCommand
from .command.auth import AppendCommand, CreateCommand, DeleteCommand, \
    ExamineCommand, ListCommand, LSubCommand, RenameCommand, SelectCommand, \
    StatusCommand, SubscribeCommand, UnsubscribeCommand
from .command.nonauth import AuthenticateCommand, LoginCommand, StartTLSCommand
from .command.select import CheckCommand, CloseCommand, ExpungeCommand, \
    CopyCommand, MoveCommand, FetchCommand, StoreCommand, SearchCommand, \
    UidCommand, UidCopyCommand, UidMoveCommand, UidExpungeCommand, \
    UidFetchCommand, UidSearchCommand, UidStoreCommand, IdleCommand
from .exceptions import NotParseable
from .primitives import Atom
from .specials import Tag

__all__ = ['builtin_commands', 'InvalidCommand', 'Commands']

#: List of built-in commands. These commands are automatically registered by a
#: new :class:`Commands` object.
builtin_commands: Collection[type[Command]] = [
    CapabilityCommand, LogoutCommand, NoOpCommand, IdCommand, AppendCommand,
    CreateCommand, DeleteCommand, ExamineCommand, ListCommand, LSubCommand,
    RenameCommand, SelectCommand, StatusCommand, SubscribeCommand,
    UnsubscribeCommand, AuthenticateCommand, LoginCommand, StartTLSCommand,
    CheckCommand, CloseCommand, ExpungeCommand, CopyCommand, MoveCommand,
    FetchCommand, StoreCommand, SearchCommand, UidCommand, UidCopyCommand,
    UidMoveCommand, UidExpungeCommand, UidFetchCommand, UidSearchCommand,
    UidStoreCommand, IdleCommand]


[docs] class InvalidCommand(Command): """An invalid command, either because the tag or command name could not be parsed, the command name was not recognized, or a known command was given invalid arguments. Args: params: The parsing parameters parse_exc: The exception that caused the failure, if any. command: The command name, if available. command_type: The command type, if available. """ command = b'[INVALID]' def __init__(self, params: Params, parse_exc: NotParseable | None, command: bytes | None = None, command_type: type[Command] | None = None) -> None: super().__init__(params.tag) self._parse_exc = parse_exc self._command = command self._command_type = command_type @property def value(self) -> bytes: return self._command or b'' @property def message(self) -> bytes: """The message to include in a BAD response.""" if not self.command_name: return b'Command not given.' elif not self.command_type: return b'%b: Unknown command.' % (self.command_name, ) else: return b'%b: Invalid arguments.' % (self.command_name, ) @property def command_name(self) -> bytes | None: """The command name, if the name could be parsed.""" return self._command @property def command_type(self) -> type[Command] | None: """The command type, if parsing failed due to invalid arguments.""" return self._command_type @property def parse_exc(self) -> NotParseable | None: """The parsing exception, if any.""" return self._parse_exc
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[InvalidCommand, memoryview]: raise NotImplementedError()
[docs] class Commands: """Manages the set of all known IMAP commands and the ability to parse.""" __slots__ = ['commands'] def __init__(self) -> None: super().__init__() self.commands: dict[bytes, type[Command]] = {} self._load_commands() def _load_commands(self) -> None: for cmd in builtin_commands: self.register(cmd)
[docs] def register(self, cmd: type[Command]) -> None: """Register a new IMAP command. Args: cmd: The new command type. """ self.commands[cmd.command] = cmd
[docs] def deregister(self, name: bytes) -> None: """Deregister an IMAP command by name. Does nothing if the command is not registered. Args: name: The IMAP command name. """ self.commands.pop(name.upper(), None)
[docs] def parse(self, buf: memoryview, params: Params) \ -> tuple[Command, memoryview]: """Parse the given bytes into a command. The basic syntax is a tag string, a command name, possibly some arguments, and then an endline. If the command has a complete structure but cannot be parsed, an :class:`InvalidCommand` is returned. Args: buf: The bytes to parse. params: The parsing parameters. """ try: tag, buf = Tag.parse(buf, params) except NotParseable as exc: return InvalidCommand(params, exc), buf[0:0] else: params = params.copy(tag=tag.value) cmd_parts: list[bytes] = [] while True: try: _, buf = Space.parse(buf, params) atom, buf = Atom.parse(buf, params) cmd_parts.append(atom.value.upper()) except NotParseable as exc: return InvalidCommand(params, exc), buf[0:0] command = b' '.join(cmd_parts) cmd_type = self.commands.get(command) if not cmd_type: return InvalidCommand(params, None, command), buf[0:0] elif not cmd_type.compound: break params = params.copy(command_name=command) try: return cmd_type.parse(buf, params) except NotParseable as exc: return InvalidCommand(params, exc, command, cmd_type), buf[0:0]