RocketChat/Rocket.Chat

View on GitHub
packages/agenda/src/Agenda.ts

Summary

Maintainability
F
3 days
Test Coverage
import { EventEmitter } from 'events';

import debugInitializer from 'debug';
import humanInterval from 'human-interval';
import { MongoClient } from 'mongodb';
import type { MongoClientOptions, Db, Document, Collection, ModifyResult, InsertOneResult } from 'mongodb';

import { Job } from './Job';
import { JobProcessingQueue } from './JobProcessingQueue';
import { createJob } from './createJob';
import type { IJob } from './definition/IJob';
import type { JobDefinition, JobOptions } from './definition/JobDefinition';
import { hasMongoProtocol } from './lib/hasMongoProtocol';
import { noCallback } from './lib/noCallback';

const debug = debugInitializer('agenda:agenda');

const defaultInterval = 5000;

type JobSort = Partial<Record<keyof IJob, 1 | -1>>;

type MongoTopology = {
    autoReconnect?: boolean;
    connections?(): unknown[];
    isDestroyed?(): boolean;
};

type MongoDB = Db & {
    s?: {
        client?: {
            topology?: MongoTopology;
        };
    };
    db?: {
        s?: {
            client?: {
                topology?: MongoTopology;
            };
        };
    };
    topology?: {
        s?: {
            options?: {
                useUnifiedTopology?: boolean;
            };
        };
    };
};

type AgendaConfig = {
    name?: string;
    processEvery?: string;
    defaultConcurrency?: number;
    maxConcurrency?: number;
    defaultLockLimit?: number;
    lockLimit?: number;
    defaultLockLifeTime?: number;

    sort?: JobSort;
} & (
    | {
            mongo: MongoDB;
            db?: {
                collection?: string;
            };
      }
    | {
            mongo?: undefined;
            db?: {
                address: string;
                collection?: string;
                options?: MongoClientOptions;
            };
      }
);

export type RepeatOptions = { timezone?: string; skipImmediate?: boolean };

export class Agenda extends EventEmitter {
    private _name: string | undefined;

    private _processEvery: number;

    private _defaultConcurrency: number;

    private _maxConcurrency: number;

    private _defaultLockLimit: number;

    private _defaultLockLifetime: number;

    protected _db: MongoClient | undefined;

    private _mdb: MongoDB | undefined;

    private _collection: Collection | undefined;

    private _definitions: Record<string, JobDefinition> = {};

    private _runningJobs: Job[];

    private _lockedJobs: Job[];

    private _jobsToLock: Job[];

    private _jobQueue: JobProcessingQueue;

    private _lockLimit: number;

    private _sort: JobSort;

    private _indexes: JobSort;

    private _isLockingOnTheFly: boolean;

    private _ready: Promise<void>;

    private _processInterval: NodeJS.Timeout | undefined;

    private _nextScanAt?: Date;

    private _mongoUseUnifiedTopology: boolean | undefined;

    constructor(config: AgendaConfig = {}) {
        super();

        this._name = config.name;
        this._processEvery = humanInterval(config.processEvery) || defaultInterval;
        this._defaultConcurrency = config.defaultConcurrency || 5;
        this._maxConcurrency = config.maxConcurrency || 20;
        this._defaultLockLimit = config.defaultLockLimit || 0;
        this._lockLimit = config.lockLimit || 0;

        this._definitions = {};
        this._runningJobs = [];
        this._lockedJobs = [];
        this._jobQueue = new JobProcessingQueue();
        this._defaultLockLifetime = config.defaultLockLifeTime || 10 * 60 * 1000; // 10 minute default lockLifetime
        this._sort = config.sort || { nextRunAt: 1, priority: -1 };
        this._indexes = { name: 1, ...this._sort, priority: -1, lockedAt: 1, nextRunAt: 1, disabled: 1 };

        this._isLockingOnTheFly = false;
        this._jobsToLock = [];
        this._ready = new Promise((resolve) => this.once('ready', resolve));

        if (config.mongo) {
            this.mongo(config.mongo, config.db ? config.db.collection : undefined);
        } else if (config.db) {
            this.database(config.db.address, config.db.collection, config.db.options);
        }
    }

