KurtPattyn/kimbu

View on GitHub
lib/consume_channel.js

Summary

Maintainability
B
5 hrs
Test Coverage
"use strict";

var assert = require("assert");
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var konsol = require("konsol")("kimbu");
var Message = require("./message");
var MessageRecipient = require("./endpoint").MessageRecipient;
var MessageDistributor = require("./endpoint").MessageDistributor;
var Channel = require("./channel");

/**
 * Callback to call when the command has been processed.
 *
 * @callback CommandNextCallback
 * @param {?Error|Object|Array|Number|String|Boolean|Date} parameters - the parameters supplied to the request.
 * @param {?ErrorCallback} callback - An optional callback to receive an indication if the reply was correctly processed.
 */

/**
 * Called when a command is received.
 *
 * @callback CommandCallback
 * @param {Object|Array|Number|String|Boolean|Date} parameters - the parameters supplied to the request.
 * @param {!CommandNextCallback} next - callback to call when the command has been processed.
 */

/**
 * Callback to call when the event has been processed.
 *
 * @callback EventNextCallback
 * @param {?ErrorCallback} callback - An optional callback to receive an indication if the reply was correctly processed.
 */

/**
 * Called when an event is received.
 *
 * @callback EventCallback
 * @param {Object|Array|Number|String|Boolean|Date} parameters - the parameters supplied to the event.
 * @param {!EventNextCallback} next - callback to call when the command has been processed.
 */

/**
 * The ConsumeChannel class implements a channel that receives messages from a
 * {@link MessageDistributor} and that can return replies from received messages.
 *
 * @param {!String} recipientName - name of the recipient endpoint (see {@link MessageRecipient}).
 * @param {!String} distributorName - name of the distributor endpoint (see {@link MessageDistributor}).
 * @param {!Transport} provider - The message bus transport provider.
 *
 * @class
 * @extends {Channel}
 * @see MessageDistributor
 * @see MessageRecipient
 */
function ConsumeChannel(recipientName, distributorName, provider) {
  assert(util.isString(recipientName), "Must provide a valid recipient name");
  assert(util.isString(distributorName), "Must provide a valid distributor name");
  assert(!util.isNullOrUndefined(provider), "Must provide a valid message bus transport provider");
  assert(!util.isNullOrUndefined(provider.constructor.super_),
    "Must provide a valid message bus transport provider");

  //Not using instanceof because we need to require the Transport file
  //and this leads to circular include references
  assert.strictEqual(provider.constructor.super_.name, "Transport");
  Channel.call(this, provider);

  this._consumeQueue = new MessageRecipient(recipientName, provider);
  this._dispatchQueue = new MessageDistributor(distributorName, provider);
}

util.inherits(ConsumeChannel, Channel);

/**
 * @override
 * @inheritDoc
 */
ConsumeChannel.prototype.start = function(cb) {
  const self = this;

  this._consumeQueue.consume({}, function(msg, next) {
    const msgName = /*msg.options.type + ":" +*/ msg.name;

    if (EventEmitter.listenerCount(self, msgName) > 0) {
      /* istanbul ignore next */
      const options = msg.options || { options: { headers: {}}};

      self.emit(msgName, msg.parameters, function(reply, callback) {
        if (msg.options.type === "command") {
          let content;

          if (reply instanceof Error) {
            content = { error: reply.message };
          } else {
            content = {result: reply};
          }

          const msgOptions = {
            correlationId: msg.options.correlationId,
            type: "reply"
          };
          const replyMsg = new Message("reply", content, msgOptions);
          const replyQueue = new MessageRecipient(msg.options.replyTo, self._provider);

          replyQueue.publish(replyMsg, function(err) {
            next(); //acknowledge
            /* istanbul ignore next */
            if (callback) {
              callback(err);
            }
          });
        } else {
          next(); //acknowledge
          /* istanbul ignore next */
          if (callback) {
            callback();
          }
        }
      }, options.headers);
    } else {
      next(); //acknowledge
      konsol.error("Received message %j with no defined handler.", msg);
      /* istanbul ignore else */
      if (msg.options.type === "command") {
        const msgOptions = {
          correlationId: msg.options.correlationId,
          type: "reply"
        };
        const content = { error: "Message could not be handled by receiver" };
        const replyMsg = new Message("reply", content, msgOptions);

        const replyQueue = new MessageRecipient(msg.options.replyTo, self._provider);

        replyQueue.publish(replyMsg, function(/* err */) {});
      }
    }
  }, cb);
};

/**
 * @override
 * @inheritDoc
 */
ConsumeChannel.prototype.stop = function(cb) {
  const self = this;

  this._consumeQueue.cancelConsume(function(err) {
    /* istanbul ignore if */
    if (err) {
      cb(err);
    } else {
      self._dispatchQueue.stop(cb);
    }
  });
};

/**
 * Subscribes the channel to messages with the given name. When a message with the given name is received,
 * the supplied callback is invoked with the originating parameters (see {@link DispatchChannel#request} and
 * {@link DispatchChannel#publish}).
 *
 * @param {!String} msgName - the name of the message to subscribe to.
 * @param {!CommandCallback|EventCallback} callback - callback to call when a message with the given msgName is received.
 */
ConsumeChannel.prototype.on = function(msgName, callback) {
  const self = this;

  EventEmitter.prototype.on.call(self, msgName, callback);
  this._provider.bind(this._consumeQueue._name, this._dispatchQueue._name, msgName, function(err) {
    /* istanbul ignore next */
    if (err) {
      self.removeListener(msgName, callback);
    }
  });
};

/**
 * Unsubscribes the consume channel from messages with the given name.
 *
 * @param {!String} msgName - Name of the messages to stop watching.
 */
ConsumeChannel.prototype.off = function(msgName) {
  const self = this;

  this._provider.unbind(this._consumeQueue._name, this._dispatchQueue._name,
    msgName, function(ignore) {
      self.removeAllListeners(msgName);
    });
};

/**
 * Purges any outstanding messages from the consume channel.
 *
 * @param {Function} [callback] - called when the messages have been purged (successfully or not)
 * @final
 */
ConsumeChannel.prototype.purge = function purge(callback) {
  assert(!callback || util.isFunction(callback), "callback must be a valid function");

  this._consumeQueue.purge(callback);
};

module.exports = ConsumeChannel;