gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/sink/MultiSink.java

Summary

Maintainability
A
0 mins
Test Coverage
package com.gojek.beast.sink;

import com.gojek.beast.models.MultiException;
import com.gojek.beast.models.Records;
import com.gojek.beast.models.Status;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.stream.Collectors;

import static com.gojek.beast.config.Constants.SUCCESS_STATUS;

@Slf4j
@AllArgsConstructor
public class MultiSink implements Sink {
    private final List<Sink> sinks;

    @Override
    public Status push(Records records) {
        List<Status> failures = sinks.stream()
                .map(s -> s.push(records))
                .filter(s -> !s.isSuccess())
                .collect(Collectors.toList());
        return failures.isEmpty() ? SUCCESS_STATUS : new MultiException(failures);
    }

    @Override
    public void close(String reason) {
        sinks.forEach(sink -> sink.close(reason));
        log.info("Stopped MultiSink Successfully: {}", reason);
    }
}