KurtPattyn/kimbu

View on GitHub
lib/dispatch_channel.js

Summary

Maintainability
A
3 hrs
Test Coverage
"use strict";

var util = require("util");
var assert = require("assert");
var Channel = require("./channel");
var utils = require("./utils");
var MessageDistributor = require("./endpoint").MessageDistributor;
var MessageRecipient = require("./endpoint").MessageRecipient;
var Message = require("./message");

/**
 * <p>The DispatchChannel class implements a channel that dispatches the messages
 * to interested consumers.</p>
 * <p>Two messaging patterns are implemented:
 * <li>**Request-reply pattern**: a request is sent, a consumer picks it up, replies to the request
 * and the reply is returned to the caller (RPC-like).</li>
 * <li>**Fanout pattern**: a message is published and zero or more listeners receive the message.
 * There is no reply.</li></p>
 * <p>A DispatchChannel is normally not created directly, but through a call to
 * {@link Transport#dispatchChannel}.</p>
 *
 * @class
 * @constructor
 * @param {!String} distributorName - The name of the distributor endpoint.
 * @param {!String} replyRecipientName - Name of the MessageRecipient where replies to requests will be delivered.
 * @param {!Transport} provider - The message bus transport provider.
 * @public
 * @extends {Channel}
 * @see MessageDistributor
 * @see MessageRecipient
 */
function DispatchChannel(distributorName, replyRecipientName, provider) {
  assert(util.isString(distributorName), "Must provide a valid distributor name");
  assert(util.isString(replyRecipientName), "Must provide a valid reply recipient name");
  assert(!util.isNullOrUndefined(provider), "Must provide a valid message bus transport provider");
  assert(!util.isNullOrUndefined(provider.constructor.super_),
    "Must provide a valid message bus transport provider");

  //Not using instanceof because we need to require the Transport file
  //and this leads to circular include references
  assert.strictEqual(provider.constructor.super_.name, "Transport");
  Channel.call(this, provider);

  this._dispatchQueue = new MessageDistributor(distributorName, provider);
  this._replyQueue = new MessageRecipient(replyRecipientName, provider);
  this._id = 0;
}

util.inherits(DispatchChannel, Channel);


/**
 * @override
 * @inheritDoc
 */
DispatchChannel.prototype.start = function(callback) {
  const self = this;

  this._replyQueue.consume({}, function(msg, next) {
    self.emit("reply:" + msg.options.correlationId, msg.parameters, next);
  }, callback);
};

/**
 * @override
 * @inheritDoc
 */
DispatchChannel.prototype.stop = function(callback) {
  const self = this;

  this._replyQueue.cancelConsume(function(err) {
    /* istanbul ignore if */
    if (err) {
      callback(err);
    } else {
      self._dispatchQueue.stop(callback);
    }
  });
};

/**
 * Called when a request has finished.
 *
 * @callback RequestCallback
 * @param {?Error} error - the error that occurred; null if there was no error
 * @param {?Object|Array|Number|String|Boolean|Date} reply - the reply of the request.
 * @public
 * @static
 */

/**
 * Sends a request over the channel. The reply to the request is delivered through the supplied callback.
 * This is an RPC-like call, aka request-response.
 *
 * @param {!String} cmd - The command to execute.
 * @param {!Object|Array|Number|String|Boolean|Date} parameters - A valid Javascript type used as parameter(s) to the command.
 * @param {!Object} options - Message options for the request.
 * @param {!RequestCallback} callback - Called when the request is finished.
 *
 * @final
 */
DispatchChannel.prototype.request = function request(cmd, parameters, options, callback) {
  assert(util.isString(cmd), "cmd must be a string");
  assert(cmd.length > 0, "cmd must not be an empty string");
  assert(!util.isNullOrUndefined(parameters), "parameters must be a valid (possible empty) object");
  assert.ok(util.isObject(options), "'options' must be a valid object (possibly empty)");
  assert(util.isFunction(callback), "callback must be a valid function");

  const self = this;
  let msgOptions = {
    replyTo: this._replyQueue._name,
    correlationId: process.pid + "." + (++this._id),
    type: "command"
  };

  msgOptions = utils.extend(msgOptions, options);

  self.once("reply:" + msgOptions.correlationId, function(reply, next) {
    const answer = reply.result;
    const error = reply.error ? new Error(reply.error) : null;

    setImmediate(callback.bind(self, error, answer));
    next();
  });
  const message = new Message(cmd, parameters, msgOptions);

  self._dispatchQueue.publish(message, function(err) {
    /* istanbul ignore next */
    if (err) {
      self.removeListener("reply:" + msgOptions.correlationId, callback);
      callback(err);
    }
  });
};

/**
 * Publishes an event over the channel. A publication does not receive a reply as there can be many subscribers.
 * This is a pub-sub style of messaging.
 *
 * @param {!String} eventName - The event to publish.
 * @param {!Object|Array|Number|String|Boolean|Date} parameters - A valid Javascript type used as parameter(s) to the event.
 * @param {!Object} options - Message options for the event.
 * @param {ErrorCallback} [callback] - Called when the publication succeeds.
 *
 * @final
 */
DispatchChannel.prototype.publish = function publish(eventName, parameters, options, callback) {
  assert(util.isString(eventName), "eventName must be a string");
  assert(eventName.length > 0, "eventName must not be an empty string");
  assert(!util.isNullOrUndefined(parameters));
  assert.ok(util.isObject(options), "'options' must be a valid object (possibly empty)");
  assert(!callback || util.isFunction(callback), "callback must be a valid function");

  let msgOptions = {
    type: "event"
  };

  msgOptions = utils.extend(msgOptions, options);

  const cb = callback || function() {};
  const msg = new Message(eventName, parameters, msgOptions);

  this._dispatchQueue.publish(msg, function(err) {
    cb(err);
  });
};

/**
 * Purges any outstanding messages from the dispatch channel.
 *
 * @param {Function} [callback] - called when the messages have been purged (successfully or not)
 * @final
 */
DispatchChannel.prototype.purge = function purge(callback) {
  assert(!callback || util.isFunction(callback), "callback must be a valid function");

  this._replyQueue.purge(callback);
};

module.exports = DispatchChannel;