rschmukler/agenda

View on GitHub
src/JobDbRepository.ts

Summary

Maintainability
C
1 day
Test Coverage
import * as debug from 'debug';
import {
    Collection,
    Db,
    Filter,
    FindOneAndUpdateOptions,
    MongoClient,
    MongoClientOptions,
    ObjectId,
    Sort,
    UpdateFilter
} from 'mongodb';
import type { Job, JobWithId } from './Job';
import type { Agenda } from './index';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IJobParameters } from './types/JobParameters';
import { hasMongoProtocol } from './utils/hasMongoProtocol';

const log = debug('agenda:db');

/**
 * @class
 */
export class JobDbRepository {
    collection: Collection<IJobParameters>;

    constructor(
        private agenda: Agenda,
        private connectOptions: (IDatabaseOptions | IMongoOptions) & IDbConfig
    ) {
        this.connectOptions.sort = this.connectOptions.sort || { nextRunAt: 1, priority: -1 };
    }

    private async createConnection(): Promise<Db> {
        const { connectOptions } = this;
        if (this.hasDatabaseConfig(connectOptions)) {
            log('using database config', connectOptions);
            return this.database(connectOptions.db.address, connectOptions.db.options);
        }

        if (this.hasMongoConnection(connectOptions)) {
            log('using passed in mongo connection');
            return connectOptions.mongo;
        }

        throw new Error('invalid db config, or db config not found');
    }

    private hasMongoConnection(connectOptions: unknown): connectOptions is IMongoOptions {
        return !!(connectOptions as IMongoOptions)?.mongo;
    }

    private hasDatabaseConfig(connectOptions: unknown): connectOptions is IDatabaseOptions {
        return !!(connectOptions as IDatabaseOptions)?.db?.address;
    }

    async getJobById(id: string) {
        return this.collection.findOne({ _id: new ObjectId(id) });
    }

    async getJobs(
        query: Filter<IJobParameters>,
        sort: Sort = {},
        limit = 0,
        skip = 0
    ): Promise<IJobParameters[]> {
        return this.collection.find(query).sort(sort).limit(limit).skip(skip).toArray();
    }

    async removeJobs(query: Filter<IJobParameters>): Promise<number> {
        const result = await this.collection.deleteMany(query);
        return result.deletedCount || 0;
    }

