rschmukler/agenda

View on GitHub
src/JobProcessor.ts

Summary

Maintainability
F
5 days
Test Coverage
import * as debug from 'debug';
import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import type { IJobDefinition } from './types/JobDefinition';
import type { Agenda, JobWithId } from './index';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobProcessingQueue } from './JobProcessingQueue';

/** Debug logger for this module; sub-namespaces are derived with `log.extend(...)`. */
const log = debug('agenda:jobProcessor');

/** Agenda's own package version, reported in JobProcessor.getStatus(). */
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
const agendaVersion = require('../package.json').version;

/**
 * Largest delay in milliseconds that setTimeout accepts without overflowing: the maximum
 * signed 32-bit integer (2^31 - 1). Node.js replaces larger delays with 1ms, which would make
 * a "capped" timer fire immediately, so the cap itself must not exceed this value.
 */
const MAX_SAFE_32BIT_INTEGER = 2 ** 31 - 1; // 2147483647

/**
 * Drives job execution for an Agenda instance.
 *
 * Every `processEvery` milliseconds it locks due jobs in the database and puts them into a
 * local JobProcessingQueue, then runs them while respecting the processor-wide
 * `maxConcurrency`, the global `totalLockLimit` and the per-definition `concurrency` /
 * `lockLimit` settings. Jobs that become due before the next scan can be locked "on the fly"
 * via process(extraJob).
 */
export class JobProcessor {
    /** Per job name counters: how many jobs of that name are currently running / locked here. */
    private jobStatus: {
        [name: string]: { running: number; locked: number } | undefined;
    } = {};

    /** Number of jobProcessing() invocations currently in flight. */
    private localQueueProcessing = 0;

    /** When the next periodic scan is expected; updated by jobQueueFilling(). */
    private nextScanAt = new Date();

    /** Locked jobs waiting to be run (or waiting for their nextRunAt). */
    private jobQueue: JobProcessingQueue = new JobProcessingQueue(this.agenda);

    /** Jobs currently executing. */
    private runningJobs: JobWithId[] = [];

    /** Jobs this processor holds a lock for (queued or running). */
    private lockedJobs: JobWithId[] = [];

    /** Jobs waiting to be locked by lockOnTheFly(). */
    private jobsToLock: JobWithId[] = [];

    /** Guards against concurrent lockOnTheFly() runs. */
    private isLockingOnTheFly = false;

    /** Job names whose jobQueueFilling() is currently in progress. */
    private isJobQueueFilling = new Map<string, boolean>();

    /** Set to false by stop(); process() and runOrRetry() return early afterwards. */
    private isRunning = true;

    /** Handle of the periodic process() interval, cleared by stop(). */
    private processInterval?: ReturnType<typeof setInterval>;

    /**
     * Starts processing immediately and then every `processEvery` milliseconds until stop().
     * @param agenda owning Agenda instance (definitions, database access, event emitter)
     * @param maxConcurrency maximum number of jobs this processor runs at the same time
     * @param totalLockLimit maximum number of jobs locked at once (falsy = no global limit)
     * @param processEvery interval in milliseconds between scans
     */
    constructor(
        private agenda: Agenda,
        private maxConcurrency: number,
        private totalLockLimit: number,
        private processEvery: number
    ) {
        log('creating interval to call processJobs every [%dms]', processEvery);
        this.processInterval = setInterval(() => this.process(), processEvery);
        this.process();
    }

