macgregor/alexandria

View on GitHub
alexandria-core/src/main/java/com/github/macgregor/alexandria/BatchProcess.java

Summary

Maintainability
B
4 hrs
Test Coverage
package com.github.macgregor.alexandria;

import com.github.macgregor.alexandria.exceptions.AlexandriaException;
import com.github.macgregor.alexandria.exceptions.BatchProcessException;
import lombok.*;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

/**
 * Generic class to handle wrapping batch processing in robust error handling.
 *
 * This is generally used for processing collections of {@link com.github.macgregor.alexandria.Config.DocumentMetadata}
 * where we want to ensure we process all documents in the collection without losing any exceptions and their execution
 * context. The exception handling is boiler plate that was being duplicated all over, this abstracts it.
 *
 * A batch process has three parts:
 * <ol>
 *  <li><b>collect</b> - call the delegated {@link Batch#collect(Context)} method to collect the objects to process</li>
 *  <li><b>execute</b> - for each object collected, call the delegated {@link Task#execute(Context, Object)}</li>
 *  <li><b>after batch</b> - call the delegated {@link AfterBatch#execute(Context, Collection)}</li>
 * </ol>
 *
 * For example, {@link AlexandriaConvert} is implemented roughly as:
 *
 * <pre>
 * {@code
 * BatchProcess<Config.DocumentMetadata> batchProcess = new BatchProcess<>(context);
 * batchProcess.execute(
 *      context -> { // lambda implemented {@link Batch}
 *          context.config().metadata().get()
 *      },
 *      (context, metadata) -> { // lambda implemented {@link Task}
 *          AlexandriaConvert.convert(context, metadata);
 *      },
 *      (context, exceptions) -> { // lambda implemented {@link AfterBatch}
 *          Alexandria.save(context);
 *          return BatchProcess.EXCEPTIONS_UNHANDLED; //delegate exception handling to {@link BatchProcess}
 *      });
 * }
 * </pre>
 *
 * @param <T> type of the object being processed
 */
@Slf4j
@ToString
@Getter @Setter @Accessors(fluent = true)
@NoArgsConstructor @RequiredArgsConstructor @AllArgsConstructor
public class BatchProcess<T> {

    public static final Boolean EXCEPTIONS_HANDLED = true;
    public static final Boolean EXCEPTIONS_UNHANDLED = false;

    @NonNull private Context context;
    private Collection<AlexandriaException> exceptions = new ArrayList<>();

    /**
     * Execute the batch, providing a default {@link AfterBatch} that calls {@link Context#save(Context)} before
     * throwing any errors that occurred.
     *
     * @see Batch#execute(Batch, Task, AfterBatch)
     *
     * @param batch  Batch collection delegate
     * @param task  Task execution delegate
     * @throws BatchProcessException  Wrapper containing all exceptions thrown while processing the batch
     */
    public void execute(Batch<T> batch, Task<T> task) throws BatchProcessException {
        execute(batch, task, (context, exceptions) -> {
            Context.save(context);
            return EXCEPTIONS_UNHANDLED;
        });
    }

