/*
 * treasure-data/embulk-input-marketo
 *
 * View on GitHub
 * src/main/java/org/embulk/input/marketo/MarketoUtils.java
 *
 * Summary
 *
 * Maintainability
 * B
 * 6 hrs
 * Test Coverage
 */
package org.embulk.input.marketo;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import org.embulk.base.restclient.ServiceResponseMapper;
import org.embulk.base.restclient.jackson.JacksonServiceRecord;
import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper;
import org.embulk.base.restclient.jackson.JacksonTopLevelValueLocator;
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.TaskReport;
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.Schema;
import org.embulk.util.json.JsonParser;
import org.embulk.util.retryhelper.RetryExecutor;
import org.embulk.util.retryhelper.RetryGiveupException;
import org.embulk.util.retryhelper.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY;

/**
 * Created by tai.khuu on 9/18/17.
 */
public class MarketoUtils
{
    /**
     * Date-time pattern written with strftime-style directives (date, literal {@code T}, time, zone offset).
     * NOTE(review): presumably the layout of Marketo date-time values; confirm against the callers.
     */
    public static final String MARKETO_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z";
    /** Date-only pattern written with strftime-style directives (year-month-day). */
    public static final String MARKETO_DATE_FORMAT = "%Y-%m-%d";
    /** Single Jackson {@link ObjectMapper} instance exposed as a shared constant. */
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /**
     * Function that wraps the given {@link ObjectNode} in a new {@link JacksonServiceRecord}.
     * The input is handed to the constructor as-is, without a null check.
     */
    public static final Function<ObjectNode, ServiceRecord> TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION = new Function<ObjectNode, ServiceRecord>()
    {
        @Nullable
        @Override
        public JacksonServiceRecord apply(@Nullable ObjectNode input)
        {
            return new JacksonServiceRecord(input);
        }
    };

    /**
     * Date-time pattern written with {@code SimpleDateFormat}-style pattern letters.
     * NOTE(review): looks like the same layout as {@link #MARKETO_DATE_TIME_FORMAT} in a different
     * pattern syntax; confirm which formatter consumes it.
     */
    public static final String MARKETO_DATE_SIMPLE_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";

    /** The literal name {@code "listId"}; presumably the column that carries a list id (verify against callers). */
    public static final String LIST_ID_COLUMN_NAME = "listId";

    /** The literal name {@code "programId"}; presumably the column that carries a program id (verify against callers). */
    public static final String PROGRAM_ID_COLUMN_NAME = "programId";

    /**
     * Private no-op constructor: the members of this class are static, so it is never instantiated
     * from outside.
     */
    private MarketoUtils()
    {
    }

    /**
     * Builds a response mapper containing one entry per Marketo field.
     *
     * <p>Every field is located through a {@link JacksonTopLevelValueLocator} keyed by the field's own
     * name and is registered under the name returned by {@link #buildColumnName(String, String)}.
     * When the field's data type reports a format, the format is registered together with the type;
     * otherwise only the type is registered.
     *
     * @param prefix  prefix combined with each field name to form the registered column name
     * @param columns Marketo fields to register, processed in list order
     * @return the mapper produced by the builder after all fields were added
     */
    public static ServiceResponseMapper<? extends ValueLocator> buildDynamicResponseMapper(String prefix, List<MarketoField> columns)
    {
        final JacksonServiceResponseMapper.Builder mapperBuilder = JacksonServiceResponseMapper.builder();
        for (final MarketoField field : columns) {
            final String fieldName = field.getName();
            final String outputName = buildColumnName(prefix, fieldName);
            final MarketoField.MarketoDataType dataType = field.getMarketoDataType();
            final JacksonTopLevelValueLocator locator = new JacksonTopLevelValueLocator(fieldName);
            if (!dataType.getFormat().isPresent()) {
                // No format available: register the column with its type only.
                mapperBuilder.add(locator, outputName, dataType.getType());
                continue;
            }
            mapperBuilder.add(locator, outputName, dataType.getType(), dataType.getFormat().get());
        }
        return mapperBuilder.build();
    }

