jaredhanson/antenna-amqp

View on GitHub
lib/bus.js

Summary

Maintainability
A
3 hrs
Test Coverage
/**
 * Module dependencies.
 */
var EventEmitter = require('events').EventEmitter
  , amqp = require('amqp')
  , uid = require('uid2')
  , util = require('util')
  , Message = require('./message')
  , NoQueueError = require('./errors/noqueueerror')
  , debug = require('debug')('antenna-amqp');

/**
 * Message delivery mode constants.
 */
var NONPERSISTENT_MODE = 1;
var PERSISTENT_MODE = 2;


/**
 * `Bus` constructor.
 *
 * @api public
 */
function Bus() {
  EventEmitter.call(this);
  this._exchange = null;
  this._queue = null;
}

/**
 * Inherit from `EventEmitter`.
 */
util.inherits(Bus, EventEmitter);


/**
 * Connect to AMQP server.
 *
 * For AMQP buses, `options` argument should be an object that specifies the
 * following parameters:
 *
 *   - `port`  port the client should connect to, defaults to 5672
 *   - `host`  host the client should connect to, defaults to localhost
 *
 * Examples:
 *
 *     bus.connect({ host: '127.0.0.1', port: 5672 }, function() {
 *       console.log('ready');
 *     });
 *
 *     bus.connect({ host: '127.0.0.1', port: 5672, exchange: 'amq.fanout' }, function() {
 *       console.log('ready');
 *     });
 *
 *     bus.connect({ host: '127.0.0.1', port: 5672,
 *                   exchange: {
 *                     name: 'app1.topic',
 *                     options: { type: 'topic', confirm: false }
 *                   } },
 *       function() {
 *         console.log('ready');
 *       });
 *
 * @param {Object} options
 * @param {Function} readyListener
 * @api public
 */
