silentbalanceyh/vertx-zero

View on GitHub
vertx-gaia/vertx-up/src/main/jib/io/vertx/up/uca/job/center/AbstractAgha.java

Summary

Maintainability
A
1 hr
Test Coverage
package io.vertx.up.uca.job.center;

import io.horizon.eon.VMessage;
import io.horizon.fn.Actuator;
import io.horizon.uca.log.Annal;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.up.annotations.Contract;
import io.vertx.up.atom.worker.Mission;
import io.vertx.up.commune.Envelop;
import io.vertx.up.eon.em.EmJob;
import io.vertx.up.uca.job.phase.Phase;
import io.vertx.up.uca.job.store.JobConfig;
import io.vertx.up.uca.job.store.JobPin;
import io.vertx.up.uca.job.store.JobStore;
import io.vertx.up.uca.job.timer.Interval;
import io.vertx.up.util.Ut;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/*
 * The chain should be
 *
 * 1) Input data came from 'incomeAddress' ( There are some preparing or other info )
 * 2) `incomeComponent` will be triggered if it's existing.
 * 3) `component` is required and contain major code logical.
 * 4) `outcomeComponent` will be triggered if it's existing.
 * 5) The result message will be sent to `outcomeAddress`.
 * 6) There could be a callbackAsync method for callback execution ( After Out )
 *    - If `outcomeAddress`, the data came from Event Bus
 *    - Otherwise, the data came from `outcomeComponent`.
 */
public abstract class AbstractAgha implements Agha {

    private static final JobConfig CONFIG = JobPin.getConfig();
    private static final AtomicBoolean SELECTED = new AtomicBoolean(Boolean.TRUE);
    /*
     * STARTING ------|
     *                v
     *     |------> READY <-------------------|
     *     |          |                       |
     *     |          |                    <start>
     *     |          |                       |
     *     |        <start>                   |
     *     |          |                       |
     *     |          V                       |
     *     |        RUNNING --- <stop> ---> STOPPED
     *     |          |
     *     |          |
     *  <resume>   ( error )
     *     |          |
     *     |          |
     *     |          v
     *     |------- ERROR
     *
     */
    private static final ConcurrentMap<EmJob.Status, EmJob.Status> VM = new ConcurrentHashMap<>() {
        {
            /* STARTING -> READY */
            this.put(EmJob.Status.STARTING, EmJob.Status.READY);

            /* READY -> RUNNING ( Automatically ) */
            this.put(EmJob.Status.READY, EmJob.Status.RUNNING);

            /* RUNNING -> STOPPED ( Automatically ) */
            this.put(EmJob.Status.RUNNING, EmJob.Status.STOPPED);

            /* STOPPED -> READY */
            this.put(EmJob.Status.STOPPED, EmJob.Status.READY);

            /* ERROR -> READY */
            this.put(EmJob.Status.ERROR, EmJob.Status.READY);
        }
    };
    @Contract
    private transient Vertx vertx;

    Interval interval(final Consumer<Long> consumer) {
        final Class<?> intervalCls = CONFIG.getInterval().getComponent();
        final Interval interval = Ut.singleton(intervalCls);
        Ut.contract(interval, Vertx.class, this.vertx);

        if (SELECTED.getAndSet(Boolean.FALSE)) {
            /* Be sure the log only provide once */
            this.getLogger().info(VMessage.Job.PHASE.UCA_COMPONENT, "Interval", interval.getClass().getName());
        }
        if (Objects.nonNull(consumer)) {
            interval.bind(consumer);
        }
        return interval;
    }

    Interval interval() {
        return this.interval(null);
    }

    JobStore store() {
        return JobPin.getStore();
    }

