Source code for pysasl.mechanism.oauth
import re
from typing import Union, Tuple, Sequence
from . import (ServerMechanism, ClientMechanism, ServerChallenge,
ChallengeResponse)
from ..creds.client import ClientCredentials
from ..creds.external import ExternalCredentials
from ..exception import InvalidResponse, UnexpectedChallenge
__all__ = ['OAuth2Mechanism']
[docs]class OAuth2Mechanism(ServerMechanism, ClientMechanism):
"""Implements the `XOAUTH2`_ authentication mechanism, used by `OAuth 2.0`_
systems to authenticate using access tokens.
.. _XOAUTH2: https://developers.google.com/gmail/xoauth2_protocol
.. _OAuth 2.0: https://tools.ietf.org/html/rfc6749
"""
_pattern = re.compile(br'^user=(.*?)\x01auth=[bB][eE][aA][rR][eE][rR] '
br'(.*?)\x01\x01$')
def __init__(self, name: Union[str, bytes] = b'XOAUTH2') -> None:
super().__init__(name)
[docs] def server_attempt(self, responses: Sequence[ChallengeResponse]) \
-> Tuple[ExternalCredentials, None]:
try:
first = responses[0]
except IndexError as exc:
raise ServerChallenge(b'') from exc
match = re.match(self._pattern, first.response)
if not match:
raise InvalidResponse()
user, token = match.groups()
user_str = user.decode('utf-8')
token_str = token.decode('utf-8')
return ExternalCredentials(user_str, token_str), None
[docs] def client_attempt(self, creds: ClientCredentials,
challenges: Sequence[ServerChallenge]) \
-> ChallengeResponse:
if len(challenges) == 0:
challenge = b''
elif len(challenges) == 1:
challenge = challenges[0].data
else:
raise UnexpectedChallenge()
if challenge != b'':
response = b''
else:
user = creds.authcid.encode('utf-8')
token = creds.secret.encode('utf-8')
response = b''.join((b'user=', user, b'\x01auth=Bearer ', token,
b'\x01\x01'))
return ChallengeResponse(challenge, response)