    /**
     * Execute the batch wrapping the delegated methods in robust exception handling.
     *
     * This method should guarantee all exceptions thrown, even runtime exceptions are wrapped and thrown as a checked
     * {@link BatchProcessException} at the end of processing. This allows as much of the batch to be processed as possible
     * before throwing an error. If a single object has a problem, why fail the whole thing?
     *
     * @param batch  Batch collection delegate
     * @param task  Task execution delegate
     * @param after  After batch delegate
     * @throws BatchProcessException  Wrapper containing all exceptions thrown while processing the batch
     */
    public void execute(Batch<T> batch, Task<T> task, AfterBatch<T> after) throws BatchProcessException {
        try {
            for (T t : batch.collect(context)) {
                try {
                    task.execute(context, t);
                } catch(AlexandriaException e){
                    if(t instanceof Config.DocumentMetadata && !e.metadata().isPresent()){
                        e.metadata(Optional.of((Config.DocumentMetadata) t));
                    }
                    exceptions.add(e);
                } catch(Exception e){
                    exceptions.add(buildAlexandriaException(e, Optional.of(t), Optional.of("Unexpected exception thrown processing task.")));
                }
            }
        } catch(BatchProcessException e){
            if(e.exceptions().isEmpty()){
                AlexandriaException alexandriaException = new AlexandriaException(e.getMessage(), e.getCause());
                alexandriaException.setStackTrace(e.getStackTrace());
                alexandriaException.metadata(e.metadata());
                exceptions.add(alexandriaException);
            }
            exceptions.addAll(e.exceptions());
        } catch(AlexandriaException e){
            exceptions.add(e);
        } catch(Exception e){
            exceptions.add(buildAlexandriaException(e, Optional.empty(), Optional.of("Unexpected exception thrown processing batch.")));
        }

        boolean exceptionsHandled = EXCEPTIONS_UNHANDLED;
        try {
            exceptionsHandled = after.execute(context, exceptions);;
        } catch(BatchProcessException e){
            exceptions.addAll(e.exceptions());
        } catch(AlexandriaException e){
            exceptions.add(e);
        } catch(Exception e){
            exceptions.add(buildAlexandriaException(e, Optional.empty(), Optional.of("Unexpected exception thrown processing after batch.")));
        }
        if(exceptions.size() > 0 && exceptionsHandled == EXCEPTIONS_UNHANDLED){
            BatchProcessException exception = new BatchProcessException.Builder()
                    .withMessage("Alexandria batch error.")
                    .causedBy(exceptions)
                    .build();
            log.error(exception.toString());
            exception.logStacktrace();
            throw exception;
        }
    }

    /**
     * Convenience method to remove some boiler plate from wrapping exceptions.
     *
     * @param cause  Cause being wrapped
     * @param t  Type of object being processed, if processing {@link com.github.macgregor.alexandria.Config.DocumentMetadata} we add it for debugging context
     * @param message  optional error message
     * @return  new exception ready to be thrown
     */
    protected static <T> AlexandriaException buildAlexandriaException(Throwable cause, Optional<T> t, Optional<String> message){
        AlexandriaException.Builder exceptionBuilder = new AlexandriaException.Builder()
                .causedBy(cause);
        if(message.isPresent()){
            exceptionBuilder.withMessage(message.get());
        }
        if(t.isPresent() && t.get() instanceof Config.DocumentMetadata){
            exceptionBuilder.metadataContext((Config.DocumentMetadata) t.get());
        }
        return exceptionBuilder.build();
    }

    /**
     * Lambda compatible interface that will collect objects to be processed
     *
     * @param <T>  type of the object being processed
     */
    @FunctionalInterface
    public interface Batch<T> {
        /**
         * Delegated method to collect objects for processing.
         * @param context  Alexandria context that may or may not be needed by caller.
         * @return  collection of objects to process
         * @throws Exception  Critical error collecting documents
         */
        Collection<T> collect(Context context) throws Exception;
    }

    /**
     * Lambda compatible interface that will process each object returned by {@link Batch#collect(Context)}.
     *
     * @param <T>  type of the object being processed
     */
    @FunctionalInterface
    public interface Task<T> {
        /**
         * Delegated method to process an individual object.
         *
         * @param context  Alexandria context that may or may not be needed by caller.
         * @param t  Object to process
         * @throws Exception  Error processing object, the rest of the batch will continue processing
         */
        void execute(Context context, T t) throws Exception;
    }

    /**
     * Lambda compatible interface that will be called after processing is complete
     *
     * @param <T>  type of the object being processed
     */
    @FunctionalInterface
    public interface AfterBatch<T> {
        /**
         * Delegated method after a batch is completed, useful for logging results or saving state.
         *
         * @param context  Alexandria context that may or may not be needed by caller.
         * @param exceptions  Any exceptions thrown during processing, or empty list if no errors occurred
         * @return  False if exceptions are not being handled by the caller (triggering {@link Batch#execute(Batch, Task, AfterBatch)}
         *          to thrown a {@link BatchProcessException}, or true if the errors have been handled and no {@link BatchProcessException}
         *          will be thrown.
         * @throws Exception  This exception will be wrapped by {@link Batch#execute(Batch, Task, AfterBatch)} just like
         *                    exceptions at any other phase and thrown in a {@link BatchProcessException}
         */
        boolean execute(Context context, Collection<AlexandriaException> exceptions) throws Exception;
    }
}