lib/fluent/plugin/parser_csv.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/parser'
require 'csv'
module Fluent
module Plugin
class CSVParser < Parser
Plugin.register_parser('csv', self)
desc 'Names of fields included in each lines'
config_param :keys, :array, value_type: :string
desc 'The delimiter character (or string) of CSV values'
config_param :delimiter, :string, default: ','
desc 'The parser type used to parse CSV line'
config_param :parser_engine, :enum, list: [:normal, :fast], default: :normal, alias: :parser_type
def configure(conf)
super
if @parser_engine == :fast
@quote_char = '"'
@escape_pattern = Regexp.compile(@quote_char * 2)
m = method(:parse_fast)
self.singleton_class.module_eval do
define_method(:parse, m)
end
end
end
def parse(text, &block)
values = CSV.parse_line(text, col_sep: @delimiter)
r = Hash[@keys.zip(values)]
time, record = convert_values(parse_time(r), r)
yield time, record
end
def parse_fast(text, &block)
r = parse_fast_internal(text)
time, record = convert_values(parse_time(r), r)
yield time, record
end
# CSV.parse_line is too slow due to initialize lots of object and
# CSV module doesn't provide the efficient method for parsing single line.
# This method avoids the overhead of CSV.parse_line for typical patterns
def parse_fast_internal(text)
record = {}
text.chomp!
return record if text.empty?
# use while because while is now faster than each_with_index
columns = text.split(@delimiter, -1)
num_columns = columns.size
i = 0
j = 0
while j < num_columns
column = columns[j]
case column.count(@quote_char)
when 0
if column.empty?
column = nil
end
when 1
if column.start_with?(@quote_char)
to_merge = [column]
j += 1
while j < num_columns
merged_col = columns[j]
to_merge << merged_col
break if merged_col.end_with?(@quote_char)
j += 1
end
column = to_merge.join(@delimiter)[1..-2]
end
when 2
if column.start_with?(@quote_char) && column.end_with?(@quote_char)
column = column[1..-2]
end
else
if column.start_with?(@quote_char) && column.end_with?(@quote_char)
column = column[1..-2]
end
column.gsub!(@escape_pattern, @quote_char)
end
record[@keys[i]] = column
j += 1
i += 1
end
record
end
end
end
end