albanm/nsi-queues

View on GitHub
lib/stomp.js

Summary

Maintainability
B
4 hrs
Test Coverage
var util = require('util');
var EventEmitter = require('events').EventEmitter;

var _ = require('lodash');
var uuid = require('node-uuid');

util.inherits(STOMPQueuesManager, EventEmitter);

function STOMPQueuesManager(client, callback) {
    EventEmitter.call(this);

    var that = this;
    that.client = client;
    that.correlationCallbacks = {};
    that.errorCallbacks = {};
    that.receiptCallbacks = {};

    // generate a random name for the response queue
    that.responseQueue = '/temp-queue/nsi.responses-' + uuid.v4();

    /*that.client.socket.on('error', function(err) {
        console.log(err);
    });*/

    that.client.on('receipt', function(receiptId) {
        if (that.receiptCallbacks[receiptId]) {
            that.receiptCallbacks[receiptId]();
            delete that.receiptCallbacks[receiptId];
        }
    });

    var active = false;
    that.client.on('error', function(err) {
        // the error can be a error frame from a publish or a socket error
        if (err.headers) {
            if (that.errorCallbacks[err.headers['receipt-id']]) {
                that.errorCallbacks[err.headers['receipt-id']](new Error(err.body));
                delete that.errorCallbacks[err.headers['receipt-id']];
            }
        } else {
            // if the connection was not yet considered as ready, return a socket error to the callback
            if (!active) {
                active = true;
                return callback(err);
            }
            // else it is to late to answer to the callback, then emit the error
            that.emit('error', err);
        }
    });

    var onceConnected = function(){
        // Subscribe to a shared response queue for inOut messages
        that.client.subscribe({
            destination: that.responseQueue
        }, function(body, headers) {
            if (that.correlationCallbacks[headers['correlation-id']]) {
                // deserialize the message if it is JSON
                var message = headers['content-type'] === 'application/json' ? JSON.parse(body[0]) : body[0];
                that.correlationCallbacks[headers['correlation-id']](null, message, headers);
                delete that.correlationCallbacks[headers['correlation-id']];
            }
        });
        // when connected we are ok to run the callback
        active = true;
        callback(null, that);
    };

    // already connect
    if (!that.client.socket._connecting) return onceConnected();
    that.client.once('connected', onceConnected);
}

// Publish a message to a queue
STOMPQueuesManager.prototype.to = function(queue, message, headers, callback) {
    var that = this;
    // deal with optional headers argument
    if (typeof headers === 'function') {
        callback = headers;
        headers = {};
    }

    // stomp-js is weird on this point, we have to mix headers and body
    var stompHeaders = _.clone(headers);
    stompHeaders.destination = '/queue/' + queue;
    stompHeaders.persistent = true;
    // deal with message sent as object, it should serialized here then deserialized in from
    if (typeof message === 'object') {
        stompHeaders['content-type'] = 'application/json';
        message = JSON.stringify(message);
    }
    stompHeaders.body = message;

    // send the message with its headers and require a receipt
    that.client.send(stompHeaders, true);

    // stomp-js puts the receipt id in the headers, we can get it by reference
    // expect error event to run callback with error
    that.errorCallbacks[stompHeaders.receipt] = callback;
    that.receiptCallbacks[stompHeaders.receipt] = function() {
        // success, just return the message as received
        callback(null, message, headers);
    };
};

// Publish a message to a queue and declare a callback on the responses queu
STOMPQueuesManager.prototype.inOut = function(queue, message, headers, callback) {
    var that = this;
    // deal with optional headers argument
    if (typeof headers === 'function') {
        callback = headers;
        headers = {};
    }

    // stomp-js is weird on this point, we have to mix headers and body
    headers = _.clone(headers);
    headers.destination = '/queue/' + queue;
    headers.persistent = true;
    // deal with message sent as object, it should serialized here then deserialized in from
    if (typeof message === 'object') {
        headers['content-type'] = 'application/json';
        message = JSON.stringify(message);
    }
    headers.body = message;
    headers['correlation-id'] = uuid.v4();
    headers['reply-to'] = that.responseQueue;

    // send the message with its headers and require a receipt
    that.client.send(headers, true);

    // stomp-js puts the receipt id in the headers, we can get it by reference
    // expect error event to run callback with error
    that.errorCallbacks[headers.receipt] = callback;

    // Prepare waiting for the response message
    that.correlationCallbacks[headers['correlation-id']] = callback;
};

// Subscribe to messages from a queue
STOMPQueuesManager.prototype.from = function(queue, readyCallback, callback) {
    var that = this;
    // ready callback is optional
    if (callback === null) {
        callback = readyCallback;
        readyCallback = null;
    }

    that.client.subscribe({
        destination: '/queue/' + queue,
        ack: 'client'
    }, function(body, headers) {
        // deserialize the message if it is JSON
        var message = headers['content-type'] === 'application/json' ? JSON.parse(body[0]) : body[0];

        // prepare a callback that the user program will call to acknowledge reception of the message
        var responseCallback = function(err, responseMessage, responseHeaders, responseAckCallback) {
            if (err) {
                that.client.nack(headers['message-id']);
            } else {
                that.client.ack(headers['message-id']);
            }

            // deal with optional headers and responseAckCallback arguments
            if (typeof responseHeaders === 'function') {
                callback = responseHeaders;
                responseHeaders = {};
            }
            responseHeaders = responseHeaders || {};

            // Send a response message if requested
            if (headers['reply-to'] && headers['correlation-id']) {
                // stomp-js is weird on this point, we have to mix headers and body
                stompHeaders = _.clone(responseHeaders);
                stompHeaders.destination = headers['reply-to'];
                stompHeaders.persistent = true;
                // deal with message sent as object, it should serialized here then deserialized in from
                if (typeof responseMessage === 'object') {
                    stompHeaders['content-type'] = 'application/json';
                    responseMessage = JSON.stringify(responseMessage);
                }
                stompHeaders.body = responseMessage;

                stompHeaders['correlation-id'] = headers['correlation-id'];

                that.client.send(stompHeaders, !!responseAckCallback);

                // stomp-js puts the receipt id in the headers, we can get it by reference
                // expect error event to run callback with error
                if (responseAckCallback) {
                    that.errorCallbacks[stompHeaders.receipt] = responseAckCallback;
                    that.receiptCallbacks[stompHeaders.receipt] = function() {
                        // success, just return the message as received
                        responseAckCallback(null, responseMessage, responseHeaders);
                    };
                }
            } else {
                if (responseAckCallback) responseAckCallback();
            }
        };
        callback(message, headers, responseCallback);
    });

    if (readyCallback) readyCallback();
};

module.exports = STOMPQueuesManager;