    public mongo(mdb: MongoDB, collection: string | undefined) {
        this._mdb = mdb;
        if (mdb.s && mdb.topology && mdb.topology.s) {
            this._mongoUseUnifiedTopology = Boolean(mdb?.topology?.s?.options?.useUnifiedTopology);
        }
        return this.dbInit(collection);
    }

    /**
     * * NOTE:
     * If `url` includes auth details then `options` must specify: { 'uri_decode_auth': true }. This does Auth on
     * the specified database, not the Admin database. If you are using Auth on the Admin DB and not on the Agenda DB,
     * then you need to authenticate against the Admin DB and then pass the MongoDB instance into the constructor
     * or use Agenda.mongo(). If your app already has a MongoDB connection then use that. ie. specify config.mongo in
     * the constructor or use Agenda.mongo().
     */
    public async database(url: string, collection: string | undefined, options: MongoClientOptions = {}) {
        if (!hasMongoProtocol(url)) {
            url = `mongodb://${url}`;
        }

        collection = collection || 'agendaJobs';

        options = {
            ...options,
        };

        try {
            const client = await MongoClient.connect(url, options);
            debug('successful connection to MongoDB using collection: [%s]', collection);
            this._db = client;
            this._mdb = client.db();
            this.dbInit(collection);
        } catch (error) {
            debug('error connecting to MongoDB using collection: [%s]', collection);
            return error;
        }
    }

    public async dbInit(collection: string | undefined) {
        debug('init database collection using name [%s]', collection);
        this._collection = this.getMongoDB().collection(collection || 'agendaJobs');
        debug('attempting index creation');
        try {
            await this._collection.createIndex(this._indexes, { name: 'findAndLockNextJobIndex' });
            debug('index creation success');
            this.emit('ready');
        } catch (err) {
            debug('index creation failed');
            this.emit('error', err);
        }
    }

    public name(name: string): Agenda {
        debug('Agenda.name(%s)', name);
        this._name = name;
        return this;
    }

    public processEvery(time: string): Agenda {
        debug('Agenda.processEvery(%d)', time);
        this._processEvery = humanInterval(time) || defaultInterval;
        return this;
    }

    public maxConcurrency(num: number): Agenda {
        debug('Agenda.maxConcurrency(%d)', num);
        this._maxConcurrency = num;
        return this;
    }

    public defaultConcurrency(num: number): Agenda {
        debug('Agenda.defaultConcurrency(%d)', num);
        this._defaultConcurrency = num;
        return this;
    }

    public lockLimit(num: number): Agenda {
        debug('Agenda.lockLimit(%d)', num);
        this._lockLimit = num;
        return this;
    }

    public defaultLockLimit(num: number): Agenda {
        debug('Agenda.defaultLockLimit(%d)', num);
        this._defaultLockLimit = num;
        return this;
    }

    public defaultLockLifetime(ms: number): Agenda {
        debug('Agenda.defaultLockLifetime(%d)', ms);
        this._defaultLockLifetime = ms;
        return this;
    }

    public sort(query: JobSort): Agenda {
        debug('Agenda.sort([Object])');
        this._sort = query;
        return this;
    }

    public create(name: string, data: IJob['data'] = {}): Job {
        debug('Agenda.create(%s, [Object])', name);
        const priority = this._definitions[name] ? this._definitions[name].priority : 0;
        const job = new Job({
            name,
            data,
            type: 'normal',
            priority,
            agenda: this,
        });
        return job;
    }

    private getCollection(): Collection {
        if (!this._collection) {
            throw new Error('Agenda instance is not ready yet');
        }

        return this._collection;
    }

    private getMongoDB(): MongoDB {
        if (!this._mdb) {
            throw new Error('Agenda instance is not ready yet');
        }

        return this._mdb;
    }

    public async jobs(query = {}, sort = {}, limit = 0, skip = 0): Promise<Job[]> {
        const result = await this.getCollection().find<IJob>(query).sort(sort).limit(limit).skip(skip).toArray();

        return result.map((job) => createJob(this, job));
    }

    public async purge(): Promise<unknown> {
        // @NOTE: Only use after defining your jobs
        const definedNames = Object.keys(this._definitions);
        debug('Agenda.purge(%o)', definedNames);
        return this.cancel({ name: { $not: { $in: definedNames } } });
    }

