FunNode/npm-rabbitmq

View on GitHub
index.js

Summary

Maintainability
A
2 hrs
Test Coverage
A
95%
/* global R5 */

module.exports = Rabbitmq;

if (!global.R5) {
  global.R5 = {
    out: console
  };
}

const amqp = require('amqplib');

// Constructors

function Rabbitmq (host, user, pass, vhost = 'development') {
  this.host = host;
  this.user = user;
  this.pass = pass;
  this.vhost = vhost;
  this.connect_retries = 0;
  this.error_timeout = 10000;
  this.consumers = [];
}

// Public Methods

Rabbitmq.prototype = {
  connect: async function (config) {
    this.config = config;
    const url = `amqp://${this.user}:${this.pass}@${this.host}/${this.vhost}`;
    try {
      this.conn = await amqp.connect(url);
    }
    catch (err) {
      if (this.connect_retries++ < 10) {
        R5.out.error(`RabbitMQ connecting (retrying [${this.connect_retries}]): ${err.code}`);
        await delay(this.error_timeout * this.connect_retries);
        return this.connect(config);
      }
      R5.out.error(`RabbitMQ connecting: ${err.stack}`);
      throw err;
    }
    this.ch = await this.conn.createChannel();
    await this.ch.assertExchange(config.exchange_name, 'topic', { durable: false });
    R5.out.log(`RabbitMQ connected to ${this.host}:${config.queue_name}`);
    for (const consumer of this.consumers) {
      await this.bind(consumer, true);
    }
    this.connect_retries = 0;
    const _this = this;
    this.conn.on('close', async function (err) {
      if (err) {
        R5.out.error(`RabbitMQ reconnecting on close`);
        return _this.connect(config);
      }
    });
  },

  disconnect: async function () {
    await this.ch.close();
    await this.conn.close();
  },

  ack: function (msg, message = {}) {
    this.ch.ack(msg);
    R5.out.log(`RabbitMQ ACKD ${message_summary(message)}`);
  },

  // eslint-disable-next-line no-unused-vars
  bind: async function (callback = async (msg, message) => {}, reconnecting = false) {
    await this.ch.assertQueue(this.config.queue_name, { durable: true });
    await this.ch.prefetch(1);

    R5.out.log(`RabbitMQ waiting for messages from #${this.config.queue_name}..`);
    await this.ch.consume(this.config.queue_name, function (msg) {
      let message = parse_json(msg.content.toString());
      return callback(msg, message);
    }, { noAck: false });
    if (!reconnecting) {
      this.consumers.push(callback);
    }
  },

  get: async function () {
    await this.ch.assertQueue(this.config.queue_name, { durable: true });
    const msg = await this.ch.get(this.config.queue_name, { noAck: false });
    let message;
    if (msg) { message = parse_json(msg.content.toString()); }
    return { msg, message };
  },

  send: async function (message) {
    let message_string = JSON.stringify(message);
    await this.ch.assertQueue(this.config.queue_name, { durable: true });
    await this.ch.sendToQueue(this.config.queue_name, Buffer.from(message_string, 'utf8'), {
      persistent: true
    });

    R5.out.log(`RabbitMQ SENT ${message_summary(message)}`);
  }
};

// Private Methods

function json_is_valid (json_str) {
  try {
    return (JSON.parse(json_str) && !!json_str);
  }
  catch (e) {
    return false;
  }
}

function parse_json (str) {
  let message = null;

  if (json_is_valid(str)) {
    message = JSON.parse(str);
    R5.out.log(`RabbitMQ RECV ${message_summary(message)}`);
  }
  else {
    R5.out.error(`RabbitMQ JSON is invalid: ${str}`);
  }

  return message;
}

function message_summary (message) {
  let summary = '';
  if (message.game) {
    summary = `${message.game ? `${message.game}:` : ''}${message.category}:`;
  }
  else if (message.match) {
    summary = `${message.match.settings ? `${message.match.settings.game}:` : ''}`;
    summary += message.category === 'match' ? `${message.match.id}:` : `${message.category}:`;
  }
  summary += message.type;
  summary += message.user && message.user.name ? `:${message.user.name}` : '';
  return summary;
}

function delay (ms) {
  return new Promise((res) => setTimeout(res, ms));
}