treasure-data/embulk-input-marketo

View on GitHub
src/main/java/org/embulk/input/marketo/CsvTokenizer.java

Summary

Maintainability
F
5 days
Test Coverage
package org.embulk.input.marketo;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.embulk.config.ConfigException;
import org.embulk.spi.DataException;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.Task;
import org.embulk.util.text.LineDecoder;
import org.embulk.util.text.Newline;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/**
 * Created by tai.khuu on 9/15/17.
 */
public class CsvTokenizer
{
    /**
     * Position of the tokenizer relative to the current record.
     */
    enum RecordState
    {
        /** The current record still has at least one column to read. */
        NOT_END,

        /** No record is in progress; {@code nextRecord()} must be called before reading columns. */
        END,
    }

    /**
     * States of the character-level scanner that {@code nextColumn()} runs for one column.
     */
    enum ColumnState
    {
        /** No character of the column has been examined yet. */
        BEGIN,

        /** Inside an unquoted value. */
        VALUE,

        /** Inside a quoted value, after the opening quote. */
        QUOTED_VALUE,

        /** The closing quote was seen; only spaces, a delimiter or end of line may follow. */
        AFTER_QUOTED_VALUE,

        /** Skipping leading spaces of a column (only entered when trimming is enabled). */
        FIRST_TRIM,

        /** Spaces were seen inside an unquoted value; they are trailing unless more text follows. */
        LAST_TRIM_OR_VALUE,
    }

    // Sentinel returned by nextChar()/peekNextChar() once linePos has reached the end of the line.
    // It has the same value ('\0') as NO_QUOTE and NO_ESCAPE, so a literal NUL character inside a
    // line is indistinguishable from end of line for isEndOfLine().
    private static final char END_OF_LINE = '\0';
    // Marker for "quoting disabled": isQuote() never matches while quote == NO_QUOTE.
    static final char NO_QUOTE = '\0';
    // Marker for "escaping disabled": isEscape() never matches while escape == NO_ESCAPE.
    static final char NO_ESCAPE = '\0';

    /**
     * Configuration options consumed by {@code CsvTokenizer(LineDecoder, PluginTask)}.
     * Each getter is bound to the option named in its {@code @Config} annotation; the
     * {@code @ConfigDefault} strings are presumably parsed as JSON literals by the Embulk
     * config mapper (TODO confirm against embulk-util-config).
     */
    public interface PluginTask extends Task
    {
        @Config("charset")
        @ConfigDefault("\"utf-8\"")
        Charset getCharset();

        @Config("newline")
        @ConfigDefault("\"CRLF\"")
        Newline getNewline();

        // Column delimiter. May be longer than one character; an empty string is rejected
        // by the tokenizer constructor.
        @Config("delimiter")
        @ConfigDefault("\",\"")
        String getDelimiter();

        // Quote character; when absent the tokenizer falls back to QuoteCharacter.noQuote().
        @Config("quote")
        @ConfigDefault("\"\\\"\"")
        Optional<QuoteCharacter> getQuoteChar();

        // Escape character; when absent the tokenizer falls back to EscapeCharacter.noEscape().
        @Config("escape")
        @ConfigDefault("\"\\\\\"")
        Optional<EscapeCharacter> getEscapeChar();

        // Null value handling (see nextColumnOrNull()): when this option is present, a column
        // whose text equals it (e.g. "\N", "NULL") is returned as null. When it is absent,
        // a non-quoted empty column is returned as null instead.
        @Config("null_string")
        @ConfigDefault("\"null\"")
        Optional<String> getNullString();

        // When true, spaces around non-quoted values are trimmed by nextColumn().
        @Config("trim_if_not_quoted")
        @ConfigDefault("false")
        boolean getTrimIfNotQuoted();

        // Upper bound on the length of a quoted value, checked while scanning it.
        @Config("max_quoted_size_limit")
        @ConfigDefault("131072") //128kB
        long getMaxQuotedSizeLimit();

        // Lines starting with this marker are skipped when empty-line skipping is enabled.
        @Config("comment_line_marker")
        @ConfigDefault("null")
        Optional<String> getCommentLineMarker();
    }

