CSCfi/pebbles

View on GitHub
pebbles/views/commons.py

Summary

Maintainability
D
2 days
Test Coverage
import logging
from functools import wraps

from flask import g, abort, current_app
from flask_httpauth import HTTPBasicAuth
from flask_restful import fields

from pebbles.models import db, User, Workspace, WorkspaceMembership

user_fields = {
    'id': fields.String,
    'ext_id': fields.String,
    'email_id': fields.String,
    'pseudonym': fields.String,
    'workspace_quota': fields.Integer,
    'is_active': fields.Boolean,
    'is_admin': fields.Boolean,
    'is_deleted': fields.Boolean,
    'is_blocked': fields.Boolean,
    'joining_ts': fields.Integer,
    'expiry_ts': fields.Integer,
    'last_login_ts': fields.Integer,
}

workspace_membership_fields = {
    'workspace_id': fields.String,
    'user_id': fields.String,
    'is_owner': fields.Boolean,
    'is_manager': fields.Boolean,
    'is_banned': fields.Boolean,
}

auth = HTTPBasicAuth()
auth.authenticate_header = lambda: "Authentication Required"

# Delimiter between optional identity domain/prefix and username(eppn/vppn/email)
EXT_ID_PREFIX_DELIMITER = '/'


@auth.verify_password
def verify_password(userid_or_token, password):
    g.user = User.verify_auth_token(userid_or_token, current_app.config['SECRET_KEY'])
    if not g.user:
        g.user = User.query.filter_by(ext_id=userid_or_token).first()
        if not g.user:
            return False
        if not g.user.check_password(password):
            return False
    return True


def create_worker():
    return create_user('worker@pebbles', current_app.config['SECRET_KEY'], is_admin=True, email_id=None)


def create_user(ext_id, password, is_admin=False, email_id=None, expiry_ts=None, annotations=None):
    if User.query.filter_by(ext_id=ext_id).first():
        logging.info("user %s already exists" % ext_id)
        return None

    user = User(ext_id, password, is_admin=is_admin, email_id=email_id)
    if annotations:
        user.annotations = annotations
    if not is_admin:
        add_user_to_default_workspace(user)
    if expiry_ts:
        user.expiry_ts = expiry_ts
    db.session.add(user)
    db.session.commit()
    return user


def update_email(ext_id, email_id=None):
    user = User.query.filter_by(ext_id=ext_id).first()
    if email_id:
        user.email_id = email_id
    db.session.add(user)
    db.session.commit()
    return user


def create_system_workspaces(admin):
    system_default_workspace = Workspace('System.default')
    workspace_admin_obj = WorkspaceMembership(
        workspace=system_default_workspace, user=admin, is_owner=True, is_manager=True)
    system_default_workspace.memberships.append(workspace_admin_obj)
    db.session.add(system_default_workspace)
    db.session.commit()


def can_user_join_workspace(user, ws):
    if not user.taints:
        return True
    tolerations = ws.membership_join_policy.get('tolerations', [])
    return set(user.taints) <= set(tolerations)


def add_user_to_default_workspace(user):
    system_default_workspace = Workspace.query.filter_by(name='System.default').first()
    if not can_user_join_workspace(user, system_default_workspace):
        logging.info('User %s could not be added to System.default workspace', user.ext_id)
        return
    workspace_user_obj = WorkspaceMembership(workspace=system_default_workspace, user=user)
    system_default_workspace.memberships.append(workspace_user_obj)
    db.session.add(system_default_workspace)
    db.session.commit()


def requires_workspace_manager_or_admin(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if not g.user.is_admin and not g.user.is_workspace_owner and not g.user.is_workspace_manager:
            abort(403)
        return f(*args, **kwargs)

    return decorated


def is_workspace_manager(user, workspace=None):
    if workspace:
        if workspace.status != Workspace.STATUS_ACTIVE:
            return False
        manager_cache = g.setdefault('manager_cache', dict())
        key = '%s:%s' % (user.id, workspace.id)
        if key not in manager_cache.keys():
            logging.debug('manager cache: adding key %s' % key)
            manager_cache[key] = user.id in (wm.user_id for wm in workspace.memberships if wm.is_manager)
        return manager_cache.get(key)
    else:
        # generic property can be obtained from User
        return user.is_workspace_manager


def is_workspace_owner(user, workspace=None):
    if workspace:
        if workspace.status != Workspace.STATUS_ACTIVE:
            return False

        owner_cache = g.setdefault('owner_cache', dict())
        key = '%s:%s' % (user.id, workspace.id)
        if key not in owner_cache.keys():
            logging.debug('owner cache: adding key %s' % key)
            owner_cache[key] = user.id in (wm.user_id for wm in workspace.memberships if wm.is_owner)
        return owner_cache.get(key)
    else:
        # generic property can be obtained from User
        return user.is_workspace_owner