Source code for pysasl.mechanism.crammd5


import re
import hmac
import hashlib
import email.utils
from typing import Union, Optional, Tuple, Sequence

from . import (ServerMechanism, ClientMechanism, ServerChallenge,
               ChallengeResponse)
from ..creds.client import ClientCredentials
from ..creds.server import ServerCredentials
from ..exception import InvalidResponse, MechanismUnusable, UnexpectedChallenge
from ..identity import Identity
from ..prep import saslprep

# Public names exported by ``from pysasl.mechanism.crammd5 import *``.
__all__ = ['CramMD5Result', 'CramMD5Mechanism']


class CramMD5Result(ServerCredentials):
    """Server-side credentials captured from a CRAM-MD5 client response.

    The stored digest is a keyed hash rather than the secret itself, so
    :meth:`~CramMD5Mechanism.server_attempt` returns this sub-class, which
    overrides :meth:`.verify` to recompute the hash and compare it.

    Args:
        username: The username taken from the client response; it is
            reported as both the authcid and the authzid.
        challenge: The challenge bytes used as the HMAC message.
        digest: The hex digest bytes received from the client.

    """

    __slots__: Sequence[str] = ['_username', '_challenge', '_digest']

    def __init__(self, username: str, challenge: bytes,
                 digest: bytes) -> None:
        super().__init__()
        self._username = username
        self._challenge = challenge
        self._digest = digest

    @property
    def authcid(self) -> str:
        """The username given at construction."""
        return self._username

    @property
    def authzid(self) -> str:
        """Same value as :attr:`authcid`; only one username is stored."""
        return self._username

    def verify(self, identity: Optional[Identity]) -> bool:
        """Check the received digest against the identity's clear secret.

        Args:
            identity: The identity to verify against, or ``None`` if no
                identity was found.

        Returns:
            ``False`` when *identity* is ``None``; otherwise whether the
            identity accepts this authcid and the HMAC-MD5 hex digest of the
            challenge, keyed with the identity's clear secret, equals the
            received digest.

        Raises:
            MechanismUnusable: The identity has no clear secret available.

        """
        if identity is None:
            return False
        stored = identity.get_clear_secret()
        if stored is None:
            # The digest cannot be recomputed without the clear-text secret.
            raise MechanismUnusable('CRAM-MD5')
        key = stored.encode('utf-8')
        # NOTE(review): the secret is used as stored, without saslprep --
        # confirm whether Identity already normalizes it.
        expected = hmac.new(key, self._challenge, hashlib.md5) \
            .hexdigest().encode('ascii')
        # hmac.compare_digest keeps the digest comparison constant-time.
        return (identity.compare_authcid(self.authcid)
                and hmac.compare_digest(expected, self._digest))
class CramMD5Mechanism(ServerMechanism, ClientMechanism):
    """Implements the CRAM-MD5 authentication mechanism.

    Warning:
        Although secure during transport, offering this mechanism can be
        dangerous, as it can have implications about how the credentials are
        stored server-side.

    Args:
        name: The mechanism name, ``b'CRAM-MD5'`` by default.

    """

    # Client response layout: ``<username> <digest>``. The first group is
    # greedy, so the split happens at the LAST space and the username itself
    # may contain spaces; the digest is the trailing space-free token.
    _pattern = re.compile(br'^(.*) ([^ ]+)$')

    def __init__(self, name: Union[str, bytes] = b'CRAM-MD5') -> None:
        super().__init__(name)

    def server_attempt(self, responses: Sequence[ChallengeResponse]) \
            -> Tuple[CramMD5Result, None]:
        """Run the server side of the exchange.

        Args:
            responses: The challenge/response pairs received so far.

        Returns:
            A ``(result, None)`` tuple, where *result* holds the username,
            the challenge and the digest parsed from the first response.

        Raises:
            ServerChallenge: *responses* is empty; carries a freshly
                generated message-id to send to the client as the challenge.
            InvalidResponse: The first response does not have the
                ``<username> <digest>`` layout, or the username is not valid
                UTF-8.

        """
        try:
            first = responses[0]
        except IndexError as exc:
            challenge = email.utils.make_msgid().encode('utf-8')
            raise ServerChallenge(challenge) from exc
        match = self._pattern.match(first.response)
        if not match:
            raise InvalidResponse()
        username, digest = match.groups()
        try:
            username_str = username.decode('utf-8')
        except UnicodeDecodeError as exc:
            # The username comes from the remote client; malformed bytes are
            # a protocol error, not an internal failure.
            raise InvalidResponse() from exc
        result = CramMD5Result(username_str, first.challenge, digest)
        return result, None

    def client_attempt(self, creds: ClientCredentials,
                       challenges: Sequence[ServerChallenge]) \
            -> ChallengeResponse:
        """Run the client side of the exchange.

        Args:
            creds: The client credentials; ``authcid`` and ``secret`` are
                used.
            challenges: The server challenges received so far.

        Returns:
            An empty response when no challenge has been received yet;
            otherwise the response ``<authcid> <digest>``, where the digest
            is the HMAC-MD5 hex digest of the challenge keyed with the
            secret.

        Raises:
            UnexpectedChallenge: More than one challenge was received.

        """
        if not challenges:
            # Nothing to answer yet: send an empty initial response.
            return ChallengeResponse(b'', b'')
        if len(challenges) > 1:
            raise UnexpectedChallenge()
        challenge = challenges[0].data
        authcid = saslprep(creds.authcid).encode('utf-8')
        secret = saslprep(creds.secret).encode('utf-8')
        digest = hmac.new(secret, challenge, hashlib.md5).hexdigest()
        response = b' '.join((authcid, digest.encode('ascii')))
        return ChallengeResponse(challenge, response)