// lib/policies/policy.js
'use strict';
var url = require('url'),
constants = require('../constants'),
putils = require('./policy_utilities'),
u = require('../utilities');
/**
 * Creates the generator used for the default container id.
 *
 * @return {function} a function that, each time it is called, returns the
 *                    string 'conn' followed by the current `Date.now()` value
 */
function containerName() {
  return function() {
    var timestamp = Date.now();
    return 'conn' + timestamp;
  };
}
/**
 * Creates a generator of sequential link names.
 *
 * Every generator keeps its own counter, so the first call yields
 * `prefix + 1`, the second `prefix + 2`, and so on.
 *
 * @param {string} prefix text placed in front of the running number
 * @return {function} a function returning the next name on each call
 */
function linkName(prefix) {
  var issued = 0;
  return function() {
    issued += 1;
    return prefix + issued;
  };
}
/**
 * The default policy for amqp10 clients.
 *
 * Can be called with or without `new`; a Policy instance is returned either
 * way. The built-in defaults assembled below are handed to `u.defaults`
 * together with `overrides`, and both link policies are then passed through
 * `putils.fixDeprecatedLinkOptions`.
 *
 * @class
 * @param {object} overrides override values for the default policy
 */
function Policy(overrides) {
  if (!(this instanceof Policy)) {
    return new Policy(overrides);
  }

  /**
   * Options controlling the reconnect behavior of the client. Setting this
   * value to `null` effectively disables reconnect.
   *
   * @name Policy#reconnect
   * @type {Object|null}
   * @property {number|null} [retries] Number of reconnection attempts to make
   * @property {string} [strategy='fibonacci'] Backoff algorithm, either `fibonacci` or `exponential`
   * @property {boolean} [forever] Whether reconnection should be attempted forever
   */
  var reconnectDefaults = {
    retries: 10,
    strategy: 'fibonacci', // || 'exponential'
    forever: true
  };

  // Options passed into the open performative on initial connection.
  var openOptions = {
    containerId: containerName(),
    hostname: 'localhost',
    maxFrameSize: constants.defaultMaxFrameSize,
    channelMax: constants.defaultChannelMax,
    idleTimeout: constants.defaultIdleTimeout,
    outgoingLocales: constants.defaultOutgoingLocales,
    incomingLocales: constants.defaultIncomingLocales,
    offeredCapabilities: null,
    desiredCapabilities: null,
    properties: {}
  };

  // Options used to initiate a TLS/SSL connection.
  var tlsOptions = {
    keyFile: null,
    certFile: null,
    caFile: null,
    rejectUnauthorized: false
  };

  /**
   * @name Policy#connect
   * @type {object}
   * @property {object} options Options passed into the open performative on initial connection
   * @property {string|function} options.containerId Id of the source container
   * @property {string} options.hostname Name of the target host
   * @property {number} options.maxFrameSize Largest frame size the sending peer is able to accept on this connection
   * @property {number} options.channelMax Highest channel number that can be used on the connection
   * @property {number} options.idleTimeout Idle timeout required by the sender
   * @property {array<string>|null} options.outgoingLocales Locales the peer supports for sending informational text
   * @property {array<string>|null} options.incomingLocales Locales the sending peer permits for incoming informational text
   * @property {array<string>|null} options.offeredCapabilities Extension capabilities the peer may use if the sender offers them
   * @property {array|null} options.desiredCapabilities Extension capabilities the sender may use if the receiver offers them
   * @property {object|null} options.properties Map of fields intended to indicate information about the connection and its container
   * @property {object} sslOptions Options used to initiate a TLS/SSL connection; apart from the options listed below, everything in this object is passed directly to node's [tls.connect](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) method.
   * @property {string|null} sslOptions.keyFile Path to the file containing the private key for the client
   * @property {string|null} sslOptions.certFile Path to the file containing the certificate key for the client
   * @property {string|null} sslOptions.caFile Path to the file containing the trusted cert for the client
   * @property {boolean} sslOptions.rejectUnauthorized
   * @property {string|null} saslMechanism Allows the sasl mechanism to be overridden by policy
   */
  var connectDefaults = {
    options: openOptions,
    sslOptions: tlsOptions,
    saslMechanism: null
  };

  // Options passed into the `begin` performative on session start.
  var beginOptions = {
    nextOutgoingId: constants.session.defaultOutgoingId,
    incomingWindow: constants.session.defaultIncomingWindow,
    outgoingWindow: constants.session.defaultOutgoingWindow
  };

  /**
   * @name Policy#session
   * @type {object}
   * @property {object} options Options passed into the `begin` performative on session start
   * @property {number} options.nextOutgoingId Transfer-id to assign to the next transfer frame
   * @property {number} options.incomingWindow Maximum number of incoming transfer frames the endpoint can currently receive
   * @property {number} options.outgoingWindow Maximum number of outgoing transfer frames the endpoint can currently send
   * @property {function} window Function used to calculate how/when the flow control window should change
   * @property {number} windowQuantum Quantum used in predefined window policies
   * @property {boolean} enableSessionFlowControl Whether session flow control should be performed at all
   * @property {object|null} reestablish=null Whether the session should attempt to reestablish when ended by the broker
   */
  var sessionDefaults = {
    options: beginOptions,
    window: putils.WindowPolicies.RefreshAtHalf,
    windowQuantum: constants.session.defaultIncomingWindow,
    enableSessionFlowControl: true,
    reestablish: null
  };

  // Options passed into the `attach` performative of sender links.
  var senderAttach = {
    name: linkName('sender'),
    role: constants.linkRole.sender,
    sndSettleMode: constants.senderSettleMode.mixed,
    maxMessageSize: 0,
    initialDeliveryCount: 1
  };

  /**
   * @name Policy#senderLink
   * @type {object}
   * @property {object} attach Options passed into the `attach` performative on link attachment
   * @property {string|function} attach.name Name that uniquely identifies the link from the container of the source to the container of the target node
   * @property {string|boolean} attach.role Role being played by the peer
   * @property {string|number} attach.sndSettleMode Delivery settlement policy for the sender
   * @property {number} attach.maxMessageSize Maximum message size supported by the link endpoint
   * @property {number} attach.initialDeliveryCount Must not be null if role is sender; ignored if the role is receiver.
   * @property {string} callback Determines when a send should call its callback ('settle', 'sent', 'none')
   * @property {function|null} encoder=null Optional encoder used for all outgoing sends
   * @property {boolean|null} reattach=null Whether the link should attempt reattach on detach
   */
  var senderLinkDefaults = {
    attach: senderAttach,
    callback: putils.SenderCallbackPolicies.OnSettle,
    encoder: null,
    reattach: null
  };

  // Options passed into the `attach` performative of receiver links.
  var receiverAttach = {
    name: linkName('receiver'),
    role: constants.linkRole.receiver,
    rcvSettleMode: constants.receiverSettleMode.autoSettle,
    maxMessageSize: 10000, // Arbitrary choice
    initialDeliveryCount: 1
  };

  /**
   * @name Policy#receiverLink
   * @type {object}
   * @property {object} attach Options passed into the `attach` performative on link attachment
   * @property {string|function} attach.name Name that uniquely identifies the link from the container of the source to the container of the target node
   * @property {boolean} attach.role Role being played by the peer
   * @property {number|string} attach.rcvSettleMode Delivery settlement policy for the receiver
   * @property {number} attach.maxMessageSize Maximum message size supported by the link endpoint
   * @property {number} attach.initialDeliveryCount Must not be null if role is sender; ignored if the role is receiver.
   * @property {function} credit Function that determines when (if ever) to refresh the receiver link's credit
   * @property {number} creditQuantum Quantum used in pre-defined credit policy functions
   * @property {function|null} decoder=null Optional decoder used for all incoming data
   * @property {boolean|null} reattach=null Whether the link should attempt reattach on detach
   */
  var receiverLinkDefaults = {
    attach: receiverAttach,
    credit: putils.CreditPolicies.RefreshAtHalf,
    creditQuantum: 100,
    decoder: null,
    reattach: null
  };

  var policyDefaults = {
    /**
     * Enables subject support in link names, with these characteristics:
     *  receiver: "amq.topic/news" means a filter is made on the ReceiverLink
     *            for messages sent with the subject "news"
     *
     *  sender: "amq.topic/news" automatically sets "news" as the subject of
     *          messages sent on this link, unless the user explicitly
     *          overrides the subject.
     *
     * @name Policy#defaultSubjects
     * @property {boolean}
     */
    defaultSubjects: true,
    reconnect: reconnectDefaults,
    connect: connectDefaults,
    session: sessionDefaults,
    senderLink: senderLinkDefaults,
    receiverLink: receiverLinkDefaults
  };

  u.defaults(this, overrides, policyDefaults);

  // Both link policies go through the deprecated-option fixup helper.
  putils.fixDeprecatedLinkOptions(this.senderLink);
  putils.fixDeprecatedLinkOptions(this.receiverLink);
  return this;
}
/**
 * Parses a link address used for creating Sender and Receiver links.
 *
 * The returned object always has a `name` property (used as the source
 * address in the attach performative). When `defaultSubjects` is enabled on
 * this policy and the address contains a `/`, the segment following the first
 * `/` is returned as `subject`, which (if specified) will automatically
 * create a source filter. Segments after the second one are not returned.
 *
 * @inner @memberof Policy
 * @param {string} address the address to parse
 * @return {object}
 */