    // First character of the configured delimiter.
    private final char delimiterChar;
    // Remaining characters of a multi-character delimiter, or null for a one-character delimiter.
    private final String delimiterFollowingString;
    // Quote character, or NO_QUOTE when quoting is disabled.
    private final char quote;
    // Escape character, or NO_ESCAPE when escaping is disabled.
    private final char escape;
    // Newline string re-inserted between the physical lines of a multi-line quoted value.
    private final String newline;
    private final boolean trimIfNotQuoted;
    private final long maxQuotedSizeLimit;
    // Prefix that marks a comment line, or null when comment lines are not recognized.
    private final String commentLineMarker;
    private final LineDecoder input;
    // Text that nextColumnOrNull() maps to null, or null to map non-quoted empty columns instead.
    private final String nullStringOrNull;

    private RecordState recordState = RecordState.END;  // initial state is end of a record. nextRecord() must be called first
    // Number of lines consumed so far in the current file; reset by nextFile().
    private long lineNumber = 0;

    // Line currently being scanned; null once the input is exhausted.
    private String line = null;
    // Index in line of the next character to be returned by nextChar().
    private int linePos = 0;
    // Whether the column returned by the latest nextColumn() call was quoted.
    private boolean wasQuotedColumn = false;
    // Physical lines already consumed by the quoted column currently (or last) being read,
    // excluding the line held in the "line" field. Used by skipCurrentLine() to push lines back.
    private final List<String> quotedValueLines = new ArrayList<>();
    // Lines pushed back by skipCurrentLine(); nextLine() drains these before polling input.
    private final Deque<String> unreadLines = new ArrayDeque<>();

    /**
     * Creates a tokenizer configured from a {@link PluginTask}.
     *
     * <p>An absent quote or escape option disables quoting or escaping respectively; an absent
     * comment marker or null string is passed on as {@code null}.
     *
     * @param input source of lines to tokenize
     * @param task  parsed plugin configuration
     */
    public CsvTokenizer(LineDecoder input, PluginTask task)
    {
        this(
                task.getDelimiter(),
                task.getQuoteChar().orElse(QuoteCharacter.noQuote()).getCharacter(),
                task.getEscapeChar().orElse(EscapeCharacter.noEscape()).getCharacter(),
                task.getNewline().getString(),
                task.getTrimIfNotQuoted(),
                task.getMaxQuotedSizeLimit(),
                task.getCommentLineMarker().orElse(null),
                input,
                task.getNullString().orElse(null));
    }

    /**
     * Creates a tokenizer from explicit settings.
     *
     * @param delimiter          column delimiter; must contain at least one character
     * @param quote              quote character, or {@code NO_QUOTE} to disable quoting
     * @param escape             escape character, or {@code NO_ESCAPE} to disable escaping
     * @param newline            string inserted between lines of a multi-line quoted value
     * @param trimIfNotQuoted    whether spaces around unquoted values are trimmed
     * @param maxQuotedSizeLimit maximum length allowed for a quoted value
     * @param commentLineMarker  prefix of comment lines, or {@code null} for none
     * @param input              source of lines to tokenize
     * @param nullStringOrNull   text that {@code nextColumnOrNull()} maps to {@code null}, or {@code null}
     * @throws ConfigException if {@code delimiter} is empty
     */
    public CsvTokenizer(String delimiter, char quote, char escape, String newline, boolean trimIfNotQuoted, long maxQuotedSizeLimit, String commentLineMarker, LineDecoder input, String nullStringOrNull)
    {
        if (delimiter.isEmpty()) {
            throw new ConfigException("Empty delimiter is not allowed");
        }

        // Split the delimiter into its first character and the (optional) remainder.
        this.delimiterChar = delimiter.charAt(0);
        this.delimiterFollowingString = delimiter.length() > 1 ? delimiter.substring(1) : null;

        this.input = input;
        this.quote = quote;
        this.escape = escape;
        this.newline = newline;
        this.commentLineMarker = commentLineMarker;
        this.nullStringOrNull = nullStringOrNull;
        this.trimIfNotQuoted = trimIfNotQuoted;
        this.maxQuotedSizeLimit = maxQuotedSizeLimit;
    }