Bus.prototype.connect = function(options, readyListener) {
  var listen = (options.listen === undefined) ? true : options.listen;
  
  if (readyListener) { this.once('ready', readyListener); }
  
  debug('connecting %s:%s', options.host || 'localhost', options.port || 5672);
  
  var self = this;
  this._connection = amqp.createConnection(options, { reconnect: false });
  this._connection.once('ready', function() {
    var exchange = options.exchange || 'amq.fanout';

    var name = exchange
      , opts = {};
    if (typeof exchange == 'object') {
      name = exchange.name;
      opts = exchange.options;
    }
    
    // AMQP uses period ('.') separators rather than slash ('/')
    name = name.replace(/\//g, '.');
    if (name.indexOf('amq.') !== 0) {
      opts.type = (opts.type === undefined) ? 'fanout' : opts.type;
    } else {
      // Options for built-in exchanges can not be overridden.
      opts = {};
    }
    
    debug('exchange %s', name);
    self._exchange = self._connection.exchange(name, opts, function(exchange) {
      if (!listen) {
        return self.emit('ready');
      } else {
        var qname = options.queue
          , qopts = {};
        if (typeof queue == 'object') {
          qname = queue.name;
          qopts = queue.options;
        }
        
        qname = qname || ('node-' + uid(16));
        qopts.exclusive = (qopts.exclusive === undefined) ? true : qopts.exclusive;
        
        
        var onQueueError = function(err) {
          return self.emit('error', err);
        };
        
        debug('queue %s', qname);
        var q = self._connection.queue(qname, qopts, function(q) {
          q.removeListener('error', onQueueError);
          
          self.address = qname;
          self._subscribe(qopts, function(err) {
            if (err) { return self.emit('error', err); }
            return self.emit('ready');
          });
        });
        self._queue = q;
        
        q.once('error', onQueueError);
      }
    });
    self._exchange.on('error', function(err) {
      // NOTE: This will occur if an exchange is redeclared with different
      //       properties.
      //
      //       For example, the underlying `amqp` emits an error with the
      //       following properties:
      //         - message: PRECONDITION_FAILED - cannot redeclare exchange
      //                    'foo' in vhost '/' with different type, durable,
      //                    internal or autodelete value
      //         - code: 406
      return self.emit('error', err);
    });
  });
  
  this._connection.on('error', this.emit.bind(this, 'error'));
}

/**
 * Broadcast a message on the bus.
 *
 * In AMQP, publishing a message to an exchange has the effect of enqueuing that
 * message on all queues that are bound to the exchange with a routing key that
 * matches the topic.
 *
 * Examples:
 *
 *     bus.publish('events/on', { timestamp: 0 }, function(err) {
 *       if (err) { throw err; }
 *       ...
 *     });
 *
 *     bus.publish('events/off', { timestamp: 256 });
 *
 * @param {String} topic
 * @param {Mixed} msg
 * @param {Object} options
 * @param {Function} cb
 * @api public
 */
Bus.prototype.broadcast =
Bus.prototype.publish = function(topic, msg, options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }
  options = options || {};
  
  // AMQP uses period ('.') separators rather than slash ('/')
  topic = topic.replace(/\//g, '.');
  options.deliveryMode = (options.deliveryMode === undefined) ? NONPERSISTENT_MODE : options.deliveryMode;
  
  if (this._exchange.options && this._exchange.options.confirm) {
    debug('publish %s (confirm)', topic);
    this._exchange.publish(topic, msg, options, function(hadError, err) {
      if (hadError) {
        err = err || new Error('Failed to publish message to topic "' + topic + '"');
        return cb(err);
      }
      return cb();
    });
  } else {
    debug('publish %s', topic);
    this._exchange.publish(topic, msg, options);
    if (cb) { return process.nextTick(cb); }
  }
}

Bus.prototype.direct = function(addr, msg, options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }
  options = options || {};
  
  // AMQP uses period ('.') separators rather than slash ('/')
  addr = addr.replace(/\//g, '.');
  options.deliveryMode = (options.deliveryMode === undefined) ? NONPERSISTENT_MODE : options.deliveryMode;
  
  this._connection.publish(addr, msg, options);
  if (cb) { return process.nextTick(cb); }
}


/**
 * Subscribe to messages of `topic` broadcast on the bus.
 *
 * Once subscribed, messages will be delivered and emitted in `message` events.
 * An Antenna application can be registered as a listener for these events,
 * allowing listener processes to be developed in a style similar to that of
 * Express applications.
 *
 * Examples:
 *
 *     bus.subscribe('events/on', function(err) {
 *       if (err) { throw err; }
 *       ...
 *     });
 *
 * @param {String} queue
 * @param {Object} options
 * @param {Function} cb
 * @api public
 */
Bus.prototype.subscribe = function(topic, options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }
  options = options || {};
  
  if (!this._queue) { return cb(new NoQueueError('Bus not in listening mode')); }
  
  // AMQP uses period ('.') separators rather than slash ('/')
  topic = topic.replace(/\//g, '.');
  
  var q = this._queue
    , exchange = this._exchange.name;
  
  var onError = function(err) {
    return cb(err);
  };
  
  debug('bind %s %s %s', q.name, exchange, topic);
  q.bind(exchange, topic, function(q) {
    q.removeListener('error', onError);
    return cb();
  });
  
  // NOTE: This will occur if an attempt is made to bind to an exchange that
  //       does not exist.
  //
  //       For example, the underlying `amqp` emits an error with the following
  //       properties:
  //         - message: NOT_FOUND - no exchange 'foo' in vhost '/'
  //         - code: 404
  q.once('error', onError);
}

/**
 * Subscribe to internal queue.
 *
 * @param {Object} options
 * @param {Function} cb
 * @api private
 */
Bus.prototype._subscribe = function(options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }
  options = options || {};
  
  var self = this
    , q = this._queue;
    
  var onError = function(err) {
    return cb(err);
  };
    
  debug('subscribe %s', q.name);
  q.subscribe(options, function(message, headers, deliveryInfo) {
    var m = new Message(message, headers, deliveryInfo);
    m.bus = self;
    self.emit('message', m);
  }).addCallback(function(ok) {
    // This callback is invoked when the subscription was successful, and is
    // equivalent to the registering a listener for the queue's `basicConsumeOk`
    // event.
    q.removeListener('error', onError);
    
    // Store the consumerTag so that this queue may also be unsubscribed from.
    self._consumerTag = ok.consumerTag;

    return cb();
  }).addErrback(function(err) {
    // NOTE: Promise errbacks are not properly invoked by the underlying `amqp`
    //       module.  As a workaround, a listener is explicitly registered for
    //       the queue's `error` event.
  });
  
  // NOTE: This will occur if an attempt is made to subscribe to a queue that
  //       already has an exclusive subscription.
  //
  //       For example, the underlying `amqp` emits an error with the following
  //       properties:
  //         - message: ACCESS_REFUSED - queue 'foo' in vhost '/' in exclusive
  //                    use
  //         - code: 403
  q.once('error', onError);
}

/**
 * Desubscribe to messages of `topic` broadcast on the bus.
 *
 *
 * Examples:
 *
 *     bus.unsubscribe('events/on');
 *
 * @param {String} queue
 * @param {Object} options
 * @param {Function} cb
 * @api public
 */
Bus.prototype.unsubscribe = function(topic, options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }

  options = options || {};
  
  if (!this._queue) { return cb(new NoQueueError('Bus not in listening mode')); }
  
  // AMQP uses period ('.') separators rather than slash ('/')
  topic = topic.replace(/\//g, '.');
  
  var q = this._queue
    , exchange = this._exchange.name;
  
  var onError = function(err) {
    return cb(err);
  };
  
  debug('unbind %s %s %s', q.name, exchange, topic);
  q.unbind(exchange, topic)
    .on('queueUnbindOk', function(){
      return cb();
    });
}

/**
 * Unsubscribe to an internal queue.
 *
 * @param {Object} options
 * @param {Function} cb
 * @api private
 */
Bus.prototype._unsubscribe = function(options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = undefined;
  }
  options = options || {};
  
  var self = this
    , q = this._queue;
    
  var onError = function(err) {
    return cb(err);
  };
    
  debug('unsubscribe %s', q.name);
  q.unsubscribe(self._consumerTag);
}

/**
 * Expose `Bus`.
 */
module.exports = Bus;