    async getQueueSize(): Promise<number> {
        return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } });
    }

    async unlockJob(job: Job): Promise<void> {
        // only unlock jobs which are not currently processed (nextRunAT is not null)
        await this.collection.updateOne(
            { _id: job.attrs._id, nextRunAt: { $ne: null } },
            { $unset: { lockedAt: true } }
        );
    }

    /**
     * Internal method to unlock jobs so that they can be re-run
     */
    async unlockJobs(jobIds: ObjectId[]): Promise<void> {
        await this.collection.updateMany(
            { _id: { $in: jobIds }, nextRunAt: { $ne: null } },
            { $unset: { lockedAt: true } }
        );
    }

    async lockJob(job: JobWithId): Promise<IJobParameters | undefined> {
        // Query to run against collection to see if we need to lock it
        const criteria: Filter<Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }> = {
            _id: job.attrs._id,
            name: job.attrs.name,
            lockedAt: null,
            nextRunAt: job.attrs.nextRunAt,
            disabled: { $ne: true }
        };

        // Update / options for the MongoDB query
        const update: UpdateFilter<IJobParameters> = { $set: { lockedAt: new Date() } };
        const options: FindOneAndUpdateOptions = {
            returnDocument: 'after',
            sort: this.connectOptions.sort
        };

        // Lock the job in MongoDB!
        const resp = await this.collection.findOneAndUpdate(
            criteria as Filter<IJobParameters>,
            update,
            options
        );

        return resp?.value || undefined;
    }

    async getNextJobToRun(
        jobName: string,
        nextScanAt: Date,
        lockDeadline: Date,
        now: Date = new Date()
    ): Promise<IJobParameters | undefined> {
        /**
         * Query used to find job to run
         */
        const JOB_PROCESS_WHERE_QUERY: Filter<IJobParameters /* Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null } */> =
            {
                name: jobName,
                disabled: { $ne: true },
                $or: [
                    {
                        lockedAt: { $eq: null as any },
                        nextRunAt: { $lte: nextScanAt }
                    },
                    {
                        lockedAt: { $lte: lockDeadline }
                    }
                ]
            };

        /**
         * Query used to set a job as locked
         */
        const JOB_PROCESS_SET_QUERY: UpdateFilter<IJobParameters> = { $set: { lockedAt: now } };

        /**
         * Query used to affect what gets returned
         */
        const JOB_RETURN_QUERY: FindOneAndUpdateOptions = {
            returnDocument: 'after',
            sort: this.connectOptions.sort
        };

        // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
        const result = await this.collection.findOneAndUpdate(
            JOB_PROCESS_WHERE_QUERY,
            JOB_PROCESS_SET_QUERY,
            JOB_RETURN_QUERY
        );

        return result.value || undefined;
    }

    async connect(): Promise<void> {
        const db = await this.createConnection();
        log('successful connection to MongoDB', db.options);

        const collection = this.connectOptions.db?.collection || 'agendaJobs';

        this.collection = db.collection(collection);
        if (log.enabled) {
            log(
                `connected with collection: ${collection}, collection size: ${
                    typeof this.collection.estimatedDocumentCount === 'function'
                        ? await this.collection.estimatedDocumentCount()
                        : '?'
                }`
            );
        }

        if (this.connectOptions.ensureIndex) {
            log('attempting index creation');
            try {
                const result = await this.collection.createIndex(
                    {
                        name: 1,
                        ...this.connectOptions.sort,
                        priority: -1,
                        lockedAt: 1,
                        nextRunAt: 1,
                        disabled: 1
                    },
                    { name: 'findAndLockNextJobIndex' }
                );
                log('index succesfully created', result);
            } catch (error) {
                log('db index creation failed', error);
                throw error;
            }
        }

        this.agenda.emit('ready');
    }

    private async database(url: string, options?: MongoClientOptions) {
        let connectionString = url;

        if (!hasMongoProtocol(connectionString)) {
            connectionString = `mongodb://${connectionString}`;
        }

        const client = await MongoClient.connect(connectionString, {
            ...options
        });

        return client.db();
    }

    private processDbResult<DATA = unknown | void>(
        job: Job<DATA>,
        res?: IJobParameters<DATA>
    ): Job<DATA> {
        log(
            'processDbResult() called with success, checking whether to process job immediately or not'
        );

        // We have a result from the above calls
        if (res) {
            // Grab ID and nextRunAt from MongoDB and store it as an attribute on Job
            job.attrs._id = res._id;
            job.attrs.nextRunAt = res.nextRunAt;

            // check if we should process the job immediately
            this.agenda.emit('processJob', job);
        }

        // Return the Job instance
        return job;
    }

    async saveJobState(job: Job<any>): Promise<void> {
        const id = job.attrs._id;
        const $set = {
            lockedAt: (job.attrs.lockedAt && new Date(job.attrs.lockedAt)) || undefined,
            nextRunAt: (job.attrs.nextRunAt && new Date(job.attrs.nextRunAt)) || undefined,
            lastRunAt: (job.attrs.lastRunAt && new Date(job.attrs.lastRunAt)) || undefined,
            progress: job.attrs.progress,
            failReason: job.attrs.failReason,
            failCount: job.attrs.failCount,
            failedAt: job.attrs.failedAt && new Date(job.attrs.failedAt),
            lastFinishedAt: (job.attrs.lastFinishedAt && new Date(job.attrs.lastFinishedAt)) || undefined
        };

        log('[job %s] save job state: \n%O', id, $set);

        const result = await this.collection.updateOne(
            { _id: id, name: job.attrs.name },
            {
                $set
            }
        );

        if (!result.acknowledged || result.matchedCount !== 1) {
            throw new Error(
                `job ${id} (name: ${job.attrs.name}) cannot be updated in the database, maybe it does not exist anymore?`
            );
        }
    }

    /**
     * Save the properties on a job to MongoDB
     * @name Agenda#saveJob
     * @function
     * @param {Job} job job to save into MongoDB
     * @returns {Promise} resolves when job is saved or errors
     */
    async saveJob<DATA = unknown | void>(job: Job<DATA>): Promise<Job<DATA>> {
        try {
            log('attempting to save a job');

            // Grab information needed to save job but that we don't want to persist in MongoDB
            const id = job.attrs._id;

            // Store job as JSON and remove props we don't want to store from object
            // _id, unique, uniqueOpts
            // eslint-disable-next-line @typescript-eslint/no-unused-vars
            const { _id, unique, uniqueOpts, ...props } = {
                ...job.toJson(),
                // Store name of agenda queue as last modifier in job data
                lastModifiedBy: this.agenda.attrs.name
            };

            log('[job %s] set job props: \n%O', id, props);

            // Grab current time and set default query options for MongoDB
            const now = new Date();
            const protect: Partial<IJobParameters> = {};
            let update: UpdateFilter<IJobParameters> = { $set: props };
            log('current time stored as %s', now.toISOString());

            // If the job already had an ID, then update the properties of the job
            // i.e, who last modified it, etc
            if (id) {
                // Update the job and process the resulting data'
                log('job already has _id, calling findOneAndUpdate() using _id as query');
                const result = await this.collection.findOneAndUpdate(
                    { _id: id, name: props.name },
                    update,
                    { returnDocument: 'after' }
                );
                return this.processDbResult(job, result.value as IJobParameters<DATA>);
            }

            if (props.type === 'single') {
                // Job type set to 'single' so...
                log('job with type of "single" found');

                // If the nextRunAt time is older than the current time, "protect" that property, meaning, don't change
                // a scheduled job's next run time!
                if (props.nextRunAt && props.nextRunAt <= now) {
                    log('job has a scheduled nextRunAt time, protecting that field from upsert');
                    protect.nextRunAt = props.nextRunAt;
                    delete (props as Partial<IJobParameters>).nextRunAt;
                }

                // If we have things to protect, set them in MongoDB using $setOnInsert
                if (Object.keys(protect).length > 0) {
                    update.$setOnInsert = protect;
                }

                // Try an upsert
                log(
                    `calling findOneAndUpdate(${props.name}) with job name and type of "single" as query`,
                    await this.collection.findOne({
                        name: props.name,
                        type: 'single'
                    })
                );
                // this call ensure a job of this name can only exists once
                const result = await this.collection.findOneAndUpdate(
                    {
                        name: props.name,
                        type: 'single'
                    },
                    update,
                    {
                        upsert: true,
                        returnDocument: 'after'
                    }
                );
                log(
                    `findOneAndUpdate(${props.name}) with type "single" ${
                        result.lastErrorObject?.updatedExisting
                            ? 'updated existing entry'
                            : 'inserted new entry'
                    }`
                );
                return this.processDbResult(job, result.value as IJobParameters<DATA>);
            }

            if (job.attrs.unique) {
                // If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
                const query: Filter<Omit<IJobParameters<DATA>, 'unique'>> = job.attrs.unique;
                query.name = props.name;
                if (job.attrs.uniqueOpts?.insertOnly) {
                    update = { $setOnInsert: props };
                }

                // Use the 'unique' query object to find an existing job or create a new one
                log('calling findOneAndUpdate() with unique object as query: \n%O', query);
                const result = await this.collection.findOneAndUpdate(query as IJobParameters, update, {
                    upsert: true,
                    returnDocument: 'after'
                });
                return this.processDbResult(job, result.value as IJobParameters<DATA>);
            }

            // If all else fails, the job does not exist yet so we just insert it into MongoDB
            log(
                'using default behavior, inserting new job via insertOne() with props that were set: \n%O',
                props
            );
            const result = await this.collection.insertOne(props);
            return this.processDbResult(job, {
                _id: result.insertedId,
                ...props
            } as IJobParameters<DATA>);
        } catch (error) {
            log('processDbResult() received an error, job was not updated/created');
            throw error;
        }
    }
}