betagouv/service-national-universel

View on GitHub
api/src/queues/cronsQueue.ts

Summary

Maintainability
A
0 mins
Test Coverage
import config from "config";
import { logger } from "../logger";
import { Worker, Queue, Job, RepeatableJob } from "bullmq";
import { capture, captureMessage } from "../sentry";
import CRONS from "../crons";

import { captureCheckIn } from "@sentry/node";
import { MonitorConfig } from "@sentry/types";
import { logStartedTask, logSucceedTask, logFailedTask } from "./taskLoggerService";

const CRONS_QUEUE = `${config.get("TASK_QUEUE_PREFIX")}_crons`;
const CONCURRENCY = 2;

let queue: Queue | null = null;
let worker: Worker | null = null;

export function initQueue(connection) {
  queue = new Queue(CRONS_QUEUE, {
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 2000,
      },
      removeOnComplete: {
        age: 7 * 24 * 3600, // 1 week
      },
      removeOnFail: {
        age: 30 * 24 * 3600, // 1 month
      },
    },
    connection,
  });
  return queue;
}

export async function scheduleCrons() {
  // @ts-ignore
  const jobs = await queue.getRepeatableJobs();
  const unchangedJobs: RepeatableJob[] = [];
  const outdatedJobs: RepeatableJob[] = [];

  for (const job of jobs) {
    if (CRONS.find((cron) => cron.name === job.name && cron.crontab === job.pattern)) {
      unchangedJobs.push(job);
    } else {
      // cron deleted or updated in code
      outdatedJobs.push(job);
    }
  }

  // Remove outdated Jobs :
  // - cron not declared in CRONS with existing RepeatableJob (deleted)
  // - cron declared in CRONS with existing RepeatableJob but different name & crontab (updated)
  await Promise.all(
    outdatedJobs.map((job) => {
      logger.info(`Deleting outdated cron ${job.name} (${job.pattern})`);
      // @ts-ignore
      return queue.removeRepeatableByKey(job.key);
    }),
  );

  // Add new Jobs :
  // - cron declared in CRONS with no existing RepeatableJob
  const newCrons = CRONS.filter((cron) => !unchangedJobs.some((job) => job.name === cron.name));
  await Promise.all(
    newCrons.map((cron) => {
      logger.info(`Adding new cron ${cron.name} (${cron.crontab})`);
      // @ts-ignore
      return queue.add(
        cron.name,
        {},
        {
          repeat: {
            pattern: cron.crontab,
          },
          jobId: cron.name,
        },
      );
    }),
  );
}

export function initWorker(connection) {
  worker = new Worker(
    CRONS_QUEUE,
    async (job) => {
      const cron = CRONS.find((c) => c.name === job.name);
      if (!cron) {
        throw new Error("CRON not found");
      }
      logStartedTask(job);
      const monitorConfig = {
        schedule: {
          type: "crontab",
          value: cron.crontab,
        },
        timezone: "Etc/UTC",
        checkinMargin: 10,
        maxRuntime: 15,
      } as MonitorConfig;
      const checkInId = captureCheckIn(
        {
          monitorSlug: cron.name,
          status: "in_progress",
        },
        monitorConfig,
      );
      try {
        await Promise.all(cron.handlers.map((h) => h.call()));
        captureCheckIn(
          {
            checkInId,
            monitorSlug: cron.name,
            status: "ok",
          },
          monitorConfig,
        );
      } catch (err) {
        captureCheckIn(
          {
            checkInId,
            monitorSlug: cron.name,
            status: "error",
          },
          monitorConfig,
        );
        throw err;
      }
    },
    { connection, concurrency: CONCURRENCY },
  );
  worker.on("completed", (job) => {
    logSucceedTask(job);
  });

  worker.on("failed", (job: Job, error) => {
    const error_id = capture(error);
    logFailedTask(job, error_id);
  });
  worker.on("error", (err) => captureMessage(err));
  return worker;
}