    /**
     * Collects the names of the given fields, leaving out every name listed in {@code excludedFields}.
     *
     * @param columns        fields whose names are collected, in list order
     * @param excludedFields names to skip; matching is by exact string equality
     * @return a new mutable list with the remaining names, in the order of {@code columns}
     */
    public static List<String> getFieldNameFromMarketoFields(List<MarketoField> columns, String... excludedFields)
    {
        final Set<String> skippedNames = Sets.newHashSet(excludedFields);
        final List<String> keptNames = new ArrayList<>();
        for (final MarketoField field : columns) {
            final String fieldName = field.getName();
            if (!skippedNames.contains(fieldName)) {
                keptNames.add(fieldName);
            }
        }
        return keptNames;
    }

    /**
     * Joins a prefix and a column name with a single underscore.
     *
     * <p>A {@code null} argument is rendered as the text {@code "null"}, exactly as string
     * concatenation would render it.
     *
     * @param prefix     text placed before the underscore
     * @param columnName text placed after the underscore
     * @return {@code prefix + "_" + columnName}
     */
    public static String buildColumnName(String prefix, String columnName)
    {
        final StringBuilder qualifiedName = new StringBuilder();
        qualifiedName.append(prefix).append("_").append(columnName);
        return qualifiedName.toString();
    }

    /**
     * Splits the window from {@code fromDate} to {@code toDate} into consecutive ranges of at most
     * {@code rangeSize} days.
     *
     * <p>Each range starts one second after the end of the previous one. The last range is cut off at
     * {@code toDate}. When {@code fromDate} is not before {@code toDate} the result is empty.
     *
     * @param fromDate  start of the window, used as the start of the first range
     * @param toDate    end of the window, used as the end of the last range
     * @param rangeSize length of each range in days; must be positive
     * @return a new mutable list of ranges in chronological order
     * @throws IllegalArgumentException if {@code rangeSize} is zero or negative
     */
    public static List<DateRange> sliceRange(OffsetDateTime fromDate, OffsetDateTime toDate, int rangeSize)
    {
        // A negative size would move the cursor backwards on every pass and never terminate; a zero
        // size would emit one zero-length range per second of the window. Reject both up front.
        if (rangeSize <= 0) {
            throw new IllegalArgumentException("rangeSize must be a positive number of days but was " + rangeSize);
        }
        List<DateRange> ranges = new ArrayList<>();
        OffsetDateTime sliceStart = fromDate;
        while (sliceStart.isBefore(toDate)) {
            OffsetDateTime sliceEnd = sliceStart.plusDays(rangeSize);
            if (sliceEnd.isAfter(toDate)) {
                // Final, shorter slice: clamp it to the end of the window.
                ranges.add(new DateRange(sliceStart, toDate));
                break;
            }
            ranges.add(new DateRange(sliceStart, sliceEnd));
            // The next slice begins one second later so adjacent slices do not share a boundary instant.
            sliceStart = sliceEnd.plusSeconds(1);
        }
        return ranges;
    }

    /**
     * Builds the identity URL for an account: {@code https://<accountId>.mktorest.com/identity}.
     *
     * @param accountId account id used as the host's first label; surrounding whitespace is trimmed
     * @return the identity URL as a string
     */
    public static String getIdentityEndPoint(String accountId)
    {
        final String trimmedId = accountId.trim();
        final String baseUrl = "https://" + trimmedId + ".mktorest.com";
        return baseUrl + "/identity";
    }

    /**
     * Builds the base URL for an account: {@code https://<accountID>.mktorest.com}.
     *
     * @param accountID account id used as the host's first label; surrounding whitespace is trimmed
     * @return the base URL as a string, without a trailing slash
     */
    public static String getEndPoint(String accountID)
    {
        final String trimmedId = accountID.trim();
        return "https://" + trimmedId + ".mktorest.com";
    }