    /**
     * Builds a snapshot of the processor's state.
     * @param fullDetails when true, queued/running/locked/to-lock jobs are returned serialized
     *   (including their cancel message); otherwise only their counts are returned
     * @returns version, queue name, database queue size, config, per-name counters and job lists
     */
    async getStatus(fullDetails = false): Promise<IAgendaStatus> {
        const jobStatus = Object.keys(this.jobStatus).reduce((obj, key) => {
            // eslint-disable-next-line no-param-reassign
            obj[key] = {
                ...this.jobStatus[key],
                config: this.agenda.definitions[key]
            };
            return obj;
        }, {}) as IAgendaJobStatus;

        // Serializes a job together with its cancellation message for the detailed view.
        const describeJob = (job: Job) => ({
            ...job.toJson(),
            canceled: job.getCanceledMessage()
        });

        return {
            version: agendaVersion,
            queueName: this.agenda.attrs.name,
            totalQueueSizeDB: await this.agenda.db.getQueueSize(),
            internal: {
                localQueueProcessing: this.localQueueProcessing
            },
            config: {
                totalLockLimit: this.totalLockLimit,
                maxConcurrency: this.maxConcurrency,
                processEvery: this.processEvery
            },
            jobStatus,
            queuedJobs: fullDetails
                ? this.jobQueue.getQueue().map(describeJob)
                : this.jobQueue.length,
            runningJobs: fullDetails ? this.runningJobs.map(describeJob) : this.runningJobs.length,
            lockedJobs: fullDetails ? this.lockedJobs.map(describeJob) : this.lockedJobs.length,
            jobsToLock: fullDetails ? this.jobsToLock.map(describeJob) : this.jobsToLock.length,
            isLockingOnTheFly: this.isLockingOnTheFly
        };
    }

    /**
     * Stops the periodic scan and prevents further job runs.
     * @returns the jobs still locked by this processor (presumably so the caller can release
     *   them — NOTE(review): confirm in Agenda.stop)
     */
    stop(): JobWithId[] {
        log.extend('stop')('stop job processor', this.isRunning);
        this.isRunning = false;

        if (this.processInterval) {
            clearInterval(this.processInterval);
            this.processInterval = undefined;
        }

        return this.lockedJobs;
    }

    /**
     * Main processing entry point ("processJobs").
     * Without an argument, fills the local queue for every defined job name and then starts
     * processing it. With `extraJob`, locks that job on the fly if it has a definition and is
     * due before the next scan.
     * @param extraJob job to consider for immediate processing
     */
    async process(extraJob?: JobWithId): Promise<void> {
        // Make sure an interval has actually been set
        // Prevents race condition with 'Agenda.stop' and already scheduled run
        if (!this.isRunning) {
            log.extend('process')('JobProcessor got stopped already, returning');
            return;
        }

        // Determine whether or not we have a direct process call!
        if (!extraJob) {
            log.extend('process')('starting to process jobs');

            // Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs
            await Promise.all(
                Object.keys(this.agenda.definitions).map(async jobName => {
                    log.extend('process')('queuing up job to process: [%s]', jobName);
                    await this.jobQueueFilling(jobName);
                })
            );
            this.triggerJobProcessing();
        } else if (
            this.agenda.definitions[extraJob.attrs.name] &&
            // If the extraJob would have been processed in an older scan, process the job immediately
            extraJob.attrs.nextRunAt &&
            extraJob.attrs.nextRunAt < this.nextScanAt
        ) {
            log.extend('process')(
                '[%s:%s] job would have ran by nextScanAt, processing the job immediately',
                extraJob.attrs.name,
                extraJob.attrs._id
            );
            // Add the job to list of jobs to lock and then lock it immediately!
            this.jobsToLock.push(extraJob);
            await this.lockOnTheFly();
        }
    }

    /**
     * Returns true if a job of the specified name can be locked.
     * Considers the maximum number of locked jobs overall if `totalLockLimit` is set (> 0),
     * and the maximum number of locked jobs of this name if the definition's `lockLimit` is set.
     * @param name name of job to check if we should lock or not
     * @returns whether or not you should lock job
     */
    shouldLock(name: string): boolean {
        const jobDefinition = this.agenda.definitions[name];
        let shouldLock = true;
        // global lock limit
        if (this.totalLockLimit && this.lockedJobs.length >= this.totalLockLimit) {
            shouldLock = false;
        }

        // job specific lock limit
        const status = this.jobStatus[name];
        if (jobDefinition?.lockLimit && status && status.locked >= jobDefinition.lockLimit) {
            shouldLock = false;
        }

        log.extend('shouldLock')(
            'job [%s] lock status: shouldLock = %s',
            name,
            shouldLock,
            `${status?.locked} >= ${jobDefinition?.lockLimit}`,
            `${this.lockedJobs.length} >= ${this.totalLockLimit}`
        );
        return shouldLock;
    }

    /**
     * Internal method that adds a job to the local processing queue.
     * @param job job to queue
     */
    private enqueueJob(job: Job): void {
        this.jobQueue.insert(job);
    }

