alkal-io/kalium

View on GitHub
src/main/java/io/alkal/kalium/internals/KaliumImpl.java

Summary

Maintainability
A
0 mins
Test Coverage
package io.alkal.kalium.internals;

import io.alkal.kalium.Kalium;
import io.alkal.kalium.annotations.On;
import io.alkal.kalium.exceptions.KaliumException;
import io.alkal.kalium.interfaces.KaliumQueueAdapter;
import io.alkal.kalium.internals.utils.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link Kalium} implementation that keeps a registry of reactions and dispatches
 * objects delivered by a {@link KaliumQueueAdapter} to the handler methods of those reactions.
 *
 * <p>Reactions come from two sources:
 * <ul>
 *   <li>objects passed to {@link #addReaction(Object)}; they are only buffered and are
 *       indexed when {@link #start()} runs;</li>
 *   <li>lambdas passed to one of the {@code on(...)} methods; they are indexed immediately.</li>
 * </ul>
 *
 * <p>The registry is held in plain {@link HashMap}s and this class performs no
 * synchronization of its own.
 *
 * @author Ziv Salzman
 * Created on 20-Jan-2019
 */
public class KaliumImpl implements Kalium, QueueListener {

    private static final Logger logger = Logger.getLogger(KaliumImpl.class.getName());

    /** Reaction objects buffered by {@link #addReaction(Object)} until {@link #start()} indexes them. */
    private final List<Object> reactions = new ArrayList<>();

    /** Reaction id to the reaction instance whose handler methods are invoked. */
    private final Map<String, Object> reactionIdToReactionMap = new HashMap<>();

    /**
     * Reaction id to (object type to handler methods accepting that type).
     * The raw {@code Class} key is kept so the type lines up with the return type of
     * {@link #getReactionIdsToObjectTypesMap()}.
     */
    private final Map<String, Map<Class, List<Method>>> reactionIdToObjectTypeToMethodMap = new HashMap<>();

    /** Queue adapter used for start/stop/post; has no default and must be injected via {@link #setQueueAdapter}. */
    private KaliumQueueAdapter queueAdapter;


    /**
     * Indexes every reaction added through {@link #addReaction(Object)} and then starts the queue adapter.
     *
     * <p>Each reaction is registered under the simple name of its class, so two reactions of the
     * same class share one id and the one added later replaces the earlier one.
     *
     * @throws NullPointerException if no queue adapter has been set
     */
    @Override
    public void start() {
        for (Object reaction : reactions) {
            addReactionInternal(reaction.getClass().getSimpleName(), reaction);
        }
        queueAdapter.start();
        logger.info("Kalium started :)");
    }

    /**
     * Stops the queue adapter.
     *
     * @throws NullPointerException if no queue adapter has been set
     */
    @Override
    public void stop() {
        queueAdapter.stop();
        logger.info("Kalium stopped :(");
    }

    /**
     * Buffers a reaction object. Its handler methods are not indexed until {@link #start()} is called.
     *
     * @param reaction the reaction object; {@code null} is ignored with a warning
     */
    @Override
    public void addReaction(Object reaction) {
        if (reaction == null) {
            logger.warning("Reaction is null! No new reaction will be added.");
            return;
        }
        reactions.add(reaction);

        logger.info("New Reaction object was added [class=" + reaction.getClass().getName() + "]");
    }


    /**
     * Hands an object to the queue adapter for posting.
     *
     * @param object the object to post; {@code null} is ignored and only logged
     * @throws NullPointerException if {@code object} is non-null and no queue adapter has been set
     */
    @Override
    public void post(Object object) {
        if (object == null) {
            logger.info("Object is null, no object will be posted!");
            return;
        }
        // Supplier overload: the message (and object.toString()) is only built when FINEST is enabled.
        logger.log(Level.FINEST, () -> "Posting object: " + object);
        queueAdapter.post(object);
    }

    /**
     * Injects the queue adapter used by {@link #start()}, {@link #stop()} and {@link #post(Object)}.
     *
     * @param queueAdapter the adapter to use
     */
    void setQueueAdapter(KaliumQueueAdapter queueAdapter) {
        this.queueAdapter = queueAdapter;
    }


    /**
     * Invokes, one after another, every handler registered under {@code reactionId} for the
     * exact runtime class of {@code object}.
     *
     * <p>A failure of one handler is logged and does not prevent the remaining handlers from running.
     *
     * @param reactionId id of the reaction the object is addressed to; unknown ids are logged and ignored
     * @param object     the received object; {@code null} is logged and ignored
     */
    @Override
    public void onObjectReceived(String reactionId, Object object) {
        Map<Class, List<Method>> objectTypeToHandlersMap = reactionIdToObjectTypeToMethodMap.get(reactionId);
        if (objectTypeToHandlersMap == null) {
            logger.info("[reactionId=" + reactionId + "] does not match any of the registered reactions");
            return;
        }
        if (object == null) {
            // Without this guard object.getClass() below would throw inside the listener callback.
            logger.warning("Received a null object for [reactionId=" + reactionId + "], nothing to dispatch");
            return;
        }
        // Lookup is by exact class: handlers declared for a supertype of object are not matched.
        List<Method> handlers = objectTypeToHandlersMap.get(object.getClass());
        if (handlers == null) {
            return;
        }
        Object reaction = reactionIdToReactionMap.get(reactionId);
        //TODO run in parallel
        for (Method method : handlers) {
            try {
                method.invoke(reaction, object);
            } catch (IllegalAccessException | InvocationTargetException e) {
                logger.log(Level.WARNING, "Failed to invoke [method=" +
                        method.getName() + "],[reactionId=" + reactionId + "]", e);
            }
        }
    }

    /**
     * Returns, for every registered reaction id, the object types it has handlers for.
     *
     * @return a new map whose values are copies, so changes made by the caller do not
     *         affect the internal registry
     */
    @Override
    public Map<String, Collection<Class>> getReactionIdsToObjectTypesMap() {
        Map<String, Collection<Class>> reactionIdsToObjectTypeMap = new HashMap<>();
        reactionIdToObjectTypeToMethodMap.forEach((reactionId, objectTypeToHandlersMap) ->
                // keySet() is a live view of the internal map; hand out a copy instead.
                reactionIdsToObjectTypeMap.put(reactionId, new HashSet<>(objectTypeToHandlersMap.keySet())));
        logger.finest(reactionIdsToObjectTypeMap::toString);
        return reactionIdsToObjectTypeMap;
    }

    /**
     * Registers a lambda reaction for {@code tClass} under a randomly generated reaction id.
     *
     * @param tClass   the object type the lambda reacts to
     * @param consumer the lambda to run for each received object
     * @throws KaliumException if {@code tClass} or {@code consumer} is {@code null}
     */
    @Override
    public <T> void on(Class<T> tClass, Consumer<T> consumer) throws KaliumException {
        on(tClass, consumer, UUID.randomUUID().toString());
    }

    /**
     * Registers a lambda reaction for {@code tClass} under the given reaction id.
     *
     * <p>The registration takes effect immediately. Anything previously registered under the
     * same {@code reactionId} is replaced.
     *
     * @param tClass     the object type the lambda reacts to
     * @param consumer   the lambda to run for each received object
     * @param reactionId the id to register the reaction under
     * @throws KaliumException if {@code tClass} or {@code consumer} is {@code null}, or if
     *                         {@code reactionId} is {@code null} or empty
     */
    @Override
    public <T> void on(Class<T> tClass, Consumer<T> consumer, String reactionId) throws KaliumException {
        validateOnInputs(tClass, consumer, reactionId);
        // Wrap the lambda in an object carrying an @On method so it is dispatched like any other reaction.
        BaseReaction<T> reaction = new BaseReaction<T>() {
            @On
            public void doSomething(T t) {
                consumer.accept(t);
            }
        };
        Map<Class, List<Method>> objectTypeToHandlersMap = new HashMap<>();
        objectTypeToHandlersMap.put(tClass,
                ReflectionUtils.getMethodsAnnotatedWithOn(reaction.getClass()));
        reactionIdToObjectTypeToMethodMap.put(reactionId, objectTypeToHandlersMap);
        reactionIdToReactionMap.put(reactionId, reaction);
        logger.info("Reaction in form of lambda expression was added. " +
                "[class=" + tClass.getName() + "],[reactionId=" + reactionId + "]");

    }

    /**
     * Validates the arguments of {@code on(...)}; the first violation found is logged and thrown.
     *
     * @throws KaliumException if {@code tClass} or {@code consumer} is {@code null}, or if
     *                         {@code reactionId} is {@code null} or empty
     */
    private void validateOnInputs(Class<?> tClass, Consumer<?> consumer, String reactionId) throws KaliumException {
        String problem = null;
        if (tClass == null) {
            problem = ".on(...) cannot use null class!";
        } else if (consumer == null) {
            problem = ".on(...) cannot use null reaction lambda expression!";
        } else if (reactionId == null || reactionId.isEmpty()) {
            problem = ".on(...) cannot use null or empty reactionId!";
        }

        if (problem != null) {
            KaliumException exception = new KaliumException(problem);
            logger.log(Level.WARNING, exception.getMessage(), exception);
            throw exception;
        }
    }

    /**
     * Indexes the {@code @On} methods of {@code reaction} by the type of their single parameter
     * and stores the result under {@code reactionId}, replacing any earlier registration for that id.
     *
     * <p>Methods that do not declare exactly one parameter cannot be invoked with a received
     * object, so they are skipped with a warning.
     */
    private void addReactionInternal(String reactionId, Object reaction) {
        Class<?> reactionClass = reaction.getClass();
        Map<Class, List<Method>> objectTypeToHandlersMap = new HashMap<>();
        reactionIdToObjectTypeToMethodMap.put(reactionId, objectTypeToHandlersMap);
        reactionIdToReactionMap.put(reactionId, reaction);

        List<Method> annotatedMethods = ReflectionUtils.getMethodsAnnotatedWithOn(reactionClass);
        for (Method method : annotatedMethods) {
            // Explicit check rather than `assert`: assertions are disabled unless the JVM runs with -ea.
            if (method.getParameterCount() != 1) {
                logger.warning("Skipping @On method [method=" + method.getName() +
                        "],[reactionId=" + reactionId + "]: expected exactly one parameter but found " +
                        method.getParameterCount());
                continue;
            }
            Class<?> parameterType = method.getParameterTypes()[0];
            objectTypeToHandlersMap
                    .computeIfAbsent(parameterType, type -> new ArrayList<>())
                    .add(method);
        }
    }
}