gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/protomapping/ProtoUpdateListener.java

Summary

Maintainability
A
0 mins
Test Coverage
package com.gojek.beast.protomapping;

import com.gojek.beast.Clock;
import com.gojek.beast.config.AppConfig;
import com.gojek.beast.config.ColumnMapping;
import com.gojek.beast.config.ConfigStore;
import com.gojek.beast.config.ProtoMappingConfig;
import com.gojek.beast.config.StencilConfig;
import com.gojek.beast.converter.ConsumerRecordConverter;
import com.gojek.beast.converter.RowMapper;
import com.gojek.beast.exception.BQDatasetLocationChangedException;
import com.gojek.beast.exception.BQPartitionKeyNotSpecified;
import com.gojek.beast.exception.BQSchemaMappingException;
import com.gojek.beast.exception.BQTableUpdateFailure;
import com.gojek.beast.exception.ProtoNotFoundException;
import com.gojek.beast.models.BQField;
import com.gojek.beast.models.ProtoField;
import com.gojek.beast.models.ProtoFieldFactory;
import com.gojek.beast.sink.bq.BQClient;
import com.gojek.beast.sink.dlq.ErrorWriter;
import com.gojek.beast.stats.Stats;
import com.gojek.de.stencil.StencilClientFactory;
import com.gojek.de.stencil.client.StencilClient;
import com.gojek.de.stencil.exception.StencilRuntimeException;
import com.gojek.de.stencil.models.DescriptorAndTypeName;
import com.gojek.de.stencil.parser.ProtoParser;
import com.gojek.de.stencil.parser.ProtoParserWithRefresh;
import com.gojek.de.stencil.utils.StencilUtils;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class ProtoUpdateListener extends com.gojek.de.stencil.cache.ProtoUpdateListener {
    private final String proto;
    private final ProtoMappingConfig protoMappingConfig;
    private final StencilConfig stencilConfig;
    private final AppConfig appConfig;
    private ConsumerRecordConverter recordConverter;
    private StencilClient stencilClient;
    private Converter protoMappingConverter;
    private Parser protoMappingParser;
    private BQClient bqClient;
    private ProtoFieldFactory protoFieldFactory;
    private Stats statsClient = Stats.client();
    private ErrorWriter errorWriter;

    public ProtoUpdateListener(ConfigStore configStore, Converter protoMappingConverter, Parser protoMappingParser, BigQuery bqInstance, ErrorWriter errorWriter) throws IOException {
        super(configStore.getStencilConfig().getProtoSchema());
        this.proto = configStore.getStencilConfig().getProtoSchema();
        this.protoMappingConfig = configStore.getProtoMappingConfig();
        this.stencilConfig = configStore.getStencilConfig();
        this.appConfig = configStore.getAppConfig();
        this.protoMappingConverter = protoMappingConverter;
        this.protoMappingParser = protoMappingParser;
        this.protoFieldFactory = new ProtoFieldFactory();
        this.bqClient = new BQClient(bqInstance, configStore.getBqConfig());
        this.errorWriter = errorWriter;
        this.createStencilClient();
        this.setProtoParser(getProtoMapping());
    }

    @VisibleForTesting
    public ProtoUpdateListener(ConfigStore configStore, StencilClient stencilClient, Converter protoMappingConverter, Parser protoMappingParser, BQClient bqClient, ProtoFieldFactory protoFieldFactory) {
        super(configStore.getStencilConfig().getProtoSchema());
        this.proto = configStore.getStencilConfig().getProtoSchema();
        this.protoMappingConfig = configStore.getProtoMappingConfig();
        this.stencilConfig = configStore.getStencilConfig();
        this.appConfig = configStore.getAppConfig();
        this.stencilClient = stencilClient;
        this.protoMappingConverter = protoMappingConverter;
        this.protoMappingParser = protoMappingParser;
        this.bqClient = bqClient;
        this.protoFieldFactory = protoFieldFactory;
    }

    private void createStencilClient() {
        try {
            if (protoMappingConfig.isAutoSchemaUpdateEnabled()) {
                stencilClient = StencilClientFactory.getClient(stencilConfig.getStencilUrl(), System.getenv(), Stats.client().getStatsDClient(), this);

                log.info("updating bq table at startup for proto schema {}", getProto());
                onProtoUpdate(stencilConfig.getStencilUrl(), stencilClient.getAllDescriptorAndTypeName());
            } else {
                stencilClient = StencilClientFactory.getClient(stencilConfig.getStencilUrl(), System.getenv(), Stats.client().getStatsDClient());
            }
        } catch (RuntimeException e) {
            emitStencilExceptionMetrics(e);
            throw e;
        }
    }

    @Override
    public void onProtoUpdate(String url, Map<String, DescriptorAndTypeName> newDescriptors) {
        log.info("stencil cache was refreshed, validating if bigquery schema changed");
        try {
            ProtoField protoField = protoFieldFactory.getProtoField();
            protoField = protoMappingParser.parseFields(protoField, proto, StencilUtils.getAllProtobufDescriptors(newDescriptors), StencilUtils.getTypeNameToPackageNameMap(newDescriptors));
            updateProtoParser(protoField);
        } catch (BigQueryException | ProtoNotFoundException | BQSchemaMappingException | BQPartitionKeyNotSpecified
                | BQDatasetLocationChangedException | IOException e) {
            String errMsg = "Error while updating bigquery table on callback:" + e.getMessage();
            log.error(errMsg);
            statsClient.increment("bq.table.upsert.failures");
            throw new BQTableUpdateFailure(errMsg, e);
        }
    }

    // First get latest protomapping, update bq schema, and if all goes fine
    // then only update beast's proto mapping config
    private void updateProtoParser(final ProtoField protoField) throws IOException {
        String protoMappingString = protoMappingConverter.generateColumnMappings(protoField.getFields());
        List<Field> bqSchemaFields = protoMappingConverter.generateBigquerySchema(protoField);
        addMetadataFields(bqSchemaFields);
        bqClient.upsertTable(bqSchemaFields);
        protoMappingConfig.setProperty("PROTO_COLUMN_MAPPING", protoMappingString);
        setProtoParser(protoMappingConfig.getProtoColumnMapping());
    }

    private void addMetadataFields(List<Field> bqSchemaFields) {
        List<Field> bqMetadataFields = new ArrayList<>();
        String namespaceName = appConfig.getBqMetadataNamespace();
        if (namespaceName.isEmpty()) {
            bqMetadataFields.addAll(BQField.getMetadataFields());
        } else {
            bqMetadataFields.add(BQField.getNamespacedMetadataField(namespaceName));
        }

        List<String> duplicateFields = getDuplicateFields(bqSchemaFields, bqMetadataFields).stream().map(Field::getName).collect(Collectors.toList());
        if (duplicateFields.size() > 0) {
            throw new BQSchemaMappingException(String.format("Metadata field(s) is already present in the schema. "
                    + "fields: %s", duplicateFields));
        }
        bqSchemaFields.addAll(bqMetadataFields);
    }

    private ColumnMapping getProtoMapping() throws IOException {
        ProtoField protoField = new ProtoField();
        protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
        String protoMapping = protoMappingConverter.generateColumnMappings(protoField.getFields());
        protoMappingConfig.setProperty("PROTO_COLUMN_MAPPING", protoMapping);
        return protoMappingConfig.getProtoColumnMapping();
    }

    public ConsumerRecordConverter getProtoParser() {
        return recordConverter;
    }

    private void setProtoParser(ColumnMapping columnMapping) {
        if (stencilConfig.getAutoRefreshCache()) {
            // periodic refresh
            ProtoParser protoParser = new ProtoParser(stencilClient, proto);
            recordConverter = new ConsumerRecordConverter(new RowMapper(columnMapping, protoMappingConfig.getFailOnUnknownFields()),
                    protoParser, new Clock(), appConfig, errorWriter);
        } else {
            // on-demand refresh
            ProtoParserWithRefresh protoParser = new ProtoParserWithRefresh(stencilClient, proto);
            recordConverter = new ConsumerRecordConverter(new RowMapper(columnMapping, protoMappingConfig.getFailOnUnknownFields()),
                    protoParser, new Clock(), appConfig, errorWriter);
        }
    }

    public void close() throws IOException {
        stencilClient.close();
    }

    private void emitStencilExceptionMetrics(RuntimeException e) {
        if (e.getCause() instanceof StencilRuntimeException) {
            if (e.getCause().getCause() instanceof org.apache.http.conn.ConnectTimeoutException
                    || e.getCause().getCause() instanceof org.apache.http.conn.HttpHostConnectException) {
                statsClient.increment("stencil.connection.timeout.errors,exception=" + e.getClass().getName());
            } else {
                statsClient.increment("stencil.unhandled.errors,exception=" + e.getClass().getName());
            }
        }
    }

    private List<Field> getDuplicateFields(List<Field> fields1, List<Field> fields2) {
        return fields1.stream().filter(field -> containsField(fields2, field.getName())).collect(Collectors.toList());
    }

    private boolean containsField(List<Field> fields, String fieldName) {
        return fields.stream().anyMatch(field -> field.getName().equals(fieldName));
    }
}