
// View on GitHub | 1 day | Test Coverage
// (web page chrome that was captured together with the source; not part of the module)
// Copyright 2015, EMC, Inc.

'use strict';

var di = require('di');

module.exports = completedTaskPollerFactory;
        new di.Provide('TaskGraph.CompletedTaskPoller'));
    new di.Inject(

function completedTaskPollerFactory(
) {
    var logger = Logger.initialize(completedTaskPollerFactory);

    /**
     * The CompletedTaskPoller polls the store for any tasks that have been
     * finished or marked as unreachable (in the case of multiple branches of
     * execution within a graph). It also evaluates graph states for
     * completed tasks, and finally deletes those tasks from the store so that
     * the store doesn't grow too large.
     *
     * @param {String} domain
     * @param {Object} options
     * @constructor CompletedTaskPoller
     */
    function CompletedTaskPoller(domain, options) {
        // Both arguments are optional; anything not supplied gets a default below.
        options = options || {};
        // A new poller is created in the stopped state.
        this.running = false;
        // `||` means falsy values (including 0) fall back to the default.
        this.pollInterval = options.pollInterval || 1000;
        this.concurrentCounter = { count: 0, max: 1 };
        this.completedTaskBatchSize = options.completedTaskBatchSize || 200;
        assert.number(this.completedTaskBatchSize, 'completedTaskBatchSize');
        this.domain = domain || Constants.Task.DefaultDomain;
        assert.string(this.domain, 'domain');
        // Debug logging is off unless the caller passes an explicit `debug` option.
        this.debug = _.has(options, 'debug') ? options.debug : false;
    }

    /**
     * Poll the store for completed tasks.
     *
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.pollTasks = function() {
        var self = this;
        /*
         * Before setting up the stream, make sure the poller is running, otherwise
         * this will create a stream that will never run and immediately complete.
         * This is basically defensive programming to try to prevent accidents where the
         * startup code is executed in the wrong order (e.g. pollTasks() then
         * this.running = true, that would be buggy because pollTasks() would
         * stop execution before this.running = true was set).
         */
        assert.ok(self.running, 'completed task poller is running');

        /*
         * For those unfamiliar with Rx.js:
         * This Rx.Observable.interval call produces a continuously running
         * pipeline of the chained calls below, that triggers on every
         * pollInterval period. The reason for the .takeWhile call at the top is
         * to basically implement an auto-disposal mechanism: before doing any
         * of the subsequent logic, first check if we're still running and
         * just shut down the whole stream if we're not. That way we don't have
         * to do any asynchronous disposal when stopping the service, we just
         * set a variable this.running = false.
         * The .map call is where the actual work is done, and the rest of the
         * calls are just coordination about when to do it.
         */
        Rx.Observable.interval(self.pollInterval)
        .takeWhile(self.isRunning.bind(self))
        .map(self.processCompletedTasks.bind(self, self.completedTaskBatchSize))
        // NOTE(review): the step that subscribes to the Observable returned by
        // processCompletedTasks is not visible in the reviewed copy of this file.
        // mergeLossy is presumed to be the project's Rx mixin that takes
        // this.concurrentCounter to bound concurrent batches -- confirm before merging.
        .mergeLossy(self.concurrentCounter)
        // Don't let processCompletedTasks return a waterline object to the logger
        // otherwise it will exceed the call stack trying to traverse a circular object
        .map(function() { return null; })
        .subscribe(
            // Success handler callback. Only log in debug mode.
            self.handleStreamDebug.bind(self, 'CompletedTaskPoller stream pipeline success'),
            // Error handler callback. This is considered catastrophic, most errors
            // _should_ be caught at a lower level and not bubbled up.
            self.handleStreamError.bind(self, 'Error with completed task deletion stream.')
        );
    };

    /**
     * This is used with Rx.Observable.prototype.takeWhile in the Observable
     * created by CompletedTaskPoller.prototype.pollTasks. When isRunning()
     * returns false, all the observables will automatically dispose.
     *
     * @returns {Boolean}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.isRunning = function() {
        // Plain read of the flag toggled by start() and stop().
        return this.running;
    };

    /**
     * Find up to <limit> completed tasks in the store and process them,
     * determining if any actions need to be taken with regard to the graph state,
     * and finally deleting the task documents from the store.
     *
     * @param {Number} limit
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.processCompletedTasks = function(limit) {
        return Rx.Observable.just()
        .flatMap(store.findCompletedTasks.bind(store, limit))
        // Nothing to evaluate or delete when the store returned no tasks.
        .filter(function(tasks) { return !_.isEmpty(tasks); })
        // Evaluate graph completion first, then delete the task documents.
        // deleteCompletedGraphs passes the task array through so deleteTasks
        // receives the same batch.
        .flatMap(this.deleteCompletedGraphs.bind(this))
        .flatMap(this.deleteTasks.bind(this))
        // Log and complete quietly so one bad batch does not error the caller's stream.
        .catch(this.handleStreamError.bind(this, 'Error processing completed tasks'));
    };

    /**
     * Determine if a graph is finished, and if so mark its finished state
     * in the store and publish an event to the messenger.
     *
     * @param {Object} data
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handlePotentialFinishedGraph = function(data) {
        var self = this;
        assert.object(data, 'data');
        assert.string(data.state, 'data.state');

        return Rx.Observable.just(data)
        .flatMap(function(data) {
            // A task in one of the failed states settles the outcome right away:
            // flag the object in place and skip store.checkGraphSucceeded.
            if (_.contains(Constants.Task.FailedStates, data.state)) {
                data.failed = true;
                data.done = true;
                return Rx.Observable.just(data);
            }
            return store.checkGraphSucceeded(data);
        })
        .flatMap(function(_data) {
            if (_data.done) {
                var graphState;
                // `data` is the object passed to handlePotentialFinishedGraph; the
                // previous step sets `failed` on that same object.
                if (data.failed) {
                    graphState = Constants.Task.States.Failed;
                } else {
                    graphState = Constants.Task.States.Succeeded;
                }

                return store.setGraphDone(graphState, _data)
                .then(function(graph) {
                    // Don't publish duplicate events if we've already set the graph as done
                    // prior, but DO continue with the outer stream so that we delete
                    // the task document whose existence triggered this check.
                    if (!_.isEmpty(graph)) {
                        return Promise.resolve()
                        .tap(function() {
                            // NOTE(review): only the trailing options argument is visible
                            // in the reviewed copy; `graph, graphState` as the leading
                            // arguments are presumed -- confirm against the
                            // graph progress service signature.
                            return graphProgressService.publishGraphFinished(
                                graph,
                                graphState,
                                {swallowError: true}
                            );
                        })
                        .then(function() {
                            return self._publishGraphFinished(graph);
                        });
                    }
                });
            }
            // Graph is not done yet: emit a single empty value so the caller's stream continues.
            return Rx.Observable.just();
        });
    };

    /**
     * Evaluate an array of finished tasks, and check if a graph is finished
     * for each task that is marked as being potentially terminal.
     *
     * @param {Array} tasks
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.deleteCompletedGraphs = function(tasks) {
        assert.arrayOfObject(tasks, 'tasks array');
        var terminalTasks = _.transform(tasks, function(result, task) {
            // Collect only terminal tasks (tasks that we know are the last or
            // one of the last tasks to run in a graph) for determining graph completion checks.
            // This logic handles cases where all tasks in a graph are completed,
            // but the graph completion event was dropped by the scheduler, either
            // due to high load or a process failure. Hooking onto task deletion
            // allows us to avoid doing full collection scans against graphobjects
            // to find potential unfinished graphs.
            if (_.contains(task.terminalOnStates, task.state)) {
                result.push(task);
            }
        }, []);

        // No graph checks needed; hand the batch straight on for deletion.
        if (_.isEmpty(terminalTasks)) {
            return Rx.Observable.just(tasks);
        }

        return Rx.Observable.from(terminalTasks)
        .flatMap(this.handlePotentialFinishedGraph.bind(this))
        // NOTE(review): the steps between from(...) and catch(...) are not visible in
        // the reviewed copy. Waiting for one result per terminal task and then emitting
        // the full `tasks` array once mirrors the early-return above -- confirm.
        .bufferWithCount(terminalTasks.length)
        .map(function() { return tasks; })
        .catch(this.handleStreamError.bind(this, 'Error handling potential finished graphs'));
    };

    /**
     * Delete task documents from the store.
     *
     * @param {Array} tasks
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.deleteTasks = function(tasks) {
        assert.arrayOfObject(tasks, 'tasks array');
        // Only the document ids are carried into the stream.
        var objectIds = _.map(tasks, function(task) {
            return task._id;
        });

        return Rx.Observable.just(objectIds)
        // NOTE(review): the store call that performs the deletion is not visible in the
        // reviewed copy; `store.deleteTasks` taking the id array is presumed -- confirm
        // against the store API.
        .flatMap(store.deleteTasks.bind(store))
        .catch(this.handleStreamError.bind(this, 'Error deleting completed tasks'));
    };

    /**
     * Log handler for observable onError failure events.
     *
     * @param {String} msg
     * @param {Object} err
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handleStreamError = function(msg, err) {
        logger.error(msg, {
            // stacks on some error objects (particularly from the assert library)
            // don't get printed if part of the error object so separate them out here.
            error: _.omit(err, 'stack'),
            // Guard against a null/undefined error so the handler itself cannot throw.
            stack: err ? err.stack : undefined
        });
        // Replace the failed stream with one that completes without emitting.
        return Rx.Observable.empty();
    };

    /**
     * Log handler for debug messaging during development/debugging. Only
     * works when this.debug is set to true.
     *
     * @param {String} msg
     * @param {Object} data
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handleStreamDebug = function(msg, data) {
        // Silent unless the poller was constructed with options.debug set.
        if (this.debug) {
            logger.debug(msg, data);
        }
    };

    /**
     * Start the poller.
     *
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.start = function() {
        // The flag must be set before pollTasks(), which asserts that it is true.
        this.running = true;
        // NOTE(review): no call to pollTasks() is visible anywhere in the reviewed copy,
        // and the comment inside pollTasks describes startup as "this.running = true"
        // followed by pollTasks(). Presumed to belong here -- confirm no external
        // caller also invokes pollTasks(), which would create a second stream.
        this.pollTasks();
    };

    /**
     * Stop the poller.
     *
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.stop = function() {
        // Only clears the flag that isRunning() reports.
        this.running = false;
    };

    /**
     * Create a new CompletedTaskPoller.
     *
     * @returns {Object} CompletedTaskPoller instance
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.create = function(domain, options) {
        // Factory wrapper so callers do not need `new`; arguments go straight to the constructor.
        return new CompletedTaskPoller(domain, options);
    };

    /**
     * Publish a graph finished event over AMQP and via Web Hooks.
     *
     * @param {Object} graph
     * @returns {Promise}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype._publishGraphFinished = function(graph) {
        return eventsProtocol.publishGraphFinished(graph.instanceId, {
            graphId: graph.instanceId,
            graphName: graph.name,
            status: graph._status
        }, graph.node)
        .catch(function(error) {
            // Publishing is best-effort: the failure is logged and not rethrown,
            // so the returned promise still resolves.
            logger.error('Error publishing graph finished event', {
                graphId: graph.instanceId,
                _status: graph._status,
                error: error
            });
        });
    };

    return CompletedTaskPoller;