
View on GitHub


45 mins
Test Coverage
package io.vertx.mod.jet;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.mod.jet.atom.JtConfig;
import io.vertx.mod.jet.atom.JtUri;
import io.vertx.mod.jet.init.JtPin;
import io.vertx.mod.jet.init.ServiceEnvironment;
import io.vertx.mod.jet.monitor.JtMonitor;
import io.vertx.mod.jet.uca.aim.*;
import io.vertx.up.boot.handler.CommonEndurer;
import io.vertx.up.eon.KWeb;
import io.vertx.up.extension.AbstractAres;
import io.vertx.up.runtime.ZeroUri;
import io.vertx.up.util.Ut;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

 * Agent entry of dynamic deployment, this component is mount to router also.
 * 1) The dynamic router could be authorized by zero @Wall class
 * 2) The dynamic router will call connection pool of configuration, will manage all the routers in current system.
 * 3) The dynamic router will registry the routers information when booting
public class JetPollux extends AbstractAres {
     * Multi EmApp environment here
    private static final ConcurrentMap<String, ServiceEnvironment> AMBIENT = JtPin.serviceEnvironment();
    private static final AtomicBoolean UNREADY = new AtomicBoolean(Boolean.TRUE);

    private final transient JtMonitor monitor = JtMonitor.create(this.getClass());
    private final transient JetCastor castor;

    public JetPollux(final Vertx vertx) {
        this.castor = JetCastor.create(vertx);

    public void mount(final Router router, final JsonObject config) {
        // MONITOR
        if (Objects.isNull(AMBIENT) || AMBIENT.isEmpty()) {
             * 「Failure」Deployment handler
        } else {
             * 「Booting」( Multi application deployment on Router )
            final Set<JtUri> uriSet = AMBIENT.keySet().stream()
                .flatMap(appId -> AMBIENT.get(appId).routes().stream())
                 * Start up and bind `order` and `config`
                .map(uri -> uri.bind(KWeb.ORDER.DYNAMIC)
                    .<JtUri>bind(Ut.deserialize(config.copy(), JtConfig.class)))
                 * Routing deployment
                .map(uri -> {
                     * Mount the uri into Zero Uri
                    this.resolveUri(uri.method(), uri.path());
                     * 「Route」
                    final Route route = router.route();
                     * Single route registry
                    this.registryUri(route, uri);
                    return uri;
             * 「Starting」
            if (Objects.nonNull(this.castor)) {
                 * Worker deployment
                 * Here caused `block thread issue`, if we deploy agent x 32 and set `instances` of worker to 64
                 * It means that the code below will execute 32 times, and then the system will
                 * deploy 32 x 64 worker instances, it may take long time to do deployment and casued `block thread`
                 * issue. Instead we set UNREADY flag to mean execute worker deployment one time.
                 * Each time the `vert.x` insteance will set worker threads here.
                if (UNREADY.getAndSet(Boolean.FALSE)) {

    private void resolveUri(final HttpMethod method, final String uri) {
        if (Objects.nonNull(uri) && 0 < uri.indexOf(":")) {
            if (0 < uri.indexOf(":")) {
                ZeroUri.resolve(method, uri);

    private void registryUri(final Route route, final JtUri uri) {
        // Uri, Method, Order
        // Consumes / Produces
         * Major Route: EngineAim
         * 1) Pre-Condition
         *      IN_RULE
         *      IN_MAPPING
         *      IN_PLUG
         *      IN_SCRIPT
         * 2) Major code logical ( Could not be configured )
         * 3) Send logical
         *      3.1) Send current request to worker ( Ha )
         *      3.2) Send message to worker
         *      3.3) Let worker consume component
        final JtAim pre = Pool.CC_AIM.pick(() -> Ut.instance(PreAim.class), PreAim.class.getName());
        // Fn.po?lThread(Pool.AIM_PRE_HUBS, () -> Ut.instance(PreAim.class));
        final JtAim in = Pool.CC_AIM.pick(() -> Ut.instance(InAim.class), InAim.class.getName());
        final JtAim engine = Pool.CC_AIM.pick(() -> Ut.instance(EngineAim.class), EngineAim.class.getName());
        final JtAim send = Pool.CC_AIM.pick(() -> Ut.instance(SendAim.class), SendAim.class.getName());
            /* Basic parameter validation / 400 Bad Request */
             * Four rule here
             * Handler major process and workflow
             * Message sender, connect to event bus
             * Failure Handler when error occurs