docs/agenda/1.0.3/utils_process-jobs.js.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>
utils/process-jobs.js - Documentation
</title>
<link href="https://www.braintreepayments.com/images/favicon-ccda0b14.png" rel="icon" type="image/png">
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.7.2/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.1.0/jquery.min.js"></script>
<link type="text/css" rel="stylesheet" href="styles/prettify-tomorrow.css">
<link type="text/css" rel="stylesheet" href="styles/jsdoc-default.css">
<!-- start Mixpanel -->
<script type="text/javascript">(function(e,a){if(!a.__SV){var b=window;try{var c,l,i,j=b.location,g=j.hash;c=function(a,b){return(l=a.match(RegExp(b+"=([^&]*)")))?l[1]:null};g&&c(g,"state")&&(i=JSON.parse(decodeURIComponent(c(g,"state"))),"mpeditor"===i.action&&(b.sessionStorage.setItem("_mpcehash",g),history.replaceState(i.desiredHash||"",e.title,j.pathname+j.search)))}catch(m){}var k,h;window.mixpanel=a;a._i=[];a.init=function(b,c,f){function e(b,a){var c=a.split(".");2==c.length&&(b=b[c[0]],a=c[1]);b[a]=function(){b.push([a].concat(Array.prototype.slice.call(arguments,
0)))}}var d=a;"undefined"!==typeof f?d=a[f]=[]:f="mixpanel";d.people=d.people||[];d.toString=function(b){var a="mixpanel";"mixpanel"!==f&&(a+="."+f);b||(a+=" (stub)");return a};d.people.toString=function(){return d.toString(1)+".people (stub)"};k="disable time_event track track_pageview track_links track_forms register register_once alias unregister identify name_tag set_config reset people.set people.set_once people.increment people.append people.union people.track_charge people.clear_charges people.delete_user".split(" ");
for(h=0;h<k.length;h++)e(d,k[h]);a._i.push([b,c,f])};a.__SV=1.2;b=e.createElement("script");b.type="text/javascript";b.async=!0;b.src="undefined"!==typeof MIXPANEL_CUSTOM_LIB_URL?MIXPANEL_CUSTOM_LIB_URL:"file:"===e.location.protocol&&"//cdn.mxpnl.com/libs/mixpanel-2-latest.min.js".match(/^\/\//)?"https://cdn.mxpnl.com/libs/mixpanel-2-latest.min.js":"//cdn.mxpnl.com/libs/mixpanel-2-latest.min.js";c=e.getElementsByTagName("script")[0];c.parentNode.insertBefore(b,c)}})(document,window.mixpanel||[]);
mixpanel.init("1919205b2da72e4da3b9b6639b444d59");</script>
<!-- end Mixpanel -->
</head>
<body>
<svg style="display: none;">
<defs>
<symbol id="linkIcon" fill="#706d77" height="24" viewBox="0 0 24 24" width="24" xmlns="http://www.w3.org/2000/svg">
<path d="M0 0h24v24H0z" fill="none"/>
<path d="M3.9 12c0-1.71 1.39-3.1 3.1-3.1h4V7H7c-2.76 0-5 2.24-5 5s2.24 5 5 5h4v-1.9H7c-1.71 0-3.1-1.39-3.1-3.1zM8 13h8v-2H8v2zm9-6h-4v1.9h4c1.71 0 3.1 1.39 3.1 3.1s-1.39 3.1-3.1 3.1h-4V17h4c2.76 0 5-2.24 5-5s-2.24-5-5-5z"/>
</symbol>
</defs>
</svg>
<input type="checkbox" id="nav-trigger" class="nav-trigger" />
<label for="nav-trigger" class="navicon-button x">
<div class="navicon"></div>
</label>
<label for="nav-trigger" class="overlay"></label>
<div class="top-nav-wrapper">
<ul>
<li >
<a href="index.html">
<svg fill="#6D6D6D" height="24" viewBox="0 0 24 24" width="24" xmlns="http://www.w3.org/2000/svg">
<path d="M10 20v-6h4v6h5v-8h3L12 3 2 12h3v8z"/>
<path d="M0 0h24v24H0z" fill="none"/>
</svg>
</a>
</li>
</ul>
</div>
<nav>
<h3 class="reference-title">
Agenda
</h3>
<h3>Classes</h3><ul><li id="Agenda-nav"><a href="Agenda.html">Agenda</a><ul class='methods'><li data-type="method" id="Agenda-cancel-nav"><a href="Agenda.html#cancel">cancel</a></li><li data-type="method" id="Agenda-create-nav"><a href="Agenda.html#create">create</a></li><li data-type="method" id="Agenda-database-nav"><a href="Agenda.html#database">database</a></li><li data-type="method" id="Agenda-dbInit-nav"><a href="Agenda.html#dbInit">dbInit</a></li><li data-type="method" id="Agenda-defaultConcurrency-nav"><a href="Agenda.html#defaultConcurrency">defaultConcurrency</a></li><li data-type="method" id="Agenda-defaultLockLifetime-nav"><a href="Agenda.html#defaultLockLifetime">defaultLockLifetime</a></li><li data-type="method" id="Agenda-defaultLockLimit-nav"><a href="Agenda.html#defaultLockLimit">defaultLockLimit</a></li><li data-type="method" id="Agenda-define-nav"><a href="Agenda.html#define">define</a></li><li data-type="method" id="Agenda-every-nav"><a href="Agenda.html#every">every</a></li><li data-type="method" id="Agenda-findAndLockNextJob-nav"><a href="Agenda.html#findAndLockNextJob">findAndLockNextJob</a></li><li data-type="method" id="Agenda-jobs-nav"><a href="Agenda.html#jobs">jobs</a></li><li data-type="method" id="Agenda-locklimit-nav"><a href="Agenda.html#locklimit">locklimit</a></li><li data-type="method" id="Agenda-maxConcurrency-nav"><a href="Agenda.html#maxConcurrency">maxConcurrency</a></li><li data-type="method" id="Agenda-mongo-nav"><a href="Agenda.html#mongo">mongo</a></li><li data-type="method" id="Agenda-name-nav"><a href="Agenda.html#name">name</a></li><li data-type="method" id="Agenda-now-nav"><a href="Agenda.html#now">now</a></li><li data-type="method" id="Agenda-processEvery-nav"><a href="Agenda.html#processEvery">processEvery</a></li><li data-type="method" id="Agenda-purge-nav"><a href="Agenda.html#purge">purge</a></li><li data-type="method" id="Agenda-saveJob-nav"><a href="Agenda.html#saveJob">saveJob</a></li><li data-type="method" id="Agenda-schedule-nav"><a href="Agenda.html#schedule">schedule</a></li><li data-type="method" id="Agenda-sort-nav"><a href="Agenda.html#sort">sort</a></li><li data-type="method" id="Agenda-start-nav"><a href="Agenda.html#start">start</a></li><li data-type="method" id="Agenda-stop-nav"><a href="Agenda.html#stop">stop</a></li></ul></li><li id="Job-nav"><a href="Job.html">Job</a><ul class='methods'><li data-type="method" id="Job-computeNextRunAt-nav"><a href="Job.html#computeNextRunAt">computeNextRunAt</a></li><li data-type="method" id="Job-disable-nav"><a href="Job.html#disable">disable</a></li><li data-type="method" id="Job-enable-nav"><a href="Job.html#enable">enable</a></li><li data-type="method" id="Job-fail-nav"><a href="Job.html#fail">fail</a></li><li data-type="method" id="Job-isRunning-nav"><a href="Job.html#isRunning">isRunning</a></li><li data-type="method" id="Job-priority-nav"><a href="Job.html#priority">priority</a></li><li data-type="method" id="Job-remove-nav"><a href="Job.html#remove">remove</a></li><li data-type="method" id="Job-repeatAt-nav"><a href="Job.html#repeatAt">repeatAt</a></li><li data-type="method" id="Job-repeatEvery-nav"><a href="Job.html#repeatEvery">repeatEvery</a></li><li data-type="method" id="Job-run-nav"><a href="Job.html#run">run</a></li><li data-type="method" id="Job-schedule-nav"><a href="Job.html#schedule">schedule</a></li><li data-type="method" id="Job-toJSON-nav"><a href="Job.html#toJSON">toJSON</a></li><li data-type="method" id="Job-touch-nav"><a href="Job.html#touch">touch</a></li><li data-type="method" id="Job-unique-nav"><a href="Job.html#unique">unique</a></li></ul></li></ul><h3 id="global-nav">Global</h3><ul><li><a href="global.html#parsePriority">parsePriority</a></li></ul>
</nav>
<div id="main">
<h1 class="page-title">
utils/process-jobs.js
</h1>
<section>
<article>
<pre class="prettyprint source linenums"><code>'use strict';
// @TODO: What should we use for internal util functions?
// Maybe we should use agenda:util:processJobs which would move agenda:* to agenda:agenda;*
const debug = require('debug')('agenda:internal:processJobs');
const createJob = require('./create-job');
/**
* Process methods for jobs
* @param {Job} extraJob job to run immediately
* @returns {undefined}
*/
module.exports = function(extraJob) {
debug('starting to process jobs');
// Make sure an interval has actually been set
// Prevents race condition with 'Agenda.stop' and already scheduled run
if (!this._processInterval) {
debug('no _processInterval set when calling processJobs, returning');
return;
}
const self = this;
const definitions = this._definitions;
const jobQueue = this._jobQueue;
let jobName;
// Determine whether or not we have a direct process call!
if (!extraJob) {
// Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs
for (jobName in definitions) {
if ({}.hasOwnProperty.call(definitions, jobName)) {
debug('queuing up job to process: [%s]', jobName);
jobQueueFilling(jobName);
}
}
} else if (definitions[extraJob.attrs.name]) {
// Add the job to list of jobs to lock and then lock it immediately!
debug('job [%s] was passed directly to processJobs(), locking and running immediately', extraJob.attrs.name);
self._jobsToLock.push(extraJob);
lockOnTheFly();
}
/**
* Returns true if a job of the specified name can be locked.
* Considers maximum locked jobs at any time if self._lockLimit is > 0
* Considers maximum locked jobs of the specified name at any time if jobDefinition.lockLimit is > 0
* @param {String} name name of job to check if we should lock or not
* @returns {boolean} whether or not you should lock job
*/
function shouldLock(name) {
const jobDefinition = definitions[name];
let shouldLock = true;
if (self._lockLimit && self._lockLimit <= self._lockedJobs.length) {
shouldLock = false;
}
if (jobDefinition.lockLimit && jobDefinition.lockLimit <= jobDefinition.locked) {
shouldLock = false;
}
debug('job [%s] lock status: shouldLock = %s', name, shouldLock);
return shouldLock;
}
/**
* Internal method that adds jobs to be processed to the local queue
* @param {*} jobs Jobs to queue
* @param {boolean} inFront puts the job in front of queue if true
* @returns {undefined}
*/
function enqueueJobs(jobs, inFront) {
if (!Array.isArray(jobs)) {
jobs = [jobs];
}
jobs.forEach(job => {
let jobIndex;
let start;
let loopCondition;
let endCondition;
let inc;
if (inFront) {
start = jobQueue.length ? jobQueue.length - 1 : 0;
inc = -1;
loopCondition = function() {
return jobIndex >= 0;
};
endCondition = function(queuedJob) {
return !queuedJob || queuedJob.attrs.priority < job.attrs.priority;
};
} else {
start = 0;
inc = 1;
loopCondition = function() {
return jobIndex < jobQueue.length;
};
endCondition = function(queuedJob) {
return queuedJob.attrs.priority >= job.attrs.priority;
};
}
for (jobIndex = start; loopCondition(); jobIndex += inc) {
if (endCondition(jobQueue[jobIndex])) {
break;
}
}
// Insert the job to the queue at its prioritized position for processing
jobQueue.splice(jobIndex, 0, job);
});
}
/**
* Internal method that will lock a job and store it on MongoDB
* This method is called when we immediately start to process a job without using the process interval
* We do this because sometimes jobs are scheduled but will be run before the next process time
* @returns {undefined}
*/
function lockOnTheFly() {
// Already running this? Return
if (self._isLockingOnTheFly) {
debug('lockOnTheFly() already running, returning');
return;
}
// Don't have any jobs to run? Return
if (self._jobsToLock.length === 0) {
debug('no jobs to current lock on the fly, returning');
self._isLockingOnTheFly = false;
return;
}
// Set that we are running this
self._isLockingOnTheFly = true;
// Grab a job that needs to be locked
const now = new Date();
const job = self._jobsToLock.pop();
// If locking limits have been hit, stop locking on the fly.
// Jobs that were waiting to be locked will be picked up during a
// future locking interval.
if (!shouldLock(job.attrs.name)) {
debug('lock limit hit for: [%s]', job.attrs.name);
self._jobsToLock = [];
self._isLockingOnTheFly = false;
return;
}
// Query to run against collection to see if we need to lock it
const criteria = {
_id: job.attrs._id,
lockedAt: null,
nextRunAt: job.attrs.nextRunAt,
disabled: {$ne: true}
};
// Update / options for the MongoDB query
const update = {$set: {lockedAt: now}};
const options = {returnDocument: 'after'};
// Lock the job in MongoDB!
self._collection.findOneAndUpdate(criteria, update, options, (err, resp) => {
if (err) {
throw err;
}
// Did the "job" get locked? Create a job object and run
if (resp.value) {
const job = createJob(self, resp.value);
debug('found job [%s] that can be locked on the fly', job.attrs.name);
self._lockedJobs.push(job);
definitions[job.attrs.name].locked++;
enqueueJobs(job);
jobProcessing();
}
// Mark lock on fly is done for now
self._isLockingOnTheFly = false;
// Re-run in case anything is in the queue
lockOnTheFly();
});
}
/**
* Internal method used to fill a queue with jobs that can be run
* @param {String} name fill a queue with specific job name
* @returns {undefined}
*/
function jobQueueFilling(name) {
// Don't lock because of a limit we have set (lockLimit, etc)
if (!shouldLock(name)) {
debug('lock limit reached in queue filling for [%s]', name);
return;
}
// Set the date of the next time we are going to run _processEvery function
const now = new Date();
self._nextScanAt = new Date(now.valueOf() + self._processEvery);
// For this job name, find the next job to run and lock it!
self._findAndLockNextJob(name, definitions[name], (err, job) => {
if (err) {
debug('[%s] job lock failed while filling queue', name);
throw err;
}
// Still have the job?
// 1. Add it to lock list
// 2. Add count of locked jobs
// 3. Queue the job to actually be run now that it is locked
// 4. Recursively run this same method we are in to check for more available jobs of same type!
if (job) {
debug('[%s:%s] job locked while filling queue', name, job.attrs._id);
self._lockedJobs.push(job);
definitions[job.attrs.name].locked++;
enqueueJobs(job);
jobQueueFilling(name);
jobProcessing();
}
});
}
/**
* Internal method that processes any jobs in the local queue (array)
* @returns {undefined}
*/
function jobProcessing() {
// Ensure we have jobs
if (jobQueue.length === 0) {
return;
}
// Store for all sorts of things
const now = new Date();
// Get the next job that is not blocked by concurrency
let next;
for (next = jobQueue.length - 1; next > 0; next -= 1) {
const def = definitions[jobQueue[next].attrs.name];
if (def.concurrency > def.running) {
break;
}
}
// We now have the job we are going to process and its definition
const job = jobQueue.splice(next, 1)[0];
const jobDefinition = definitions[job.attrs.name];
debug('[%s:%s] about to process job', job.attrs.name, job.attrs._id);
// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt < now) {
debug('[%s:%s] nextRunAt is in the past, run the job immediately', job.attrs.name, job.attrs._id);
runOrRetry();
} else {
const runIn = job.attrs.nextRunAt - now;
debug('[%s:%s] nextRunAt is in the future, calling setTimeout(%d)', job.attrs.name, job.attrs._id, runIn);
setTimeout(runOrRetry, runIn);
}
/**
* Internal method that tries to run a job and if it fails, retries again!
* @returns {undefined}
*/
function runOrRetry() {
if (self._processInterval) {
if (jobDefinition.concurrency > jobDefinition.running && self._runningJobs.length < self._maxConcurrency) {
// Get the deadline of when the job is not supposed to go past for locking
const lockDeadline = new Date(Date.now() - jobDefinition.lockLifetime);
// This means a job has "expired", as in it has not been "touched" within the lockoutTime
// Remove from local lock
// NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
if (job.attrs.lockedAt < lockDeadline) {
debug('[%s:%s] job lock has expired, freeing it up', job.attrs.name, job.attrs._id);
self._lockedJobs.splice(self._lockedJobs.indexOf(job), 1);
jobDefinition.locked--;
jobProcessing();
return;
}
// Add to local "running" queue
self._runningJobs.push(job);
jobDefinition.running++;
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
debug('[%s:%s] processing job', job.attrs.name, job.attrs._id);
job.run()
.catch(err => [err, job])
.then(job => processJobResult(...Array.isArray(job) ? job : [null, job]));
// Re-run the loop to check for more jobs to process (locally)
jobProcessing();
} else {
// Run the job immediately by putting it on the top of the queue
debug('[%s:%s] concurrency preventing immediate run, pushing job to top of queue', job.attrs.name, job.attrs._id);
enqueueJobs(job, true);
}
}
}
}
/**
* Internal method used to run the job definition
* @param {Error} err thrown if can't process job
* @param {module.Job} job job to process
* @returns {undefined}
*/
function processJobResult(err, job) {
if (err) {
return job.agenda.emit('error', err);
}
const name = job.attrs.name;
// Job isn't in running jobs so throw an error
if (self._runningJobs.indexOf(job) === -1) {
debug('[%s] callback was called, job must have been marked as complete already', job.attrs._id);
throw new Error('callback already called - job ' + name + ' already marked complete');
}
// Remove the job from the running queue
self._runningJobs.splice(self._runningJobs.indexOf(job), 1);
if (definitions[name].running > 0) {
definitions[name].running--;
}
// Remove the job from the locked queue
self._lockedJobs.splice(self._lockedJobs.indexOf(job), 1);
if (definitions[name].locked > 0) {
definitions[name].locked--;
}
// Re-process jobs now that one has finished
jobProcessing();
}
};
</code></pre>
</article>
</section>
</div>
<br class="clear">
<footer>
Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.5.5</a>
</footer>
<script src="scripts/linenumber.js"></script>
<script src="scripts/pagelocation.js"></script>
</body>
</html>