gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/sink/bq/BqSink.java

Summary

Maintainability
A
2 hrs
Test Coverage
package com.gojek.beast.sink.bq;

import com.gojek.beast.models.Record;
import com.gojek.beast.models.Records;
import com.gojek.beast.models.Status;
import com.gojek.beast.sink.Sink;
import com.gojek.beast.sink.bq.handler.BQFilteredResponse;
import com.gojek.beast.sink.bq.handler.BQResponseParser;
import com.gojek.beast.sink.bq.handler.BQRow;
import com.gojek.beast.sink.dlq.ErrorWriter;
import com.gojek.beast.sink.dlq.RecordsErrorType;
import com.gojek.beast.stats.Stats;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableMap;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.Collections;
import java.util.List;

import static com.google.cloud.bigquery.InsertAllRequest.Builder;
import static com.google.cloud.bigquery.InsertAllRequest.newBuilder;

@Slf4j
@AllArgsConstructor
public class BqSink implements Sink {

    private final BigQuery bigquery;
    private final TableId tableId;
    private final BQResponseParser responseParser;
    private final BQRow recordInserter;
    private final ErrorWriter errorWriter;

    private final Stats statsClient = Stats.client();

    @Override
    public Status push(Records records) {
        InsertAllResponse response = insertIntoBQ(records.getRecords());
        if (response.hasErrors()) {
            //parse the error records
            BQFilteredResponse filteredResponse = responseParser.parseResponse(records.getRecords(), response);

            // if batch contains records that we can't really handle, fail whole batch
            List<Record> unhandledRecords = filteredResponse.getUnhandledRecords();
            if (!unhandledRecords.isEmpty()) {
                log.error("Batch with records size: {} contains invalid records, marking this batch to fail", unhandledRecords.size());
                statsClient.gauge("record.processing.failure,type=invalid," + statsClient.getBqTags(), unhandledRecords.size());
                return new InsertStatus(false, response.getInsertErrors());
            }

            // retryable records
            List<Record> retryableRecords = filteredResponse.getRetryableRecords();
            if (!retryableRecords.isEmpty()) {
                // try inserting valid records into bq
                InsertAllResponse retriedResponse = insertIntoBQ(retryableRecords);
                if (retriedResponse.hasErrors()) {
                    statsClient.gauge("record.processing.failure,type=retry," + statsClient.getBqTags(), retryableRecords.size());
                    return new InsertStatus(false, retriedResponse.getInsertErrors());
                }
            }

            // DLQ sinkable records
            List<Record> oobRecords = filteredResponse.getOobRecords();
            if (!oobRecords.isEmpty()) {
                log.warn("Error handler parsed OOB records size {}, handoff to the writer {}", oobRecords.size(), errorWriter.getClass().getSimpleName());
                statsClient.count("kafka.error.records.count,type=oob," + statsClient.getBqTags(), oobRecords.size());
                final Status dlqStatus = errorWriter.writeRecords(ImmutableMap.of(RecordsErrorType.OOB, oobRecords));
                if (!dlqStatus.isSuccess()) {
                    log.error("Batch with records size: {} contains DLQ sinkable records but failed to sink", oobRecords.size());
                    return dlqStatus;
                }
            }
        }

        return new InsertStatus(true, Collections.emptyMap());
    }

    private InsertAllResponse insertIntoBQ(List<Record> records) {
        Instant start = Instant.now();
        Builder builder = newBuilder(tableId);
        records.forEach((Record m) -> builder.addRow(recordInserter.of(m)));
        InsertAllRequest rows = builder.build();
        InsertAllResponse response = bigquery.insertAll(rows);

        log.info("Pushed a batch of {} records to BQ. Insert success?: {}", records.size(), !response.hasErrors());
        statsClient.count("bq.sink.push.records", records.size());
        statsClient.timeIt("bq.sink.push.time", start);
        return response;
    }

    @Override
    public void close(String reason) {
        log.info("BQSink closed: {}", reason);
    }
}