mbroadst/amqp10-rpc

View on GitHub
lib/rpc-client.js

Summary

Maintainability
D
2 days
Test Coverage
'use strict';
var Promise = require('bluebird'),
    uuid = require('uuid'),
    errors = require('./errors'),
    u = require('./utilities');

function RpcClient(client, options) {
  options = options || {};
  this._client = client;
  this._logger = options.logger;
  this._receiver = null;
  this._sender = null;
  this._requests = {};
  this._timeout = options.timeout || 5000;
  this._responseLinkParameters = !!options.responseAddress ?
    [ options.responseAddress ] : [ null, { attach: { source: { dynamic: true } } } ];

  if (options.hasOwnProperty('interceptor') && typeof options.interceptor === 'function') {
    this._interceptor = options.interceptor;
  }
}

/**
 * Connect to a broker creating a dynamic link for responses
 */
RpcClient.prototype.connect = function(address, options) {
  options = options || {};
  var self = this;
  return Promise.all([
    self._client.createReceiver.apply(self._client, self._responseLinkParameters),
    self._client.createSender(address, options)
  ])
  .spread(function(receiver, sender) {
    self._receiver = receiver;
    self._sender = sender;

    receiver.on('message', self._processMessage.bind(self));
    receiver.on('errorReceived', function(err) {
      var _keys = Object.keys(self.requests), _len = _keys.length;
      for (var i = 0; i < _len; ++i) {
        self._requests[_keys[i]].reject(err);
        delete self._requests[_keys[i]];
      }
    });
  });
};

/**
 * Make an rpc call to a remote endpoint
 *
 * @params method   the method to call on the remote endpoint
 * @param params    parameters to call the method with remotely
 */
RpcClient.prototype.call = function(method, params) {
  var correlator = uuid.v4().replace(/-/g, '');
  var request = {
    properties: {
      replyTo: this._receiver.remote.attach.source.address,
      correlationId: correlator
    },
    body: {
      method: method
    }
  };

  // add timeout as TTL, if it exists
  if (!!this._timeout) {
    request.header = {};
    request.header.ttl = this._timeout;
  }

  // support call with raw request objects { method: 'method', params: [] }
  if (u.isPlainObject(method) && method.hasOwnProperty('method') ||
      Array.isArray(method)) {
    request.body = method;
    return this._sendRequest(correlator, request);
  }

  if (arguments.length > 2) {
    var args = new Array(arguments.length);
    for (var i = 0, ii = args.length; i < ii; ++i) args[i] = arguments[i];
    request.body.params = args.slice(1);
  } else if (!!params) {
    request.body.params = u.isPlainObject(params) ? params : [ params ];
  }

  return this._sendRequest(correlator, request);
};

/**
 * Make an rpc call without a correlation id
 *
 * @param {String|Object} methodOrRequest the method or raw request to call on the remote endpoint
 * @param {Array|Object}  paramsOrOverrides parameters to call the method with remotely, or link overrides
 */
RpcClient.prototype.notify = function(method, params) {
  // support call with raw request objects { method: 'method', params: [] }
  if (u.isPlainObject(method) && method.hasOwnProperty('method') ||
      Array.isArray(method)) {
    var notification = {};
    if (u.isPlainObject(params)) notification = params;
    notification.body = method;

    if (notification.hasOwnProperty('properties') && !!notification.properties.replyTo) {
      throw new errors.BadRequestError('notify must not have a replyTo');
    }

    return this._sender.send(notification);
  }

  var request = { body: { method: method } };
  if (arguments.length > 2) {
    var args = new Array(arguments.length);
    for (var i = 0, ii = args.length; i < ii; ++i) args[i] = arguments[i];
    request.body.params = args.slice(1);
  } else if (!!params) {
    request.body.params = u.isPlainObject(params) ? params : [ params ];
  }

  return this._sender.send(request);
};

// private api
RpcClient.prototype._sendRequest = function(correlator, request) {
  if (!!this._interceptor) {
    var shouldContinue = this._interceptor(this, correlator, request);
    if (!shouldContinue) return;
  }

  var self = this;
  return new Promise(function(resolve, reject) {
    self._requests[correlator] = { resolve: resolve, reject: reject };
    self._requests[correlator].timeoutId = setTimeout(function() {
      if (self._requests.hasOwnProperty(correlator)) {
        self._requests[correlator].reject(new errors.RequestTimeoutError());
        delete self._requests[correlator];
      }
    }, self._timeout);

    self._sender.send(request)
      .catch(function(err) {
        clearTimeout(self._requests[correlator].timeoutId);
        delete self._requests[correlator];
        reject(err);
      });
  });
};

RpcClient.prototype._processMessage = function(message) {
  var correlationId = message.properties.correlationId;
  if (correlationId === undefined || correlationId === null) {
    this._logger.error({
      message: 'message lacks correlation-id',
      data: message
    });

    return;
  }

  if (!this._requests.hasOwnProperty(correlationId)) {
    this._logger.error({
      message: 'invalid correlation-id: ' + correlationId,
      data: message
    });

    return;
  }

  var request = this._requests[correlationId];

  // disable timeout check if necessary
  if (request.hasOwnProperty('timeoutId')) {
    clearTimeout(request.timeoutId);
  }

  if (Array.isArray(message.body)) {  // batch response?
    var response = message.body.map(function(r) {
      return r.hasOwnProperty('result') ? r.result :
              r.hasOwnProperty('error') ? r.error : undefined;
    });

    // @todo: what do we do with interleaved errors?
    request.resolve(response);
  } else if (message.body.hasOwnProperty('result')) {
    request.resolve(message.body.result);
  } else if (message.body.hasOwnProperty('error')) {
    request.reject(errors.wrapProtocolError(message.body.error));
  } else {
    // invalid message - maybe should reject with a custom error?
    request.reject(message);
  }

  delete this._requests[correlationId];
};

module.exports = RpcClient;