    /**
     * Pair of timestamps marking the start and end of one range, as produced by
     * {@link #sliceRange(OffsetDateTime, OffsetDateTime, int)}. Both fields are final and are stored
     * exactly as passed to the constructor, without validation.
     */
    public static final class DateRange
    {
        /** Start of the range. */
        public final OffsetDateTime fromDate;
        /** End of the range. */
        public final OffsetDateTime toDate;

        /**
         * @param fromDate start of the range
         * @param toDate   end of the range
         */
        public DateRange(OffsetDateTime fromDate, OffsetDateTime toDate)
        {
            this.toDate = toDate;
            this.fromDate = fromDate;
        }

        /**
         * @return text of the form {@code DateRange{fromDate=..., toDate=...}}
         */
        @Override
        public String toString()
        {
            final StringBuilder text = new StringBuilder("DateRange{");
            text.append("fromDate=").append(fromDate);
            text.append(", toDate=").append(toDate);
            return text.append('}').toString();
        }
    }

    /**
     * Runs the given task through a {@link RetryExecutor} configured with the supplied limits.
     *
     * @param maximumRetries             value passed to the executor as its retry limit
     * @param initialRetryIntervalMillis value passed to the executor as its initial retry wait, in milliseconds
     * @param maximumRetryIntervalMillis value passed to the executor as its maximum retry wait, in milliseconds
     * @param alwaysRetryRetryable       task to run; its hooks decide what is retried and what is logged
     * @param <T>                        result type of the task
     * @return whatever the executor returns for the task
     * @throws InterruptedException propagated from the executor
     * @throws RetryGiveupException propagated from the executor
     */
    public static <T> T executeWithRetry(int maximumRetries, int initialRetryIntervalMillis, int maximumRetryIntervalMillis, AlwaysRetryRetryable<T> alwaysRetryRetryable) throws InterruptedException, RetryGiveupException
    {
        final RetryExecutor retryExecutor = RetryExecutor.builder()
                .withRetryLimit(maximumRetries)
                .withInitialRetryWaitMillis(initialRetryIntervalMillis)
                .withMaxRetryWaitMillis(maximumRetryIntervalMillis)
                .build();
        return retryExecutor.runInterruptible(alwaysRetryRetryable);
    }

    /**
     * Base {@link Retryable} that reports every exception as retryable and logs each retry and the
     * final give-up at INFO level. Subclasses supply the work in {@link #call()}.
     *
     * @param <T> result type of {@link #call()}
     */
    public abstract static class AlwaysRetryRetryable<T> implements Retryable<T>
    {
        private static final Logger RETRY_LOGGER = LoggerFactory.getLogger(AlwaysRetryRetryable.class);

        /**
         * @return always {@code true}, whatever the exception is
         */
        @Override
        public boolean isRetryableException(Exception exception)
        {
            return true;
        }

        /**
         * Logs the retry counters and the message of the exception that triggered the retry.
         */
        @Override
        public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
        {
            final String reason = exception.getMessage();
            RETRY_LOGGER.info("Retry [{}]/[{}] with retryWait [{}] on exception {}", retryCount, retryLimit, retryWait, reason);
        }

        /**
         * Logs the last exception, passed as the throwable argument; the first exception is not logged.
         */
        @Override
        public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
        {
            RETRY_LOGGER.info("Giving up execution on exception", lastException);
        }

