Sberned/spring-flow-statemachine

View on GitHub
state-machine-core/src/main/java/ru/sberned/statemachine/StateMachine.java

Summary

Maintainability
A
45 mins
Test Coverage
package ru.sberned.statemachine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import ru.sberned.statemachine.exception.StateMachineException;
import ru.sberned.statemachine.lock.LockProvider;
import ru.sberned.statemachine.processor.UnableToProcessException;
import ru.sberned.statemachine.processor.UnhandledMessageProcessor;
import ru.sberned.statemachine.processor.UnhandledMessageProcessor.IssueType;
import ru.sberned.statemachine.state.HasStateAndId;
import ru.sberned.statemachine.state.ItemWithStateProvider;
import ru.sberned.statemachine.state.StateChangedEvent;
import ru.sberned.statemachine.state.StateChanger;

import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import static ru.sberned.statemachine.processor.UnhandledMessageProcessor.IssueType.*;

/**
 * Created by Evgeniya Patuk (jpatuk@gmail.com) on 09/11/2016.
 */
public class StateMachine<ENTITY extends HasStateAndId<ID, STATE>, STATE, ID> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StateMachine.class);
    private final ItemWithStateProvider<ENTITY, ID> stateProvider;
    private final StateChanger<ENTITY, STATE> stateChanger;
    private final LockProvider lockProvider;
    // To make @Transactional work
    @Autowired
    private StateMachine<ENTITY, STATE, ID> stateMachine = this;
    volatile private StateRepository<ENTITY, STATE, ID> stateRepository;
    @Value("${statemachine.lock.timeout.ms:5000}")
    private long lockTimeout;

    @Autowired
    public StateMachine(ItemWithStateProvider<ENTITY, ID> stateProvider,
                        StateChanger<ENTITY, STATE> stateChanger,
                        LockProvider lockProvider) {
        this.stateProvider = stateProvider;
        this.stateChanger = stateChanger;
        this.lockProvider = lockProvider;
    }

    public void setStateRepository(StateRepository<ENTITY, STATE, ID> stateRepository) {
        this.stateRepository = stateRepository;
    }

    @EventListener
    public void handleStateChanged(StateChangedEvent<STATE, ID> event) {
        Assert.notNull(stateRepository, "StateRepository must be initialized!");

        changeState(event.getIds(), event.getNewState(), event.getInfo());
    }

    public Map<ID, Future<Boolean>> changeState(Collection<ID> ids, STATE newState, Object info) {
        if (ids != null) {
            Map<ID, Future<Boolean>> processingResults = new HashMap<>();
            ids.forEach(id -> {
                Future<Boolean> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        stateMachine.handleMessage(id, newState, info);
                        return true;
                    } catch (StateMachineException e) {
                        handleIncorrectCase(id, newState, e.getIssueType(), null);
                    } catch (InterruptedException e) {
                        handleIncorrectCase(id, newState, INTERRUPTED_EXCEPTION, e);
                    } catch (Exception e) {
                        handleIncorrectCase(id, newState, EXECUTION_EXCEPTION, e);
                    }
                    return false;
                });
                processingResults.put(id, future);
            });
            return processingResults;
        }
        return Collections.emptyMap();
    }

    // public is here to make @Transactional work
    @Transactional(rollbackFor = {Exception.class})
    public void handleMessage(ID id, STATE newState, Object info) throws Exception {
        Lock lockObject = lockProvider.getLockObject(id);
        boolean locked = false;
        try {
            if (locked = lockObject.tryLock(lockTimeout, TimeUnit.MILLISECONDS)) {
                ENTITY entity = stateProvider.getItemById(id);
                if (entity == null) throw new StateMachineException(ENTITY_NOT_FOUND);

                STATE currentState = entity.getState();
                if (stateRepository.isValidTransition(currentState, newState)) {
                    processItem(entity, currentState, newState, info);
                } else {
                    throw new StateMachineException(INVALID_TRANSITION);
                }
            } else {
                throw new StateMachineException(TIMEOUT);
            }
        } finally {
            if (locked) lockObject.unlock();
        }
    }

    private void handleIncorrectCase(ID id, STATE newState, IssueType issueType, Exception e) {
        String errorMsg = MessageFormat.format("Processing for item with id {0} failed. New state is {1}. Issue type is {2}", id, newState, issueType);

        if (e != null) LOGGER.error(errorMsg, e);
        else LOGGER.error(errorMsg);

        UnhandledMessageProcessor<ID, STATE> unhandledMessageProcessor = stateRepository.getUnhandledMessageProcessor();
        if (unhandledMessageProcessor != null) {
            unhandledMessageProcessor.process(id, newState, issueType, e);
        }
    }

    private void processItem(ENTITY item, STATE from, STATE to, Object info) {
        stateRepository.getBeforeAll().forEach(handler -> {
            if (!handler.beforeTransition(item, to)) {
                throw new UnableToProcessException();
            }
        });
        stateRepository.getBefore(from, to).forEach(handler -> {
            if (!handler.beforeTransition(item)) {
                throw new UnableToProcessException();
            }
        });

        if (info != null) {
            stateChanger.moveToState(to, item, info);
        } else {
            stateChanger.moveToState(to, item);
        }

        stateRepository.getAfter(from, to).forEach(handler -> handler.afterTransition(item));
        stateRepository.getAfterAll().forEach(handler -> handler.afterTransition(item, to));
    }
}