Policy.prototype.parseLinkAddress = function(address) {
  // @todo: this "parsing" should be far more rigorous
  if (this.defaultSubjects) {
    var segments = address.split('/');
    var parsed = { name: segments[0] };
    if (segments.length > 1) {
      parsed.subject = segments[1];
    }
    return parsed;
  }

  // Subject support is switched off: the whole address is the name.
  return { name: address };
};
/**
 * Parses an address for use when connecting to an AMQP 1.0 broker.
 *
 * The returned object contains:
 *  - `host`: the hostname reported by `url.parse`, or the parsed href when
 *    no hostname was found (e.g. a bare `localhost`)
 *  - `path`: the parsed path, or `/` when there is none or when the path is
 *    the whole address
 *  - `protocol`: the lower-cased scheme without its trailing colon, `amqp`
 *    when the address has no scheme
 *  - `href`: the href reported by `url.parse`
 *  - `port`: the explicit port as a number; otherwise the default port for
 *    `amqp` or `amqps` (left unset for any other protocol)
 *  - `rootUri`: `protocol://[credentials@]host:port`, with the credentials
 *    kept in their encoded form
 *  - `user` / `pass`: decoded credentials, only set when the address carries
 *    credentials; `pass` is `null` when the password is missing or empty
 *
 * @inner @memberof Policy
 * @param {string} address the address to parse
 * @return {object}
 */
