gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/worker/Worker.java

Summary

Maintainability
A
25 mins
Test Coverage
package com.gojek.beast.worker;

import com.gojek.beast.models.Status;
import com.gojek.beast.stats.Stats;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class Worker extends Thread {
    private static StopEvent stopEvent;
    private final WorkerState state;
    private final Stats statsClient = Stats.client();

    public Worker(String name, WorkerState state) {
        super(name);
        this.state = state;
    }

    public abstract void stop(String reason);

    protected abstract Status job();

    @Override
    public void run() {
        log.info("Started worker {}", getClass().getSimpleName());
        Status status;
        do {
            status = job();
        } while (!state.isStopped() && status.isSuccess());

        if (!status.isSuccess()) {
            if (status.getException().isPresent()) {
                statsClient.increment("global.errors,exception=" + status.getException().get().getClass().getName());
            } else {
                log.warn("non-successful worker status had empty exception");
            }
        }

        onStopEvent(status.toString());
    }

    private void onStopEvent(String reason) {
        log.debug("{} returned Error::{}, stopping other worker threads", getClass().getSimpleName(), reason);
        if (stopEvent == null) {
            stopEvent = new StopEvent(getClass().getSimpleName(), reason);
        }
        state.closeWorker();
        stop(stopEvent.toString());
        log.info("Stopped worker {} job status: {}, reason: {}", getClass().getSimpleName(), reason, stopEvent);
    }
}