rschmukler/agenda

View on GitHub
src/index.ts

Summary

Maintainability
C
1 day
Test Coverage
import { EventEmitter } from 'events';
import * as debug from 'debug';

import type { Db, Filter, MongoClientOptions, Sort } from 'mongodb';
import { SortDirection } from 'mongodb';
import { ForkOptions } from 'child_process';
import type { IJobDefinition } from './types/JobDefinition';
import type { IAgendaConfig } from './types/AgendaConfig';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IAgendaStatus } from './types/AgendaStatus';
import type { IJobParameters } from './types/JobParameters';
import { Job, JobWithId } from './Job';
import { JobDbRepository } from './JobDbRepository';
import { JobPriority, parsePriority } from './utils/priority';
import { JobProcessor } from './JobProcessor';
import { calculateProcessEvery } from './utils/processEvery';
import { getCallerFilePath } from './utils/stack';

const log = debug('agenda');

const DefaultOptions = {
    processEvery: 5000,
    defaultConcurrency: 5,
    maxConcurrency: 20,
    defaultLockLimit: 0,
    lockLimit: 0,
    defaultLockLifetime: 10 * 60 * 1000,
    sort: { nextRunAt: 1, priority: -1 } as const,
    forkHelper: { path: 'dist/childWorker.js' }
};

/**
 * @class
 */
export class Agenda extends EventEmitter {
    readonly attrs: IAgendaConfig & IDbConfig;

    public readonly forkedWorker?: boolean;

    public readonly forkHelper?: {
        path: string;
        options?: ForkOptions;
    };

    db: JobDbRepository;

    // internally used
    on(event: 'processJob', listener: (job: JobWithId) => void): this;

    on(event: 'fail', listener: (error: Error, job: JobWithId) => void): this;
    on(event: 'success', listener: (job: JobWithId) => void): this;
    on(event: 'start', listener: (job: JobWithId) => void): this;
    on(event: 'complete', listener: (job: JobWithId) => void): this;
    on(event: string, listener: (job: JobWithId) => void): this;
    on(event: string, listener: (error: Error, job: JobWithId) => void): this;
    on(event: 'ready', listener: () => void): this;
    on(event: 'error', listener: (error: Error) => void): this;
    on(event: string, listener: (...args) => void): this {
        if (this.forkedWorker && event !== 'ready' && event !== 'error') {
            const warning = new Error(`calling on(${event}) during a forkedWorker has no effect!`);
            console.warn(warning.message, warning.stack);
            return this;
        }
        return super.on(event, listener);
    }

    readonly definitions: {
        [name: string]: IJobDefinition;
    } = {};

    private jobProcessor?: JobProcessor;

    readonly ready: Promise<void>;

    isActiveJobProcessor(): boolean {
        return !!this.jobProcessor;
    }

    async runForkedJob(jobId: string) {
        const jobData = await this.db.getJobById(jobId);
        if (!jobData) {
            throw new Error('db entry not found');
        }
        const job = new Job(this, jobData);
        await job.runJob();
    }

    async getRunningStats(fullDetails = false): Promise<IAgendaStatus> {
        if (!this.jobProcessor) {
            throw new Error('agenda not running!');
        }
        return this.jobProcessor.getStatus(fullDetails);
    }

    /**
     * @param {Object} config - Agenda Config
     * @param {Function} cb - Callback after Agenda has started and connected to mongo
     */
    constructor(
        config: {
            name?: string;
            defaultConcurrency?: number;
            processEvery?: string | number;
            maxConcurrency?: number;
            defaultLockLimit?: number;
            lockLimit?: number;
            defaultLockLifetime?: number;
            // eslint-disable-next-line @typescript-eslint/ban-types
        } & (IDatabaseOptions | IMongoOptions | {}) &
            IDbConfig & {
                forkHelper?: { path: string; options?: ForkOptions };
                forkedWorker?: boolean;
            } = DefaultOptions,
        cb?: (error?: Error) => void
    ) {
        super();

        this.attrs = {
            name: config.name || '',
            processEvery: calculateProcessEvery(config.processEvery) || DefaultOptions.processEvery,
            defaultConcurrency: config.defaultConcurrency || DefaultOptions.defaultConcurrency,
            maxConcurrency: config.maxConcurrency || DefaultOptions.maxConcurrency,
            defaultLockLimit: config.defaultLockLimit || DefaultOptions.defaultLockLimit,
            lockLimit: config.lockLimit || DefaultOptions.lockLimit,
            defaultLockLifetime: config.defaultLockLifetime || DefaultOptions.defaultLockLifetime, // 10 minute default lockLifetime
            sort: config.sort || DefaultOptions.sort
        };

        this.forkedWorker = config.forkedWorker;
        this.forkHelper = config.forkHelper;

        this.ready = new Promise(resolve => {
            this.once('ready', resolve);
        });

        if (this.hasDatabaseConfig(config)) {
            this.db = new JobDbRepository(this, config);
            this.db.connect();
        }

        if (cb) {
            this.ready.then(() => cb());
        }
    }

