gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/commiter/OffsetState.java

Summary

Maintainability
A
0 mins
Test Coverage
package com.gojek.beast.commiter;

import lombok.Getter;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Instant;
import java.util.Map;
import java.util.Set;

public class OffsetState {
    @Getter
    private final long acknowledgeTimeoutMs;
    @Getter
    private final long offsetBatchDuration;
    private boolean start;
    private Instant lastCommittedTime;
    private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;

    public OffsetState(Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, long acknowledgeTimeoutMs, long offsetBatchDuration) {
        this.partitionOffsetAck = partitionOffsetAck;
        this.acknowledgeTimeoutMs = acknowledgeTimeoutMs;
        this.offsetBatchDuration = offsetBatchDuration;
        lastCommittedTime = Instant.now();
    }

    public boolean shouldCloseConsumer(Map<TopicPartition, OffsetAndMetadata> currentOffset) {
        if (!start) {
            return false;
        }
        boolean ackTimedOut = (Instant.now().toEpochMilli() - lastCommittedTime.toEpochMilli()) > acknowledgeTimeoutMs;
        return ackTimedOut;
    }

    public int partitionOffsetAckSize() {
        return partitionOffsetAck.size();
    }

    public boolean removeFromOffsetAck(Map<TopicPartition, OffsetAndMetadata> commitOffset) {
        return partitionOffsetAck.remove(commitOffset);
    }

    public void resetOffset() {
        lastCommittedTime = Instant.now();
    }

    public void startTimer() {
        start = true;
    }
}