AngellusMortis/django_microsoft_auth

View on GitHub
microsoft_auth/client.py

Summary

Maintainability
A
30 mins
Test Coverage
C
71%
import json
import logging

from django.contrib.sites.models import Site
from django.core.cache import cache
from django.urls import reverse
import jwt
from jwt.algorithms import RSAAlgorithm
import requests
from requests_oauthlib import OAuth2Session

from .conf import (
    CACHE_KEY_JWKS,
    CACHE_KEY_OPENID,
    CACHE_TIMEOUT,
    LOGIN_TYPE_XBL,
    config,
)
from .utils import get_scheme

logger = logging.getLogger("django")


class MicrosoftClient(OAuth2Session):
    """Simple Microsoft OAuth2 Client to authenticate them

    Extended from Requests-OAuthlib's OAuth2Session class which
        does most of the heavy lifting

    https://requests-oauthlib.readthedocs.io/en/latest/

    Microsoft OAuth documentation can be found at
    https://developer.microsoft.com/en-us/graph/docs/get-started/rest
    """

    _config_url = "https://login.microsoftonline.com/{tenant}/v2.0/.well-known/openid-configuration"  # noqa

    _xbox_authorization_url = "https://login.live.com/oauth20_authorize.srf"
    _xbox_token_url = "https://user.auth.xboxlive.com/user/authenticate"  # nosec
    _profile_url = "https://xsts.auth.xboxlive.com/xsts/authorize"

    xbox_token = {}

    config = None

    # required OAuth scopes
    SCOPE_XBL = ["XboxLive.signin", "XboxLive.offline_access"]
    SCOPE_MICROSOFT = ["User.Read", "openid", "email", "profile"]

    def __init__(self, state=None, request=None, *args, **kwargs):
        self.config = config

        super().__init__(
            self.config.MICROSOFT_AUTH_CLIENT_ID,
            scope=self._get_scopes(),
            state=state,
            redirect_uri=self._get_redirect_uri(request),
            *args,
            **kwargs,
        )

        if self.config.MICROSOFT_AUTH_PROXIES:
            self.proxies = self.config.MICROSOFT_AUTH_PROXIES

    def _get_scopes(self):
        scope = " ".join(self.SCOPE_MICROSOFT)

        if self.config.MICROSOFT_AUTH_LOGIN_TYPE == LOGIN_TYPE_XBL:
            scope = " ".join(self.SCOPE_XBL)

        extra_scopes = self.config.MICROSOFT_AUTH_EXTRA_SCOPES
        scope = "{} {}".format(scope, extra_scopes).strip()

        return scope

    def _get_redirect_uri(self, request):
        try:
            current_site = Site.objects.get_current(request)
        except Site.DoesNotExist:
            current_site = Site.objects.first()

        domain = current_site.domain
        callback = reverse("microsoft_auth:auth-callback")
        redirect = reverse("microsoft_auth:from-auth-redirect")
        if not request or "redirect" not in request.path:
            path = callback
        else:
            path = redirect

        return f"{get_scheme(request, self.config)}://{domain}{path}"

    @property
    def openid_config(self):
        config = cache.get(CACHE_KEY_OPENID)

        if config is None:
            config_url = self._config_url.format(
                tenant=self.config.MICROSOFT_AUTH_TENANT_ID
            )
            response = self.get(config_url)

            if response.ok:
                config = response.json()
                cache.set(CACHE_KEY_OPENID, config, CACHE_TIMEOUT)

        return config

    @property
    def jwks(self):
        jwks = cache.get(CACHE_KEY_JWKS, [])

        if len(jwks) == 0:
            jwks_uri = self.openid_config["jwks_uri"]
            if jwks_uri is None:
                return []

            response = self.get(jwks_uri)

            if response.ok:
                jwks = response.json()["keys"]
                cache.set(CACHE_KEY_JWKS, jwks, CACHE_TIMEOUT)
        return jwks

    def get_claims(self, allow_refresh=True):
        if self.token is None:
            return None

        token = self.token["id_token"].encode("utf8")

        kid = jwt.get_unverified_header(token)["kid"]
        jwk = None
        public_key = None
        for key in self.jwks:
            if kid == key["kid"]:
                jwk = key
                break

        if jwk is None:
            if allow_refresh:
                logger.warn(
                    "could not find public key for id_token, " "refreshing OIDC config"
                )
                cache.delete(CACHE_KEY_JWKS)
                cache.delete(CACHE_KEY_OPENID)

                return self.get_claims(allow_refresh=False)
            else:
                logger.warn("could not find public key for id_token")
                return None

        public_key = RSAAlgorithm.from_jwk(json.dumps(jwk))

        try:
            claims = jwt.decode(
                token,
                public_key,
                algorithms=["RS256"],
                audience=self.config.MICROSOFT_AUTH_CLIENT_ID,
            )
        except jwt.PyJWTError as e:
            logger.warn("could not verify id_token sig: {}".format(e))
            return None

        return claims

    def authorization_url(self):
        """Generates Microsoft/Xbox or a Office 365 Authorization URL"""

        auth_url = self.openid_config["authorization_endpoint"]
        if self.config.MICROSOFT_AUTH_LOGIN_TYPE == LOGIN_TYPE_XBL:
            auth_url = self._xbox_authorization_url

        extra_parameters = self.config.MICROSOFT_AUTH_EXTRA_PARAMETERS
        extra_parameters["response_mode"] = "form_post"
        built_auth_url = super().authorization_url(auth_url, **extra_parameters)
        return built_auth_url

    def fetch_token(self, **kwargs):
        """Fetchs OAuth2 Token with given kwargs"""

        return super().fetch_token(  # pragma: no cover
            self.openid_config["token_endpoint"],
            client_secret=self.config.MICROSOFT_AUTH_CLIENT_SECRET,
            **kwargs,
        )

    def fetch_xbox_token(self):
        """Fetches Xbox Live Auth token.

        token must contain a valid access_token
            - retrieved from fetch_token

        Reversed engineered from existing Github repos,
            no "official" API docs from Microsoft

        Response will be similar to
        {
            'Token': 'token',
            'IssueInstant': '2016-09-27T15:01:45.225637Z',
            'DisplayClaims': {'xui': [{'uhs': '###################'}]},
            'NotAfter': '2016-10-11T15:01:45.225637Z'
        }
        """

        # Content-type MUST be json for Xbox Live
        headers = {
            "Content-type": "application/json",
            "Accept": "application/json",
        }
        params = {
            "RelyingParty": "http://auth.xboxlive.com",
            "TokenType": "JWT",
            "Properties": {
                "AuthMethod": "RPS",
                "SiteName": "user.auth.xboxlive.com",
                "RpsTicket": "d={}".format(self.token["access_token"]),
            },
        }
        response = requests.post(
            self._xbox_token_url, data=json.dumps(params), headers=headers
        )

        if response.status_code == 200:
            self.xbox_token = response.json()

        return self.xbox_token

    def get_xbox_profile(self):
        """
        Fetches the Xbox Live user profile from Xbox servers

        xbox_token must contain a valid Xbox Live token
            - retrieved from fetch_xbox_token

        Reversed engineered from existing Github repos,
            no "official" API docs from Microsoft

        Response will be similar to
        {
            'NotAfter': '2016-09-28T07:19:21.9608601Z',
            'DisplayClaims': {
                'xui': [
                    {
                        'agg': 'Adult',
                        'uhs': '###################',
                        'usr': '###',
                        'xid': '################',
                        'prv': '### ### ###...',
                        'gtg': 'Gamertag'}]},
            'IssueInstant': '2016-09-27T15:19:21.9608601Z',
            'Token': 'token'}
        """

        if "Token" in self.xbox_token:
            # Content-type MUST be json for Xbox Live
            headers = {
                "Content-type": "application/json",
                "Accept": "application/json",
            }
            params = {
                "RelyingParty": "http://xboxlive.com",
                "TokenType": "JWT",
                "Properties": {
                    "UserTokens": [self.xbox_token["Token"]],
                    "SandboxId": "RETAIL",
                },
            }
            response = requests.post(
                self._profile_url, data=json.dumps(params), headers=headers
            )

            if response.status_code == 200:
                return response.json()["DisplayClaims"]["xui"][0]
        return {}

    def valid_scopes(self, scopes):
        """Validates response scopes based on MICROSOFT_AUTH_LOGIN_TYPE"""

        scopes = set(scopes)
        required_scopes = None
        if self.config.MICROSOFT_AUTH_LOGIN_TYPE == LOGIN_TYPE_XBL:
            required_scopes = set(self.SCOPE_XBL)
        else:
            required_scopes = set(self.SCOPE_MICROSOFT)

        # verify all require_scopes are in scopes
        return required_scopes <= scopes