lib/workflow/stores/mongo.js
// Copyright © 2015-2017 Dell Inc. or its subsidiaries. All Rights Reserved.
'use strict';
// Export the factory itself; the store object is built when the factory is called.
module.exports = mongoStoreFactory;
// Name this factory provides (presumably consumed by the project's dependency injector).
mongoStoreFactory.$provide = 'TaskGraph.Stores.Mongo';
// Names of the dependencies to inject, in the same order as the parameters of
// mongoStoreFactory(waterline, Promise, Constants, Errors, assert, _).
mongoStoreFactory.$inject = [
'Services.Waterline',
'Promise',
'Constants',
'Errors',
'Assert',
'_'
];
function mongoStoreFactory(waterline, Promise, Constants, Errors, assert, _) {
var exports = {};
exports.publishRecordByGraphId = function (graphId, event) {
return waterline.graphobjects.needOneMongo(
{ instanceId: graphId },
{ _status: 1, error: 1}
).then(function (graphData) {
exports.publishGraphRecord(graphData, event, graphId);
});
};
exports.publishGraphRecord = function(graph, event, id) {
return waterline.graphobjects.publishRecord(event || 'updated', graph, id);
};
// NOTE: This is meant to be idempotent, and just drop the update silently
// if the graph has already been marked as done elsewhere and the query returns
// empty.
/**
* Atomically sets the graph document in the graphobjects collection given by data.graphId
* to the given state
* @param {String} state - the finished state to set the graph to
* @param {Object} data
* @param {String} data.graphId - the graphId of the graph to be set to done
* @memberOf store
* @returns {Promise} - a promise for the graph after its state has been set
*/
exports.setGraphDone = function(state, data) {
assert.string(state, 'state');
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
var query = {
instanceId: data.graphId,
_status: {$in: Constants.Task.ActiveStates}
};
var update = {
$set: {
_status: state
}
};
var options = {
new: true
};
return waterline.graphobjects.findAndModifyMongo(query, {}, update, options)
.tap(function (graph) {
exports.publishGraphRecord(graph);
});
};
/**
* Sets the state of a reachable, matching task in the taskdependencies collection
* and updates the task's context.
* @param {Object} task - a task object
* @param {String} task.graphId - the unique ID of the graph to which the task belongs
* @param {Object} task.context - the task context object
* @param {String} task.state - the state with which to update the task document
* in the database
* @memberOf store
* @returns {Promise}
*/
exports.setTaskState = function(task) {
assert.uuid(task.taskId, 'taskId');
assert.uuid(task.graphId, 'task.graphId');
assert.string(task.state, 'task.state');
assert.optionalObject(task.context, 'task.context');
// TODO: including graphId with the intent that we'll create an
// index against it in the database
if(task.state !== Constants.Task.States.Succeeded){
task.context = null;
}
var query = {
graphId: task.graphId,
taskId: task.taskId,
reachable: true
};
var update = {
$set: {
state: task.state,
context: task.context
}
};
var options = {
multi: true
};
return waterline.taskdependencies.updateMongo(query, update, options)
.tap(function () {
exports.publishRecordByGraphId(task.graphId);
});
};
/**
* Atomically sets the state of a task in the graphobjects collection
* @param {Object} data
* @param {String} data.taskId - the unique ID of the task
* @param {String} data.graphId - the unique ID of the graph to which the task belongs
* @param {String} task.state - the state with which to update the task subdocument
* @memberOf store
* @returns {Promise} - a promise for the graph document containing the task
*/
exports.setTaskStateInGraph = function(data) {
assert.uuid(data.taskId, 'data.taskId');
assert.uuid(data.graphId, 'data.graphId');
assert.string(data.state, 'data.state');
// TODO: including graphId with the intent that we'll create an
// index against it in the database
var query = {
instanceId: data.graphId
};
var update = {
$set: {}
};
var timestamp = new Date();
var timeKey = ['tasks', data.taskId, 'taskEndTime'].join('.');
update.$set[timeKey] = timestamp;
var stateKey = ['tasks', data.taskId, 'state'].join('.');
update.$set[stateKey] = data.state;
if (data.error) {
var errorKey = ['tasks', data.taskId, 'error'].join('.');
update.$set[errorKey] = data.error;
}
_.forEach(data.context, function(val, key) {
update.$set[['context', key].join('.')] = val;
});
var options = {
new: true
};
return waterline.graphobjects.findAndModifyMongo(query, {}, update, options)
.tap(function (graph) {
exports.publishGraphRecord(graph);
});
};
/**
* Get the definition of a task from the taskdefinitions collection
* @param {String} injectableName - the injectable name for the desired task
* @returns {Promise} - a promise for the definition for the desired task
* @memberOf store
*/
exports.getTaskDefinition = function(injectableName) {
return waterline.taskdefinitions.findOne({ injectableName: injectableName })
.then(function(taskDefinition) {
if (_.isEmpty(taskDefinition)) {
throw new Errors.NotFoundError(
'Could not find task definition with injectableName %s'
.format(injectableName));
}
return taskDefinition.toJSON();
});
};
/**
* Persists a graph definition ot the graphdefinitions collection
* @param {Object} definition - the graph definition to persist
* @returns {Promise} a promise for the persisted graph definition object
* @memberOf store
*/
exports.persistGraphDefinition = function(definition) {
assert.object(definition, 'definition');
assert.string(definition.injectableName, 'definition.injectableName');
var query = {
injectableName: definition.injectableName
};
var options = {
new: true,
upsert: true
};
return waterline.graphdefinitions.findAndModifyMongo(query, {}, definition, options);
};
/**
* Deletes a graph definition of the graphdefinitions collection
* @param {String} injectableName - name of Graph to delete
* @returns {Promise} a promise
* @memberOf store
*/
exports.destroyGraphDefinition = function(injectableName) {
assert.string(injectableName, 'injectableName');
var query = {
injectableName: injectableName
};
return waterline.graphdefinitions.destroy(query);
};
/**
* Persists a task definition ot the taskdefinitions collection
* @param {Object} definition - the task definition to persist
* @returns {Promise} a promise for the persisted task definition object
* @memberOf store
*/
exports.persistTaskDefinition = function(definition) {
assert.object(definition, 'definition');
assert.string(definition.injectableName, 'definition.injectableName');
var query = {
injectableName: definition.injectableName
};
var options = {
new: true,
upsert: true
};
return waterline.taskdefinitions.findAndModifyMongo(query, {}, definition, options);
};
/**
* Gets one or all graph definitions from the graphdefinitions collection
* @param {String=} injectableName - an optional injectableName for the desired
* graph definition
* @returns {Promise} a promise for the matching graph definition or
* all graph definitions if no injectableName was given
* @memberOf store
*/
exports.getGraphDefinitions = function(injectableName) {
var query = {};
if (injectableName) {
query.injectableName = injectableName;
}
return waterline.graphdefinitions.find(query)
.then(function(graphs) {
return _.map(graphs, function(graph) {
return graph.toJSON();
});
});
};
/**
* Gets one or all task definitions from the taskdefinitions collection
* @param {String=} injectableName - an optional injectableName for the desired
* task definition
* @returns {Promise} a promise for the matching task definition or
* all task definitions if no injectableName was given
* @memberOf store
*/
exports.getTaskDefinitions = function(injectableName) {
var query = {};
if (injectableName) {
query.injectableName = injectableName;
}
return waterline.taskdefinitions.find(query);
};
/**
* Persists a graph to the graphobjects collection
* @param {Object} graph - the graph object to persist
* @param {String} graph.instanceId - the unique ID for the graph instance
* @returns {Promise} a promise for the persisted graph object
* @memberOf store
*/
exports.persistGraphObject = function(graph) {
assert.object(graph, 'graph');
assert.uuid(graph.instanceId, 'graph.instanceId');
if (graph.node) {
graph.node = waterline.taskdependencies.mongo.objectId(graph.node);
}
var query = {
instanceId: graph.instanceId
};
var options = {
new: true,
upsert: true,
fields: {
_id: 0,
instanceId: 1
}
};
return waterline.graphobjects.findAndModifyMongo(query, {}, graph, options);
};
/**
* Persists a task object and its dependencies to the taskdependencies collection
* @param {Object} taskDependencyItem - the task object
* @param {String} taskDependencyItem.taskId - the unique ID for the task
* @param {Object} taskDependencyItem.dependencies - the list of dependencies for the task
* @param {String[]} taskDependencyItem.terminalOnStates - the list of states for
* which this task can be the last task in its graph
* @param {String} graphId - the unique ID of the graph to which the task belongs
* @returns {Promise} a promise for the created taskdependency object
* @memberOf store
*/
exports.persistTaskDependencies = function(taskDependencyItem, graphId) {
assert.object(taskDependencyItem, 'taskDependencyItem');
assert.uuid(taskDependencyItem.taskId, 'taskDependencyItem.taskId');
assert.uuid(graphId, 'graphId');
assert.object(taskDependencyItem.dependencies, 'taskDependencyItem.dependencies');
assert.arrayOfString(
taskDependencyItem.terminalOnStates, 'taskDependencyItem.terminalOnStates');
var obj = {
taskId: taskDependencyItem.taskId,
graphId: graphId,
state: Constants.Task.States.Pending,
dependencies: taskDependencyItem.dependencies,
terminalOnStates: taskDependencyItem.terminalOnStates,
ignoreFailure: taskDependencyItem.ignoreFailure
};
return waterline.taskdependencies.create(obj);
};
/**
* Gets a task subdocument from the graphobjects collection
* @param {Object} data
* @param {String} data.graphId - the unique ID of the graph to which the task belongs
* @param {String} data.taskId - the unique ID of the desired task subdocument
* @returns {Promise} a promise for an object containing the graphId, requested task
* and the associated graph context
* @memberOf store
*/
exports.getTaskById = function(data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
assert.uuid(data.taskId, 'data.taskId');
var query = {
instanceId: data.graphId
};
var options = {
fields: {
_id: 0,
instanceId: 1,
context: 1,
tasks: {}
}
};
options.fields.tasks[data.taskId] = 1;
return waterline.graphobjects.findOne(query, options)
.then(function(graph) {
if (_.isEmpty(graph)) {
return undefined;
} else {
return {
graphId: graph.instanceId,
context: graph.context,
task: graph.tasks[data.taskId]
};
}
});
};
/**
* Gets graphs with the attribute 'serviceGraph' marked true from the
* graphobjects collection
* @returns {Promise} a promise for the marked serivice graphs
* @memberOf store
*/
exports.getServiceGraphs = function() {
var query = {
serviceGraph: true
};
return waterline.graphobjects.find(query)
.then(function(graphs) {
return _.map(graphs, function(graph) {
return graph.toJSON();
});
});
};
/**
* Updates the lease/heartbeat for all tasks in the taskdependencies collection
* with the given lease
* @param {String} leaseId - the taskRunner ID to match against the
* taskRunnerLease document field when updating heartbeats
* @returns {Promise} a promise containing the number of updated leases
* @memberOf store
*/
exports.heartbeatTasksForRunner = function(leaseId) {
assert.uuid(leaseId, 'leaseId');
var query = {
taskRunnerLease: leaseId,
reachable: true,
state: Constants.Task.States.Pending
};
var update = {
$set: {
taskRunnerLease: leaseId,
taskRunnerHeartbeat: new Date()
}
};
var options = {
multi: true
};
return waterline.taskdependencies.updateMongo(query, update, options);
};
/**
* Gets all tasks that match the given leaseId
* @param {String} leaseId - the leaseId to match against the taskRunnerLease document field
* @returns {Promise} a promise for the matching tasks from the taskdependencies collection
* @memberOf store
*/
exports.getOwnTasks = function(leaseId) {
assert.uuid(leaseId, 'leaseId');
var query = {
where: {
taskRunnerLease: leaseId,
reachable: true,
state: Constants.Task.States.Pending
}
};
return waterline.taskdependencies.find(query);
};
/**
* Gets the active graph associated with a nodeId
* @param {String} target - the node Id for which to return active graphs
* @returns {Promise} a promise for a graph object
* @memberOf store
*/
exports.findActiveGraphForTarget = function(target) {
if (!target) {
return Promise.resolve(null);
}
assert.string(target, 'target');
var query = {
"context.target": target,
_status: {$in: Constants.Task.ActiveStates}
};
return waterline.graphobjects.findOneMongo(query);
};
/**
* Gets all the active graphs within a given domain
* @param {String} domain - the domain to get all active graphs for
* @returns {Promise} a promise for all active graphs in the given domain
* @memberOf store
*/
exports.findActiveGraphs = function(domain) {
assert.string(domain, 'domain');
var query = {
domain: domain,
_status: {$in: Constants.Task.ActiveStates}
};
return waterline.graphobjects.find(query);
};
/**
* Gets all tasks for a given domain that are finished but unevaluated from
* the taskdependencies collection
* @param {String} domain - the domain to get tasks from
* @param {Number=} limit - an option limit on the number of tasks to return
* @returns {Promise} a promise for the matching task objects
* @memberOf store
*/
exports.findUnevaluatedTasks = function(domain, limit) {
assert.string(domain, 'domain');
if (limit) {
assert.number(limit, 'limit');
}
var query = {
domain: domain,
evaluated: false,
reachable: true,
state: {
$in: Constants.Task.FinishedStates
}
};
var promise = waterline.taskdependencies.find(query);
if (limit) {
promise.limit(limit);
}
return promise.then(function(tasks) {
return _.map(tasks, function(task) {
return task.toJSON();
});
});
};
/**
* Gets all tasks for a given domain and graph that are ready to run from the
* taskdependencies collection
* @param {String} domain - the domain to get tasks from
* @param {String=} graphId - the unique ID for the graph to fetch ready tasks from
* @returns {Promise} a promise for the ready tasks
* @memberOf store
*/
exports.findReadyTasks = function(domain, graphId) {
assert.string(domain, 'domain');
if (graphId) {
assert.uuid(graphId, 'graphId');
}
var query = {
taskRunnerLease: null,
domain: domain,
dependencies: {},
reachable: true,
state: Constants.Task.States.Pending
};
if (graphId) {
query.graphId = graphId;
}
return waterline.taskdependencies.find(query)
.then(function(tasks) {
return {
tasks: _.map(tasks, function(task) { return task.toJSON(); }),
graphId: graphId || null
};
});
};
/**
* Atomically check out a taskdependencies task document by marking it's lease with
* the given taskRunner ID and setting the taskRunnerHeartbeat field to 'now'
* @param {String} taskRunnerId - the unique ID of the taskRunner for which the
* task is being checked out
* @param {Object} data
* @param {String} data.graphId - the unique ID of the graph to which the task belongs
* @param {String} data.taskId - the unique ID of the task to be checked out
* @returns {Promise} a promise for the checked out task
* @memberOf store
*/
exports.checkoutTask = function(taskRunnerId, data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
assert.uuid(data.taskId, 'data.taskId');
var query = {
graphId: data.graphId,
taskId: data.taskId,
taskRunnerLease: null,
dependencies: {},
reachable: true
};
var update = {
$set: {
taskRunnerLease: taskRunnerId,
taskRunnerHeartbeat: new Date()
}
};
var options = {
new: true
};
var timestamp = new Date();
var queryGraph = {
instanceId: data.graphId
};
var updateTaskGraph = {
$set: {}
};
var timeKey = ['tasks', data.taskId, 'taskStartTime'].join('.');
updateTaskGraph.$set[timeKey] = timestamp;
return waterline.graphobjects.findAndModifyMongo(
queryGraph, {}, updateTaskGraph, options)
.tap(function(graph){
exports.publishGraphRecord(graph);
})
.then(function(){
return waterline.taskdependencies.findAndModifyMongo(query, {}, update, options);
});
};
/**
* Checks whether there are any pending, reachable, or failed tasks corresponding to
* the given graph ID.
* @param {Object} data
* @param {String} data.graphId - The uniqe ID of the graph to be checked
* @retuns {Promise} a promise for an object with a boolean 'done' field
* @memberOf store
*/
exports.checkGraphSucceeded = function(data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
var query = {
graphId: data.graphId,
state: {
$ne: Constants.Task.States.Succeeded
},
ignoreFailure: {
$ne: true
},
reachable: true
};
return waterline.taskdependencies.find(query)
.then(function(results) {
data.done = _.every(results, function(task) {
return !_.contains(task.terminalOnStates, task.state);
});
return data;
});
};
/**
* Updates the tasks dependant on the given task ID to reflect its new, finished
* state in the taskdependencies collection
* @param {Object} data - the task data object
* @param {String} data.taskId - the unique ID of the task whose dependencies
* should be updated
* @param {String} data.graphId - the unique ID of the graph to which the task
* belongs
* @returns {Promise} a promise for the number of updated task documents
* @memberOf store
*/
exports.updateDependentTasks = function(data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
assert.uuid(data.taskId, 'data.taskId');
assert.string(data.state, 'data.state');
var queryDependency = {
graphId: data.graphId,
reachable: true
};
queryDependency['dependencies.' + data.taskId] = {
$in: [data.state, Constants.Task.States.Finished]
};
var updateDependency = {
$unset: {}
};
updateDependency.$unset['dependencies.' + data.taskId] = '';
var queryAnyOf = {
graphId: data.graphId,
reachable: true
};
queryAnyOf['dependencies.anyOf.' + data.taskId] = {
$in: [data.state, Constants.Task.States.Finished]
};
var updateAnyOf = {
$unset: {}
};
updateAnyOf.$unset["dependencies.anyOf"] = "";
var options = {
multi: true
};
return Promise.all([
waterline.taskdependencies.updateMongo(queryDependency, updateDependency, options),
waterline.taskdependencies.updateMongo(queryAnyOf, updateAnyOf, options)
]);
};
/**
* Updates tasks which will no longer be reachable as a result of the given
* task's state so that they are marked as unreachable in the taskdependencies
* collection
* @param {Object} data - the task data object
* @param {String} data.taskId - the unique ID of the task whose dependencies
* should be updated
* @param {String} data.graphId - the unique ID of the graph to which the task
* belongs
* @returns {Promise} a promise for the number of updated task documents
* @memberOf store
*/
exports.updateUnreachableTasks = function(data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
assert.uuid(data.taskId, 'data.taskId');
assert.string(data.state, 'data.state');
var query = {
graphId: data.graphId,
reachable: true,
$or: [{},{}]
};
var options = {
multi: true
};
query.$or[0]['dependencies.'+data.taskId] = {
$in: _.difference(Constants.Task.FinishedStates, [data.state])
};
query.$or[1]['dependencies.anyOf'+data.taskId] = {
$in: _.difference(Constants.Task.FinishedStates, [data.state])
};
return waterline.taskdependencies.find(query).then(function(res){
var updateRemove = {
$unset:{}
};
updateRemove.$unset['dependencies.anyOf.'+data.taskId] = "";
var queryRemove = {
graphId: data.graphId,
reachable: true,
// List of task Ids that need to remove dependent.anyOf.<data.taskId>
$or:[]
};
// Mongo update, mark the downstream task unreachable
var setUnreachable = {
$set: {
reachable: false
}
};
var querySetUnreachable = {
graphId: data.graphId,
reachable: true,
// List of task Ids that need to mark as unrechable
$or: []
};
_.forEach(res, function(itrTask){
var taskIns = {
taskId: ""
};
if (! _.isEmpty(itrTask.dependencies[data.taskId])) {
taskIns.taskId = itrTask.taskId;
querySetUnreachable.$or.push(taskIns);
}
else {
taskIns.taskId = itrTask.taskId;
var anyDepSize = Object.keys(itrTask.dependencies.anyOf).length;
if (anyDepSize > 1) {
queryRemove.$or.push(taskIns);
}
else {
querySetUnreachable.$or.push(taskIns);
}
}
});
var retPromise = [];
if (queryRemove.$or.length > 0) {
retPromise.push(waterline.taskdependencies
.updateMongo(queryRemove, updateRemove, options));
}
if (querySetUnreachable.$or.length > 0) {
retPromise.push(waterline.taskdependencies
.updateMongo(querySetUnreachable, setUnreachable, options));
}
return Promise.all(retPromise);
});
};
/**
* Marks the given task document's evaluated field to true in the taskdependencies
* collection
* @param {Object} data
* @param {String} data.taskId - the unique ID for the task which should be marked
* evaluated
* @param {String} data.graphId - the unique ID of the graph to which the task
* belongs
* @returns {Promise} a promise for the new, updated, task document
* @memberOf store
*/
exports.markTaskEvaluated = function(data) {
assert.object(data, 'data');
assert.uuid(data.graphId, 'data.graphId');
assert.uuid(data.taskId, 'data.taskId');
var query = {
graphId: data.graphId,
taskId: data.taskId,
reachable: true
};
var update = {
$set: {
evaluated: true
}
};
var options = {
new: true
};
return waterline.taskdependencies.findAndModifyMongo(query, {}, update, options);
};
/**
* Finds all reachable taskdependencies documents whose leases are more than the given
* leaseAdjust milliseconds old
* @param {String} domain - the domain to restrict the search to
* @param {Number} leaseAdjust - the time after which to consier a lease expired in milliseconds
* @returns {Promise} a promise for all taskdependencies documents whose leases are
* expired according to the given leaseAdjst
* @memberOf store
*/
exports.findExpiredLeases = function(domain, leaseAdjust) {
assert.string(domain, 'domain');
assert.number(leaseAdjust, 'leaseAdjust');
var query = {
domain: domain,
reachable: true,
taskRunnerLease: { $ne: null },
taskRunnerHeartbeat: {
$lt: new Date(Date.now() - leaseAdjust)
},
state: Constants.Task.States.Pending
};
return waterline.taskdependencies.find(query);
};
/**
* Expires the lease on a taskdependencies object by setting taskRunnerLease and
* taskRunnerHeartbeat fields to null
* @param {String} objId - the mongo ID for a taskdependencies document
* @returns {Promise} a promise for the taskdependencies document with expried lease
* @memberOf store
*/
exports.expireLease = function(objId) {
assert.string(objId, 'objId');
var query = {
_id: waterline.taskdependencies.mongo.objectId(objId)
};
var update = {
$set: {
taskRunnerLease: null,
taskRunnerHeartbeat: null
}
};
var options = {
new: false
};
return waterline.taskdependencies.findAndModifyMongo(query, {}, update, options);
};
/**
* Find all taskdependencies documents that are unreachable or in a finished state
* @param {Number=} limit - an optional limit to the number of documents returned
* @returns {Promise} a promise for all complete or unreachable tasks
* @memberOf store
*/
exports.findCompletedTasks = function(limit) {
var query = {
$or: [
{
evaluated: true,
state: {
$in: Constants.Task.FinishedStates
}
},
{ reachable: false }
]
};
var options = {};
// Limit may be undefined or null, so do a non-strict equality check
if (limit != null) { /* jshint ignore:line */
options.limit = limit;
}
return waterline.taskdependencies.findMongo(query, options);
};
/**
* Deletes the taskdependencies documents by given mongo IDs
* @param {String[]} an array of mongo objct IDs for the documents to be deleted
* @returns {Promise}
* @memberOf store
*/
exports.deleteTasks = function(objectIds) {
var ids = _.map(objectIds, waterline.taskdependencies.mongo.objectId);
var query = {
_id: {
$in: ids
}
};
return waterline.taskdependencies.removeMongo(query);
};
/**
* Deletes a given graph by graphId from the graphobjects collection
* @param {String} graphId - the unique ID for the graph to be deleted
* @returns {Promise}
* @memberOf store
*/
exports.deleteGraph = function(graphId) {
var query = {
instanceId: graphId
};
return waterline.graphobjects.destroy(query);
};
/**
* Deletes a given task by taskName from the taskobjects collection
* @param {String} injectableName - the injectable name for the desired task to be deleted
* @returns {Promise}
* @memberOf store
*/
exports.deleteTaskByName = function(injectableName) {
assert.string(injectableName, 'injectableName');
var query = {
injectableName: injectableName
};
return waterline.taskdefinitions.destroy(query);
};
/**
* Finds one graph document with state 'Pending' or 'Running' by graphId
* @param {String} graphId - the unique ID of the desired active graph
* @returns {Promise} a promise for the graph with the given graphId
* @memberOf store
*/
exports.getActiveGraphById = function(graphId) {
assert.uuid(graphId);
var query = {
instanceId: graphId,
_status: {$in: Constants.Task.ActiveStates}
};
return waterline.graphobjects.findOne(query);
};
/**
* Finds one graph document by graphId
* @param {String} graphId - the unique ID of the desired graph
* @returns {Promise} a promise for the graph with the given graphId
* @memberOf store
*/
exports.getGraphById = function(graphId) {
assert.uuid(graphId);
var query = {
instanceId: graphId
};
return waterline.graphobjects.findOne(query);
};
exports.checkoutTimer = function(leaseToken, criteria, leaseDuration) {
var now = new Date();
var query = {
$and: [
criteria,
{ paused: false },
{
leaseToken: null,
$or: [
{ nextScheduled: { $lte: now } },
{ nextScheduled: null }
]
}
]
};
var options = {
sort: {nextScheduled: 1},
new: true
};
var update = {
$set: {
lastStarted: now,
leaseToken: leaseToken,
leaseExpires: new Date(now.valueOf() + leaseDuration)
}
};
return waterline.workitems.findAndModifyMongo(query, {}, update, options);
};
exports.updatePollerStatus = function(id, obj) {
var query = {
_id: waterline.workitems.mongo.objectId(id)
};
var update = {
$set: {
nextScheduled: obj.nextScheduled,
lastFinished: obj.lastFinished,
leaseToken: null,
leaseExpires: null,
state: obj.state
}
};
var options = {
new: true
};
if( obj.status !== Constants.Task.States.Succeeded ) {
update.$inc = {failureCount: 1};
} else {
update.$set.failureCount = 0;
}
return waterline.workitems.findAndModifyMongo(query, {}, update, options);
};
exports.getPollers = function(criteria) {
return waterline.workitems.find({
where: {
$and: [
criteria || {},
{
$or: _(Constants.WorkItems.Pollers).map(function (pollerName) {
if( _.isString(pollerName)) {
return { name: pollerName };
}
}).compact().value()
}
]
}
});
};
/**
* Finds the child graph of a runGraphJob by the job's task id/the child
* graph's parentTaskId
* @param {String} parentTaskId - The task id of a runGraphTask to match
* against a graph's context._parent.taskId field
* @returns {Promise} a promise for the graph with the given context._parent.taskId
* @memberOf store
*/
exports.findChildGraph = function(parentTaskId) {
assert.uuid(parentTaskId, "Parent task ID should be a UUID");
var query = {
parentTaskId: parentTaskId
};
return waterline.graphobjects.findOneMongo(query);
};
return exports;
}