/*
 * albanm/nsi-queues
 * lib/amqp.js (View on GitHub)
 *
 * Summary
 *   Maintainability: B (4 hrs)
 *   Test Coverage: (no value captured)
 */
var util = require('util');
var EventEmitter = require('events').EventEmitter;

var _ = require('lodash');
var uuid = require('node-uuid');

util.inherits(AMQPQueuesManager, EventEmitter);

/**
 * Queues manager working on top of an AMQP connection.
 *
 * Once the connection is ready, the manager opens the 'nsi.direct' exchange in confirm mode,
 * declares a randomly named response queue bound to it, and subscribes to that queue so that
 * responses can be routed to the callbacks stored in `correlationCallbacks`.
 *
 * @param {Object} connection - AMQP connection; may already be ready (`readyEmitted` truthy)
 * @param {Function} callback - called with `(err)` if the connection errors before the manager
 *   is usable, or with `(null, manager)` once the exchange and the response queue are set up.
 *   Connection errors received after that are re-emitted as 'error' events on the manager.
 */
function AMQPQueuesManager(connection, callback) {
    EventEmitter.call(this);

    var self = this;
    self.connection = connection;
    self.correlationCallbacks = {};

    // becomes true as soon as `callback` has been answered, either way
    var answered = false;

    // Route a connection error to the callback while it is still pending, to listeners afterwards
    function onConnectionError(err) {
        if (answered) {
            // too late to answer to the callback, emit the error instead
            self.emit('error', err);
            return;
        }
        answered = true;
        return callback(err);
    }

    // Hand a message from the response queue over to the callback waiting for its correlationId
    function onResponse(message, headers, deliveryInfo) {
        if (!self.correlationCallbacks[deliveryInfo.correlationId]) {
            // nobody is waiting for this response
            return;
        }
        // a message sent as JSON arrives as the expected object,
        // anything else arrives as an object with keys data and contentType
        if (message.data && message.contentType) {
            message = message.data.toString('utf-8');
        }
        self.correlationCallbacks[deliveryInfo.correlationId](null, message, headers);
        delete self.correlationCallbacks[deliveryInfo.correlationId];
    }

    // Bind and consume the response queue, then report success
    function onResponseQueueDeclared(q) {
        q.bind(self.exchange, self.responseQueue);
        q.subscribe(onResponse);

        // exchange and response queue are both ready
        answered = true;
        callback(null, self);
    }

    // Declare the response queue under a random name
    function onExchangeOpen() {
        self.responseQueue = 'nsi.responses-' + uuid.v4();
        var queueOptions = {
            durable: true,
            exclusive: true,
            autoDelete: true
        };
        self.connection.queue(self.responseQueue, queueOptions, onResponseQueueDeclared);
    }

    // Open the exchange used for simple-integration queues
    function onceConnectionReady() {
        var exchangeOptions = {
            type: 'direct',
            confirm: true
        };
        self.exchange = self.connection.exchange('nsi.direct', exchangeOptions);
        self.exchange.on('open', onExchangeOpen);
    }

    connection.on('error', onConnectionError);

    if (connection.readyEmitted) {
        // the connection was passed already ready
        return onceConnectionReady();
    }
    // otherwise wait for the event
    connection.once('ready', onceConnectionReady);
}

/**
 * Publish a message to a queue through the manager's exchange.
 *
 * Objects are serialized to JSON and published with contentType 'application/json';
 * any other message is published as is.
 *
 * @param {string} queue - routing key, i.e. the name of the target queue
 * @param {*} message - payload to publish
 * @param {Object} [headers] - message headers, defaults to {} when omitted
 * @param {Object} [publishOptions] - merged over the defaults (mandatory, deliveryMode 2)
 * @param {Function} [callback] - called exactly once, with `(err)` when the publication failed
 *   or with `(null, message, headers)` where `message` is the payload as it was published
 */
AMQPQueuesManager.prototype.to = function(queue, message, headers, publishOptions, callback) {
    var that = this;
    // deal with optional headers and publishOptions arguments
    if (typeof headers === 'function') {
        callback = headers;
        headers = {};
    }
    if (typeof publishOptions === 'function') {
        callback = publishOptions;
        // the callback took the place of the options, do not merge it into them
        publishOptions = {};
    }
    // publishing without waiting for the confirmation is allowed
    if (typeof callback !== 'function') {
        callback = function() {};
    }

    // prepare AMQP publish options and content
    var options = _.merge({
        mandatory: true,
        deliveryMode: 2 // persistent delivery
    }, publishOptions);
    options.headers = headers;

    // deal with message sent as object, it should be serialized here then deserialized in from
    if (typeof message === 'object') {
        options.contentType = 'application/json';
        message = JSON.stringify(message);
    }

    that.exchange.publish(queue, message, options, function(err) {
        // return here so that a failed publication is never also reported as a success
        if (err) return callback(new Error('Error when publishing to queue ' + queue));
        callback(null, message, headers);
    });
};