    public define(name: string, processor: JobDefinition['fn']): void;

    public define(name: string, maybeOptions: Partial<JobOptions> | JobDefinition['fn'], maybeProcessor?: JobDefinition['fn']): void {
        const processor: JobDefinition['fn'] = maybeProcessor || (maybeOptions as JobDefinition['fn']);
        const options: Partial<JobOptions> = maybeProcessor ? (maybeOptions as Partial<JobOptions>) : {};

        this._definitions[name] = {
            fn: processor,
            concurrency: options.concurrency || this._defaultConcurrency,
            lockLimit: options.lockLimit || this._defaultLockLimit,
            priority: options.priority || 0,
            lockLifetime: options.lockLifetime || this._defaultLockLifetime,
            running: 0,
            locked: 0,
        };
        debug('job [%s] defined with following options: \n%O', name, this._definitions[name]);
    }

    public async every(interval: string | number, name: string, data: IJob['data'], options: RepeatOptions): Promise<Job>;

    public async every(interval: string | number, names: string[], data: IJob['data'], options: RepeatOptions): Promise<Job[]>;

    public async every(
        interval: string | number,
        names: string | string[],
        data: IJob['data'],
        options: RepeatOptions,
    ): Promise<Job | Job[] | undefined> {
        if (typeof names === 'string') {
            debug('Agenda.every(%s, %O, %O)', interval, names, options);
            return this._createIntervalJob(interval, names, data, options);
        }

        if (Array.isArray(names)) {
            debug('Agenda.every(%s, %s, %O)', interval, names, options);
            return this._createIntervalJobs(interval, names, data, options);
        }

        throw new Error('Unexpected error: Invalid job name(s)');
    }

    public async _createIntervalJob(interval: string | number, name: string, data: IJob['data'], options: RepeatOptions): Promise<Job> {
        const job = this.create(name, data);
        job.attrs.type = 'single';
        job.repeatEvery(interval, options);
        await job.save();

        return job;
    }

    private _createIntervalJobs(
        interval: string | number,
        names: string[],
        data: IJob['data'],
        options: RepeatOptions,
    ): Promise<Job[]> | undefined {
        try {
            const jobs = names.map((name) => this._createIntervalJob(interval, name, data, options));
            debug('every() -> all jobs created successfully');

            return Promise.all(jobs);
        } catch (error) {
            debug('every() -> error creating one or more of the jobs', error);
        }
    }

    private async _createScheduledJob(when: string | Date, name: string, data: IJob['data']): Promise<Job> {
        const job = this.create(name, data);
        await job.schedule(when).save();
        return job;
    }

    private async _createScheduledJobs(when: string | Date, names: string[], data: IJob['data']): Promise<Job[]> {
        try {
            const jobs = await Promise.all(names.map((name) => this._createScheduledJob(when, name, data)));
            debug('Agenda.schedule()::createJobs() -> all jobs created successfully');
            return jobs;
        } catch (error) {
            debug('Agenda.schedule()::createJobs() -> error creating one or more of the jobs');
            throw error;
        }
    }

    public schedule(when: string | Date, name: string, data: IJob['data']): Promise<Job>;

    public schedule(when: string | Date, names: string[], data: IJob['data']): Promise<Job[]>;

    public schedule(when: string | Date, names: string | string[], data: IJob['data']): Promise<Job | Job[]> {
        if (typeof names === 'string') {
            debug('Agenda.schedule(%s, %O, [%O], cb)', when, names);
            return this._createScheduledJob(when, names, data);
        }

        if (Array.isArray(names)) {
            debug('Agenda.schedule(%s, %O, [%O])', when, names);
            return this._createScheduledJobs(when, names, data);
        }

        throw new Error('Unexpected error: invalid job name(s)');
    }

    public async now(name: string, data: IJob['data'], ...args: Array<any>): Promise<Job> {
        debug('Agenda.now(%s, [Object])', name);
        try {
            noCallback([name, data, ...args], 2);
            const job = this.create(name, data);

            job.schedule(new Date());
            await job.save();

            return job;
        } catch (error) {
            debug('error trying to create a job for this exact moment');
            throw error;
        }
    }

