treasure-data/embulk-input-zendesk

View on GitHub
src/main/java/org/embulk/input/zendesk/services/ZendeskChatService.java

Summary

Maintainability
C
7 hrs
Test Coverage
package org.embulk.input.zendesk.services;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.embulk.config.TaskReport;
import org.embulk.input.zendesk.RecordImporter;
import org.embulk.input.zendesk.ZendeskInputPlugin;
import org.embulk.input.zendesk.clients.ZendeskRestClient;

import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

import static org.embulk.input.zendesk.ZendeskInputPlugin.CONFIG_MAPPER_FACTORY;

public class ZendeskChatService implements ZendeskService
{
    private static final Logger logger = LoggerFactory.getLogger(ZendeskChatService.class);

    private ZendeskRestClient zendeskRestClient;

    protected ZendeskInputPlugin.PluginTask task;

    private static final int MAXIMUM_TOTAL_RECORDS = 10000;

    private static final int MAXIMUM_TOTAL_PAGES = 250;

    private static final int MAXIMUM_RECORDS_PER_PAGE = 40;

    public ZendeskChatService(final ZendeskInputPlugin.PluginTask task)
    {
        this.task = task;
    }

    @Override
    public boolean isSupportIncremental()
    {
        return true;
    }

    @Override
    public TaskReport addRecordToImporter(final int taskIndex, final RecordImporter recordImporter)
    {
        final TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport();
        String startTime = getStartTime();
        String endTime = getEndTime();

        if (Exec.isPreview()) {
            fetchData(startTime, endTime, 1, recordImporter);
            return taskReport;
        }

        while (true) {
            String searchURI = buildSearchRequest(startTime, endTime, 1);

            String response = getZendeskRestClient().doGet(searchURI, task, Exec.isPreview());

            JsonNode json = ZendeskUtils.parseJsonObject(response);

            if (!ZendeskUtils.isNull(json)) {
                int totalRecords = json.get("count").asInt();
                int totalPages = totalRecords > MAXIMUM_TOTAL_RECORDS
                    ? MAXIMUM_TOTAL_PAGES
                    : slicePages(totalRecords);

                logger.info(String.format("Fetching from '%s' to '%s' with '%d' pages and '%d' records",
                    startTime, endTime, totalPages,  totalRecords));

                final String lastEndTime = endTime;

                // Can't start from 0 because page 0 and 1 return the same data, and according to document, page start from 1
                IntStream.range(1, totalPages + 1).parallel().forEach(page -> fetchData(startTime, lastEndTime, page, recordImporter));

                if (totalRecords <= MAXIMUM_TOTAL_RECORDS || Exec.isPreview()) {
                    break;
                }

                endTime = getNextEndDate(startTime, endTime);
            }
        }

        storeStartTimeForConfigDiff(taskReport,
            startTime.equals("0") ? 0 : ZendeskDateUtils.isoToEpochSecond(startTime),
            ZendeskDateUtils.isoToEpochSecond(endTime));
        return taskReport;
    }

    @Override
    public JsonNode getDataFromPath(final String path, final int page, final boolean isPreview, final long initTime)
    {
        String startTime = getStartTime();
        String endTime = getEndTime();

        List<String> ids = getListIDS(startTime, endTime, 1, true);

        if (ids.size() > 0) {
            String fetchIdsURI = buildSearchRequest(ids);

            String response = zendeskRestClient.doGet(fetchIdsURI, task, true);

            return new ObjectMapper().createObjectNode().set(task.getTarget().getJsonName(), parseData(response));
        }

        return new ObjectMapper().createObjectNode().set(task.getTarget().getJsonName(), new ObjectMapper().createArrayNode());
    }

    protected void fetchData(final String startTime, final String endTime, final int page, final RecordImporter recordImporter)
    {
        List<String> ids = getListIDS(startTime, endTime, page, Exec.isPreview());

        if (ids.size() > 0) {
            String fetchIdsURI = buildSearchRequest(ids);

            String response = getZendeskRestClient().doGet(fetchIdsURI, task, false);

            JsonNode data = ZendeskUtils.parseJsonObject(response);
            if (data.get("count").asInt() > 0) {
                data.get("docs").forEach(item -> {
                    if (item != null) {
                        recordImporter.addRecord(item);
                    }
                });
            }
        }
    }

    @VisibleForTesting
    protected ZendeskRestClient getZendeskRestClient()
    {
        if (zendeskRestClient == null) {
            zendeskRestClient = new ZendeskRestClient();
        }
        return zendeskRestClient;
    }

