from __future__ import annotations
import asyncio
import dataclasses
import os.path
import uuid
from argparse import ArgumentParser, Namespace
from collections.abc import Set, Mapping, AsyncIterator
from contextlib import closing, asynccontextmanager, AsyncExitStack
from datetime import datetime, timezone
from importlib.resources import files
from secrets import token_bytes
from typing import Any, Final
from pysasl.creds.server import ServerCredentials
from pymap.config import BackendCapability, IMAPConfig
from pymap.exceptions import AuthorizationFailure, InvalidAuth, \
NotAllowedError, UserNotFound, CannotReplaceUser
from pymap.health import HealthStatus
from pymap.interfaces.backend import BackendInterface
from pymap.interfaces.login import LoginInterface, IdentityInterface
from pymap.interfaces.token import TokenCredentials
from pymap.parsing.message import AppendMessage
from pymap.parsing.specials.flag import Flag, Recent
from pymap.token import AllTokens
from pymap.user import Passwords, UserMetadata
from .filter import FilterSet
from .mailbox import Message, MailboxData, MailboxSet
from ..session import BaseSession
# Names exported by this module for ``from ... import *``.
__all__ = ['DictBackend', 'Config']
[docs]
class DictBackend(BackendInterface):
    """Defines a backend that uses an in-memory dictionary for example usage
    and integration testing.

    Args:
        login: The login implementation used by the backend.
        config: The config in use by the backend.

    """

    def __init__(self, login: Login, config: Config) -> None:
        super().__init__()
        self._status = HealthStatus()
        self._config = config
        self._login = login

    @property
    def login(self) -> Login:
        """The login object given at construction."""
        return self._login

    @property
    def config(self) -> Config:
        """The config object given at construction."""
        return self._config

    @property
    def status(self) -> HealthStatus:
        """The health status object created at construction."""
        return self._status

    @classmethod
    def add_subparser(cls, name: str, subparsers: Any) -> ArgumentParser:
        """Add the sub-command for this backend, including its demo options.

        Args:
            name: The name of the sub-command.
            subparsers: The object whose ``add_parser`` creates the
                sub-command parser.

        Returns:
            The newly created sub-command parser.

        """
        sub: ArgumentParser = subparsers.add_parser(
            name, help='in-memory backend')
        sub.add_argument('--demo-data', action='store_true',
                         help='load initial demo data')
        # The two credential options differ only in flag, default and help.
        valued_options = (
            ('--demo-user', 'demouser', 'demo user ID'),
            ('--demo-password', 'demopass', 'demo user password'),
        )
        for flag, default, help_text in valued_options:
            sub.add_argument(flag, default=default,
                             metavar='VAL', help=help_text)
        return sub

    @classmethod
    async def init(cls, args: Namespace, **overrides: Any) \
            -> tuple[DictBackend, Config]:
        """Build the config, the login and the backend, registering the demo
        user before the backend is returned.

        Args:
            args: The parsed command-line arguments.
            overrides: Keyword arguments passed to ``Config.from_args``.

        Returns:
            The new backend together with its config.

        """
        config = Config.from_args(args, **overrides)
        login = Login(config)
        await cls._add_demo_user(config, login)
        backend = cls(login, config)
        return backend, config

    @classmethod
    async def _add_demo_user(cls, config: Config, login: Login) -> None:
        """Store the demo user, with a hashed password, through *login*."""
        passwords = Passwords(config)
        hashed = await passwords.hash_password(config.demo_password)
        metadata = UserMetadata(config, config.demo_user, password=hashed)
        identity = login.demo_user_identity
        await identity.set(metadata)

    async def start(self, stack: AsyncExitStack) -> None:
        """Do nothing; this backend performs no work on start."""