    /**
     * @return the number of lines consumed so far in the current file, after any adjustment
     *         made by {@code skipCurrentLine()}
     */
    public long getCurrentLineNumber()
    {
        return this.lineNumber;
    }

    /**
     * Discards one line from the input without tokenizing it.
     *
     * @return {@code true} if a line was available and skipped, {@code false} at end of input
     */
    public boolean skipHeaderLine()
    {
        if (input.poll() == null) {
            return false;
        }
        lineNumber++;
        return true;
    }

    /**
     * Abandons the record in progress and marks the tokenizer as being between records.
     *
     * <p>If the column being read spanned several lines, only its first line is treated as
     * skipped: the remaining lines (including the one currently held) are queued to be read
     * again and the line counter is rewound accordingly.
     *
     * @return the line that was skipped
     */
    public String skipCurrentLine()
    {
        final String skippedLine;
        if (!quotedValueLines.isEmpty()) {
            // recover lines of quoted value
            skippedLine = quotedValueLines.remove(0);  // TODO optimize performance
            final int requeuedCount = quotedValueLines.size() + (line == null ? 0 : 1);
            unreadLines.addAll(quotedValueLines);
            if (line != null) {
                unreadLines.add(line);
            }
            lineNumber -= requeuedCount;
            quotedValueLines.clear();
        }
        else {
            skippedLine = line;
        }
        recordState = RecordState.END;
        return skippedLine;
    }

    /**
     * Advances the underlying decoder to its next file and restarts line counting.
     *
     * @return {@code true} if another file is available
     */
    public boolean nextFile()
    {
        if (!input.nextFile()) {
            return false;
        }
        lineNumber = 0;
        return true;
    }

    /**
     * Moves to the next record, skipping empty and comment lines.
     * Used by guess-csv.
     *
     * @return {@code true} if a record is available
     */
    public boolean nextRecord()
    {
        final boolean skipEmptyLine = true;
        return nextRecord(skipEmptyLine);
    }

    /**
     * Moves to the next record by loading its first line.
     *
     * @param skipEmptyLine whether empty lines and comment lines are skipped
     * @return {@code true} if a record is available, {@code false} at end of input
     * @throws TooManyColumnsException if the previous record still has unread columns
     */
    public boolean nextRecord(boolean skipEmptyLine)
    {
        // A new record may only start once the previous one has been read to its end.
        if (recordState != RecordState.END) {
            throw new TooManyColumnsException("Too many columns");
        }

        if (!nextLine(skipEmptyLine)) {
            return false;
        }
        recordState = RecordState.NOT_END;
        return true;
    }

    /**
     * Loads the next line into {@code line}, preferring lines pushed back by
     * {@code skipCurrentLine()} over fresh input, and resets the scan position.
     *
     * @param skipEmptyLine whether empty lines and comment lines are passed over
     * @return {@code true} if a line was loaded, {@code false} at end of input
     */
    private boolean nextLine(boolean skipEmptyLine)
    {
        for (;;) {
            line = unreadLines.isEmpty() ? input.poll() : unreadLines.removeFirst();
            if (line == null) {
                return false;
            }
            lineNumber++;
            linePos = 0;

            if (!skipEmptyLine) {
                return true;
            }
            if (line.isEmpty()) {
                continue;
            }
            if (commentLineMarker != null && line.startsWith(commentLineMarker)) {
                continue;
            }
            return true;
        }
    }

    /**
     * @return {@code true} while the current record has not been read to its end
     */
    public boolean hasNextColumn()
    {
        return RecordState.NOT_END == recordState;
    }