    /**
     * Locks jobs from `jobsToLock` one at a time and queues them for immediate processing.
     * Used when a job becomes due before the next scan, so it does not wait for the process
     * interval. Only one invocation runs at a time (guarded by isLockingOnTheFly); it recurses
     * until `jobsToLock` is drained or one of the early returns stops it.
     */
    async lockOnTheFly(): Promise<void> {
        // Already running this? Return
        if (this.isLockingOnTheFly) {
            log.extend('lockOnTheFly')('already running, returning');
            return;
        }

        // Don't have any jobs to run? Return
        if (this.jobsToLock.length === 0) {
            log.extend('lockOnTheFly')('no jobs to current lock on the fly, returning');
            return;
        }

        // Mark that we are running this
        this.isLockingOnTheFly = true;

        try {
            // Grab a job that needs to be locked
            const job = this.jobsToLock.pop();

            if (job) {
                // A queue fill for this name is in progress; skip (the popped job is not re-queued here)
                if (this.isJobQueueFilling.has(job.attrs.name)) {
                    log.extend('lockOnTheFly')('jobQueueFilling already running for: %s', job.attrs.name);
                    return;
                }

                // If locking limits have been hit, stop locking on the fly.
                // Jobs that were waiting to be locked will be picked up during a
                // future locking interval.
                if (!this.shouldLock(job.attrs.name)) {
                    log.extend('lockOnTheFly')('lock limit hit for: [%s:%s]', job.attrs.name, job.attrs._id);
                    this.jobsToLock = [];
                    return;
                }

                // Lock the job in MongoDB!
                const resp = await this.agenda.db.lockJob(job);

                if (resp) {
                    if (job.attrs.name !== resp.name) {
                        throw new Error(
                            `got different job name: ${resp.name} (actual) !== ${job.attrs.name} (expected)`
                        );
                    }

                    const jobToEnqueue = new Job(this.agenda, resp, true) as JobWithId;

                    // Limits may have been reached while lockJob() was pending. If so, release the
                    // lock again; awaited so a failure is not left as an unhandled rejection.
                    if (!this.shouldLock(jobToEnqueue.attrs.name)) {
                        log.extend('lockOnTheFly')(
                            'lock limit reached while job was locked in database. Releasing lock on [%s]',
                            jobToEnqueue.attrs.name
                        );
                        await this.agenda.db.unlockJob(jobToEnqueue);

                        this.jobsToLock = [];
                        return;
                    }

                    log.extend('lockOnTheFly')(
                        'found job [%s:%s] that can be locked on the fly',
                        jobToEnqueue.attrs.name,
                        jobToEnqueue.attrs._id
                    );
                    this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
                    this.lockedJobs.push(jobToEnqueue);
                    this.enqueueJob(jobToEnqueue);
                    this.triggerJobProcessing();
                } else {
                    log.extend('lockOnTheFly')('cannot lock job [%s] on the fly', job.attrs.name);
                }
            }
        } finally {
            // Mark lock on fly is done for now
            this.isLockingOnTheFly = false;
        }

        // Re-run in case anything is in the queue
        await this.lockOnTheFly();
    }

    /**
     * Asks the database for the next job of `jobName` that is due before the next scan and
     * whose lock (if any) is older than the definition's lockLifetime, locking it.
     * @returns the locked job, or undefined if none is available
     */
    private async findAndLockNextJob(
        jobName: string,
        definition: IJobDefinition
    ): Promise<JobWithId | undefined> {
        const lockDeadline = new Date(Date.now() - definition.lockLifetime);
        log.extend('findAndLockNextJob')(
            `looking for lockable jobs for ${jobName} (lock dead line = ${lockDeadline})`
        );

        // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
        const result = await this.agenda.db.getNextJobToRun(jobName, this.nextScanAt, lockDeadline);

        if (result) {
            log.extend('findAndLockNextJob')(
                'found a job available to lock, creating a new job on Agenda with id [%s]',
                result._id
            );
            return new Job(this.agenda, result, true) as JobWithId;
        }

        return undefined;
    }

