treasure-data/embulk-input-jira

View on GitHub
src/main/java/org/embulk/input/jira/JiraInputPlugin.java

Summary

Maintainability
A
1 hr
Test Coverage
package org.embulk.input.jira;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.input.jira.client.JiraClient;
import org.embulk.input.jira.util.JiraUtil;
import org.embulk.spi.Exec;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.config.units.ColumnConfig;
import org.embulk.util.config.units.SchemaConfig;
import org.embulk.util.guess.SchemaGuess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import static org.embulk.input.jira.Constant.GUESS_RECORDS_COUNT;
import static org.embulk.input.jira.Constant.MAX_RESULTS;
import static org.embulk.input.jira.Constant.PREVIEW_RECORDS_COUNT;

public class JiraInputPlugin
        implements InputPlugin
{
    private static final Logger LOGGER = LoggerFactory.getLogger(JiraInputPlugin.class);
    @VisibleForTesting
    public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory
            .builder()
            .addDefaultModules()
            .build();
    @VisibleForTesting
    public static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper();
    private static final TaskMapper TASK_MAPPER = CONFIG_MAPPER_FACTORY.createTaskMapper();

    public interface PluginTask
            extends Task
    {
        @Config("username")
        public String getUsername();

        @Config("password")
        public String getPassword();

        @Config("uri")
        public String getUri();

        @Config("initial_retry_interval_millis")
        @ConfigDefault("1000")
        int getInitialRetryIntervalMillis();

        @Config("maximum_retry_interval_millis")
        @ConfigDefault("120000")
        int getMaximumRetryIntervalMillis();

        @Config("timeout_millis")
        @ConfigDefault("300000")
        int getTimeoutMillis();

        @Config("retry_limit")
        @ConfigDefault("5")
        public int getRetryLimit();

        @Config("jql")
        @ConfigDefault("null")
        public Optional<String> getJQL();

        @Config("dynamic_schema")
        @ConfigDefault("false")
        public boolean getDynamicSchema();

        @Config("columns")
        public SchemaConfig getColumns();

        // For future support of other authentication methods
        @Config("auth_method")
        @ConfigDefault("\"basic\"")
        public AuthenticateMethod getAuthMethod();
    }

    @Override
    public ConfigDiff transaction(final ConfigSource config,
            final InputPlugin.Control control)
    {
        final PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class);
        SchemaConfig schemaConfig = task.getColumns();
        if (task.getDynamicSchema()) {
            final JiraClient jiraClient = getJiraClient();
            final List<ColumnConfig> columns = new ArrayList<>();
            try {
                final List<ConfigDiff> guessedColumns = getGuessedColumns(jiraClient, task);
                for (final ConfigDiff guessedColumn : guessedColumns) {
                    columns.add(new ColumnConfig(CONFIG_MAPPER_FACTORY.newConfigSource().merge(guessedColumn)));
                }
            }
            catch (final ConfigException e) {
                if (!e.getMessage().equals("Could not guess schema due to empty data set")) {
                    throw e;
                }
            }
            schemaConfig = new SchemaConfig(columns);
        }
        final Schema schema = schemaConfig.toSchema();
        final int taskCount = 1;

        return resume(task.toTaskSource(), schema, taskCount, control);
    }

    @Override
    public ConfigDiff resume(final TaskSource taskSource,
            final Schema schema, final int taskCount,
            final InputPlugin.Control control)
    {
        control.run(taskSource, schema, taskCount);
        return CONFIG_MAPPER_FACTORY.newConfigDiff();
    }

    @Override
    public TaskReport run(final TaskSource taskSource,
            final Schema schema, final int taskIndex,
            final PageOutput output)
    {
        final PluginTask task = TASK_MAPPER.map(taskSource, PluginTask.class);
        JiraUtil.validateTaskConfig(task);
        final JiraClient jiraClient = getJiraClient();
        jiraClient.checkUserCredentials(task);
        try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) {
            if (isPreview()) {
                final List<Issue> issues = jiraClient.searchIssues(task, 0, PREVIEW_RECORDS_COUNT);
                issues.forEach(issue -> JiraUtil.addRecord(issue, schema, task, pageBuilder));
            }
            else {
                int currentPage = 0;
                final int totalCount = jiraClient.getTotalCount(task);
                final int totalPage = JiraUtil.calculateTotalPage(totalCount, MAX_RESULTS);
                LOGGER.info(String.format("Total pages (%d)", totalPage));
                while (currentPage < totalPage) {
                    LOGGER.info(String.format("Fetching page %d/%d", (currentPage + 1), totalPage));
                    final List<Issue> issues = jiraClient.searchIssues(task, (currentPage * MAX_RESULTS), MAX_RESULTS);
                    issues.forEach(issue -> JiraUtil.addRecord(issue, schema, task, pageBuilder));
                    currentPage++;
                }
            }
            pageBuilder.finish();
        }
        return CONFIG_MAPPER_FACTORY.newTaskReport();
    }

    @Override
    public ConfigDiff guess(final ConfigSource config)
    {
        // Reset columns in case already have or missing on configuration
        config.set("columns", new ArrayList<>());
        final PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class);
        JiraUtil.validateTaskConfig(task);
        final JiraClient jiraClient = getJiraClient();
        jiraClient.checkUserCredentials(task);
        return CONFIG_MAPPER_FACTORY.newConfigDiff().set("columns", getGuessedColumns(jiraClient, task));
    }

    private List<ConfigDiff> getGuessedColumns(final JiraClient jiraClient, final PluginTask task)
    {
        final List<Issue> issues = jiraClient.searchIssues(task, 0, GUESS_RECORDS_COUNT);
        if (issues.isEmpty()) {
            throw new ConfigException("Could not guess schema due to empty data set");
        }
        final List<ConfigDiff> columns = SchemaGuess.of(CONFIG_MAPPER_FACTORY).fromLinkedHashMapRecords(createGuessSample(issues, getUniqueAttributes(issues)));
        columns.forEach(conf -> conf.remove("index"));
        return columns;
    }

    private SortedSet<String> getUniqueAttributes(final List<Issue> issues)
    {
        final SortedSet<String> uniqueAttributes = new TreeSet<>();
        for (final Issue issue : issues) {
            for (final Entry<String, JsonElement> entry : issue.getFlatten().entrySet()) {
                uniqueAttributes.add(entry.getKey());
            }
        }
        return uniqueAttributes;
    }

    private List<LinkedHashMap<String, Object>> createGuessSample(final List<Issue> issues, final Set<String> uniqueAttributes)
    {
        final List<LinkedHashMap<String, Object>> samples = new ArrayList<>();
        for (final Issue issue : issues) {
            final JsonObject flatten = issue.getFlatten();
            final JsonObject unified = new JsonObject();
            for (final String key : uniqueAttributes) {
                JsonElement value = flatten.get(key);
                if (value == null) {
                    value = JsonNull.INSTANCE;
                }
                unified.add(key, value);
            }
            samples.add(JiraUtil.toLinkedHashMap(unified));
        }
        return samples;
    }

    @VisibleForTesting
    public PageBuilder getPageBuilder(final Schema schema, final PageOutput output)
    {
        return Exec.getPageBuilder(Exec.getBufferAllocator(), schema, output);
    }

    @VisibleForTesting
    public boolean isPreview()
    {
        return Exec.isPreview();
    }

    @VisibleForTesting
    public JiraClient getJiraClient()
    {
        return new JiraClient();
    }
    @Override
    public void cleanup(final TaskSource taskSource, final Schema schema, final int taskCount, final List<TaskReport> successTaskReports)
    {}
}