Source code for pymap.token
from __future__ import annotations
from collections.abc import Sequence, Set
from datetime import datetime
from functools import cached_property
from typing import Final
from ..config import IMAPConfig
from ..interfaces.token import TokenCredentials, TokensInterface
from ..plugin import Plugin
__all__ = ['tokens', 'TokensBase', 'AllTokens']
#: Registers token plugins.
tokens: Plugin[TokensBase] = Plugin('pymap.token', default='macaroon')
[docs]
class TokensBase(TokensInterface):
"""Base class for token types registered by :data:`tokens`.
Args:
config: The IMAP configuration object.
"""
def __init__(self, config: IMAPConfig) -> None:
super().__init__()
self.config: Final = config
[docs]
class AllTokens(TokensBase):
"""Uses :data:`tokens` to support all registered token types.
For token creation, the :attr:`~pymap.plugin.Plugin.default` token plugin
is used. For token parsing, each token plugin is tried until one succeeds.
"""
@cached_property
def _default_tokens(self) -> TokensInterface | None:
try:
token_type = tokens.default
except KeyError:
return None
else:
return token_type(self.config)
@cached_property
def _tokens(self) -> Sequence[TokensInterface]:
all_tokens = []
config = self.config
for token_type in tokens.registered.values():
if token_type == tokens.default:
assert self._default_tokens is not None
all_tokens.append(self._default_tokens)
else:
all_tokens.append(token_type(config))
return all_tokens
[docs]
def get_login_token(self, identifier: str, authcid: str, key: bytes, *,
authzid: str | None = None,
location: str | None = None,
expiration: datetime | None = None) \
-> str | None:
tokens = self._default_tokens
if tokens is None:
return None
return tokens.get_login_token(
identifier, authcid, key, authzid=authzid, location=location,
expiration=expiration)
[docs]
def get_admin_token(self, admin_key: bytes | None, *,
authzid: str | None = None,
location: str | None = None,
expiration: datetime | None = None) \
-> str | None:
tokens = self._default_tokens
if tokens is None:
return None
return tokens.get_admin_token(
admin_key, authzid=authzid, location=location,
expiration=expiration)
[docs]
def parse(self, authzid: str, token: str, *,
admin_keys: Set[bytes] = frozenset()) \
-> TokenCredentials:
for tokens in self._tokens:
try:
return tokens.parse(authzid, token, admin_keys=admin_keys)
except ValueError:
pass
raise ValueError('invalid token')