rschmukler/agenda

View on GitHub
src/Job.ts

Summary

Maintainability
D
1 day
Test Coverage
import * as date from 'date.js';
import * as debug from 'debug';
import { ObjectId } from 'mongodb';
import { ChildProcess, fork } from 'child_process';
import type { Agenda } from './index';
import type { DefinitionProcessor } from './types/JobDefinition';
import { IJobParameters, datefields, TJobDatefield } from './types/JobParameters';
import { JobPriority, parsePriority } from './utils/priority';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';

const log = debug('agenda:job');

/**
 * @class
 */
export class Job<DATA = unknown | void> {
    readonly attrs: IJobParameters<DATA>;

    /** this flag is set to true, if a job got canceled (e.g. due to a timeout or other exception),
     * you can use it for long running tasks to periodically check if canceled is true,
     * also touch will check if and throws that the job got canceled
     */
    private canceled?: Error | true;

    getCanceledMessage() {
        return typeof this.canceled === 'object'
            ? this.canceled?.message || this.canceled
            : this.canceled;
    }

    private forkedChild?: ChildProcess;

    cancel(error?: Error) {
        this.canceled = error || true;
        if (this.forkedChild) {
            try {
                this.forkedChild.send('cancel');
                console.info('canceled child', this.attrs.name, this.attrs._id);
            } catch (err) {
                console.log('cannot send cancel to child');
            }
        }
    }

    /** internal variable to ensure a job does not set unlimited numbers of setTimeouts if the job is not processed
     * immediately */
    gotTimerToExecute: boolean;

    /**
     * creates a new job object
     * @param agenda
     * @param args
     * @param byJobProcessor
     */
    constructor(
        agenda: Agenda,
        args: Partial<IJobParameters<void>> & {
            name: string;
            type: 'normal' | 'single';
        },
        byJobProcessor?: boolean
    );
    constructor(
        agenda: Agenda,
        args: Partial<IJobParameters<DATA>> & {
            name: string;
            type: 'normal' | 'single';
            data: DATA;
        },
        byJobProcessor?: boolean
    );
    constructor(
        readonly agenda: Agenda,
        args: Partial<IJobParameters<DATA>> & {
            name: string;
            type: 'normal' | 'single';
            data: DATA;
        },
        private readonly byJobProcessor = false
    ) {
        // Set attrs to args
        this.attrs = {
            ...args,
            // Set defaults if undefined
            priority: parsePriority(args.priority),
            nextRunAt: args.nextRunAt === undefined ? new Date() : args.nextRunAt,
            type: args.type
        };
    }

    /**
     * Given a job, turn it into an JobParameters object
     */
    toJson(): IJobParameters {
        const result = {} as IJobParameters;

        for (const key of Object.keys(this.attrs)) {
            if (Object.hasOwnProperty.call(this.attrs, key)) {
                result[key] =
                    datefields.includes(key as TJobDatefield) && this.attrs[key]
                        ? new Date(this.attrs[key])
                        : this.attrs[key];
            }
        }

        return result;
    }

    /**
     * Sets a job to repeat every X amount of time
     * @param interval
     * @param options
     */
    repeatEvery(
        interval: string | number,
        options: { timezone?: string; skipImmediate?: boolean } = {}
    ): this {
        this.attrs.repeatInterval = interval;
        this.attrs.repeatTimezone = options.timezone;
        if (options.skipImmediate) {
            // Set the lastRunAt time to the nextRunAt so that the new nextRunAt will be computed in reference to the current value.
            this.attrs.lastRunAt = this.attrs.nextRunAt || new Date();
            this.computeNextRunAt();
            this.attrs.lastRunAt = undefined;
        } else {
            this.computeNextRunAt();
        }

        return this;
    }

    /**
     * Sets a job to repeat at a specific time
     * @param time
     */
    repeatAt(time: string): this {
        this.attrs.repeatAt = time;
        return this;
    }

    /**
     * if set, a job is forked via node child process and runs in a seperate/own
     * thread
     * @param enableForkMode
     */
    forkMode(enableForkMode: boolean): this {
        this.attrs.fork = enableForkMode;
        return this;
    }

    /**
     * Prevents the job from running
     */
    disable(): this {
        this.attrs.disabled = true;
        return this;
    }

    /**
     * Allows job to run
     */
    enable(): this {
        this.attrs.disabled = false;
        return this;
    }

    /**
     * Data to ensure is unique for job to be created
     * @param unique
     * @param opts
     */
    unique(
        unique: Required<IJobParameters<DATA>>['unique'],
        opts?: IJobParameters['uniqueOpts']
    ): this {
        this.attrs.unique = unique;
        this.attrs.uniqueOpts = opts;
        return this;
    }

