gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/sink/executor/RetryExecutor.java

Summary

Maintainability
A
40 mins
Test Coverage
package com.gojek.beast.sink.executor;

import com.gojek.beast.backoff.BackOffProvider;
import com.gojek.beast.models.FailureStatus;
import com.gojek.beast.models.Records;
import com.gojek.beast.models.Status;
import com.gojek.beast.sink.Sink;
import com.gojek.beast.stats.Stats;

public class RetryExecutor implements Executor {

    private Sink sink;
    private Records records;
    private int maxAttempts;
    private BackOffProvider backOffProvider;
    private final Stats statsClient = Stats.client(); // metrics client

    private Status status;
    private int attemptCount = 0;

    public RetryExecutor(Sink sink, Records records, int maxAttempts, BackOffProvider backOffProvider) {
        this.sink = sink;
        this.records = records;
        this.maxAttempts = maxAttempts;
        this.backOffProvider = backOffProvider;
    }

    @Override
    public Status status() {
        statsClient.gauge("RetrySink.queue.push.attempts", attemptCount);
        return status;
    }

    private boolean shouldExecute() {
        return ((attemptCount < maxAttempts) && (!status.isSuccess()));
    }

    @Override
    public Executor ifFailure() {
        if (shouldExecute()) {
            backOffProvider.backOff(attemptCount);
            execute();
        }
        return this;
    }

    @Override
    public Executor execute() {
        attemptCount++;
        try {
            status = sink.push(records);
        } catch (Exception e) {
            statsClient.increment("retrysink.exec.failure.count," + statsClient.getBqTags());
            status = new FailureStatus(e);
        }
        ifFailure();
        return this;
    }
}