silentbalanceyh/vertx-zero

View on GitHub
vertx-semper/aeon-eternal/aeon-inlet/src/main/java/io/vertx/up/uca/micro/center/ZeroRegistry.java

Summary

Maintainability
A
0 mins
Test Coverage
package io.vertx.up.uca.micro.center;

import io.horizon.eon.VValue;
import io.horizon.uca.cache.Cc;
import io.horizon.uca.log.Annal;
import io.reactivex.rxjava3.core.Observable;
import io.vertx.core.RpcOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mod.plugin.etcd.center.EtcdData;
import io.vertx.up.eon.em.Etat;
import io.vertx.up.eon.em.EtcdPath;
import io.vertx.up.exception.booting.EtcdNetworkException;
import io.vertx.up.extension.dot.PluginExtension;
import io.vertx.up.fn.Fn;
import io.vertx.up.uca.registry.UddiRegistry;
import io.vertx.up.util.Ut;
import io.vertx.zero.exception.MicroModeUpException;

import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/**
 * Zero registry center to write/read data with Etcd for zero micro service
 * This will be called by ZeroRpcAgent class to write service meta.
 * 1. Status RUNNING/STOPPED/FAILED
 * /zero/ipc/status/{name}/{host}:{port}
 * 2. Services for discovery
 * /zero/ipc/services/{name}/{host}/{port}
 */
public class ZeroRegistry implements UddiRegistry {

    private static final String PATH_STATUS = "/zero/{0}/{1}/services/{2}:{3}:{4}";

    private static final String PATH_CATALOG = "/zero/{0}/{1}/services";

    private static final String ROUTE_TREE = "/zero/{0}/{1}/routes/{2}";
    /*
     * The default path that should not be registry to etcd.
     */
    private static final Set<String> DEFAULTS = new HashSet<String>() {
        {
            this.add("/");
            this.add("/api/");
        }
    };
    private static final Cc<String, ZeroRegistry> CC_REGISTRY = Cc.openThread();

    private final transient Annal logger;
    private final transient EtcdData etcd;

    public ZeroRegistry(final Class<?> useCls) {
        this.etcd = EtcdData.create(useCls);
        this.logger = Annal.get(useCls);
    }

    public static ZeroRegistry create(final Class<?> useCls) {
        return CC_REGISTRY.pick(() -> new ZeroRegistry(useCls));
        // Fn.po?lThread(REGISTRY_MAP, () -> new ZeroRegistry(useCls));
    }

    /**
     * Get current etcd configuration information that initialized
     * in zero system.
     *
     * @return Return zero configuration in etcd
     */
    public JsonArray getConfig() {
        return this.etcd.getConfig();
    }

    public Set<JsonObject> getData(
        final EtcdPath etcdPath,
        final String key,
        final BiFunction<String, JsonArray, Set<JsonObject>> convert
    ) {
        final String path = MessageFormat.format(ROUTE_TREE, this.etcd.getApplication(),
            etcdPath.toString().toLowerCase(), key);
        this.logger.debug(Info.ETCD_READ, path);
        final String node = this.etcd.readData(path);
        final Set<JsonObject> sets = new HashSet<>();
        if (!Ut.isNil(node)) {
            final JsonArray value = new JsonArray(node);
            sets.addAll(convert.apply(key, value));
        }
        return sets;
    }

    public Set<String> getServices(
        final EtcdPath etcdPath) {
        final String path = MessageFormat.format(PATH_CATALOG, this.etcd.getApplication(),
            etcdPath.toString().toLowerCase());
        this.logger.debug(Info.ETCD_READ, path);
        final ConcurrentMap<String, String> nodes = this.etcd.readDir(path, true);
        final Set<String> sets = new HashSet<>();

        Observable.fromIterable(nodes.entrySet())
            .filter(Objects::nonNull)
            .filter(item -> Objects.nonNull(item.getKey()) && Objects.nonNull(item.getValue()))
            .filter(item -> Etat.RUNNING == Ut.toEnum(item::getValue, Etat.class))
            .map(Map.Entry::getKey)
            .subscribe(sets::add).dispose();
        return sets;
    }

