moleculerjs/moleculer

View on GitHub
src/registry/discoverers/etcd3.js

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * moleculer
 * Copyright (c) 2020 MoleculerJS (https://github.com/moleculerjs/moleculer)
 * MIT Licensed
 */

"use strict";

const _ = require("lodash");
const kleur = require("kleur");
const BaseDiscoverer = require("./base");
const { METRIC } = require("../../metrics");
const Serializers = require("../../serializers");
const { removeFromArray, randomInt } = require("../../utils");
const P = require("../../packets");
const C = require("../../constants");

let ETCD3;

/**
 * etcd3-based Discoverer class
 *
 * @class Etcd3Discoverer
 */
class Etcd3Discoverer extends BaseDiscoverer {
    /**
     * Creates an instance of Discoverer.
     *
     * TODO:
     *     - the etcd3 lib has no reconnection logic
     *
     * @memberof Etcd3Discoverer
     */
    constructor(opts) {
        if (typeof opts === "string") opts = { etcd: { hosts: opts.replace(/etcd3:\/\//g, "") } };

        super(opts);

        this.opts = _.defaultsDeep(this.opts, {
            etcd: undefined,
            serializer: "JSON",
            fullCheck: 10 // Disable with `0` or `null`
        });

        // Loop counter for full checks. Starts from a random value for better distribution
        this.idx = this.opts.fullCheck > 1 ? randomInt(this.opts.fullCheck - 1) : 0;

        // Etcd client instance
        this.client = null;

        // Last sequence numbers
        this.lastInfoSeq = 0;
        this.lastBeatSeq = 0;

        // Leases
        this.leaseBeat = null;
        this.leaseInfo = null;
    }

    /**
     * Initialize Discoverer
     *
     * @param {any} registry
     *
     * @memberof Etcd3Discoverer
     */
    init(registry) {
        super.init(registry);

        try {
            ETCD3 = require("etcd3");
        } catch (err) {
            /* istanbul ignore next */
            this.broker.fatal(
                "The 'etcd3' package is missing. Please install it with 'npm install etcd3 --save' command.",
                err,
                true
            );
        }

        this.logger.warn(
            kleur
                .yellow()
                .bold("Etcd3 Discoverer is an EXPERIMENTAL module. Do NOT use it in production!")
        );

        this.instanceHash = this.broker.instanceID.substring(0, 8);

        this.PREFIX = `moleculer${
            this.broker.namespace ? "-" + this.broker.namespace : ""
        }/discovery`;
        this.BEAT_KEY = `${this.PREFIX}/beats/${this.broker.nodeID}/${this.instanceHash}`;
        this.INFO_KEY = `${this.PREFIX}/info/${this.broker.nodeID}`;

        this.client = new ETCD3.Etcd3(this.opts.etcd);

        // create an instance of serializer (default to JSON)
        this.serializer = Serializers.resolve(this.opts.serializer);
        this.serializer.init(this.broker);

        this.logger.debug("Etcd3 Discoverer created. Prefix:", this.PREFIX);
    }

    /**
     * Stop discoverer clients.
     */
    stop() {
        return super.stop().then(() => {
            if (this.client) return this.client.close();
        });
    }

    /**
     * Register Moleculer Transit Core metrics.
     */
    registerMoleculerMetrics() {
        this.broker.metrics.register({
            name: METRIC.MOLECULER_DISCOVERER_ETCD_COLLECT_TOTAL,
            type: METRIC.TYPE_COUNTER,
            rate: true,
            description: "Number of Service Registry fetching from etcd"
        });
        this.broker.metrics.register({
            name: METRIC.MOLECULER_DISCOVERER_ETCD_COLLECT_TIME,
            type: METRIC.TYPE_HISTOGRAM,
            quantiles: true,
            unit: METRIC.UNIT_MILLISECONDS,
            description: "Time of Service Registry fetching from etcd"
        });
    }

    /**
     * Sending a local heartbeat to etcd.
     */
    sendHeartbeat() {
        const timeEnd = this.broker.metrics.timer(METRIC.MOLECULER_DISCOVERER_ETCD_COLLECT_TIME);
        const data = {
            sender: this.broker.nodeID,
            ver: this.broker.PROTOCOL_VERSION,

            //timestamp: Date.now(),
            cpu: this.localNode.cpu,
            seq: this.localNode.seq,
            instanceID: this.broker.instanceID
        };

        const seq = this.localNode.seq;
        const key = this.BEAT_KEY + "/" + seq;
        let leaseBeat = this.leaseBeat;

        return this.Promise.resolve()
            .then(() => {
                if (leaseBeat) {
                    if (seq != this.lastBeatSeq) {
                        // If seq changed, revoke the current lease
                        const p = leaseBeat.revoke();
                        leaseBeat = null;
                        return p;
                    }
                }
            })
            .then(() => {
                if (!leaseBeat) {
                    // Create a new for lease
                    leaseBeat = this.client.lease(this.opts.heartbeatTimeout);

                    //Handle lease-lost event. Release lease when lost. Next heartbeat will request a new lease
                    leaseBeat.on("lost", err => {
                        this.logger.warn(
                            "Lost heartbeat lease. Dropping lease and retrying heartbeat. Error:",
                            err.message
                        );
                        leaseBeat.release();
                        this.leaseBeat = null;
                        // if broker is connected, send heartbeat immediately. Otherwise it is sent on reconnect.
                        if (this.broker.transit.connected) {
                            this.sendHeartbeat();
                        }
                    });

                    return leaseBeat
                        .grant() // Waiting for the lease creation on the server
                        .then(() => (this.leaseBeat = leaseBeat));
                }
            })
            .then(() =>
                this.leaseBeat.put(key).value(this.serializer.serialize(data, P.PACKET_HEARTBEAT))
            )
            .then(() => (this.lastBeatSeq = seq))
            .then(() => this.collectOnlineNodes())
            .catch(err => {
                this.logger.error("Error occurred while collect etcd keys.", err);

                this.broker.broadcastLocal("$discoverer.error", {
                    error: err,
                    module: "discoverer",
                    type: C.FAILED_COLLECT_KEYS
                });
            })
            .then(() => {
                timeEnd();
                this.broker.metrics.increment(METRIC.MOLECULER_DISCOVERER_ETCD_COLLECT_TOTAL);
            });
    }

    /**
     * Collect online nodes from etcd server.
     */
    collectOnlineNodes() {
        // Get the current node list so that we can check the disconnected nodes.
        const prevNodes = this.registry.nodes
            .list({ onlyAvailable: true, withServices: false })
            .map(node => node.id)
            .filter(nodeID => nodeID !== this.broker.nodeID);

        // Collect the online node keys.
        return this.Promise.resolve()
            .then(() => {
                if (this.opts.fullCheck && ++this.idx % this.opts.fullCheck == 0) {
                    // Full check
                    //this.logger.debug("Full check", this.idx);
                    this.idx = 0;

                    return this.client
                        .getAll()
                        .prefix(`${this.PREFIX}/beats/`)
                        .buffers()
                        .then(result =>
                            Object.values(result).map(raw => {
                                try {
                                    return this.serializer.deserialize(raw, P.PACKET_INFO);
                                } catch (err) {
                                    this.logger.warn("Unable to parse HEARTBEAT packet", err, raw);
                                }
                            })
                        );
                } else {
                    //this.logger.debug("Lazy check", this.idx);
                    // Lazy check
                    return this.client
                        .getAll()
                        .prefix(`${this.PREFIX}/beats/`)
                        .keys()
                        .then(keys =>
                            keys.map(key => {
                                const p = key.substring(`${this.PREFIX}/beats/`.length).split("/");
                                return {
                                    key,
                                    sender: p[0],
                                    instanceID: p[1],
                                    seq: Number(p[2])
                                };
                            })
                        );
                }
            })

            .then(packets => {
                _.compact(packets).map(packet => {
                    if (packet.sender == this.broker.nodeID) return;

                    removeFromArray(prevNodes, packet.sender);
                    this.heartbeatReceived(packet.sender, packet);
                });
            })

            .then(() => {
                if (prevNodes.length > 0) {
                    // Disconnected nodes
                    prevNodes.map(nodeID => {
                        this.logger.info(
                            `The node '${nodeID}' is not available. Removing from registry...`
                        );
                        this.remoteNodeDisconnected(nodeID, true);
                    });
                }
            });
    }

    /**
     * Discover a new or old node.
     *
     * @param {String} nodeID
     */
    discoverNode(nodeID) {
        return this.client
            .get(`${this.PREFIX}/info/${nodeID}`)
            .buffer()
            .then(res => {
                if (!res) {
                    this.logger.warn(`No INFO for '${nodeID}' node in registry.`);
                    return;
                }
                try {
                    const info = this.serializer.deserialize(res, P.PACKET_INFO);
                    return this.processRemoteNodeInfo(nodeID, info);
                } catch (err) {
                    this.logger.warn("Unable to parse INFO packet", err, res);
                }
            });
    }

    /**
     * Discover all nodes (after connected)
     */
    discoverAllNodes() {
        return this.collectOnlineNodes();
    }

    /**
     * Local service registry has been changed. We should notify remote nodes.
     * @param {String} nodeID
     */
    sendLocalNodeInfo(nodeID) {
        const info = this.broker.getLocalNodeInfo();

        const payload = Object.assign(
            {
                ver: this.broker.PROTOCOL_VERSION,
                sender: this.broker.nodeID
            },
            info
        );

        const key = this.INFO_KEY;
        const seq = this.localNode.seq;
        let leaseInfo = this.leaseInfo;

        const p =
            !nodeID && this.broker.options.disableBalancer
                ? this.transit.tx.makeBalancedSubscriptions()
                : this.Promise.resolve();
        return p
            .then(() => {
                if (leaseInfo) {
                    if (seq != this.lastInfoSeq) {
                        const p = leaseInfo.revoke();
                        leaseInfo = null;
                        return p;
                    }
                }
            })
            .then(() => {
                if (!leaseInfo) {
                    leaseInfo = this.client.lease(60);

                    //Handle lease-lost event. Release lease when lost. Next heartbeat will request a new lease
                    leaseInfo.on("lost", err => {
                        this.logger.warn(
                            "Lost info lease. Dropping lease and retrying info-send. Error:",
                            err.message
                        );
                        leaseInfo.release();
                        this.leaseInfo = null;
                        // if broker is connected, send local node info immediately. Otherwise it is sent on reconnect.
                        if (this.broker.transit.connected) {
                            this.sendLocalNodeInfo(nodeID);
                        }
                    });

                    return leaseInfo
                        .grant() // Waiting for the lease creation on the server
                        .then(() => (this.leaseInfo = leaseInfo));
                }
            })
            .then(() => leaseInfo.put(key).value(this.serializer.serialize(payload, P.PACKET_INFO)))
            .then(() => {
                this.lastInfoSeq = seq;

                // Sending a new heartbeat because it contains the `seq`
                if (!nodeID) return this.beat();
            })
            .catch(err => {
                this.logger.error("Unable to send INFO to etcd server", err);

                this.broker.broadcastLocal("$discoverer.error", {
                    error: err,
                    module: "discoverer",
                    type: C.FAILED_SEND_INFO
                });
            });
    }

    /**
     * Unregister local node after disconnecting.
     */
    localNodeDisconnected() {
        return this.Promise.resolve()
            .then(() => super.localNodeDisconnected())
            .then(() => this.logger.debug("Remove local node from registry..."))
            .then(() => this.client.delete().key(this.INFO_KEY))
            .then(() => this.client.delete().key(this.BEAT_KEY))
            .then(() => {
                if (this.leaseBeat) return this.leaseBeat.revoke();
            })
            .then(() => {
                if (this.leaseInfo) return this.leaseInfo.revoke();
            });
    }
}

module.exports = Etcd3Discoverer;