vertx-gaia/vertx-co/src/main/java/io/vertx/up/fn/_Combine.java
package io.vertx.up.fn;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.up.util.Ut;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
/**
* @author lang : 2023/4/27
*/
@SuppressWarnings("all")
class _Combine extends _Atomic {
protected _Combine() {
}
/*
/*
* This arrange part will replace `thenCombine?` method. Here are detail readme information:
*
* 1. Flag
* Here the flag identified the return value of internal ( Generic T )
* - A: JsonArray
* - J: JsonObject
* - T: Generice <T>
* - M: Map
* - L: List
*
* 2. Prefix
* - combine, From element to container, this situation often happens as following
* 1) The element of collection contains async operation.
* 2) The result should be collection and major thread must wait for each element async operation finished.
* - comic, Expand element to multi, this situation often happens as following
* 1) The element of collection is single type.
* 2) The single type will generate multi elements into collection formed
* - compress, Compress collection to single, this situation often happens as following
* 1) The collection has 2 layers and each element is also another collection.
* 2) This kind of API will compress the collection into 1 layer ( single collection ).
*
* 3. The mark of data structure
*
* o - Pure element type without any attached information.
* If there are more types, I'll use o1, o2, o3.
* [o] - 「Container」Collection type and the element type is o.
* (o) - 「Container」Async type and the element type is o.
* fx - Consumer Function
*/
// ---------------------------------------------------- 编排函数 ----------------------------------------------------
/**
* 二选一专用编排函数,执行流程如下:
*
* 1. 从 JsonObject 提取对象引用,该引用由 `field` 来提取。
* 2. 检查对象引用的类型,分别为 `JsonArray` 和 `JsonObject`
* 3. 根据不同的类型执行不同的逻辑函数
* -- JsonObject: itemFnJ -> JsonObject -> Future[J]
* -- JsonObject: itemFnA -> JsonArray -> Future[A]
* 4. 将执行函数的结果回写到 input 中的 `field` 属性。
*
* <pre><code>
* JsonObject
* JsonObject
* | --> fx: itemFnJ --> ( j1 ) --> input[field] = j1 |
* | |
* input[field] --> X o --> ( input )
* | JsonArray |
* | --> fx: itemFnA --> ( a1 ) --> input[field] = a1 |
* | |
* | ? (default) |
* | --> x -----------------> input |
* </code></pre>
*
* 注意此处的编排函数结构返回值可以是 J 或 A,命名表示通常以 Future[JsonObject] 或 Future[JsonArray] 返回,但核心
* 执行过程中可定义泛型返回值,但由于回调函数中会将返回结果重新封装回 input[field] 中,所以此处执行结果必须是JsonObject
* 中可支持的类型,不可支持的类型会导致回写失败,而且当条件不匹配时什么都不做,直接返回 input。
*
* @param input JsonObject 输入的JsonObject对象
* @param field String 提取对象引用的字段名
* @param itemOrJ Function 逻辑函数,输入为 JsonObject,输出为 Future[J]
* @param itemOrA Function 逻辑函数,输入为 JsonArray,输出为 Future[A]
* @param <J> [J] 逻辑函数的输出类型[J],执行函数 jobject -> Future[J]
* @param <A> [A] 逻辑函数的输出类型[A],执行函数 jarray -> Future[A]
*
* @return 返回自包含的 Future<JsonObject> 对象
*/
public static <J, A> Future<JsonObject> choiceJ(
final JsonObject input, final String field,
final Function<JsonObject, Future<J>> itemOrJ, final Function<JsonArray, Future<A>> itemOrA) {
return ThenJ.choiceJ(input, field, itemOrJ, itemOrA);
}
// ---------------------------------------------------- 组合函数 ----------------------------------------------------
// >>> 返回:Future<JsonArray>
/**
* 一维组合编排函数,其执行流程
* <pre><code>
* [
* combinerOf
* j + j1 => j2
* generateOf
* j --> fx --> ( j1 ) --> fx -->
* j --> fx --> ( j1 ) --> fx --> ( [j2, j2, j2] )
* j --> fx --> ( j1 ) --> fx -->
* ]
* </code></pre>
*
* 1. 处理单个异步结果
* 2. 迭代JsonArray并提取 element 类型位 JsonObject 的结果集
* 3. 针对每个JsonObject 元素执行 generateOf 函数,生成 Future<JsonObject> 结果
* 4. 将得到的所有结果执行亮亮组合,并且使用拉平操作,得到的最终使用组合后的结果生成一个新的 JsonObject
* -- combinerOf的参数 (j, j1)
* ----- 第一参数 j 是原始的输入元素,即 generateOf 函数中的输入
* ----- 第二参数 j1 是生成函数的输出,即 generateOf 函数的执行结果(异步结果)
* 5. 最终返回组合后的结果,JsonArray 中的每个元素都是 JsonObject
*
* 组合函数在此处实际是针对输入和输出的组合,输入和输出在此函数中的类型是一致的,此处都是 JsonObject
* - 输入:迭代 JsonArray 中生成的每个 JsonObject 元素
* - 输出:针对 JsonObject 执行过 generateOf 函数后的结果
* - 默认情况下不满足条件的(element instanceof JsonObject) 的元素不会执行 generateOf 函数,会直接被过滤掉
*
* @param source Future<JsonArray> 输入的异步结果,结果内是 JsonArray
* @param generateOf 元素生成函数,针对JsonArray中的每一个 JsonObject 函数执行 generateOf
* @param combinerOf 组合函数,生成函数结果位 Future<JsonObject>,将所有异步结果执行两两合并
*
* @return 返回执行过的结果数组 Future<JsonArray>
*/
public static Future<JsonArray> combineA(
final Future<JsonArray> source,
final Function<JsonObject, Future<JsonObject>> generateOf, final BinaryOperator<JsonObject> combinerOf) {
return ThenA.combineA(source, generateOf, combinerOf);
}
/**
* 一维组合编排函数,其执行流程
* <pre><code>
* [
* generateOf
* j --> fx --> ( j1 )
* j --> fx --> ( j1 ) --> ( [j1,j1,j1] )
* j --> fx --> ( j1 )
* ]
* </code></pre>
*
* 1. 针对 JsonArray 执行迭代,提取每个类型为 JsonObject 的元素
* 2. 针对每个类型为 JsonObject 的元素执行生成函数 generateOf,生成 Future<JsonObject> 结果
* 3. 将所有的 Future<JsonObject> 执行组合操作,生成新的 Future<JsonArray> 结果
*
* @param input JsonArray 输入的异步结果,结果内是 JsonArray
* @param generateOf 元素生成函数,针对JsonArray中的每一个 JsonObject 函数执行 generateOf
*
* @return 返回执行过的结果数组 Future<JsonArray>
*/
public static Future<JsonArray> combineA(final JsonArray input,
final Function<JsonObject, Future<JsonObject>> generateOf) {
final List<Future<JsonObject>> futures = new ArrayList<>();
Ut.itJArray(input).map(generateOf).forEach(futures::add);
return ThenA.combineA(futures);
}
/**
* 一维组合编排函数,其执行流程
* <pre><code>
* [
* ( j )
* ( j ) --> ( [j, j, j] )
* ( j )
* ]
* </code></pre>
*
* 最简单的异步编排组合函数,直接将输入的列表型的异步结果组合到一起
* List<Future<T>> 到 Future<List<T>> 的转换模式
*
* @param source Future<JsonArray> 输入的异步结果,结果内是 JsonArray
*
* @return 返回执行过的结果数组 Future<JsonArray>
*/
public static Future<JsonArray> combineA(final List<Future<JsonObject>> source) {
return ThenA.combineA(source);
}
/*
* Workflow:
*
* [
* t --> fx --> ( [...] )
* t --> fx --> ( [...] ) --> ( [... ... ... ... ...] )
* t --> fx --> ( [...] )
* ]
*
* The o = Class<T>
* 「1 Dim」
* This method is common comic, the input data structure is collection ( JsonArray ) as:
*
* 1. Filter the `source` based on `clazz` type, other type will be ignored.
* 2. 「F」Process each element t1 ( selected Class<T> ) to generate the new collection.
* 3. Until this step the final result set is matrix: [[]]
* 4. Combine the matrix from 2 dim to 1 dim: [[]] => []
*/
/**
* 防重载的JsonArray再行的 combineA 的变体组合函数,由于和普通的JsonArray再行的组合函数冲突,所以需要重命名该函数,执行流程
* 如下(该函数为扩散型):
* <pre><code>
* e 的类型为 Class<T>
* [
* generateOf
* e --> fx ( [t, t, t, ...] )
* e --> fx ( [t, t, t, ...] ) --> ( [..., ..., ...] )
* e --> fx ( [t, t, t, ...] )
* ]
* </code></pre>
* 该方法是合并变体函数,它的输入为一个JsonArray,内部元素 e 的类型为传入类型
*
* 1. 使用类型对 JsonArray 传入集合执行过滤,只有满足类型条件的会生效,其他内容会被忽略。
* 2. 筛选出合法元素之后根据合法元素执行 generateOf 且返回值为 JsonArray 异步结果。
* 3. 将异步结果全部拉平合并到一个 JsonArray 中,二阶转一阶:[[],[],[]] => []
*
* @param source JsonArray 输入的集合
* @param clazz Class<T> 输入的类型,限定集合中元素类型
* @param generateOf 元素生成函数,针对JsonArray中的每一个 JsonObject 函数执行 generateOf
* @param <T> 泛型类型
*
* @return 返回执行过的结果数组 Future<JsonArray>
*/
public static <T> Future<JsonArray> combineA_(final JsonArray source, final Class<T> clazz,
final Function<T, Future<JsonArray>> generateOf) {
final List<Future<JsonArray>> futures = new ArrayList<>();
Ut.itJArray(source, clazz, (item, index) -> futures.add(generateOf.apply(item)));
return ThenA.compressA(futures);
}
public static Future<JsonArray> combineA_(final JsonArray source,
final Function<JsonObject, Future<JsonArray>> generateFun) {
return combineA_(source, JsonObject.class, generateFun);
}
// >>> 返回:Future<JsonObject>
/**
* JsonObject格式的变参组合编排函数,生成数据结构有所变化,它的执行流程如:
* <pre><code>
* (
* (j1) 0 = j1
* (j2) -> 1 = j2
* ...
* (jn) n - 1 = jn
* )
* </code></pre>
*
* @param futures Future<JsonObject>... 变参集合信息
*
* @return Future<JsonObject> 返回的数据结构是 JsonObject
*/
public static Future<JsonObject> combineJ(final Future<JsonObject>... futures) {
return ThenJ.combineJ(futures);
}
// ----------------------- 防重载组合函数, combineJ_ 变体 -----------------------
/**
* 防止重载的JsonObject类型 combineJ 的变体组合函数,由于会和普通的JsonObject类型的组合函数冲突,所以需要重命名该组合函数,其执行
* 流程如下
* <pre><code>
* generateOf combinerOf
* j => ([j1,j1,j1]) j + j1 => j2
* t --> fx --> ( [j1,j1,j1] ) --> fx --> (j2)
*
* </code></pre>
* 该函数执行流程相对复杂
*
* 1. 先将输入作为生成函数的输入,会生成一个列表信息,这种模式会拓展生成多个 JsonObject 列表,而且生成过程中
* 每个函数本身都是异步结果。
* 2. 根据一部结果针对每个元素执行一次合并,得到最终结果如:
* <pre><code>
* j --> j1 ( index = 0 ) --> combinerOf ( index = 0 ) --> j2
* j1 ( index = 1 ) --> combinerOf ( index = 1 ) --> j2
* ... --> ... --> ...
* </code></pre>
* 拉平处理之后执行组合,最终会返回合并后的唯一的JsonObject,该函数可能会有副作用,导致最终结果受到影响。
*
* 这个接口和其他接口不同,combinerOf 会有两个版本
* 1. 如果 source 被修改,那么 combinerOf 可以执行修改操作,这种模式会影响输入数据,带有副作用
* 2.
*
* @param source 输入的JsonObject
* @param generateOf 生成函数,输入为 JsonObject,输出为 List<Future>
* @param combinerOf 组合函数,输入为 JsonObject 和 JsonObject,输出为 JsonObject
*
* @return Future<JsonObject> 返回执行过的结果
*/
public static Future<JsonObject> combineJ_(
final Future<JsonObject> source, final Function<JsonObject, List<Future>> generateOf,
final BiConsumer<JsonObject, JsonObject>... combinerOf) {
return ThenJ.combineJ(source, generateOf, combinerOf);
}
public static Future<JsonObject> combineJ_(
final JsonObject source, final Function<JsonObject, List<Future>> generateFun,
final BiConsumer<JsonObject, JsonObject>... operatorFun) {
return ThenJ.combineJ(Future.succeededFuture(source), generateFun, operatorFun);
}
/**
* 哈希表组合函数
* <pre><code>
* [ (
* k=(t) k=t,
* k=(t) --> (t) = k=t,
* k=(t) k=t,
* ] )
* </code></pre>
* 哈希表专用的组合函数,针对每一个键值提供异步结果,此处的类型必须使用 ConcurrentMap,由于每一个键值对是同时执行
* 且相互之间不依赖,在并行环境下,只有 ConcurrentMap 才能保证线程安全。
*
* @param futureMap ConcurrentMap<K, Future<T>> 输入的异步结果,结果内是 ConcurrentMap<K, Future<T>>
* @param <K> 键类型
* @param <T> 值类型
*
* @return 返回执行过的结果数组 Future<ConcurrentMap<K, T>>
*/
public static <K, T> Future<ConcurrentMap<K, T>> combineM(final ConcurrentMap<K, Future<T>> futureMap) {
return ThenM.combineM(futureMap);
}
// >>> 返回:Future<T>
// ---------------------------------------------------- 压缩函数 ----------------------------------------------------
// >>> 返回:Future<JsonArray>
/**
* 压缩函数,二阶转一阶
* [[t1, t2, t3], [t4, t5, t6]] -> [t1, t2, t3, t4, t5, t6]
* 该压缩函数在很多地方会使用到,主要用于将:集合的集合转换成集合(拉平后处理,类似 flatMap 效果,只是该函数支持异步)
*
* @param futures List<Future<JsonArray>> 输入的异步结果,结果内是 JsonArray
*
* @return Future<JsonArray> 返回执行过的压缩结果
*/
public static Future<JsonArray> compressA(final List<Future<JsonArray>> futures) {
return ThenA.compressA(futures);
}
/*
* Workflow:
*
* [
* ( [t,t] ) --> ...
* ( [t,t,t,t] ) --> ... --> ( [t,t, t,t,t,t, t] )
* ( [t] ) --> ...
* ]
*
* 「2 Dim」
* The input is matrix, this method will compress the 2 dim matrix to 1 dim, the code logical is:
*
* 1. Process each element ( Future<List<T>> ) async operation to get the result ( List<T> ).
* 2. Combine the result ( List<List<T>> ) => ( List<T> ) only.
* 3. The final data structure is: [[]] => []
*/
/**
* 压缩函数,二阶转一阶
*
* <pre><code>
* [
* ( [t,t] ) --> ...
* ( [t,t,t,t] ) --> ... --> ( [t,t, t,t,t,t, t] )
* ( [t] ) --> ...
* ]
* </code></pre>
* 该函数的输入通常是一个矩阵,这个方法会将2阶矩阵压缩成1阶列表:
*
* 1. 执行单元素的一部返回结果,得到 List<T>
* 2. 针对该结果执行组合操作,将 List<List<T>> 转换成 List<T>
* 3. 最终的结果就是一个单列表,[[], [], []] => []
*
* @param futures List<Future<List<T>>> 输入的异步结果,结果内是 List<T>
* @param <T> 输入类型T
*
* @return Future<List < T>> 返回执行过的压缩结果
*/
public static <T> Future<List<T>> compressL(final List<Future<List<T>>> futures) {
return ThenL.compressL(futures);
}
/**
* 哈希表压缩函数,二阶转一阶
* <pre><code>
* [
* ( [k=t,k=t,k=t] ) --> ...
* ( [k=t] ) --> ... --> ( [k=t,k=t,k=t, k=t, k=t,k=t] )
* ( [k=t,k=t] ) --> ...
* ]
* </code></pre>
* 该函数输入有两层容器,第一层是 List,第二层是 ConcurrentMap,两层容器为 2 阶数据结构,所以该输入不能单纯看成
* 矩阵,代码逻辑如:
*
* 1. 执行 List 的每一个元素( Future<ConcurrentMap<String, T>> ) 一部操作得到结果 ( ConcurrentMap<String, T> ).
* 2. 一个最终结果组合成集合 ( List<ConcurrentMap<String, T>> ) 通过计算得到最终的 ( ConcurrentMap<String, T> ).
* 默认版本组合函数直接调用哈希表的 addAll 操作
* 3. 最终的结果为:[[k=t],[k=t]] => [k=t, k=t]
*
* @param futures List<Future<ConcurrentMap<String, T>>> 输入的异步结果,结果内是 ConcurrentMap<String, T>
* @param combinerOf BiFunction<T, T, T> 组合函数,用于将 ConcurrentMap<String, T> 组合成 ConcurrentMap<String, T>
* @param <T> 输入类型T
*
* @return Future<Map < String, T>> 返回执行过的压缩结果
*/
public static <T> Future<ConcurrentMap<String, T>> compressM(final List<Future<ConcurrentMap<String, T>>> futures,
final BinaryOperator<T> combinerOf) {
return ThenM.compressM(futures, combinerOf);
}
public static Future<ConcurrentMap<String, JsonArray>> compressM(final List<Future<ConcurrentMap<String, JsonArray>>> futures) {
return ThenM.compressM(futures, (original, latest) -> original.addAll(latest));
}
}