    public void erasingStatus(final String name,
                              final String host,
                              final Integer port,
                              final EtcdPath path) {
        final String target = MessageFormat.format(PATH_STATUS, this.etcd.getApplication(),
            path.toString().toLowerCase(), name,
            host, String.valueOf(port));
        this.logger.info(Info.ETCD_CLEAN, name, path);
        this.etcd.write(target, Etat.STOPPED, VValue.ZERO);
    }

    @Override
    public void registryHttp(final String service,
                             final HttpServerOptions options, final Etat etat) {
        final String path = MessageFormat.format(PATH_STATUS, this.etcd.getApplication(),
            EtcdPath.ENDPOINT.toString().toLowerCase(), service,
            Ut.netIPv4(), String.valueOf(options.getPort()));
        this.logger.info(Info.ETCD_STATUS, service, etat, path);
        this.etcd.write(path, etat, VValue.ZERO);
    }

    @Override
    public void initialize(final Class<?> clazz) {
        try {
            EtcdData.create(clazz);
        } catch (final EtcdNetworkException ex) {
            Fn.outBoot(true, this.logger, MicroModeUpException.class, clazz, ex.getMessage());
        }
    }

    public void registryRpc(final RpcOptions options, final Etat etat) {
        final String path = MessageFormat.format(PATH_STATUS, this.etcd.getApplication(),
            EtcdPath.IPC.toString().toLowerCase(), options.getName(),
            Ut.netIPv4(), String.valueOf(options.getPort()));
        this.logger.info(Info.ETCD_STATUS, options.getName(), etat, path);
        this.etcd.write(path, etat, VValue.ZERO);
    }

    public void registryIpcs(final RpcOptions options, final Set<String> ipcs) {
        final String path = MessageFormat.format(ROUTE_TREE, this.etcd.getApplication(),
            EtcdPath.IPC.toString().toLowerCase(),
            MessageFormat.format("{0}:{1}:{2}", options.getName(),
                Ut.netIPv4(), String.valueOf(options.getPort())));
        final String host = Ut.netIPv4();
        final String endpoint = MessageFormat.format("grpc://{0}:{1}",
            host,
            String.valueOf(options.getPort()));
        // Screen Information
        final StringBuilder builder = new StringBuilder();
        for (final String ipc : ipcs) {
            builder.append("\n\t[ Up Rpc √ ] \t").append(ipc);
        }
        this.logger.info(Info.ETCD_IPCS, this.etcd.getApplication(),
            path, options.getName(), endpoint, builder.toString());
        // Build Data
        final JsonArray routeData = new JsonArray();
        Observable.fromIterable(ipcs)
            .subscribe(routeData::add).dispose();
        this.etcd.write(path, routeData, VValue.ZERO);
    }

    @Override
    public void registryRoute(final String name,
                              final HttpServerOptions options, final Set<String> routes) {
        /*
         * Console information for report micro service.
         */
        final String path = MessageFormat.format(ROUTE_TREE, this.etcd.getApplication(),
            EtcdPath.ENDPOINT.toString().toLowerCase(),
            MessageFormat.format("{0}:{1}:{2}", name,
                Ut.netIPv4(), String.valueOf(options.getPort())));
        final String host = Ut.netIPv4();
        final String endpoint = MessageFormat.format("http://{0}:{1}",
            host,
            String.valueOf(options.getPort()));
        /*
         * Sort naturally
         */
        final Set<String> processed = new TreeSet<>();
        routes.stream().filter(route -> !DEFAULTS.contains(route))
            .forEach(processed::add);
        /*
         * Scan configuration file and capture plug-in data
         * (Sync) Plug-in in sync mode
         */
        PluginExtension.ZeroRegistry.registryRoute(processed);

        // Screen Information
        final StringBuilder builder = new StringBuilder();
        for (final String route : processed) {
            builder.append("\n\t[ Up Micro ] \t").append(route);
        }
        this.logger.info(Info.ETCD_ROUTE, this.etcd.getApplication(),
            path, name, endpoint, builder.toString());

        // Build Data
        final JsonArray routeData = new JsonArray();
        Observable.fromIterable(processed)
            .subscribe(routeData::add).dispose();
        this.etcd.write(path, routeData, VValue.ZERO);
    }
}