RackHD/on-tasks

View on GitHub
lib/task-graph.js

Summary

Maintainability
F
5 days
Test Coverage
// Copyright 2016, EMC, Inc.
'use strict';

var di = require('di');

module.exports = taskGraphFactory;
di.annotate(taskGraphFactory, new di.Provide('TaskGraph.TaskGraph'));
di.annotate(taskGraphFactory,
    new di.Inject(
        'Task.taskLibrary',
        'Task.Task',
        'TaskGraph.Store',
        'Task.Messenger',
        'Services.Waterline',
        'Constants',
        'Assert',
        'uuid',
        'Promise',
        '_'
    )
);

function taskGraphFactory(
    taskLibrary,
    Task,
    store,
    taskMessenger,
    waterline,
    Constants,
    assert,
    uuid,
    Promise,
    _
) {
    function TaskGraph(definition, context, domain) {
        this.definition = definition;

        if (this.definition.options && this.definition.options.instanceId) {
            this.instanceId = this.definition.options.instanceId;
        } else {
            this.instanceId = uuid.v4();
        }

        // Bool
        this.serviceGraph = this.definition.serviceGraph;
        this.context = context || {};
        this.context.graphId = this.instanceId;
        this.context.graphName = this.definition.injectableName;
        this.domain = domain || Constants.Task.DefaultDomain;
        this.name = this.definition.friendlyName;
        this.injectableName = this.definition.injectableName;

        this.tasks = {};

        // TODO: find instances of 'valid' elsewhere and replace from valid to Pending
        // TODO: replace _status with status
        this._status = Constants.Task.States.Pending;

        this.logContext = {
            graphInstance: this.instanceId,
            graphName: this.name
        };
        if (this.context.target) {
            this.logContext.id = this.context.target;
       }
        // For database ref linking
        this.node = this.context.target || this.definition.options.nodeId;

        return this;
    }

    /**
    This is a helper that visits one node. 
    This function is used in _visitGraphNode().
    */

    TaskGraph.prototype._checkGraphNode = function(task, value, dep) {
        var nonTerminalOnStates = [];

        // This won't actually get called because we do this same check in
        // _populateTaskData, but keep this here just to be defensive.
        if (!_.has(this.tasks, dep)) {
            throw new Error('Graph does not contain task with ID ' + dep);
        }
        // Expand 'finished' to all finished task states
        if (_.contains(value, Constants.Task.States.Finished)) {
            value = Constants.Task.FinishedStates;
            task.waitingOn[dep] = [Constants.Task.States.Finished];
        }
        // The dependent task is no longer terminal on state <value> since the
        // current task lists that condition as a dependency.
        // task.nonTerminalOnStates gets substracted from task.terminalOnStates
        // when we traverse to the dependent task.
        nonTerminalOnStates = Array.isArray(value) ? value : [value];

        return nonTerminalOnStates;
    };

    /**
    This is a helper that helps process a dependent task under waitOn key.
    this function is used at _populateTaskData()
    */
    TaskGraph.prototype._convertWaitOnTask = function(waitOnBase, idMap, waitOnTask) {
        
        var newWaitOnTaskKey = idMap[waitOnTask];
        var waitOnTaskValue = "";
        assert.ok(newWaitOnTaskKey, 'Task to wait on does not exist: ' + waitOnTask);
        
        waitOnTaskValue = waitOnBase[waitOnTask];
        delete waitOnBase[waitOnTask];
        waitOnBase[newWaitOnTaskKey] = waitOnTaskValue;
        
    };

 
    TaskGraph.prototype._visitGraphNode = function(
            taskId, parentTaskName, markers, nonTerminalOnStates) {
        var self = this;
        var task = self.tasks[taskId];
        var marker = markers[taskId];
        if (!marker) {
            marker = {};
            markers[taskId] = marker;
        }

        if (marker.temporaryMark) {
            throw new Error('Detected a cyclic graph with tasks %s and %s'.format(
                                task.injectableName, parentTaskName));
        }

        // Only check for task.terminalOnStates being null here, not for an empty array!
        var _terminalOnStates = task.terminalOnStates || Constants.Task.FinishedStates;
        task.terminalOnStates = _.difference(_terminalOnStates, nonTerminalOnStates);

        if (marker.permanentMark) {
            return;
        }
        marker.temporaryMark = true;
        
        _.forEach(task.waitingOn, function(value, dep) {
            var nonTerminalOnStates = [];
            if (dep === "anyOf") {
                _.forEach(task.waitingOn.anyOf, function(orValue, orDep) {
                    nonTerminalOnStates = self._checkGraphNode(
                        self.tasks.anyOf, orValue, orDep);
                    return self._visitGraphNode(
                        orDep, task.injectableName, markers, nonTerminalOnStates);
                });
            }
            else {
                nonTerminalOnStates = self._checkGraphNode(task, value, dep);
                return self._visitGraphNode(dep, task.injectableName, markers, nonTerminalOnStates);
            } 
        });

        marker.permanentMark = true;
        marker.temporaryMark = false;
    };

    TaskGraph.prototype.detectCyclesAndSetTerminalTasks = function() {
        var self = this;
        var markers = {};
        _.forEach(self.tasks, function(task, taskId) {
            self._visitGraphNode(taskId, null, markers, null);
        });
    };

   /*
    *This function is help to get the task name for the task graph with
    * without API path
    */ 
    var getTaskName = function(taskNameInput){
        // check if there is "/" in the input task name ,if yes 
        // it may have the api path before task name
        var taskName = taskNameInput;
        var slashOffset = taskNameInput.indexOf("\/");
        if(slashOffset !== -1){
            var nameArray = taskNameInput.split("\/");
            // get the really taskName
            taskName = nameArray[nameArray.length - 1];
        }
        return taskName;    
    };

    /*
     * Take the tasks definitions in this.definition.tasks, generate instanceIds
     * to use for each task, and then create new Task objects that reference
     * the instanceIds in their dependencies instead of user-created task labels.
     */
    // TODO: Replace this with a proper DFS traversal instead of iterating
    TaskGraph.prototype._populateTaskData = function() {
        var self = this;

        assert.arrayOfObject(self.definition.tasks);
        var idMap = _.transform(self.definition.tasks, function(result, v) {
            result[v.label] = uuid.v4();
        }, {});
        return Promise.map(self.definition.tasks, function(taskData) {
            assert.object(taskData);
            if (_.has(taskData, 'taskName')) {
                assert.string(taskData.taskName);
            } else if (_.has(taskData, 'taskDefinition')) {
                assert.object(taskData.taskDefinition);
            } else {
                throw new Error("All TaskGraph tasks should have either a taskName" +
                    " or taskDefinition property.");
            }

            _.forEach(_.keys(taskData.waitOn), function(waitOnTask) {
                if (waitOnTask === "anyOf") {
                    
                    assert.ok(typeof(taskData.waitOn[waitOnTask]) === "object", 
                        "task.waitOn.anyOf should be an object.");
                    
                    _.forEach(_.keys(taskData.waitOn[waitOnTask]), function(orTask){
                      self._convertWaitOnTask(taskData.waitOn[waitOnTask], idMap, orTask);
                    });
                }
                else {
                    self._convertWaitOnTask(taskData.waitOn, idMap, waitOnTask);
                }
            });
            var taskOverrides = {
                instanceId: idMap[taskData.label],
                waitingOn: taskData.waitOn,
                ignoreFailure: taskData.ignoreFailure
            };
            if (taskData.taskName) {
                var taskNameInput = taskData.taskName;
                var taskName = getTaskName(taskNameInput);
                return self.constructTaskObject(
                    taskName,
                    taskOverrides,
                    taskData.optionOverrides,
                    taskData.label
                )
                .then(function(definition) {
                    self.tasks[definition.instanceId] = definition;
                });
            } else if (taskData.taskDefinition) {
                var definition = self.constructInlineTaskObject(taskData.taskDefinition,
                    taskOverrides, taskData.optionOverrides, taskData.label);
                self.tasks[definition.instanceId] = definition;
            }
        })
        .spread(function() {
            return self;
        });
    };
 
    TaskGraph.prototype.constructInlineTaskObject = function(_definition, taskOverrides,
           optionOverrides, label) {

        return this._buildTaskDefinition(_definition, optionOverrides,
                taskOverrides, label);
    };

    TaskGraph.prototype.constructTaskObject = function(taskName, taskOverrides,
           optionOverrides, label) {
        var self = this;
        return store.getTaskDefinition(taskName)
        .then(function(taskDefinition) {
            return self._buildTaskDefinition(
                taskDefinition,
                optionOverrides,
                taskOverrides,
                label
            );
        });
    };

    TaskGraph.prototype._buildTaskDefinition = function(_definition, optionOverrides,
            taskOverrides, label) {
        var self = this;
        var definition = _.cloneDeep(_definition);

        var baseTaskDefinition = self._getBaseTask(_definition);
        definition.instanceId = taskOverrides.instanceId;
        definition.properties = _.merge(definition.properties, baseTaskDefinition.properties);
        definition.runJob = baseTaskDefinition.runJob;
        definition.jobOptionsSchema = baseTaskDefinition.optionsSchema;
        definition.options = _.merge(definition.options || {}, optionOverrides || {});
        definition.label = label;
        definition.name = taskOverrides.name || definition.injectableName;
        definition.waitingOn = taskOverrides.waitingOn || {};
        // TODO: Remove ignoreFailure in favor of better graph branching evalution.
        // NOTE: actually ignoreFailure is still useful for tasks that can
        // fail but we don't care AND don't have anything queued up to run
        // on failure (rare case probably, but still worth supporting IMO).
        definition.ignoreFailure = taskOverrides.ignoreFailure || false;

        // If there is JSON schema defined for the task, then pass all task-specified option and
        // defaults options to the task, let the schema to determine whether the additional options
        // is allowed or not.
        if (definition.optionsSchema || definition.jobOptionsSchema) {
            if (!_.isEmpty(self.definition.options)) {
                //first scan 'defaults' then task-specific, so the task-specific option will take
                //precedent
                _.forEach(self.definition.options.defaults, function(optionValue, optionName) {
                    definition.options[optionName] = optionValue;
                });

                _.forEach(self.definition.options[label], function(optionValue, optionName) {
                    //if the null value in the task-specific option but a non-null value in
                    //defaults, then we will pick the non-null value, as the null value may only
                    //be a placeholder in task graph definition to indicate that this value is a
                    //required option.
                    if (optionValue !== null || definition.options[optionName] === null) {
                        definition.options[optionName] = optionValue;
                    }
                });
            }
        }
        else { //TODO: Remove requiredOptions
            var allOptions = _.uniq(
                _.keys(definition.options).concat(baseTaskDefinition.requiredOptions || [])
            );

            // If the graph has specifically defined options for a task, don't bother
            // with whether they exist as a required option or not in the base definition.
            if (_.has(self.definition.options, label)) {
                allOptions = allOptions.concat(_.keys(self.definition.options[label]));
            }

            if (!_.isEmpty(self.definition.options)) {
                _.forEach(allOptions, function(option) {
                    var taskSpecificOptions = self.definition.options[label];
                    if (_.has(taskSpecificOptions, option)) {
                        definition.options[option] = taskSpecificOptions[option];
                    } else if (_.has(self.definition.options.defaults, option)) {
                        definition.options[option] = self.definition.options.defaults[option];
                    }
                });
            }
        }

        definition.state = Constants.Task.States.Pending;

        return definition;
    };

    /**
     * Attempt to create task objects for all tasks. Effectively this means
     * we are exercising the task definition rendering functionality for all
     * tasks with the information we have at this point in time, and so doing as much
     * up front validation as we can.
     */
    TaskGraph.prototype.renderTasks = function() {
        var self = this;
        return Promise.map(_.values(self.tasks), function(taskDefinition) {
            return Task.create(
                taskDefinition,
                { compileOnly: true, instanceId: taskDefinition.instanceId },
                self.context
            )
            .then(function(task) {
                // Overwrite definition options with the rendered options from
                // the task object
                self.tasks[taskDefinition.instanceId].options = task.options;
            });
        })
        .then(function() {
            return self;
        });
    };

    TaskGraph.prototype.validate = function () {
        var self = this;
        var context = {};

        return Promise.resolve()
        .then(function() {
            // TODO: Move this into the loop below so we don't iterate more than
            // necessary.
            self._validateTaskLabels();
        })
        .then(function() {
            assert.arrayOfObject(self.definition.tasks, 'Graph.tasks');
            return Promise.map(self.definition.tasks, function(taskData) {
                if (!_.has(taskData, 'taskDefinition')) {
                    var taskNameInput = taskData.taskName;
                    var taskName = taskNameInput;
                    if(taskNameInput){
                        taskName = getTaskName(taskNameInput);
                    }
                    return store.getTaskDefinition(taskName)
                    .then(function(definition) {
                        return {
                            taskDefinition: definition,
                            label: taskData.label
                        };
                    });
                } else {
                    return {
                        taskDefinition: taskData.taskDefinition,
                        label: taskData.label
                    };
                }
            })
            .then(function(tasks) {
                _.forEach(tasks, function(taskData) {
                    self._validateTaskDefinition(taskData.taskDefinition);
                    self._validateProperties(taskData.taskDefinition, context);
                    self._validateOptions(taskData.taskDefinition, taskData.label);
                });
            });
        })
        .then(function() {
            return self;
        });
    };

    TaskGraph.prototype._validateTaskLabels = function() {
        _.transform(this.definition.tasks, function(result, task) {
            assert.ok(task.label !== 'anyOf', 
                'Label anyOf is reserved, please use another label.');
            if (result[task.label]) {
                throw new Error(("The task label '%s' is used more than once in " +
                                "the graph definition.").format(task.label));
            } else {
                result[task.label] = true;
            }
        }, {});
    };

    TaskGraph.prototype._validateTaskDefinition = function(taskDefinition) {
        assert.object(taskDefinition, 'taskDefinition');
        assert.string(taskDefinition.friendlyName, 'friendlyName');
        assert.string(taskDefinition.injectableName, 'injectableName');
        assert.string(taskDefinition.implementsTask, 'implementsTask');
        assert.object(taskDefinition.options, 'options');
        assert.object(taskDefinition.properties, 'properties');

        var baseTaskDefinition = this._getBaseTask(taskDefinition);
        assert.string(baseTaskDefinition.friendlyName, 'friendlyName');
        assert.string(baseTaskDefinition.injectableName, 'injectableName');
        assert.string(baseTaskDefinition.runJob, 'runJob');
        assert.object(baseTaskDefinition.requiredProperties, 'requiredProperties');
        assert.object(baseTaskDefinition.properties, 'properties');
    };

    TaskGraph.prototype._validateProperties = function(taskDefinition, context) {
        var self = this;
        var baseTaskDefinition = self._getBaseTask(taskDefinition);
        var requiredProperties = baseTaskDefinition.requiredProperties;
        _.forEach(requiredProperties, function(v, k) {
            self.compareNestedProperties(
                v, k, context.properties, baseTaskDefinition.injectableName);
        });

        // Update shared context with properties from this task
        var _properties = _.merge(taskDefinition.properties, baseTaskDefinition.properties);
        context.properties = _.merge(_properties, context.properties);
    };

    TaskGraph.prototype._validateOptions = function(taskDefinition, label) {
        var self = this;
        var baseTaskDefinition = self._getBaseTask(taskDefinition);
        _.forEach((baseTaskDefinition.requiredOptions || []), function(k) {
            var option = taskDefinition.options[k];
            if (!option && _.has(self.definition.options.defaults, k)) {
                option = self.definition.options.defaults[k];
            }
            if (label && _.has(self.definition.options[label], k)) {
                option = self.definition.options[label][k];
            }
            assert.ok((option != null), // jshint ignore:line
                'required option ' + k + ' for task ' +
                taskDefinition.injectableName + ' in graph ' + self.injectableName);
        });
    };

    TaskGraph.prototype._getBaseTask = function(definition) {
        assert.object(definition);
        assert.string(definition.implementsTask);

        // TODO: this is just a temporary solution until base tasks are refactored
        // to be attributes of the job classes themselves.
        // TODO: also un-promisify all _getBaseTask calls *again* :(
        var baseTaskDefinition = _.find(taskLibrary, function(task) {
            return !task.implementsTask && task.injectableName === definition.implementsTask;
        });
        assert.object(baseTaskDefinition, "Base task definition for " +
                definition.implementsTask + " should exist");
        return baseTaskDefinition;
    };

    TaskGraph.prototype.compareNestedProperties = function(value, nestedKey, obj, taskName) {
        var self = this;

        // nested key is a dot notated string that represents a JSON scope, e.g.
        // os.linux.type represents { os: { linux: { type: 'value' } } }
        if (!nestedKey) {
            return;
        }
        assert.string(nestedKey);
        var keys = nestedKey.split('.');
        if (keys.length === 1) {
            assert.ok(_.has(obj, keys[0]),
                'expected property [' + keys[0] + '] to be supplied for task ' +
                taskName + ' in graph ' + self.injectableName);
            assert.equal(obj[keys[0]], value);
            return;
        }

        // Accumulator is a progressively nesting scope into an object, e.g.
        // 1. accumulator = key1  <- reduce makes accumulator the first item, which is a string
        // 2. accumulator = obj.key1.key2.key3  <- now accumulator is the object we returned
        // 3. accumulator = obj.key1.key2.key3.key4
        _.reduce(keys, function(accumulator, key) {
            var nested;

            if (typeof accumulator === 'string') {
                // First pass, accumulator is key[0]
                assert.ok(_.has(obj, accumulator),
                    'expected property [' + accumulator + '] to be supplied for task ' +
                    taskName + ' in graph ' + self.injectableName);
                nested = obj[accumulator];
            } else {
                // Subsequent passes, accumulator is an object
                assert.ok(_.has(accumulator, key),
                    'expected property [' + key + '] to be supplied for task ' +
                    taskName + ' in graph ' + self.injectableName);
                nested = accumulator;
            }

            // Last pass, check against the value now that we've reached
            // the correct scope.
            if (key === _.last(keys)) {
                assert.equal(nested[key], value,
                    'expected property [' + key + '] to equal ' + value + ' for task ' +
                    taskName + ' in graph ' + self.injectableName);
            }

            // Return next nested scope
            return nested[key];
        });
    };
    
    TaskGraph.prototype.createTaskDependencyObject = function(task) {
        return _.transform(task.waitingOn, function(out, states, taskId) {
            var first = true;
            var slicePosition = out.length || 1;
            states = Array.isArray(states) ? states : [states];
            states.forEach(function(state) {
                if (!out.length) {
                    var depObj = {};
                    depObj[taskId] = state;
                    out.push(depObj);
                } else if (first) {
                    out.forEach(function(item) {
                        item[taskId] = state;
                    });
                } else {
                    // Ensure each dependency object represents a unique
                    // dependency path iteration, accounting for all
                    // possible dependency paths.
                    // For example, if state === ['a', 'b'] then one dependency object
                    // will only include state 'a', and the other only state 'b'.
                    // This works with any number of items in any number of dependencies,
                    // and the number of unique dependency objects created is multiplicative.
                    var sliced = out.slice(out.length - slicePosition, out.length);
                    sliced.forEach(function(item) {
                        var dep = _.transform(item, function(result, v, k) {
                            result[k] = k === taskId ? state : v;
                        }, {});
                        out.push(dep);
                    });

                }
                if (first) {
                    first = false;
                }
            });

        }, []);
    };

    TaskGraph.prototype.createTaskDependencyItems = function() {
        var self = this;
        return _.flatten(_.map(this.tasks, function(task) {
            if (_.isEmpty(task.waitingOn)) {
                return {
                    taskId: task.instanceId,
                    dependencies: {},
                    terminalOnStates: task.terminalOnStates,
                    ignoreFailure: task.ignoreFailure
                };
            }
            return _.map(self.createTaskDependencyObject(task), function(dependencies) {
                return {
                    taskId: task.instanceId,
                    dependencies: dependencies,
                    terminalOnStates: task.terminalOnStates,
                    ignoreFailure: task.ignoreFailure
                };
            });
        }));
    };
    
    // enables JSON.stringify(this)
    TaskGraph.prototype.toJSON = function toJSON() {
        return this;
    };

    TaskGraph.prototype.persist = function() {
        var self = this;

        return Promise.all(
            _.flatten([
                store.persistGraphObject(self),
                this.createTaskDependencyItems().map(function(item) {
                    return store.persistTaskDependencies(item, self.instanceId)
                        .then(function () {
                            store.publishGraphRecord(self);
                        });
                })
            ])
        )
        .then(function() {
            return self;
        });
    };

    TaskGraph.create = function create(domain, data) {
        return TaskGraph.validateDefinition(domain, data)
        .then(function(graph) {
            return graph.renderTasks();
        });
    };

    TaskGraph.validateDefinition = function validateDefinition(domain, data) {
        var definition = data.definition;
        var options = data.options;
        var context = data.context;
        var _definition = _.cloneDeep(definition);
        _definition.options = _.merge(definition.options || {}, options || {});

        return Promise.resolve()
        .then(function() {
            var graph = new TaskGraph(_definition, context, domain);
            return graph.validate();
        })
        .then(function(graph) {
            return graph._populateTaskData();
        })
        .tap(function(graph) {
            graph.detectCyclesAndSetTerminalTasks();
        })
        .tap(function(graph) {
            return Promise.map(_.values(graph.tasks), function(taskDefinition) {
                return Task.validateDefinition(taskDefinition);
            });
        });
    };

    return TaskGraph;
}