    public async cancel(query: Record<string, any>): Promise<number> {
        debug('attempting to cancel all Agenda jobs', query);
        try {
            const { deletedCount } = await this.getCollection().deleteMany(query);
            debug('%s jobs cancelled', deletedCount || 0);
            return deletedCount || 0;
        } catch (error) {
            debug('error trying to delete jobs from MongoDB');
            throw error;
        }
    }

    public async has(query: Record<string, any>): Promise<boolean> {
        debug('checking whether Agenda has any jobs matching query', query);
        const record = await this.getCollection().findOne(query, { projection: { _id: 1 } });
        return record !== null;
    }

    private async _processDbResult(job: Job, result: ModifyResult | InsertOneResult): Promise<void> {
        debug('processDbResult() called with success, checking whether to process job immediately or not');

        // We have a result from the above calls
        // findOneAndUpdate() returns different results than insertOne() so check for that
        const res = await (async (): Promise<Document | null> => {
            if ('value' in result) {
                return result.value;
            }

            if ('insertedId' in result) {
                return this.getCollection().findOne({ _id: result.insertedId });
            }

            return null;
        })();

        if (!res) {
            debug('job not found');
            return;
        }

        job.attrs._id = res._id;
        job.attrs.nextRunAt = res.nextRunAt;

        // If the current job would have been processed in an older scan, process the job immediately
        if (job.attrs.nextRunAt && job.attrs.nextRunAt < (this._nextScanAt as Date)) {
            debug('[%s:%s] job would have ran by nextScanAt, processing the job immediately', job.attrs.name, res._id);
            this.processJobs(job);
        }
    }

    private async _updateJob(job: Job, props: Record<string, any>): Promise<void> {
        const id = job.attrs._id;
        const update = {
            $set: props,
        };

        // Update the job and process the resulting data'
        debug('job already has _id, calling findOneAndUpdate() using _id as query');
        const result = await this.getCollection().findOneAndUpdate({ _id: id }, update, { returnDocument: 'after' });

        return this._processDbResult(job, result);
    }

    private async _saveSingleJob(job: Job, props: Record<string, any>, now: Date): Promise<void> {
        // Job type set to 'single' so...
        debug('job with type of "single" found');

        const { nextRunAt, ...$set } = props;
        const $setOnInsert: Record<string, any> = {};

        if (nextRunAt && nextRunAt <= now) {
            debug('job has a scheduled nextRunAt time, protecting that field from upsert');
            $setOnInsert.nextRunAt = nextRunAt;
        } else {
            $set.nextRunAt = nextRunAt;
        }

        const update = {
            $set,
            ...(Object.keys($setOnInsert).length && { $setOnInsert }),
        };

        // Try an upsert
        debug('calling findOneAndUpdate() with job name and type of "single" as query');
        const result = await this.getCollection().findOneAndUpdate(
            {
                name: props.name,
                type: 'single',
            },
            update,
            {
                upsert: true,
                returnDocument: 'after',
            },
        );

        return this._processDbResult(job, result);
    }

    private async _saveUniqueJob(job: Job, props: Record<string, any>): Promise<void> {
        // If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
        const { unique: query, uniqueOpts } = job.attrs;
        if (!query) {
            throw new Error('Unexpected Error: No unique data to store on the job');
        }

        query.name = props.name;
        const update = uniqueOpts?.insertOnly ? { $setOnInsert: props } : { $set: props };

        // Use the 'unique' query object to find an existing job or create a new one
        debug('calling findOneAndUpdate() with unique object as query: \n%O', query);
        const result = await this.getCollection().findOneAndUpdate(query, update, { upsert: true, returnDocument: 'after' });
        return this._processDbResult(job, result);
    }

    private async _saveNewJob(job: Job, props: Record<string, any>): Promise<void> {
        // If all else fails, the job does not exist yet so we just insert it into MongoDB
        debug('using default behavior, inserting new job via insertOne() with props that were set: \n%O', props);
        const result = await this.getCollection().insertOne(props);
        return this._processDbResult(job, result);
    }

