authlib/oauth2/rfc7662/introspection.py
from authlib.consts import default_json_headers
from ..rfc6749 import (
TokenEndpoint,
InvalidRequestError,
UnsupportedTokenTypeError,
)
class IntrospectionEndpoint(TokenEndpoint):
"""Implementation of introspection endpoint which is described in
`RFC7662`_.
.. _RFC7662: https://tools.ietf.org/html/rfc7662
"""
#: Endpoint name to be registered
ENDPOINT_NAME = 'introspection'
def authenticate_token(self, request, client):
"""The protected resource calls the introspection endpoint using an HTTP
``POST`` request with parameters sent as
"application/x-www-form-urlencoded" data. The protected resource sends a
parameter representing the token along with optional parameters
representing additional context that is known by the protected resource
to aid the authorization server in its response.
token
**REQUIRED** The string value of the token. For access tokens, this
is the ``access_token`` value returned from the token endpoint
defined in OAuth 2.0. For refresh tokens, this is the
``refresh_token`` value returned from the token endpoint as defined
in OAuth 2.0.
token_type_hint
**OPTIONAL** A hint about the type of the token submitted for
introspection.
"""
self.check_params(request, client)
token = self.query_token(request.form['token'], request.form.get('token_type_hint'))
if token and self.check_permission(token, client, request):
return token
def check_params(self, request, client):
params = request.form
if 'token' not in params:
raise InvalidRequestError()
hint = params.get('token_type_hint')
if hint and hint not in self.SUPPORTED_TOKEN_TYPES:
raise UnsupportedTokenTypeError()
def create_endpoint_response(self, request):
"""Validate introspection request and create the response.
:returns: (status_code, body, headers)
"""
# The authorization server first validates the client credentials
client = self.authenticate_endpoint_client(request)
# then verifies whether the token was issued to the client making
# the revocation request
token = self.authenticate_token(request, client)
# the authorization server invalidates the token
body = self.create_introspection_payload(token)
return 200, body, default_json_headers
def create_introspection_payload(self, token):
# the token is not active, does not exist on this server, or the
# protected resource is not allowed to introspect this particular
# token, then the authorization server MUST return an introspection
# response with the "active" field set to "false"
if not token:
return {'active': False}
if token.is_expired() or token.is_revoked():
return {'active': False}
payload = self.introspect_token(token)
if 'active' not in payload:
payload['active'] = True
return payload
def check_permission(self, token, client, request):
"""Check if the request has permission to introspect the token. Developers
MUST implement this method::
def check_permission(self, token, client, request):
# only allow a special client to introspect the token
return client.client_id == 'introspection_client'
:return: bool
"""
raise NotImplementedError()
def query_token(self, token_string, token_type_hint):
"""Get the token from database/storage by the given token string.
Developers should implement this method::
def query_token(self, token_string, token_type_hint):
if token_type_hint == 'access_token':
tok = Token.query_by_access_token(token_string)
elif token_type_hint == 'refresh_token':
tok = Token.query_by_refresh_token(token_string)
else:
tok = Token.query_by_access_token(token_string)
if not tok:
tok = Token.query_by_refresh_token(token_string)
return tok
"""
raise NotImplementedError()
def introspect_token(self, token):
"""Read given token and return its introspection metadata as a
dictionary following `Section 2.2`_::
def introspect_token(self, token):
return {
'active': True,
'client_id': token.client_id,
'token_type': token.token_type,
'username': get_token_username(token),
'scope': token.get_scope(),
'sub': get_token_user_sub(token),
'aud': token.client_id,
'iss': 'https://server.example.com/',
'exp': token.expires_at,
'iat': token.issued_at,
}
.. _`Section 2.2`: https://tools.ietf.org/html/rfc7662#section-2.2
"""
raise NotImplementedError()