kuasha/cosmos

View on GitHub
cosmos/auth/oauth2.py

Summary

Maintainability
A
1 hr
Test Coverage
"""
 Copyright (C) 2015 Maruf Maniruzzaman
 Website: http://cosmosframework.com
 Author: Maruf Maniruzzaman
 License :: OSI Approved :: MIT License
"""

import jwt, Crypto.PublicKey.RSA as RSA, datetime
import uuid


class ValidationError(ValueError):
    pass

"""
  FROM RFC 6749: http://tools.ietf.org/html/rfc6749


  +--------+                                           +---------------+
  |        |--(A)------- Authorization Grant --------->|               |
  |        |                                           |               |
  |        |<-(B)----------- Access Token -------------|               |
  |        |               & Refresh Token             |               |
  |        |                                           |               |
  |        |                            +----------+   |               |
  |        |--(C)---- Access Token ---->|          |   |               |
  |        |                            |          |   |               |
  |        |<-(D)- Protected Resource --| Resource |   | Authorization |
  | Client |                            |  Server  |   |     Server    |
  |        |--(E)---- Access Token ---->|          |   |               |
  |        |                            |          |   |               |
  |        |<-(F)- Invalid Token Error -|          |   |               |
  |        |                            +----------+   |               |
  |        |                                           |               |
  |        |--(G)----------- Refresh Token ----------->|               |
  |        |                                           |               |
  |        |<-(H)----------- Access Token -------------|               |
  +--------+           & Optional Refresh Token        +---------------+

               Figure 2: Refreshing an Expired Access Token

"""

CLIENT_APPLICATIONS_TABLE = "cosmos.auth.apps"


def authorize(user, response_type, client_id, redirect_uri, **kwargs):
    state = kwargs.get("state", None)
    resource = kwargs.get("resource", "id")

    session_state = str(uuid.uuid4())
    code = {"resource": resource, }
    result = {"code": code, "session_state": session_state}

    if state:
        result["state"] = state

    return result


def get_token(**kwargs):
    service_private_pem = kwargs.get("service_private_pem")
    if not service_private_pem:
        raise ValueError("service_private_pem is not defined")

    aud = kwargs.get("aud", None)
    exp = kwargs.get("exp", None)
    family_name = kwargs.get("family_name", None)
    given_name = kwargs.get("given_name", None)

    token_payload = {
        "aud": aud,
        "exp": exp,
        "family_name": family_name,
        "given_name": given_name,
        "ver": "1.0"
    }

    iss = kwargs.get("iss", None)
    if iss:
        token_payload["iss"] = iss

    nbf = kwargs.get("nbf", None)
    if nbf:
        token_payload["nbf"] = nbf

    oid = kwargs.get("oid", None)
    if oid:
        token_payload["oid"] = oid

    sub = kwargs.get("sub", None)
    if sub:
        token_payload["sub"] = sub

    tid = kwargs.get("tid", None)
    if tid:
        token_payload["tid"] = tid

    unique_name = kwargs.get("unique_name", None)
    if unique_name:
        token_payload["unique_name"] = unique_name

    upn = kwargs.get("upn", None)
    if upn:
        token_payload["upn"] = upn

    priv_key = RSA.importKey(service_private_pem)
    token = jwt.generate_jwt(token_payload, priv_key, 'RS256', exp)

    return token


def verify_token(token, public_key_pem, alg_list):
    try:
        header, claims = jwt.verify_jwt(token, public_key_pem, alg_list)
        return header, claims
    except Exception as ex:
        raise ValidationError(ex)