    /**
     * Fills the local queue with lockable jobs of one definition.
     * Locks one job at a time and recurses until no job is found or a lock limit is reached.
     * Errors are reported on the agenda 'error' event. While running, `name` is recorded in
     * isJobQueueFilling so that lockOnTheFly() skips jobs of that name.
     * @param name fill a queue with specific job name
     */
    private async jobQueueFilling(name: string): Promise<void> {
        this.isJobQueueFilling.set(name, true);

        try {
            // Don't lock because of a limit we have set (lockLimit, etc)
            if (!this.shouldLock(name)) {
                log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name);
                return;
            }

            // Remember when the next periodic scan is expected
            const now = new Date();
            this.nextScanAt = new Date(now.valueOf() + this.processEvery);

            // For this job name, find the next job to run and lock it!
            const job = await this.findAndLockNextJob(name, this.agenda.definitions[name]);

            // Still have the job?
            // 1. Add it to lock list
            // 2. Add count of locked jobs
            // 3. Queue the job to actually be run now that it is locked
            // 4. Recursively run this same method we are in to check for more available jobs of same type!
            if (job) {
                if (job.attrs.name !== name) {
                    throw new Error(
                        `got different job name: ${job.attrs.name} (actual) !== ${name} (expected)`
                    );
                }

                // Before en-queing job make sure we haven't exceed our lock limits; release the
                // lock otherwise (awaited so a failure reaches the catch below)
                if (!this.shouldLock(name)) {
                    log.extend('jobQueueFilling')(
                        'lock limit reached before job was returned. Releasing lock on [%s]',
                        name
                    );
                    await this.agenda.db.unlockJob(job);
                    return;
                }

                log.extend('jobQueueFilling')(
                    '[%s:%s] job locked while filling queue',
                    name,
                    job.attrs._id
                );
                this.updateStatus(name, 'locked', +1);
                this.lockedJobs.push(job);

                this.enqueueJob(job);
                await this.jobQueueFilling(name);
            } else {
                log.extend('jobQueueFilling')('Cannot lock job [%s]', name);
            }
        } catch (error) {
            log.extend('jobQueueFilling')('[%s] job lock failed while filling queue', name, error);
            this.agenda.emit('error', error);
        } finally {
            this.isJobQueueFilling.delete(name);
        }
    }

    /**
     * Starts jobProcessing() without awaiting it (callers are timers and other fire-and-forget
     * paths). A rejection, e.g. a job missing from the locked list, is forwarded to the agenda
     * 'error' event instead of becoming an unhandled promise rejection.
     * @param handledJobs ids already handled in the current processing round
     */
    private triggerJobProcessing(handledJobs?: IJobParameters['_id'][]): void {
        this.jobProcessing(handledJobs).catch(error => {
            log.extend('jobProcessing')('job processing failed', error);
            this.agenda.emit('error', error);
        });
    }

    /**
     * Internal method that processes the next concurrency-free job in the local queue.
     * A due job is handed to runOrRetry(); a job due within `processEvery` is re-queued with a
     * timer; a job further away is dropped from the local bookkeeping.
     * @param handledJobs ids of jobs already handled in this round (skipped when picking a job)
     */
    private async jobProcessing(handledJobs: IJobParameters['_id'][] = []): Promise<void> {
        // Ensure we have jobs
        if (this.jobQueue.length === 0) {
            return;
        }

        this.localQueueProcessing += 1;

        try {
            const now = new Date();

            // Check if there is any job that is not blocked by concurrency
            const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);

            if (!job) {
                log.extend('jobProcessing')('there is no job to process');
                return;
            }

            this.jobQueue.remove(job);

            // check if job has expired (and therefore probably got picked up again by another queue in the meantime)
            // before it even has started to run
            if (!(await job.isExpired())) {
                log.extend('jobProcessing')(
                    '[%s:%s] there is a job to process (priority = %d)',
                    job.attrs.name,
                    job.attrs._id,
                    job.attrs.priority,
                    job.gotTimerToExecute
                );

                // If the 'nextRunAt' time is older than the current time, run the job
                // Otherwise, setTimeout that gets called at the time of 'nextRunAt'
                if (!job.attrs.nextRunAt || job.attrs.nextRunAt <= now) {
                    log.extend('jobProcessing')(
                        '[%s:%s] nextRunAt is in the past, run the job immediately',
                        job.attrs.name,
                        job.attrs._id
                    );
                    // Not awaited; bookkeeping failures are surfaced on the 'error' event
                    this.runOrRetry(job).catch(error => {
                        this.agenda.emit('error', error);
                    });
                } else {
                    const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
                    if (runIn > this.processEvery) {
                        // this job is not in the near future, remove it (it will be picked up later)
                        log.extend('jobProcessing')(
                            '[%s:%s] job is too far away, freeing it up',
                            job.attrs.name,
                            job.attrs._id
                        );
                        let lockedJobIndex = this.lockedJobs.indexOf(job);
                        if (lockedJobIndex === -1) {
                            // lookup by id
                            lockedJobIndex = this.lockedJobs.findIndex(
                                j => j.attrs._id?.toString() === job.attrs._id?.toString()
                            );
                        }
                        if (lockedJobIndex === -1) {
                            throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
                        }

                        this.lockedJobs.splice(lockedJobIndex, 1);
                        this.updateStatus(job.attrs.name, 'locked', -1);
                    } else {
                        log.extend('jobProcessing')(
                            '[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
                            job.attrs.name,
                            job.attrs._id,
                            runIn
                        );
                        // re add to queue (puts it at the right position in the queue)
                        this.jobQueue.insert(job);
                        // ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
                        if (!job.gotTimerToExecute) {
                            job.gotTimerToExecute = true;
                            // setTimeout overflows for delays above the signed 32-bit maximum and would
                            // fire immediately, so cap the delay and simply re-check when it fires.
                            setTimeout(
                                () => this.triggerJobProcessing(),
                                Math.min(runIn, MAX_SAFE_32BIT_INTEGER)
                            );
                        }
                    }
                }
            }

            handledJobs.push(job.attrs._id);

            if (this.localQueueProcessing < this.maxConcurrency) {
                // additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
                setImmediate(() => this.triggerJobProcessing(handledJobs));
            }
        } finally {
            this.localQueueProcessing -= 1;
        }
    }

    /**
     * Runs a job if the definition's `concurrency` and the processor-wide `maxConcurrency`
     * allow it; otherwise puts it back into the local queue.
     * While the job runs, its lock is checked periodically and the job is canceled with an
     * error if it has expired or has no `lockedAt` date. Afterwards it is removed from the
     * running and locked bookkeeping and the queue is processed again.
     * @throws Error if the job cannot be found in the running or locked jobs lists afterwards
     */
    private async runOrRetry(job: JobWithId): Promise<void> {
        if (!this.isRunning) {
            log.extend('runOrRetry')(
                'JobProcessor got stopped already while calling runOrRetry, returning!'
            );
            return;
        }

        const jobDefinition = this.agenda.definitions[job.attrs.name];
        const status = this.jobStatus[job.attrs.name];

        if (
            (!jobDefinition.concurrency || !status || status.running < jobDefinition.concurrency) &&
            this.runningJobs.length < this.maxConcurrency
        ) {
            // Add to local "running" queue
            this.runningJobs.push(job);
            this.updateStatus(job.attrs.name, 'running', 1);

            let jobIsRunning = true;
            // Timer of the pending liveness check. Cleared once the job has settled so it does not
            // linger (keeping the Node.js event loop alive) for up to lockLifetime / 2.
            let aliveCheckTimer: ReturnType<typeof setTimeout> | undefined;
            try {
                log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);

                // Check that the job is still alive every lockLifetime / 2 of its definition
                // (or every processEvery / 2, whichever is larger).
                const checkIfJobIsStillAlive = (): Promise<void> =>
                    new Promise<void>((resolve, reject) => {
                        aliveCheckTimer = setTimeout(async () => {
                            try {
                                // when job is not running anymore, just finish
                                if (!jobIsRunning) {
                                    log.extend('runOrRetry')(
                                        '[%s:%s] checkIfJobIsStillAlive detected job is not running anymore. stopping check.',
                                        job.attrs.name,
                                        job.attrs._id
                                    );
                                    resolve();
                                    return;
                                }

                                if (await job.isExpired()) {
                                    log.extend('runOrRetry')(
                                        '[%s:%s] checkIfJobIsStillAlive detected an expired job, killing it.',
                                        job.attrs.name,
                                        job.attrs._id
                                    );

                                    reject(
                                        new Error(
                                            `execution of '${job.attrs.name}' canceled, execution took more than ${
                                                this.agenda.definitions[job.attrs.name].lockLifetime
                                            }ms. Call touch() for long running jobs to keep them alive.`
                                        )
                                    );
                                    return;
                                }

                                if (!job.attrs.lockedAt) {
                                    log.extend('runOrRetry')(
                                        '[%s:%s] checkIfJobIsStillAlive detected a job without a lockedAt value, killing it.',
                                        job.attrs.name,
                                        job.attrs._id
                                    );

                                    reject(
                                        new Error(
                                            `execution of '${job.attrs.name}' canceled, no lockedAt date found. Ensure to call touch() for long running jobs to keep them alive.`
                                        )
                                    );
                                    return;
                                }

                                // The job may have finished while isExpired() was pending; don't
                                // schedule another check after its timer has been cleared.
                                if (!jobIsRunning) {
                                    resolve();
                                    return;
                                }

                                resolve(checkIfJobIsStillAlive());
                            } catch (error) {
                                // The check itself failed (e.g. isExpired() rejected). Reject so the
                                // failure goes through the race below instead of becoming an
                                // unhandled rejection of this timer callback.
                                reject(error);
                            }
                        }, Math.max(this.processEvery / 2, this.agenda.definitions[job.attrs.name].lockLifetime / 2));
                    });
                // CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
                await Promise.race([job.run(), checkIfJobIsStillAlive()]);

                log.extend('runOrRetry')(
                    '[%s:%s] processing job successfull',
                    job.attrs.name,
                    job.attrs._id
                );

                // Job isn't in running jobs so throw an error
                if (!this.runningJobs.includes(job)) {
                    log.extend('runOrRetry')(
                        '[%s] callback was called, job must have been marked as complete already',
                        job.attrs._id
                    );
                    throw new Error(
                        `callback already called - job ${job.attrs.name} already marked complete`
                    );
                }
            } catch (error: any) {
                job.cancel(error);
                log.extend('runOrRetry')(
                    '[%s:%s] processing job failed',
                    job.attrs.name,
                    job.attrs._id,
                    error
                );
                this.agenda.emit('error', error);
            } finally {
                jobIsRunning = false;
                if (aliveCheckTimer) {
                    clearTimeout(aliveCheckTimer);
                }

                // Remove the job from the running queue
                let runningJobIndex = this.runningJobs.indexOf(job);
                if (runningJobIndex === -1) {
                    // lookup by id
                    runningJobIndex = this.runningJobs.findIndex(
                        j => j.attrs._id?.toString() === job.attrs._id?.toString()
                    );
                }
                if (runningJobIndex === -1) {
                    // eslint-disable-next-line no-unsafe-finally
                    throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
                }
                this.runningJobs.splice(runningJobIndex, 1);
                this.updateStatus(job.attrs.name, 'running', -1);

                // Remove the job from the locked queue
                let lockedJobIndex = this.lockedJobs.indexOf(job);
                if (lockedJobIndex === -1) {
                    // lookup by id
                    lockedJobIndex = this.lockedJobs.findIndex(
                        j => j.attrs._id?.toString() === job.attrs._id?.toString()
                    );
                }
                if (lockedJobIndex === -1) {
                    // eslint-disable-next-line no-unsafe-finally
                    throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
                }
                this.lockedJobs.splice(lockedJobIndex, 1);
                this.updateStatus(job.attrs.name, 'locked', -1);
            }

            // Re-process jobs now that one has finished
            setImmediate(() => this.triggerJobProcessing());
            return;
        }

        // Run the job later
        log.extend('runOrRetry')(
            '[%s:%s] concurrency preventing immediate run, pushing job to top of queue',
            job.attrs.name,
            job.attrs._id
        );
        this.enqueueJob(job);
    }

    /**
     * Adjusts the running/locked counter of a job name, creating the entry on first use.
     * @param name job name
     * @param key which counter to change
     * @param number +1 or -1
     */
    private updateStatus(name: string, key: 'locked' | 'running', number: -1 | 1) {
        if (!this.jobStatus[name]) {
            this.jobStatus[name] = {
                locked: 0,
                running: 0
            };
        }
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.jobStatus[name]![key] += number;
    }
}