    /**
     * Reads the next column of the current record.
     *
     * <p>The column is scanned one character at a time by a small state machine
     * ({@link ColumnState}). Unquoted values are returned as a substring of the current line
     * (optionally trimmed of surrounding spaces); quoted values are accumulated in a buffer,
     * may span several lines, and have doubled quotes and escape sequences resolved.
     * Reaching end of line outside a quoted value marks the record as finished.
     *
     * @return the column text; an empty string for an empty column
     * @throws TooFewColumnsException if the current record has no more columns
     * @throws InvalidValueException if the input ends inside a quoted value, or a character
     *         other than a space follows a closing quote
     * @throws QuotedSizeLimitExceededException if a quoted value grows beyond the configured limit
     */
    public String nextColumn()
    {
        if (!hasNextColumn()) {
            throw new TooFewColumnsException("Too few columns");
        }

        // reset last state
        wasQuotedColumn = false;
        quotedValueLines.clear();

        // local state
        int valueStartPos = linePos;
        int valueEndPos = 0;  // set by VALUE state when a possibly-trailing space is seen; read by LAST_TRIM_OR_VALUE state
        StringBuilder quotedValue = null;  // created by BEGIN or FIRST_TRIM state on an opening quote; used by QUOTED_VALUE and AFTER_QUOTED_VALUE states
        ColumnState columnState = ColumnState.BEGIN;

        while (true) {
            final char c = nextChar();

            switch (columnState) {
                case BEGIN:
                    // TODO optimization: state is BEGIN only at the first character of a column.
                    //      this block can be out of the loop.
                    if (isDelimiter(c)) {
                        // empty value
                        if (delimiterFollowingString == null) {
                            return "";
                        }
                        else if (isDelimiterFollowingFrom(linePos)) {
                            linePos += delimiterFollowingString.length();
                            return "";
                        }
                        // not a delimiter: only the first character of a multi-character
                        // delimiter matched, so fall through and treat c as ordinary input
                    }
                    if (isEndOfLine(c)) {
                        // empty value
                        recordState = RecordState.END;
                        return "";
                    }
                    else if (isSpace(c) && trimIfNotQuoted) {
                        columnState = ColumnState.FIRST_TRIM;
                    }
                    else if (isQuote(c)) {
                        valueStartPos = linePos;  // first character after the opening quote
                        wasQuotedColumn = true;
                        quotedValue = new StringBuilder();
                        columnState = ColumnState.QUOTED_VALUE;
                    }
                    else {
                        columnState = ColumnState.VALUE;
                    }
                    break;

                case FIRST_TRIM:
                    if (isDelimiter(c)) {
                        // empty value
                        if (delimiterFollowingString == null) {
                            return "";
                        }
                        else if (isDelimiterFollowingFrom(linePos)) {
                            linePos += delimiterFollowingString.length();
                            return "";
                        }
                        // not a delimiter
                    }
                    if (isEndOfLine(c)) {
                        // empty value
                        recordState = RecordState.END;
                        return "";
                    }
                    else if (isQuote(c)) {
                        // column has heading spaces and quoted. TODO should this be rejected?
                        valueStartPos = linePos;
                        wasQuotedColumn = true;
                        quotedValue = new StringBuilder();
                        columnState = ColumnState.QUOTED_VALUE;
                    }
                    else if (isSpace(c)) {
                        // skip this character
                    } else {
                        // c is the first non-space character; it was already consumed by nextChar()
                        valueStartPos = linePos - 1;
                        columnState = ColumnState.VALUE;
                    }
                    break;

                case VALUE:
                    if (isDelimiter(c)) {
                        if (delimiterFollowingString == null) {
                            // linePos - 1 is the index of the delimiter itself
                            return line.substring(valueStartPos, linePos - 1);
                        }
                        else if (isDelimiterFollowingFrom(linePos)) {
                            String value = line.substring(valueStartPos, linePos - 1);
                            linePos += delimiterFollowingString.length();
                            return value;
                        }
                        // not a delimiter
                    }
                    if (isEndOfLine(c)) {
                        // nextChar() does not advance linePos at end of line, so linePos == line.length()
                        recordState = RecordState.END;
                        return line.substring(valueStartPos, linePos);
                    }
                    else if (isSpace(c) && trimIfNotQuoted) {
                        valueEndPos = linePos - 1;  // this is possibly end of value
                        columnState = ColumnState.LAST_TRIM_OR_VALUE;

                        // TODO not implemented yet foo""bar""baz -> [foo, bar, baz].append
                        //} else if (isQuote(c)) {
                        //    // In RFC4180, If fields are not enclosed with double quotes, then
                        //    // double quotes may not appear inside the fields. But they are often
                        //    // included in the fields. We should care about them later.
                    }
                    else {
                        // keep VALUE state
                    }
                    break;

                case LAST_TRIM_OR_VALUE:
                    if (isDelimiter(c)) {
                        if (delimiterFollowingString == null) {
                            return line.substring(valueStartPos, valueEndPos);
                        }
                        else if (isDelimiterFollowingFrom(linePos)) {
                            linePos += delimiterFollowingString.length();
                            return line.substring(valueStartPos, valueEndPos);
                        }
                        else {
                            // not a delimiter
                        }
                    }
                    if (isEndOfLine(c)) {
                        recordState = RecordState.END;
                        return line.substring(valueStartPos, valueEndPos);
                    }
                    else if (isSpace(c)) {
                        // keep LAST_TRIM_OR_VALUE state
                    } else {
                        // these spaces are not trailing spaces. go back to VALUE state
                        columnState = ColumnState.VALUE;
                    }
                    break;

                case QUOTED_VALUE:
                    if (isEndOfLine(c)) {
                        // multi-line quoted value: keep the rest of this line plus a newline,
                        // remember the line for skipCurrentLine(), and continue on the next line
                        quotedValue.append(line.substring(valueStartPos, linePos));
                        quotedValue.append(newline);
                        quotedValueLines.add(line);
                        if (!nextLine(false)) {
                            throw new InvalidValueException("Unexpected end of line during parsing a quoted value");
                        }
                        valueStartPos = 0;
                    }
                    else if (isQuote(c)) {
                        char next = peekNextChar();
                        if (isQuote(next)) { // escaped quote
                            // append up to and including the first quote, then skip the second one
                            quotedValue.append(line.substring(valueStartPos, linePos));
                            valueStartPos = ++linePos;
                        }
                        else {
                            // closing quote: append everything before it
                            quotedValue.append(line.substring(valueStartPos, linePos - 1));
                            columnState = ColumnState.AFTER_QUOTED_VALUE;
                        }
                    }
                    else if (isEscape(c)) {  // isQuote must be checked first in case of quote == escape
                        // In RFC 4180, CSV's escape char is '\"'. But '\\' is often used.
                        char next = peekNextChar();
                        // NOTE(review): c is known to be the escape character here, so this
                        // isEndOfLine(c) test can never be true and the branch below is dead.
                        // It looks like isEndOfLine(next) was intended, but switching to it would
                        // change the output for an escape character at end of line - confirm
                        // the desired behavior before changing.
                        if (isEndOfLine(c)) {
                            // escape end of line. TODO assuming multi-line quoted value without newline?
                            quotedValue.append(line.substring(valueStartPos, linePos));
                            quotedValueLines.add(line);
                            if (!nextLine(false)) {
                                throw new InvalidValueException("Unexpected end of line during parsing a quoted value");
                            }
                            valueStartPos = 0;
                        }
                        else if (isQuote(next) || isEscape(next)) { // escaped quote
                            // drop the escape character and keep the escaped character literally
                            quotedValue.append(line.substring(valueStartPos, linePos - 1));
                            quotedValue.append(next);
                            valueStartPos = ++linePos;
                        }
                        // otherwise the escape character is kept as ordinary text
                    }
                    else {
                        // size check: pending characters on this line plus what is already buffered
                        if ((linePos - valueStartPos) + quotedValue.length() > maxQuotedSizeLimit) {
                            throw new QuotedSizeLimitExceededException("The size of the quoted value exceeds the limit size (" + maxQuotedSizeLimit + ")");
                        }
                        // keep QUOTED_VALUE state
                    }
                    break;

                case AFTER_QUOTED_VALUE:
                    if (isDelimiter(c)) {
                        if (delimiterFollowingString == null) {
                            return quotedValue.toString();
                        }
                        else if (isDelimiterFollowingFrom(linePos)) {
                            linePos += delimiterFollowingString.length();
                            return quotedValue.toString();
                        }
                        // not a delimiter
                    }
                    if (isEndOfLine(c)) {
                        recordState = RecordState.END;
                        return quotedValue.toString();
                    }
                    else if (isSpace(c)) {
                        // column has trailing spaces and quoted. TODO should this be rejected?
                    } else {
                        throw new InvalidValueException(String.format("Unexpected extra character '%c' after a value quoted by '%c'", c, quote));
                    }
                    break;

                default:
                    assert false;
            }
        }
    }