    private List<String> getListIDS(final String startTime, final String endTime, final int page, final boolean isPreview)
    {
        String searchURI = buildSearchRequest(startTime, endTime, page);

        String response = getZendeskRestClient().doGet(searchURI, task, isPreview);
        JsonNode json = ZendeskUtils.parseJsonObject(response);
        Iterator<JsonNode> data = ZendeskUtils.getListRecords(json, "results");

        List<String> ids = new ArrayList<>();
        data.forEachRemaining(record -> ids.add(record.get("id").asText()));
        return ids;
    }

    private String buildSearchRequest(final String startTime, final String endTime, final int page)
    {
        return ZendeskUtils.getURIBuilder(task.getLoginUrl())
            .setPath(ZendeskConstants.Url.API_CHAT_SEARCH)
            .setParameter("q", buildSearchParam(startTime, endTime))
            .setParameter("page", String.valueOf(page)).toString();
    }

    private String buildSearchParam(final String startTime, final String endTime)
    {
        return new StringBuilder()
            .append("timestamp:[")
            .append(
                startTime.equals("0")
                ? "*"
                : ZendeskDateUtils.convertToDateTimeFormat(startTime, ZendeskConstants.Misc.ISO_INSTANT))
            .append(" TO ")
            .append(ZendeskDateUtils.convertToDateTimeFormat(endTime, ZendeskConstants.Misc.ISO_INSTANT))
            .append("]")
            .toString();
    }

    private String buildSearchRequest(final List<String> ids)
    {
        return ZendeskUtils.getURIBuilder(task.getLoginUrl())
            .setPath(ZendeskConstants.Url.API_CHAT)
            .setParameter("ids", buildIdsParam(ids))
            .toString();
    }

    private String buildIdsParam(final List<String> ids)
    {
        return Joiner.on(",").join(ids);
    }

    private ArrayNode parseData(String response)
    {
        ArrayNode arrayNode = new ObjectMapper().createArrayNode();
        JsonNode data = ZendeskUtils.parseJsonObject(response);
        if (data.get("count").asInt() > 0) {
            data.get("docs").forEach(item -> {
                if (item != null) {
                    arrayNode.add(item);
                }
            });
        }
        return arrayNode;
    }

    private int slicePages(final int totalRecords)
    {
        if (totalRecords % MAXIMUM_RECORDS_PER_PAGE == 0) {
            return totalRecords / MAXIMUM_RECORDS_PER_PAGE;
        }
        return totalRecords / MAXIMUM_RECORDS_PER_PAGE + 1;
    }

    private String getNextEndDate(final String startTime, final String endTime)
    {
        String searchURI = buildSearchRequest(startTime, endTime, MAXIMUM_TOTAL_PAGES);
        String response = getZendeskRestClient().doGet(searchURI, task, false);
        JsonNode json = ZendeskUtils.parseJsonObject(response);

        return ZendeskDateUtils.convertToDateTimeFormat(
                    json.get("results")
                    .get(MAXIMUM_RECORDS_PER_PAGE - 1)
                    .get("timestamp").asText(), ZendeskConstants.Misc.ISO_INSTANT);
    }

    private void storeStartTimeForConfigDiff(final TaskReport taskReport, final long initStartTime, final long resultEndTime)
    {
        if (task.getIncremental()) {
            long nextStartTime;
            long now = Instant.now().getEpochSecond();
            // no record to add
            if (resultEndTime == 0) {
                nextStartTime = now;
            }
            else {
                if (task.getEndTime().isPresent()) {
                    long endTime = ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get());
                    nextStartTime = endTime + 1;
                }
                else {
                    // NOTE: start_time compared as "=>", not ">".
                    // If we will use end_time for next start_time, we got the same records that are fetched
                    // end_time + 1 is workaround for that
                    nextStartTime = resultEndTime + 1;
                }
            }

            if (task.getEndTime().isPresent()) {
                long endTime = ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get());
                taskReport.set(ZendeskConstants.Field.END_TIME, nextStartTime + endTime - initStartTime);
            }

            taskReport.set(ZendeskConstants.Field.START_TIME, nextStartTime);
        }
    }

    private String getStartTime()
    {
        return task.getStartTime().isPresent()
            ? task.getStartTime().get()
            : "0";
    }

    private String getEndTime()
    {
        return task.getEndTime().isPresent()
            ? task.getEndTime().get()
            : OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT);
    }
}