    /**
     * Connect to the spec'd MongoDB server and database.
     */
    async database(
        address: string,
        collection?: string,
        options?: MongoClientOptions
    ): Promise<Agenda> {
        this.db = new JobDbRepository(this, { db: { address, collection, options } });
        await this.db.connect();
        return this;
    }

    /**
     * Use existing mongo connectino to pass into agenda
     * @param mongo
     * @param collection
     */
    async mongo(mongo: Db, collection?: string): Promise<Agenda> {
        this.db = new JobDbRepository(this, { mongo, db: { collection } });
        await this.db.connect();
        return this;
    }

    /**
     * Set the sort query for finding next job
     * Default is { nextRunAt: 1, priority: -1 }
     * @param query
     */
    sort(query: { [key: string]: SortDirection }): Agenda {
        log('Agenda.sort([Object])');
        this.attrs.sort = query;
        return this;
    }

    private hasDatabaseConfig(
        config: unknown
    ): config is (IDatabaseOptions | IMongoOptions) & IDbConfig {
        return !!((config as IDatabaseOptions)?.db?.address || (config as IMongoOptions)?.mongo);
    }

    /**
     * Cancels any jobs matching the passed MongoDB query, and removes them from the database.
     * @param query
     */
    async cancel(query: Filter<IJobParameters>): Promise<number> {
        log('attempting to cancel all Agenda jobs', query);
        try {
            const amountOfRemovedJobs = await this.db.removeJobs(query);
            log('%s jobs cancelled', amountOfRemovedJobs);
            return amountOfRemovedJobs;
        } catch (error) {
            log('error trying to delete jobs from MongoDB');
            throw error;
        }
    }

    /**
     * Set name of queue
     * @param name
     */
    name(name: string): Agenda {
        log('Agenda.name(%s)', name);
        this.attrs.name = name;
        return this;
    }

    /**
     * Set the time how often the job processor checks for new jobs to process
     * @param time
     */
    processEvery(time: string | number): Agenda {
        if (this.jobProcessor) {
            throw new Error(
                'job processor is already running, you need to set processEvery before calling start'
            );
        }
        log('Agenda.processEvery(%d)', time);
        this.attrs.processEvery = calculateProcessEvery(time);
        return this;
    }

    /**
     * Set the concurrency for jobs (globally), type does not matter
     * @param num
     */
    maxConcurrency(num: number): Agenda {
        log('Agenda.maxConcurrency(%d)', num);
        this.attrs.maxConcurrency = num;
        return this;
    }

    /**
     * Set the default concurrency for each job
     * @param num number of max concurrency
     */
    defaultConcurrency(num: number): Agenda {
        log('Agenda.defaultConcurrency(%d)', num);
        this.attrs.defaultConcurrency = num;
        return this;
    }

    /**
     * Set the default amount jobs that are allowed to be locked at one time (GLOBAL)
     * @param num
     */
    lockLimit(num: number): Agenda {
        log('Agenda.lockLimit(%d)', num);
        this.attrs.lockLimit = num;
        return this;
    }

    /**
     * Set default lock limit per job type
     * @param num
     */
    defaultLockLimit(num: number): Agenda {
        log('Agenda.defaultLockLimit(%d)', num);
        this.attrs.defaultLockLimit = num;
        return this;
    }

    /**
     * Set the default lock time (in ms)
     * Default is 10 * 60 * 1000 ms (10 minutes)
     * @param ms
     */
    defaultLockLifetime(ms: number): Agenda {
        log('Agenda.defaultLockLifetime(%d)', ms);
        this.attrs.defaultLockLifetime = ms;
        return this;
    }

