treasure-data/embulk-input-zendesk

View on GitHub
src/main/java/org/embulk/input/zendesk/services/ZendeskCursorBasedService.java

Summary

Maintainability
C
7 hrs
Test Coverage
package org.embulk.input.zendesk.services;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.input.zendesk.RecordImporter;
import org.embulk.input.zendesk.ZendeskInputPlugin;
import org.embulk.input.zendesk.clients.ZendeskRestClient;
import org.embulk.input.zendesk.models.ZendeskException;
import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.Iterator;

import static org.embulk.input.zendesk.ZendeskInputPlugin.CONFIG_MAPPER_FACTORY;

public class ZendeskCursorBasedService
    implements ZendeskService
{
    private static final Logger logger = LoggerFactory.getLogger(ZendeskNormalServices.class);

    protected ZendeskInputPlugin.PluginTask task;

    private ZendeskRestClient zendeskRestClient;

    public ZendeskCursorBasedService(final ZendeskInputPlugin.PluginTask task)
    {
        this.task = task;
    }

    @Override
    public boolean isSupportIncremental()
    {
        return true;
    }

    @Override
    public TaskReport addRecordToImporter(int taskIndex, RecordImporter recordImporter)
    {
        TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport();
        importData(task, recordImporter, taskReport);

        return taskReport;
    }

    @Override
    public JsonNode getDataFromPath(String path, int page, boolean isPreview, long startTime)
    {
        try {
            String buildPath = buildPath(0);
            final String response = getZendeskRestClient().doGet(buildPath, task, Exec.isPreview());
            return ZendeskUtils.parseJsonObject(response);
        }
        catch (URISyntaxException e) {
            throw new ConfigException(e);
        }
    }

    @VisibleForTesting
    protected ZendeskRestClient getZendeskRestClient()
    {
        if (zendeskRestClient == null) {
            zendeskRestClient = new ZendeskRestClient();
        }
        return zendeskRestClient;
    }

    private void importData(final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter, final TaskReport taskReport)
    {
        long initStartTime = 0;

        if (task.getStartTime().isPresent()) {
            initStartTime = ZendeskDateUtils.getStartTime(task.getStartTime().get());
        }

        long nextStartTime = initStartTime;
        long totalRecords = 0;
        try {
            String path = buildPath(initStartTime);

            while (true) {
                final JsonNode result = fetchResultFromPath(path);

                final Iterator<JsonNode> iterator = ZendeskUtils.getListRecords(result, task.getTarget().getJsonName());

                int numberOfRecords = 0;

                while (iterator.hasNext()) {
                    final JsonNode recordJsonNode = iterator.next();
                    fetchSubResourceAndAddToImporter(recordJsonNode, task, recordImporter);
                    numberOfRecords++;
                    // Store nextStartTime of last item
                    if (!iterator.hasNext() && task.getIncremental()) {
                        nextStartTime = ZendeskDateUtils.isoToEpochSecond(recordJsonNode.get(ZendeskConstants.Field.UPDATED_AT).asText());
                    }
                }

                totalRecords = totalRecords + numberOfRecords;
                if (result.has(ZendeskConstants.Field.END_OF_STREAM)) {
                    if (result.get(ZendeskConstants.Field.END_OF_STREAM).asBoolean()) {
                        break;
                    }
                }
                else {
                    throw new DataException("Missing end of stream, please double-check the endpoint");
                }
                if (Exec.isPreview()) {
                    break;
                }

                path = result.get(ZendeskConstants.Field.AFTER_URL).asText();
            }

            logger.info("import records total " + totalRecords);

            if (!Exec.isPreview() && task.getIncremental()) {
                storeStartTimeForConfigDiff(taskReport, nextStartTime);
            }
        }
        catch (Exception e) {
            throw new DataException(e);
        }
    }

    private String buildPath(long startTime)
        throws URISyntaxException
    {
        return ZendeskUtils.getURIBuilder(task.getLoginUrl()).setPath(ZendeskConstants.Url.API + "/" + "incremental" + "/" + task.getTarget().toString() + "/" + "cursor.json").build().toString() + "?start_time=" + startTime;
    }

    private JsonNode fetchResultFromPath(String path)
    {
        final String response = getZendeskRestClient().doGet(path, task, Exec.isPreview());
        return ZendeskUtils.parseJsonObject(response);
    }

    private void fetchSubResourceAndAddToImporter(final JsonNode jsonNode, final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter)
    {
        task.getIncludes().forEach(include -> {
            final String relatedObjectName = include.trim();

            final URIBuilder uriBuilder = ZendeskUtils.getURIBuilder(task.getLoginUrl()).setPath(ZendeskConstants.Url.API + "/" + task.getTarget().toString() + "/" + jsonNode.get(ZendeskConstants.Field.ID).asText() + "/" + relatedObjectName + ".json");
            try {
                final JsonNode result = getDataFromPath(uriBuilder.toString(), 0, false, 0);
                if (result != null && result.has(relatedObjectName)) {
                    ((ObjectNode) jsonNode).set(include, result.get(relatedObjectName));
                }
            }
            catch (final ConfigException e) {
                // Sometimes we get 404 when having invalid endpoint, so ignore when we get 404 InvalidEndpoint
                if (!(e.getCause() instanceof ZendeskException && ((ZendeskException) e.getCause()).getStatusCode() == HttpStatus.SC_NOT_FOUND)) {
                    throw e;
                }
            }
        });

        recordImporter.addRecord(jsonNode);
    }

    private void storeStartTimeForConfigDiff(final TaskReport taskReport, final long nextStartTime)
    {
        taskReport.set(ZendeskConstants.Field.START_TIME, nextStartTime);
    }
}