        /**
         * The unit of work to execute; implemented by subclasses.
         */
        @Override
        public abstract T call() throws Exception;
    }
    /**
     * Lazily applies {@code function} to each element of {@code iterable} and exposes the elements of
     * all resulting iterables as one flat sequence, in source order. Empty results are skipped.
     *
     * <p>The source iterator is obtained inside {@code iterator()} of the returned view, so every call
     * starts its own traversal. Previously one source iterator was captured up front and shared, which
     * made a second traversal of the returned view come back empty. Whether repeated traversal yields
     * the same elements still depends on {@code iterable} itself supporting it.
     *
     * @param iterable source elements; must not be {@code null}
     * @param function maps one source element to the iterable whose elements are emitted for it
     * @param <T>      source element type
     * @param <R>      emitted element type
     * @return a lazy view; its iterators do not support {@code remove()}
     * @throws NullPointerException if {@code iterable} is {@code null}
     */
    public static <T, R> Iterable<R> flatMap(final Iterable<T> iterable, final Function<T, Iterable<R>> function)
    {
        // Fail at call time, as before, rather than on first traversal.
        if (iterable == null) {
            throw new NullPointerException("iterable");
        }
        return new Iterable<R>()
        {
            @Override
            public Iterator<R> iterator()
            {
                final Iterator<T> sourceIterator = iterable.iterator();
                return new Iterator<R>()
                {
                    // Iterator over the result for the source element currently being drained.
                    private Iterator<R> currentIterator;

                    @Override
                    public boolean hasNext()
                    {
                        if (currentIterator != null && currentIterator.hasNext()) {
                            return true;
                        }
                        // Advance through the source until a non-empty result is found.
                        while (sourceIterator.hasNext()) {
                            currentIterator = function.apply(sourceIterator.next()).iterator();
                            if (currentIterator.hasNext()) {
                                return true;
                            }
                        }
                        return false;
                    }

                    @Override
                    public R next()
                    {
                        // hasNext() positions currentIterator on the next available element.
                        if (hasNext()) {
                            return currentIterator.next();
                        }
                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove()
                    {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /**
     * Writes {@code numberRecords} rows of fixed placeholder values into {@code pageBuilder}, one value
     * per column of the builder's schema, and returns a new empty task report.
     *
     * <p>Rows are numbered from 1; the row number only appears in string columns.
     *
     * @param pageBuilder   destination of the generated rows; its schema decides which values are written
     * @param numberRecords number of rows to add; nothing is added when it is less than 1
     * @return a task report freshly created by {@code CONFIG_MAPPER_FACTORY}
     */
    public static TaskReport importMockPreviewData(final PageBuilder pageBuilder, int numberRecords)
    {
        final JsonParser mockJsonParser = new JsonParser();
        final Schema previewSchema = pageBuilder.getSchema();
        for (int rowNum = 1; rowNum <= numberRecords; rowNum++) {
            previewSchema.visitColumns(new MockRowVisitor(pageBuilder, mockJsonParser, rowNum));
            pageBuilder.addRecord();
        }
        return CONFIG_MAPPER_FACTORY.newTaskReport();
    }

    /**
     * Column visitor that sets one placeholder value per visited column on the page builder's
     * current row.
     */
    private static final class MockRowVisitor implements ColumnVisitor
    {
        private final PageBuilder target;
        private final JsonParser parser;
        private final int rowNum;

        private MockRowVisitor(PageBuilder target, JsonParser parser, int rowNum)
        {
            this.target = target;
            this.parser = parser;
            this.rowNum = rowNum;
        }

        @Override
        public void booleanColumn(Column column)
        {
            target.setBoolean(column, false);
        }

        @Override
        public void longColumn(Column column)
        {
            target.setLong(column, 12345L);
        }

        @Override
        public void doubleColumn(Column column)
        {
            target.setDouble(column, 12345.123);
        }

        @Override
        public void stringColumn(Column column)
        {
            // String values are "<column name>_<row number>" so rows can be told apart.
            target.setString(column, column.getName() + "_" + rowNum);
        }

        @Override
        public void timestampColumn(Column column)
        {
            target.setTimestamp(column, Instant.ofEpochMilli(System.currentTimeMillis()));
        }

        @Override
        public void jsonColumn(Column column)
        {
            target.setJson(column, parser.parse("{\"mockKey\":\"mockValue\"}"));
        }
    }
}