RackHD/on-tasks

View on GitHub
lib/task.js

Summary

Maintainability
F
6 days
Test Coverage
// Copyright 2015, EMC, Inc.

'use strict';

var di = require('di');

module.exports = factory;
di.annotate(factory, new di.Provide('Task.Task'));
di.annotate(factory,
    new di.Inject(
        'Services.Configuration',
        'Logger',
        'Assert',
        'Errors',
        'Constants',
        'uuid',
        'Hogan',
        'Promise',
        'Rx',
        '_',
        'Services.Environment',
        'Services.Waterline',
        'TaskOption.Validator',
        'Task.taskLibrary',
        di.Injector
    )
);

/**
 * Injectable wrapper for dependencies
 * @param logger
 */
function factory(
    configuration,
    Logger,
    assert,
    Errors,
    Constants,
    uuid,
    Hogan,
    Promise,
    Rx,
    _,
    env,
    waterline,
    validator,
    taskLibrary,
    injector
) {
    var COMMON_OPTIONS_SCHEMA = 'common-task-options.json';

    var TaskStates = Constants.Task.States;
    var logger = Logger.initialize(factory);

    /**
     *
     * @param taskGraph
     * @param optionOverrides
     * @returns {factory.Task}
     * @constructor
     */
    function Task(definition, taskOverrides, context) {
        var self = this;
        assert.object(context, 'Task shared context object');

        self.definition = _.cloneDeep(definition);

        // run state related properties
        self.cancelled = false;
        taskOverrides = taskOverrides || {};
        // Don't do context rendering if this is true. This signals to only
        // do up-front validation before we have any dynamic data available.
        self.compileOnly = taskOverrides.compileOnly || false;

        self.instanceId = taskOverrides.instanceId || uuid.v4();
        // Add convenience nodeId attribute to be used for rendering some task data option values
        if (_.has(context, 'target')) {
            self.nodeId = context.target;
        }

        self.name = taskOverrides.name || self.definition.injectableName;
        self.friendlyName = taskOverrides.friendlyName || self.definition.friendlyName;
        self.waitingOn = taskOverrides.waitingOn || [];
        self.ignoreFailure = taskOverrides.ignoreFailure || false;
        self.error = null;

        // tags for categorization and hinting functionality
        self.properties = definition.properties;

        // State bag shared throughout tasks in a TaskGraph
        self.context = context;

        // state of the current object
        self.state = TaskStates.Pending;

        // hint to whatever is running the task when it is successful
        self.successStates = [TaskStates.Succeeded];
        // hint to whatever is running the task when it has failed
        self.failedStates = [TaskStates.Failed, TaskStates.Timeout, TaskStates.Cancelled];

        var server;
        if (_.has(context, 'proxy')) {
            server = context.proxy;
        } else {
            server = 'http://%s:%s'.format(
                Task.configCache.apiServerAddress,
                Task.configCache.apiServerPort
            );
        }
        var fileServerUri;
        if (_.has(Task.configCache, 'fileServerAddress')) {
            fileServerUri = 'http://%s'.format(Task.configCache.fileServerAddress);
            if (_.has(Task.configCache, 'fileServerPort')) {
                fileServerUri = fileServerUri + ':' + Task.configCache.fileServerPort;
            }
            if (_.has(Task.configCache, 'fileServerPath')) {
                fileServerUri = fileServerUri + Task.configCache.fileServerPath;
                fileServerUri = _.trimRight(fileServerUri, '/');
            }
        } else {
            fileServerUri = server;
        }

        self.renderContext = {
            server: Task.configCache,
            api: { server: server },
            file: {
                server: fileServerUri
            },
            task: self,
            options: self.definition.options,
            context: self.context
        };
        self.renderContext.api.base = self.renderContext.api.server + '/api/current';
        self.renderContext.api.templates = self.renderContext.api.base + '/templates';
        self.renderContext.api.profiles = self.renderContext.api.base + '/profiles';
        self.renderContext.api.lookups = self.renderContext.api.base + '/lookups';
        self.renderContext.api.files = self.renderContext.api.base + '/files';
        self.renderContext.api.nodes = self.renderContext.api.base + '/nodes';

        self._taskTimeout = self.definition.options._taskTimeout;

        return self;
    }

    Task.prototype.getSkuId = function(nodeId) {
        if(!nodeId)  {
            return Promise.resolve();
        }

        return waterline.nodes.needByIdentifier(nodeId)
            .then(function(node) {
                if (node.sku) {
                    return node.sku;
                }
            });
    };

    Task.prototype.renderAll = function(nodeId, options){
        var self = this;
        return self.getSkuId(nodeId).
            then(function (skuid) {
                if (!skuid){
                    return env.get('config', {}, [Constants.Scope.Global]).
                        then( function(env){
                            self.renderContext.env = env;
                        });
                }else{
                    return Promise.all([
                            env.get('config', {}, [skuid]),
                            env.get('config', {}, [skuid, Constants.Scope.Global])
                        ]).spread(function (sku, env) {
                            self.renderContext.sku = sku;
                            self.renderContext.env = env;
                        });
                }
            }).finally(function(){
                self.renderOwnOptions(_.cloneDeep(options));
         });
    };

    Task.prototype._isDeferredRender = function(renderKey) {
        return this.compileOnly && renderKey === 'context';
    };

    Task.prototype.renderString = function(str, context, depth, maxDepth) {
        var self = this;

        if (depth > maxDepth) {
            throw new Errors.TemplateRenderError("Exceeded max depth rendering string: " + str);
        }

        var rendered;
        var deferredRender = false;

        // Support OR syntax: {{ options.value | options.backupValue | default }}
        // ^^ Will return "default" if options.value and options.backupValue do not exist
        _.some(str.split(/[\s\n]?\|+[\s\n]?/), function(item) {
            // If item is a string that should not be rendered yet (i.e. it requires
            // data generated dynamically at workflow runtime), defer rendering
            // until later
            if (self._isDeferredRender(item.split('.')[0])) {
                rendered = '{{' + str + '}}';
                deferredRender = true;
                return rendered;
            }
            rendered = item.indexOf('.') > -1 ? _.get(context, item) : item;
            return rendered;
        });

        if (!rendered) {
            throw new Errors.TemplateRenderError("Value does not exist for " + str);
        } else if (deferredRender) {
            return rendered;
        }
        // Check if our rendered string itself needs to be rendered (i.e. the rendered
        // template is itself another template that needs to be rendered)
        var partialRender = _.some(self.parse(rendered), function(item) {
            return item.tag === '_v';
        });
        // NOTE (benbp): this is a non-optimal algorithm. Since we aren't caching nested
        // render results, we may end up rendering some templates multiple times.
        // I highly doubt this will cause any bottlenecks given the small scale at which
        // we are rendering, so it's not worth the extra effort at this time.
        if (partialRender) {
            return self.render(rendered, context, depth + 1);
        } else {
            return rendered;
        }
    };

    Task.prototype.parse = function(source) {
        var renderedValue;
        if  (Array.isArray(source)) {
            renderedValue = [];
            source.forEach(function (element) {
                renderedValue.push(Hogan.parse(Hogan.scan(element)));
            });
            return renderedValue;
        }
        renderedValue = Hogan.parse(Hogan.scan(source));
        return renderedValue;
    };

    Task.prototype.renderComplex = function(source, context) {
        return Hogan.compile(source).render(context);
    };

    Task.prototype.render = function(source, context, depth) {
        var self = this;

        var parsed = self.parse(source);
        // use original hogan render while source contains complex logic like iteration
        var complexTemplate = _.some(parsed, function(item) {
            return item.tag !== '_t' && item.tag !== '_v';
        });
        if (complexTemplate) {
            return self.renderComplex(source, context);
        }


        var isString = false;
        var mappedValues  = _.map(parsed, function(item) {
            // Read Hogan parse tree objects
            // _t === simple text
            // _v === value to be rendered
            if (item.tag === '_t') {
                return _.values(item.text).join('');
            } else if (item.tag === '_v') {
                depth = _.isNumber(depth) ? depth : 0;
                var type = _.get(context, item.n);

                if(typeof type === 'string' || type instanceof String ||  typeof type === 'undefined' || type === null)  {
                    isString = true;
                } else {
                    isString = false;
                }
                return self.renderString(item.n, context, depth, 50);
            }

            });

        if (isString){
            return mappedValues.join('');
        }else{
            return mappedValues[0];
        }
    };

    Task.prototype.renderOptions = function(toRender, renderContext) {
        var self = this;
        if (_.isEmpty(toRender)) {
            return toRender;
        } else if (typeof toRender === 'string') {
            return self.render(toRender, renderContext);
        } else {
            return _.transform(toRender, function(acc, v, k) {
                acc[k] = self.renderOptions(v, renderContext);
            }, toRender);
        }
    };

    Task.prototype.renderOwnOptions = function(toRender) {
        this.options = this.renderOptions(toRender, this.renderContext);
    };

    Task.prototype.instantiateJob = function() {
        // This should already have been validated to exist
        var Job = injector.get(this.definition.runJob);
        this.job = new Job(this.options, this.context, this.instanceId, this.definition.label);
    };

    Task.prototype.run = function() {
        var self = this;
        if (self.state !== TaskStates.Pending) {
            return Promise.resolve();
        }
        self.state = TaskStates.Running;
        return Promise.resolve()
            .then(function() {
                return self.renderAll(self.nodeId, self.definition.options);
            })
            .then(function() {
                Task.validateOptions(self.definition, self.options, { skipContext: false });
            })
            .then(function() {
                return self._run();
            })
            .then(function() {
                self.state = TaskStates.Succeeded;
            })
            .then(function() {
                return self;
            })
            .catch(Errors.TaskCancellationError, function(e) {
                self.state = TaskStates.Cancelled;
                self.error = e;
                return self;
            })
            .catch(Errors.TaskStopError, function(e) {
                self.state = TaskStates.Pending;
                self.error = e;
                return null;
            })
            .catch(Errors.TaskTimeoutError, function(e) {
                self.state = TaskStates.Timeout;
                self.error = e;
                return self;
            })
            .catch(function(e) {
                self.state = TaskStates.Failed;
                self.error = e;
                return self;
            })
            .finally(function() {
                if (self.timer) {
                    clearTimeout(self.timer);
                }
                if (self.error) {
                    var data = {
                        error: self.error.toString(),
                        taskId: self.instanceId,
                        graphId: self.context.graphId,
                        injectableName: self.name
                    };
                    if (self.nodeId) {
                        data.node = self.nodeId;
                    }
                    logger.error('Task failed', data);
                }
            });
    };

    Task.prototype._run = function() {
        var self = this;

        self.instantiateJob();

        // TODO: it may better to define this in the database in the case that
        // a task runner crashes, and the task is re-run, the timeout clock
        // will reset. It's somewhat of a design question whether we want
        // timeouts to be for each task iteration or for the max time allowed period.
        // If timeout is undefined, this evaluates to false as well.
        if (self._taskTimeout > 0) {
            self.timer = setTimeout(function() {
                // This ends up cancelling the job, which causes the promise returned
                // by the `self.job.run()` call below to then reject, which gets
                // caught by the promise chain in `task.run()`.
                self.cancel(new Errors.TaskTimeoutError(
                    "Task did not complete within " + self._taskTimeout + "ms"));
            }, self._taskTimeout);
        }
        logger.info("Running task job.", {
            taskId: self.instanceId,
            name: self.definition.friendlyName,
            job: self.definition.runJob
        });

        return self.job.run();
    };

    Task.prototype.stop = function() {
        //Used to handle stopping a task without triggering
        //the downstream cancellation logic
        this.state = TaskStates.Pending;
        return this.job.cancel(new Errors.TaskStopError());
    };

    Task.prototype.cancel = function(error) {
        var self = this;
        error = error || new Errors.TaskCancellationError();
        self.error = error;

        if (self.state === TaskStates.Running) {
            // Task cancellation is passed down to the job first
            // and then bubbled up into the promise chain
            // created in this.run()
            self.state = TaskStates.Cancelled;
            return self.job
                .cancel(error)
                .catch(function() {
                    //This is expected Error when task is cancelled, which has been addressed in
                    //this.run(), so we just swallow it here.
                });
        }

        if (self.state === TaskStates.Pending) {
            // This block handles cancellation before run() has
            // been called (i.e. the task has been created, but not
            // yet scheduled to run by the scheduler via taskProtocol.subscribeRun
            self.state = TaskStates.Cancelled;
        }
        return Promise.resolve();
    };


    // enables JSON.stringify(this)
    Task.prototype.toJSON = function toJSON() {
        return this.serialize();
    };

    Task.prototype.serialize = function serialize() {
        var redactKeys = ['renderContext', 'timer'];
        var obj = _.transform(this, function(result, v, k) {
            if (!_.contains(redactKeys, k)) {
                result[k] = v;
            }
        }, {});

        if (this.job) {
            obj.job = this.job.serialize();
        }

        return obj;
    };

    /**
     * compile task instance
     *
     * @memberof Task
     * @return {Promise} Returns a fulfilled promise if compile success, otherwise a rejected
     * promise.
     */
    Task.prototype.compile = function compile() {
        var self = this;
        return self.renderAll(self.nodeId, self.definition.options)
        .then(function() {
            Task.validateOptions(self.definition, self.options, { skipContext: true });
        });
    };

    /**
     * handle task common options
     *
     * This handling will operate on the input options itself. Currently this function only checks
     * the task timout setting, if in future there is additional common setting, put it here as
     * well.
     *
     * @param {Object} options - The input task options.
     * @return {Object} the result options object.
     */
    function handleCommonOptions(options) {
        // check the task timeout, if not specified, then set a default one.
        if (!options.hasOwnProperty('_taskTimeout') && options.schedulerOverrides &&
                options.schedulerOverrides.hasOwnProperty('timeout')) {
            options._taskTimeout = options.schedulerOverrides.timeout;
        }

        if (typeof options._taskTimeout !== 'number') {
            options._taskTimeout = 24 * 60 * 60 * 1000; //default to 24 hour timeout
        }
        return options;
    }

    function _validateSchemaProperty(schema, title) {
        if (!schema || _.isString(schema) || _.isObject(schema)) {
            return;
        }
        throw new Error( (title ? (title + ': ') : '') + 'schema must be either string or object');
    }

    Task.getBaseTask = function(query) {
        var injectableName;
        if (_.isString(query)) {
            injectableName = query;
        }
        else if (_.isObject(query)) {
            assert.string(query.implementsTask);
            injectableName = query.implementsTask;
        }
        else {
            throw new Error('getBaseTask allows either input injectableName or task definition');
        }

        var baseTaskDefinition = _.find(taskLibrary, function(task) {
            return !task.implementsTask && task.injectableName === injectableName;
        });

        if (!baseTaskDefinition) {
            throw new Error('cannot find baseTask with injectableName=' + injectableName);
        }
        return baseTaskDefinition;
    };

    Task.validateBaseTask = function(baseTask) {
        assert.string(baseTask.friendlyName, 'BaseTask friendlyName');
        assert.string(baseTask.injectableName, 'BaseTask injectableName');
        assert.object(baseTask.requiredProperties, 'requiredProperties');
        assert.object(baseTask.properties, 'properties');
        _validateSchemaProperty(baseTask.optionsSchema, 'BaseTask');

        assert.string(baseTask.runJob, 'BaseTask runJob');
        var Job = injector.get(baseTask.runJob);
        assert.func(Job.prototype.run, "Task Job run method");
        assert.func(Job.prototype.cancel, "Task Job cancel method");
    };

    /**
     * validate task definition
     *
     * @param {Object} definition - The task definition that to be validated.
     * @throws Will throw an error if the validation fails.
     * @static
     */
    Task.validateDefinition = function validateDefinition(definition) {
        assert.object(definition, 'Task definition data');
        assert.string(definition.friendlyName, 'Task definition friendly name');
        assert.string(definition.implementsTask, 'Task definition implementsTask');
        assert.object(definition.options, 'Task definition option');
        assert.object(definition.properties, 'Task definition properties');
        _validateSchemaProperty(definition.optionsSchema, 'Task definition');

        var baseTask = Task.getBaseTask(definition);
        Task.validateBaseTask(baseTask);
    };

    /**
     * create task instance
     *
     * There are two kinds of task instance creation:
     * 1 - full creation
     * This means the runtime context is ready, all task features can be enabled.
     *
     * 2 - compile-only creation
     * This is used when the runtime context is not ready, the creation will skip the features
     * that leverage the context.
     * Set 'taskOverrides.compileOnly' to true can enable the compile-only creation.
     *
     * @param {Object} definition - The task definition
     * @param {Object} taskOverides - The parameter to overide some task definition's properties.
     * @param {Object} context - The task graph runtime context
     * @return {Promise<Task>} - Return a task instance if input parameters are valid.
     * @static
     */
    Task.create = function create(definition, taskOverrides, context) {
        return Promise.try(function() {
            Task.validateDefinition(definition);
        })
        .then(function() {
            //set task common options so that it can be validated using schema as well.
            handleCommonOptions(definition.options);
        })
        .then(function() {
            return new Task(definition, taskOverrides, context);
        })
        .tap(function(task) {
            // If compileOnly is falsey, do rendering in the `.run()` step for
            // compatibility with how the task runner code is written.
            if (taskOverrides.compileOnly) {
                return task.compile();
            }
        });
    };

    Task.getCommonSchema = function () {
        return validator.getSchema(COMMON_OPTIONS_SCHEMA);
    };

    Task.getTaskSpecificSchema = function(definition) {
        var taskSpecific = definition.optionsSchema;
        if (_.isString(taskSpecific)) {
            taskSpecific = validator.getSchema(taskSpecific);
        }
        return taskSpecific;
    };

    Task.getJobSchema = function(definition) {
        var baseTask = Task.getBaseTask(definition);
        var job = baseTask.optionsSchema;
        if (_.isString(job)) {
            job = validator.getSchema(job);
        }
        return job;
    };

    Task.getFullSchema = function getFullSchema(definition) {
        var schemas = _.compact([
            Task.getCommonSchema(),
            Task.getTaskSpecificSchema(definition),
            Task.getJobSchema(definition)
        ]);

        //since each sub schema may has its own `definitions`, so if directly merge those schemas
        //with allOf, then those $ref to definitions will become invalid. So before merging
        //sub-schemas, all $ref in these schemas have to be resolved so as to remove the reference
        //on their specific definitions.
        var resolvedSchemas = _.map(schemas, function(schema) {
            var tempSchemaId = uuid.v4();

            //add schema then remove it is to resolve reference
            validator.addSchema(schema, tempSchemaId);
            var schemaResolved = validator.getSchemaResolved(tempSchemaId);
            validator.removeSchema(tempSchemaId);
            delete schemaResolved.id;
            return schemaResolved;
        });

        //Combine common/job/taskSpecific schema into a large schema
        return {
            allOf: resolvedSchemas
        };
    };

    Task.validateOptions = function (definition, options, flags) {
        var jobSchema;

        //if the definition is from taskgraph, then the taskgraph has helped to assigned the
        //joOptionsSchema, so it's uncessary to lookup the baseTask again
        if (definition.hasOwnProperty('jobOptionsSchema')) {
            jobSchema = definition.jobOptionsSchema;
        }
        else {
            var baseTask = Task.getBaseTask(definition);
            jobSchema = baseTask.optionsSchema;
        }
        var schemas = _.compact([
            COMMON_OPTIONS_SCHEMA,
            definition.optionsSchema,
            jobSchema
        ]);

        _.forEach(schemas, function(schema) {
            try {
                if (flags && flags.skipContext) {
                    validator.validateContextSkipped(schema, options);
                }
                else {
                    validator.validate(schema, options);
                }
            }
            catch (err) {
                err.message = definition.injectableName + ': ' + err.message;
                throw err;
            }
        });
    };

    // NOTE: getAll() has turned out to be a perf bottleneck, taking up to 40ms in
    // some cases. Until we make configuration updates dynamic, just cache this
    // at startup to avoid performance issues.
    Task.configCache = configuration.getAll();

    return Task;
}