// lib/task-runner.js
// Copyright © 2016-2017 Dell Inc. or its subsidiaries. All Rights Reserved.
'use strict';
var di = require('di');
// Export the factory itself; TaskRunner is only reachable by invoking it with
// its dependencies.
module.exports = taskRunnerFactory;
// Register the factory with di under the 'TaskGraph.TaskRunner' provider name.
di.annotate(taskRunnerFactory, new di.Provide('TaskGraph.TaskRunner'));
// Declare the dependencies to inject. The names are listed in the same order
// as the parameters of taskRunnerFactory (presumably di supplies them
// positionally - keep the two lists in sync when adding a dependency).
di.annotate(taskRunnerFactory,
new di.Inject(
'Logger',
'Promise',
'Services.GraphProgress',
'Constants',
'Assert',
'uuid',
'_',
'Rx',
'Task.Task',
'Task.Messenger',
'TaskGraph.Store'
)
);
function taskRunnerFactory(
Logger,
Promise,
graphProgressService,
Constants,
assert,
uuid,
_,
Rx,
Task,
taskMessenger,
store
) {
var logger = Logger.initialize(taskRunnerFactory);
/**
 * A TaskRunner executes tasks that a scheduler publishes over AMQP. A runner
 * only handles tasks that belong to its own scheduling domain.
 *
 * @param {Object} [options]
 * @param {String} [options.domain] - scheduling domain to accept tasks from;
 *                 falls back to Constants.Task.DefaultDomain
 * @param {Number} [options.heartbeatInterval] - period handed to
 *                 Rx.Observable.interval to pace heartbeats; falls back to 1000
 * @param {Number} [options.lostBeatLimit] - number of heartbeats an inactive
 *                 task may receive before its lease is expired; falls back to 3
 * @constructor
 */
function TaskRunner(options) {
    var settings = options || {};

    // Identity and bookkeeping for tasks this runner knows about.
    this.lostTasks = {};
    this.taskRunnerId = uuid.v4();
    this.completedTasks = [];

    // Input streams feeding the run / cancel / heartbeat pipelines.
    this.runTaskStream = new Rx.Subject();
    this.cancelTaskStream = new Rx.Subject();
    var beatPeriod = settings.heartbeatInterval || 1000;
    this.heartbeat = Rx.Observable.interval(beatPeriod);

    // Lifecycle state; the runner stays idle until start() is called.
    this.subscriptions = [];
    this.running = false;
    this.activeTasks = {};

    this.lostBeatLimit = settings.lostBeatLimit || 3;
    this.domain = settings.domain || Constants.Task.DefaultDomain;
}
/**
 * Reports whether this runner is currently started.
 *
 * @returns {Boolean} the current value of the `running` flag
 * @memberOf TaskRunner
 */
TaskRunner.prototype.isRunning = function() {
    var currentlyRunning = this.running;
    return currentlyRunning;
};
/**
 * Builds the three long-lived observable pipelines (run task, heartbeat,
 * cancel task) and subscribes to each one with logging handlers.
 *
 * @memberOf TaskRunner
 */
TaskRunner.prototype.initializePipeline = function() {
    var self = this;

    // Subscribes one pipeline with success/error handlers bound to the given
    // log messages (a null success message means "do not log successes").
    function attach(pipeline, successMsg, errorMsg) {
        pipeline.subscribe(
            self.handleStreamSuccess.bind(self, successMsg),
            self.handleStreamError.bind(self, errorMsg)
        );
    }

    attach(
        self.createRunTaskSubscription(self.runTaskStream),
        'Task finished',
        'Task failure'
    );
    attach(
        self.createHeartbeatSubscription(self.heartbeat),
        null,
        'Error handling heartbeat failure'
    );
    attach(
        self.createCancelTaskSubscription(self.cancelTaskStream),
        'Task cancelled',
        'Task cancellation error'
    );
};
/**
 * Runs a promise- or observable-returning function inside its own observable
 * sequence so that a failure is logged and swallowed instead of propagating
 * to the stream that called it.
 *
 * @memberOf TaskRunner
 * @param {Function} toObserve - function applied to streamData via flatMap
 * @param {String} msg - message logged if the wrapped sequence fails; a
 *                 generic message is used when this is falsy
 * @param {Object} streamData - the value fed to toObserve
 * @returns {Observable}
 */
TaskRunner.prototype.safeStream = function(toObserve, msg, streamData) {
    var failureMsg = msg || 'An unhandled Error occured in the safe task stream';
    var guarded = Rx.Observable.just(streamData).flatMap(toObserve);
    // handleStreamError logs and yields an empty sequence, ending this branch.
    return guarded.catch(this.handleStreamError.bind(this, failureMsg));
};
/**
 * Subscribes, through the task messenger, to run-task messages for this
 * runner's domain and forwards every incoming message into runTaskStream.
 *
 * @returns {Promise} whatever taskMessenger.subscribeRunTask returns
 * @memberOf TaskRunner
 */
TaskRunner.prototype.subscribeRunTask = function() {
    var stream = this.runTaskStream;
    // Bound so the messenger can call it as a plain callback.
    var forwardToStream = stream.onNext.bind(stream);
    return taskMessenger.subscribeRunTask(this.domain, forwardToStream);
};
/**
 * Subscribes, through the task messenger, to cancel-task messages and
 * forwards every incoming message into cancelTaskStream.
 *
 * @returns {Promise} whatever taskMessenger.subscribeCancelTask returns
 * @memberOf TaskRunner
 */
TaskRunner.prototype.subscribeCancelTask = function() {
    var stream = this.cancelTaskStream;
    // Bound so the messenger can call it as a plain callback.
    var forwardToStream = stream.onNext.bind(stream);
    return taskMessenger.subscribeCancelTask(forwardToStream);
};
/**
 * Creates the Rx.Observable pipeline for running tasks: checks out a task,
 * then fetches the task definition, then instantiates and runs the task.
 *
 * Messages for tasks already present in activeTasks are dropped, and an empty
 * result from either store call ends processing of that message. Both store
 * calls go through safeStream, so a failing lookup is logged rather than
 * erroring the whole pipeline.
 *
 * @param {Object} runTaskStream - observable of run-task messages ({taskId})
 * @returns {Observable}
 * @memberOf TaskRunner
 */
TaskRunner.prototype.createRunTaskSubscription = function(runTaskStream) {
    var self = this;

    function hasData(data) {
        return !_.isEmpty(data);
    }

    return runTaskStream
        .takeWhile(self.isRunning.bind(self))
        .filter(function(taskData) {
            return !_.has(self.activeTasks, taskData.taskId);
        })
        .flatMap(self.safeStream.bind(
            self,
            store.checkoutTask.bind(store, self.taskRunnerId),
            'Error checking out task'))
        .filter(hasData)
        // Bound to the store like the other store callbacks so the method
        // keeps its receiver when invoked by flatMap.
        .flatMap(self.safeStream.bind(
            self,
            store.getTaskById.bind(store),
            'Error fetching task data'))
        .filter(hasData)
        .flatMap(self.runTask.bind(self));
};
/**
 * Creates the Rx.Observable pipeline for cancelling tasks: while the runner
 * is running, every cancel message is handed to cancelTask.
 *
 * @param {Object} cancelTaskStream - observable of cancel messages
 * @returns {Observable}
 * @memberOf TaskRunner
 */
TaskRunner.prototype.createCancelTaskSubscription = function(cancelTaskStream) {
    var whileRunning = this.isRunning.bind(this);
    var cancelOne = this.cancelTask.bind(this);
    return cancelTaskStream.takeWhile(whileRunning).flatMap(cancelOne);
};
/**
 * Cancels the active task named by a cancel message.
 *
 * Messages for tasks that are not in activeTasks produce an empty sequence.
 * A failure while cancelling is logged through handleStreamError and turned
 * into an empty sequence, so one bad cancellation cannot error (and thereby
 * terminate) the shared cancel pipeline this observable is flat-mapped into.
 * The task is removed from activeTasks whether or not the cancel succeeded.
 *
 * @param {Object} data - cancel message; `data.taskId` selects the task
 * @returns {Observable} emits `{ taskId }` after a successful cancel
 * @memberOf TaskRunner
 */
TaskRunner.prototype.cancelTask = function(data) {
    var self = this;
    return Rx.Observable.just(data)
        .map(function(taskData) {
            return self.activeTasks[taskData.taskId];
        })
        .filter(function(task) { return !_.isEmpty(task); })
        .tap(function(task) {
            logger.info('Cancelling task', { taskId: task.instanceId });
        })
        .flatMap(function(task) { return task.cancel(); })
        .map(function() { return { taskId: data.taskId }; })
        // Contain failures here, mirroring runTask and safeStream.
        .catch(self.handleStreamError.bind(self, 'Task cancellation error'))
        .finally(function() {
            delete self.activeTasks[data.taskId];
        });
};
/**
 * Creates the heartbeat pipeline from the given observable.
 *
 * On every tick (while the runner is running) the store heartbeats the tasks
 * leased to this runner and reports how many there were. That count is
 * compared with the number of locally active tasks: fewer in the store means
 * some local tasks are unowned, more means some leased tasks are lost. If
 * anything in the pipeline fails, the failure is logged and the runner stops.
 *
 * @param {Object} heartInterval - observable that ticks once per heartbeat
 * @returns {Observable}
 * @memberOf TaskRunner
 */
TaskRunner.prototype.createHeartbeatSubscription = function(heartInterval) {
    var self = this;

    // Chooses the repair step for a mismatch between store and local counts.
    function reconcile(taskCount) {
        var activeCount = Object.keys(self.activeTasks).length;
        if (taskCount < activeCount) {
            return self.handleUnownedTasks();
        }
        if (taskCount > activeCount) {
            return self.handleLostTasks();
        }
        return Rx.Observable.just(null);
    }

    // Logs the failure and emits the result of stopping the runner.
    function stopOnFailure(error) {
        logger.error('Failed to update heartbeat, stopping task runner and tasks', {
            taskRunnerId: self.taskRunnerId,
            error: error,
            activeTasks: _.keys(self.activeTasks)
        });
        return Rx.Observable.just(self.stop());
    }

    return heartInterval
        .takeWhile(self.isRunning.bind(self))
        .flatMap(store.heartbeatTasksForRunner.bind(store, self.taskRunnerId))
        .flatMap(reconcile)
        .catch(stopOnFailure);
};
/**
 * Counts, per task, how many times a task leased to this runner has been
 * seen without being present in activeTasks. Once a task's count reaches
 * lostBeatLimit its lease is expired and its counter is dropped.
 *
 * Lease expiry stays fire-and-forget (the returned observable does not wait
 * for it), but a failed expiry is now logged instead of surfacing as an
 * unhandled rejection.
 *
 * @returns {Observable} emits null once the counters have been updated
 * @memberOf TaskRunner
 */
TaskRunner.prototype.handleLostTasks = function() {
    var self = this;

    // Expires one lease without blocking the heartbeat; failures are logged.
    function expireLostLease(taskId) {
        var expiry = store.expireLease(taskId);
        delete self.lostTasks[taskId];
        Promise.resolve(expiry).catch(function(error) {
            logger.error('Failed to expire lease on lost task', {
                taskRunnerId: self.taskRunnerId,
                taskId: taskId,
                error: error
            });
        });
    }

    return Rx.Observable.fromPromise(store.getOwnTasks(self.taskRunnerId))
        .flatMap(function(ownTasks) {
            _.difference(_.pluck(ownTasks, 'taskId'), _.keys(self.activeTasks))
                .forEach(function(taskId) {
                    self.lostTasks[taskId] = (self.lostTasks[taskId] || 0) + 1;
                    if (self.lostTasks[taskId] >= self.lostBeatLimit) {
                        expireLostLease(taskId);
                    }
                });
            return Rx.Observable.just(null);
        });
};
/**
 * Stops and forgets every task that is in activeTasks but is not among the
 * tasks the store reports as owned by this runner.
 *
 * @returns {Observable} emits null once the unowned tasks have been dropped
 * @memberOf TaskRunner
 */
TaskRunner.prototype.handleUnownedTasks = function() {
    var self = this;

    function dropUnowned(ownTasks) {
        var ownedIds = _.pluck(ownTasks, 'taskId');
        var unownedIds = _.difference(Object.keys(self.activeTasks), ownedIds);
        for (var i = 0; i < unownedIds.length; i += 1) {
            var taskId = unownedIds[i];
            var task = self.activeTasks[taskId];
            logger.info('stopping unowned task ', {data: taskId});
            if (task) {
                task.stop();
            }
            delete self.activeTasks[taskId];
        }
        return Rx.Observable.just(null);
    }

    return Rx.Observable
        .fromPromise(store.getOwnTasks(self.taskRunnerId))
        .flatMap(dropUnowned);
};
/**
 * Subscription handler invoked for every item a pipeline emits successfully.
 * When a message is supplied, the item is logged with it; an item that has
 * no taskRunnerId yet is stamped (in place) with this runner's id first.
 * With a falsy message nothing is logged and the item is left untouched.
 *
 * @param {String} msg - log message, or a falsy value to stay silent
 * @param {Object} data - the emitted item
 * @returns {Observable} always an empty observable
 * @memberOf TaskRunner
 */
TaskRunner.prototype.handleStreamSuccess = function(msg, data) {
    if (!msg) {
        return Rx.Observable.empty();
    }
    var needsRunnerId = data && !data.taskRunnerId;
    if (needsRunnerId) {
        data.taskRunnerId = this.taskRunnerId;
    }
    logger.info(msg, data);
    return Rx.Observable.empty();
};
/**
 * Subscription / catch handler invoked when a pipeline emits an error. Logs
 * the error under the given message and replaces it with an empty sequence.
 *
 * Tolerates a null or undefined error (for example a promise rejected
 * without a reason): throwing from inside the error handler would turn a
 * logged failure into a new, unlogged one.
 *
 * @param {String} msg - log message describing what failed
 * @param {Error} err - the emitted error; may be null/undefined
 * @returns {Observable} always an empty observable
 * @memberOf TaskRunner
 */
TaskRunner.prototype.handleStreamError = function(msg, err) {
    logger.error(msg, {
        taskRunnerId: this.taskRunnerId,
        // stacks on some error objects don't get printed if part of
        // the error object so separate them out here
        error: _.omit(err, 'stack'),
        stack: err ? err.stack : undefined
    });
    return Rx.Observable.empty();
};
/**
* Instantiates, runs, and publishes the status of a task
*
* Steps, in order: create the Task from the fetched data, publish a
* "task started" progress event, register the task in activeTasks, run it,
* persist the resulting state with store.setTaskState, remove the task from
* activeTasks, and publish "task finished" on this runner's domain.
*
* An error emitted anywhere in the chain is logged by handleStreamError and
* replaced with an empty sequence, so it does not reach the calling stream.
*
* NOTE(review): on the error path, and when takeWhile ends the sequence,
* nothing in this function removes the task from activeTasks - confirm that
* cleanup is expected to happen elsewhere (e.g. handleUnownedTasks).
*
* @param {Object} data - read as `data.task` (with `instanceId`) and `data.context`
* @returns {Observable} emits the task's instanceId, taskName and state
* @memberOf TaskRunner
*/
TaskRunner.prototype.runTask = function(data) {
var self = this;
return Rx.Observable.just(data)
.flatMap(function(_data) {
return Task.create(
_data.task,
{ instanceId: _data.task.instanceId },
_data.context
);
})
.tap(function(task) {
// tap does not wait for the publish to finish, and later steps in this
// stream modify the task, so the progress event is given a deep copy.
var taskCopy = _.cloneDeep(task);
return graphProgressService.publishTaskStarted(taskCopy, {swallowError: true});
})
.tap(function(task) {
// From here on the task counts as active for heartbeat and cancel handling.
self.activeTasks[task.instanceId] = task;
logger.info("Running task ", {
taskRunnerId: self.taskRunnerId,
taskId: task.instanceId,
taskName: task.definition.injectableName
});
})
.flatMap(function(task) {
return task.run();
})
// An empty result from run() ends the sequence; none of the steps below run.
.takeWhile(function(task) { return !_.isEmpty(task);})
.flatMap(function(task) {
// Pair the task with the setTaskState result so the task itself can be
// carried forward once the state has been stored.
return Rx.Observable.forkJoin([
Rx.Observable.just(task),
store.setTaskState({
taskId: task.instanceId,
graphId: task.context.graphId,
state: task.state,
error: task.error,
context: task.context
})
]);
})
// Keep only the task from the [task, setTaskState result] pair.
.map(_.first)
.tap(function(task) {
delete self.activeTasks[task.instanceId];
})
.tap(function(task) {
// NOTE(review): the meaning of the trailing `true` argument is not
// visible in this file - check taskMessenger.publishTaskFinished.
return taskMessenger.publishTaskFinished(self.domain, task, true);
})
.map(function(task) {
// Adds taskName onto the task object itself before picking the summary.
task.taskName = task.definition.injectableName;
return _.pick(task, ['instanceId', 'taskName', 'state']); })
.catch(self.handleStreamError.bind(self, 'error while running task'));
};
/**
 * Stops the TaskRunner: clears the running flag (which ends the pipelines'
 * takeWhile guards) and disposes every messenger subscription.
 *
 * The subscription list is detached before disposal starts so that the
 * iteration never runs over an array that is shrinking underneath it;
 * disposing all entries no longer depends on how Promise.map walks its input.
 * Subscriptions are still disposed last-registered-first, and
 * `this.subscriptions` is empty afterwards.
 *
 * @returns {Promise} resolves once every subscription has been disposed
 * @memberOf TaskRunner
 */
TaskRunner.prototype.stop = function() {
    var self = this;
    self.running = false;
    var pending = self.subscriptions.splice(0).reverse();
    return Promise.map(pending, function(subscription) {
        return subscription.dispose();
    });
};
/**
 * Starts the task runner: marks it running, builds the observable pipelines,
 * subscribes to cancel and run messages, and records both subscriptions so
 * that they can be disposed later.
 *
 * @returns {Promise} resolves (with no value) once both subscriptions exist
 * @memberOf TaskRunner
 */
TaskRunner.prototype.start = function() {
    var self = this;

    // Flips the running flag, wires the pipelines and opens both subscriptions.
    function bringUp() {
        self.running = true;
        self.initializePipeline();
        var cancelSubscription = self.subscribeCancelTask();
        var runTaskSubscription = self.subscribeRunTask();
        return [cancelSubscription, runTaskSubscription];
    }

    // Keeps the resolved subscriptions and announces the runner.
    function remember(cancelSubscription, runTaskSubscription) {
        self.subscriptions.push(cancelSubscription, runTaskSubscription);
        logger.info('Task runner started', {
            TaskRunnerId: self.taskRunnerId,
            domain: self.domain
        });
    }

    return Promise.resolve().then(bringUp).spread(remember);
};
/**
 * Factory helper: builds a new TaskRunner from the given options.
 *
 * @param {Object} options - passed straight to the TaskRunner constructor
 * @returns {Object} TaskRunner instance
 * @memberOf TaskRunner
 */
TaskRunner.create = function(options) {
    var runner = new TaskRunner(options);
    return runner;
};
return TaskRunner;
}