leahciMic/scraperjs-queue-bull

View on GitHub
src/queue-bull.js

Summary

Maintainability
A
1 hr
Test Coverage
const bull = require('bull');
const redis = require('redis');
const nodeSha1 = require('node-sha1');
const { promisify } = require('util');

// Redis connection settings: taken from the environment when set, otherwise
// falling back to a local server on the default Redis port.
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
const REDIS_PORT = process.env.REDIS_PORT || 6379;

// Prefix of the Redis keys under which each queue's set of URL hashes is kept.
const KEY_PREFIX = 'scraper.js-queue-bull-url-cache';

// Single Redis client, created at module load and shared by every queue.
const redisClient = redis.createClient({ port: REDIS_PORT, host: REDIS_HOST });

/**
 * Create a bull-backed job queue that refuses to enqueue the same URL twice.
 *
 * De-duplication works by storing the SHA-1 of every accepted `queueItem.url`
 * in a Redis set stored under the key `${KEY_PREFIX}:${name}`.
 *
 * @param {string} name - Queue name; passed to bull and used in the Redis key.
 * @param {object} [options]
 * @param {number} [options.expiry=604800] - Lifetime of the de-duplication
 *   set. The value is divided by 1000 (rounded up) before being handed to the
 *   Redis EXPIRE command, i.e. it is treated as milliseconds.
 *   NOTE(review): 604800 is the number of seconds in a week, yet as
 *   milliseconds it yields a 605 second TTL - confirm which unit is intended.
 * @returns {{process: Function, empty: Function, add: Function,
 *   count: Function, close: Function}} The queue facade.
 */
function createBullQueue(name, { expiry = 604800 } = {}) {
  const queue = bull(name, `redis://${REDIS_HOST}:${REDIS_PORT}`);
  const key = `${KEY_PREFIX}:${name}`;

  // Promise wrapper around the callback-style commands of the shared client.
  const redisCall = (command, ...args) => (
    promisify(redisClient[command].bind(redisClient))(...args)
  );

  return {
    /**
     * Register the job handler. It is invoked as `fn(job.data, job)`.
     * @param {Function} fn - Handler; its return value is handed back to bull.
     */
    process(fn) {
      queue.process(job => fn(job.data, job));
    },

    /**
     * Delete the de-duplication set, then empty the underlying bull queue.
     * @returns {Promise<*>} Whatever `queue.empty()` resolves to.
     */
    async empty() {
      await redisCall('del', key);
      return queue.empty();
    },

    /**
     * Enqueue an item unless its URL was already accepted by this queue.
     *
     * @param {object} queueItem - Job payload; `url` identifies the job.
     *   `priority`, `attempts`, `backoff` and `removeOnComplete` are read from
     *   it and forwarded to bull as job options (defaults: 10, 3, exponential
     *   back-off with a 3600000 delay, true).
     * @returns {Promise<boolean>} `true` when the job was queued, `false` when
     *   the URL was already known.
     * @throws Re-throws any error raised by Redis or by `queue.add()`.
     */
    async add(queueItem) {
      const {
        expiry: itemExpiry,
        priority = 10,
        attempts = 3,
        backoff = {
          type: 'exponential',
          delay: 3600000,
        },
        removeOnComplete = true,
      } = queueItem;

      if (itemExpiry) {
        // eslint-disable-next-line no-console
        console.warn('scraper.js-queue-bull: Setting an expiry on an individual queueItem is no longer accepted. The entire queue must share an expiry');
      }

      // @todo was doing: removing options parameter and combining it with the queueItem

      const urlHash = nodeSha1(queueItem.url);

      // SADD replies 1 only when the member was not in the set yet.
      const saved = await redisCall('sadd', key, urlHash);
      if (saved !== 1) {
        return false;
      }

      try {
        await queue.add(queueItem, {
          priority,
          attempts,
          backoff,
          removeOnComplete,
        });
      } catch (err) {
        // The job never reached the queue: forget its hash again, otherwise
        // the URL would be rejected as a duplicate on every later attempt.
        try {
          await redisCall('srem', key, urlHash);
        } catch (rollbackErr) {
          // eslint-disable-next-line no-console
          console.warn('scraper.js-queue-bull: could not roll back the url cache entry after a failed add', rollbackErr);
        }
        throw err;
      }

      await redisCall('expire', key, Math.ceil(expiry / 1000));
      return true;
    },

    /** @returns {Promise<number>} Result of bull's `queue.count()`. */
    count() {
      return queue.count();
    },

    /** @returns {Promise<*>} Result of bull's `queue.close()`. */
    close() {
      return queue.close();
    },
  };
}

// Public API: the queue factory is exposed under the name `createQueue`.
module.exports = { createQueue: createBullQueue };