lib/task-scheduler.js
// Copyright © 2016-2017 Dell Inc. or its subsidiaries. All Rights Reserved.
'use strict';
var di = require('di');
module.exports = taskSchedulerFactory;
di.annotate(taskSchedulerFactory, new di.Provide('TaskGraph.TaskScheduler'));
di.annotate(taskSchedulerFactory,
new di.Inject(
'Protocol.Events',
'TaskGraph.Store',
'TaskGraph.LeaseExpirationPoller',
'Constants',
'Logger',
'Promise',
'uuid',
'Assert',
'_',
'Rx.Mixins',
'Task.Messenger',
'Services.Configuration',
'TaskGraph.TaskScheduler.Server',
'consul',
'Services.GraphProgress'
)
);
function taskSchedulerFactory(
eventsProtocol,
store,
LeaseExpirationPoller,
Constants,
Logger,
Promise,
uuid,
assert,
_,
Rx,
taskMessenger,
configuration,
SchedulerServer,
Consul,
graphProgressService
) {
var logger = Logger.initialize(taskSchedulerFactory);
var url = require('url');
var consulUrl = configuration.get('consulUrl');
var consul;
if (consulUrl) {
var urlObject = url.parse(consulUrl);
// Create a promisified Consul interface
var consulOpts = {
host: urlObject.hostname,
port: urlObject.port || 8500
};
consul = Consul(_.merge({}, consulOpts, {
promisify: function(fn) {
return new Promise(function(resolve, reject) {
try {
return fn(function(err, data, res) {
if (err) {
err.res = res;
return reject(err);
}
return resolve([data, res]);
});
} catch (err) {
return reject(err);
}
});
}
}));
}
/**
* The TaskScheduler handles all graph and task evaluation, and is
* the decision engine for scheduling new tasks to be run within a graph.
*
* @param {Object} options
* @constructor
*/
function TaskScheduler(options) {
options = options || {};
this.running = false;
this.schedulerId = options.schedulerId || uuid.v4();
this.domain = options.domain || Constants.Task.DefaultDomain;
this.evaluateTaskStream = new Rx.Subject();
this.evaluateGraphStream = new Rx.Subject();
this.checkGraphFinishedStream = new Rx.Subject();
this.pollInterval = options.pollInterval || 500;
this.concurrencyMaximums = this.getConcurrencyMaximums(options.concurrent);
this.findUnevaluatedTasksLimit = options.findUnevaluatedTasksLimit || 200;
this.subscriptions = [];
this.leasePoller = null;
this.debug = _.has(options, 'debug') ? options.debug : false;
}
/**
* Generate a concurrency counter object. This is used with the
* Rx.Observable.prototype.mergeLossy function from Rx.Mixins to keep
* ensure that a specified maximum of the same type of asynchronous
* call be able to be unresolved at the same time (for example, only
* wait on a max of 100 database calls to resolve at any point in time).
*
* @param {Number} max
* @returns {Object} concurrency counter object
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.concurrentCounter = function(max) {
assert.number(max);
return {
count: 0,
max: max
};
};
/**
* Generate concurrency counter objects for the different IO calls we make
* to the store and messenger. Basically a rudimentary method of adding throttling.
*
* @param {Object} concurrentOptions
* @returns {Object} set of concurrency counter objects
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.getConcurrencyMaximums = function(concurrentOptions) {
var self = this;
var _options = _.defaults(concurrentOptions || {}, {
// Favor evaluateGraphStream, since it's what results in actual
// scheduling. If we're at load, defer evaluation in favor of scheduling.
// TODO: Probably better as a priority queue, evaluateGraphStream events
// always come up first?
findReadyTasks: 100,
updateTaskDependencies: 100,
handleScheduleTaskEvent: 100,
completeGraphs: 100,
findUnevaluatedTasks: 1
});
return _.transform(_options, function(result, v, k) {
result[k] = self.concurrentCounter(v);
}, {});
};
/**
* Create and start the Rx observables (streams) that do all the scheduling work.
*
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.initializePipeline = function() {
/*
* Before setting up the stream, make sure it is running, otherwise
* this will create a stream that will never run and immediately complete.
* This is basically defensive programming to try to prevent accidents where the
* startup code is executed in the wrong order (e.g. pollTasks() then
* this.running = true, that would be buggy because pollTasks() would
* stop execution before this.running = true was set).
*/
assert.ok(this.running, 'scheduler is running');
// Inputs from this.evaluateTaskStream
// Outputs to this.evaluateGraphStream
// Outputs to this.checkGraphFinishedStream
this.createUpdateTaskDependenciesSubscription(
this.evaluateTaskStream, this.evaluateGraphStream, this.checkGraphFinishedStream)
.subscribe(
this.handleStreamDebug.bind(this, 'Task evaluated'),
this.handleStreamError.bind(this, 'Error at update task dependencies stream')
);
// Outputs to this.evaluateTaskStream
this.createUnevaluatedTaskPollerSubscription(this.evaluateTaskStream)
.subscribe(
this.handleStreamDebug.bind(this, 'Triggered evaluate task event'),
this.handleStreamError.bind(this, 'Error polling for tasks')
);
// Outputs to this.evaluateGraphStream
this.createEvaluatedTaskPollerSubscription(this.evaluateGraphStream)
.subscribe(
this.handleStreamDebug.bind(this, 'Triggered evaluate graph event'),
this.handleStreamError.bind(this, 'Error polling for tasks')
);
// Inputs from this.evaluateGraphStream
this.createTasksToScheduleSubscription(this.evaluateGraphStream)
.subscribe(
this.handleStreamSuccess.bind(this, 'Task scheduled'),
this.handleStreamError.bind(this, 'Error at task scheduling stream')
);
// Inputs from this.checkGraphFinishedStream
this.createCheckGraphFinishedSubscription(this.checkGraphFinishedStream)
.subscribe(
this.handleStreamSuccess.bind(this, 'Graph finished'),
this.handleStreamError.bind(this, 'Error at check graph finished stream')
);
};
/**
* This is used with Rx.Observable.prototype.takeWhile in each Observable
* created by TaskScheduler.prototype.initializePipeline. When isRunning()
* returns false, all the observables will automatically dispose.
*
* @returns {Boolean}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.isRunning = function() {
return this.running;
};
/**
* This finds all tasks that are ready to run. If a graphId is
* specified in the data object, then it will only find tasks ready
* to run that are within that graph.
*
* @param {Object} data
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.findReadyTasks = function(data) {
assert.object(data);
var self = this;
return Rx.Observable.just(data)
.flatMap(function() {
return store.findReadyTasks(self.domain, data.graphId);
})
.catch(self.handleStreamError.bind(self, 'Error finding ready tasks'));
};
/**
* This handles task finished events, and updates all other tasks that
* have a waitingOn dependency on the finished task.
*
* @param {Object} taskHandlerStream
* @param {Object} evaluateGraphStream
* @param {Object} checkGraphFinishedStream
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.createUpdateTaskDependenciesSubscription =
function(taskHandlerStream, evaluateGraphStream, checkGraphFinishedStream) {
var self = this;
return taskHandlerStream
.takeWhile(self.isRunning.bind(self))
.tap(self.handleStreamDebug.bind(self, 'Received evaluate task event'))
.map(self.updateTaskDependencies.bind(self))
.mergeLossy(self.concurrencyMaximums.updateTaskDependencies)
.tap(function(task) {
var _task = _.pick(task, ['domain', 'graphId', 'taskId']);
self.handleStreamDebug('Updated dependencies for task', _task);
})
.filter(function(data) { return data; })
.tap(function(task){
//The reason to cloneDeep the task is that the method tap in Rxjs is not waited
//to finish, and the data flowing into tap is modified by later stream operation.
var taskCopy = _.cloneDeep(task);
return graphProgressService.publishTaskFinished(taskCopy, {swallowError: true});
})
.map(self.handleEvaluatedTask.bind(self, checkGraphFinishedStream, evaluateGraphStream));
};
/**
* Once a task has finished and been evaluated (dependendent tasks updated)
* then check if the task is terminal to determine whether the graph is potentially
* completed or not. If it is not terminal, emit to evaluateGraphStream which
* will trigger findReadyTasks for that graph. If it is terminal, check if
* the graph is finished.
*
* @param {Object} checkGraphFinishedStream
* @param {Object} evaluateGraphStream
* @param {Object} data
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.handleEvaluatedTask = function(
checkGraphFinishedStream, evaluateGraphStream, data) {
if (_.contains(data.terminalOnStates, data.state)) {
checkGraphFinishedStream.onNext(data);
} else {
evaluateGraphStream.onNext({ graphId: data.graphId });
}
};
/**
* Stream handler that finds tasks that are ready to schedule and schedules
* them.
*
* @param {Object} evaluateGraphStream
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.createTasksToScheduleSubscription = function(evaluateGraphStream) {
var self = this;
return evaluateGraphStream
.takeWhile(self.isRunning.bind(self))
.map(self.findReadyTasks.bind(self))
.mergeLossy(self.concurrencyMaximums.findReadyTasks)
.filter(function(data) { return !_.isEmpty(data.tasks); })
.pluck('tasks')
// Ideally this would just be .pluck('tasks').from() but that didn't work.
// Instead, flatMap the Rx.Observable.from observable below.
// Take a single stream event with an array data type, and emit
// multiple new stream events (one for each element in the array)
// that are all non-blocking on each other, because we want to process
// handleScheduleTaskEvent for each task independently of the others.
.flatMap(function(tasks) { return Rx.Observable.from(tasks); })
.map(self.handleScheduleTaskEvent.bind(self))
.mergeLossy(self.concurrencyMaximums.handleScheduleTaskEvent)
.map(function(task) {
return _.pick(task, ['domain', 'graphId', 'graphName', 'taskId','taskName']);
});
};
/**
* Publish a task schedule event with the messenger.
*
* @param {Object} data
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.getGraphNameAndTaskNameFromDB = function(data){
var self = this;
var task_data = store.getTaskById(data);
return task_data.then(function (res) {
return {
"domain": self.domain,
"taskId": res.task.instanceId,
"taskName": res.task.injectableName,
"graphId": res.graphId,
"graphName": res.context.graphName
};
});
};
TaskScheduler.prototype.handleScheduleTaskEvent = function(data) {
var self = this;
assert.object(data, 'task data object');
return Rx.Observable.just(data)
.flatMap(self.getGraphNameAndTaskNameFromDB.bind(self))
.flatMap(self.publishScheduleTaskEvent.bind(self))
.catch(self.handleStreamError.bind(self, 'Error scheduling task'));
};
/**
* Determine whether a graph is finished or failed based on a terminal
* task state.
*
* @param {Object} checkGraphFinishedStream
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.createCheckGraphFinishedSubscription = function(
checkGraphFinishedStream) {
var self = this;
return checkGraphFinishedStream
.takeWhile(self.isRunning.bind(self))
.map(function(data) {
// We already know that the task in question is in a terminal state,
// otherwise we wouldn't have published data to this stream.
// If a task is in a failed task state but it is non-terminal, this
// code will not be reached.
if (!data.ignoreFailure && _.contains(Constants.Task.FailedStates, data.state)) {
return self.failGraph(data, Constants.Task.States.Failed);
} else {
return self.checkGraphSucceeded(data);
}
})
.mergeLossy(self.concurrencyMaximums.completeGraphs);
};
/**
* Check if a graph is finished. If so, mark it as successful
* in the store.
*
* @param {Object} data
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.checkGraphSucceeded = function(data) {
assert.object(data, 'graph data object');
var self = this;
return Rx.Observable.just(data)
.flatMap(store.checkGraphSucceeded.bind(store))
.filter(function(_data) { return _data.done; })
.flatMap(store.setGraphDone.bind(store, Constants.Task.States.Succeeded))
.filter(function(graph) { return !_.isEmpty(graph); })
.tap(function(graph) {
return graphProgressService.publishGraphFinished(
graph,
Constants.Task.States.Succeeded,
{swallowError: true}
);
})
.map(function(graph) {
var obj = _.pick(graph, ['instanceId', '_status', 'node', 'name']);
if(obj.node) {
// Change node Id from object to string
obj.node = obj.node.toString();
}
return obj;
})
.tap(self._publishGraphFinished.bind(self))
.catch(self.handleStreamError.bind(self, 'Error handling graph done event'));
};
/**
* Set a graph to a failed state in the store, and find pending tasks
* within the graph that should also be failed.
*
* @param {Object} data
* @param {String} graphState
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.failGraph = function(data, graphState) {
var self = this;
var graphToBePublished;
return Rx.Observable.just(data.graphId)
.flatMap(store.getActiveGraphById)
.filter(function(graph) {return !_.isEmpty(graph);})
.map(function(doneGraph) {
graphToBePublished = _.cloneDeep(doneGraph);
return _.map(doneGraph.tasks, function(taskObj) {
if(taskObj.state === Constants.Task.States.Pending) {
taskObj.state = graphState;
}
taskObj.taskId = taskObj.instanceId;
taskObj.graphId = data.graphId;
return taskObj;
});
})
.flatMap(self.handleFailGraphTasks.bind(self))
.flatMap(store.setGraphDone.bind(store, graphState, data))
// setGraphDone can return null if another source has already updated
// the graph state. Don't publish the same event twice.
.filter(function(graph) {
graph._id = graph._id.toString();
if(graph.node) {
graph.node = graph.node.toString();
}
return graph;
})
.tap(self._publishGraphFinished.bind(self))
.tap(function() {
return graphProgressService.publishGraphFinished(
graphToBePublished,
graphState,
{swallowError: true}
);
})
.catch(self.handleStreamError.bind(self, 'Error failing/cancelling graph'));
};
/**
* Fail pending tasks within a graph.
*
* @param {Array} tasks
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.handleFailGraphTasks = function(tasks) {
logger.debug('cancel/failing pending tasks', {data:_.pluck(tasks, 'taskId')});
return Rx.Observable.just(tasks)
.flatMap(Promise.map.bind(Promise, tasks, store.setTaskState))
.flatMap(Promise.map.bind(Promise, tasks, store.markTaskEvaluated))
.flatMap(Promise.map.bind(Promise, _.pluck(tasks, 'taskId'),
taskMessenger.publishCancelTask))
.flatMap(Promise.map.bind(Promise, tasks, store.setTaskStateInGraph.bind(store)));
};
/**
* Subscribe to messenger events to cancel a graph, and fail the graph on
* received events.
*
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.subscribeCancelGraph = function() {
var self = this;
return taskMessenger.subscribeCancelGraph(
function(data) {
var resolve;
var deferred = new Promise(function(_resolve) {
resolve = _resolve;
});
logger.debug('listener received cancel graph event', {
data: data,
schedulerId: self.schedulerId
});
self.failGraph(data, Constants.Task.States.Cancelled)
.pluck('instanceId')
.subscribe(
function(graphId) {
if (deferred.isPending()) {
resolve(graphId);
}
self.handleStreamSuccess('Graph Cancelled', { graphId: graphId });
},
self.handleStreamError.bind(self, 'subscribeCancelGraph stream error'),
function() {
// If there is no active workflow, this.failGraph will trigger
// an onCompleted event but not an onNext event. Use this to determine
// when a bad request has been made and to respond to the messenger request
// with null. This lets the API server handle error codes for the race
// condition where there is an active workflow before the messenger
// request is made.
if (deferred.isPending()) {
resolve(null);
}
}
);
return deferred;
}
);
};
/**
* Evaluate and update all tasks that have a waitingOn dependency for a finished task,
* then mark the finished task as evaluated. If a failure occurs before the
* store.markTaskEvaluated call, then this process (which is idempotent) will be
* repeated on the next poll interval. This basically enables some sort of
* an equivalent to a transactional database call in failure cases.
*
* @param {Object} data
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.updateTaskDependencies = function(data) {
assert.object(data, 'task dependency object');
return Rx.Observable.forkJoin([
store.setTaskStateInGraph(data),
store.updateDependentTasks(data),
store.updateUnreachableTasks(data)
])
.flatMap(store.markTaskEvaluated.bind(store, data))
.catch(this.handleStreamError.bind(this, 'Error updating task dependencies'));
};
/**
* Log handler for observable onNext success events.
*
* @param {String} msg
* @param {Object} tasks
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.handleStreamSuccess = function(msg, data) {
if (msg) {
if (data) {
data.schedulerId = this.schedulerId;
}
logger.debug(msg, data);
}
return Rx.Observable.empty();
};
/**
* Log handler for observable onError failure events.
*
* @param {String} msg
* @param {Object} err
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.handleStreamError = function(msg, err) {
logger.error(msg, {
schedulerId: this.schedulerId,
// stacks on some error objects (particularly from the assert library)
// don't get printed if part of the error object so separate them out here.
error: _.omit(err, 'stack'),
stack: err.stack
});
return Rx.Observable.empty();
};
/**
* Log handler for debug messaging during development/debugging. Only
* works when this.debug is set to true;
*
* @param {String} msg
* @param {Object} data
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.handleStreamDebug = function(msg, data) {
if (this.debug) {
if (data) {
data.schedulerId = this.schedulerId;
}
logger.debug(msg, data);
}
};
/**
* Receive messenger events for when tasks finish, and kick off the task/graph
* evaluation via the evaluateTaskStream.
*
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.subscribeTaskFinished = function() {
var self = this;
return taskMessenger.subscribeTaskFinished(
this.domain,
function(data) {
logger.debug('Listener received task finished event, triggering evaluation', {
data: data,
schedulerId: self.schedulerId
});
self.evaluateTaskStream.onNext(data);
}
);
};
/**
* Publish a run task event with the messenger, to be picked up by any task runners
* within the domain.
*
* @param {Object} data
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.publishScheduleTaskEvent = function(data) {
// TODO: Add more scheduling logic here when necessary
return taskMessenger.publishRunTask(this.domain, data.taskId, data.graphId)
.then(function() {
return data;
});
};
/**
* On the case of messenger or scheduler failures, or lossiness caused by high load,
* poll the database on an interval to pick up dropped work related to
* updating unevaluated tasks and dependent tasks.
*
* @param {Object} evaluateTaskStream
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.createUnevaluatedTaskPollerSubscription = function(evaluateTaskStream) {
var self = this;
return Rx.Observable.interval(self.pollInterval)
.takeWhile(self.isRunning.bind(self))
.map(self.findUnevaluatedTasks.bind(self, self.domain))
.mergeLossy(self.concurrencyMaximums.findUnevaluatedTasks)
.flatMap(function(tasks) { return Rx.Observable.from(tasks); })
.map(evaluateTaskStream.onNext.bind(evaluateTaskStream));
};
/**
* On the case of messenger or scheduler failures, or lossiness caused by high load,
* poll the database on an interval to pick up dropped work related to
* evaluating graph states and scheduling new tasks.
*
* @param {Object} evaluateGraphStream
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.createEvaluatedTaskPollerSubscription = function(evaluateGraphStream) {
var self = this;
return Rx.Observable.interval(self.pollInterval)
.takeWhile(self.isRunning.bind(self))
.map(evaluateGraphStream.onNext.bind(evaluateGraphStream, {}));
};
/**
* Find all tasks in the database that haven't been fully evaluated
* (see TaskScheduler.prototype.updateTaskDependencies).
*
* @param {String} domain
* @returns {Observable}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.findUnevaluatedTasks = function(domain) {
return Rx.Observable.just()
.flatMap(store.findUnevaluatedTasks.bind(store, domain, this.findUnevaluatedTasksLimit))
.tap(function(tasks) {
if (tasks && tasks.length) {
logger.debug('Poller is triggering unevaluated tasks to be evaluated', {
tasks: _.map(tasks, 'taskId')
});
}
})
.catch(this.handleStreamError.bind(this, 'Error finding unevaluated tasks'));
};
/**
* Subscribe to messenger events to run new graphs.
*
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.subscribeRunTaskGraph = function() {
return taskMessenger.subscribeRunTaskGraph(this.domain,
this.runTaskGraphCallback.bind(this));
};
/**
* Emit a new graph evaluation event to trigger TaskScheduler.prototype.findReadyTasks.
*
* @param {Object} data
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.runTaskGraphCallback = function(data) {
assert.object(data);
assert.uuid(data.graphId);
this.evaluateGraphStream.onNext(data);
};
/**
* Start the task scheduler and its observable pipeline, as well as the expired lease poller.
* Subscribe to messenger events.
*
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.start = function() {
var self = this;
return Promise.resolve()
.then(function() {
self.running = true;
self.initializePipeline();
self.leasePoller = LeaseExpirationPoller.create(self, {});
self.leasePoller.start();
return [
self.subscribeRunTaskGraph(),
self.subscribeTaskFinished(),
self.subscribeCancelGraph()
];
})
.spread(function(
runTaskGraphSubscription, taskFinishedSubscription, cancelGraphSubscription
) {
self.subscriptions.push(runTaskGraphSubscription);
self.subscriptions.push(taskFinishedSubscription);
self.subscriptions.push(cancelGraphSubscription);
logger.info('Task scheduler started', {
schedulerId: self.schedulerId,
domain: self.domain
});
})
.then(function() {
/*
* Start the gRPC endpoint
*/
var urlObject = url.parse(
_.get(configuration.get('taskgraphConfig'), 'url',
'scheduler://127.0.0.1:31001'));
self.gRPC = new SchedulerServer({
hostname: urlObject.hostname,
port: urlObject.port
});
return self.gRPC.start();
})
.then(function() {
if (consul) {
return consul.agent.service.register({
name: 'taskgraph',
id: self.schedulerId,
tags: [ 'scheduler' ],
address: self.gRPC.options.hostname,
port: self.gRPC.options.port
});
}
});
};
/**
* Clean up any messenger subscriptions. Stop polling for expired leases.
*
* @param {Object} data
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.stop = function() {
var self = this;
self.running = false;
if (self.leasePoller) {
self.leasePoller.stop();
}
return Promise.try(function() {
if (consul) {
return consul.agent.service.deregister({id: self.schedulerId});
}
}).then(function() {
return self.gRPC.stop();
}).then(function() {
return Promise.map(self.subscriptions, function() {
return self.subscriptions.pop().dispose();
});
});
};
/**
* Publish a graph finished event over AMQP and via Web Hooks.
*
* @param {Object} graph
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype._publishGraphFinished = function(graph) {
return eventsProtocol.publishGraphFinished(graph.instanceId, {
graphId: graph.instanceId,
graphName: graph.name,
status: graph._status
}, graph.node)
.catch(function(error) {
logger.error('Error publishing graph finished event', {
graphId: graph.instanceId,
_status: graph._status,
error: error
});
});
};
/**
* @param {Object} options
* @returns {Object} TaskScheduler instance
* @memberOf TaskScheduler
*/
TaskScheduler.create = function(options) {
return new TaskScheduler(options);
};
return TaskScheduler;
}