    /**
     * Reads the next column and applies null-value handling.
     *
     * <p>With a configured null string, a column equal to it becomes {@code null}. Without one,
     * an empty column becomes {@code null} unless it was quoted.
     *
     * @return the column text, or {@code null} when the column represents a null value
     */
    public String nextColumnOrNull()
    {
        final String value = nextColumn();

        if (nullStringOrNull != null) {
            return value.equals(nullStringOrNull) ? null : value;
        }
        if (!value.isEmpty()) {
            return value;
        }
        // Empty column: a quoted "" is a real empty string, an unquoted one means null.
        return wasQuotedColumn ? "" : null;
    }

    /**
     * @return whether the column returned by the latest {@code nextColumn()} call was quoted
     */
    public boolean wasQuotedColumn()
    {
        return this.wasQuotedColumn;
    }

    /**
     * Consumes and returns the next character of the current line.
     *
     * @return the character, or {@code END_OF_LINE} (without advancing) once the line is exhausted
     */
    private char nextChar()
    {
        Preconditions.checkState(line != null, "nextColumn is called after end of file");

        return linePos < line.length() ? line.charAt(linePos++) : END_OF_LINE;
    }

    /**
     * Returns the next character of the current line without consuming it.
     *
     * @return the character, or {@code END_OF_LINE} once the line is exhausted
     */
    private char peekNextChar()
    {
        Preconditions.checkState(line != null, "peekNextChar is called after end of file");

        return linePos < line.length() ? line.charAt(linePos) : END_OF_LINE;
    }

