CSCfi/pebbles

View on GitHub
pebbles/views/applications.py

Summary

Maintainability
D
1 day
Test Coverage
import datetime
import logging
import uuid

import flask_restful as restful
from flask import Blueprint as FlaskBlueprint
from flask import abort, g, request
from flask_restful import fields, reqparse
from sqlalchemy import select
from sqlalchemy.orm.session import make_transient

from pebbles import rules
from pebbles.forms import ApplicationForm
from pebbles.models import db, Application, ApplicationTemplate, Workspace, ApplicationSession
from pebbles.utils import requires_workspace_owner_or_admin, requires_admin, check_config_against_attribute_limits, \
    check_attribute_limit_format
from pebbles.views import commons
from pebbles.views.commons import auth, requires_workspace_manager_or_admin

applications = FlaskBlueprint('applications', __name__)

application_field_role_map = dict(
    admin={
        'id': fields.String(attribute='id'),
        'name': fields.String,
        'description': fields.String,
        'status': fields.String,
        'maximum_lifetime': fields.Integer,
        'labels': fields.List(fields.String),
        'template_id': fields.String,
        'template_name': fields.String,
        'application_type': fields.String,
        'is_enabled': fields.Boolean,
        'config': fields.Raw,
        'attribute_limits': fields.Raw,
        'workspace_id': fields.String,
        'workspace_name': fields.String,
        'workspace_pseudonym': fields.String,
        'info': {
            'memory': fields.String,
            'memory_gib': fields.Float,
            'shared_folder_enabled': fields.Boolean,
            'work_folder_enabled': fields.Boolean,
            'workspace_expiry_ts': fields.Integer,
            'base_config_image': fields.String,
        },
    },
    manager={
        'id': fields.String(attribute='id'),
        'name': fields.String,
        'description': fields.String,
        'status': fields.String,
        'maximum_lifetime': fields.Integer,
        'labels': fields.List(fields.String),
        'template_id': fields.String,
        'template_name': fields.String,
        'application_type': fields.String,
        'is_enabled': fields.Boolean,
        'config': fields.Raw,
        'attribute_limits': fields.Raw,
        'workspace_id': fields.String,
        'workspace_name': fields.String,
        'info': {
            'memory': fields.String,
            'memory_gib': fields.Float,
            'shared_folder_enabled': fields.Boolean,
            'work_folder_enabled': fields.Boolean,
            'workspace_expiry_ts': fields.Integer,
            'base_config_image': fields.String,
        },
    },
    user={
        'id': fields.String(attribute='id'),
        'name': fields.String,
        'description': fields.String,
        'status': fields.String,
        'maximum_lifetime': fields.Integer,
        'labels': fields.List(fields.String),
        'application_type': fields.String,
        'is_enabled': fields.Boolean,
        'workspace_id': fields.String,
        'workspace_name': fields.String,
        'info': {
            'memory': fields.String,
            'memory_gib': fields.Float,
            'shared_folder_enabled': fields.Boolean,
            'work_folder_enabled': fields.Boolean,
            'workspace_expiry_ts': fields.Integer,
            'base_config_image': fields.String,
        },
    },
)


def extract_role(user, application):
    if user.is_admin:
        return 'admin'
    elif commons.is_workspace_manager(user, application.workspace):
        return 'manager'
    else:
        return 'user'


def marshal_based_on_role(role, application):
    application_fields = application_field_role_map.get(role)
    if not application_fields:
        raise RuntimeError('Unknown role %s passed to marshalling application' % role)
    return restful.marshal(application, application_fields)


