authlib/oidc/core/grants/util.py
import time
from authlib.oauth2.rfc6749 import InvalidRequestError
from authlib.oauth2.rfc6749 import scope_to_list
from authlib.jose import jwt
from authlib.common.encoding import to_native
from authlib.common.urls import add_params_to_uri, quote_url
from ..util import create_half_hash
from ..errors import (
LoginRequiredError,
AccountSelectionRequiredError,
ConsentRequiredError,
)
def is_openid_scope(scope):
scopes = scope_to_list(scope)
return scopes and 'openid' in scopes
def validate_request_prompt(grant, redirect_uri, redirect_fragment=False):
prompt = grant.request.data.get('prompt')
end_user = grant.request.user
if not prompt:
if not end_user:
grant.prompt = 'login'
return grant
if prompt == 'none' and not end_user:
raise LoginRequiredError(
redirect_uri=redirect_uri,
redirect_fragment=redirect_fragment)
prompts = prompt.split()
if 'none' in prompts and len(prompts) > 1:
# If this parameter contains none with any other value,
# an error is returned
raise InvalidRequestError(
'Invalid "prompt" parameter.',
redirect_uri=redirect_uri,
redirect_fragment=redirect_fragment)
prompt = _guess_prompt_value(
end_user, prompts, redirect_uri, redirect_fragment=redirect_fragment)
if prompt:
grant.prompt = prompt
return grant
def validate_nonce(request, exists_nonce, required=False):
nonce = request.data.get('nonce')
if not nonce:
if required:
raise InvalidRequestError('Missing "nonce" in request.')
return True
if exists_nonce(nonce, request):
raise InvalidRequestError('Replay attack')
def generate_id_token(
token, user_info, key, iss, aud, alg='RS256', exp=3600,
nonce=None, auth_time=None, code=None):
now = int(time.time())
if auth_time is None:
auth_time = now
payload = {
'iss': iss,
'aud': aud,
'iat': now,
'exp': now + exp,
'auth_time': auth_time,
}
if nonce:
payload['nonce'] = nonce
if code:
payload['c_hash'] = to_native(create_half_hash(code, alg))
access_token = token.get('access_token')
if access_token:
payload['at_hash'] = to_native(create_half_hash(access_token, alg))
payload.update(user_info)
return to_native(jwt.encode({'alg': alg}, payload, key))
def create_response_mode_response(redirect_uri, params, response_mode):
if response_mode == 'form_post':
tpl = (
'<html><head><title>Redirecting</title></head>'
'<body onload="javascript:document.forms[0].submit()">'
'<form method="post" action="{}">{}</form></body></html>'
)
inputs = ''.join([
'<input type="hidden" name="{}" value="{}"/>'.format(
quote_url(k), quote_url(v))
for k, v in params
])
body = tpl.format(quote_url(redirect_uri), inputs)
return 200, body, [('Content-Type', 'text/html; charset=utf-8')]
if response_mode == 'query':
uri = add_params_to_uri(redirect_uri, params, fragment=False)
elif response_mode == 'fragment':
uri = add_params_to_uri(redirect_uri, params, fragment=True)
else:
raise InvalidRequestError('Invalid "response_mode" value')
return 302, '', [('Location', uri)]
def _guess_prompt_value(end_user, prompts, redirect_uri, redirect_fragment):
# http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
if not end_user and 'login' in prompts:
return 'login'
if 'consent' in prompts:
if not end_user:
raise ConsentRequiredError(
redirect_uri=redirect_uri,
redirect_fragment=redirect_fragment)
return 'consent'
elif 'select_account' in prompts:
if not end_user:
raise AccountSelectionRequiredError(
redirect_uri=redirect_uri,
redirect_fragment=redirect_fragment)
return 'select_account'