src/main/java/org/embulk/input/zendesk/services/ZendeskUserEventService.java
package org.embulk.input.zendesk.services;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.client.utils.URIBuilder;
import org.embulk.config.TaskReport;
import org.embulk.input.zendesk.RecordImporter;
import org.embulk.input.zendesk.ZendeskInputPlugin;
import org.embulk.input.zendesk.clients.ZendeskRestClient;
import org.embulk.input.zendesk.models.Target;
import org.embulk.input.zendesk.stream.paginator.sunshine.UserEventSpliterator;
import org.embulk.input.zendesk.stream.paginator.support.OrganizationSpliterator;
import org.embulk.input.zendesk.stream.paginator.support.UserSpliterator;
import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.embulk.input.zendesk.ZendeskInputPlugin.CONFIG_MAPPER_FACTORY;
public class ZendeskUserEventService implements ZendeskService
{
protected ZendeskInputPlugin.PluginTask task;
private ZendeskRestClient zendeskRestClient;
public ZendeskUserEventService(final ZendeskInputPlugin.PluginTask task)
{
this.task = task;
}
public boolean isSupportIncremental()
{
return false;
}
@Override
public TaskReport addRecordToImporter(final int taskIndex, final RecordImporter recordImporter)
{
final TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport();
if (Exec.isPreview()) {
JsonNode jsonNode = mockJsonNode();
recordImporter.addRecord(jsonNode.get(0));
return taskReport;
}
final List<JsonNode> organizations = StreamSupport.stream(new OrganizationSpliterator(buildOrganizationURI(), getZendeskRestClient(), task), false)
.collect(Collectors.toList());
final Set<String> knownUserIds = ConcurrentHashMap.newKeySet();
organizations.parallelStream().forEach(
organization -> {
Stream<JsonNode> stream = StreamSupport.stream(new UserSpliterator(buildOrganizationWithUserURI(organization.get("url").asText()),
getZendeskRestClient(), task, Exec.isPreview()), true);
if (task.getDedup()) {
stream = stream.filter(item -> knownUserIds.add(item.get("id").asText()));
}
stream.forEach(s ->
{
Stream<JsonNode> userEventStream = StreamSupport.stream(new UserEventSpliterator(s.get("id").asText(), buildUserEventURI(s.get("id").asText()),
getZendeskRestClient(), task, Exec.isPreview()), false);
userEventStream.forEach(recordImporter::addRecord);
});
}
);
return taskReport;
}
@Override
public JsonNode getDataFromPath(final String path, final int page, final boolean isPreview, final long startTime)
{
return new ObjectMapper().createObjectNode().set(task.getTarget().getJsonName(), mockJsonNode());
}
@VisibleForTesting
protected ZendeskRestClient getZendeskRestClient()
{
if (zendeskRestClient == null) {
zendeskRestClient = new ZendeskRestClient();
}
return zendeskRestClient;
}
private String buildOrganizationURI()
{
return ZendeskUtils.getURIBuilder(task.getLoginUrl())
.setPath(ZendeskConstants.Url.API + "/" + Target.ORGANIZATIONS.getJsonName())
.setParameter("per_page", "100")
.setParameter("page", "1")
.toString();
}
private String buildOrganizationWithUserURI(final String path)
{
return path.replace(".json", "")
+ "/users.json?per_page=100&page=1";
}
private String buildUserEventURI(final String userID)
{
final URIBuilder uriBuilder = ZendeskUtils.getURIBuilder(task.getLoginUrl())
.setPath(String.format(ZendeskConstants.Url.API_USER_EVENT, userID));
task.getUserEventSource().ifPresent(eventSource -> uriBuilder.setParameter("filter[source]", eventSource));
task.getUserEventType().ifPresent(eventType -> uriBuilder.setParameter("filter[type]", eventType));
task.getStartTime().ifPresent(startTime -> {
try {
uriBuilder.setParameter("filter[start_time]", ZendeskDateUtils.convertToDateTimeFormat(startTime, ZendeskConstants.Misc.ISO_INSTANT));
}
catch (DataException e) {
uriBuilder.setParameter("filter[start_time]", ZendeskDateUtils.convertToDateTimeFormat(Instant.EPOCH.toString(), ZendeskConstants.Misc.ISO_INSTANT));
}
});
task.getEndTime().ifPresent(endTime -> uriBuilder.setParameter("filter[end_time]", ZendeskDateUtils.convertToDateTimeFormat(endTime, ZendeskConstants.Misc.ISO_INSTANT)));
return uriBuilder.toString();
}
private JsonNode mockJsonNode()
{
try {
String mockData = "[\n" +
" {\n" +
" \"id\": \"5c7f31aef8df240001e60bbf\",\n" +
" \"type\": \"remove_from_cart\",\n" +
" \"source\": \"shopify\",\n" +
" \"description\": \"\",\n" +
" \"created_at\": \"2019-03-06T02:34:12.381847424Z\",\n" +
" \"received_at\": \"2019-03-06T02:34:12.381847424Z\",\n" +
" \"properties\": {\n" +
" \"model\": 221,\n" +
" \"size\": 6\n" +
" },\n" +
" \"user_id\": \"12312354234\"\n" +
" }\n" +
"]";
return new ObjectMapper().readTree(mockData);
}
catch (IOException ex) {
throw new RuntimeException("Can not create sample data " + ex.getMessage());
}
}
}