Policy.prototype.parseAddress = function(address) {
  var parsedAddress = url.parse(address);
  var result = {
    host: parsedAddress.hostname || parsedAddress.href,
    path: (parsedAddress.path && parsedAddress.path !== address) ?
      parsedAddress.path : '/',
    protocol: parsedAddress.protocol ?
      parsedAddress.protocol.slice(0, -1).toLowerCase() : 'amqp',
    href: parsedAddress.href
  };

  if (parsedAddress.port) {
    result.port = parseInt(parsedAddress.port, 10);
  } else {
    // result.protocol is already lower-cased above
    switch (result.protocol) {
      case 'amqp': result.port = constants.defaultPort; break;
      case 'amqps': result.port = constants.defaultTlsPort; break;
    }
  }

  result.rootUri = result.protocol + '://';
  if (parsedAddress.auth) {
    var encodedAuth, user, pass, separator;

    // Capture the still-encoded text between '//' and the first '@'.
    // url.parse hands back the credentials already decoded, which would make
    // an encoded ':' or '@' indistinguishable from a real separator. The
    // scheme is matched case-insensitively and is not limited to amqp(s), so
    // a successfully parsed address cannot make this lookup throw.
    var rawMatch = /[a-z][a-z0-9+.\-]*:\/\/([^@]+)@/i.exec(address);
    if (rawMatch) {
      encodedAuth = rawMatch[1];
      // Only the first ':' separates user from password; later ones belong
      // to the password.
      separator = encodedAuth.indexOf(':');
      if (separator === -1) {
        user = decodeURIComponent(encodedAuth);
        pass = null;
      } else {
        user = decodeURIComponent(encodedAuth.slice(0, separator));
        pass = encodedAuth.slice(separator + 1);
        pass = pass ? decodeURIComponent(pass) : null;
      }
    } else {
      // The raw credentials could not be located in the address text, so
      // fall back to the decoded value from url.parse and re-encode it for
      // the root URI.
      separator = parsedAddress.auth.indexOf(':');
      if (separator === -1) {
        user = parsedAddress.auth;
        pass = null;
      } else {
        user = parsedAddress.auth.slice(0, separator);
        pass = parsedAddress.auth.slice(separator + 1) || null;
      }
      encodedAuth = encodeURIComponent(user) +
        (pass !== null ? ':' + encodeURIComponent(pass) : '');
    }

    result.rootUri += encodedAuth + '@';
    result.user = user;
    result.pass = pass;
  }

  result.rootUri += result.host + ':' + result.port;
  return result;
};
// The Policy constructor is the sole export of this module.
module.exports = Policy;