vertx-pin/zero-vie/src/main/java/io/mature/extension/refine/OxPlugin.java
package io.mature.extension.refine;
import io.aeon.runtime.channel.Pocket;
import io.horizon.spi.plugin.AspectPlugin;
import io.mature.extension.uca.log.Ko;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mod.atom.modeling.builtin.DataAtom;
import io.vertx.mod.atom.modeling.data.DataGroup;
import io.vertx.up.fn.Fn;
import io.vertx.up.plugin.database.DS;
import io.vertx.up.plugin.database.DataPool;
import io.vertx.up.unity.Ux;
import io.vertx.up.util.Ut;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* ## 插件工具
*
* ### 1. 基本介绍
*
* Ox平台专用插件处理工具。
*
* ### 2. 插件表
*
* - 数据连接池插件:{@link DataPool}类型。
*
* @author <a href="http://www.origin-x.cn">Lang</a>
*/
final class OxPlugin {
/*
* 私有构造函数(工具类转换)
*/
private OxPlugin() {
}
/**
* 根据应用标识符读取数据连接池({@link DataPool}类型)。
*
* @param sigma {@link String} 应用统一标识符
*
* @return {@link DataPool}连接池对象
*/
private static DataPool pluginDs(final String sigma) {
if (Ut.isNil(sigma)) {
return null;
} else {
final DS ds = Pocket.lookup(DS.class);
if (Objects.isNull(ds)) {
return null;
} else {
return ds.switchDs(sigma);
}
}
}
/**
* 「Function」插件类安全执行器,执行内部`executor`,若有异常则直接调用内部日志记录。
*
* @param clazz {@link Class} 调用该方法的对象类
* @param input `executor`的输入
* @param executor {@link Function} 外部传入执行器
* @param <T> `executor`执行器处理类型
*
* @return `executor`执行结果
*/
static <T> Future<T> runSafe(final Class<?> clazz, final T input, final Function<T, Future<T>> executor) {
try {
return executor.apply(input);
} catch (final Throwable ex) {
return runSafe(clazz, input, ex);
}
}
/**
* 「Supplier」插件类安全执行器,执行内部`executor`,若有异常则直接调用内部日志记录。
*
* @param clazz {@link Class} 调用该方法的对象类
* @param input `executor`的输入
* @param supplier {@link Supplier} 外部传入数据构造器
* @param <T> `executor`执行器处理类型
*
* @return `executor`执行结果
*/
static <T> Future<T> runSafe(final Class<?> clazz, final T input, final Supplier<T> supplier) {
try {
return Ux.future(supplier.get())
/* 内置处理 */
.compose(indexed -> Ux.future(input));
} catch (final Throwable ex) {
return runSafe(clazz, input, ex);
}
}
/**
* 「内部调用」异常执行器。
*
* @param clazz {@link Class} 调用该方法的对象类
* @param input `executor`的输入
* @param ex {@link Throwable} 异常信息
* @param <T> 返回的真实数据类型
*
* @return {@link Future}
*/
private static <T> Future<T> runSafe(final Class<?> clazz, final T input, final Throwable ex) {
// ex.printStackTrace();
Ox.LOG.Plugin.warn(clazz, "plugin", ex.getMessage());
/* 集成专用信息,这里不打印 ex 的 Stack 信息 */
Ko.integration(clazz, null, ex);
return Ux.future(input);
}
/**
* 数据源执行器,{@link DataPool}数据源运行主流程。
*
* @param sigma {@link String} 应用统一标识符
* @param supplier {@link Supplier} 外部数据读取器
* @param executor {@link Function} 函数执行器
* @param <T> 最终执行后返回的数据类型
*
* @return {@link Future}
*/
static <T> Future<T> runDs(final String sigma, final Supplier<T> supplier,
final Function<DataPool, Future<T>> executor) {
final DataPool ds = pluginDs(sigma);
if (Objects.isNull(ds)) {
return Ux.future(supplier.get());
} else {
return executor.apply(ds);
}
}
/**
* 分组运行器,将数据分组后执行分组过后的运行。
*
* - 每一组有相同的模型定义{@link DataAtom}。
* - 每一组有相同的数据输入{@link JsonArray}
*
* @param groupSet {@link Set}<{@link DataGroup}> 分组集合
* @param consumer {@link BiFunction} 双输入函数
*
* @return {@link Future}<{@link JsonArray}>
*/
static Future<JsonArray> runGroup(final Set<DataGroup> groupSet,
final BiFunction<JsonArray, DataAtom, Future<JsonArray>> consumer) {
final List<Future<JsonArray>> futures = new ArrayList<>();
groupSet.forEach(group -> futures.add(consumer.apply(group.data(), group.atom())));
return Fn.compressA(futures);
}
/**
* 切面执行器,执行`before -> executor -> after`流程处理数据记录。
*
* @param input {@link JsonObject} 输入数据记录
* @param config {@link JsonObject} 配置数据
* @param supplier {@link Supplier} 插件提取器,提取{@link AspectPlugin}插件
* @param atom {@link DataAtom} 模型定义
* @param executor {@link Function} 函数执行器
*
* @return {@link Future}<{@link JsonObject}>返回执行的最终结果
*/
static Future<JsonObject> runAop(final JsonObject input, final JsonObject config,
final Supplier<AspectPlugin> supplier, final DataAtom atom,
final Function<JsonObject, Future<JsonObject>> executor) {
/*
* 输入处理 input
*/
if (Ut.isNil(input)) {
return Ux.future(new JsonObject());
} else {
final AspectPlugin plugin = supplier.get();
/*
* 插件是否存在
*/
if (Objects.isNull(plugin)) {
/*
* 默认的主逻辑
*/
return executor.apply(input);
} else {
/*
* 插件不为空
* - beforeAsync
* - 主逻辑
* - afterAsync
*/
plugin.bind(atom);
Ox.LOG.Plugin.info(OxPlugin.class, "插件选择:{0}", plugin.getClass());
return plugin.beforeAsync(input, config)
/*
* 主逻辑
*/
.compose(executor)
/*
* 后置逻辑
*/
.compose(done -> plugin.afterAsync(done, config));
}
}
}
}