RackHD/on-core

View on GitHub
lib/common/subscription.js

Summary

Maintainability
B
5 hrs
Test Coverage
// Copyright 2015, EMC, Inc.

'use strict';

module.exports = SubscriptionFactory;

SubscriptionFactory.$provide = 'Subscription';
SubscriptionFactory.$inject = [
    'Promise',
    'Logger',
    'Assert'
];

function SubscriptionFactory (Promise, Logger, assert) {
    var logger = Logger.initialize(SubscriptionFactory);

    /**
     * Creates a new subscription to a queue
     * @param queue {string}
     * @constructor
     */
    function Subscription (queue, options) {
        assert.object(queue, 'queue');
        assert.object(options, 'options');

        this.MAX_DISPOSE_RETRIES = 3;
        this.retryDelay = 1000;
        this.queue = queue;
        this.options = options;
        this._disposed = false;
    }

    /**
     * Removes the subscription
     *
     * @returns {Promise}
     */
    Subscription.prototype.dispose = function (attempt, retry) {
        var self = this;
        if (self._disposed && !retry) {
            logger.warning('Subscription dispose was called more than once.', {
                stack: new Error().stack,
                consumerTag: self.options.consumerTag
            });
            return Promise.resolve(true);
        } else {
            self._disposed = true;
        }

        if (attempt === undefined) {
            attempt = 0;
        } else if (attempt >= self.MAX_DISPOSE_RETRIES) {
            logger.error('Subscription failed to dispose with maximum retries.', {
                consumerTag: self.options.consumerTag
            });
            return Promise.reject(new Error('Subscription ' + self.options.consumerTag +
                        ' failed to dispose with maximum retries'));
        }

        attempt += 1;

        if (this.queue.state === 'open') {
            return Promise.resolve().then(function () {
                return self.queue.unsubscribe(self.options.consumerTag);
            }).then(function () {
                return self.queue.destroy();
            }).then(function () {
                return self.queue.close();
            }).then(function () {
                return true;
            }).catch(function(err) {
                logger.error('Subscription failed to dispose, retrying attempt ' + attempt, {
                    error: err
                });
                Promise.delay(self.retryDelay).then(function() {
                    self.dispose(attempt, true);
                });
                throw err;
            });
        } else {
            return Promise.reject(
                    new Error('Attempted to dispose a subscription whose queue state is not open'));
        }
    };

    Subscription.create = function (q, options) {
        return new Subscription(q, options);
    };

    return Subscription;
}