scitran/core

View on GitHub
api/handlers/containerhandler.py

Summary

Maintainability
F
4 days
Test Coverage
import datetime

import bson
import dateutil
import dateutil.parser

from .. import config
from .. import util
from .. import validators
from ..auth import containerauth, always_ok
from ..dao import containerstorage, containerutil, noop
from ..dao.containerstorage import AnalysisStorage
from ..jobs.gears import get_gear
from ..jobs.jobs import Job
from ..jobs.queue import Queue
from ..types import Origin
from ..web import base
from ..web.errors import APIStorageException, APIPermissionException
from ..web.request import log_access, AccessType

log = config.log


class ContainerHandler(base.RequestHandler):
    """
    This class handles operations on a generic container.

    The pattern used is:
    1) load the storage class used to interact with mongo
    2) configure the permissions checker and the json payload validators
    3) validate the input payload
    4) augment the payload when appropriate
    5) exec the request (using the mongo validator and the permissions checker)
    6) check the result
    7) augment the result when needed
    8) return the result

    Specific behaviors (permissions checking logic for authenticated and not superuser users, storage interaction)
    are specified in the container_handler_configurations
    """
    use_object_id = {
        'groups': False,
        'projects': True,
        'sessions': True,
        'acquisitions': True
    }
    default_list_projection = ['files', 'notes', 'timestamp', 'timezone', 'public']

    # These configurations are used by the ContainerHandler class to load the storage,
    # the permissions checker and the json schema validators used to handle a request.
    #
    # "children_cont" represents the children container.
    # "list projection" is used to filter data in mongo.
    # "use_object_id" implies that the container ids are converted to ObjectId
    container_handler_configurations = {
        'projects': {
            'storage': containerstorage.ProjectStorage(),
            'permchecker': containerauth.default_container,
            'parent_storage': containerstorage.GroupStorage(),
            'storage_schema_file': 'project.json',
            'payload_schema_file': 'project.json',
            'list_projection': {'info': 0, 'files.info': 0},
            'propagated_properties': ['public'],
            'children_cont': 'sessions'
        },
        'sessions': {
            'storage': containerstorage.SessionStorage(),
            'permchecker': containerauth.default_container,
            'parent_storage': containerstorage.ProjectStorage(),
            'storage_schema_file': 'session.json',
            'payload_schema_file': 'session.json',
            # Remove subject first/last from list view to better log access to this information
            'list_projection': {'info': 0, 'analyses': 0, 'subject.firstname': 0,
                                'subject.lastname': 0, 'subject.sex': 0, 'subject.age': 0,
                                'subject.race': 0, 'subject.ethnicity': 0, 'subject.info': 0,
                                'files.info': 0, 'tags': 0},
            'children_cont': 'acquisitions'
        },
        'acquisitions': {
            'storage': containerstorage.AcquisitionStorage(),
            'permchecker': containerauth.default_container,
            'parent_storage': containerstorage.SessionStorage(),
            'storage_schema_file': 'acquisition.json',
            'payload_schema_file': 'acquisition.json',
            'list_projection': {'info': 0, 'collections': 0, 'files.info': 0, 'tags': 0}
        }
    }

    def __init__(self, request=None, response=None):
        """
        Initialise the handler. `config` and `storage` start empty; each endpoint
        binds them from container_handler_configurations for the requested container type.
        """
        super(ContainerHandler, self).__init__(request, response)
        self.config = None
        self.storage = None

    @log_access(AccessType.view_container)
    def get(self, cont_name, **kwargs):
        """
        Return a single container together with its analyses.

        :param cont_name: key of container_handler_configurations (e.g. 'sessions')
        :param kwargs: url arguments; `cid` is the id of the requested container

        Request flags read through `self.is_true`:
          - `join_avatars`: add user info to permissions and notes; permissions are not filtered
          - `paths`: add a `path`, computed from the file hash, to every file of the container

        Aborts with 400 on a storage error and with 404 when the element is not found.
        """
        _id = kwargs.get('cid')
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']
        container = self._get_container(_id)

        permchecker = self._get_permchecker(container)
        try:
            # Execute the actual get; permchecker decorates the storage call to check permissions
            result = permchecker(self.storage.exec_op)('GET', _id)
        except APIStorageException as e:
            self.abort(400, e.message)
        if result is None:
            self.abort(404, 'Element not found in container {} {}'.format(self.storage.cont_name, _id))
        if not self.superuser_request and not self.is_true('join_avatars'):
            self._filter_permissions(result, self.uid)
        if self.is_true('join_avatars'):
            self.join_user_info([result])
        # build and insert file paths if they are requested
        if self.is_true('paths'):
            # A container may have no `files` key at all; treat that as an empty list
            for fileinfo in result.get('files', []):
                fileinfo['path'] = util.path_from_hash(fileinfo['hash'])

        # Job info is only inflated for sessions
        inflate_job_info = cont_name == 'sessions'
        result['analyses'] = AnalysisStorage().get_analyses(cont_name, _id, inflate_job_info)
        return self.handle_origin(result)

    def handle_origin(self, result):
        """
        Given an object with a `files` array key, coalesce and merge file origins if requested.

        Files without an `origin` receive a placeholder origin of unknown type with a null id.
        When `join=origin` is passed as a request param, the documents referenced by the file
        origins are stored under result['join-origin'][<origin type>][<origin id>].
        When `join=origin_job_gear_name` is passed as well, every joined job document gets a
        `gear_name` key, which is None when no matching gear is found.

        Returns `result`, which is modified in place.
        """

        requested_joins = self.request.params.getall('join')

        # If `join=origin` passed as a request param, join out that key
        join_origin = 'origin' in requested_joins

        # Now that gears are identified by ID rather than name, the origin.job.gear_id is not enough.
        # Joining the whole gear doc in feels like overkill; for now let's just add gear names to jobs
        join_gear_name = 'origin_job_gear_name' in requested_joins

        # If it was requested, create a map of each type of origin to hold the join
        if join_origin:
            result['join-origin'] = {
                Origin.user.name:   {},
                Origin.device.name: {},
                Origin.job.name:    {}
            }

        # Cache looked-up gear names, keyed by gear id
        cached_gears = {}

        for f in result.get('files', []):
            origin = f.get('origin', None)

            if origin is None:
                # Backfill origin maps if none provided from DB
                f['origin'] = {
                    'type': str(Origin.unknown),
                    'id': None
                }

            elif join_origin:
                j_type = f['origin']['type']
                j_id   = f['origin']['id']
                j_id_b = j_id

                # Some tables don't use BSON for their primary keys.
                if j_type not in (Origin.user, Origin.device):
                    j_id_b = bson.ObjectId(j_id)

                # Join from database if we haven't for this origin before
                if j_type != 'unknown' and result['join-origin'][j_type].get(j_id, None) is None:
                    # Initial join
                    j_types = containerutil.pluralize(j_type)
                    join_doc = config.db[j_types].find_one({'_id': j_id_b})

                    # Join in gear name on the job doc if requested
                    if join_doc is not None and join_gear_name and j_type == 'job':
                        gear_id = join_doc['gear_id']
                        gear_name = cached_gears.get(gear_id)

                        if gear_name is None:
                            gear = config.db.gears.find_one({'_id': bson.ObjectId(gear_id)})
                            # find_one gives None when no gear matches; keep the name empty
                            # rather than failing the whole request on a subscript of None
                            if gear is not None:
                                gear_name = gear['gear']['name']
                            cached_gears[gear_id] = gear_name

                        join_doc['gear_name'] = gear_name

                    # Save to join table
                    result['join-origin'][j_type][j_id] = join_doc

        return result

    @staticmethod
    def join_user_info(results):
        """
        Given a list of containers, adds avatar and name context to each member of the permissions and notes lists

        Each entry is matched to a user through its `user` key when present, otherwise through
        its `_id`. Entries whose user is not among the loaded users get a null avatar and
        empty first and last names.

        Returns `results`, whose entries are modified in place.
        """

        # Get list of all users, hash by uid
        # TODO: This is not an efficient solution if there are hundreds of inactive users
        users_list = containerstorage.ContainerStorage('users', use_object_id=False).get_all_el({}, None, None)
        users = {user['_id']: user for user in users_list}

        for r in results:
            permissions = r.get('permissions', [])
            notes = r.get('notes', [])

            for p in permissions+notes:
                uid = p['user'] if 'user' in p else p['_id']
                # The referenced uid may be missing from the loaded users; fall back to
                # an empty record so one stale reference does not fail the whole request
                user = users.get(uid, {})
                p['avatar'] = user.get('avatar')
                p['firstname'] = user.get('firstname', '')
                p['lastname'] = user.get('lastname', '')

        return results

    def _filter_permissions(self, result, uid):
        """
        Unless user `uid` has admin access on `result`, replace result['permissions']
        with a list holding only that user's own permission (empty if there is none).
        """
        own_perm = util.user_perm(result.get('permissions', []), uid)
        if own_perm.get('access') == 'admin':
            # admins keep the full permission list
            return
        result['permissions'] = [own_perm] if own_perm else []

    def get_subject(self, cid):
        """
        Return the `subject` of session `cid` (an empty dict when the session has none).
        The access is logged as AccessType.view_subject.
        """
        self.config = self.container_handler_configurations['sessions']
        self.storage = self.config['storage']
        session = self._get_container(cid)

        permchecker = self._get_permchecker(session)
        checked_exec = permchecker(self.storage.exec_op)
        result = checked_exec('GET', cid)
        self.log_user_access(AccessType.view_subject, cont_name='sessions', cont_id=cid)
        return result.get('subject', {})


    def get_jobs(self, cid):
        """
        Return the jobs found for session `cid`, its acquisitions and its analyses.

        Request params:
          - `states`, `tags`: forwarded to Queue.search
          - `join=gears`: add a `gears` map (gear id -> gear) to the response
          - `join=containers`: add a `containers` map (id -> analysis or acquisition,
            without its permissions) to the response

        Returns a dict whose `jobs` list is sorted by creation time.
        """
        # Only enabled for sessions container type per url rule in api.py
        self.config = self.container_handler_configurations["sessions"]
        self.storage = self.config['storage']
        cont = self._get_container(cid, projection={'files': 0, 'metadata': 0}, get_children=True)

        # The checker wraps `noop` rather than a storage call - presumably so that only
        # the permission check runs; confirm against dao.noop
        permchecker = self._get_permchecker(cont)
        permchecker(noop)('GET', cid)

        analyses = AnalysisStorage().get_analyses('session', cont['_id'])
        acquisitions = cont.get('acquisitions', [])

        found = []
        if not (acquisitions or analyses):
            # no jobs
            return {'jobs': found}

        # Get query params
        states = self.request.GET.getall('states')
        tags = self.request.GET.getall('tags')
        join_cont = 'containers' in self.request.params.getall('join')
        join_gears = 'gears' in self.request.params.getall('join')

        # search for jobs
        if acquisitions:
            acquisition_refs = [
                containerutil.ContainerReference('acquisition', str(acq['_id']))
                for acq in acquisitions
            ]
            found += Queue.search(acquisition_refs, states=states, tags=tags)

            # Also check the acquisition's session
            session_ref = containerutil.ContainerReference('session', str(cont['_id']))
            found += Queue.search([session_ref], states=states, tags=tags)

        if analyses:
            analysis_refs = [
                containerutil.ContainerReference('analysis', str(analysis['_id']))
                for analysis in analyses
            ]
            found += Queue.search(analysis_refs, states=states, tags=tags)

        # Remove search duplicates, keeping the first occurrence of every job id
        # Eventually, this code should not call Queue.search and should instead do its own work.
        unique_job_ids = set()
        unique_docs = []
        for doc in found:
            doc_key = str(doc['_id'])
            if doc_key not in unique_job_ids:
                unique_docs.append(doc)
            unique_job_ids.add(doc_key)

        # Ensure job uniqueness and collect the gear ids in use
        seen_jobs = []
        seen_gears = []
        jobs = []
        for doc in unique_docs:
            if doc['_id'] not in seen_jobs:
                loaded = Job.load(doc)
                jobs.append(loaded)
                seen_jobs.append(loaded.id_)
            gear_id = doc.get('gear_id')
            if gear_id and gear_id not in seen_gears:
                seen_gears.append(gear_id)

        jobs.sort(key=lambda job: job.created)

        response = {'jobs': jobs}
        if join_gears:
            gears = {}
            for g_id in seen_gears:
                gears[g_id] = get_gear(g_id)
            response['gears'] = gears
        if join_cont:
            # create a map of analyses and acquisitions by _id
            containers = {}
            for c in analyses + acquisitions:
                containers[str(c['_id'])] = c
            for container in containers.values():
                # No need to return perm arrays
                container.pop('permissions', None)
            response['containers'] = containers

        return response

    def get_all(self, cont_name, par_cont_name=None, par_id=None):
        """
        List the containers of type `cont_name` that the requester is allowed to see.

        :param cont_name: key of container_handler_configurations
        :param par_cont_name: optional parent container name; restricts the list to one parent
        :param par_id: id of that parent, required when `par_cont_name` is given

        Request flags read through `self.is_true`: `permissions`, `join_avatars`, `counts`,
        `measurements` (sessions only) and `stats`.
        """
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']

        # Work on a copy so the class-level configuration is never handed out
        projection = self.config['list_projection'].copy()
        if self.is_true('permissions') and not projection:
            projection = None

        # select which permission filter will be applied to the list of results.
        if self.superuser_request:
            permchecker = always_ok
        elif self.public_request:
            permchecker = containerauth.list_public_request
        else:
            permchecker = containerauth.list_permission_checker(self)

        # if par_cont_name (parent container name) and par_id are not null we return only results
        # within that container
        query = {}
        if par_cont_name:
            if not par_id:
                self.abort(500, 'par_id is required when par_cont_name is provided')
            if self.use_object_id.get(par_cont_name):
                if not bson.ObjectId.is_valid(par_id):
                    self.abort(400, 'not a valid object id')
                par_id = bson.ObjectId(par_id)
            # children reference their parent under the singular name, e.g. 'project'
            query[par_cont_name[:-1]] = par_id

        # this executes the actual request, filtering containers based on the user permissions
        checked_exec = permchecker(self.storage.exec_op)
        results = checked_exec('GET', query=query, public=self.public_request, projection=projection)
        if results is None:
            self.abort(404, 'No elements found in container {}'.format(self.storage.cont_name))

        # return only permissions of the current user unless superuser or getting avatars
        if not (self.superuser_request or self.is_true('join_avatars')):
            self._filter_all_permissions(results, self.uid)
        # the "counts" flag adds a count for each container returned
        if self.is_true('counts'):
            self._add_results_counts(results, cont_name)
        # the "measurements" flag applies only to query for sessions
        # and adds a list of the measurements in the child acquisitions
        if cont_name == 'sessions' and self.is_true('measurements'):
            self._add_session_measurements(results)

        modified_results = []
        for element in results:
            if self.is_true('stats'):
                element = containerutil.get_stats(element, cont_name)
            modified_results.append(self.handle_origin(element))

        if self.is_true('join_avatars'):
            modified_results = self.join_user_info(modified_results)

        return modified_results

    def _filter_all_permissions(self, results, uid):
        """
        For every container in `results`, keep only the permission of user `uid`
        (an empty list when that user has none). Returns `results`.
        """
        for container in results:
            own_perm = util.user_perm(container.get('permissions', []), uid)
            if own_perm:
                container['permissions'] = [own_perm]
            else:
                container['permissions'] = []
        return results

    def _add_results_counts(self, results, cont_name):
        """
        Add to every container in `results` the number of its children, under the key
        '<singular child container name>_count' (e.g. 'session_count' for projects).

        Nothing is added when the current configuration has no `children_cont`
        (as is the case for acquisitions).

        :param results: list of containers, modified in place
        :param cont_name: plural name of the listed containers
        """
        dbc_name = self.config.get('children_cont')
        if dbc_name is None:
            # No child collection to count in; indexing config.db with None would fail
            return
        # children reference their parent under the singular name, e.g. 'project'
        el_cont_name = cont_name[:-1]
        dbc = config.db[dbc_name]
        counts = dbc.aggregate([
            {'$match': {el_cont_name: {'$in': [res['_id'] for res in results]}}},
            {'$group': {'_id': '$' + el_cont_name, 'count': {"$sum": 1}}}
        ])
        counts = {elem['_id']: elem['count'] for elem in counts}
        count_key = dbc_name[:-1] + '_count'
        for elem in results:
            # parents that have no children are absent from the aggregation
            elem[count_key] = counts.get(elem['_id'], 0)

    def get_all_for_user(self, cont_name, uid):
        """
        List the containers of type `cont_name` for user `uid`, each reduced to that
        user's own permission.

        Public requests are rejected with 403. Aborts with 400 on a storage error and
        with 404 when the storage returns None.
        """
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']
        # Copy, as get_all does, so the shared class-level projection is never
        # handed to code that might modify it
        projection = self.config['list_projection'].copy()
        # select which permission filter will be applied to the list of results.
        if self.superuser_request or self.user_is_admin:
            permchecker = always_ok
        elif self.public_request:
            self.abort(403, 'this request is not allowed')
        else:
            permchecker = containerauth.list_permission_checker(self)
        query = {}
        user = {
            '_id': uid
        }
        try:
            results = permchecker(self.storage.exec_op)('GET', query=query, user=user, projection=projection)
        except APIStorageException as e:
            self.abort(400, e.message)
        if results is None:
            self.abort(404, 'Element not found in container {} {}'.format(self.storage.cont_name, uid))
        self._filter_all_permissions(results, uid)
        return results

    def post(self, cont_name):
        """
        Create a new container of type `cont_name` from the JSON body of the request.

        The payload is validated, linked to its parent container, given permissions
        (see below) and `created`/`modified` timestamps, and then inserted.

        Permissions:
          - projects with the `inherit` flag: copied from the parent group
          - projects otherwise: admin access for the requester
          - other containers: copied from the parent container

        Returns {'_id': <inserted id>}. Aborts with 400 when the payload does not
        reference a parent container and with 404 when the insert is not acknowledged.
        """
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']
        mongo_validator, payload_validator = self._get_validators()

        payload = self.request.json_body
        # validate the input payload
        payload_validator(payload, 'POST')
        # Load the parent container in which the new container will be created
        # to check permissions.
        parent_container, parent_id_property = self._get_parent_container(payload)
        if parent_container is None:
            # Without this guard the subscripts below fail on None with a server error
            self.abort(400, 'Parent container {} is required to create an element in container {}'.format(
                parent_id_property, self.storage.cont_name))
        # Always add the id of the parent to the container
        payload[parent_id_property] = parent_container['_id']
        # If the new container is a session add the group of the parent project in the payload
        if cont_name == 'sessions':
            payload['group'] = parent_container['group']
            payload['subject'] = containerutil.add_id_to_subject(payload.get('subject'), payload.get('project'))
        # Optionally inherit permissions of a project from the parent group. The default behaviour
        # for projects is to give admin permissions to the requestor.
        # The default for other containers is to inherit.
        if self.is_true('inherit') and cont_name == 'projects':
            payload['permissions'] = parent_container.get('permissions')
        elif cont_name == 'projects':
            payload['permissions'] = [{'_id': self.uid, 'access': 'admin'}] if self.uid else []
        else:
            payload['permissions'] = parent_container.get('permissions', [])
        # Created and modified timestamps are added here to the payload
        payload['created'] = payload['modified'] = datetime.datetime.utcnow()
        if payload.get('timestamp'):
            payload['timestamp'] = dateutil.parser.parse(payload['timestamp'])
        permchecker = self._get_permchecker(parent_container=parent_container)
        # This line exec the actual request validating the payload that will create the new container
        # and checking permissions using respectively the two decorators, mongo_validator and permchecker
        result = mongo_validator(permchecker(self.storage.exec_op))('POST', payload=payload)
        if result.acknowledged:
            return {'_id': result.inserted_id}
        else:
            self.abort(404, 'Element not added in container {}'.format(self.storage.cont_name))

    @validators.verify_payload_exists
    def put(self, cont_name, **kwargs):
        """
        Update container `cid` of type `cont_name` with the JSON body of the request.

        Payload keys listed in the configured `propagated_properties` are passed on as a
        recursive payload. When the payload names a parent container, sessions and
        acquisitions take over that parent's permissions; sessions also take its group and
        propagate the permissions recursively.

        The `replace_metadata` request param selects replacing instead of patching metadata.

        Returns {'modified': 1}. Aborts with 400 on a storage error and with 404 when
        exactly one document was not modified.
        """
        _id = kwargs.pop('cid')
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']
        container = self._get_container(_id)
        mongo_validator, payload_validator = self._get_validators()

        payload = self.request.json_body
        payload_validator(payload, 'PUT')

        # Collect the payload keys that are propagated properties into r_payload
        propagated = self.config.get('propagated_properties', [])
        r_payload = {key: payload[key] for key in payload.keys() if key in propagated}
        rec = bool(r_payload)

        # Check if we are updating the parent container of the element (ie we are moving the container)
        # In this case, we will check permissions on it.
        target_parent_container, parent_id_property = self._get_parent_container(payload)
        if target_parent_container:
            if cont_name in ['sessions', 'acquisitions']:
                payload[parent_id_property] = bson.ObjectId(payload[parent_id_property])
                parent_perms = target_parent_container.get('permissions', [])
                payload['permissions'] = parent_perms

            if cont_name == 'sessions':
                payload['group'] = target_parent_container['group']
                # Propagate permissions down to acquisitions
                rec = True
                r_payload['permissions'] = parent_perms

        payload['modified'] = datetime.datetime.utcnow()
        if payload.get('timestamp'):
            payload['timestamp'] = dateutil.parser.parse(payload['timestamp'])
        if cont_name == 'sessions':
            subject = payload.get('subject')
            if subject is not None and subject.get('_id') is not None:
                # Ensure subject id is a bson object
                subject['_id'] = bson.ObjectId(str(subject['_id']))
        permchecker = self._get_permchecker(container, target_parent_container)

        # Specifies whether the metadata fields should be replaced or patched with payload value
        replace_metadata = self.get_param('replace_metadata', default=False)
        try:
            # Execute the actual update: mongo_validator validates the payload and
            # permchecker checks permissions, both as decorators around the storage call
            checked_exec = mongo_validator(permchecker(self.storage.exec_op))
            result = checked_exec('PUT', _id=_id, payload=payload, recursive=rec,
                                  r_payload=r_payload, replace_metadata=replace_metadata)
        except APIStorageException as e:
            self.abort(400, e.message)

        if result.modified_count != 1:
            self.abort(404, 'Element not updated in container {} {}'.format(self.storage.cont_name, _id))
        else:
            return {'modified': result.modified_count}

    def modify_info(self, cont_name, **kwargs):
        """
        Apply the info update in the JSON body of the request to container `cid`.

        When the url arguments contain `subject`, the subject info is modified instead;
        this is only accepted for sessions (400 otherwise).
        """
        _id = kwargs.pop('cid')

        # Support subject info modification in new style
        # Will be removed when subject becomes stand-alone container
        modify_subject = 'subject' in kwargs
        if cont_name != 'sessions' and modify_subject:
            self.abort(400, 'Subject info modification only allowed via session.')

        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']
        target = self._get_container(_id)
        permchecker = self._get_permchecker(target)
        payload = self.request.json_body

        validators.validate_data(payload, 'info_update.json', 'input', 'POST')

        # Check PUT permission before touching the storage
        permchecker(noop)('PUT', _id=_id)
        self.storage.modify_info(_id, payload, modify_subject=modify_subject)

    @log_access(AccessType.delete_container)
    def delete(self, cont_name, **kwargs):
        """
        Delete container `cid` of type `cont_name` and propagate its `deleted`
        timestamp to the children that are not yet marked as deleted.

        With the `check` request flag only the permission check is performed.

        Returns {'deleted': 1}. Raises APIPermissionException when an acquisition is
        referenced by analyses; aborts with 400 on a storage error and with 404 when
        exactly one document was not modified.
        """
        _id = kwargs.pop('cid')
        self.config = self.container_handler_configurations[cont_name]
        self.storage = self.config['storage']

        # Children are only loaded for sessions
        with_children = (cont_name == 'sessions')
        container = self._get_container(_id, get_children=with_children)
        container['cont_name'] = containerutil.singularize(cont_name)

        if cont_name in ['sessions', 'acquisitions']:
            children_cont = self.config.get('children_cont')
            container['has_original_data'] = containerutil.container_has_original_data(container, child_cont_name=children_cont)
        if cont_name == 'acquisitions':
            referring = containerutil.get_referring_analyses(cont_name, _id)
            if referring:
                analysis_ids = [str(analysis['_id']) for analysis in referring]
                errors = {'reason': 'analysis_conflict'}
                message = 'Cannot delete acquisition {} referenced by analyses {}'.format(_id, analysis_ids)
                raise APIPermissionException(message, errors=errors)

        target_parent_container, _ = self._get_parent_container(container)
        permchecker = self._get_permchecker(container, target_parent_container)
        permchecker(noop)('DELETE', _id)
        if self.is_true('check'):
            # User only wanted to check permissions, respond with 200
            return

        try:
            # Execute the actual delete; permissions were already checked above
            result = self.storage.exec_op('DELETE', _id)
        except APIStorageException as e:
            self.abort(400, e.message)
        if result.modified_count != 1:
            self.abort(404, 'Element not removed from container {} {}'.format(self.storage.cont_name, _id))
        else:
            object_id = bson.ObjectId(_id)
            deleted_at = config.db[cont_name].find_one({'_id': object_id})['deleted']
            # Don't overwrite deleted timestamp for already deleted children
            not_yet_deleted = {'deleted': {'$exists': False}}
            mark_deleted = {'$set': {'deleted': deleted_at}}
            containerutil.propagate_changes(cont_name, bson.ObjectId(_id), not_yet_deleted, mark_deleted, include_refs=True)
            return {'deleted': 1}


    def get_groups_with_project(self):
        """
        Return the groups (projected to their label) that own at least one project
        accessible to the user.
        """
        accessible_projects = self.get_all('projects')
        group_ids = list({project['group'] for project in accessible_projects})
        matching_groups = config.db.groups.find({'_id': {'$in': group_ids}}, ['label'])
        return list(matching_groups)

    def set_project_template(self, **kwargs):
        """
        Store the JSON body of the request as the `template` of project `cid`.
        Returns {'modified': <number of modified documents>}.
        """
        project_id = kwargs.pop('cid')
        self.config = self.container_handler_configurations['projects']
        self.storage = self.config['storage']
        project = self._get_container(project_id)

        template = self.request.json_body
        validators.validate_data(template, 'project-template.json', 'input', 'POST')
        payload = {
            'template': template,
            'modified': datetime.datetime.utcnow()
        }

        checked_exec = self._get_permchecker(project)(self.storage.exec_op)
        result = checked_exec('PUT', _id=project_id, payload=payload)
        return {'modified': result.modified_count}

    def delete_project_template(self, **kwargs):
        """
        Unset the `template` of project `cid` and refresh its `modified` timestamp.
        Returns {'modified': <number of modified documents>}.
        """
        project_id = kwargs.pop('cid')
        self.config = self.container_handler_configurations['projects']
        self.storage = self.config['storage']
        project = self._get_container(project_id)

        payload = {'modified': datetime.datetime.utcnow()}
        unset_payload = {'template': ''}

        checked_exec = self._get_permchecker(project)(self.storage.exec_op)
        result = checked_exec('PUT', _id=project_id, payload=payload, unset_payload=unset_payload)
        return {'modified': result.modified_count}


    def calculate_project_compliance(self, **kwargs):
        """
        Recalculate session compliance through the project storage for project `cid`
        (None when no `cid` is given) and return the result as {'sessions_changed': ...}.
        """
        project_id = kwargs.pop('cid', None)
        log.debug("project_id is {}".format(project_id))
        self.config = self.container_handler_configurations['projects']
        self.storage = self.config['storage']
        sessions_changed = self.storage.recalc_sessions_compliance(project_id=project_id)
        return {'sessions_changed': sessions_changed}

    def _get_validators(self):
        """
        Build the validators of the current configuration.

        Returns a tuple (mongo_validator, payload_validator): the first is a decorator
        built from the 'mongo' schema, the second a validator built from the 'input' schema.
        """
        storage_schema = self.config.get('storage_schema_file')
        mongo_validator = validators.decorator_from_schema_path(
            validators.schema_uri('mongo', storage_schema))

        payload_schema = self.config.get('payload_schema_file')
        payload_validator = validators.from_schema_path(
            validators.schema_uri('input', payload_schema))

        return mongo_validator, payload_validator

    def _add_session_measurements(self, results):
        """
        Set `measurements` on every session in `results`: the set of file measurements
        aggregated over its acquisitions, or None when the aggregation has no entry for it.
        """
        session_ids = [session['_id'] for session in results]
        pipeline = [
            {'$match': {'session': {'$in': session_ids}}},
            # re-key every acquisition by its session
            {'$project': { '_id': '$session', 'files':1 }},
            {'$unwind': '$files'},
            {'$project': { '_id': '$_id', 'files.measurements': 1}},
            {'$unwind': '$files.measurements'},
            {'$group': {'_id': '$_id', 'measurements': {'$addToSet': '$files.measurements'}}}
        ]
        measurements_by_session = {}
        for row in config.db.acquisitions.aggregate(pipeline):
            measurements_by_session[row['_id']] = row['measurements']
        for session in results:
            session['measurements'] = measurements_by_session.get(session['_id'], None)

    def _get_parent_container(self, payload):
        """
        Load the parent container referenced by `payload`.

        Returns a tuple (parent_container, parent_id_property):
          - (None, None) when the current configuration has no parent storage
          - (None, <property>) when the payload holds no parent id
          - (<container>, <property>) otherwise, the container tagged with its singular `cont_name`

        Aborts with 404 when the referenced parent does not exist.
        """
        parent_storage = self.config.get('parent_storage')
        if not parent_storage:
            return None, None

        # the parent is referenced under its singular name, e.g. 'project'
        parent_id_property = parent_storage.cont_name[:-1]
        parent_id = payload.get(parent_id_property)
        if not parent_id:
            return None, parent_id_property

        parent_storage.dbc = config.db[parent_storage.cont_name]
        parent_container = parent_storage.get_container(parent_id)
        if parent_container is None:
            self.abort(404, 'Element {} not found in container {}'.format(parent_id, parent_storage.cont_name))
        parent_container['cont_name'] = parent_storage.cont_name[:-1]
        return parent_container, parent_id_property

    def _get_container(self, _id, projection=None, get_children=False):
        """
        Load container `_id` from the current storage, leaving out files marked as deleted.

        Aborts with 400 on a storage error and with 404 when the container does not exist.
        """
        try:
            container = self.storage.get_container(_id, projection=projection, get_children=get_children)
        except APIStorageException as e:
            self.abort(400, e.message)
        if container is None:
            self.abort(404, 'Element {} not found in container {}'.format(_id, self.storage.cont_name))
        else:
            current_files = container.get('files', [])
            if current_files:
                container['files'] = [entry for entry in current_files if 'deleted' not in entry]
            return container

    def _get_permchecker(self, container=None, parent_container=None):
        """
        Pick the permission checker for the current request: `always_ok` for superuser
        requests, the public-request checker for public requests, otherwise the
        `permchecker` of the current configuration.
        """
        if self.superuser_request:
            return always_ok
        if self.public_request:
            return containerauth.public_request(self, container)
        configured_checker = self.config['permchecker']
        return configured_checker(self, container, parent_container)