    /*
     * Input workflow for Mission
     * 1. Whether address configured ?
     *    - Yes, get Envelop from event bus as secondary input
     *    - No, get Envelop of `Envelop.ok()` instead
     * 2. Extract `JobIncome`
     * 3. Major
     * 4. JobOutcome
     * 5. Whether defined address of output
     * 6. If 5, provide callback function of this job here.
     */
    private Future<Envelop> workingAsync(final Mission mission) {
        /*
         * Initializing phase reference here.
         */
        final Phase phase = Phase.start(mission.getCode())
            .bind(this.vertx)
            .bind(mission);
        /*
         * 1. Step 1:  EventBus ( Input )
         */
        return phase.inputAsync(mission)
            /*
             * 2. Step 2:  JobIncome ( Process )
             */
            .compose(phase::incomeAsync)
            /*
             * 3. Step 3:  Major cole logical here
             */
            .compose(phase::invokeAsync)
            /*
             * 4. Step 4:  JobOutcome ( Process )
             */
            .compose(phase::outcomeAsync)
            /*
             * 5. Step 5: EventBus ( Output )
             */
            .compose(phase::outputAsync)
            /*
             * 6. Final steps here
             */
            .compose(phase::callbackAsync);
    }

    void working(final Mission mission, final Actuator actuator) {
        if (EmJob.Status.READY == mission.getStatus()) {
            /*
             * READY -> RUNNING
             */
            this.moveOn(mission, true);
            /*
             * Read threshold
             * 「OLD」for KTimer not null, but in ONCE or some spec types,
             * the timer could be null
             * final KTimer timer = mission.timer();
             * Objects.requireNonNull(timer);
             */
            final long threshold = mission.timeout();
            /*
             * Worker Executor of New created
             * 1) Create new worker pool for next execution here
             * 2) Do not break the major thread for terminal current job
             * 3)Executing log here for long block issue
             */
            final String code = mission.getCode();
            final WorkerExecutor executor =
                this.vertx.createSharedWorkerExecutor(code, 1, threshold);
            this.getLogger().info(VMessage.Job.AGHA.WORKER_START, code, String.valueOf(TimeUnit.NANOSECONDS.toSeconds(threshold)));
            /*
             * The executor start to process the workers here.
             */
            executor.<Envelop>executeBlocking(promise -> promise.handle(this.workingAsync(mission)
                .compose(result -> {
                    /*
                     * The job is executing successfully and then stopped
                     */
                    actuator.execute();
                    this.getLogger().info(VMessage.Job.AGHA.WORKER_END, code);
                    return Future.succeededFuture(result);
                })
                .otherwise(error -> {
                    /*
                     * The job exception
                     */
                    if (!(error instanceof NoStackTraceThrowable)) {
                        error.printStackTrace();
                        this.moveOn(mission, false);
                    }
                    return Envelop.failure(error);
                })), handler -> {
                /*
                 * Async result here to check whether it's ended
                 */
                if (handler.succeeded()) {
                    /*
                     * Successful, close worker executor
                     */
                    executor.close();
                } else {
                    if (Objects.nonNull(handler.cause())) {
                        /*
                         * Failure, print stack instead of other exception here.
                         */
                        final Throwable error = handler.cause();
                        if (!(error instanceof NoStackTraceThrowable)) {
                            error.printStackTrace();
                        }
                    }
                }
            });
        }
    }

    void moveOn(final Mission mission, final boolean noError) {
        if (noError) {
            /*
             * Preparing for job
             **/
            if (VM.containsKey(mission.getStatus())) {
                /*
                 * Next Status
                 */
                final EmJob.Status moved = VM.get(mission.getStatus());
                final EmJob.Status original = mission.getStatus();
                mission.setStatus(moved);
                /*
                 * Log and update cache
                 */
                this.getLogger().info(VMessage.Job.AGHA.MOVED, mission.getType(), mission.getCode(), original, moved);
                this.store().update(mission);
            }
        } else {
            /*
             * Terminal job here
             */
            if (EmJob.Status.RUNNING == mission.getStatus()) {
                mission.setStatus(EmJob.Status.ERROR);
                this.getLogger().info(VMessage.Job.AGHA.TERMINAL, mission.getType(), mission.getCode());
                this.store().update(mission);
            }
        }
    }

    protected Annal getLogger() {
        return Annal.get(this.getClass());
    }
}