lib/fluent/plugin/parser_json.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/parser'
require 'fluent/time'
require 'fluent/oj_options'
require 'yajl'
require 'json'
module Fluent
module Plugin
class JSONParser < Parser
Plugin.register_parser('json', self)
config_set_default :time_key, 'time'
desc 'Set JSON parser'
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj
# The Yajl library defines a default buffer size of 8KiB when parsing
# from IO streams, so maintain this for backwards-compatibility.
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
desc 'Set the buffer size that Yajl will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192
config_set_default :time_type, :float
def configure(conf)
if conf.has_key?('time_format')
conf['time_type'] ||= 'string'
end
super
@load_proc, @error_class = configure_json_parser(@json_parser)
end
def configure_json_parser(name)
case name
when :oj
return [Oj.method(:load), Oj::ParseError] if Fluent::OjOptions.available?
log&.info "Oj is not installed, and failing back to Yajl for json parser"
configure_json_parser(:yajl)
when :json then [JSON.method(:load), JSON::ParserError]
when :yajl then [Yajl.method(:load), Yajl::ParseError]
else
raise "BUG: unknown json parser specified: #{name}"
end
end
def parse(text)
parsed_json = @load_proc.call(text)
if parsed_json.is_a?(Hash)
time, record = parse_one_record(parsed_json)
yield time, record
elsif parsed_json.is_a?(Array)
parsed_json.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, parsed_record = parse_one_record(record)
yield time, parsed_record
end
else
yield nil, nil
end
rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
end
def parse_one_record(record)
time = parse_time(record)
convert_values(time, record)
end
def parser_type
:text
end
def parse_io(io, &block)
y = Yajl::Parser.new
y.on_parse_complete = ->(record){
block.call(parse_time(record), record)
}
y.parse(io, @stream_buffer_size)
end
end
end
end