guidesmiths/rascal

View on GitHub
lib/amqp/Broker.js

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
const debug = require('debug')('rascal:Broker');
const format = require('util').format;
const inherits = require('util').inherits;
const EventEmitter = require('events').EventEmitter;
const _ = require('lodash');
const async = require('async');
const tasks = require('./tasks');
const configure = require('../config/configure');
const validate = require('../config/validate');
const fqn = require('../config/fqn');

const preflight = async.compose(validate, configure);
const stub = require('../counters/stub');
const inMemory = require('../counters/inMemory');
const inMemoryCluster = require('../counters/inMemoryCluster').worker;

const maxInterval = 2147483647;

module.exports = {
  create: function create(config, components, next) {
    if (arguments.length === 2) return create(config, {}, arguments[1]);

    const counters = _.defaults({}, components.counters, {
      stub,
      inMemory,
      inMemoryCluster,
    });

    preflight(_.cloneDeep(config), (err, augmentedConfig) => {
      if (err) return next(err);
      new Broker(augmentedConfig, _.assign({}, components, { counters }))._init(next);
    });
  },
};

inherits(Broker, EventEmitter);

function Broker(config, components) {
  const self = this;
  let vhosts = {};
  let publications = {};
  let subscriptions = {};
  let sessions = [];
  const init = async.compose(tasks.initShovels, tasks.initSubscriptions, tasks.initPublications, tasks.initCounters, tasks.initVhosts);
  const nukeVhost = async.compose(tasks.deleteVhost, tasks.shutdownVhost, tasks.nukeVhost);
  const purgeVhost = tasks.purgeVhost;
  const forewarnVhost = tasks.forewarnVhost;
  const shutdownVhost = tasks.shutdownVhost;
  const bounceVhost = tasks.bounceVhost;

  this.config = _.cloneDeep(config);
  this.promises = false;

  this._init = function (next) {
    debug('Initialising broker');
    vhosts = {};
    publications = {};
    subscriptions = {};
    sessions = [];
    init(config, { broker: self, components }, (err) => {
      self.keepActive = setInterval(_.noop, maxInterval);
      setImmediate(() => {
        next(err, self);
      });
    });
  };

  this.connect = function (name, next) {
    if (!vhosts[name]) return next(new Error(format('Unknown vhost: %s', name)));
    vhosts[name].connect(next);
  };

  this.purge = function (next) {
    debug('Purging all queues in all vhosts');
    async.eachSeries(
      _.values(vhosts),
      (vhost, callback) => {
        purgeVhost(config, { vhost }, callback);
      },
      (err) => {
        if (err) return next(err);
        debug('Finished purging all queues in all vhosts');
        next();
      },
    );
  };

  this.shutdown = function (next) {
    debug('Shutting down broker');
    async.eachSeries(
      _.values(vhosts),
      (vhost, callback) => {
        forewarnVhost(config, { vhost }, callback);
      },
      (err) => {
        if (err) return next(err);
        self.unsubscribeAll((err) => {
          if (err) self.emit('error', err);
          async.eachSeries(
            _.values(vhosts),
            (vhost, callback) => {
              shutdownVhost(config, { vhost }, callback);
            },
            (err) => {
              if (err) return next(err);
              clearInterval(self.keepActive);
              debug('Finished shutting down broker');
              next();
            },
          );
        });
      },
    );
  };

  this.bounce = function (next) {
    debug('Bouncing broker');
    self.unsubscribeAll((err) => {
      if (err) self.emit('error', err);
      async.eachSeries(
        _.values(vhosts),
        (vhost, callback) => {
          bounceVhost(config, { vhost }, callback);
        },
        (err) => {
          if (err) return next(err);
          debug('Finished bouncing broker');
          next();
        },
      );
    });
  };

  this.nuke = function (next) {
    debug('Nuking broker');
    self.unsubscribeAll((err) => {
      if (err) self.emit('error', err);
      async.eachSeries(
        _.values(vhosts),
        (vhost, callback) => {
          nukeVhost(config, { vhost, components }, callback);
        },
        (err) => {
          if (err) return next(err);
          vhosts = {};
          publications = {};
          subscriptions = {};
          clearInterval(self.keepActive);
          debug('Finished nuking broker');
          next();
        },
      );
    });
  };

  this.publish = function (name, message, overrides, next) {
    if (arguments.length === 3) return self.publish(name, message, {}, arguments[2]);
    if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next);
    if (!publications[name]) return next(new Error(format('Unknown publication: %s', name)));
    publications[name].publish(message, overrides, next);
  };

  this.forward = function (name, message, overrides, next) {
    if (arguments.length === 3) return self.forward(name, message, {}, arguments[2]);
    if (_.isString(overrides)) return self.forward(name, message, { routingKey: overrides }, next);
    if (!config.publications[name]) return next(new Error(format('Unknown publication: %s', name)));
    publications[name].forward(message, overrides, next);
  };

  this.subscribe = function (name, overrides, next) {
    if (arguments.length === 2) return self.subscribe(name, {}, arguments[1]);
    if (!subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name)));
    subscriptions[name].subscribe(overrides, (err, session) => {
      if (err) return next(err);
      sessions.push(session);
      next(null, session);
    });
  };

  this.subscribeAll = function (filter, next) {
    if (arguments.length === 1) {
      return self.subscribeAll(() => {
        return true;
      }, arguments[0]);
    }
    const filteredSubscriptions = _.chain(config.subscriptions).values().filter(filter).value();
    async.mapSeries(
      filteredSubscriptions,
      (subscriptionConfig, cb) => {
        self.subscribe(subscriptionConfig.name, (err, subscription) => {
          if (err) return cb(err);
          cb(null, subscription);
        });
      },
      next,
    );
  };

  this.unsubscribeAll = function (next) {
    async.each(
      sessions.slice(),
      (session, cb) => {
        sessions.shift();
        session.cancel(cb);
      },
      next,
    );
  };

  this.getConnections = function () {
    return Object.keys(vhosts).map((name) => {
      return vhosts[name].getConnectionDetails();
    });
  };

  /* eslint-disable-next-line no-multi-assign */
  this.getFullyQualifiedName = this.qualify = function (vhost, name) {
    return fqn.qualify(name, config.vhosts[vhost].namespace);
  };

  this._addVhost = function (vhost) {
    vhosts[vhost.name] = vhost;
  };

  this._addPublication = function (publication) {
    publications[publication.name] = publication;
  };

  this._addSubscription = function (subscription) {
    subscriptions[subscription.name] = subscription;
  };
}