silentbalanceyh/vertx-zero

View on GitHub
vertx-gaia/vertx-ams/src/main/java/io/horizon/fn/_Async.java

Summary

Maintainability
A
0 mins
Test Coverage
package io.horizon.fn;

import io.horizon.exception.AbstractException;
import io.vertx.core.Future;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/**
 * @author lang : 2023-05-27
 */
class _Async {
    protected _Async() {

    }

    /**
     * 并行检查器,检查所有的异步结果,全部为 true 时则通过检查,
     * 最终返回双态 Monad
     *
     * @param response  响应
     * @param error     检查不通过抛出的异常
     * @param executors 执行器
     * @param <T>       响应类型
     * @param <E>       异常类型
     *
     * @return {@link Future}
     */
    public static <T, E extends AbstractException> Future<T> passAll(
        final T response, final E error,
        final Set<Function<T, Future<Boolean>>> executors) {
        return HAsync.pass(response, error, list -> list.stream().allMatch(Boolean::booleanValue), executors);
    }

    /**
     * 并行检查器,检查所有的异步结果,只要有一个为 true 时则通过检查,
     * 最终返回双态 Monad
     *
     * @param response  响应
     * @param error     检查不通过抛出的异常
     * @param executors 执行器
     * @param <T>       响应类型
     * @param <E>       异常类型
     *
     * @return {@link Future}
     */
    public static <T, E extends AbstractException> Future<T> passAny(
        final T response, final E error,
        final Set<Function<T, Future<Boolean>>> executors) {
        return HAsync.pass(response, error, list -> list.stream().allMatch(Boolean::booleanValue), executors);
    }

    /**
     * 并行检查器,检查所有的异步结果,所有结果都为 false 时则通过检查,
     *
     * @param response  响应
     * @param error     检查不通过抛出的异常
     * @param executors 执行器
     * @param <T>       响应类型
     * @param <E>       异常类型
     *
     * @return {@link Future}
     */
    public static <T, E extends AbstractException> Future<T> passNone(
        final T response, final E error,
        final Set<Function<T, Future<Boolean>>> executors) {
        return HAsync.pass(response, error, list -> list.stream().noneMatch(Boolean::booleanValue), executors);
    }

    /**
     * 并行编排器,此种编排器不在意执行结果,只关心执行是否成功,工作流程如下:
     * <pre><code>
     *     input -> executor1 -> output1 -> Future<input>
     *           -> executor2 -> output2
     *           -> executor3 -> output3
     *           ...
     *           -> executorN -> outputN
     * </code></pre>
     *
     * 整体流程如,其中所有的 executor 是同时执行
     * <pre><code>
     *     input -> executor1
     *              executor2
     *              executor3 -> output
     * </code></pre>
     *
     * @param input     输入
     * @param executors 执行器
     * @param <T>       输入类型
     *
     * @return {@link Future}
     */
    public static <T> Future<T> parallel(final T input, final Set<Function<T, Future<T>>> executors) {
        return HAsync.parallel(input, executors);
    }

    /**
     * 并行编排器,{@link _Async#parallel} 的重载版本
     *
     * @param input     输入
     * @param executors 执行器
     * @param <T>       输入类型
     *
     * @return {@link Future}
     */
    public static <T> Future<T> parallel(final T input, final List<Function<T, Future<T>>> executors) {
        return HAsync.parallel(input, new HashSet<>(executors));
    }

    /**
     * 并行编排器,{@link _Async#parallel} 的重载版本
     *
     * @param input     输入
     * @param executors 执行器
     * @param <T>       输入类型
     *
     * @return {@link Future}
     */
    @SafeVarargs
    public static <T> Future<T> parallel(final T input, final Function<T, Future<T>>... executors) {
        return HAsync.parallel(input, new HashSet<>(Arrays.asList(executors)));
    }

    /**
     * 异步串行编排器,工作流程如下:
     * <pre><code>
     *     input -> executor1 -> output1 ->
     *              executor2 -> output2 ->
     *              executor3 -> output3 ->
     *              ...
     *              executorN -> outputN -> Future<outputN>
     * </code></pre>
     * 执行流程过程中每一个步骤的输出结果会作为下一个执行的输入,整体流程如:
     * <pre><code>
     *     input -> executor1 -> executor2 -> executor3 -> executorN
     * </code></pre>
     *
     * @param input     输入
     * @param executors 执行器
     * @param <T>       输入类型
     *
     * @return {@link Future}
     */
    public static <T> Future<T> passion(final T input, final List<Function<T, Future<T>>> executors) {
        return HAsync.passion(input, executors);
    }

    /**
     * 异步串行编排器,{@link _Async#passion} 的重载版本
     *
     * @param input     输入
     * @param executors 执行器
     * @param <T>       输入类型
     *
     * @return {@link Future}
     */
    @SafeVarargs
    public static <T> Future<T> passion(final T input, final Function<T, Future<T>>... executors) {
        return HAsync.passion(input, Arrays.asList(executors));
    }
}