gojekfarm/beast

View on GitHub
src/main/java/com/gojek/beast/converter/RowMapper.java

Summary

Maintainability
A
1 hr
Test Coverage
package com.gojek.beast.converter;

import com.gojek.beast.config.ColumnMapping;
import com.gojek.beast.config.Constants.Config;
import com.gojek.beast.converter.fields.NestedField;
import com.gojek.beast.converter.fields.ProtoField;
import com.gojek.beast.exception.UnknownProtoFieldFoundException;
import com.gojek.beast.models.ConfigurationException;
import com.gojek.beast.protomapping.UnknownProtoFields;
import com.gojek.beast.stats.Stats;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@AllArgsConstructor
public class RowMapper {

    private final ColumnMapping mapping;
    private final boolean failOnUnknownFields;
    private final Stats statsClient = Stats.client();

    public RowMapper(ColumnMapping mappings) {
        this(mappings, false);
    }

    public Map<String, Object> map(DynamicMessage message) {
        if (mapping == null) {
            throw new ConfigurationException("BQ_PROTO_COLUMN_MAPPING is not configured");
        }
        return getMappings(message, mapping);
    }

    private Map<String, Object> getMappings(DynamicMessage message, ColumnMapping columnMapping) {
        if (message == null || columnMapping == null || columnMapping.isEmpty()) {
            return new HashMap<>();
        }
        if (message.getUnknownFields().asMap().size() > 0) {
            statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
            String serializedUnknownFields = message.getUnknownFields().asMap().keySet().toString();
            String serializedMessage = UnknownProtoFields.toString(message.toByteArray());
            log.warn(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS",
                    serializedUnknownFields, serializedMessage));
            if (failOnUnknownFields)
                throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
        }
        Descriptors.Descriptor descriptorForType = message.getDescriptorForType();

        Map<String, Object> row = new HashMap<>(columnMapping.size());
        columnMapping.forEach((key, value) -> {
            String columnName = value.toString();
            String column = key.toString();
            if (column.equals(Config.RECORD_NAME)) {
                return;
            }
            Integer protoIndex = Integer.valueOf(column);
            Descriptors.FieldDescriptor fieldDesc = descriptorForType.findFieldByNumber(protoIndex);
            if (fieldDesc != null && !message.getField(fieldDesc).toString().isEmpty()) {
                Object field = message.getField(fieldDesc);
                ProtoField protoField = FieldFactory.getField(fieldDesc, field);
                Object fieldValue = protoField.getValue();

                if (fieldValue instanceof List) {
                    addRepeatedFields(row, (String) key, value, (List<Object>) fieldValue);
                    return;
                }

                if (protoField.getClass().getName().equals(NestedField.class.getName())) {
                    try {
                        columnName = getNestedColumnName((ColumnMapping) value);
                        fieldValue = getMappings((DynamicMessage) field, (ColumnMapping) value);
                    } catch (Exception e) {
                        log.error("Exception::Handling nested field failure: {}", e.getMessage());
                        throw e;
                    }
                }
                row.put(columnName, fieldValue);
            }
        });
        return row;
    }

    private String getNestedColumnName(ColumnMapping value) {
        return value.get(Config.RECORD_NAME).toString();
    }

    private void addRepeatedFields(Map<String, Object> row, String key, Object value, List<Object> fieldValue) {
        if (fieldValue.isEmpty()) {
            return;
        }
        List<Object> repeatedNestedFields = new ArrayList<>();
        String columnName = null;
        for (Object f : fieldValue) {
            if (f instanceof DynamicMessage) {
                ColumnMapping nestedMappings = (ColumnMapping) value;
                repeatedNestedFields.add(getMappings((DynamicMessage) f, nestedMappings));
                columnName = getNestedColumnName(nestedMappings);
            } else {
                repeatedNestedFields.add(f);
                columnName = (String) value;
            }
        }
        row.put(columnName, repeatedNestedFields);
    }
}