/**
 * Publish a message to a queue and register a callback on the responses queue.
 *
 * A fresh correlation id is generated for the request and `callback` is stored under it in
 * `correlationCallbacks`. If the publication fails, `callback` is called with the error
 * and the stored entry is removed.
 *
 * @param {string} queue - routing key, i.e. the name of the target queue
 * @param {*} message - payload; objects are serialized to JSON
 * @param {Object} [headers] - message headers, defaults to {} when omitted
 * @param {Object} [publishOptions] - merged over the defaults
 *   (mandatory, deliveryMode 2, correlationId, replyTo)
 * @param {Function} callback - receives the publication error or, later, the response
 */
AMQPQueuesManager.prototype.inOut = function(queue, message, headers, publishOptions, callback) {
    var manager = this;

    // shift the arguments when headers and/or publishOptions were omitted
    if (typeof headers === 'function') {
        callback = headers;
        headers = {};
    }
    if (typeof publishOptions === 'function') {
        callback = publishOptions;
    }

    // the correlation id ties the future response to this request
    var correlationId = uuid.v4();
    var defaults = {
        mandatory: true,
        deliveryMode: 2, // persistent delivery
        correlationId: correlationId,
        replyTo: manager.responseQueue
    };
    var options = _.merge(defaults, publishOptions);
    options.headers = headers;

    // objects are serialized here, they should be deserialized by the consumer
    var sentAsJson = typeof message === 'object';
    if (sentAsJson) {
        options.contentType = 'application/json';
        message = JSON.stringify(message);
    }

    // start waiting for the response before publishing
    manager.correlationCallbacks[correlationId] = callback;

    function onPublished(err) {
        if (!err) {
            return;
        }
        callback(err);
        delete manager.correlationCallbacks[correlationId];
    }

    manager.exchange.publish(queue, message, options, onPublished);
};

/**
 * Subscribe to messages from a queue.
 *
 * Declares `queue` (durable, not auto-deleted), binds it to the manager's exchange with the
 * queue name as routing key, and subscribes to it with acknowledgements required.
 *
 * @param {string} queue - name of the queue, also used as binding key
 * @param {Function} [readyCallback] - optional, called without arguments once the queue is bound
 * @param {Function} callback - called for every message as
 *   `callback(message, headers, responseCallback)`. The program must call
 *   `responseCallback(err, responseMessage, [responseHeaders], [responseAckCallback])`
 *   to acknowledge the message; when the incoming message carries both `replyTo` and
 *   `correlationId`, `responseMessage` is published to the reply queue and
 *   `responseAckCallback` receives the outcome of that publication.
 */
AMQPQueuesManager.prototype.from = function(queue, readyCallback, callback) {
    var that = this;
    // ready callback is optional: from(queue, callback) leaves the third argument undefined
    if (callback === undefined || callback === null) {
        callback = readyCallback;
        readyCallback = null;
    }

    that.connection.queue(queue, {
        durable: true,
        autoDelete: false
    }, function(q) {
        q.bind(that.exchange, queue, function() {
            // readyCallback not passed as reference because bind sends a non null object as first parameter that is not an error
            if (readyCallback) readyCallback();
        });

        // Receive messages
        q.subscribe({
            ack: true, // require all messages to be acknowledged to the broker
            prefetchCount: 0 // but do not limit the rate of messages
        }, function(message, headers, deliveryInfo, ack) {
            // prepare a callback that the user program will call to acknowledge reception of the message
            var responseCallback = function(err, responseMessage, responseHeaders, responseAckCallback) {
                // intended as: acknowledge reception, reject and requeue if err is defined
                // NOTE(review): the semantics of these two arguments depend on the AMQP client;
                // node-amqp documents acknowledge(all) and a separate reject(requeue) - confirm
                ack.acknowledge(!!err, true);

                // deal with optional responseHeaders argument: the ack callback took its place.
                // Only the local arguments are shifted, the message handler must stay untouched.
                if (typeof responseHeaders === 'function') {
                    responseAckCallback = responseHeaders;
                    responseHeaders = {};
                }
                responseHeaders = responseHeaders || {};

                // No response requested, only report that the acknowledgement was sent
                if (!(deliveryInfo.replyTo && deliveryInfo.correlationId)) {
                    if (responseAckCallback) responseAckCallback();
                    return;
                }

                // Send a response message, correlated with the request
                var options = {
                    mandatory: true,
                    deliveryMode: 2, // persistent delivery
                    correlationId: deliveryInfo.correlationId,
                    headers: responseHeaders
                };

                // deal with message sent as object, it should be serialized here then deserialized in inOut response
                if (typeof responseMessage === 'object') {
                    options.contentType = 'application/json';
                    responseMessage = JSON.stringify(responseMessage);
                }

                that.exchange.publish(deliveryInfo.replyTo, responseMessage, options, function(err) {
                    if (responseAckCallback) return responseAckCallback(err);
                });
            };
            // message can be either the expected object if it was sent as JSON, or an object with keys data and contentType
            if (message.data && message.contentType) {
                message = message.data.toString('utf-8');
            }

            callback(message, headers, responseCallback);
        });
    });
};

// The constructor is the only value exported by this module
module.exports = AMQPQueuesManager;