noodlefrenzy/node-amqp10

View on GitHub
lib/policies/policy_utilities.js

Summary

Maintainability
C
1 day
Test Coverage
'use strict';

var debug = require('debug')('amqp10:policy:utils'),
    constants = require('../constants'),
    errors = require('../errors'),
    u = require('../utilities');

var WindowPolicies = {
  RefreshAtHalf: function(session) {
    if (session._sessionParams.incomingWindow < (session.policy.windowQuantum / 2)) {
      debug('Refreshing session window by ' + session.policy.windowQuantum + ': ' + session._sessionParams.incomingWindow + ' remaining.');
      session.addWindow(session.policy.windowQuantum);
    }
  },
  RefreshAtEmpty: function(session) {
    if (session._sessionParams.incomingWindow <= 0) {
      debug('Refreshing session window by ' + session.policy.windowQuantum + ': ' + session._sessionParams.incomingWindow + ' remaining.');
      session.addWindow(session.policy.windowQuantum);
    }
  },
  DoNotRefresh: function(session) {
    // Do Nothing
  }
};

module.exports.WindowPolicies = WindowPolicies;

var CreditPolicies = {
  RefreshAtHalf: function(link) {
    if (link.linkCredit < (link.policy.creditQuantum / 2)) {
      debug('Refreshing link ' + link.name + ' credit by ' + link.policy.creditQuantum + ': ' + link.linkCredit + ' remaining.');
      link.addCredits(link.policy.creditQuantum);
    }
  },
  RefreshAtEmpty: function(link) {
    if (link.linkCredit <= 0) {
      debug('Refreshing link ' + link.name + ' credit by ' + link.policy.creditQuantum + ': ' + link.linkCredit + ' remaining.');
      link.addCredits(link.policy.creditQuantum);
    }
  },
  RefreshSettled: function (threshold) {
    return function (link, options) {
      if (link.policy.rcvSettleMode === constants.receiverSettleMode.autoSettle) {
        throw new errors.InvalidStateError('Cannot specify RefreshSettled as link refresh policy when auto-settling messages.');
      }
      var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
      if (creditQuantum > 0 && link.linkCredit < threshold) {
        debug('Refreshing link ' + link.name + ' credit by ' + creditQuantum + ': ' + link.linkCredit + ' remaining.');
        link.addCredits(creditQuantum);
      }
    };
  },
  DoNotRefresh: function(link) {
    // Do Nothing
  }
};

module.exports.CreditPolicies = CreditPolicies;

/**
 * Defines the behavior of the return value of `SenderLink.send`
 * @enum
 */
var SenderCallbackPolicy = {
  /** Callback immediately after sending, no promise is created */
  None: 'none',
  /** Only callback when settled Disposition received from recipient */
  OnSettle: 'settled',
  /** Callback as soon as sent, will not call-back again if future disposition
   *  results in error. */
  OnSent: 'sent',
};

module.exports.SenderCallbackPolicies = SenderCallbackPolicy; // deprecated
module.exports.SenderCallbackPolicy = SenderCallbackPolicy;

function fixDeprecatedLinkOptions(policy) {
  if (policy && policy.attach) {
    if (policy.attach.hasOwnProperty('senderSettleMode')) {
      policy.attach.sndSettleMode = policy.attach.senderSettleMode;
      delete policy.attach.senderSettleMode;
    }

    if (policy.attach.hasOwnProperty('receiverSettleMode')) {
      policy.attach.rcvSettleMode = policy.attach.receiverSettleMode;
      delete policy.attach.receiverSettleMode;
    }
  }
}

module.exports.fixDeprecatedLinkOptions = fixDeprecatedLinkOptions;

function merge(newPolicy, base) {
  var policy = u.deepMerge(newPolicy, base);
  fixDeprecatedLinkOptions(policy.senderLink);
  fixDeprecatedLinkOptions(policy.receiverLink);
  return policy;
}

module.exports.Merge = merge;

// Receiver links process messages N at a time, only renewing credits on ack.
module.exports.RenewOnSettle = function(initialCredit, threshold, basePolicy) {
  basePolicy = basePolicy || {};

  return merge({
    receiverLink: {
      credit: CreditPolicies.RefreshSettled(threshold),
      creditQuantum: initialCredit,
      attach: {
        rcvSettleMode: constants.receiverSettleMode.settleOnDisposition
      }
    }
  }, basePolicy);
};