src/transporters/amqp10.js
/*
* moleculer
* Copyright (c) 2020 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/
"use strict";
const url = require("url");
const Transporter = require("./base");
const { isPromise } = require("../utils");
const C = require("../constants");
const {
PACKET_REQUEST,
PACKET_RESPONSE,
PACKET_UNKNOWN,
PACKET_EVENT,
PACKET_DISCOVER,
PACKET_INFO,
PACKET_DISCONNECT,
PACKET_HEARTBEAT,
PACKET_PING,
PACKET_PONG
} = require("../packets");
/**
* Transporter for AMQP 1.0
*
* More info: https://www.amqp.org/resources/specifications
*
* For test:
*
* docker run -p 61616:61616 -p 8161:8161 -p 5672:5672 --rm -d --name=activemq rmohr/activemq
*
* TRANSPORTER=amqp10://guest:guest@localhost:5672
*
* @class Amqp10Transporter
* @extends {Transporter}
*/
class Amqp10Transporter extends Transporter {
/**
* Creates an instance of Amqp10Transporter.
*
* @param {any} opts
*
* @memberof Amqp10Transporter
*/
constructor(opts) {
if (typeof opts == "string") opts = { url: opts };
super(opts);
/* istanbul ignore next*/
if (!this.opts)
this.opts = {
url: "amqp10://guest:guest@localhost:5672"
};
// Number of requests a broker will handle concurrently
if (typeof this.opts.prefetch !== "number") this.opts.prefetch = 1;
// Number of milliseconds before an event expires
if (typeof this.opts.eventTimeToLive !== "number") this.opts.eventTimeToLive = null;
if (typeof this.opts.heartbeatTimeToLive !== "number") this.opts.heartbeatTimeToLive = null;
if (typeof this.opts.connectionOptions !== "object") this.opts.connectionOptions = {};
if (typeof this.opts.queueOptions !== "object") this.opts.queueOptions = {};
if (typeof this.opts.topicOptions !== "object") this.opts.topicOptions = {};
if (typeof this.opts.messageOptions !== "object") this.opts.messageOptions = {};
if (typeof this.opts.topicPrefix !== "string") this.opts.topicPrefix = "topic://";
this.receivers = [];
this.hasBuiltInBalancer = true;
this.connection = null;
this.session = null;
}
_getQueueOptions(packetType /*, balancedQueue*/) {
let packetOptions = {};
switch (packetType) {
// Requests and responses don't expire.
case PACKET_REQUEST:
// TODO: auto delete
break;
case PACKET_RESPONSE:
// TODO: auto delete
break;
// Consumers can decide how long events live
// Load-balanced/grouped events
case PACKET_EVENT + "LB":
case PACKET_EVENT:
// TODO: auto delete
break;
// Packet types meant for internal use
case PACKET_HEARTBEAT:
// TODO: auto delete
// packetOptions = {};
break;
case PACKET_DISCOVER:
case PACKET_DISCONNECT:
case PACKET_UNKNOWN:
case PACKET_INFO:
case PACKET_PING:
case PACKET_PONG:
// TODO: auto delete
break;
}
return Object.assign(packetOptions, this.opts.queueOptions);
}
_getMessageOptions(packetType) {
let messageOptions = {};
switch (packetType) {
case PACKET_REQUEST:
case PACKET_RESPONSE:
break;
case PACKET_EVENT + "LB":
case PACKET_EVENT:
if (this.opts.eventTimeToLive) messageOptions.ttl = this.opts.eventTimeToLive;
break;
case PACKET_HEARTBEAT:
if (this.opts.heartbeatTimeToLive)
messageOptions.ttl = this.opts.heartbeatTimeToLive;
break;
case PACKET_DISCOVER:
case PACKET_DISCONNECT:
case PACKET_UNKNOWN:
case PACKET_INFO:
case PACKET_PING:
case PACKET_PONG:
break;
}
return Object.assign(messageOptions, this.opts.messageOptions);
}
/**
* Build a function to handle requests.
*
* @param {String} cmd
* @param {Boolean} needAck
*
* @memberof Amqp10Transporter
*/
_consumeCB(cmd, needAck = false) {
return ({ message, delivery }) => {
const result = this.incomingMessage(cmd, message.body);
if (needAck) {
if (isPromise(result)) {
return result
.then(() => {
if (this.connection) {
delivery.accept();
}
})
.catch(err => {
this.logger.error("Message handling error.", err);
if (this.connection) {
delivery.reject();
}
this.broker.broadcastLocal("$transporter.error", {
error: err,
module: "transporter",
type: C.FAILED_REQUEST_ACK
});
});
} else {
if (this.connection) {
delivery.accept();
}
}
}
return result;
};
}
/**
* Connect to a AMQP 1.0 server
*
* @memberof Amqp10Transporter
*/
connect(errorCallback) {
let rhea;
try {
rhea = require("rhea-promise");
} catch (err) {
/* istanbul ignore next */
this.broker.fatal(
"The 'rhea-promise' package is missing. Please install it with 'npm install rhea-promise --save' command.",
err,
true
);
}
// Pick url
const uri = this.opts.url;
const urlParsed = url.parse(uri);
const username = urlParsed.auth ? urlParsed.auth.split(":")[0] : undefined;
const password = urlParsed.auth ? urlParsed.auth.split(":")[1] : undefined;
const connectionOptions = Object.assign(
{
host: urlParsed.hostname,
hostname: urlParsed.hostname,
username,
password,
port: urlParsed.port || 5672,
container_id: this.broker.instanceID
},
this.opts.connectionOptions
);
const container = new rhea.Container();
const connection = container.createConnection(connectionOptions);
connection.on("disconnected", e => {
this.logger.info("AMQP10 disconnected.");
this.connected = false;
if (e) {
this.logger.error(
"AMQP10 connection error.",
(this.connection && this.connection.error) || ""
);
errorCallback && errorCallback(e);
this.broker.broadcastLocal("$transporter.error", {
error: e,
module: "transporter",
type: C.FAILED_DISCONNECTION
});
}
});
return connection
.open()
.then(connection => {
this.connection = connection;
this.connection.createSession().then(session => {
this.session = session;
this.logger.info("AMQP10 is connected");
this.connection._connection.setMaxListeners(0);
this.session._session.setMaxListeners(0);
this.session.setMaxListeners(0);
this.connected = true;
return this.onConnected();
});
})
.catch(e => {
this.logger.error(
"AMQP10 connection error.",
(this.connection && this.connection.error) || ""
);
this.logger.info("AMQP10 is disconnected.");
this.connected = false;
errorCallback && errorCallback(e);
this.broker.broadcastLocal("$transporter.error", {
error: e,
module: "transporter",
type: C.FAILED_DISCONNECTION
});
});
}
/**
* Disconnect from an AMQP 1.0 server
* Close every receiver on the connections and the close the connection
* @memberof Amqp10Transporter
*/
disconnect() {
if (this.connection) {
return this.broker.Promise.all(this.receivers.map(receiver => receiver.close()))
.then(() => this.connection.close())
.then(() => {
this.connection = null;
this.connected = false;
this.session = null;
this.receivers = [];
})
.catch(error => {
this.logger.error(error);
this.broker.broadcastLocal("$transporter.error", {
error,
module: "transporter",
type: C.FAILED_DISCONNECTION
});
});
}
}
/**
* Subscribe to a command
*
* @param {String} cmd
* @param {String} nodeID
*
* @memberof Amqp10Transporter
* @description Initialize queues and topics for all packet types.
*
* All packets that should reach multiple nodes have a dedicated topic for that command
* These packet types will not use acknowledgements.
* The time-to-live for EVENT packets can be configured in options.
* Examples: INFO, DISCOVER, DISCONNECT, HEARTBEAT, PING, PONG, EVENT
*
* Other Packets are headed towards a specific queue. These don't need topics and
* packets of this type will not expire.
* Examples: REQUEST, RESPONSE
*
* RESPONSE: Each node has its own dedicated queue and acknowledgements will not be used.
*
* REQUEST: Each action has its own dedicated queue. This way if an action has multiple workers,
* they can all pull from the same queue. This allows a message to be retried by a different node
* if one dies before responding.
*
*/
subscribe(cmd, nodeID) {
if (!this.connection) return;
const topic = this.getTopicName(cmd, nodeID);
let receiverOptions = this._getQueueOptions(cmd);
if (nodeID) {
const needAck = [PACKET_REQUEST].indexOf(cmd) !== -1;
Object.assign(receiverOptions, this.opts.queueOptions, {
credit_window: this.opts.prefetch !== 0 ? 0 : undefined,
autoaccept: !needAck,
name: topic,
source: {
address: topic
},
session: this.session
});
return this.connection.createReceiver(receiverOptions).then(receiver => {
if (this.opts.prefetch !== 0) {
receiver.addCredit(this.opts.prefetch);
}
receiver.on("message", context => {
const cb = this._consumeCB(cmd, needAck)(context);
if (isPromise(cb) && this.opts.prefetch !== 0) {
return cb.then(() => receiver.addCredit(1));
}
if (this.opts.prefetch !== 0) {
receiver.addCredit(1);
}
});
this.receivers.push(receiver);
});
} else {
const topicName = `${this.opts.topicPrefix}${topic}`;
Object.assign(receiverOptions, this.opts.topicOptions, {
name: topicName,
source: {
address: topicName
},
session: this.session
});
return this.connection.createReceiver(receiverOptions).then(receiver => {
receiver.on("message", context => {
this._consumeCB(cmd, false)(context);
});
this.receivers.push(receiver);
});
}
}
/**
* Subscribe to balanced action commands
* For REQB command types
* These queues will be used when the "disableBalancer" set to true
*
* @param {String} action
* @memberof Amqp10Transporter
*/
subscribeBalancedRequest(action) {
const queue = `${this.prefix}.${PACKET_REQUEST}B.${action}`;
const receiverOptions = Object.assign(
{
credit_window: this.opts.prefetch !== 0 ? 0 : undefined,
source: { address: queue },
autoaccept: false,
session: this.session
},
this._getQueueOptions(PACKET_REQUEST, true)
);
return this.connection.createReceiver(receiverOptions).then(receiver => {
if (this.opts.prefetch !== 0) {
receiver.addCredit(this.opts.prefetch);
}
receiver.on("message", context => {
const cb = this._consumeCB(PACKET_REQUEST, true)(context);
if (isPromise(cb) && this.opts.prefetch !== 0) {
return cb.then(() => receiver.addCredit(1));
}
if (this.opts.prefetch !== 0) {
receiver.addCredit(1);
}
});
this.receivers.push(receiver);
});
}
/**
* Subscribe to balanced event command
* For EVENTB command types
* These queues will be used when the "disableBalancer" set to true
*
* @param {String} event
* @param {String} group
* @memberof Amqp10Transporter
*/
subscribeBalancedEvent(event, group) {
const queue = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`;
const receiverOptions = Object.assign(
{
source: { address: queue },
autoaccept: false,
session: this.session
},
this._getQueueOptions(PACKET_EVENT + "LB", true)
);
return this.connection.createReceiver(receiverOptions).then(receiver => {
receiver.on("message", this._consumeCB(PACKET_EVENT, true));
this.receivers.push(receiver);
});
}
/**
* Publish a packet
*
* @param {Packet} packet
*
* @memberof Amqp10Transporter
* @description Send packets to their intended queues / topics.
*
* Reasonings documented in the subscribe method.
*/
publish(packet) {
/* istanbul ignore next*/
if (!this.connection) return;
let topic = this.getTopicName(packet.type, packet.target);
const data = this.serialize(packet);
const message = Object.assign(
{ body: data },
this.opts.messageOptions,
this._getMessageOptions(packet.type)
);
const awaitableSenderOptions = {
target: {
address: packet.target ? topic : `${this.opts.topicPrefix}${topic}`
},
session: this.session
};
return this.connection
.createAwaitableSender(awaitableSenderOptions)
.then(sender => {
return sender
.send(message)
.catch(this.logger.error)
.then(() => sender);
})
.then(sender => {
return sender.close({ closeSession: false });
})
.catch(error => {
this.logger.error(error);
this.broker.broadcastLocal("$transporter.error", {
error,
module: "transporter",
type: C.FAILED_PUBLISHER_ERROR
});
});
}
/**
* Publish a balanced EVENT(B) packet to a balanced queue
*
* @param {Packet} packet
* @param {String} group
* @returns {Promise}
* @memberof Amqp10Transporter
*/
publishBalancedEvent(packet, group) {
/* istanbul ignore next*/
if (!this.connection) return;
let queue = `${this.prefix}.${PACKET_EVENT}B.${group}.${packet.payload.event}`;
const data = this.serialize(packet);
const message = Object.assign(
{ body: data },
this.opts.messageOptions,
this._getMessageOptions(PACKET_EVENT, true)
);
const awaitableSenderOptions = {
target: {
address: queue
},
session: this.session
};
return this.connection
.createAwaitableSender(awaitableSenderOptions)
.then(sender => {
return sender
.send(message)
.catch(this.logger.error)
.then(() => sender);
})
.then(sender => {
return sender.close({ closeSession: false });
})
.catch(error => {
this.logger.error(error);
this.broker.broadcastLocal("$transporter.error", {
error,
module: "transporter",
type: C.FAILED_PUBLISH_BALANCED_EVENT
});
});
}
/**
* Publish a balanced REQ(B) packet to a balanced queue
*
* @param {Packet} packet
* @returns {Promise}
* @memberof Amqp10Transporter
*/
publishBalancedRequest(packet) {
/* istanbul ignore next*/
if (!this.connection) return this.broker.Promise.resolve();
const queue = `${this.prefix}.${PACKET_REQUEST}B.${packet.payload.action}`;
const data = this.serialize(packet);
const message = Object.assign(
{ body: data },
this.opts.messageOptions,
this._getMessageOptions(PACKET_REQUEST, true)
);
const awaitableSenderOptions = {
target: {
address: queue
},
session: this.session
};
return this.connection
.createAwaitableSender(awaitableSenderOptions)
.then(sender => {
return sender
.send(message)
.catch(this.logger.error)
.then(() => sender);
})
.then(sender => {
return sender.close({ closeSession: false });
})
.catch(error => {
this.logger.error(error);
this.broker.broadcastLocal("$transporter.error", {
error,
module: "transporter",
type: C.FAILED_PUBLISH_BALANCED_REQUEST
});
});
}
}
module.exports = Amqp10Transporter;