silentbalanceyh/vertx-zero

View on GitHub
vertx-ifx/zero-ifx-excel/src/main/java/io/vertx/up/plugin/excel/SheetImport.java

Summary

Maintainability
B
6 hrs
Test Coverage
package io.vertx.up.plugin.excel;

import io.horizon.eon.em.typed.ChangeFlag;
import io.horizon.exception.web._500InternalServerException;
import io.horizon.uca.log.Annal;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.up.error.excel._500ExportingErrorException;
import io.vertx.up.fn.Fn;
import io.vertx.up.plugin.excel.atom.ExTable;
import io.vertx.up.uca.jooq.UxJooq;
import io.vertx.up.unity.Ux;
import io.vertx.up.util.Ut;

import java.util.*;
import java.util.concurrent.ConcurrentMap;

/**
 * @author <a href="http://www.origin-x.cn">Lang</a>
 */
class SheetImport {
    private static final Annal LOGGER = Annal.get(ExcelClientImpl.class);
    private transient final ExcelHelper helper;

    private SheetImport(final ExcelHelper helper) {
        this.helper = helper;
    }

    static SheetImport create(final ExcelHelper helper) {
        return new SheetImport(helper);
    }

    <T> Set<T> saveEntity(final JsonArray data, final ExTable table) {
        final Set<T> resultSet = new HashSet<>();
        if (Objects.nonNull(table.classPojo()) && Objects.nonNull(table.classDao())) {
            try {
                final JsonObject filters = table.whereAncient(data);
                LOGGER.debug("[ Έξοδος ]  Table: {1}, Filters: {0}", filters.encode(), table.getName());
                final List<T> entities = Ux.fromJson(data, table.classPojo(), table.filePojo());
                final UxJooq jooq = this.jooq(table);
                assert null != jooq;
                final List<T> queried = jooq.fetch(filters);
                /*
                 * Compare by unique
                 */
                ConcurrentMap<ChangeFlag, List<T>> compared =
                    Ux.compare(queried, entities, table.ukIn(), table.filePojo());
                final List<T> qUpdate = compared.getOrDefault(ChangeFlag.UPDATE, new ArrayList<>());
                final List<T> qInsert = compared.getOrDefault(ChangeFlag.ADD, new ArrayList<>());
                if (!qInsert.isEmpty()) {
                    /*
                     * Compare by keys
                     */
                    final String entityKey = table.pkIn();
                    if (Objects.nonNull(entityKey)) {
                        final Set<String> keys = new HashSet<>();
                        qInsert.forEach(item -> {
                            final Object value = Ut.field(item, entityKey);
                            if (Objects.nonNull(value)) {
                                keys.add(value.toString());
                            }
                        });
                        final List<T> qKeys = jooq.fetchIn(entityKey, keys);
                        if (!qKeys.isEmpty()) {
                            compared = Ux.compare(qKeys, qInsert, table.ukIn(), table.filePojo());
                            qUpdate.addAll(compared.getOrDefault(ChangeFlag.UPDATE, new ArrayList<>()));
                            // qInsert reset
                            qInsert.clear();
                            qInsert.addAll(compared.getOrDefault(ChangeFlag.ADD, new ArrayList<>()));
                        }
                    }
                }
                /*
                 * Batch operation
                 */
                final List<T> batchInsert = jooq.insert(this.helper.compress(qInsert, table));
                resultSet.addAll(batchInsert);
                final List<T> batchUpdate = jooq.update(qUpdate);
                resultSet.addAll(batchUpdate);
                final int total = batchUpdate.size() + batchInsert.size();
                LOGGER.info("[ Έξοδος ] `{0}` -- ( {1} ), Inserted: {2}, Updated: {3}",
                    table.getName(), String.valueOf(total), String.valueOf(batchInsert.size()), String.valueOf(batchUpdate.size()));
            } catch (final Throwable ex) {
                ex.printStackTrace();
                LOGGER.fatal(ex);
            }
        }
        return resultSet;
    }

    <T> T saveEntity(final JsonObject data, final ExTable table) {
        T reference = null;
        if (Objects.nonNull(table.classPojo()) && Objects.nonNull(table.classDao())) {
            /*
             * First, find the record by unique filters that defined in business here.
             */
            final JsonObject filters = table.whereUnique(data);
            LOGGER.debug("[ Έξοδος ]  Table: {1}, Filters: {0}", filters.encode(), table.getName());
            final T entity = Ux.fromJson(data, table.classPojo(), table.filePojo());
            final UxJooq jooq = this.jooq(table);
            assert null != jooq;
            /*
             * Unique filter to fetch single record database here.
             * Such as code + sigma
             */
            final T queried = jooq.fetchOne(filters);
            if (null == queried) {
                /*
                 * Here are two situations that we could be careful
                 * 1. Unique Condition in source does not change, do insert here.
                 * 2. Key Condition existing in database, do update here.
                 */
                final String key = table.whereKey(data);
                if (Ut.isNil(key)) {
                    /*
                     * No definition of key here, insert directly.
                     */
                    reference = jooq.insert(entity);
                } else {
                    /*
                     * Double check to avoid issue:
                     * java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'xxx' for key 'PRIMARY'
                     */
                    final T fetched = jooq.fetchById(key);
                    if (null == fetched) {
                        /*
                         * In this situation, it common workflow to do data loading.
                         */
                        reference = jooq.insert(entity);
                    } else {
                        /*
                         * In this situation, it means the old unique filters have been changed.
                         * Such as:
                         * From
                         * id,      code,      sigma
                         * 001,     AB.CODE,   5sLyA90qSo7
                         *
                         * To
                         * id,      code,      sigma
                         * 001,     AB.CODE1,  5sLyA90qSo7
                         *
                         * Above example could show that primary key has not been modified
                         */
                        reference = jooq.update(entity);
                    }
                }
            } else {
                /*
                 * code, sigma did not change and we could identify this record
                 * do update directly to modify old information.
                 */
                reference = jooq.update(entity);
            }
        }
        return reference;
    }

    <T> Future<Set<T>> importAsync(final Set<ExTable> tables) {
        /*
         * Loading data into system
         */
        final List<Future<Set<T>>> futures = new ArrayList<>();
        tables.forEach(table ->
            futures.add(this.helper.extract(table)
                .compose(data -> Ux.future(this.saveEntity(data, table)))
            ));
        /* Set<T> handler */
        return Fn.combineT(futures).compose(result -> {
            final Set<T> entitySet = new HashSet<>();
            result.forEach(entitySet::addAll);
            return Ux.future(entitySet);
        });
    }

    <T> Future<Set<T>> importAsync(final AsyncResult<Set<ExTable>> async) {
        if (async.succeeded()) {
            final Set<ExTable> tables = async.result();
            return this.importAsync(tables);
        } else {
            final Throwable error = async.cause();
            if (Objects.nonNull(error)) {
                return Future.failedFuture(new _500ExportingErrorException(this.getClass(), error.getMessage()));
            } else {
                return Future.failedFuture(new _500InternalServerException(this.getClass(),
                    "Unexpected Error when Importing"));
            }
        }
    }

    private UxJooq jooq(final ExTable table) {
        final UxJooq jooq = Ux.Jooq.on(table.classDao());
        if (null != jooq) {
            final String pojoFile = table.filePojo();
            if (Ut.isNotNil(pojoFile)) {
                jooq.on(pojoFile);
            }
        }
        return jooq;
    }
}