lepture/authlib

View on GitHub
authlib/oidc/core/grants/util.py

Summary

Maintainability
A
0 mins
Test Coverage
import time
from authlib.oauth2.rfc6749 import InvalidRequestError
from authlib.oauth2.rfc6749 import scope_to_list
from authlib.jose import jwt
from authlib.common.encoding import to_native
from authlib.common.urls import add_params_to_uri, quote_url
from ..util import create_half_hash
from ..errors import (
    LoginRequiredError,
    AccountSelectionRequiredError,
    ConsentRequiredError,
)


def is_openid_scope(scope):
    scopes = scope_to_list(scope)
    return scopes and 'openid' in scopes


def validate_request_prompt(grant, redirect_uri, redirect_fragment=False):
    prompt = grant.request.data.get('prompt')
    end_user = grant.request.user
    if not prompt:
        if not end_user:
            grant.prompt = 'login'
        return grant

    if prompt == 'none' and not end_user:
        raise LoginRequiredError(
            redirect_uri=redirect_uri,
            redirect_fragment=redirect_fragment)

    prompts = prompt.split()
    if 'none' in prompts and len(prompts) > 1:
        # If this parameter contains none with any other value,
        # an error is returned
        raise InvalidRequestError(
            'Invalid "prompt" parameter.',
            redirect_uri=redirect_uri,
            redirect_fragment=redirect_fragment)

    prompt = _guess_prompt_value(
        end_user, prompts, redirect_uri, redirect_fragment=redirect_fragment)
    if prompt:
        grant.prompt = prompt
    return grant


def validate_nonce(request, exists_nonce, required=False):
    nonce = request.data.get('nonce')
    if not nonce:
        if required:
            raise InvalidRequestError('Missing "nonce" in request.')
        return True

    if exists_nonce(nonce, request):
        raise InvalidRequestError('Replay attack')


def generate_id_token(
        token, user_info, key, iss, aud, alg='RS256', exp=3600,
        nonce=None, auth_time=None, code=None):

    now = int(time.time())
    if auth_time is None:
        auth_time = now

    payload = {
        'iss': iss,
        'aud': aud,
        'iat': now,
        'exp': now + exp,
        'auth_time': auth_time,
    }
    if nonce:
        payload['nonce'] = nonce

    if code:
        payload['c_hash'] = to_native(create_half_hash(code, alg))

    access_token = token.get('access_token')
    if access_token:
        payload['at_hash'] = to_native(create_half_hash(access_token, alg))

    payload.update(user_info)
    return to_native(jwt.encode({'alg': alg}, payload, key))


def create_response_mode_response(redirect_uri, params, response_mode):
    if response_mode == 'form_post':
        tpl = (
            '<html><head><title>Redirecting</title></head>'
            '<body onload="javascript:document.forms[0].submit()">'
            '<form method="post" action="{}">{}</form></body></html>'
        )
        inputs = ''.join([
            '<input type="hidden" name="{}" value="{}"/>'.format(
                quote_url(k), quote_url(v))
            for k, v in params
        ])
        body = tpl.format(quote_url(redirect_uri), inputs)
        return 200, body, [('Content-Type', 'text/html; charset=utf-8')]

    if response_mode == 'query':
        uri = add_params_to_uri(redirect_uri, params, fragment=False)
    elif response_mode == 'fragment':
        uri = add_params_to_uri(redirect_uri, params, fragment=True)
    else:
        raise InvalidRequestError('Invalid "response_mode" value')

    return 302, '', [('Location', uri)]


def _guess_prompt_value(end_user, prompts, redirect_uri, redirect_fragment):
    # http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest

    if not end_user and 'login' in prompts:
        return 'login'

    if 'consent' in prompts:
        if not end_user:
            raise ConsentRequiredError(
                redirect_uri=redirect_uri,
                redirect_fragment=redirect_fragment)
        return 'consent'
    elif 'select_account' in prompts:
        if not end_user:
            raise AccountSelectionRequiredError(
                redirect_uri=redirect_uri,
                redirect_fragment=redirect_fragment)
        return 'select_account'