    /**
     * Finds all jobs matching 'query'
     * @param query
     * @param sort
     * @param limit
     * @param skip
     */
    async jobs(
        query: Filter<IJobParameters> = {},
        sort: Sort = {},
        limit = 0,
        skip = 0
    ): Promise<Job[]> {
        const result = await this.db.getJobs(query, sort, limit, skip);

        return result.map(job => new Job(this, job));
    }

    /**
     * Removes all jobs from queue
     * @note: Only use after defining your jobs
     */
    async purge(): Promise<number> {
        const definedNames = Object.keys(this.definitions);
        log('Agenda.purge(%o)', definedNames);
        return this.cancel({ name: { $not: { $in: definedNames } } });
    }

    /**
     * Setup definition for job
     * Method is used by consumers of lib to setup their functions
     * BREAKING CHANGE in v4: options moved from 2nd to 3rd parameter!
     * @param name
     * @param processor
     * @param options
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    define<DATA = any>(
        name: string,
        processor: (agendaJob: Job<DATA>, done: (error?: Error) => void) => void,
        options?: Partial<Pick<IJobDefinition, 'lockLimit' | 'lockLifetime' | 'concurrency'>> & {
            priority?: JobPriority;
        }
    ): void;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    define<DATA = any>(
        name: string,
        processor: (agendaJob: Job<DATA>) => Promise<void>,
        options?: Partial<Pick<IJobDefinition, 'lockLimit' | 'lockLifetime' | 'concurrency'>> & {
            priority?: JobPriority;
        }
    ): void;
    define(
        name: string,
        processor: ((job: Job) => Promise<void>) | ((job: Job, done) => void),
        options?: Partial<Pick<IJobDefinition, 'lockLimit' | 'lockLifetime' | 'concurrency'>> & {
            priority?: JobPriority;
        }
    ): void {
        if (this.definitions[name]) {
            log('overwriting already defined agenda job', name);
        }

        const filePath = getCallerFilePath();

        this.definitions[name] = {
            fn: processor,
            filePath,
            concurrency: options?.concurrency || this.attrs.defaultConcurrency,
            lockLimit: options?.lockLimit || this.attrs.defaultLockLimit,
            priority: parsePriority(options?.priority),
            lockLifetime: options?.lockLifetime || this.attrs.defaultLockLifetime
        };
        log('job [%s] defined with following options: \n%O', name, this.definitions[name]);
    }

    /**
     * Internal helper method that uses createJob to create jobs for an array of names
     * @param {Number} interval run every X interval
     * @param {Array<String>} names Strings of jobs to schedule
     * @param {Object} data data to run for job
     * @param {Object} options options to run job for
     * @returns {Array<Job>} array of jobs created
     */
    private async createJobs<DATA = unknown>(
        names: string[],
        createJob: (name: string) => Promise<Job<DATA>>
    ): Promise<Job<DATA>[]> {
        try {
            const jobs = await Promise.all(names.map(name => createJob(name)));

            log('createJobs() -> all jobs created successfully');

            return jobs;
        } catch (error) {
            log('createJobs() -> error creating one or more of the jobs', error);
            throw error;
        }
    }

    /**
     * Given a name and some data, create a new job
     * @param name
     */
    create(name: string): Job<void>;
    create<DATA = unknown>(name: string, data: DATA): Job<DATA>;
    create(name: string, data?: unknown): Job<any> {
        log('Agenda.create(%s, [Object])', name);
        const priority = this.definitions[name] ? this.definitions[name].priority : 0;
        const job = new Job(this, { name, data, type: 'normal', priority });
        return job;
    }