class ApplicationList(restful.Resource):
    get_parser = reqparse.RequestParser()
    get_parser.add_argument('show_all', type=bool, default=False, location='args')
    get_parser.add_argument('workspace_id', type=str, default=None, required=False, location='args')

    @auth.login_required
    def get(self):
        args = self.get_parser.parse_args()
        user = g.user
        s = rules.generate_application_query(user, args)
        rows = db.session.execute(s).all()
        results = []
        for row in rows:
            application = process_application(row.Application)
            if user.is_admin:
                results.append(marshal_based_on_role('admin', application))
            elif row.WorkspaceMembership.is_manager:
                results.append(marshal_based_on_role('manager', application))
            else:
                results.append(marshal_based_on_role('user', application))

        return results

    @auth.login_required
    @requires_workspace_manager_or_admin
    def post(self):
        form = ApplicationForm()
        if not form.validate_on_submit():
            logging.warning("Form validation error on create application %s" % form.errors)
            return form.errors, 422
        user = g.user
        application = Application()
        template_id = form.template_id.data
        template = ApplicationTemplate.query.filter_by(id=template_id).first()
        if not template:
            abort(422)
        application.template_id = template_id
        workspace_id = form.workspace_id.data
        workspace = Workspace.query.filter_by(id=workspace_id).first()
        if not workspace:
            abort(422)
        if not user.is_admin and not commons.is_workspace_manager(user, workspace):
            logging.warning("invalid workspace for the user")
            abort(403)

        # check workspace quota
        application_count = workspace.applications.filter_by(status='active').count()
        if not (user.is_admin or application_count < workspace.application_quota):
            logging.warning("Maximum number of applications in workspace reached, ws '%s'", workspace.id)
            return dict(
                message="You have reached the maximum number of applications for this workspace."
                        "Contact support if you need more."
            ), 422
        # basic data
        application.name = form.name.data
        application.description = form.description.data
        application.workspace_id = workspace_id

        # base configuration from template
        application.base_config = template.base_config
        application.attribute_limits = template.attribute_limits
        application.application_type = template.application_type

        # for json lists, we need to use raw_data
        application.labels = form.labels.raw_data
        application.config = form.config.data
        application.is_enabled = form.is_enabled.data

        error = check_config_against_attribute_limits(application.config, application.attribute_limits)
        if error:
            return 'Application config failed attribute limit check: %s' % error, 422
        application.maximum_lifetime = application.config.get(
            'maximum_lifetime',
            application.base_config.get('maximum_lifetime', 3600)
        )

        db.session.add(application)
        db.session.commit()

        application.workspace = workspace
        application = process_application(application)
        return marshal_based_on_role(extract_role(user, application), application)


class ApplicationView(restful.Resource):
    @auth.login_required
    def get(self, application_id):
        user = g.user
        parser = reqparse.RequestParser()
        parser.add_argument('show_all', type=bool, default=False, location='args')
        args = parser.parse_args()
        args['application_id'] = application_id

        s = rules.generate_application_query(user, args)
        application = db.session.scalar(s)
        if not application:
            abort(404)

        application = process_application(application)
        return marshal_based_on_role(extract_role(user, application), application)

    @auth.login_required
    @requires_workspace_manager_or_admin
    def put(self, application_id):
        form = ApplicationForm()
        if not form.validate_on_submit():
            logging.warning("validation error on update application config")
            return form.errors, 422

        user = g.user
        application = Application.query.filter_by(id=application_id).first()
        if not application:
            abort(404)

        if application.status in (Application.STATUS_ARCHIVED, Application.STATUS_DELETED):
            abort(422)

        if not user.is_admin and not commons.is_workspace_manager(user, application.workspace):
            logging.warning("invalid workspace for the user")
            abort(403)

        application.name = form.name.data
        application.description = form.description.data

        # for json lists, we need to use raw_data
        application.labels = form.labels.raw_data
        application.config = form.config.data

        if form.is_enabled.raw_data:
            application.is_enabled = form.is_enabled.raw_data[0]
        else:
            application.is_enabled = False

        error = check_config_against_attribute_limits(application.config, application.attribute_limits)
        if error:
            return 'Application config failed attribute limit check: %s' % error, 422
        application.maximum_lifetime = application.config.get(
            'maximum_lifetime',
            application.maximum_lifetime
        )

        db.session.commit()
        application = process_application(application)
        return marshal_based_on_role(extract_role(user, application), application)

    @auth.login_required
    @requires_admin
    def patch(self, application_id):
        parser = reqparse.RequestParser()
        parser.add_argument('status', type=str)
        args = parser.parse_args()
        application = Application.query.filter_by(id=application_id).first()
        if not application:
            abort(404)

        if args.get('status'):
            application.status = args['status']
            application.is_enabled = False
            db.session.commit()

    @auth.login_required
    @requires_workspace_owner_or_admin
    def delete(self, application_id):
        user = g.user
        s = rules.generate_application_query(user, dict(application_id=application_id))
        application = db.session.scalar(s)
        if not application:
            logging.warning("trying to delete non-existing application")
            abort(404)
        elif not (user.is_admin or commons.is_workspace_manager(user, application.workspace)):
            abort(403)
        elif application.status == Application.STATUS_ARCHIVED:
            abort(403)

        # Check if sessions have been created, and we need to preserve the Application in 'DELETED' state for statistics
        application_sessions = ApplicationSession.query.filter_by(application_id=application_id).all()
        if not application_sessions:
            db.session.delete(application)
            db.session.commit()
        elif application_sessions:
            for application_session in application_sessions:
                if application_session.state != ApplicationSession.STATE_DELETED:
                    application_session.to_be_deleted = True
                    application_session.state = ApplicationSession.STATE_DELETING
                    application_session.deprovisioned_at = datetime.datetime.utcnow()
            application.status = application.STATUS_DELETED
            db.session.commit()
        else:
            abort(422)


