
View on GitHub


25 mins
Test Coverage
from functools import wraps
from packaging import version
from typing import List

import requests
from jose import jwt
from flask import g, request, current_app as app, Flask
from werkzeug.exceptions import Unauthorized, BadRequest, PreconditionFailed

from ..models import Users, UserSchema
from ..config.settings import AUTH0_DOMAIN, ALGORITHMS, AUTH0_CLIENT_ID
from ..config.logging import get_logger

logger = get_logger(__name__)

### Main auth utility functions ###
def validate_api_auth(app: Flask):
    Assert that all URLs in `app`'s API are explicitly marked with either
    `requires_auth` or `public`.
    unmarked_endpoints = []
    for label, endpoint in app.view_functions.items():
        if not hasattr(endpoint, "is_protected"):

    assert len(unmarked_endpoints) == 0, (
        "All endpoints must use either the `requires_auth` or `public` decorator "
        "to explicitly specify their auth configuration. Missing from the following "
        "endpoints: " + ", ".join(unmarked_endpoints)

def requires_auth(resource: str, allowed_roles: list = []):
    A decorator that adds authentication and basic access to an endpoint.

    NOTE: leaving the `allowed_roles` argument empty allows any authenticated user to access
    the decorated endpoint.

        Unauthorized if unauthorized

    def decorator(endpoint):
        # Store metadata on this function stating that it is protected by authentication
        endpoint.is_protected = True

        def wrapped(*args, **kwargs):
            is_authorized = check_auth(allowed_roles, resource, request.method)
            if not is_authorized:
                raise Unauthorized("Please provide proper credentials")
            return endpoint(*args, **kwargs)

        return wrapped

    return decorator

def authenticate_and_get_user():
    Try to authenticate the user associated with this request. Return the user
    if authentication succeeds, or `None` if it fails.
    NOTE: this function bypasses RBAC. It's up to the caller to determine whether
    an authenticated user is authorized to take subsequent action.
        check_auth(None, None, None)
        return get_current_user()
    except (AssertionError, BadRequest, PreconditionFailed, Unauthorized):
        return None

def public(endpoint):
    """Declare an endpoint to be public, i.e., not requiring auth."""
    # Store metadata on this function stating that it is unprotected
    endpoint.is_protected = False

    return endpoint

def check_auth(allowed_roles: List[str], resource: str, method: str) -> bool:
    Perform authentication and authorization for the current request.

        allowed_roles: a list of CIDC user roles allowed to access this endpoint
        resource: the resource targeted by this request
        method: the HTTP method of this request
        Unauthorized if not authorized
        BadRequest if cannot parse User-Agent string
        PreconditionFailed if too low CLI version
        bool, `True` if authentication and authorization passed.
    user = authenticate()

        is_authorized = authorize(user, allowed_roles, resource, method)
    except Unauthorized:



    return is_authorized

### Current user management ###
CURRENT_USER_KEY = "current_user"

def _set_current_user(user: Users):
    """Store a user in the current request's context.
    Raises AssertionError if not given a `Users`"""
    assert isinstance(user, Users), "`user` must be an instance of the `Users` model"
    setattr(g, CURRENT_USER_KEY, user)

def get_current_user() -> Users:
    """Returns the authenticated user who made the current request.
    Raises AssertionError if no current user"""
    current_user = g.get(CURRENT_USER_KEY)

    assert current_user, (
        "There is no user associated with the current request.\n"
        "Note: `auth.get_current_user` can't be called by a request handler without authentication. "
        "Decorate your handler with `auth.requires_auth` to authenticate the requesting user before calling the handler."

    return current_user

### Authentication logic ###
_user_schema = UserSchema()

def authenticate() -> Users:
    id_token = _extract_token()
    public_key = _get_issuer_public_key(id_token)
    token_payload = _decode_id_token(id_token, public_key)
    profile = {"email": token_payload["email"]}
    return _user_schema.load(profile)

def _extract_token() -> str:
    """Extract an identity token from the current request's authorization header or from the request body.
    Raises Unauthorized if cannot find the token"""
    auth_header = request.headers.get("Authorization")

        if auth_header:
            bearer, id_token = auth_header.split(" ")
            assert bearer.lower() == "bearer"
            id_token = request.json["id_token"]
    except (AssertionError, AttributeError, KeyError, TypeError, ValueError):
        raise Unauthorized(
            "Either the 'Authorization' header must be set with structure 'Authorization: Bearer <id token>' "
            'or "id_token" must be present in the JSON body of the request.'

    return id_token

def _get_issuer_public_key(token: str) -> dict:
    Get the appropriate public key to check this token for authenticity.

        token: an encoded JWT.

        Unauthorized: if no public key can be found.

        str: the public key.
        header = jwt.get_unverified_header(token)
    except jwt.JWTError as e:
        raise Unauthorized(str(e))

    # Get public keys from our Auth0 domain
    jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
    jwks = requests.get(jwks_url).json()

    # Obtain the public key used to sign this token
    public_key = None
    for key in jwks["keys"]:
        if key["kid"] == header["kid"]:
            public_key = key

    # If no matching public key was found, we can't validate the token
    if not public_key:
        raise Unauthorized("Found no public key with id %s" % header["kid"])

    return public_key

def _decode_id_token(token: str, public_key: dict) -> dict:
    Decodes the token and checks it for validity.

        token: the JWT to validate and decode
        public_key: public_key

            - if token is expired
            - if token has invalid claims
            - if token signature is invalid in any way
            - if no `.email` field on token

        dict: the decoded token as a dictionary.
        payload = jwt.decode(
            options={"verify_at_hash": False},
    except jwt.ExpiredSignatureError as e:
        raise Unauthorized(
            f"{e} Token expired. Obtain a new login token from the CIDC Portal, then try logging in again."
    except jwt.JWTClaimsError as e:
        raise Unauthorized(str(e))
    except jwt.JWTError as e:
        raise Unauthorized(str(e))

    # Currently, only id_tokens are accepted for authentication.
    # Going forward, we could also accept access tokens that we
    # use to query the userinfo endpoint.
    if "email" not in payload:
        msg = "An id_token with an 'email' field is required to authenticate"
        raise Unauthorized(msg)

    return payload

### Authorization logic ###
def authorize(
    user: Users, allowed_roles: List[str], resource: str, method: str
) -> bool:
    """Check if the current user is authorized to act on the current request's resource.
    Raises Unauthorized
        - if user is not registered
        - if user is disabled
        - if user's registration is pending approval
        - if user.role is not in allowed_roles
    db_user = Users.find_by_email(

    # User hasn't registered yet.
    if not db_user:
        # Although the user doesn't exist in the database, we still
        # make the user's identity data available in the request context.

        # User is only authorized to create themself.
        if resource == "self" and method == "POST":
            return True

        raise Unauthorized(f"{} is not registered.")



    # User is registered but disabled.
    if db_user.disabled:
        # Disabled users are not authorized to do anything but access their
        # account info.
        if resource == "self" and method == "GET":
            return True

        raise Unauthorized(f"{}'s account is disabled.")

    # User is registered but not yet approved.
    if not db_user.approval_date:
        # Unapproved users are not authorized to do anything but access their
        # account info.
        if resource == "self" and method == "GET":
            return True

        raise Unauthorized(f"{}'s registration is pending approval")

    # User is approved and registered, so just check their role.
    if allowed_roles and db_user.role not in allowed_roles:
        raise Unauthorized(
            f"{} is not authorized to access this endpoint."

    return True

### Miscellaneous helpers ###
def _log_user_and_request_details(is_authorized: bool):
    """Log user and request info before every request"""
    log_msg = f"{'' if is_authorized else 'UN'}AUTHORIZED"

    # log request details
    log_msg += f" {request.environ['REQUEST_METHOD']} {request.environ['RAW_URI']}"

    # log user details
    user = get_current_user()
    log_msg += f" (user:{}:{})"

    if is_authorized:

def _enforce_cli_version():
    If the current request appears to come from the CLI and not the Portal, enforce the configured
    minimum CLI version.

        BadRequest if could not parse the User-Agent string
        PreconditionFailed if too low CLI version
    user_agent = request.headers.get("User-Agent")

    # e.g., during testing no User-Agent header is supplied
    if not user_agent:

        client, client_version = user_agent.split("/", 1)
    except ValueError:
        logger.error(f"Unrecognized user-agent string format: {user_agent}")
        raise BadRequest("could not parse User-Agent string")

    # The CLI sets the User-Agent header to `cidc-cli/{version}`,
    # so we can assess whether the requester needs to update their CLI.
    is_old_cli = client == "cidc-cli" and version.parse(client_version) < version.parse(

    if is_old_cli:"cancelling request: detected outdated CLI")
        message = (
            "You appear to be using an out-of-date version of the CIDC CLI. "
            "Please upgrade to the most recent version:\n"
            "    pip3 install --upgrade cidc-cli"
        raise PreconditionFailed(message)