gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/worker/ConsumerWorker.java

Summary

Maintainability
A
0 mins
Test Coverage
package com.gojek.beast.worker;

import com.gojek.beast.consumer.MessageConsumer;
import com.gojek.beast.models.FailureStatus;
import com.gojek.beast.models.Status;
import com.gojek.beast.stats.Stats;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConsumerWorker extends Worker {
    private final MessageConsumer messageConsumer;
    private final Stats statsClient = Stats.client();

    public ConsumerWorker(String name, MessageConsumer messageConsumer, WorkerState workerState) {
        super(name, workerState);
        this.messageConsumer = messageConsumer;
    }

    @Override
    public Status job() {
        Status status;
        try {
            status = messageConsumer.consume();
            if (!status.isSuccess()) {
                log.error("message consumption failed: {}", status.toString());
                statsClient.increment("worker.consumer.consume.errors");
            }
        } catch (RuntimeException e) {
            log.error("Exception::Stop Message Consumption: {}", e.getMessage());
            return new FailureStatus(e);
        }
        return status;
    }

    @Override
    public void stop(String reason) {
        log.info("Stopping consumer worker with reason {}", reason);
        messageConsumer.close();
    }
}