
View on GitHub


1 day
Test Coverage
package io.vertx.up.unity;

import io.horizon.atom.common.Refer;
import io.horizon.eon.VValue;
import io.horizon.uca.log.Annal;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.up.fn.Fn;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Package-private holder of static helpers built around Vert.x {@link Future}:
 * bridging from JDK completion stages, running collections of asynchronous
 * functions, converting results to JSON, and dispatching to channel instances
 * obtained through {@code Pocket.lookup}.
 */
class Async {

    // Logger shared by every static helper in this class.
    private static final Annal LOGGER = Annal.get(Async.class);

    /**
     * Bridges a JDK {@link CompletionStage} to a Vert.x {@link Future}.
     * <p>
     * A new {@link Promise} is created, a {@code whenComplete} callback is
     * registered on {@code state}, and the future backed by that promise is
     * returned.
     * <p>
     * NOTE(review): the statements inside both callback branches, and the
     * closing braces, are not visible in this view of the file. Presumably the
     * promise is completed with {@code result} when {@code error} is null and
     * failed with {@code error} otherwise - confirm against the full source.
     *
     * @param state the completion stage to observe
     * @param <T>   the result type
     * @return the future of the promise created in this method
     */
    static <T> Future<T> fromAsync(final CompletionStage<T> state) {
        final Promise<T> promise = Promise.promise();
        state.whenComplete((result, error) -> {
            // A null error selects the first branch, a non-null error the else branch.
            if (Objects.isNull(error)) {
            } else {
        return promise.future();

    /**
     * Applies functions taken from {@code set} to {@code input} and collects the
     * resulting futures into a list, which is handed to {@code Fn.combineT}.
     * <p>
     * The value returned by the {@code compose} call on the combined future is
     * not used: the method always returns {@code To.future(input)}, so the
     * result does not depend on the outcome of the combined future.
     * <p>
     * NOTE(review): the statement that fills {@code futures} and the call that
     * receives the "Job Infusion" message are only partially visible in this
     * view - confirm the exact stream pipeline and logging call against the
     * full source. {@code To.future} presumably wraps its argument in an
     * already-succeeded future - TODO confirm.
     *
     * @param input the value passed to every function
     * @param set   the functions to run; its size is reported in the message
     * @param <T>   the value type
     * @return {@code To.future(input)}
     */
    static <T> Future<T> future(final T input, final Set<Function<T, Future<T>>> set) {
        final List<Future<T>> futures = new ArrayList<>(); -> consumer.apply(input)).forEach(futures::add);
        // The composed future is discarded; this chain is only used for its side effect.
        Fn.combineT(futures).compose(nil -> {
  "「Job Infusion」 There are `{0}` jobs that are finished successfully!", String.valueOf(set.size()));
            return To.future(nil);
        return To.future(input);

    /**
     * Runs the functions in {@code queues} one after another, feeding the
     * result of each step into the next through {@code compose}.
     * <ul>
     *   <li>Empty list: returns {@code To.future(input)}.</li>
     *   <li>The function at {@code VValue.IDX} is applied to {@code input}; if it
     *       returns null an error is logged and {@code To.future(input)} is
     *       returned.</li>
     *   <li>Exactly one function: its future is returned as is.</li>
     *   <li>Otherwise every function from index 1 onwards is composed onto the
     *       chain. A step whose function returns null is skipped by passing the
     *       current value on through {@code To.future(json)}.</li>
     * </ul>
     * Each step future and each composed link gets an {@code otherwise} handler
     * built by {@code Debug.otherwise} around the shared {@code Refer}.
     * <p>
     * NOTE(review): the comment delimiters and several closing braces are
     * missing in this view of the file. {@code Refer} presumably holds the last
     * value stored through {@code add}, and {@code VValue.IDX} is presumably 0
     * (the log message speaks of "index = 0") - confirm both.
     *
     * @param input  the value given to the first function
     * @param queues the ordered functions to chain
     * @param <T>    the value type flowing through the chain
     * @return the future at the end of the chain, or {@code To.future(input)}
     *         for an empty list or a null first future
     */
    static <T> Future<T> future(final T input, final List<Function<T, Future<T>>> queues) {
        if (0 == queues.size()) {
             * None queue here
            return To.future(input);
        } else {
            Future<T> first = queues.get(VValue.IDX).apply(input);
            if (Objects.isNull(first)) {
                LOGGER.error("The index = 0 future<T> returned null, plugins will be terminal");
                return To.future(input);
            } else {
                if (1 == queues.size()) {
                     * Get first future
                    return first;
                } else {
                     * future[0]
                     *    .compose(future[1])
                     *    .compose(future[2])
                     *    .compose(...)
                    // Shared holder read by the otherwise handlers below.
                    final Refer response = new Refer();

                    // Index 0 was already applied above, so chaining starts at 1.
                    for (int idx = 1; idx < queues.size(); idx++) {
                        // Effectively-final copy of the index for use inside the lambda.
                        final int current = idx;
                        first = first.compose(json -> {
                            final Future<T> future = queues.get(current).apply(json);
                            if (Objects.isNull(future)) {
                                 * When null found, skip current
                                return To.future(json);
                            } else {
                                return future
                                     * Replace the result with successed item here
                                     * If success
                                     * -- replace previous response with next
                                     * If handler
                                     * -- returned current json and replace previous response with current
                                     * The step stopped
                                    .otherwise(Debug.otherwise(() -> response.add(json).get()));
                        }).otherwise(Debug.otherwise(() -> response.get()));
                    return first;

    /**
     * Converts the result of a {@link CompletableFuture} into a Vert.x future
     * of {@link JsonObject}.
     * <p>
     * An empty {@code JsonObject} is the completion value on the branches chosen
     * for a null {@code completableFuture} and for a null item; the other item
     * branch completes with {@code To.toJObject(item, pojo)}.
     * <p>
     * NOTE(review): {@code Fn.runAt(condition, null, a, b)} presumably runs the
     * first action when the condition is true and the second otherwise - TODO
     * confirm. Only the {@code return null} of the {@code exceptionally} handler
     * is visible here, so it cannot be told from this view whether the promise
     * is failed when the completable future fails.
     *
     * @param pojo              passed through to {@code To.toJObject}
     * @param completableFuture the source of the item to convert; may be null
     * @param <T>               the item type
     * @return the future of the promise created in this method
     */
    static <T> Future<JsonObject> toJsonFuture(
        final String pojo,
        final CompletableFuture<T> completableFuture
    ) {
        final Promise<JsonObject> future = Promise.promise();
        Fn.runAt(null == completableFuture, null,
            () -> future.complete(new JsonObject()),
            () -> completableFuture.thenAcceptAsync((item) -> Fn.runAt(
                null == item, null,
                () -> future.complete(new JsonObject()),
                () -> future.complete(To.toJObject(item, pojo))
            )).exceptionally((ex) -> {
                return null;
        return future.future();

    /**
     * Converts the list produced by a {@link CompletableFuture} into a Vert.x
     * future of {@link JsonArray}.
     * <p>
     * An empty {@code JsonArray} is the completion value on the branches chosen
     * for a null {@code completableFuture} and for a null list; the other list
     * branch completes with {@code To.toJArray(item, pojo)}.
     * <p>
     * NOTE(review): {@code Fn.runAt(condition, null, a, b)} presumably runs the
     * first action when the condition is true and the second otherwise - TODO
     * confirm. Only the {@code return null} of the {@code exceptionally} handler
     * is visible here, so it cannot be told from this view whether the promise
     * is failed when the completable future fails.
     *
     * @param pojo              passed through to {@code To.toJArray}
     * @param completableFuture the source of the list to convert; may be null
     * @param <T>               the element type of the list
     * @return the future of the promise created in this method
     */
    static <T> Future<JsonArray> toArrayFuture(
        final String pojo,
        final CompletableFuture<List<T>> completableFuture
    ) {
        final Promise<JsonArray> future = Promise.promise();
        Fn.runAt(null == completableFuture, null,
            () -> future.complete(new JsonArray()),
            () -> completableFuture.thenAcceptAsync((item) -> Fn.runAt(
                null == item, null,
                () -> future.complete(new JsonArray()),
                () -> future.complete(To.toJArray(item, pojo))
            )).exceptionally((ex) -> {
                return null;
        return future.future();

    static <T> Future<JsonObject> toUpsertFuture(final T entity, final String pojo,
                                                 final Supplier<Future<JsonObject>> supplier,
                                                 final Function<JsonObject, JsonObject> updateFun) {
        // Default Case
        if (Objects.isNull(entity)) {
            return supplier.get();
        final JsonObject params = To.toJObject(entity, pojo);

        // Update Function == null
        if (Objects.isNull(updateFun)) {
            return Future.succeededFuture(params);

        // Update Executor
        return Future.succeededFuture(updateFun.apply(params));

    /**
     * Builds a function that maps a {@link Throwable} to a succeeded future
     * holding the value obtained from {@code input}.
     * <p>
     * NOTE(review): the statement executed when {@code ex} is non-null, and
     * the closing braces, are not visible in this view of the file -
     * presumably the throwable is reported in some way; confirm against the
     * full source.
     *
     * @param input supplies the value placed in the succeeded future
     * @param <T>   the value type
     * @return the throwable-to-future function
     */
    static <T> Function<Throwable, Future<T>> toErrorFuture(final Supplier<T> input) {
        return ex -> {
            if (Objects.nonNull(ex)) {
            // The supplier is called each time the returned function runs.
            return Future.succeededFuture(input.get());

    static <T, O> Future<O> channel(final Class<T> clazz, final Supplier<O> supplier,
                                    final Function<T, Future<O>> executor) {
        final T channel = Pocket.lookup(clazz);
        if (Objects.isNull(channel)) {
            LOGGER.warn("「SL Channel」Channel {0} null", clazz.getName());
            return To.future(supplier.get());
        } else {
            LOGGER.debug("「SL Channel」Channel Async selected {0}, {1}",
                channel.getClass().getName(), String.valueOf(channel.hashCode()));
            return executor.apply(channel);

    static <T, O> O channelSync(final Class<T> clazz, final Supplier<O> supplier,
                                final Function<T, O> executor) {
        final T channel = Pocket.lookup(clazz);
        if (Objects.isNull(channel)) {
            LOGGER.warn("「SL Channel」Channel Sync {0} null", clazz.getName());
            return supplier.get();
        } else {
            LOGGER.debug("「SL Channel」Channel Sync selected {0}, {1}",
                channel.getClass().getName(), String.valueOf(channel.hashCode()));
            return executor.apply(channel);

    static <T, O> Future<O> channelAsync(final Class<T> clazz, final Supplier<Future<O>> supplier,
                                         final Function<T, Future<O>> executor) {
        final T channel = Pocket.lookup(clazz);
        if (Objects.isNull(channel)) {
            LOGGER.warn("「SL Channel」Channel Async {0} null", clazz.getName());
            return supplier.get();
        } else {
            LOGGER.debug("「SL Channel」Channel Async selected {0}, {1}",
                channel.getClass().getName(), String.valueOf(channel.hashCode()));
            return executor.apply(channel);