    /**
     * Schedules a job to run at specified time
     * @param time
     */
    schedule(time: string | Date): this {
        const d = new Date(time);

        this.attrs.nextRunAt = Number.isNaN(d.getTime()) ? date(time) : d;

        return this;
    }

    /**
     * Sets priority of the job
     * @param priority priority of when job should be queued
     */
    priority(priority: JobPriority): this {
        this.attrs.priority = parsePriority(priority);
        return this;
    }

    /**
     * Fails the job with a reason (error) specified
     *
     * @param reason
     */
    fail(reason: Error | string): this {
        this.attrs.failReason = reason instanceof Error ? reason.message : reason;
        this.attrs.failCount = (this.attrs.failCount || 0) + 1;
        const now = new Date();
        this.attrs.failedAt = now;
        this.attrs.lastFinishedAt = now;
        log(
            '[%s:%s] fail() called [%d] times so far',
            this.attrs.name,
            this.attrs._id,
            this.attrs.failCount
        );
        return this;
    }

    private async fetchStatus(): Promise<void> {
        const dbJob = await this.agenda.db.getJobs({ _id: this.attrs._id });
        if (!dbJob || dbJob.length === 0) {
            // @todo: should we just return false instead? a finished job could have been removed from database,
            // and then this would throw...
            throw new Error(`job with id ${this.attrs._id} not found in database`);
        }

        this.attrs.lastRunAt = dbJob[0].lastRunAt;
        this.attrs.lockedAt = dbJob[0].lockedAt;
        this.attrs.lastFinishedAt = dbJob[0].lastFinishedAt;
    }

    /**
     * A job is running if:
     * (lastRunAt exists AND lastFinishedAt does not exist)
     * OR
     * (lastRunAt exists AND lastFinishedAt exists but the lastRunAt is newer [in time] than lastFinishedAt)
     * @returns Whether or not job is running at the moment (true for running)
     */
    async isRunning(): Promise<boolean> {
        if (!this.byJobProcessor || this.attrs.fork) {
            // we have no job definition, therfore we are not the job processor, but a client call
            // so we get the real state from database
            await this.fetchStatus();
        }

        if (!this.attrs.lastRunAt) {
            return false;
        }

        if (!this.attrs.lastFinishedAt) {
            return true;
        }

        if (
            this.attrs.lockedAt &&
            this.attrs.lastRunAt.getTime() > this.attrs.lastFinishedAt.getTime()
        ) {
            return true;
        }

        return false;
    }

    /**
     * Saves a job to database
     */
    async save(): Promise<Job> {
        if (this.agenda.forkedWorker) {
            const warning = new Error('calling save() on a Job during a forkedWorker has no effect!');
            console.warn(warning.message, warning.stack);
            return this as Job;
        }
        // ensure db connection is ready
        await this.agenda.ready;
        return this.agenda.db.saveJob(this as Job);
    }

    /**
     * Remove the job from database
     */
    remove(): Promise<number> {
        return this.agenda.cancel({ _id: this.attrs._id });
    }

    async isDead(): Promise<boolean> {
        return this.isExpired();
    }

    async isExpired(): Promise<boolean> {
        if (!this.byJobProcessor || this.attrs.fork) {
            // we have no job definition, therfore we are not the job processor, but a client call
            // so we get the real state from database
            await this.fetchStatus();
        }

        const definition = this.agenda.definitions[this.attrs.name];

        const lockDeadline = new Date(Date.now() - definition.lockLifetime);

        // This means a job has "expired", as in it has not been "touched" within the lockoutTime
        // Remove from local lock
        if (this.attrs.lockedAt && this.attrs.lockedAt < lockDeadline) {
            return true;
        }
        return false;
    }

    /**
     * Updates "lockedAt" time so the job does not get picked up again
     * @param progress 0 to 100
     */
    async touch(progress?: number): Promise<void> {
        if (this.canceled) {
            throw new Error(`job ${this.attrs.name} got canceled already: ${this.canceled}!`);
        }
        this.attrs.lockedAt = new Date();
        this.attrs.progress = progress;

        await this.agenda.db.saveJobState(this);
    }