    public async saveJob(job: Job): Promise<void> {
        try {
            debug('attempting to save a job into Agenda instance');

            // Grab information needed to save job but that we don't want to persist in MongoDB
            const id = job.attrs._id;

            const props = job.toJSON();
            // delete props._id;
            // delete props.unique;
            // delete props.uniqueOpts;

            // Store name of agenda queue as last modifier in job data
            props.lastModifiedBy = this._name;
            debug('[job %s] set job props: \n%O', id, props);

            // Grab current time and set default query options for MongoDB
            const now = new Date();
            debug('current time stored as %s', now.toISOString());

            // If the job already had an ID, then update the properties of the job
            // i.e, who last modified it, etc
            if (id) {
                return this._updateJob(job, props);
            }

            if (props.type === 'single') {
                return this._saveSingleJob(job, props, now);
            }

            if (job.attrs.unique) {
                return this._saveUniqueJob(job, props);
            }

            return this._saveNewJob(job, props);
        } catch (error) {
            debug('processDbResult() received an error, job was not updated/created');
            throw error;
        }
    }

    public async start(): Promise<void> {
        if (this._processInterval) {
            debug('Agenda.start was already called, ignoring');
            return this._ready;
        }

        await this._ready;
        debug('Agenda.start called, creating interval to call processJobs every [%dms]', this._processEvery);
        this._processInterval = setInterval(() => this.processJobs(), this._processEvery || defaultInterval);
        process.nextTick(() => this.processJobs());
    }

