src/transporters/tcp.js
/*
* moleculer
* Copyright (c) 2020 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/
"use strict";
const Transporter = require("./base");
const _ = require("lodash");
const { isObject, isString } = require("../utils");
const fs = require("fs");
const kleur = require("kleur");
const Node = require("../registry/node");
const P = require("../packets");
const { resolvePacketID } = require("./tcp/constants");
const { MoleculerServerError } = require("../errors");
const UdpServer = require("./tcp/udp-broadcaster");
const TcpReader = require("./tcp/tcp-reader");
const TcpWriter = require("./tcp/tcp-writer");
/**
* TCP Transporter with optional UDP discovery ("zero configuration") module.
*
* TCP Transporter uses fault tolerant and peer-to-peer <b>Gossip Protocol</b>
* to discover location and service information about the other nodes
* participating in a Moleculer Cluster. In Moleculer's P2P architecture all
* nodes are equal, there is no "leader" or "controller" node, so the cluster is
* truly horizontally scalable. This transporter aims to run on top of an
* infrastructure of hundreds of nodes.
*
* @class TcpTransporter
* @extends {Transporter}
*/
class TcpTransporter extends Transporter {
/**
* Creates an instance of TcpTransporter.
*
* @param {any} opts
*
* @memberof TcpTransporter
*/
constructor(opts) {
if (isString(opts)) opts = { urls: opts };
super(opts);
this.opts = Object.assign(
{
// UDP discovery options
udpDiscovery: true,
udpPort: 4445,
udpBindAddress: null,
udpPeriod: 30,
udpReuseAddr: true,
udpMaxDiscovery: 0, // 0 - No limit
// Multicast settings
udpMulticast: "239.0.0.0",
udpMulticastTTL: 1,
// Broadcast settings
udpBroadcast: false,
// TCP options
port: null, // random port,
urls: null, // Remote node addresses (when UDP discovery is not available)
useHostname: true,
gossipPeriod: 2, // seconds
maxConnections: 32, // Max live outgoing TCP connections
maxPacketSize: 1 * 1024 * 1024
},
this.opts
);
this.reader = null;
this.writer = null;
this.udpServer = null;
this.gossipTimer = null;
this.GOSSIP_DEBUG = !!this.opts.debug;
}
/**
* Init transporter
*
* @param {Transit} transit
* @param {Function} messageHandler
* @param {Function} afterConnect
*
* @memberof BaseTransporter
*/
init(transit, messageHandler, afterConnect) {
super.init(transit, messageHandler, afterConnect);
if (this.broker) {
this.Promise = this.broker.Promise;
this.registry = this.broker.registry;
this.discoverer = this.broker.registry.discoverer;
this.nodes = this.registry.nodes;
// Disable normal HB logic
this.discoverer.disableHeartbeat();
}
}
/**
* Start UDP & TCP servers
*
* @memberof TcpTransporter
*/
connect() {
return this.Promise.resolve()
.then(() => {
// Load offline nodes
if (this.opts.urls) return this.loadUrls();
})
.then(() => this.startTcpServer())
.then(() => this.startUdpServer())
.then(() => this.startTimers())
.then(() => {
this.logger.info("TCP Transporter started.");
this.connected = true;
// Set the opened TCP port (because it is a random port by default)
this.nodes.localNode.port = this.opts.port;
// Regenerate local node INFO because port changed
this.registry.regenerateLocalRawInfo(true);
})
.then(() => this.onConnected());
}
/**
* Start a TCP server for incoming packets
*/
startTcpServer() {
this.writer = new TcpWriter(this, this.opts);
this.reader = new TcpReader(this, this.opts);
this.writer.on("error", (err, nodeID) => {
this.logger.debug(`TCP client error on '${nodeID}'`, err);
this.nodes.disconnected(nodeID, false);
});
this.writer.on("end", nodeID => {
this.logger.debug(`TCP connection ended with '${nodeID}'`);
this.nodes.disconnected(nodeID, false);
});
return this.reader.listen();
}
/**
* Start a UDP server for automatic discovery
*/
startUdpServer() {
this.udpServer = new UdpServer(this, this.opts);
this.udpServer.on("message", (nodeID, address, port) => {
if (nodeID && nodeID != this.nodeID) {
//this.logger.info(`UDP discovery received from ${address} on ${nodeID}.`);
let node = this.nodes.get(nodeID);
if (!node) {
// Unknown node. Register as offline node
node = this.addOfflineNode(nodeID, address, port);
} else if (!node.available) {
// Update connection data
node.port = port;
node.hostname = address;
if (node.ipList.indexOf(address) == -1) node.ipList.unshift(address);
}
node.udpAddress = address;
}
});
return this.udpServer.bind();
}
loadUrls() {
if (!this.opts.urls) return this.Promise.resolve();
if (Array.isArray(this.opts.urls) && this.opts.urls.length == 0)
return this.Promise.resolve();
return this.Promise.resolve(this.opts.urls)
.then(str => {
if (isString(str) && str.startsWith("file://")) {
const fName = str.replace("file://", "");
this.logger.debug(`Load nodes list from file '${fName}'...`);
let content = fs.readFileSync(fName);
if (content && content.length > 0) {
content = content.toString().trim();
if (content.startsWith("{") || content.startsWith("["))
return JSON.parse(content);
else return content.split("\n").map(s => s.trim());
}
}
return str;
})
.then(urls => {
if (isString(urls)) {
urls = urls.split(",").map(s => s.trim());
} else if (isObject(urls) && !Array.isArray(urls)) {
const list = [];
_.forIn(urls, (s, nodeID) => list.push(`${s}/${nodeID}`));
urls = list;
}
if (urls && urls.length > 0) {
urls.map(s => {
if (!s) return;
if (s.startsWith("tcp://")) s = s.replace("tcp://", "");
const p = s.split("/");
if (p.length != 2)
return this.logger.warn(
"Invalid endpoint URL. Missing nodeID. URL:",
s
);
const u = p[0].split(":");
if (u.length < 2)
return this.logger.warn("Invalid endpoint URL. Missing port. URL:", s);
const nodeID = p[1];
const port = Number(u.pop());
const host = u.join(":"); // support IPv6 addresses
return { nodeID, host, port };
}).forEach(ep => {
if (!ep) return;
if (ep.nodeID == this.nodeID) {
// Read port from urls
if (!this.opts.port) this.opts.port = ep.port;
} else {
// Create node as offline
this.addOfflineNode(ep.nodeID, ep.host, ep.port);
}
});
}
// TODO: this.nodes.disableOfflineNodeRemoving = true;
});
}
/**
* Process incoming packets
*
* @param {String} type
* @param {Object} message
* @param {Socket} socket
*/
onIncomingMessage(type, message, socket) {
return this.receive(type, message, socket);
}
/**
* Received data. It's a wrapper for middlewares.
*
* @param {String} cmd
* @param {Buffer} data
*/
receive(type, message, socket) {
switch (type) {
case P.PACKET_GOSSIP_HELLO:
return this.processGossipHello(message, socket);
case P.PACKET_GOSSIP_REQ:
return this.processGossipRequest(message);
case P.PACKET_GOSSIP_RES:
return this.processGossipResponse(message);
default:
return this.incomingMessage(type, message);
}
}
/**
* Start Gossip timers
*/
startTimers() {
this.gossipTimer = setInterval(() => {
this.getLocalNodeInfo()
.updateLocalInfo(this.broker.getCpuUsage)
.then(() => this.sendGossipRequest());
}, Math.max(this.opts.gossipPeriod, 1) * 1000);
this.gossipTimer.unref();
}
/**
* Stop Gossip timers
*/
stopTimers() {
if (this.gossipTimer) {
clearInterval(this.gossipTimer);
this.gossipTimer = null;
}
}
/**
* Register a node as offline because we don't know all information about it
*
* @param {String} id - NodeID
* @param {String} address
* @param {Number} port
*/
addOfflineNode(id, address, port) {
const node = new Node(id);
node.local = false;
node.hostname = address;
node.ipList = [address];
node.port = port;
node.available = false;
node.seq = 0;
node.offlineSince = Math.round(process.uptime());
this.nodes.add(node.id, node);
return node;
}
/**
* Wrapper for TCP writer
*
* @param {String} nodeID
* @returns
* @memberof TcpTransporter
*/
getNode(nodeID) {
return this.nodes.get(nodeID);
}
/**
* Get address for node. It returns the hostname or IP address
*
* @param {Node} node
* @returns
* @memberof TcpTransporter
*/
getNodeAddress(node) {
if (node.udpAddress) return node.udpAddress;
if (this.opts.useHostname && node.hostname) return node.hostname;
if (node.ipList && node.ipList.length > 0) return node.ipList[0];
this.logger.warn(`Node ${node.id} has no valid address`, node);
return null;
}
/**
* Send a Gossip Hello to the remote node
*
* @param {String} nodeID
*/
sendHello(nodeID) {
const node = this.getNode(nodeID);
if (!node)
return this.Promise.reject(
new MoleculerServerError(`Missing node info for '${nodeID}'`)
);
const localNode = this.nodes.localNode;
const packet = new P.Packet(P.PACKET_GOSSIP_HELLO, nodeID, {
host: this.getNodeAddress(localNode),
port: localNode.port
});
if (this.GOSSIP_DEBUG)
this.logger.info(
kleur.bgCyan().black(`----- HELLO ${this.nodeID} -> ${nodeID} -----`),
packet.payload
);
return this.publish(packet).catch(() => {
this.logger.debug(`Unable to send Gossip HELLO packet to ${nodeID}.`);
});
}
/**
* Process incoming Gossip Hello packet
*
* @param {Buffer} msg
* @param {Socket} socket
*/
processGossipHello(msg, socket) {
try {
const packet = this.deserialize(P.PACKET_GOSSIP_HELLO, msg);
const payload = packet.payload;
const nodeID = payload.sender;
if (this.GOSSIP_DEBUG)
this.logger.info(`----- HELLO ${this.nodeID} <- ${payload.sender} -----`, payload);
let node = this.nodes.get(nodeID);
if (!node) {
// Unknown node. Register as offline node
node = this.addOfflineNode(nodeID, payload.host, payload.port);
}
if (!node.udpAddress) node.udpAddress = socket.remoteAddress;
} catch (err) {
this.logger.warn("Invalid incoming GOSSIP_HELLO packet.", err);
this.logger.debug("Content:", msg.toString());
}
}
/**
* Create and send a Gossip request packet
*/
sendGossipRequest() {
const list = this.nodes.toArray();
if (!list || list.length <= 1) return;
let packet = {
online: {},
offline: {}
};
let onlineList = [];
let offlineList = [];
list.forEach(node => {
if (!node.available) {
if (node.seq > 0) {
packet.offline[node.id] = node.seq;
}
offlineList.push(node);
} else {
packet.online[node.id] = [node.seq, node.cpuSeq || 0, node.cpu || 0];
if (!node.local) onlineList.push(node);
}
});
/* istanbul ignore next */
if (Object.keys(packet.offline).length == 0) delete packet.offline;
/* istanbul ignore next */
if (Object.keys(packet.online).length == 0) delete packet.online;
if (onlineList.length > 0) {
// Send gossip message to a live endpoint
this.sendGossipToRandomEndpoint(packet, onlineList);
}
if (offlineList.length > 0) {
const ratio = offlineList.length / (onlineList.length + 1);
// Random number between 0.0 and 1.0
if (ratio >= 1 || Math.random() < ratio) {
// Send gossip message to an offline endpoint
this.sendGossipToRandomEndpoint(packet, offlineList);
}
}
}
/**
* Send a Gossip request packet to a random endpoint
*
* @param {Object} data
* @param {Array} endpoints
*/
sendGossipToRandomEndpoint(data, endpoints) {
if (endpoints.length == 0) return;
const ep =
endpoints.length == 1
? endpoints[0]
: endpoints[Math.floor(Math.random() * endpoints.length)];
if (ep) {
const packet = new P.Packet(P.PACKET_GOSSIP_REQ, ep.id, data);
this.publish(packet).catch(() => {
this.logger.debug(`Unable to send Gossip packet to ${ep.id}.`);
});
if (this.GOSSIP_DEBUG)
this.logger.info(
kleur.bgYellow().black(`----- REQUEST ${this.nodeID} -> ${ep.id} -----`),
packet.payload
);
}
}
/**
* Process incoming Gossip Request packet
*
* @param {Buffer} msg
* @memberof TcpTransporter
*/
processGossipRequest(msg) {
const response = {
online: {},
offline: {}
};
try {
const packet = this.deserialize(P.PACKET_GOSSIP_REQ, msg);
const payload = packet.payload;
if (this.GOSSIP_DEBUG)
this.logger.info(
`----- REQUEST ${this.nodeID} <- ${payload.sender} -----`,
payload
);
const list = this.nodes.toArray();
list.forEach(node => {
const online = payload.online ? payload.online[node.id] : null;
const offline = payload.offline ? payload.offline[node.id] : null;
let seq, cpuSeq, cpu;
if (offline) seq = offline;
else if (online) [seq, cpuSeq, cpu] = online;
if (!seq || seq < node.seq) {
// We have newer info or requester doesn't know it
if (node.available) {
const info = this.registry.getNodeInfo(node.id);
response.online[node.id] = [info, node.cpuSeq || 0, node.cpu || 0];
} else {
response.offline[node.id] = node.seq;
}
return;
}
if (offline) {
// Requester said it is OFFLINE
if (!node.available) {
// We also know it as offline
// Update 'seq' if it is newer than us
if (seq > node.seq) node.seq = seq;
return;
} else if (!node.local) {
// We know it is online, so we change it to offline
this.nodes.disconnected(node.id, false);
// Update the 'seq' to the received value
node.seq = seq;
} else if (node.local) {
// Requested said I'm offline. We should send back that we are online!
// We need to increment the received `seq` so that the requester will update us
node.seq = seq + 1;
const info = this.registry.getLocalNodeInfo(true);
response.online[node.id] = [info, node.cpuSeq || 0, node.cpu || 0];
}
} else if (online) {
// Requester said it is ONLINE
if (node.available) {
if (cpuSeq > node.cpuSeq) {
// We update CPU info
node.heartbeat({
cpu,
cpuSeq
});
} else if (cpuSeq < node.cpuSeq) {
// We have newer CPU value, send back
response.online[node.id] = [node.cpuSeq || 0, node.cpu || 0];
}
} else {
// We know it as offline. We do nothing, because we'll request it and we'll receive its INFO.
return;
}
}
});
// Remove empty keys
if (Object.keys(response.offline).length == 0) delete response.offline;
if (Object.keys(response.online).length == 0) delete response.online;
if (response.online || response.offline) {
let sender = this.nodes.get(payload.sender);
// Send back the Gossip response to the sender
const rspPacket = new P.Packet(P.PACKET_GOSSIP_RES, sender.id, response);
this.publish(rspPacket).catch(() => {});
if (this.GOSSIP_DEBUG)
this.logger.info(
kleur
.bgMagenta()
.black(`----- RESPONSE ${this.nodeID} -> ${sender.id} -----`),
rspPacket.payload
);
} else {
if (this.GOSSIP_DEBUG)
this.logger.info(
kleur
.bgBlue()
.white(`----- EMPTY RESPONSE ${this.nodeID} -> ${payload.sender} -----`)
);
}
} catch (err) {
this.logger.warn("Invalid incoming GOSSIP_REQ packet.", err);
this.logger.debug("Content:", msg.toString());
// this.logger.debug("Response:", inspect(response, { depth: 10 }));
}
}
/**
* Process the incoming Gossip Response packet
*
* @param {Buffer} msg
* @memberof TcpTransporter
*/
processGossipResponse(msg) {
try {
const packet = this.deserialize(P.PACKET_GOSSIP_RES, msg);
const payload = packet.payload;
if (this.GOSSIP_DEBUG)
this.logger.info(
`----- RESPONSE ${this.nodeID} <- ${payload.sender} -----`,
payload
);
// Process online nodes
if (payload.online) {
Object.keys(payload.online).forEach(nodeID => {
// We don't process the self info. We know it better.
if (nodeID == this.nodeID) return;
const row = payload.online[nodeID];
if (!Array.isArray(row)) return;
let info, cpu, cpuSeq;
if (row.length == 1) info = row[0];
else if (row.length == 2) [cpuSeq, cpu] = row;
else if (row.length == 3) [info, cpuSeq, cpu] = row;
let node = this.nodes.get(nodeID);
if (info && (!node || node.seq < info.seq)) {
// If we don't know it, or know, but has smaller seq, update 'info'
info.sender = nodeID;
node = this.nodes.processNodeInfo(info);
}
if (node && cpuSeq && cpuSeq > node.cpuSeq) {
// Update CPU
node.heartbeat({
cpu,
cpuSeq
});
}
});
}
// Process offline nodes
if (payload.offline) {
Object.keys(payload.offline).forEach(nodeID => {
// We don't process the self info. We know it better.
if (nodeID == this.nodeID) return;
const seq = payload.offline[nodeID];
const node = this.nodes.get(nodeID);
if (!node) return;
if (node.seq < seq) {
if (node.available) {
// We know it is online, so we change it to offline
this.nodes.disconnected(node.id, false);
}
// Update the 'seq' to the received value
node.seq = seq;
}
});
}
} catch (err) {
this.logger.warn("Invalid incoming GOSSIP_RES packet.", err);
this.logger.debug("Content:", msg.toString());
}
}
/**
* Close TCP & UDP servers and destroy sockets.
*
* @memberof TcpTransporter
*/
disconnect() {
this.connected = false;
this.stopTimers();
if (this.reader) this.reader.close();
if (this.writer) this.writer.close();
if (this.udpServer) this.udpServer.close();
}
/**
* Get local node info instance
*/
getLocalNodeInfo() {
return this.nodes.localNode;
}
/**
* Get a node info instance by nodeID
*
* @param {String} nodeID
*/
getNodeInfo(nodeID) {
return this.nodes.get(nodeID);
}
/**
* Subscribe to a command
*
* @param {String} cmd
* @param {String} nodeID
*
* @memberof TcpTransporter
*/
subscribe(/*cmd, nodeID*/) {
/* istanbul ignore next */
return this.Promise.resolve();
}
/**
* Publish a packet
*
* @param {Packet} packet
*
* @memberof TcpTransporter
*/
publish(packet) {
if (
!packet.target ||
[
P.PACKET_EVENT,
P.PACKET_PING,
P.PACKET_PONG,
P.PACKET_REQUEST,
P.PACKET_RESPONSE,
P.PACKET_GOSSIP_REQ,
P.PACKET_GOSSIP_RES,
P.PACKET_GOSSIP_HELLO
].indexOf(packet.type) == -1
)
return this.Promise.resolve();
const data = this.serialize(packet);
return this.send(packet.type, data, { packet });
}
/**
* Send data buffer.
*
* @param {String} topic
* @param {Buffer} data
* @param {Object} meta
*
* @returns {Promise}
*/
send(topic, data, { packet }) {
const packetID = resolvePacketID(packet.type);
return this.writer.send(packet.target, packetID, data).catch(err => {
this.nodes.disconnected(packet.target, true);
throw err;
});
}
}
module.exports = TcpTransporter;