lepture/authlib

View on GitHub
authlib/oauth2/rfc6749/grants/refresh_token.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
    authlib.oauth2.rfc6749.grants.refresh_token
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    A special grant endpoint for refresh_token grant_type. Refreshing an
    Access Token per `Section 6`_.

    .. _`Section 6`: https://tools.ietf.org/html/rfc6749#section-6
"""

import logging
from .base import BaseGrant, TokenEndpointMixin
from ..util import scope_to_list
from ..errors import (
    InvalidRequestError,
    InvalidScopeError,
    InvalidGrantError,
    UnauthorizedClientError,
)
log = logging.getLogger(__name__)


class RefreshTokenGrant(BaseGrant, TokenEndpointMixin):
    """Token endpoint grant that handles ``grant_type=refresh_token``, i.e.
    refreshing an access token as described in `Section 6`_.

    Subclasses are expected to override :meth:`authenticate_refresh_token`,
    :meth:`authenticate_user` and :meth:`revoke_old_credential`; the versions
    defined here raise ``NotImplementedError``.

    .. _`Section 6`: https://tools.ietf.org/html/rfc6749#section-6
    """
    GRANT_TYPE = 'refresh_token'

    #: The authorization server MAY issue a new refresh token
    INCLUDE_NEW_REFRESH_TOKEN = False

    def _validate_request_client(self):
        """Authenticate the client calling the token endpoint and make sure
        it is allowed to use this grant type.

        :return: the authenticated client
        :raise UnauthorizedClientError: the client's ``check_grant_type``
            rejects ``GRANT_TYPE``
        """
        # Client authentication is required for confidential clients and for
        # any client that was issued credentials (or has other authentication
        # requirements).
        authed_client = self.authenticate_token_endpoint_client()
        log.debug('Validate token request of %r', authed_client)

        if authed_client.check_grant_type(self.GRANT_TYPE):
            return authed_client
        raise UnauthorizedClientError()

    def _validate_request_token(self, client):
        """Resolve the ``refresh_token`` form parameter into a token object
        that belongs to ``client``.

        :param client: the authenticated client
        :return: the token returned by :meth:`authenticate_refresh_token`
        :raise InvalidRequestError: the form has no ``refresh_token`` value
        :raise InvalidGrantError: no token was found, or the token's
            ``check_client`` rejects ``client``
        """
        token_string = self.request.form.get('refresh_token')
        if token_string is None:
            raise InvalidRequestError('Missing "refresh_token" in request.')

        found = self.authenticate_refresh_token(token_string)
        if found and found.check_client(client):
            return found
        raise InvalidGrantError()

    def _validate_token_scope(self, token):
        """Check that the scope requested now does not exceed the scope
        stored on ``token``.

        A request without a scope is accepted as-is.

        :param token: the token object being refreshed
        :raise InvalidScopeError: a scope is requested but the token has
            none, or the requested scope contains a value that is not in
            the token's scope
        """
        requested_scope = self.request.scope
        if not requested_scope:
            # Nothing was asked for explicitly, so there is nothing to check.
            return

        granted_scope = token.get_scope()
        if not granted_scope:
            raise InvalidScopeError()

        granted = set(scope_to_list(granted_scope))
        requested = set(scope_to_list(requested_scope))
        if not requested.issubset(granted):
            raise InvalidScopeError()

    def validate_token_request(self):
        """Validate a refresh request sent to the token endpoint.

        Per Section 6, when the authorization server has issued a refresh
        token, the client refreshes by sending the parameters below in the
        HTTP request entity-body, using the
        "application/x-www-form-urlencoded" format (Appendix B) with UTF-8
        character encoding:

        grant_type
             REQUIRED.  Value MUST be set to "refresh_token".

        refresh_token
             REQUIRED.  The refresh token issued to the client.

        scope
             OPTIONAL.  The scope of the access request as described by
             Section 3.3.  The requested scope MUST NOT include any scope
             not originally granted by the resource owner, and if omitted is
             treated as equal to the scope originally granted by the
             resource owner.


        An example request over transport-layer security (extra line breaks
        are for display purposes only):

        .. code-block:: http

            POST /token HTTP/1.1
            Host: server.example.com
            Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW
            Content-Type: application/x-www-form-urlencoded

            grant_type=refresh_token&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA

        On success the client and the token object are stored on
        ``self.request`` as ``client`` and ``refresh_token``.
        """
        authed_client = self._validate_request_client()
        # The client is attached to the request before the token is checked.
        self.request.client = authed_client

        token = self._validate_request_token(authed_client)
        self._validate_token_scope(token)
        self.request.refresh_token = token

    def create_token_response(self):
        """Issue the new token for an already validated refresh request.

        If valid and authorized, the authorization server issues an access
        token as described in Section 5.1; a failed or invalid request gets
        an error response as described in Section 5.2.

        :return: a ``(200, token, self.TOKEN_RESPONSE_HEADER)`` tuple
        :raise InvalidRequestError: :meth:`authenticate_user` returned no
            user for the refresh token
        """
        old_token = self.request.refresh_token
        owner = self.authenticate_user(old_token)
        if not owner:
            raise InvalidRequestError('There is no "user" for this token.')

        requesting_client = self.request.client
        new_token = self.issue_token(owner, old_token)
        log.debug('Issue token %r to %r', new_token, requesting_client)

        self.request.user = owner
        self.save_token(new_token)
        self.execute_hook('process_token', token=new_token)
        # The old credential is revoked only after the new token is saved
        # and the hook has run.
        self.revoke_old_credential(old_token)
        return 200, new_token, self.TOKEN_RESPONSE_HEADER

    def issue_token(self, user, refresh_token):
        """Generate the new token for ``user``.

        The scope is the one requested, falling back to the scope of
        ``refresh_token`` when the request carries none. Whether a new
        refresh token is included is controlled by
        ``INCLUDE_NEW_REFRESH_TOKEN``.

        :param user: the user the token is issued for
        :param refresh_token: the token object being refreshed
        :return: the result of ``self.generate_token``
        """
        effective_scope = self.request.scope or refresh_token.get_scope()
        return self.generate_token(
            user=user,
            scope=effective_scope,
            include_refresh_token=self.INCLUDE_NEW_REFRESH_TOKEN,
        )

    def authenticate_refresh_token(self, refresh_token):
        """Look up the token object for a refresh token string. Developers
        MUST implement this method in subclass::

            def authenticate_refresh_token(self, refresh_token):
                token = Token.get(refresh_token=refresh_token)
                if token and not token.refresh_token_revoked:
                    return token

        :param refresh_token: The refresh token string issued to the client
        :return: token
        """
        raise NotImplementedError()

    def authenticate_user(self, refresh_token):
        """Return the user related to the given token object. Developers
        MUST implement this method in subclass::

            def authenticate_user(self, refresh_token):
                return User.get(refresh_token.user_id)

        :param refresh_token: Token object
        :return: user
        """
        raise NotImplementedError()

    def revoke_old_credential(self, refresh_token):
        """The authorization server MAY revoke the old refresh token after
        issuing a new refresh token to the client. Developers MUST implement
        this method in subclass::

            def revoke_old_credential(self, refresh_token):
                refresh_token.revoked = True
                refresh_token.save()

        :param refresh_token: Token object
        """
        raise NotImplementedError()