// src/lib/agenda.ts
import path from 'path';
import { Agenda, Job } from 'agenda';
import config from 'config';
import { logger } from './logger';
import { JobService } from '../app/common/agenda/job-service';
/**
 * Shape of a single entry in the `agenda.jobs` configuration array.
 */
interface JobConfig {
  /** Name under which the job is defined and scheduled. */
  name: string;
  /** Path of the module whose default export is instantiated as the job's service. */
  file: string;
  /** Interval handed to `agenda.every`; entries with a falsy interval are not scheduled. */
  interval: string;
  /** Payload handed to `agenda.every` (`null` is sent when this is nullish). */
  data: unknown;
  /** Options handed to `agenda.define` and `agenda.every` (`{}` is used when nullish). */
  options: unknown;
}
/**
 * Defines every job listed under `agenda.jobs` in the configuration on the
 * given Agenda instance.
 *
 * @param agenda - Agenda instance the jobs are defined on.
 * @returns A promise that resolves once every job has been registered and
 * rejects as soon as any single registration fails.
 */
const registerJobs = (agenda: Agenda) => {
  const configuredJobs = config.get<JobConfig[]>('agenda.jobs');
  logger.info(`Registering ${configuredJobs.length} job(s)...`);

  // Start all registrations up front so they proceed concurrently.
  const registrations = configuredJobs.map((entry: JobConfig) =>
    registerJob(agenda, entry)
  );
  return Promise.all(registrations);
};
/**
 * Loads the service module for one configured job and defines the job on the
 * Agenda instance.
 *
 * The module at `jobConfig.file` is imported dynamically; its default export
 * is instantiated without arguments and used as the job's `JobService`.
 *
 * @param agenda - Agenda instance the job is defined on.
 * @param jobConfig - Configuration entry describing the job.
 * @returns A promise that resolves once the job has been defined. It rejects
 * if the module cannot be imported or its default export cannot be
 * constructed.
 */
const registerJob = async (agenda: Agenda, jobConfig: JobConfig) => {
  logger.info(`Registering job: ${jobConfig.name}`);

  const { default: Service } = await import(path.posix.resolve(jobConfig.file));
  const jobService: JobService = new Service();

  agenda.define(jobConfig.name, jobConfig.options ?? {}, (job: Job) => {
    logger.debug('Running job', { job: jobConfig.name });

    // The promise must be returned: for a handler without a `done` callback,
    // Agenda waits on the returned promise to know when the job has finished.
    // Without it the job is treated as complete while the service is still
    // running.
    return jobService
      .run(job)
      .catch((err) => {
        // Failures are logged and deliberately swallowed so that a failing
        // run is not reported to Agenda as a failed job.
        logger.error('Error running job', { job: jobConfig.name }, err);
      })
      .then(() => {
        logger.debug('Job complete', { job: jobConfig.name });
      });
  });
};
/**
 * Puts every configured job that has an interval on a recurring schedule.
 *
 * @param agenda - Agenda instance used to schedule the jobs.
 * @returns A promise that resolves with the results of all `agenda.every`
 * calls and rejects as soon as any one of them fails.
 */
const scheduleJobs = (agenda: Agenda) => {
  const allJobs = config.get<JobConfig[]>('agenda.jobs');
  // Entries without an interval are skipped here; they are not scheduled.
  const recurringJobs: JobConfig[] = allJobs.filter(
    (entry: JobConfig) => entry.interval
  );
  logger.info(`Scheduling ${recurringJobs.length} job(s)...`);

  const scheduled = recurringJobs.map((entry) => {
    const { name, interval } = entry;
    logger.info(`Scheduling job: ${name} [${interval}]`);
    return agenda.every(interval, name, entry.data ?? null, entry.options ?? {});
  });
  return Promise.all(scheduled);
};
/**
 * Initializes Agenda.js: defines all configured jobs and, once the Agenda
 * instance reports that it is ready, starts the processor and schedules the
 * recurring jobs.
 *
 * Does nothing unless `agenda.enabled` is exactly `true` in the config.
 *
 * @returns A promise that resolves once the jobs have been registered. It
 * does not wait for the database connection; starting and scheduling happen
 * in the background, and any failure there is logged.
 */
export const init = async () => {
  if (config.get('agenda.enabled') !== true) {
    // agenda must be enabled explicitly in the config
    return;
  }

  logger.info('Initializing Agenda.js...');
  const agenda = new Agenda({
    db: { address: config.get<string>('db.admin') }
  });

  // Subscribe to 'ready' before the first await. The constructor already
  // starts connecting, so a listener attached after job registration could
  // miss the event and the scheduler would never start.
  const ready = new Promise<void>((resolve) => {
    agenda.once('ready', () => resolve());
  });

  await registerJobs(agenda);

  // Runs only after both registration and the 'ready' event, so every job is
  // defined before processing and scheduling begin.
  const startWhenReady = async (): Promise<void> => {
    await ready;
    await agenda.start();
    await scheduleJobs(agenda);
    logger.info('Agenda.js ready...');
  };

  // Intentionally not awaited, so callers are not blocked on the database
  // connection. Failures are logged rather than left as unhandled rejections.
  void startWhenReady().catch((err) => {
    logger.error('Error starting Agenda.js', {}, err);
  });
};