class ApplicationAttributeLimits(restful.Resource):
    @auth.login_required
    @requires_admin
    def put(self, application_id):
        user = g.user
        application = db.session.scalar(rules.generate_application_query(user, dict(application_id=application_id)))

        if not application:
            logging.warning('application %s does not exist', application_id)
            return 'The application does not exist', 404

        attribute_limits = request.get_json().get('attribute_limits', None)
        if not attribute_limits:
            return 'request is missing attribute_limits', 422
        if type(attribute_limits) is not list:
            return 'attribute_limits needs to be a list', 422

        # check that attribute limits are well-formed
        error = check_attribute_limit_format(attribute_limits)
        if error:
            return 'Invalid attribute limit format, error: %s' % error, 422

        # check that current config is valid with the new limits
        error = check_config_against_attribute_limits(application.config, attribute_limits)
        if error:
            return 'Invalid attribute limits, error: %s' % error, 422

        application.attribute_limits = attribute_limits
        db.session.commit()
        return application.attribute_limits


class ApplicationCopy(restful.Resource):
    parser = reqparse.RequestParser()
    parser.add_argument('workspace_id', type=str, default=None, required=False, location='args')

    @auth.login_required
    @requires_workspace_manager_or_admin
    def put(self, application_id):
        user = g.user
        application = Application.query.filter_by(id=application_id).first_or_404()
        args = self.parser.parse_args()

        # Specific target workspace or just cloning in current one?
        if args.get('workspace_id', None):
            target_workspace = Workspace.query.filter_by(id=args.get('workspace_id')).first()
            # check that user has manager rights in the target workspace
            if not (user.is_admin or commons.is_workspace_manager(user, target_workspace)):
                logging.warning(
                    'ApplicationCopy: user {} is not manager in workspace {}'.format(user.id, target_workspace.id))
                abort(403)
        else:
            target_workspace = application.workspace

        # check rights to the source workspace
        if not (user.is_admin or commons.is_workspace_manager(user, application.workspace)):
            logging.warning(
                'ApplicationCopy: user {} is not manager in workspace {}'.format(user.id, target_workspace.id))
            abort(403)

        # check status
        if not application.status == Application.STATUS_ACTIVE:
            abort(422)

        # check workspace quota
        application_count = target_workspace.applications.filter_by(status='active').count()
        if not (user.is_admin or application_count < target_workspace.application_quota):
            logging.warning('Maximum number of applications in workspace reached, ws "%s"', target_workspace.id)

            return dict(message='You have reached the maximum number of applications for this workspace.'
                                'Contact support if you need more.'), 422

        db.session.expunge(application)
        make_transient(application)
        application.id = uuid.uuid4().hex
        application.workspace_id = target_workspace.id
        application.name = format('%s - %s' % (application.name, 'Copy'))
        application.is_enabled = False
        db.session.add(application)
        db.session.commit()


def process_application(application):
    # cache application template names in the request context to avoid lookups on every call
    template_name_cache = g.setdefault('template_name_cache', dict())
    if application.template_id not in template_name_cache.keys():
        result = db.session.execute(
            select(ApplicationTemplate.id, ApplicationTemplate.name)
        ).all()
        for row in result:
            template_name_cache[row[0]] = row[1]

    # shortcut properties for UI
    application.template_name = template_name_cache.get(application.template_id)

    application.workspace_name = application.workspace.name
    application.workspace_pseudonym = application.workspace.pseudonym
    application.workspace_expiry_ts = application.workspace.expiry_ts
    application.base_config_image = application.base_config.get('image')

    # generate human-readable memory information
    memory_gib = float(application.config.get('memory_gib', application.base_config.get('memory_gib')))
    application.memory_gib = memory_gib
    if memory_gib % 1:
        application.memory = '%dMiB' % round(memory_gib * 1024)
    else:
        application.memory = '%dGiB' % memory_gib

    # rest of the code taken for refactoring from single application GET query
    application.cluster = application.workspace.cluster

    if 'enable_user_work_folder' in application.config and application.config['enable_user_work_folder']:
        application.work_folder_enabled = True
    else:
        application.work_folder_enabled = False

    if application.workspace_name.startswith('System.default'):
        application.shared_folder_enabled = False
    elif 'enable_shared_folder' in application.config:
        if application.config['enable_shared_folder']:
            application.shared_folder_enabled = True
        else:
            application.shared_folder_enabled = False
    else:
        application.shared_folder_enabled = True

    return application