    /**
     * Creates a scheduled job with given interval and name/names of the job to run
     * @param interval
     * @param names
     * @param data
     * @param options
     */
    async every(
        interval: string | number,
        names: string[],
        data?: undefined,
        options?: { timezone?: string; skipImmediate?: boolean; forkMode?: boolean }
    ): Promise<Job<void>[]>;
    async every(
        interval: string | number,
        name: string,
        data?: undefined,
        options?: { timezone?: string; skipImmediate?: boolean; forkMode?: boolean }
    ): Promise<Job<void>>;
    async every<DATA = unknown>(
        interval: string | number,
        names: string[],
        data: DATA,
        options?: { timezone?: string; skipImmediate?: boolean; forkMode?: boolean }
    ): Promise<Job<DATA>[]>;
    async every<DATA = unknown>(
        interval: string | number,
        name: string,
        data: DATA,
        options?: { timezone?: string; skipImmediate?: boolean; forkMode?: boolean }
    ): Promise<Job<DATA>>;
    async every(
        interval: string | number,
        names: string | string[],
        data?: unknown,
        options?: { timezone?: string; skipImmediate?: boolean; forkMode?: boolean }
    ): Promise<Job<any> | Job<any>[]> {
        /**
         * Internal method to setup job that gets run every interval
         * @param {Number} interval run every X interval
         * @param {String} name String job to schedule
         * @param {Object} data data to run for job
         * @param {Object} options options to run job for
         * @returns {Job} instance of job
         */
        log('Agenda.every(%s, %O, %O)', interval, names, options);

        const createJob = async (name: string): Promise<Job> => {
            const job = this.create(name, data);
            job.attrs.type = 'single';
            job.repeatEvery(interval, options);
            if (options?.forkMode) {
                job.forkMode(options.forkMode);
            }
            await job.save();

            return job;
        };

        if (typeof names === 'string') {
            const job = await createJob(names);

            return job;
        }

        log('Agenda.every(%s, %s, %O)', interval, names, options);
        const jobs = await this.createJobs(names, createJob);

        return jobs;
    }

    /**
     * Schedule a job or jobs at a specific time
     * @param when
     * @param names
     */
    async schedule<DATA = void>(when: string | Date, names: string[]): Promise<Job<DATA>[]>;
    async schedule<DATA = void>(when: string | Date, names: string): Promise<Job<DATA>>;
    async schedule<DATA = unknown>(
        when: string | Date,
        names: string[],
        data: DATA
    ): Promise<Job<DATA>[]>;
    async schedule<DATA = unknown>(when: string | Date, name: string, data: DATA): Promise<Job<DATA>>;
    async schedule(
        when: string | Date,
        names: string | string[],
        data?: unknown
    ): Promise<Job | Job[]> {
        const createJob = async (name: string) => {
            const job = this.create(name, data);

            await job.schedule(when).save();

            return job;
        };

        if (typeof names === 'string') {
            log('Agenda.schedule(%s, %O, [%O])', when, names);
            return createJob(names);
        }

        log('Agenda.schedule(%s, %O, [%O])', when, names);
        return this.createJobs(names, createJob);
    }

    /**
     * Create a job for this exact moment
     * @param name
     */
    async now<DATA = void>(name: string): Promise<Job<DATA>>;
    async now<DATA = unknown>(name: string, data: DATA): Promise<Job<DATA>>;
    async now<DATA>(name: string, data?: DATA): Promise<Job<DATA | void>> {
        log('Agenda.now(%s, [Object])', name);
        try {
            const job = this.create(name, data);

            job.schedule(new Date());
            await job.save();

            return job as Job<DATA | void>;
        } catch (error) {
            log('error trying to create a job for this exact moment');
            throw error;
        }
    }

    /**
     * Starts processing jobs using processJobs() methods, storing an interval ID
     * This method will only resolve if a db has been set up beforehand.
     */
    async start(): Promise<void> {
        log(
            'Agenda.start called, waiting for agenda to be initialized (db connection)',
            this.attrs.processEvery
        );
        await this.ready;
        if (this.jobProcessor) {
            log('Agenda.start was already called, ignoring');
            return;
        }

        this.jobProcessor = new JobProcessor(
            this,
            this.attrs.maxConcurrency,
            this.attrs.lockLimit,
            this.attrs.processEvery
        );

        this.on('processJob', this.jobProcessor.process.bind(this.jobProcessor));
    }

    /**
     * Clear the interval that processes the jobs and unlocks all currently locked jobs
     */
    async stop(): Promise<void> {
        if (!this.jobProcessor) {
            log('Agenda.stop called, but agenda has never started!');
            return;
        }

        log('Agenda.stop called, clearing interval for processJobs()');

        const lockedJobs = this.jobProcessor.stop();

        log('Agenda._unlockJobs()');
        const jobIds = lockedJobs?.map(job => job.attrs._id) || [];

        if (jobIds.length > 0) {
            log('about to unlock jobs with ids: %O', jobIds);
            await this.db.unlockJobs(jobIds);
        }

        this.off('processJob', this.jobProcessor.process.bind(this.jobProcessor));

        this.jobProcessor = undefined;
    }
}

export * from './types/AgendaConfig';

export * from './types/JobDefinition';

export * from './types/JobParameters';

export * from './types/DbOptions';

export * from './Job';