    /** Only the ASCII space character counts as trimmable whitespace. */
    private boolean isSpace(char c)
    {
        return ' ' == c;
    }

    /**
     * Checks whether the tail of a multi-character delimiter appears in the current line
     * starting at {@code pos}. Must only be called when the delimiter has more than one character.
     *
     * @param pos index in the current line just after the delimiter's first character
     * @return {@code true} if the remaining delimiter characters match at {@code pos}
     */
    private boolean isDelimiterFollowingFrom(int pos)
    {
        // startsWith(prefix, offset) is false when the prefix would run past the end of the line.
        return line.startsWith(delimiterFollowingString, pos);
    }

    /** Tests {@code c} against the first character of the delimiter only. */
    private boolean isDelimiter(char c)
    {
        return delimiterChar == c;
    }

    /** Tests {@code c} against the end-of-line sentinel produced by {@code nextChar()}. */
    private boolean isEndOfLine(char c)
    {
        return END_OF_LINE == c;
    }

    /** Tests {@code c} against the quote character; always false when quoting is disabled. */
    private boolean isQuote(char c)
    {
        if (quote == NO_QUOTE) {
            return false;
        }
        return quote == c;
    }

    /** Tests {@code c} against the escape character; always false when escaping is disabled. */
    private boolean isEscape(char c)
    {
        if (escape == NO_ESCAPE) {
            return false;
        }
        return escape == c;
    }

    /**
     * Base type for errors in the shape of a record, such as a wrong number of columns.
     */
    public static class InvalidFormatException extends DataException
    {
        public InvalidFormatException(final String message)
        {
            super(message);
        }
    }

    /**
     * Base type for errors in the content of a single column value.
     */
    public static class InvalidValueException extends DataException
    {
        public InvalidValueException(final String message)
        {
            super(message);
        }
    }

