Source code for pysasl.mechanism



from abc import abstractmethod, ABCMeta
from typing import Union, Optional, Tuple, Sequence
from typing_extensions import TypeAlias

from ..creds.client import ClientCredentials
from ..creds.server import ServerCredentials

__all__ = ['Mechanism', 'ServerChallenge', 'ChallengeResponse',
           'ServerMechanism', 'ClientMechanism']

#: A type alias for either server or client mechanisms.
Mechanism: TypeAlias = Union['ServerMechanism', 'ClientMechanism']


[docs]class ServerChallenge(Exception): """Raised by :meth:`~ServerMechanism.server_attempt` to provide server challenges. Args: data: The challenge string that should be sent to the client. """ __slots__ = ['_data'] def __init__(self, data: bytes) -> None: super().__init__(data) self._data = data @property def data(self) -> bytes: """The server challenge that should be sent to the client.""" return self._data def __repr__(self) -> str: return f'ServerChallenge({self.data!r})'
[docs]class ChallengeResponse: """A challenge-response exchange between server and client. Args: challenge: The server challenge string. response: The client response string. """ __slots__ = ['_challenge', '_response'] def __init__(self, challenge: bytes, response: bytes) -> None: super().__init__() self._challenge = challenge self._response = response @property def challenge(self) -> bytes: """The server challenge string.""" return self._challenge @property def response(self) -> bytes: """The client response string.""" return self._response def __repr__(self) -> str: return f'ChallengeResponse({self.challenge!r}, {self.response!r})'
class _BaseMechanism: __slots__: Sequence[str] = ['_name'] def __init__(self, name: Union[str, bytes]) -> None: super().__init__() if isinstance(name, str): name = name.encode('ascii') self._name = name @property def name(self) -> bytes: """The SASL name for this mechanism.""" return self._name def __eq__(self, other: object) -> bool: if isinstance(other, _BaseMechanism): return self.name == other.name return NotImplemented
[docs]class ServerMechanism(_BaseMechanism, metaclass=ABCMeta): """Base class for implementing SASL mechanisms that support server-side credential verification. """ __slots__: Sequence[str] = []
[docs] @abstractmethod def server_attempt(self, responses: Sequence[ChallengeResponse]) \ -> Tuple[ServerCredentials, Optional[bytes]]: """For SASL server-side credential verification, receives responses from the client and issues challenges until it has everything needed to verify the credentials. If a challenge is necessary, a :class:`~pysasl.mechanism.ServerChallenge` exception will be raised. The response to this challenge must then be added to *responses* in the next call to :meth:`.server_attempt`. Args: responses: The challenge-response exchanges thus far. Returns: A tuple of the authentication credentials received from the client once no more challenges are necessary, and an optional final response string from the server used by some mechanisms. Raises: ServerChallenge: The server challenge needing a client response. InvalidResponse: The server received an invalid client response. """ ...
[docs]class ClientMechanism(_BaseMechanism, metaclass=ABCMeta): """Base class for implementing SASL mechanisms that support client-side credential verification. """ __slots__: Sequence[str] = []
[docs] @abstractmethod def client_attempt(self, creds: ClientCredentials, challenges: Sequence[ServerChallenge]) \ -> ChallengeResponse: """For SASL client-side credential verification, produce responses to send to the server and react to its challenges until the server returns a final success or failure. Args: creds: The credentials to attempt authentication with. challenges: The server challenges received. Returns: The response to the most recent server challenge. Raises: UnexpectedChallenge: The server has issued a challenge the client mechanism does not recognize. """ ...