services/importer/lib/importer/csv_normalizer.rb
require 'csv'
require 'charlock_holmes'
require 'tempfile'
require 'fileutils'
require_relative './job'
require_relative './source_file'
require_relative './unp'
module CartoDB
module Importer2
class CsvNormalizer
LINE_SIZE_FOR_CLEANING = 5000
LINES_FOR_DETECTION = 100 # How many lines to read?
SAMPLE_READ_LIMIT = 500000 # Read big enough sample bytes for the encoding sampling
COMMON_DELIMITERS = [',', "\t", ' ', ';', '|'].freeze
DELIMITER_WEIGHTS = { ',' => 2, "\t" => 2, ' ' => 1, ';' => 2, '|' => 2 }.freeze
DEFAULT_DELIMITER = ','
DEFAULT_ENCODING = 'UTF-8'
DEFAULT_QUOTE = '"'
OUTPUT_DELIMITER = ',' # Normalized CSVs will use this delimiter
ENCODING_CONFIDENCE = 28
ACCEPTABLE_ENCODINGS = %w{ ISO-8859-1 ISO-8859-2 UTF-8 }
LINE_FEED = "\x0A"
CARRIAGE_RETURN = "\x0D"
def initialize(filepath, job = nil, importer_config = nil)
@filepath = filepath
@job = job || Job.new
@delimiter = nil
@force_normalize = false
@encoding = nil
@importer_config = importer_config
end
def force_normalize
@force_normalize = true
end
# @throws MalformedCSVException
def run
return self unless File.exists?(filepath)
detect_delimiter
begin
return self unless (needs_normalization? || @force_normalize)
rescue CSV::MalformedCSVError => ex
raise MalformedCSVException.new(ex.message)
end
normalize(temporary_filepath)
release
File.rename(temporary_filepath, filepath)
FileUtils.rm_rf(temporary_directory)
self.temporary_directory = nil
self
end
def detect_delimiter
# Calculate variances of the N first lines for each delimiter, then grab the one that changes less
@delimiter = DEFAULT_DELIMITER unless first_line
# Flag to skip newlines contained inside string values
skip_line = false
lines_for_detection = Array.new
LINES_FOR_DETECTION.times {
line = stream.gets
skip_line ^= true if line&.count("\"")&.odd? # Toggle skip line mode
lines_for_detection << remove_quoted_strings(line) unless !line || skip_line
}
stream.rewind
# Maybe gets was not able to discern line breaks, try manually:
if lines_for_detection.size == 1
lines_for_detection = lines_for_detection.first
# Did it read as columns instead of rows?
if lines_for_detection.class == Array
lines_for_detection.first
end
# Carriage return without newline
lines_for_detection = lines_for_detection.split(CARRIAGE_RETURN)
end
occurrences = Hash[
COMMON_DELIMITERS.map { |delimiter|
[delimiter, lines_for_detection.map { |line|
line.count(delimiter) }]
}
]
stream.rewind
variances = Hash.new
@delimiter = DEFAULT_DELIMITER
use_variance = true
occurrences.each { |key, values|
if values.length > 1
variances[key] = sample_variance(values) unless values.first == 0
elsif values.length == 1
# If only detected a single line of data, cannot use variance
variances[key] = values.first * DELIMITER_WEIGHTS[key]
use_variance = false
else
use_variance = false
end
}
if variances.length > 0
if use_variance
@delimiter = variances.sort {|a, b| a.last <=> b.last }.first.first
else
# Use whatever delimiter appears more and hope for the best
@delimiter = variances.sort {|a, b| b.last <=> a.last }.first.first
end
end
@delimiter
end
def self.supported?(extension)
%w(.csv .tsv .txt).include?(extension)
end
def normalize(temporary_filepath)
temporary_csv = CSV.open(temporary_filepath, 'w', col_sep: OUTPUT_DELIMITER, encoding: 'UTF-8')
CSV.open(filepath, "rb:#{encoding}", col_sep: @delimiter) do |input|
loop do
begin
row = input.shift
break unless row
rescue CSV::MalformedCSVError
next
end
temporary_csv << multiple_column(row)
end
end
# TODO: it would be nice to detect and warn the user about ignored
# malformed rows (but probably not about malformed empty lines, such
# as trailing \n\r\n seen in some cases)
temporary_csv.close
@delimiter = OUTPUT_DELIMITER
rescue ArgumentError, Encoding::UndefinedConversionError, Encoding::InvalidByteSequenceError => e
raise EncodingDetectionError
end
def temporary_filepath(filename_prefix = '')
File.join(temporary_directory, filename_prefix + File.basename(filepath))
end
def csv_options
{
col_sep: delimiter,
quote_char: DEFAULT_QUOTE
}
end
def needs_normalization?
(!ACCEPTABLE_ENCODINGS.include?(encoding)) ||
(delimiter != DEFAULT_DELIMITER) ||
single_column?
end
def single_column?
columns = ::CSV.parse(first_line, csv_options)
raise EmptyFileError.new if !columns.any?
columns.first.length < 2
end
def multiple_column(row)
return row if row.length > 1
row << nil
end
def delimiter
@delimiter
end
def encoding
return @encoding unless @encoding.nil?
source_file = SourceFile.new(filepath)
if source_file.encoding
@encoding = source_file.encoding
else
data = File.open(filepath, 'r')
sample = data.read(SAMPLE_READ_LIMIT)
data.close
result = CharlockHolmes::EncodingDetector.detect(sample)
# Looks like an ICU problem https://github.com/brianmario/charlock_holmes/issues/38
@encoding = if result.fetch(:encoding, 'UTF-8') == 'IBM424_rtl'
DEFAULT_ENCODING
elsif result.fetch(:confidence, 0) < ENCODING_CONFIDENCE
DEFAULT_ENCODING
else
result.fetch(:encoding, DEFAULT_ENCODING)
end
end
@encoding
rescue StandardError
DEFAULT_ENCODING
end
def first_line
return @first_line if @first_line
stream.rewind
@first_line ||= stream.gets || ''
stream.rewind
@first_line
end
def release
@stream.close
@stream = nil
@first_line = nil
self
end
def stream
@stream ||= File.open(filepath, 'rb')
end
attr_reader :filepath
alias_method :converted_filepath, :filepath
private
def generate_temporary_directory
self.temporary_directory = Unp.new(@importer_config).generate_temporary_directory.temporary_directory
self
end
def temporary_directory
generate_temporary_directory unless @temporary_directory
@temporary_directory
end
def sum(items_list)
items_list.inject(0){|accum, i| accum + i }
end
def mean(items_list)
sum(items_list) / items_list.length.to_f
end
def sample_variance(items_list)
m = mean(items_list)
sum = items_list.inject(0){|accum, i| accum + (i-m)**2 }
sum / (items_list.length - 1).to_f
end
def remove_quoted_strings(input)
# Note that CSV quoted strings can use double quotes, `""`
# as a way of escaping a single quote `"`
# Since we're just removing all quoted strings, this simple
# approach works in that case too.
input.gsub(/"[^\\"]*"/, '')
end
attr_writer :temporary_directory
end
end
end