    /**
     * Raised when a quoted value grows beyond the configured maximum size.
     */
    public static class QuotedSizeLimitExceededException extends InvalidValueException
    {
        public QuotedSizeLimitExceededException(final String message)
        {
            super(message);
        }
    }

    public class TooManyColumnsException
            extends InvalidFormatException
    {
        public TooManyColumnsException(String message)
        {
            super(message);
        }
    }

    public class TooFewColumnsException
            extends InvalidFormatException
    {
        public TooFewColumnsException(String message)
        {
            super(message);
        }
    }

    /**
     * Value of the {@code quote} option: a single character, where {@code NO_QUOTE}
     * stands for "quoting disabled". Converted from and to a one-character string by Jackson.
     */
    public static class QuoteCharacter
    {
        private final char character;

        public QuoteCharacter(char character)
        {
            this.character = character;
        }

        /** @return the instance that disables quoting */
        public static QuoteCharacter noQuote()
        {
            return new QuoteCharacter(CsvTokenizer.NO_QUOTE);
        }

        /**
         * Parses the configured string.
         *
         * @param str configured value; at most one character long
         * @return the quote character; an empty string is accepted with a warning and maps to {@code '"'}
         * @throws ConfigException if {@code str} has two or more characters
         */
        @JsonCreator
        public static QuoteCharacter ofString(String str)
        {
            switch (str.length()) {
                case 0:
                    LoggerFactory.getLogger(CsvTokenizer.class).warn("Setting '' (empty string) to \"quote\" option is obsoleted. Currently it becomes '\"' automatically but this behavior will be removed. Please set '\"' explicitly.");
                    return new QuoteCharacter('"');
                case 1:
                    return new QuoteCharacter(str.charAt(0));
                default:
                    throw new ConfigException("\"quote\" option accepts only 1 character.");
            }
        }

        @JsonIgnore
        public char getCharacter()
        {
            return character;
        }

        /** @return the character as a one-character string, used as the JSON representation */
        @JsonValue
        public String getOptionalString()
        {
            return String.valueOf(character);
        }

        @Override
        public boolean equals(Object obj)
        {
            return obj instanceof QuoteCharacter
                    && ((QuoteCharacter) obj).character == character;
        }

        @Override
        public int hashCode()
        {
            return 31 + character;
        }
    }

    /**
     * Value of the {@code escape} option: a single character, where {@code NO_ESCAPE}
     * stands for "escaping disabled". Converted from and to a one-character string by Jackson.
     */
    public static class EscapeCharacter
    {
        private final char character;

        public EscapeCharacter(char character)
        {
            this.character = character;
        }

        /** @return the instance that disables escaping */
        public static EscapeCharacter noEscape()
        {
            return new EscapeCharacter(CsvTokenizer.NO_ESCAPE);
        }

        /**
         * Parses the configured string.
         *
         * @param str configured value; at most one character long
         * @return the escape character; an empty string is accepted with a warning and disables escaping
         * @throws ConfigException if {@code str} has two or more characters
         */
        @JsonCreator
        public static EscapeCharacter ofString(String str)
        {
            switch (str.length()) {
                case 0:
                    LoggerFactory.getLogger(CsvTokenizer.class).warn("Setting '' (empty string) to \"escape\" option is obsoleted. Currently it becomes null automatically but this behavior will be removed. Please set \"escape: null\" explicitly.");
                    return noEscape();
                case 1:
                    return new EscapeCharacter(str.charAt(0));
                default:
                    throw new ConfigException("\"escape\" option accepts only 1 character.");
            }
        }

        @JsonIgnore
        public char getCharacter()
        {
            return character;
        }

        /** @return the character as a one-character string, used as the JSON representation */
        @JsonValue
        public String getOptionalString()
        {
            return String.valueOf(character);
        }

        @Override
        public boolean equals(Object obj)
        {
            return obj instanceof EscapeCharacter
                    && ((EscapeCharacter) obj).character == character;
        }

        @Override
        public int hashCode()
        {
            return 31 + character;
        }
    }
}