    private computeNextRunAt() {
        try {
            if (this.attrs.repeatInterval) {
                this.attrs.nextRunAt = computeFromInterval(this.attrs);
                log(
                    '[%s:%s] nextRunAt set to [%s]',
                    this.attrs.name,
                    this.attrs._id,
                    new Date(this.attrs.nextRunAt).toISOString()
                );
            } else if (this.attrs.repeatAt) {
                this.attrs.nextRunAt = computeFromRepeatAt(this.attrs);

                log(
                    '[%s:%s] nextRunAt set to [%s]',
                    this.attrs.name,
                    this.attrs._id,
                    this.attrs.nextRunAt.toISOString()
                );
            } else {
                this.attrs.nextRunAt = null;
            }
        } catch (error: any) {
            this.attrs.nextRunAt = null;
            this.fail(error);
        }

        return this;
    }

    async run(): Promise<void> {
        this.attrs.lastRunAt = new Date();
        log(
            '[%s:%s] setting lastRunAt to: %s',
            this.attrs.name,
            this.attrs._id,
            this.attrs.lastRunAt.toISOString()
        );
        this.computeNextRunAt();
        await this.agenda.db.saveJobState(this);

        try {
            this.agenda.emit('start', this);
            this.agenda.emit(`start:${this.attrs.name}`, this);
            log('[%s:%s] starting job', this.attrs.name, this.attrs._id);

            if (this.attrs.fork) {
                if (!this.agenda.forkHelper) {
                    throw new Error('no forkHelper specified, you need to set a path to a helper script');
                }
                const { forkHelper } = this.agenda;

                await new Promise<void>((resolve, reject) => {
                    this.forkedChild = fork(
                        forkHelper.path,
                        [
                            this.attrs.name,
                            this.attrs._id!.toString(),
                            this.agenda.definitions[this.attrs.name].filePath || ''
                        ],
                        forkHelper.options
                    );

                    let childError: any;
                    this.forkedChild.on('close', code => {
                        if (code) {
                            console.info(
                                'fork parameters',
                                forkHelper,
                                this.attrs.name,
                                this.attrs._id,
                                this.agenda.definitions[this.attrs.name].filePath
                            );
                            const error = new Error(`child process exited with code: ${code}`);
                            console.warn(error.message, childError || this.canceled);
                            reject(childError || this.canceled || error);
                        } else {
                            resolve();
                        }
                    });
                    this.forkedChild.on('message', message => {
                        // console.log(`Message from child.js: ${message}`, JSON.stringify(message));
                        if (typeof message === 'string') {
                            try {
                                childError = JSON.parse(message);
                            } catch (errJson) {
                                childError = message;
                            }
                        } else {
                            childError = message;
                        }
                    });
                });
            } else {
                await this.runJob();
            }

            this.attrs.lastFinishedAt = new Date();

            this.agenda.emit('success', this);
            this.agenda.emit(`success:${this.attrs.name}`, this);
            log('[%s:%s] has succeeded', this.attrs.name, this.attrs._id);
        } catch (error: any) {
            log('[%s:%s] unknown error occurred', this.attrs.name, this.attrs._id);

            this.fail(error);

            this.agenda.emit('fail', error, this);
            this.agenda.emit(`fail:${this.attrs.name}`, error, this);
            log('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, error.message);
        } finally {
            this.forkedChild = undefined;
            this.attrs.lockedAt = undefined;
            try {
                await this.agenda.db.saveJobState(this);
                log('[%s:%s] was saved successfully to MongoDB', this.attrs.name, this.attrs._id);
            } catch (err) {
                // in case this fails, we ignore it
                // this can e.g. happen if the job gets removed during the execution
                log('[%s:%s] was not saved to MongoDB', this.attrs.name, this.attrs._id, err);
            }

            this.agenda.emit('complete', this);
            this.agenda.emit(`complete:${this.attrs.name}`, this);
            log(
                '[%s:%s] job finished at [%s] and was unlocked',
                this.attrs.name,
                this.attrs._id,
                this.attrs.lastFinishedAt
            );
        }
    }

    async runJob() {
        const definition = this.agenda.definitions[this.attrs.name];

        if (!definition) {
            log('[%s:%s] has no definition, can not run', this.attrs.name, this.attrs._id);
            throw new Error('Undefined job');
        }

        if (definition.fn.length === 2) {
            log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
            await new Promise<void>((resolve, reject) => {
                try {
                    const result = definition.fn(this as Job, error => {
                        if (error) {
                            reject(error);
                            return;
                        }
                        resolve();
                    });

                    if (this.isPromise(result)) {
                        result.catch((error: Error) => reject(error));
                    }
                } catch (error) {
                    reject(error);
                }
            });
        } else {
            log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
            await (definition.fn as DefinitionProcessor<DATA, void>)(this);
        }
    }

    private isPromise(value: unknown): value is Promise<void> {
        return !!(value && typeof (value as Promise<void>).then === 'function');
    }
}

export type JobWithId = Job & { attrs: IJobParameters & { _id: ObjectId } };