[docs]
class Config(IMAPConfig):
    """The config implementation for the dict backend.

    Args:
        args: The parsed command-line arguments.
        demo_data: True if demo data should be loaded.
        demo_user: The demo user login name.
        demo_password: The demo user password.
        demo_data_resource: Resource path of the demo data files, which
            defaults to the name of this module.
        admin_key: Key passed on to the base config; when missing or empty
            a random one is generated with :func:`~secrets.token_bytes`.
        extra: Additional keyword arguments passed on to the base config.

    """

    def __init__(self, args: Namespace, *, demo_data: bool,
                 demo_user: str, demo_password: str,
                 demo_data_resource: str = __name__,
                 admin_key: bytes | None = None, **extra: Any) -> None:
        if not admin_key:
            admin_key = token_bytes()
        super().__init__(args, admin_key=admin_key, **extra)
        self._demo_data_resource = demo_data_resource
        self._demo_password = demo_password
        self._demo_user = demo_user
        self._demo_data = demo_data
        # Per-user cache of (mailbox set, filter set), keyed by login name.
        self.set_cache: dict[str, tuple[MailboxSet, FilterSet]] = {}

    @property
    def backend_capability(self) -> BackendCapability:
        """The capabilities of this backend: idle, object ID and
        multi-append are all enabled.
        """
        return BackendCapability(
            idle=True,
            object_id=True,
            multi_append=True,
        )

    @property
    def demo_data(self) -> bool:
        """True if demo data should be loaded at startup."""
        return self._demo_data

    @property
    def demo_data_resource(self) -> str:
        """Resource path of demo data files."""
        return self._demo_data_resource

    @property
    def demo_user(self) -> str:
        """A login name that is valid at startup, which defaults to
        ``demouser``.

        """
        return self._demo_user

    @property
    def demo_password(self) -> str:
        """The password for the :attr:`.demo_user` login name, which defaults
        to ``demopass``.

        """
        return self._demo_password

    @classmethod
    def parse_args(cls, args: Namespace) -> Mapping[str, Any]:
        """Extend the base config arguments with the demo options.

        Args:
            args: The parsed command-line arguments.

        Returns:
            The base mapping plus the ``demo_data``, ``demo_user`` and
            ``demo_password`` entries.

        """
        parsed: dict[str, Any] = dict(super().parse_args(args))
        parsed.update(demo_data=args.demo_data,
                      demo_user=args.demo_user,
                      demo_password=args.demo_password)
        return parsed
class Session(BaseSession[Message]):
    """The session implementation for the dict backend.

    Args:
        owner: The owner of the session, passed on to the base session.
        config: The config in use by the backend.
        mailbox_set: The mailboxes made available to the session.
        filter_set: The filters made available to the session.

    """

    def __init__(self, owner: str, config: Config, mailbox_set: MailboxSet,
                 filter_set: FilterSet) -> None:
        super().__init__(owner)
        self._filter_set = filter_set
        self._mailbox_set = mailbox_set
        self._config = config

    @property
    def config(self) -> Config:
        """The config object given at construction."""
        return self._config

    @property
    def mailbox_set(self) -> MailboxSet:
        """The mailbox set given at construction."""
        return self._mailbox_set

    @property
    def filter_set(self) -> FilterSet:
        """The filter set given at construction."""
        return self._filter_set
class Login(LoginInterface):
    """The login implementation for the dict backend.

    Users and tokens are kept in the :attr:`users_dict` and
    :attr:`tokens_dict` dictionaries of this object.

    Args:
        config: The config in use by the backend.

    """

    def __init__(self, config: Config) -> None:
        super().__init__()
        self.config: Final = config
        # Stored user metadata, keyed by login name.
        self.users_dict: dict[str, UserMetadata] = {}
        # Issued tokens: token ID -> (login name, token key).
        self.tokens_dict: dict[str, tuple[str, bytes]] = {}
        self._tokens = AllTokens(config)
        self._passwords = Passwords(config)

    @property
    def tokens(self) -> AllTokens:
        """The tokens object created at construction."""
        return self._tokens

    @property
    def demo_user_identity(self) -> Identity:
        """A new identity for the demo user, with no token and no roles."""
        return Identity(self.config.demo_user, self, None, frozenset())

    async def authenticate(self, credentials: ServerCredentials) -> Identity:
        """Check the credentials and return the identity they belong to.

        Args:
            credentials: The credentials supplied by the client.

        Returns:
            The authenticated identity, holding the roles of the token (if
            token credentials were given) and of the user.

        Raises:
            InvalidAuth: The password check failed.

        """
        authcid = credentials.authcid
        granted: set[str] = set()
        token_id: str | None = None
        if isinstance(credentials, TokenCredentials):
            token_id = credentials.identifier
            granted.update(credentials.roles)
        identity = Identity(authcid, self, token_id, granted)
        try:
            user = await identity.get()
        except UserNotFound:
            # Unknown user: wait, then continue to the password check with
            # a freshly created metadata object for the same name.
            await asyncio.sleep(self.config.invalid_user_sleep)
            user = UserMetadata(self.config, authcid)
        password_ok = await self._passwords.check_password(user, credentials)
        if not password_ok:
            raise InvalidAuth()
        # In-place union on purpose: the identity was given this very set
        # object, so the roles added here are visible through it.
        granted |= user.roles
        return identity

    async def authorize(self, authenticated: IdentityInterface, authzid: str) \
            -> Identity:
        """Return the identity for *authzid* on behalf of *authenticated*.

        Args:
            authenticated: The identity that has been authenticated.
            authzid: The name of the identity being requested.

        Returns:
            A new identity named *authzid*, with no token and the roles of
            *authenticated*.

        Raises:
            AuthorizationFailure: The names differ and *authenticated* does
                not hold the ``admin`` role.

        """
        authcid = authenticated.name
        roles = authenticated.roles
        if authcid == authzid or 'admin' in roles:
            return Identity(authzid, self, None, roles)
        raise AuthorizationFailure()
