Source code for pymap.parsing.primitives

"""Primitive parseable objects in the IMAP protocol."""

from __future__ import annotations

import re
from abc import abstractmethod, ABCMeta
from collections.abc import Iterable, Iterator, Sequence
from functools import total_ordering
from re import Match
from typing import cast, overload, Any, SupportsBytes

from . import AnyParseable, ParseableT, ParseableType, Parseable, \
    ExpectedParseable, Params
from .exceptions import NotParseable
from .state import ExpectContinuation
from ..bytes import MaybeBytes, MaybeBytesT, BytesFormat, WriteStream, \
    Writeable
from ..frozen import frozenlist

__all__ = ['Nil', 'Number', 'Atom', 'List', 'String',
           'QuotedString', 'LiteralString']


[docs] class Nil(Parseable[None]): """Represents a ``NIL`` object from an IMAP stream.""" _nil_pattern = re.compile(b'^NIL$', re.I) __slots__: list[str] = [] def __init__(self) -> None: super().__init__() @property def value(self) -> None: """Always returns ``None``.""" return None def __bytes__(self) -> bytes: return b'NIL' def __hash__(self) -> int: return hash(Nil) def __eq__(self, other: Any) -> bool: if isinstance(other, Nil): return True return super().__eq__(other)
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[Nil, memoryview]: start = cls._whitespace_length(buf) match = cls._atom_pattern.match(buf, start) if not match: raise NotParseable(buf) atom = match.group(0) if not cls._nil_pattern.match(atom): raise NotParseable(buf) return cls(), buf[match.end(0):]
[docs] @total_ordering class Number(Parseable[int]): """Represents a number object from an IMAP stream. Args: num: The integer value. """ _num_pattern = re.compile(br'^\d+$') __slots__ = ['num', '_raw'] def __init__(self, num: int) -> None: super().__init__() self.num = num self._raw = b'%d' % num @property def value(self) -> int: """The integer value.""" return self.num
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[Number, memoryview]: start = cls._whitespace_length(buf) match = cls._atom_pattern.match(buf, start) if not match: raise NotParseable(buf) atom = match.group(0) if not cls._num_pattern.match(atom): raise NotParseable(buf) return cls(int(match.group(0))), buf[match.end(0):]
def __bytes__(self) -> bytes: return self._raw def __hash__(self) -> int: return hash((Number, self.value)) def __eq__(self, other: Any) -> bool: if isinstance(other, Number): return self.value == other.value elif isinstance(other, int): return self.value == other return super().__eq__(other) def __lt__(self, other: Any) -> bool: if isinstance(other, Number): return self.value < other.value elif isinstance(other, int): return self.value < other return NotImplemented
[docs] class Atom(Parseable[bytes]): """Represents an atom object from an IMAP stream. Args: value: The atom bytestring. """ __slots__ = ['_value'] def __init__(self, value: bytes) -> None: super().__init__() self._value = value @property def value(self) -> bytes: """The atom bytestring.""" return self._value
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[Atom, memoryview]: start = cls._whitespace_length(buf) match = cls._atom_pattern.match(buf, start) if not match: raise NotParseable(buf[start:]) atom = match.group(0) return cls(atom), buf[match.end(0):]
def __bytes__(self) -> bytes: return self.value def __hash__(self) -> int: return hash((Atom, self.value)) def __eq__(self, other: Any) -> bool: if isinstance(other, Atom): return self.value == other.value return super().__eq__(other)
[docs] class String(Parseable[bytes], metaclass=ABCMeta): """Represents a string object from an IMAP string. This object may not be instantiated directly, use one of its derivatives instead. """ _MAX_LEN = 4096 __slots__: list[str] = [] @property @abstractmethod def binary(self) -> bool: """True if the string should be transmitted as binary.""" ... @property @abstractmethod def length(self) -> int: """The length of the string value.""" ...
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[String, memoryview]: try: return QuotedString.parse(buf, params) except NotParseable: pass return LiteralString.parse(buf, params)
[docs] @classmethod def build(cls, value: object, binary: bool = False, fallback: object = None) -> Nil | String: """Produce either a :class:`QuotedString` or :class:`LiteralString` based on the contents of ``data``. This is useful to improve readability of response data. Args: value: The string to serialize. binary: True if the string should be transmitted as binary. fallback: The default value to use if ``value`` is None. """ if value is None: if fallback is None: return Nil() else: return cls.build(fallback, binary) elif not value: return QuotedString(b'') elif isinstance(value, bytes): ascii_ = value elif isinstance(value, memoryview): ascii_ = bytes(value) elif hasattr(value, '__bytes__'): ascii_ = bytes(cast(SupportsBytes, value)) elif isinstance(value, str) or hasattr(value, '__str__'): value = str(value) try: ascii_ = bytes(value, 'ascii') except UnicodeEncodeError: ascii_ = bytes(value, 'utf-8', 'replace') return LiteralString(ascii_, binary) else: raise TypeError(value) if not binary and len(ascii_) < 64 \ and b'\n' not in ascii_ \ and b'\x00' not in ascii_: return QuotedString(ascii_) else: return LiteralString(ascii_, binary)
def __bytes__(self) -> bytes: raise NotImplementedError def __hash__(self) -> int: return hash((String, self.value)) def __eq__(self, other: Any) -> bool: if isinstance(other, String): return self.value == other.value return super().__eq__(other)
[docs] class QuotedString(String): """Represents a string object from an IMAP stream that was encased in double-quotes. Args: string: The string value. """ _quoted_pattern = re.compile(br'(?:\r|\n|\\.|\")') _quoted_specials_pattern = re.compile(br'[\"\\]') __slots__ = ['_string', '_raw'] def __init__(self, string: bytes, raw: bytes | None = None) -> None: super().__init__() self._string = string self._raw = raw @property def value(self) -> bytes: """The string value.""" return self._string @property def binary(self) -> bool: return False @property def length(self) -> int: return len(self._string)
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[QuotedString, memoryview]: start = cls._whitespace_length(buf) if buf[start:start + 1] != b'"': raise NotParseable(buf) marker = start + 1 unquoted = bytearray() for match in cls._quoted_pattern.finditer(buf, marker): unquoted += buf[marker:match.start(0)] match_group = match.group(0) if match_group in (b'\r', b'\n'): raise NotParseable(buf) elif match_group.startswith(b'\\'): escape_char = match_group[-1:] if escape_char in (b'\\', b'"'): unquoted += escape_char else: raise NotParseable(buf) marker = match.end(0) else: end = match.end(0) quoted = buf[start:end + 1] return cls(bytes(unquoted), bytes(quoted)), buf[end:] raise NotParseable(buf)
@classmethod def _escape_quoted_specials(cls, match: Match[bytes]) -> bytes: return b'\\' + match.group(0) def __bytes__(self) -> bytes: if self._raw is not None: return bytes(self._raw) pat = self._quoted_specials_pattern quoted_string = pat.sub(self._escape_quoted_specials, self.value) self._raw = BytesFormat(b'"%b"') % (quoted_string, ) return self._raw
[docs] class LiteralString(String): """Represents a string object from an IMAP stream that used the literal syntax. Args: string: The literal string value. binary: True if the string is considered binary data. """ _literal_pattern = re.compile(br'(~?){(\d+)(\+?)}\r?\n') __slots__ = ['_string', '_length', '_binary', '_raw'] def __init__(self, string: bytes | Writeable, binary: bool = False) -> None: super().__init__() self._string = string self._length = len(string) self._binary = binary self._raw: bytes | None = None @property def value(self) -> bytes: return bytes(self._string) @property def binary(self) -> bool: return self._binary @property def length(self) -> int: return self._length @property def _prefix(self) -> bytes: binary_prefix = b'~' if self.binary else b'' return b'%b{%d}\r\n' % (binary_prefix, self.length) @classmethod def _check_too_big(cls, params: Params, length: int) -> bool: if params.command_name == b'APPEND': max_len = params.max_append_len else: max_len = cls._MAX_LEN return max_len is not None and length > max_len
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[LiteralString, memoryview]: start = cls._whitespace_length(buf) match = cls._literal_pattern.match(buf, start) if not match: raise NotParseable(buf) binary = match.group(1) == b'~' literal_length = int(match.group(2)) if cls._check_too_big(params, literal_length): raise NotParseable(buf, b'TOOBIG') elif match.group(3) == b'+': buf = buf[match.end(0):] literal = bytes(buf[0:literal_length]) elif len(buf) > match.end(0): raise NotParseable(buf[match.end(0):]) elif params.allow_continuations: expected = ExpectContinuation(b'Literal string', literal_length) buf = expected.expect(params.state) literal = bytes(buf[0:literal_length]) else: raise NotParseable(buf) if len(literal) != literal_length: raise NotParseable(buf) return cls(literal, binary), buf[literal_length:]
[docs] def write(self, writer: WriteStream) -> None: writer.write(self._prefix) if isinstance(self._string, Writeable): self._string.write(writer) else: writer.write(self._string)
def __len__(self) -> int: return len(self._prefix) + self.length def __bytes__(self) -> bytes: if self._raw is None: self._raw = self.tobytes() return self._raw
[docs] class List(Parseable[frozenlist[MaybeBytes]]): """Represents a list of :class:`Parseable` objects from an IMAP stream. Args: items: The list of parsed objects. sort: If True, the list of items is sorted. """ _end_pattern = re.compile(br' *\)') __slots__ = ['items'] def __init__(self, items: Iterable[MaybeBytes], sort: bool = False) -> None: super().__init__() if sort: items = tuple(sorted(items)) # type: ignore self.items: frozenlist[MaybeBytes] = frozenlist(items) @property def value(self) -> frozenlist[MaybeBytes]: """The list of parsed objects.""" return self.items @overload def get_as(self, cls: ParseableType[ParseableT]) -> Sequence[ParseableT]: ... @overload def get_as(self, cls: type[MaybeBytesT]) -> Sequence[MaybeBytesT]: ...
[docs] def get_as(self, cls: Any) -> Sequence[Any]: """Return the list of parsed objects.""" return self.items
def __iter__(self) -> Iterator[MaybeBytes]: return iter(self.value) def __len__(self) -> int: return len(self.value)
[docs] @classmethod def parse(cls, buf: memoryview, params: Params) \ -> tuple[List, memoryview]: start = cls._whitespace_length(buf) if buf[start:start + 1] != b'(': raise NotParseable(buf) items: list[AnyParseable] = [] buf = buf[start + 1:] limit = params.list_limit while True: match = cls._end_pattern.match(buf) if match: return cls(items), buf[match.end(0):] elif items and not cls._whitespace_length(buf): raise NotParseable(buf) item, buf = ExpectedParseable.parse(buf, params) if len(items) == limit: raise NotParseable(buf) items.append(item)
[docs] def write(self, writer: WriteStream) -> None: writer.write(b'(') is_first = True for item in self.items: if is_first: is_first = False else: writer.write(b' ') if isinstance(item, Writeable): item.write(writer) else: writer.write(bytes(item)) writer.write(b')')
def __bytes__(self) -> bytes: raw_items = BytesFormat(b' ').join(self.items) return b'(%b)' % raw_items def __hash__(self) -> int: return hash((List, self.value)) def __eq__(self, other: Any) -> bool: if isinstance(other, List): return self.__eq__(other.value) elif isinstance(other, Sequence): if len(self.value) != len(other): return False for i, val in enumerate(self.value): if val != other[i]: return False return True return super().__eq__(other)