scitran/core

View on GitHub
api/jobs/jobs.py

Summary

Maintainability
D
2 days
Test Coverage
"""
Jobs
"""

import bson
import copy
import datetime
import string

from ..types import Origin
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference

from .. import config
from ..util import render_template
from ..web.errors import APINotFoundException


class Job(object):
    def __init__(self, gear_id, inputs, destination=None, tags=None,
                 attempt=1, previous_job_id=None, created=None,
                 modified=None, state='pending', request=None,
                 id_=None, config_=None, origin=None,
                 saved_files=None, produced_metadata=None, batch=None,
                 failed_output_accepted=False, profile=None):
        """
        Creates a job.

        Parameters
        ----------
        gear_id: string
            Unique gear_id of the algorithm
        inputs: string -> FileReference map
            The inputs to be used by this job
        destination: ContainerReference (optional)
            Where to place the gear's output. Defaults to one of the input's containers.
        tags: string array (optional)
            Tags that this job should be marked with.
        attempt: integer (optional)
            If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
        previous_job_id: string (optional)
            If an equivalent job has tried & failed before, pass the last job attempt. Defaults to None (no previous attempts).
        created: datetime (optional)
        modified: datetime (optional)
            Timestamps
        state: string (optional)
            The state of this job. Defaults to 'pending'.
        request: map (optional)
            The request that is used for the engine. Generated when job is started.
        id_: string (optional)
            The database identifier for this job.
        config: map (optional)
            The gear configuration for this job.
        failed_output_accepted: bool (optional)
            Flag indicating whether output was accepted for a failed job.
        """

        # TODO: validate inputs against the manifest

        if type(gear_id) is bson.ObjectId:
            raise Exception('gear_id should be a string, not an ObjectId.')

        time_now = datetime.datetime.utcnow()

        if tags is None:
            tags = []
        if saved_files is None:
            saved_files = []
        if produced_metadata is None:
            produced_metadata = {}
        if created is None:
            created = time_now
        if modified is None:
            modified = time_now
        if profile is None:
            profile = {}

        if destination is None and inputs is not None:
            # Grab an arbitrary input's container
            key = inputs.keys()[0]
            fr = inputs[key]
            destination = create_containerreference_from_filereference(fr)

        # Trim tags array to unique members...
        tags = list(set(tags))

        # If no origin, mark as system origin
        if origin is None:
            origin = {
                'type': str(Origin.system),
                'id': None
            }

        self.gear_id            = gear_id
        self.inputs             = inputs
        self.destination        = destination
        self.tags               = tags
        self.attempt            = attempt
        self.previous_job_id    = previous_job_id
        self.created            = created
        self.modified           = modified
        self.state              = state
        self.request            = request
        self.id_                = id_
        self.config             = config_
        self.origin             = origin
        self.saved_files        = saved_files
        self.produced_metadata  = produced_metadata
        self.batch              = batch
        self.failed_output_accepted = failed_output_accepted
        self.profile = profile


    def intention_equals(self, other_job):
        """
        Compare this job's intention to other_job.
        Returns True if other_job's gear_id, inputs and destination match self.
        Returns False otherwise.

        Useful for comparing auto-triggered jobs for equality.
        Implicitly uses dict, FileReference and ContainerReference _cmp_ methods.
        """
        if (
            isinstance(other_job, Job) and
            self.gear_id == other_job.gear_id and
            self.inputs == other_job.inputs and
            self.destination == other_job.destination
        ):
            return True

        else:
            return False


    @classmethod
    def load(cls, e):
        # TODO: validate

        # Don't modify the map
        d = copy.deepcopy(e)

        if d.get('inputs'):
            input_dict = {}

            for i in d['inputs']:
                inp = i.pop('input')
                input_dict[inp] = create_filereference_from_dictionary(i)

            d['inputs'] = input_dict

        if d.get('destination', None):
            d['destination'] = create_containerreference_from_dictionary(d['destination'])

        d['_id'] = str(d['_id'])

        return cls(d['gear_id'], d.get('inputs'),
            destination=d.get('destination'),
            tags=d['tags'], attempt=d['attempt'],
            previous_job_id=d.get('previous_job_id'),
            created=d['created'],
            modified=d['modified'],
            state=d['state'],
            request=d.get('request'),
            id_=d['_id'],
            config_=d.get('config'),
            origin=d.get('origin'),
            saved_files=d.get('saved_files'),
            produced_metadata=d.get('produced_metadata'),
            batch=d.get('batch'),
            failed_output_accepted=d.get('failed_output_accepted', False),
            profile=d.get('profile', {})
        )

    @classmethod
    def get(cls, _id):
        doc = config.db.jobs.find_one({'_id': bson.ObjectId(_id)})
        if doc is None:
            raise APINotFoundException('Job not found')

        return cls.load(doc)

    def map(self):
        """
        Flatten struct to map
        """

        # Don't modify the job obj
        d = copy.deepcopy(self.__dict__)

        d['id'] = d.pop('id_', None)

        if d.get('inputs'):
            for x in d['inputs'].keys():
                d['inputs'][x] = d['inputs'][x].__dict__
        else:
            d.pop('inputs')

        if d.get('destination'):
            d['destination'] = d['destination'].__dict__
        else:
            d.pop('destination')

        if d['id'] is None:
            d.pop('id')
        if d['previous_job_id'] is None:
            d.pop('previous_job_id')
        if d['request'] is None:
            d.pop('request')
        if d['failed_output_accepted'] is False:
            d.pop('failed_output_accepted')

        return d

    def mongo(self):
        d = self.map()
        if d.get('id'):
            d['id'] = bson.ObjectId(d['id'])
        if d.get('inputs'):
            input_array = []
            for k, inp in d['inputs'].iteritems():
                inp['input'] = k
                input_array.append(inp)
            d['inputs'] = input_array

        return d

    def insert(self):
        """
        Warning: this will not stop you from inserting a job for a gear that has gear.custom.flywheel.invald set to true.
        """

        if self.id_ is not None:
            raise Exception('Cannot insert job that has already been inserted')

        result = config.db.jobs.insert_one(self.mongo())
        self.id_ = result.inserted_id
        return result.inserted_id

    def save(self):
        self.modified = datetime.datetime.utcnow()
        update = self.mongo()
        job_id = update.pop('id')
        result = config.db.jobs.replace_one({'_id': job_id}, update)
        if result.modified_count != 1:
            raise Exception('Job modification not saved')
        return {'modified_count': 1}

    def generate_request(self, gear):
        """
        Generate the job's request, save it to the class, and return it

        Parameters
        ----------
        gear: map
            A gear_list map from the gears table.
        """

        if gear.get('gear', {}).get('custom', {}).get('flywheel', {}).get('invalid', False):
            raise Exception('Gear marked as invalid, will not run!')

        r = {
            'inputs': [
                {
                    'type': 'http',
                    'uri': gear['exchange']['rootfs-url'],
                    'vu': 'vu0:x-' + gear['exchange']['rootfs-hash'],
                    'location': '/',
                }
            ],
            'target': {
                'command': None,
                'env': {
                    'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
                },
                'dir': '/flywheel/v0',
            },
            'outputs': [
                {
                    'type': 'scitran',
                    'uri': '',
                    'location': '/flywheel/v0/output',
                },
            ],
        }

        # Map destination to upload URI
        r['outputs'][0]['uri'] = '/engine?level=' + self.destination.type + '&id=' + self.destination.id

        # Add environment, if any
        for key in gear['gear'].get('environment', {}).keys():
            r['target']['env'][key] = gear['gear']['environment'][key]

        # Add command, if any
        command_base = 'env; rm -rf output; mkdir -p output; '
        if gear['gear'].get('command') is not None:

            command = render_template(gear['gear']['command'], self.config['config'])

            r['target']['command'] = ['bash', '-c', command_base + command ]
        else:
            r['target']['command'] = ['bash', '-c', command_base + './run' ]

        # Add config, if any
        if self.config is not None:

            if self.id_ is None:
                raise Exception('Running a job requires an ID')

            # Detect if config is old- or new-style.
            # TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.

            # Add config scalars as environment variables
            if self.config.get('config') is not None and self.config.get('inputs') is not None:
                # New config behavior

                cf = self.config['config']

                # Whitelist characters that can be used in bash variable names
                bash_variable_letters = set(string.ascii_letters + string.digits + ' ' + '_')

                for x in cf:
                    # Strip non-whitelisted characters, set to underscore, and uppercase
                    config_name = filter(lambda char: char in bash_variable_letters, x)
                    config_name = config_name.replace(' ', '_').upper()

                    # Don't set nonsensical environment variables
                    if config_name == '':
                        print 'The gear config name ' + x + ' has no whitelisted characters!'
                        continue

                    if isinstance(cf[x], list):
                        # Stringify array or set
                        # Might have same issue as scalars with "True" >:(
                        r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x])

                    elif isinstance(cf[x], dict):
                        raise Exception('Disallowed object-type config value ' + x + ' ' + str(cf[x]))

                    else:
                        # Stringify scalar
                        # Python strings true as "True"; fix
                        if not isinstance(cf[x], bool):
                            r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x])
                        else:
                            r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x]).lower()

            else:
                # Old config map.
                pass

            r['inputs'].append({
                'type': 'scitran',
                'uri': '/jobs/' + self.id_ + '/config.json',
                'location': '/flywheel/v0',
            })

        # Add the files
        if self.inputs is not None:
            for input_name in self.inputs.keys():
                i = self.inputs[input_name]

                r['inputs'].append({
                    'type': 'scitran',
                    'uri': i.file_uri(i.name),
                    'location': '/flywheel/v0/input/' + input_name,
                })

        # Log job origin if provided
        if self.id_:
            r['outputs'][0]['uri'] += '&job=' + self.id_

        self.request = r
        return self.request