class Identity(IdentityInterface):
    """The identity implementation for the dict backend.

    Args:
        name: The login name of the identity.
        login: The login object holding the user and token dictionaries.
        token_id: The ID of the token used by this identity, if any.
        roles: The roles held by this identity. The object is stored as
            given, not copied.

    """

    def __init__(self, name: str, login: Login, token_id: str | None,
                 roles: Set[str]) -> None:
        super().__init__()
        self.login: Final = login
        self.config: Final = login.config
        self._name = name
        self._roles = roles
        self._token_id = token_id

    @property
    def name(self) -> str:
        """The login name of the identity."""
        return self._name

    @property
    def roles(self) -> frozenset[str]:
        """A frozen snapshot of the roles currently held."""
        return frozenset(self._roles)

    async def new_token(self, *, expiration: datetime | None = None) \
            -> str | None:
        """Create and record a new login token for this identity.

        A random token ID and key are generated and stored in the login's
        token dictionary together with the identity name.

        Args:
            expiration: Passed on when building the login token.

        Returns:
            The result of building the login token from the new ID and key.

        """
        token_id = uuid.uuid4().hex
        token_key = token_bytes()
        self.login.tokens_dict[token_id] = (self.name, token_key)
        return self.login.tokens.get_login_token(
            token_id, self.name, token_key, expiration=expiration)

    @asynccontextmanager
    async def new_session(self) -> AsyncIterator[Session]:
        """Yield a session on this identity's mailbox and filter sets.

        The sets are created on first use, filled with demo data when that
        is enabled and this is the demo user, and cached in the config for
        later sessions.

        Raises:
            UserNotFound: Propagated from :meth:`get`.

        """
        identity = self.name
        config = self.config
        _ = await self.get()
        mailbox_set, filter_set = config.set_cache.get(identity, (None, None))
        # Compare against None explicitly so that a cached set is never
        # rebuilt merely because it evaluates as falsy.
        if mailbox_set is None or filter_set is None:
            mailbox_set = MailboxSet()
            filter_set = FilterSet()
            if config.demo_data and identity == config.demo_user:
                await self._load_demo(config.demo_data_resource,
                                      mailbox_set, filter_set)
            config.set_cache[identity] = (mailbox_set, filter_set)
        yield Session(identity, config, mailbox_set, filter_set)

    async def _load_demo(self, resource: str, mailbox_set: MailboxSet,
                         filter_set: FilterSet) -> None:
        """Load the demo mailboxes and the demo sieve script.

        ``INBOX`` is loaded first, then the sieve script, then every other
        directory found under ``demo`` in name order.

        """
        inbox = await mailbox_set.get_mailbox('INBOX')
        await self._load_demo_mailbox(resource, 'INBOX', inbox)
        demo_root = files(resource).joinpath('demo')
        mbx_names = sorted(f.name for f in demo_root.iterdir() if f.is_dir())
        await self._load_demo_sieve(resource, filter_set)
        for name in mbx_names:
            if name == 'INBOX':
                continue  # already loaded above
            await mailbox_set.add_mailbox(name)
            mbx = await mailbox_set.get_mailbox(name)
            await self._load_demo_mailbox(resource, name, mbx)

    async def _load_demo_sieve(self, resource: str,
                               filter_set: FilterSet) -> None:
        """Store ``demo/sieve`` as the filter named ``demo`` and activate it.
        """
        # Navigate one component at a time instead of joining with the OS
        # path separator, which is not a valid resource path separator on
        # every platform.
        sieve_file = files(resource).joinpath('demo').joinpath('sieve')
        sieve = sieve_file.read_bytes()
        await filter_set.put('demo', sieve)
        await filter_set.set_active('demo')

    async def _load_demo_mailbox(self, resource: str, name: str,
                                 mbx: MailboxData) -> None:
        """Append every message file under ``demo/<name>`` to *mbx*.

        Files are processed in name order. A file named ``.readonly`` marks
        the mailbox read-only; other files starting with ``.`` are skipped.
        Each message file holds a line of flags, a line with a timestamp and
        then the message data.

        """
        # Navigate one component at a time; see _load_demo_sieve.
        folder = files(resource).joinpath('demo').joinpath(name)
        msg_files = sorted((f for f in folder.iterdir() if f.is_file()),
                           key=lambda f: f.name)
        for msg_file in msg_files:
            msg_name = msg_file.name
            if msg_name == '.readonly':
                mbx._readonly = True
                continue
            elif msg_name.startswith('.'):
                continue
            # Open the entry found by iterdir() directly rather than
            # re-resolving it from a joined path string.
            with closing(msg_file.open('rb')) as msg_stream:
                flags_line = msg_stream.readline()
                msg_timestamp = float(msg_stream.readline())
                msg_data = msg_stream.read()
            msg_dt = datetime.fromtimestamp(msg_timestamp, timezone.utc)
            msg_flags = {Flag(flag) for flag in flags_line.split()}
            # Recent is passed separately to append(), not as a flag.
            msg_recent = Recent in msg_flags
            msg_flags.discard(Recent)
            append_msg = AppendMessage(msg_data, msg_dt, frozenset(msg_flags))
            await mbx.append(append_msg, recent=msg_recent)

    async def get(self) -> UserMetadata:
        """Return the stored metadata of the user behind this identity.

        When the identity has a token ID, the token is looked up first and
        the identity name is replaced by the name recorded for the token.

        Returns:
            A copy of the stored metadata with ``token_key`` set to the
            token's key, or to ``None`` when no token is used.

        Raises:
            UserNotFound: The token ID is unknown, or no user is stored
                under the name.

        """
        token_id = self._token_id
        name = self.name
        token_key: bytes | None = None
        if token_id is not None:
            try:
                name, token_key = self.login.tokens_dict[token_id]
            except KeyError as exc:
                raise UserNotFound() from exc
            else:
                self._name = name
        user = self.login.users_dict.get(name)
        if user is None:
            raise UserNotFound(self.name)
        return dataclasses.replace(user, token_key=token_key)

    async def set(self, user: UserMetadata) -> int:
        """Store *user* under this identity's name.

        Args:
            user: The metadata to store.

        Returns:
            The new entity tag assigned to the stored copy.

        Raises:
            NotAllowedError: *user* has roles but this identity does not
                hold the ``admin`` role.
            CannotReplaceUser: *user* may not replace the stored entry.

        """
        if 'admin' not in self._roles and user.roles:
            raise NotAllowedError('Cannot assign roles.')
        existing = self.login.users_dict.get(self.name)
        if not user.can_replace(existing):
            raise CannotReplaceUser()
        entity_tag = UserMetadata.new_entity_tag()
        self.login.users_dict[self.name] = \
            dataclasses.replace(user, entity_tag=entity_tag)
        return entity_tag

    async def delete(self) -> None:
        """Remove the user together with its cached sets and its tokens.

        Raises:
            UserNotFound: No user is stored under the name.

        """
        try:
            del self.login.users_dict[self.name]
        except KeyError as exc:
            raise UserNotFound(self.name) from exc
        self.config.set_cache.pop(self.name, None)
        # Collect the IDs first; entries cannot be deleted while the
        # dictionary is being iterated.
        tokens_dict = self.login.tokens_dict
        owned = [token_id for token_id, (name, _) in tokens_dict.items()
                 if name == self.name]
        for token_id in owned:
            del tokens_dict[token_id]