    private async _unlockJobs(): Promise<void> {
        debug('Agenda._unlockJobs()');
        const jobIds = this._lockedJobs.map((job) => job.attrs._id);

        if (jobIds.length === 0) {
            debug('no jobs to unlock');
            return;
        }

        debug('about to unlock jobs with ids: %O', jobIds);
        await this.getCollection().updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } });
        this._lockedJobs = [];
    }

    public stop(): Promise<void> {
        debug('Agenda.stop called, clearing interval for processJobs()');
        clearInterval(this._processInterval as NodeJS.Timeout);
        this._processInterval = undefined;

        return this._unlockJobs();
    }

    public getDefinition(jobName: string): JobDefinition {
        return this._definitions[jobName];
    }

    private async _findAndLockNextJob(jobName: string, definition: JobDefinition): Promise<Job | undefined> {
        const now = new Date();
        const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
        debug('_findAndLockNextJob(%s, [Function])', jobName);

        // Don't try and access MongoDB if we've lost connection to it.
        const client = (this.getMongoDB() || this.getMongoDB().db)?.s?.client;
        if (client?.topology?.connections?.().length === 0 && !this._mongoUseUnifiedTopology) {
            if (client.topology.autoReconnect && !client.topology.isDestroyed?.()) {
                // Continue processing but notify that Agenda has lost the connection
                debug('Missing MongoDB connection, not attempting to find and lock a job');
                this.emit('error', new Error('Lost MongoDB connection'));
            } else {
                // No longer recoverable
                debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', client.topology.autoReconnect, client.topology.isDestroyed?.());
                throw new Error('MongoDB connection is not recoverable, application restart required');
            }
        } else {
            // /**
            // * Query used to find job to run
            // * @type {{$and: [*]}}
            // */
            const JOB_PROCESS_WHERE_QUERY = {
                $and: [
                    {
                        name: jobName,
                        disabled: { $ne: true },
                    },
                    {
                        $or: [
                            {
                                lockedAt: { $eq: null },
                                nextRunAt: { $lte: this._nextScanAt },
                            },
                            {
                                lockedAt: { $lte: lockDeadline },
                            },
                        ],
                    },
                ],
            };

            const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };

            // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
            const result = await this.getCollection().findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, {
                returnDocument: 'after',
                sort: this._sort,
            });

            let job;
            if (result.value) {
                debug('found a job available to lock, creating a new job on Agenda with id [%s]', result.value._id);
                job = createJob(this, result.value as unknown as IJob);
            }

            return job;
        }
    }

    /**
     * Returns true if a job of the specified name can be locked.
     * Considers maximum locked jobs at any time if self._lockLimit is > 0
     * Considers maximum locked jobs of the specified name at any time if jobDefinition.lockLimit is > 0
     */
    private _shouldLock(name: string): boolean {
        const jobDefinition = this.getDefinition(name);
        let shouldLock = true;
        if (this._lockLimit && this._lockLimit <= this._lockedJobs.length) {
            shouldLock = false;
        }

        if (jobDefinition.lockLimit && jobDefinition.lockLimit <= jobDefinition.locked) {
            shouldLock = false;
        }

        debug('job [%s] lock status: shouldLock = %s', name, shouldLock);
        return shouldLock;
    }

    private _enqueueJobs(job: Job | Job[]): void {
        const jobs = Array.isArray(job) ? job : [job];

        jobs.forEach((job) => this._jobQueue.insert(job));
    }

    /**
     * Internal method that will lock a job and store it on MongoDB
     * This method is called when we immediately start to process a job without using the process interval
     * We do this because sometimes jobs are scheduled but will be run before the next process time
     */
    private async _lockOnTheFly(): Promise<void> {
        // Already running this? Return
        if (this._isLockingOnTheFly) {
            debug('lockOnTheFly() already running, returning');
            return;
        }

        // Don't have any jobs to run? Return
        if (this._jobsToLock.length === 0) {
            debug('no jobs to current lock on the fly, returning');
            this._isLockingOnTheFly = false;
            return;
        }

        // Set that we are running this
        this._isLockingOnTheFly = true;

        // Grab a job that needs to be locked
        const now = new Date();
        const job = this._jobsToLock.pop();
        if (!job) {
            throw new Error('Unexpected Error: Job not found [lockOnTheFly]');
        }

        // If locking limits have been hit, stop locking on the fly.
        // Jobs that were waiting to be locked will be picked up during a
        // future locking interval.
        if (!this._shouldLock(job.attrs.name)) {
            debug('lock limit hit for: [%s]', job.attrs.name);
            this._jobsToLock = [];
            this._isLockingOnTheFly = false;
            return;
        }

        // Query to run against collection to see if we need to lock it
        const criteria = {
            _id: job.attrs._id,
            lockedAt: null,
            nextRunAt: job.attrs.nextRunAt,
            disabled: { $ne: true },
        };

        // Update / options for the MongoDB query
        const update = { $set: { lockedAt: now } };

        // Lock the job in MongoDB!
        const resp = await this.getCollection().findOneAndUpdate(criteria, update, { returnDocument: 'after' });

        if (resp.value) {
            const job = createJob(this, resp.value as unknown as IJob);
            debug('found job [%s] that can be locked on the fly', job.attrs.name);
            this._lockedJobs.push(job);
            this._definitions[job.attrs.name].locked++;
            this._enqueueJobs(job);
            this._jobProcessing();
        }

        // Mark lock on fly is done for now
        this._isLockingOnTheFly = false;

        // Re-run in case anything is in the queue
        await this._lockOnTheFly();
    }

    private async _jobQueueFilling(name: string): Promise<void> {
        // Don't lock because of a limit we have set (lockLimit, etc)
        if (!this._shouldLock(name)) {
            debug('lock limit reached in queue filling for [%s]', name);
            return;
        }

        // Set the date of the next time we are going to run _processEvery function
        this._nextScanAt = new Date(Date.now() + this._processEvery || defaultInterval);

        // For this job name, find the next job to run and lock it!
        try {
            const job = await this._findAndLockNextJob(name, this._definitions[name]);
            // Still have the job?
            // 1. Add it to lock list
            // 2. Add count of locked jobs
            // 3. Queue the job to actually be run now that it is locked
            // 4. Recursively run this same method we are in to check for more available jobs of same type!
            if (job) {
                debug('[%s:%s] job locked while filling queue', name, job.attrs._id);
                this._lockedJobs.push(job);
                this._definitions[job.attrs.name].locked++;
                this._enqueueJobs(job);
                await this._jobQueueFilling(name);
                this._jobProcessing();
            }
        } catch (error) {
            debug('[%s] job lock failed while filling queue', name, error);
        }
    }

    private async _runOrRetry(): Promise<void> {
        if (!this._processInterval) {
            return;
        }

        const job = this._jobQueue.pop();
        if (!job) {
            return;
        }

        const jobDefinition = this._definitions[job.attrs.name];
        if (jobDefinition.concurrency > jobDefinition.running && this._runningJobs.length < this._maxConcurrency) {
            // Get the deadline of when the job is not supposed to go past for locking
            const lockDeadline = new Date(Date.now() - jobDefinition.lockLifetime);

            // This means a job has "expired", as in it has not been "touched" within the lockoutTime
            // Remove from local lock
            // NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
            if (job.attrs.lockedAt && job.attrs.lockedAt < lockDeadline) {
                debug('[%s:%s] job lock has expired, freeing it up', job.attrs.name, job.attrs._id);
                this._lockedJobs.splice(this._lockedJobs.indexOf(job), 1);
                jobDefinition.locked--;
                this._jobProcessing();
                return;
            }

            // Add to local "running" queue
            this._runningJobs.push(job);
            jobDefinition.running++;

            // CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
            debug('[%s:%s] processing job', job.attrs.name, job.attrs._id);

            job
                .run()
                .then((jobRan) => [null, jobRan])
                .catch((error) => [error, job])
                .then(([error, jobRan]) => this._processJobResult(error, jobRan));
        } else {
            // Run the job immediately by putting it on the top of the queue
            debug('[%s:%s] concurrency preventing immediate run, pushing job to top of queue', job.attrs.name, job.attrs._id);
            this._enqueueJobs(job);
        }
    }

    private _jobProcessing(): void {
        // Ensure we have jobs
        if (this._jobQueue.length === 0) {
            return;
        }

        // Store for all sorts of things
        const now = new Date();

        // Get the next job that is not blocked by concurrency
        const job = this._jobQueue.returnNextConcurrencyFreeJob(this._definitions);

        if (!job.attrs.nextRunAt) {
            return;
        }

        debug('[%s:%s] about to process job', job.attrs.name, job.attrs._id);

        // If the 'nextRunAt' time is older than the current time, run the job
        // Otherwise, setTimeout that gets called at the time of 'nextRunAt'
        if (job.attrs.nextRunAt <= now) {
            debug('[%s:%s] nextRunAt is in the past, run the job immediately', job.attrs.name, job.attrs._id);
            this._runOrRetry();
        } else {
            const runIn = job.attrs.nextRunAt.valueOf() - now.valueOf();
            debug('[%s:%s] nextRunAt is in the future, calling setTimeout(%d)', job.attrs.name, job.attrs._id, runIn);
            setTimeout(() => this._jobProcessing(), runIn);
        }
    }

    private _processJobResult(err: Error | null, job: Job): void {
        if (err) {
            job.agenda.emit('error', err);
            return;
        }

        const { name } = job.attrs;

        // Job isn't in running jobs so throw an error
        if (!this._runningJobs.includes(job)) {
            debug('[%s] callback was called, job must have been marked as complete already', job.attrs._id);
            throw new Error(`callback already called - job ${name} already marked complete`);
        }

        // Remove the job from the running queue
        this._runningJobs.splice(this._runningJobs.indexOf(job), 1);
        if (this._definitions[name].running > 0) {
            this._definitions[name].running--;
        }

        // Remove the job from the locked queue
        this._lockedJobs.splice(this._lockedJobs.indexOf(job), 1);
        if (this._definitions[name].locked > 0) {
            this._definitions[name].locked--;
        }

        // Re-process jobs now that one has finished
        this._jobProcessing();
    }

    public processJobs(extraJob?: Job): void {
        debug('starting to process jobs');
        // Make sure an interval has actually been set
        // Prevents race condition with 'Agenda.stop' and already scheduled run
        if (!this._processInterval) {
            debug('no _processInterval set when calling processJobs, returning');
            return;
        }

        let jobName;

        // Determine whether or not we have a direct process call!
        if (!extraJob) {
            // Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs
            for (jobName in this._definitions) {
                if (this._definitions.hasOwnProperty(jobName)) {
                    debug('queuing up job to process: [%s]', jobName);
                    this._jobQueueFilling(jobName);
                }
            }
        } else if (this._definitions[extraJob.attrs.name]) {
            // Add the job to list of jobs to lock and then lock it immediately!
            debug('job [%s] was passed directly to processJobs(), locking and running immediately', extraJob.attrs.name);
            this._jobsToLock.push(extraJob);
            this._lockOnTheFly();
        }
    }
}