lepture/authlib

View on GitHub
authlib/oauth2/rfc7523/jwt_bearer.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
from authlib.jose import jwt, JoseError
from ..rfc6749 import BaseGrant, TokenEndpointMixin
from ..rfc6749 import (
    UnauthorizedClientError,
    InvalidRequestError,
    InvalidGrantError,
    InvalidClientError,
)
from .assertion import sign_jwt_bearer_assertion

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Value of the "grant_type" request parameter that selects the JWT bearer
# grant (see RFC 7523, Section 2.1).
JWT_BEARER_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'


class JWTBearerGrant(BaseGrant, TokenEndpointMixin):
    """Token endpoint grant that accepts a JWT as the authorization grant
    (grant type "urn:ietf:params:oauth:grant-type:jwt-bearer", RFC 7523).

    Developers MUST subclass this grant and implement
    :meth:`resolve_issuer_client` and :meth:`resolve_client_key`. When
    assertions may carry a "sub" claim, :meth:`authenticate_user` and
    :meth:`has_granted_permission` MUST be implemented as well.
    """
    GRANT_TYPE = JWT_BEARER_GRANT_TYPE

    #: Options for verifying JWT payload claims. Developers MAY
    #: override this constant to define stricter options.
    CLAIMS_OPTIONS = {
        'iss': {'essential': True},
        'aud': {'essential': True},
        'exp': {'essential': True},
    }

    @staticmethod
    def sign(key, issuer, audience, subject=None,
             issued_at=None, expires_at=None, claims=None, **kwargs):
        """Create a signed JWT bearer assertion.

        This is a thin convenience wrapper: every argument is forwarded
        unchanged to ``sign_jwt_bearer_assertion``, which defines their
        exact semantics.

        :param key: key used to sign the assertion
        :param issuer: issuer of the assertion
        :param audience: intended audience of the assertion
        :param subject: optional subject of the assertion
        :param issued_at: optional issue time
        :param expires_at: optional expiration time
        :param claims: optional extra claims
        :param kwargs: extra keyword arguments passed through as-is
        :return: whatever ``sign_jwt_bearer_assertion`` returns
        """
        return sign_jwt_bearer_assertion(
            key, issuer, audience, subject, issued_at,
            expires_at, claims, **kwargs)

    def process_assertion_claims(self, assertion):
        """Extract JWT payload claims from request "assertion", per
        `Section 3.1`_.

        The assertion is decoded with the key returned by
        :meth:`resolve_public_key` and validated against
        ``CLAIMS_OPTIONS``. Any ``JoseError`` raised while decoding or
        validating is converted into an ``InvalidGrantError`` carrying
        the same description.

        :param assertion: assertion string value in the request
        :return: JWTClaims
        :raise: InvalidGrantError

        .. _`Section 3.1`: https://tools.ietf.org/html/rfc7523#section-3.1
        """
        try:
            claims = jwt.decode(
                assertion, self.resolve_public_key,
                claims_options=self.CLAIMS_OPTIONS)
            claims.validate()
        except JoseError as e:
            log.debug('Assertion Error: %r', e)
            raise InvalidGrantError(description=e.description)
        return claims

    def resolve_public_key(self, headers, payload):
        """Key loader handed to ``jwt.decode``: find the client named by
        the assertion's "iss" claim and return that client's key.

        This runs on the not-yet-verified payload, so "iss" may be
        absent. Indexing it directly would raise ``KeyError``, which is
        not a ``JoseError`` and would therefore escape
        :meth:`process_assertion_claims` as an unhandled exception
        instead of an OAuth error response.

        :param headers: headers part of the JWT
        :param payload: payload part of the JWT
        :return: result of :meth:`resolve_client_key`
        :raise: InvalidGrantError if "iss" is missing or empty
        """
        issuer = payload.get('iss')
        if not issuer:
            raise InvalidGrantError(description='Missing "iss" in assertion')
        client = self.resolve_issuer_client(issuer)
        return self.resolve_client_key(client, headers, payload)

    def validate_token_request(self):
        """The client makes a request to the token endpoint by sending the
        following parameters using the "application/x-www-form-urlencoded"
        format per `Section 2.1`_:

        grant_type
             REQUIRED.  Value MUST be set to
             "urn:ietf:params:oauth:grant-type:jwt-bearer".

        assertion
             REQUIRED.  Value MUST contain a single JWT.

        scope
            OPTIONAL.

        The following example demonstrates an access token request with a JWT
        as an authorization grant:

        .. code-block:: http

            POST /token.oauth2 HTTP/1.1
            Host: as.example.com
            Content-Type: application/x-www-form-urlencoded

            grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer
            &assertion=eyJhbGciOiJFUzI1NiIsImtpZCI6IjE2In0.
            eyJpc3Mi[...omitted for brevity...].
            J9l-ZhwP[...omitted for brevity...]

        On success ``self.request.client`` is set, and ``self.request.user``
        is set as well when the assertion carries a "sub" claim.

        :raise: InvalidRequestError if "assertion" is missing
        :raise: InvalidGrantError if the assertion is invalid, or its
            "iss" or "sub" cannot be resolved
        :raise: UnauthorizedClientError if the client may not use this
            grant type
        :raise: InvalidClientError if the client has no permission to
            access the user's data

        .. _`Section 2.1`: https://tools.ietf.org/html/rfc7523#section-2.1
        """
        assertion = self.request.form.get('assertion')
        if not assertion:
            raise InvalidRequestError('Missing "assertion" in request')

        claims = self.process_assertion_claims(assertion)
        client = self.resolve_issuer_client(claims['iss'])
        log.debug('Validate token request of %s', client)

        # An unknown issuer must yield an OAuth error rather than an
        # AttributeError on ``client.check_grant_type`` below.
        if client is None:
            raise InvalidGrantError(description='Invalid "iss" value in assertion')

        if not client.check_grant_type(self.GRANT_TYPE):
            raise UnauthorizedClientError()

        self.request.client = client
        self.validate_requested_scope()

        # "sub" is optional; the user is only resolved and checked when
        # the assertion names one.
        subject = claims.get('sub')
        if subject:
            user = self.authenticate_user(subject)
            if not user:
                raise InvalidGrantError(description='Invalid "sub" value in assertion')

            log.debug('Check client(%s) permission to User(%s)', client, user)
            if not self.has_granted_permission(client, user):
                raise InvalidClientError(
                    description='Client has no permission to access user data')
            self.request.user = user

    def create_token_response(self):
        """If valid and authorized, the authorization server issues an access
        token.

        The token is generated for the request's scope and user without a
        refresh token, then passed to ``save_token``.

        :return: tuple of (200, token, ``TOKEN_RESPONSE_HEADER``)
        """
        token = self.generate_token(
            scope=self.request.scope,
            user=self.request.user,
            include_refresh_token=False,
        )
        log.debug('Issue token %r to %r', token, self.request.client)
        self.save_token(token)
        return 200, token, self.TOKEN_RESPONSE_HEADER

    def resolve_issuer_client(self, issuer):
        """Fetch client via "iss" in assertion claims. Developers MUST
        implement this method in subclass, e.g.::

            def resolve_issuer_client(self, issuer):
                return Client.query_by_iss(issuer)

        :param issuer: "iss" value in assertion
        :return: Client instance
        """
        raise NotImplementedError()

    def resolve_client_key(self, client, headers, payload):
        """Resolve client key to decode assertion data. Developers MUST
        implement this method in subclass. For instance, there is a
        "jwks" column on client table, e.g.::

            def resolve_client_key(self, client, headers, payload):
                # from authlib.jose import JsonWebKey

                key_set = JsonWebKey.import_key_set(client.jwks)
                return key_set.find_by_kid(headers['kid'])

        :param client: instance of OAuth client model, exactly as returned
            by :meth:`resolve_issuer_client`
        :param headers: headers part of the JWT
        :param payload: payload part of the JWT
        :return: ``authlib.jose.Key`` instance
        """
        raise NotImplementedError()

    def authenticate_user(self, subject):
        """Authenticate user with the given assertion claims. Developers MUST
        implement it in subclass, e.g.::

            def authenticate_user(self, subject):
                return User.get_by_sub(subject)

        A falsy return value makes the request fail with
        ``InvalidGrantError``.

        :param subject: "sub" value in claims
        :return: User instance
        """
        raise NotImplementedError()

    def has_granted_permission(self, client, user):
        """Check if the client has permission to access the given user's resource.
        Developers MUST implement it in subclass, e.g.::

            def has_granted_permission(self, client, user):
                permission = ClientUserGrant.query(client=client, user=user)
                return permission.granted

        A falsy return value makes the request fail with
        ``InvalidClientError``.

        :param client: instance of OAuth client model
        :param user: instance of User model
        :return: bool
        """
        raise NotImplementedError()