moleculerjs/moleculer

View on GitHub
src/transit.js

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * moleculer
 * Copyright (c) 2020 MoleculerJS (https://github.com/moleculerjs/moleculer)
 * MIT Licensed
 */

"use strict";

const P = require("./packets");
const { Packet } = require("./packets");
const E = require("./errors");

const { Transform } = require("stream");
const { METRIC } = require("./metrics");
const C = require("./constants");

/**
 * Transit class
 *
 * @class Transit
 */
class Transit {
    /**
     * Create an instance of Transit.
     *
     * @param {ServiceBroker} Broker instance
     * @param {Transporter} Transporter instance
     * @param {Object?} opts
     *
     * @memberof Transit
     */
    constructor(broker, transporter, opts) {
        this.broker = broker;
        this.Promise = broker.Promise;
        this.logger = broker.getLogger("transit");
        this.nodeID = broker.nodeID;
        this.metrics = broker.metrics;
        this.instanceID = broker.instanceID;
        this.tx = transporter;
        this.opts = opts;
        this.discoverer = broker.registry.discoverer;
        this.errorRegenerator = broker.errorRegenerator;

        this.pendingRequests = new Map();
        this.pendingReqStreams = new Map();
        this.pendingResStreams = new Map();

        /* deprecated */
        this.stat = {
            packets: {
                sent: {
                    count: 0,
                    bytes: 0
                },
                received: {
                    count: 0,
                    bytes: 0
                }
            }
        };

        this.connected = false;
        this.disconnecting = false;
        this.isReady = false;

        const wrappedMessageHandler = (cmd, packet) => this.messageHandler(cmd, packet);

        this.publish = this.broker.wrapMethod("transitPublish", this.publish, this);
        this.messageHandler = this.broker.wrapMethod(
            "transitMessageHandler",
            this.messageHandler,
            this
        );

        if (this.tx) {
            this.tx.init(this, wrappedMessageHandler, this.afterConnect.bind(this));

            this.tx.send = this.broker.wrapMethod("transporterSend", this.tx.send, this.tx);
            this.tx.receive = this.broker.wrapMethod(
                "transporterReceive",
                this.tx.receive,
                this.tx,
                { reverse: true }
            );
        }

        this.__connectResolve = null;

        this.registerMoleculerMetrics();
    }

    /**
     * Register Moleculer Transit Core metrics.
     */
    registerMoleculerMetrics() {
        if (!this.broker.isMetricsEnabled()) return;

        this.metrics
            .register({
                name: METRIC.MOLECULER_TRANSIT_READY,
                type: METRIC.TYPE_GAUGE,
                description: "Transit is ready"
            })
            .set(0);
        this.metrics
            .register({
                name: METRIC.MOLECULER_TRANSIT_CONNECTED,
                type: METRIC.TYPE_GAUGE,
                description: "Transit is connected"
            })
            .set(0);

        this.metrics.register({
            name: METRIC.MOLECULER_TRANSIT_PONG_TIME,
            type: METRIC.TYPE_GAUGE,
            labelNames: ["targetNodeID"],
            description: "Ping time"
        });
        this.metrics.register({
            name: METRIC.MOLECULER_TRANSIT_PONG_SYSTIME_DIFF,
            type: METRIC.TYPE_GAUGE,
            labelNames: ["targetNodeID"],
            description: "System time difference between nodes"
        });

        this.metrics.register({
            name: METRIC.MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL,
            type: METRIC.TYPE_COUNTER,
            description: "Number of orphan responses"
        });
    }

    /**
     * It will be called after transporter connected or reconnected.
     *
     * @param {any} wasReconnect
     * @returns {Promise}
     *
     * @memberof Transit
     */
    afterConnect(wasReconnect) {
        return this.Promise.resolve()

            .then(() => {
                if (wasReconnect) {
                    // After reconnecting, we should send a broadcast INFO packet because there may new nodes.
                    // In case of disabled balancer, it triggers the `makeBalancedSubscriptions` method.
                    return this.discoverer.sendLocalNodeInfo();
                } else {
                    // After connecting we should subscribe to topics
                    return this.makeSubscriptions();
                }
            })

            .then(() => this.discoverer.discoverAllNodes())
            .delay(500) // Waiting for incoming INFO packets

            .then(() => {
                this.connected = true;
                this.metrics.set(METRIC.MOLECULER_TRANSIT_CONNECTED, 1);

                this.broker.broadcastLocal("$transporter.connected", {
                    wasReconnect: !!wasReconnect
                });

                if (this.__connectResolve) {
                    this.isReady = true;
                    this.__connectResolve();
                    this.__connectResolve = null;
                }

                return null;
            });
    }

    /**
     * Connect with transporter. If failed, try again after 5 sec.
     *
     * @memberof Transit
     */
    connect() {
        this.logger.info("Connecting to the transporter...");
        return new this.Promise(resolve => {
            this.__connectResolve = resolve;

            const doConnect = () => {
                let reconnectStarted = false;

                /* istanbul ignore next */
                const errorHandler = err => {
                    if (this.disconnecting) return;
                    if (reconnectStarted) return;

                    this.logger.warn(
                        "Connection is failed.",
                        (err && err.message) || "Unknown error"
                    );
                    this.logger.debug(err);

                    if (this.opts.disableReconnect) {
                        return;
                    }

                    reconnectStarted = true;

                    setTimeout(() => {
                        this.logger.info("Reconnecting...");
                        doConnect();
                    }, 5 * 1000);
                };
                /* istanbul ignore next */
                this.tx.connect(errorHandler).catch(errorHandler);
            };

            doConnect();
        });
    }

    /**
     * Disconnect with transporter
     *
     * @memberof Transit
     */
    disconnect() {
        this.connected = false;
        this.isReady = false;
        this.disconnecting = true;
        this.metrics.set(METRIC.MOLECULER_TRANSIT_CONNECTED, 0);

        this.broker.broadcastLocal("$transporter.disconnected", { graceFul: true });

        return this.Promise.resolve()
            .then(() => {
                return this.tx.connected && this.discoverer.localNodeDisconnected();
            })
            .then(() => this.tx.disconnect())
            .then(() => (this.disconnecting = false));
    }

    /**
     * Local broker is ready (all services loaded).
     * Send INFO packet to all other nodes
     */
    ready() {
        if (this.connected) {
            this.metrics.set(METRIC.MOLECULER_TRANSIT_READY, 1);
            // We do nothing here because INFO packets are sent during the starting process.
            return;
        }
    }

    /**
     * Send DISCONNECT to remote nodes
     *
     * @returns {Promise}
     *
     * @memberof Transit
     */
    sendDisconnectPacket() {
        return this.publish(new Packet(P.PACKET_DISCONNECT)).catch(
            /* istanbul ignore next */ err =>
                this.logger.debug("Unable to send DISCONNECT packet.", err)
        );
    }

    /**
     * Subscribe to topics for transportation
     *
     * @memberof Transit
     */
    makeSubscriptions() {
        this.subscribing = this.tx
            .makeSubscriptions([
                // Subscribe to broadcast events
                { cmd: P.PACKET_EVENT, nodeID: this.nodeID },

                // Subscribe to requests
                { cmd: P.PACKET_REQUEST, nodeID: this.nodeID },

                // Subscribe to node responses of requests
                { cmd: P.PACKET_RESPONSE, nodeID: this.nodeID },

                // Discover handler
                { cmd: P.PACKET_DISCOVER },
                { cmd: P.PACKET_DISCOVER, nodeID: this.nodeID },

                // NodeInfo handler
                { cmd: P.PACKET_INFO }, // Broadcasted INFO. If a new node connected
                { cmd: P.PACKET_INFO, nodeID: this.nodeID }, // Response INFO to DISCOVER packet

                // Disconnect handler
                { cmd: P.PACKET_DISCONNECT },

                // Heartbeat handler
                { cmd: P.PACKET_HEARTBEAT },

                // Ping handler
                { cmd: P.PACKET_PING }, // Broadcasted
                { cmd: P.PACKET_PING, nodeID: this.nodeID }, // Targeted

                // Pong handler
                { cmd: P.PACKET_PONG, nodeID: this.nodeID }
            ])
            .then(() => {
                this.subscribing = null;
            });

        return this.subscribing;
    }

    /**
     * Message handler for incoming packets
     *
     * @param {Array} topic
     * @param {String} msg
     * @returns {Promise<boolean>} If packet is processed resolve with `true` else `false`
     *
     * @memberof Transit
     */
    messageHandler(cmd, packet) {
        try {
            const payload = packet.payload;

            // Check payload
            if (!payload) {
                /* istanbul ignore next */
                throw new E.MoleculerServerError(
                    "Missing response payload.",
                    500,
                    "MISSING_PAYLOAD"
                );
            }

            // Check protocol version
            if (payload.ver !== this.broker.PROTOCOL_VERSION && !this.opts.disableVersionCheck) {
                throw new E.ProtocolVersionMismatchError({
                    nodeID: payload.sender,
                    actual: this.broker.PROTOCOL_VERSION,
                    received: payload.ver
                });
            }

            if (payload.sender === this.nodeID) {
                // Detect nodeID conflict
                if (cmd === P.PACKET_INFO && payload.instanceID !== this.instanceID) {
                    this.broker.fatal(
                        "ServiceBroker has detected a nodeID conflict, use unique nodeIDs. ServiceBroker stopped."
                    );
                    return this.Promise.resolve(false);
                }

                // Skip own packets (if only built-in balancer disabled)
                if (cmd !== P.PACKET_EVENT && cmd !== P.PACKET_REQUEST && cmd !== P.PACKET_RESPONSE)
                    return this.Promise.resolve(false);
            }

            // Request
            if (cmd === P.PACKET_REQUEST) {
                return this.requestHandler(payload).then(() => true);
            }

            // Response
            else if (cmd === P.PACKET_RESPONSE) {
                this.responseHandler(payload);
            }

            // Event
            else if (cmd === P.PACKET_EVENT) {
                return this.eventHandler(payload);
            }

            // Discover
            else if (cmd === P.PACKET_DISCOVER) {
                this.discoverer.sendLocalNodeInfo(payload.sender);
            }

            // Node info
            else if (cmd === P.PACKET_INFO) {
                this.discoverer.processRemoteNodeInfo(payload.sender, payload);
            }

            // Disconnect
            else if (cmd === P.PACKET_DISCONNECT) {
                this.discoverer.remoteNodeDisconnected(payload.sender, false);
            }

            // Heartbeat
            else if (cmd === P.PACKET_HEARTBEAT) {
                this.discoverer.heartbeatReceived(payload.sender, payload);
            }

            // Ping
            else if (cmd === P.PACKET_PING) {
                this.sendPong(payload);
            }

            // Pong
            else if (cmd === P.PACKET_PONG) {
                this.processPong(payload);
            }

            return this.Promise.resolve(true);
        } catch (err) {
            this.logger.error(err, cmd, packet);

            this.broker.broadcastLocal("$transit.error", {
                error: err,
                module: "transit",
                type: C.FAILED_PROCESSING_PACKET
            });
        }
        return this.Promise.resolve(false);
    }

    /**
     * Handle incoming event
     *
     * @param {any} payload
     * @returns {Promise<boolean>}
     * @memberof Transit
     */
    eventHandler(payload) {
        this.logger.debug(
            `Event '${payload.event}' received from '${payload.sender}' node` +
                (payload.groups ? ` in '${payload.groups.join(", ")}' group(s)` : "") +
                "."
        );

        if (this.broker.stopping) {
            this.logger.warn(
                `Incoming '${payload.event}' event from '${payload.sender}' node is dropped, because broker is stopped.`
            );
            // return false so the transporter knows this event wasn't handled.
            return this.Promise.resolve(false);
        }

        // Create caller context
        const ctx = new this.broker.ContextFactory(this.broker);
        ctx.id = payload.id;
        ctx.eventName = payload.event;
        ctx.setParams(payload.data, this.broker.options.contextParamsCloning);
        ctx.eventGroups = payload.groups;
        ctx.eventType = payload.broadcast ? "broadcast" : "emit";
        ctx.meta = payload.meta || {};
        ctx.level = payload.level;
        ctx.tracing = !!payload.tracing;
        ctx.parentID = payload.parentID;
        ctx.requestID = payload.requestID;
        ctx.caller = payload.caller;
        ctx.nodeID = payload.sender;

        // ensure the eventHandler resolves true when the event was handled successfully
        return this.broker.emitLocalServices(ctx).then(() => true);
    }

    /**
     * Handle incoming request
     *
     * @param {Object} payload
     * @returns {Promise<any>}
     * @memberof Transit
     */
    requestHandler(payload) {
        const requestID = payload.requestID ? "with requestID '" + payload.requestID + "' " : "";
        this.logger.debug(
            `<= Request '${payload.action}' ${requestID}received from '${payload.sender}' node.`
        );

        try {
            if (this.broker.stopping) {
                this.logger.warn(
                    `Incoming '${payload.action}' ${requestID}request from '${payload.sender}' node is dropped because broker is stopped.`
                );
                throw new E.ServiceNotAvailableError({
                    action: payload.action,
                    nodeID: this.nodeID
                });
            }

            let pass;
            if (payload.stream !== undefined) {
                pass = this._handleIncomingRequestStream(payload);
                // eslint-disable-next-line security/detect-possible-timing-attacks
                if (pass === null) return this.Promise.resolve();
            }

            const endpoint = this.broker._getLocalActionEndpoint(payload.action);

            // Recreate caller context
            const ctx = new this.broker.ContextFactory(this.broker);
            ctx.setEndpoint(endpoint);
            ctx.id = payload.id;
            ctx.setParams(pass ? pass : payload.params, this.broker.options.contextParamsCloning);
            ctx.parentID = payload.parentID;
            ctx.requestID = payload.requestID;
            ctx.caller = payload.caller;
            ctx.meta = payload.meta || {};
            ctx.level = payload.level;
            ctx.tracing = payload.tracing;
            ctx.nodeID = payload.sender;

            if (payload.timeout != null) ctx.options.timeout = payload.timeout;

            const p = endpoint.action.handler(ctx);
            // Pointer to Context
            p.ctx = ctx;

            return p
                .then(res => this.sendResponse(payload.sender, payload.id, ctx.meta, res, null))
                .catch(err => this.sendResponse(payload.sender, payload.id, ctx.meta, null, err));
        } catch (err) {
            return this.sendResponse(payload.sender, payload.id, payload.meta, null, err);
        }
    }

    /**
     * Handle incoming request stream.
     *
     * @param {Object} payload
     * @returns {Stream}
     */
    _handleIncomingRequestStream(payload) {
        let pass = this.pendingReqStreams.get(payload.id);
        let isNew = false;

        if (!payload.stream && !pass && !payload.seq) {
            // It is not a stream data
            return false;
        }

        if (!pass) {
            isNew = true;
            this.logger.debug(
                `<= New stream is received from '${payload.sender}'. Seq: ${payload.seq}`
            );

            // Create a new pass stream
            pass = new Transform({
                // TODO: It's incorrect because the chunks may receive in random order, so it processes an empty meta.
                // Meta is filled correctly only in the 0. chunk.
                objectMode: payload.meta && payload.meta["$streamObjectMode"],
                transform: function (chunk, encoding, done) {
                    this.push(chunk);
                    return done();
                }
            });

            pass.$prevSeq = -1;
            pass.$pool = new Map();

            this.pendingReqStreams.set(payload.id, pass);
        }

        if (payload.seq > pass.$prevSeq + 1) {
            // Some chunks are late. Store these chunks.
            this.logger.debug(
                `Put the chunk into pool (size: ${pass.$pool.size}). Seq: ${payload.seq}`
            );

            pass.$pool.set(payload.seq, payload);

            // TODO: start timer.
            // TODO: check length of pool.
            // TODO: reset seq

            return null;
        }

        // the next stream chunk received
        pass.$prevSeq = payload.seq;

        if (pass.$prevSeq > 0) {
            if (!payload.stream) {
                // Check stream error
                if (payload.meta && payload.meta["$streamError"]) {
                    pass.emit(
                        "error",
                        this._createErrFromPayload(payload.meta["$streamError"], payload)
                    );
                }

                this.logger.debug(
                    `<= Stream closing is received from '${payload.sender}'. Seq: ${payload.seq}`
                );

                // End of stream
                pass.end();

                // Remove pending request stream
                this.pendingReqStreams.delete(payload.id);

                return null;
            } else {
                this.logger.debug(
                    `<= Stream chunk is received from '${payload.sender}'. Seq: ${payload.seq}`
                );
                pass.write(
                    payload.params.type === "Buffer"
                        ? Buffer.from(payload.params.data)
                        : payload.params
                );
            }
        }

        // Check newer chunks in the pool
        if (pass.$pool.size > 0) {
            this.logger.debug(`Has stored packets. Size: ${pass.$pool.size}`);
            const nextSeq = pass.$prevSeq + 1;
            const nextPacket = pass.$pool.get(nextSeq);
            if (nextPacket) {
                pass.$pool.delete(nextSeq);
                setImmediate(() => this.requestHandler(nextPacket));
            }
        }

        return pass && payload.seq == 0 ? pass : null;
    }

    /**
     * Create an Error instance from payload ata
     * @param {Object} error
     * @param {Object} payload
     */
    _createErrFromPayload(error, payload) {
        return this.errorRegenerator.restore(error, payload);
    }

    /**
     * Process incoming response of request
     *
     * @param {Object} packet
     *
     * @memberof Transit
     */
    responseHandler(packet) {
        const id = packet.id;
        const req = this.pendingRequests.get(id);

        // If not exists (timed out), we skip response processing
        if (req == null) {
            this.logger.debug(
                "Orphan response is received. Maybe the request is timed out earlier. ID:",
                packet.id,
                ", Sender:",
                packet.sender
            );
            this.metrics.increment(METRIC.MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL);
            return;
        }

        this.logger.debug(`<= Response '${req.action.name}' is received from '${packet.sender}'.`);

        // Update nodeID in context (if it uses external balancer)
        req.ctx.nodeID = packet.sender;

        // Merge response meta with original meta
        Object.assign(req.ctx.meta || {}, packet.meta || {});

        // Handle stream response
        if (packet.stream != null) {
            if (this._handleIncomingResponseStream(packet, req)) return;
        }

        // Remove pending request
        this.removePendingRequest(id);

        if (!packet.success) {
            req.reject(this._createErrFromPayload(packet.error, packet));
        } else {
            req.resolve(packet.data);
        }
    }

    /**
     * Handle incoming response stream.
     *
     * @param {Object} packet
     * @param {Object} req
     */
    _handleIncomingResponseStream(packet, req) {
        let pass = this.pendingResStreams.get(packet.id);
        if (!pass && !packet.stream && !packet.seq) return false;

        if (!pass) {
            this.logger.debug(
                `<= New stream is received from '${packet.sender}'. Seq: ${packet.seq}`
            );

            pass = new Transform({
                // TODO: It's incorrect because the chunks may receive in random order, so it processes an empty meta.
                // Meta is filled correctly only in the 0. chunk.
                objectMode: packet.meta && packet.meta["$streamObjectMode"],
                transform: function (chunk, encoding, done) {
                    this.push(chunk);
                    return done();
                }
            });

            pass.$prevSeq = -1;
            pass.$pool = new Map();

            this.pendingResStreams.set(packet.id, pass);
        }

        if (packet.seq > pass.$prevSeq + 1) {
            // Some chunks are late. Store these chunks.
            this.logger.debug(
                `Put the chunk into pool (size: ${pass.$pool.size}). Seq: ${packet.seq}`
            );

            pass.$pool.set(packet.seq, packet);

            // TODO: start timer.
            // TODO: check length of pool.
            // TODO: resetting seq.

            return true;
        }

        // the next stream chunk received
        pass.$prevSeq = packet.seq;

        if (pass && packet.seq == 0) {
            req.resolve(pass);
        }

        if (pass.$prevSeq > 0) {
            if (!packet.stream) {
                // Received error?
                if (!packet.success)
                    pass.emit("error", this._createErrFromPayload(packet.error, packet));

                this.logger.debug(
                    `<= Stream closing is received from '${packet.sender}'. Seq: ${packet.seq}`
                );

                // End of stream
                pass.end();

                // Remove pending request
                this.removePendingRequest(packet.id);

                return true;
            } else {
                // stream chunk
                this.logger.debug(
                    `<= Stream chunk is received from '${packet.sender}'. Seq: ${packet.seq}`
                );
                pass.write(
                    packet.data.type === "Buffer" ? Buffer.from(packet.data.data) : packet.data
                );
            }
        }

        // Check newer chunks in the pool
        if (pass.$pool.size > 0) {
            this.logger.debug(`Has stored packets. Size: ${pass.$pool.size}`);
            const nextSeq = pass.$prevSeq + 1;
            const nextPacket = pass.$pool.get(nextSeq);
            if (nextPacket) {
                pass.$pool.delete(nextSeq);
                setImmediate(() => this.responseHandler(nextPacket));
            }
        }

        return true;
    }

    /**
     * Send a request to a remote service. It returns a Promise
     * what will be resolved when the response received.
     *
     * @param {<Context>} ctx Context of request
     * @returns {Promise}
     *
     * @memberof Transit
     */
    request(ctx) {
        if (this.opts.maxQueueSize && this.pendingRequests.size >= this.opts.maxQueueSize)
            /* istanbul ignore next */
            return this.Promise.reject(
                new E.QueueIsFullError({
                    action: ctx.action.name,
                    nodeID: this.nodeID,
                    size: this.pendingRequests.size,
                    limit: this.opts.maxQueueSize
                })
            );

        // Expanded the code that v8 can optimize it.  (TryCatchStatement disable optimizing)
        return new this.Promise((resolve, reject) => this._sendRequest(ctx, resolve, reject));
    }

    /**
     * Send a remote request
     *
     * @param {<Context>} ctx      Context of request
     * @param {Function} resolve   Resolve of Promise
     * @param {Function} reject    Reject of Promise
     *
     * @memberof Transit
     */
    _sendRequest(ctx, resolve, reject) {
        const isStream =
            ctx.params &&
            ctx.params.readable === true &&
            typeof ctx.params.on === "function" &&
            typeof ctx.params.pipe === "function";

        const request = {
            action: ctx.action,
            nodeID: ctx.nodeID,
            ctx,
            resolve,
            reject,
            stream: isStream // ???
        };

        const payload = {
            id: ctx.id,
            action: ctx.action.name,
            params: isStream ? null : ctx.params,
            meta: ctx.meta,
            timeout: ctx.options.timeout,
            level: ctx.level,
            tracing: ctx.tracing,
            parentID: ctx.parentID,
            requestID: ctx.requestID,
            caller: ctx.caller,
            stream: isStream
        };

        if (payload.stream) {
            if (
                ctx.params.readableObjectMode === true ||
                (ctx.params._readableState && ctx.params._readableState.objectMode === true)
            ) {
                payload.meta = payload.meta || {};
                payload.meta["$streamObjectMode"] = true;
            }
            payload.seq = 0;
        }

        const packet = new Packet(P.PACKET_REQUEST, ctx.nodeID, payload);

        const nodeName = ctx.nodeID ? `'${ctx.nodeID}'` : "someone";
        const requestID = ctx.requestID ? "with requestID '" + ctx.requestID + "' " : "";
        this.logger.debug(`=> Send '${ctx.action.name}' request ${requestID}to ${nodeName} node.`);

        const publishCatch = /* istanbul ignore next */ err => {
            this.logger.error(
                `Unable to send '${ctx.action.name}' request ${requestID}to ${nodeName} node.`,
                err
            );

            this.broker.broadcastLocal("$transit.error", {
                error: err,
                module: "transit",
                type: C.FAILED_SEND_REQUEST_PACKET
            });
        };

        // Add to pendings
        this.pendingRequests.set(ctx.id, request);

        // Publish request
        return this.publish(packet)
            .then(() => {
                if (isStream) {
                    // Skip to send ctx.meta with chunks because it doesn't appear on the remote side.
                    payload.meta = {};
                    // Still send information about objectMode in case of packets are received in wrong order
                    if (
                        ctx.params.readableObjectMode === true ||
                        (ctx.params._readableState && ctx.params._readableState.objectMode === true)
                    ) {
                        payload.meta["$streamObjectMode"] = true;
                    }

                    const stream = ctx.params;
                    stream.on("data", chunk => {
                        stream.pause();
                        const chunks = [];
                        if (
                            chunk instanceof Buffer &&
                            this.opts.maxChunkSize > 0 &&
                            chunk.length > this.opts.maxChunkSize
                        ) {
                            let len = chunk.length;
                            let i = 0;
                            while (i < len) {
                                chunks.push(chunk.slice(i, (i += this.opts.maxChunkSize)));
                            }
                        } else {
                            chunks.push(chunk);
                        }

                        return this.Promise.all(
                            chunks.map(ch => {
                                const copy = Object.assign({}, payload);
                                copy.seq = ++payload.seq;
                                copy.stream = true;
                                copy.params = ch;

                                this.logger.debug(
                                    `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
                                );

                                return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy));
                            })
                        )
                            .then(() => stream.resume())
                            .catch(publishCatch);
                    });

                    stream.on("end", () => {
                        const copy = Object.assign({}, payload);
                        copy.seq = ++payload.seq;
                        copy.params = null;
                        copy.stream = false;

                        this.logger.debug(
                            `=> Send stream closing ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
                        );

                        return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch(
                            publishCatch
                        );
                    });

                    stream.on("error", err => {
                        const copy = Object.assign({}, payload);
                        copy.seq = ++payload.seq;
                        copy.stream = false;
                        copy.meta["$streamError"] = this._createPayloadErrorField(err, payload);
                        copy.params = null;

                        this.logger.debug(
                            `=> Send stream error ${requestID}to ${nodeName} node.`,
                            copy.meta["$streamError"]
                        );

                        return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch(
                            publishCatch
                        );
                    });
                }
            })
            .catch(err => {
                publishCatch(err);
                reject(err);
            });
    }

    /**
     * Send an event to a remote node.
     * The event is balanced by transporter
     *
     * @param {Context} ctx
     *
     * @memberof Transit
     */
    sendEvent(ctx) {
        const groups = ctx.eventGroups;
        const requestID = ctx.requestID ? "with requestID '" + ctx.requestID + "' " : "";
        if (ctx.endpoint)
            this.logger.debug(
                `=> Send '${ctx.eventName}' event ${requestID}to '${ctx.nodeID}' node` +
                    (groups ? ` in '${groups.join(", ")}' group(s)` : "") +
                    "."
            );
        else
            this.logger.debug(
                `=> Send '${ctx.eventName}' event ${requestID}to '${groups.join(", ")}' group(s).`
            );

        return this.publish(
            new Packet(P.PACKET_EVENT, ctx.endpoint ? ctx.nodeID : null, {
                id: ctx.id,
                event: ctx.eventName,
                data: ctx.params,
                groups,
                broadcast: ctx.eventType == "broadcast",
                meta: ctx.meta,
                level: ctx.level,
                tracing: ctx.tracing,
                parentID: ctx.parentID,
                requestID: ctx.requestID,
                caller: ctx.caller,
                needAck: ctx.needAck
            })
        ).catch(
            /* istanbul ignore next */ err => {
                this.logger.error(
                    `Unable to send '${ctx.eventName}' event ${requestID}to groups.`,
                    err
                );

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_SEND_EVENT_PACKET
                });
            }
        );
    }

    /**
     * Remove a pending request
     *
     * @param {any} id
     *
     * @memberof Transit
     */
    removePendingRequest(id) {
        this.pendingRequests.delete(id);

        this.pendingReqStreams.delete(id);
        this.pendingResStreams.delete(id);
    }

    /**
     * Remove a pending request & streams
     *
     * @param {String} nodeID
     *
     * @memberof Transit
     */
    removePendingRequestByNodeID(nodeID) {
        this.logger.debug(`Remove pending requests of '${nodeID}' node.`);
        this.pendingRequests.forEach((req, id) => {
            if (req.nodeID === nodeID) {
                this.pendingRequests.delete(id);

                // Reject the request
                req.reject(
                    new E.RequestRejectedError({
                        action: req.action.name,
                        nodeID: req.nodeID
                    })
                );

                this.pendingReqStreams.delete(id);
                this.pendingResStreams.delete(id);
            }
        });
    }

    /**
     * Create error field in outgoing payload
     *
     * @param {Error} err
     * @param {Object} payload
     * @returns {Object}
     * @memberof Transit
     */
    _createPayloadErrorField(err, payload) {
        return this.errorRegenerator.extractPlainError(err, payload);
    }

    /**
     * Send back the response of request
     *
     * @param {String} nodeID
     * @param {String} id
     * @param {any} meta
     * @param {any} data
     * @param {Error} err
     *
     * @memberof Transit
     */
    sendResponse(nodeID, id, meta, data, err) {
        // Publish the response
        const payload = {
            id: id,
            meta: meta,
            success: err == null,
            data: data
        };

        if (err) payload.error = this._createPayloadErrorField(err, payload);

        const publishCatch = /* istanbul ignore next */ err => {
            this.logger.error(`Unable to send '${id}' response to '${nodeID}' node.`, err);

            this.broker.broadcastLocal("$transit.error", {
                error: err,
                module: "transit",
                type: C.FAILED_SEND_RESPONSE_PACKET
            });
        };

        if (
            data &&
            data.readable === true &&
            typeof data.on === "function" &&
            typeof data.pipe === "function"
        ) {
            // Streaming response
            payload.stream = true;
            if (
                data.readableObjectMode === true ||
                (data._readableState && data._readableState.objectMode === true)
            ) {
                payload.meta = payload.meta || {};
                payload.meta["$streamObjectMode"] = true;
            }
            payload.seq = 0;

            const stream = data;
            stream.pause();

            stream.on("data", chunk => {
                stream.pause();
                const chunks = [];
                if (
                    chunk instanceof Buffer &&
                    this.opts.maxChunkSize > 0 &&
                    chunk.length > this.opts.maxChunkSize
                ) {
                    let len = chunk.length;
                    let i = 0;
                    while (i < len) {
                        chunks.push(chunk.slice(i, (i += this.opts.maxChunkSize)));
                    }
                } else {
                    chunks.push(chunk);
                }

                return this.Promise.all(
                    chunks.map(ch => {
                        const copy = Object.assign({}, payload);
                        copy.seq = ++payload.seq;
                        copy.stream = true;
                        copy.data = ch;

                        this.logger.debug(
                            `=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`
                        );

                        return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy));
                    })
                )
                    .then(() => stream.resume())
                    .catch(publishCatch);
            });

            stream.on("end", () => {
                const copy = Object.assign({}, payload);
                copy.stream = false;
                copy.seq = ++payload.seq;
                copy.data = null;

                this.logger.debug(`=> Send stream closing to ${nodeID} node. Seq: ${copy.seq}`);

                return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(
                    publishCatch
                );
            });

            stream.on("error", err => {
                const copy = Object.assign({}, payload);
                copy.stream = false;
                copy.seq = ++payload.seq;
                if (err) {
                    copy.success = false;
                    copy.error = this._createPayloadErrorField(err, payload);
                }

                this.logger.debug(`=> Send stream error to ${nodeID} node.`, copy.error);

                return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(
                    publishCatch
                );
            });

            payload.data = null;
            return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, payload))
                .then(() => {
                    if (payload.stream) stream.resume();
                })
                .catch(publishCatch);
        }

        return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, payload)).catch(publishCatch);
    }

    /**
     * Discover other nodes. It will be called after success connect.
     *
     * @memberof Transit
     */
    discoverNodes() {
        return this.publish(new Packet(P.PACKET_DISCOVER)).catch(
            /* istanbul ignore next */ err => {
                this.logger.error("Unable to send DISCOVER packet.", err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_NODES_DISCOVERY
                });
            }
        );
    }

    /**
     * Discover a node. It will be called if we got message from an unknown node.
     *
     * @memberof Transit
     */
    discoverNode(nodeID) {
        return this.publish(new Packet(P.PACKET_DISCOVER, nodeID)).catch(
            /* istanbul ignore next */ err => {
                this.logger.error(`Unable to send DISCOVER packet to '${nodeID}' node.`, err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_NODE_DISCOVERY
                });
            }
        );
    }

    /**
     * Send node info package to other nodes.
     *
     * @memberof Transit
     */
    sendNodeInfo(info, nodeID) {
        if (!this.connected || !this.isReady) return this.Promise.resolve();

        return this.publish(
            new Packet(P.PACKET_INFO, nodeID, {
                services: info.services,
                ipList: info.ipList,
                hostname: info.hostname,
                client: info.client,
                config: info.config,
                instanceID: this.broker.instanceID,
                metadata: info.metadata,
                seq: info.seq
            })
        ).catch(
            /* istanbul ignore next */ err => {
                this.logger.error(`Unable to send INFO packet to '${nodeID}' node.`, err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_SEND_INFO_PACKET
                });
            }
        );
    }

    /**
     * Send ping to a node (or all nodes if nodeID is null)
     *
     * @param {String} nodeID
     * @param {String} id
     * @returns
     * @memberof Transit
     */
    sendPing(nodeID, id) {
        return this.publish(
            new Packet(P.PACKET_PING, nodeID, {
                time: Date.now(),
                id: id || this.broker.generateUid()
            })
        ).catch(
            /* istanbul ignore next */ err => {
                this.logger.error(`Unable to send PING packet to '${nodeID}' node.`, err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_SEND_PING_PACKET
                });
            }
        );
    }

    /**
     * Send back pong response
     *
     * @param {Object} payload
     * @returns
     * @memberof Transit
     */
    sendPong(payload) {
        return this.publish(
            new Packet(P.PACKET_PONG, payload.sender, {
                time: payload.time,
                id: payload.id,
                arrived: Date.now()
            })
        ).catch(
            /* istanbul ignore next */ err => {
                this.logger.error(`Unable to send PONG packet to '${payload.sender}' node.`, err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_SEND_PONG_PACKET
                });
            }
        );
    }

    /**
     * Process incoming PONG packet.
     * Measure ping time & current time difference.
     *
     * @param {Object} payload
     * @memberof Transit
     */
    processPong(payload) {
        const now = Date.now();
        const elapsedTime = now - payload.time;
        const timeDiff = Math.round(now - payload.arrived - elapsedTime / 2);

        // this.logger.debug(`PING-PONG from '${payload.sender}' - Time: ${elapsedTime}ms, Time difference: ${timeDiff}ms`);

        this.broker.broadcastLocal("$node.pong", {
            nodeID: payload.sender,
            elapsedTime,
            timeDiff,
            id: payload.id
        });

        this.metrics.set(METRIC.MOLECULER_TRANSIT_PONG_TIME, elapsedTime, {
            targetNodeID: payload.sender
        });
        this.metrics.set(METRIC.MOLECULER_TRANSIT_PONG_SYSTIME_DIFF, timeDiff, {
            targetNodeID: payload.sender
        });
    }

    /**
     * Send a node heartbeat. It will be called with timer from local Discoverer.
     *
     * @params {Node} localNode
     * @memberof Transit
     */
    sendHeartbeat(localNode) {
        return this.publish(
            new Packet(P.PACKET_HEARTBEAT, null, {
                cpu: localNode.cpu
            })
        ).catch(
            /* istanbul ignore next */ err => {
                this.logger.error("Unable to send HEARTBEAT packet.", err);

                this.broker.broadcastLocal("$transit.error", {
                    error: err,
                    module: "transit",
                    type: C.FAILED_SEND_HEARTBEAT_PACKET
                });
            }
        );
    }

    /**
     * Subscribe via transporter
     *
     * @param {String} topic
     * @param {String=} nodeID
     *
     * @deprecated
     * @memberof Transit
     */
    subscribe(topic, nodeID) {
        return this.tx.subscribe(topic, nodeID);
    }

    /**
     * Publish via transporter
     *
     * @param {Packet} Packet
     *
     * @memberof Transit
     */
    publish(packet) {
        if (this.subscribing) {
            return this.subscribing.then(() => {
                return this.tx.prepublish(packet);
            });
        }
        return this.tx.prepublish(packet);
    }
}

module.exports = Transit;