src/transporters/amqp.js
/*
* moleculer
* Copyright (c) 2018 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/
"use strict";
const url = require("url");
const Transporter = require("./base");
const { isPromise } = require("../utils");
const C = require("../constants");
const {
PACKET_REQUEST,
PACKET_RESPONSE,
PACKET_UNKNOWN,
PACKET_EVENT,
PACKET_DISCOVER,
PACKET_INFO,
PACKET_DISCONNECT,
PACKET_HEARTBEAT,
PACKET_PING,
PACKET_PONG
} = require("../packets");
/**
* Transporter for AMQP
*
* More info: https://www.amqp.org/
*
* For test:
*
* docker run -d -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management
*
* @class AmqpTransporter
* @extends {Transporter}
*/
class AmqpTransporter extends Transporter {
/**
* Creates an instance of AmqpTransporter.
*
* @param {any} opts
*
* @memberof AmqpTransporter
*/
constructor(opts) {
if (typeof opts === "string") opts = { url: opts };
else if (opts == null) opts = {};
// Number of requests a broker will handle concurrently
if (typeof opts.prefetch !== "number") opts.prefetch = 1;
// Number of milliseconds before an event expires
if (typeof opts.eventTimeToLive !== "number") opts.eventTimeToLive = null;
if (typeof opts.heartbeatTimeToLive !== "number") opts.heartbeatTimeToLive = null;
if (typeof opts.queueOptions !== "object") opts.queueOptions = {};
if (typeof opts.exchangeOptions !== "object") opts.exchangeOptions = {};
if (typeof opts.messageOptions !== "object") opts.messageOptions = {};
if (typeof opts.consumeOptions !== "object") opts.consumeOptions = {};
// The default behavior is to delete the queues after they haven't had any
// connected consumers for 2 minutes.
const autoDeleteQueuesAfterDefault = 2 * 60 * 1000;
opts.autoDeleteQueues =
opts.autoDeleteQueues === true
? autoDeleteQueuesAfterDefault
: typeof opts.autoDeleteQueues === "number"
? opts.autoDeleteQueues
: opts.autoDeleteQueues === false
? -1
: autoDeleteQueuesAfterDefault;
// Support for multiple URLs (clusters)
opts.url = Array.isArray(opts.url)
? opts.url
: !opts.url
? [""]
: opts.url.split(";").filter(s => !!s);
super(opts);
this.hasBuiltInBalancer = true;
this.connection = null;
this.channel = null;
this.bindings = [];
this.channelDisconnecting = false;
this.connectionDisconnecting = false;
this.connectionCount = 0;
}
/**
* Connect to a AMQP server
*
* @memberof AmqpTransporter
*/
connect(errorCallback) {
return new this.broker.Promise((_resolve, _reject) => {
let _isResolved = false;
const resolve = () => {
_isResolved = true;
_resolve();
};
const reject = err => {
_reject(err);
// Returning callback to avoid race condition in the tests
if (_isResolved) return errorCallback(err);
};
let amqp;
try {
amqp = require("amqplib");
} catch (err) {
/* istanbul ignore next */
this.broker.fatal(
"The 'amqplib' package is missing. Please install it with 'npm install amqplib --save' command.",
err,
true
);
}
// Pick url
this.connectAttempt = (this.connectAttempt || 0) + 1;
const urlIndex = (this.connectAttempt - 1) % this.opts.url.length;
const uri = this.opts.url[urlIndex];
const urlParsed = url.parse(uri);
amqp.connect(
uri,
Object.assign({}, this.opts.socketOptions || {}, { servername: urlParsed.hostname })
)
.then(connection => {
this.connection = connection;
this.logger.info("AMQP is connected.");
/* istanbul ignore next*/
connection
.on("error", err => {
// No need to reject here since close event will be fired after
// if not connected at all connection promise will be rejected
this.logger.error("AMQP connection error.", err);
this.broker.broadcastLocal("$transporter.error", {
error: err,
module: "transporter",
type: C.FAILED_CONNECTION_ERROR
});
})
.on("close", err => {
this.connected = false;
if (!this.connectionDisconnecting) {
this.logger.error("AMQP connection is closed.");
return reject(err);
} else {
this.logger.info("AMQP connection is closed gracefully.");
}
})
.on("blocked", reason => {
this.logger.warn("AMQP connection is blocked.", reason);
})
.on("unblocked", () => {
this.logger.info("AMQP connection is unblocked.");
});
connection
.createChannel()
.then(channel => {
this.channel = channel;
this.logger.info("AMQP channel is created.");
channel.prefetch(this.opts.prefetch);
/* istanbul ignore next*/
channel
.on("close", () => {
this.channel = null;
// No need to reject here since close event on connection will handle
if (!this.channelDisconnecting)
this.logger.warn("AMQP channel is closed.");
else this.logger.info("AMQP channel is closed gracefully.");
})
.on("error", err => {
// No need to reject here since close event will be fired after
this.logger.error("AMQP channel error.", err);
this.broker.broadcastLocal("$transporter.error", {
error: err,
module: "transporter",
type: C.FAILED_CHANNEL_ERROR
});
})
.on("drain", () => {
this.logger.info("AMQP channel is drained.");
})
.on("return", msg => {
this.logger.warn("AMQP channel returned a message.", msg);
});
this.connectionCount += 1;
const isReconnect = this.connectionCount > 1;
// HACK: We have to do this because subscriptions aren't persistent,
// and only balanced subscriptions happen automatically on reconnects
const p = isReconnect
? this.transit.makeSubscriptions()
: this.broker.Promise.resolve();
return p.then(() => this.onConnected(isReconnect));
})
.then(resolve)
.catch(err => {
/* istanbul ignore next*/
this.logger.error("AMQP failed to create channel.");
this.connected = false;
return reject(err);
});
return null;
})
.catch(err => {
/* istanbul ignore next*/
this.logger.warn("AMQP failed to connect!");
this.connected = false;
return reject(err);
});
});
}
/**
* Disconnect from an AMQP server
*
* @memberof AmqpTransporter
* @description Close the connection and unbind this node's queues.
* This prevents messages from being broadcasted to a dead node.
* Note: Some methods of ending a node process don't allow disconnect to fire, meaning that
* some dead nodes could still receive published packets.
* Queues and Exchanges are not be deleted since they could contain important messages.
*/
disconnect() {
this.connectionCount = 0;
if (this.connection && this.channel && this.bindings) {
return this.broker.Promise.all(
this.bindings.map(binding => this.channel.unbindQueue(...binding))
)
.then(() => {
this.channelDisconnecting = this.transit.disconnecting;
this.connectionDisconnecting = this.transit.disconnecting;
})
.then(() => this.channel.close())
.then(() => this.connection.close())
.then(() => {
this.bindings = [];
this.channel = null;
this.connection = null;
})
.catch(err => this.logger.warn(err));
}
}
/**
* Get assertQueue options by packet type.
*
* @param {String} packetType
*
* @memberof AmqpTransporter
*/
_getQueueOptions(packetType, balancedQueue) {
let packetOptions;
switch (packetType) {
// Requests and responses don't expire.
case PACKET_REQUEST:
packetOptions =
this.opts.autoDeleteQueues >= 0 && !balancedQueue
? { expires: this.opts.autoDeleteQueues }
: {};
break;
case PACKET_RESPONSE:
packetOptions =
this.opts.autoDeleteQueues >= 0 ? { expires: this.opts.autoDeleteQueues } : {};
break;
// Consumers can decide how long events live
// Load-balanced/grouped events
case PACKET_EVENT + "LB":
case PACKET_EVENT:
packetOptions =
this.opts.autoDeleteQueues >= 0 ? { expires: this.opts.autoDeleteQueues } : {};
// If eventTimeToLive is specified, add to options.
if (this.opts.eventTimeToLive) packetOptions.messageTtl = this.opts.eventTimeToLive;
break;
// Packet types meant for internal use
case PACKET_HEARTBEAT:
packetOptions = { autoDelete: true };
// If heartbeatTimeToLive is specified, add to options.
if (this.opts.heartbeatTimeToLive)
packetOptions.messageTtl = this.opts.heartbeatTimeToLive;
break;
case PACKET_DISCOVER:
case PACKET_DISCONNECT:
case PACKET_UNKNOWN:
case PACKET_INFO:
case PACKET_PING:
case PACKET_PONG:
packetOptions = { autoDelete: true };
break;
}
return Object.assign(packetOptions, this.opts.queueOptions);
}
/**
* Build a function to handle requests.
*
* @param {String} cmd
* @param {Boolean} needAck
*
* @memberof AmqpTransporter
*/
_consumeCB(cmd, needAck = false) {
return msg => {
const result = this.receive(cmd, msg.content);
// If a promise is returned, acknowledge the message after it has resolved.
// This means that if a worker dies after receiving a message but before responding, the
// message won't be lost and it can be retried.
if (needAck) {
if (isPromise(result)) {
return result
.then(() => {
if (this.channel) this.channel.ack(msg);
})
.catch(err => {
this.logger.error("Message handling error.", err);
if (this.channel) this.channel.nack(msg);
});
} else if (this.channel) {
this.channel.ack(msg);
}
}
return result;
};
}
/**
* Subscribe to a command
*
* @param {String} cmd
* @param {String} nodeID
*
* @memberof AmqpTransporter
* @description Initialize queues and exchanges for all packet types except Request.
*
* All packets that should reach multiple nodes have a dedicated queue per node, and a single
* exchange that routes each message to all queues. These packet types will not use
* acknowledgements and have a set time-to-live. The time-to-live for EVENT packets can be
* configured in options.
* Examples: INFO (sometimes), DISCOVER, DISCONNECT, HEARTBEAT, PING, PONG, EVENT
*
* Other Packets are headed towards a specific node or queue. These don't need exchanges and
* packets of this type will not expire.
* Examples: REQUEST, RESPONSE
*
* RESPONSE: Each node has its own dedicated queue and acknowledgements will not be used.
*
* REQUEST: Each action has its own dedicated queue. This way if an action has multiple workers,
* they can all pull from the same queue. This allows a message to be retried by a different node
* if one dies before responding.
*
* Note: Queue's for REQUEST packet types are not initialized in the subscribe method because the
* actions themselves are not available from within the method. Instead they are intercepted from
* "prefix.INFO" packets because they are broadcast whenever a service is registered.
*
*/
subscribe(cmd, nodeID) {
if (!this.channel) return;
const topic = this.getTopicName(cmd, nodeID);
// Some topics are specific to this node already, in these cases we don't need an exchange.
if (nodeID != null) {
const needAck = [PACKET_REQUEST].indexOf(cmd) !== -1;
return this.channel
.assertQueue(topic, this._getQueueOptions(cmd))
.then(() =>
this.channel.consume(
topic,
this._consumeCB(cmd, needAck),
Object.assign({ noAck: !needAck }, this.opts.consumeOptions)
)
);
} else {
// Create a queue specific to this nodeID so that this node can receive broadcasted messages.
const queueName = `${this.prefix}.${cmd}.${this.nodeID}`;
// Save binding arguments for easy unbinding later.
const bindingArgs = [queueName, topic, ""];
this.bindings.push(bindingArgs);
return this.broker.Promise.all([
this.channel.assertExchange(topic, "fanout", this.opts.exchangeOptions),
this.channel.assertQueue(queueName, this._getQueueOptions(cmd))
]).then(() =>
this.broker.Promise.all([
this.channel.bindQueue(...bindingArgs),
this.channel.consume(
queueName,
this._consumeCB(cmd),
Object.assign({ noAck: true }, this.opts.consumeOptions)
)
])
);
}
}
/**
* Subscribe to balanced action commands
*
* @param {String} action
* @memberof AmqpTransporter
*/
subscribeBalancedRequest(action) {
if (!this.channel) return;
const queue = `${this.prefix}.${PACKET_REQUEST}B.${action}`;
return this.channel
.assertQueue(queue, this._getQueueOptions(PACKET_REQUEST, true))
.then(() =>
this.channel.consume(
queue,
this._consumeCB(PACKET_REQUEST, true),
this.opts.consumeOptions
)
);
}
/**
* Subscribe to balanced event command
*
* @param {String} event
* @param {String} group
* @memberof AmqpTransporter
*/
subscribeBalancedEvent(event, group) {
if (!this.channel) return;
const queue = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`;
return this.channel
.assertQueue(queue, this._getQueueOptions(PACKET_EVENT + "LB", true))
.then(() =>
this.channel.consume(
queue,
this._consumeCB(PACKET_EVENT, true),
this.opts.consumeOptions
)
);
}
/**
* Send data buffer.
*
* @param {String} topic
* @param {Buffer} data
* @param {Object} meta
*
* @returns {Promise}
*/
send(topic, data, { balanced, packet }) {
/* istanbul ignore next*/
if (!this.channel) return this.broker.Promise.resolve();
if (packet.target != null || balanced) {
this.channel.sendToQueue(topic, data, this.opts.messageOptions);
} else {
this.channel.publish(topic, "", data, this.opts.messageOptions);
}
return this.broker.Promise.resolve();
}
}
module.exports = AmqpTransporter;