class JobTicket(object):
    """
    A JobTicket represents an attempt to complete a job.
    """

    @staticmethod
    def get(_id):
        return config.db.job_tickets.find_one({'_id': bson.ObjectId(_id)})

    @staticmethod
    def create(job_id, success, elapsed):
        j = Job.get(job_id)

        result = config.db.job_tickets.insert_one({
            'job': j.id_,
            'success': success,
            'elapsed': elapsed,
        })

        return result.inserted_id

    @staticmethod
    def find(job_id):
        """Find any tickets with job ID"""
        return list(config.db.job_tickets.find({'job': job_id}))


class Logs(object):

    @staticmethod
    def get(_id):
        log = config.db.job_logs.find_one({'_id': _id})

        if log is None:
            return { '_id': _id, 'logs': [] }
        else:
            return log

    @staticmethod
    def get_text_generator(_id):
        log = config.db.job_logs.find_one({'_id': _id})

        if log is None:
            yield '<span class="fd--1">No logs were found for this job.</span>'
        else:
            for stanza in log['logs']:
                msg = stanza['msg']
                yield msg

    @staticmethod
    def get_html_generator(_id):
        log = config.db.job_logs.find_one({'_id': _id})

        if log is None:
            yield '<span class="fd--1">No logs were found for this job.</span>'

        else:
            open_span = False
            last = None

            for stanza in log['logs']:
                fd = stanza['fd']
                msg = stanza['msg']

                if fd != last:
                    if open_span:
                        yield '</span>\n'

                    yield '<span class="fd-' + str(fd) + '">'
                    open_span = True
                    last = fd

                yield msg.replace('\n', '<br/>\n')

            if open_span:
                yield '</span>\n'

    @staticmethod
    def add(_id, doc):

        # Silently ignore adding no logs
        if len(doc) <= 0:
            return

        log = config.db.job_logs.find_one({'_id': _id})

        if log is None: # Race
            config.db.job_logs.insert_one({'_id': _id, 'logs': []})

        config.db.job_logs.update({'_id': _id}, {'$push':{'logs':{'$each':doc}}})