lib/fluent/plugin/parser.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'
require 'fluent/error'
require 'fluent/mixin' # for TypeConverter
require 'fluent/time'
require 'fluent/plugin/string_util'
require 'serverengine/blocking_flag'
module Fluent
module Plugin
class Parser < Base
class TimeoutChecker
# This implementation now uses mutex because parser is typically used in input.
# If this has a performance issue under high concurreny, use concurrent-ruby's map instead.
def initialize(timeout)
@map = {}
@flag = ServerEngine::BlockingFlag.new
@mutex = Mutex.new
@timeout = timeout
@timeout_checker = nil
end
def start
@thread = ::Thread.new {
until @flag.wait_for_set(0.5)
now = Time.now
@mutex.synchronize {
@map.keys.each { |th|
time = @map[th]
if now - time > @timeout
th.raise UncatchableError, "parsing timed out"
@map.delete(th)
end
}
}
end
}
end
def stop
@flag.set!
@thread.join
end
def execute
th = Thread.current
@mutex.synchronize { @map[th] = Time.now }
yield
ensure
# Need clean up here because if next event is delayed, incorrect exception will be raised in normal flow.
@mutex.synchronize { @map.delete(th) }
end
end
include OwnedByMixin
include TimeMixin::Parser
class ParserError < StandardError; end
configured_in :parse
### types can be specified as string-based hash style
# field1:type, field2:type, field3:type:option, field4:type:option
### or, JSON format
# {"field1":"type", "field2":"type", "field3":"type:option", "field4":"type:option"}
config_param :types, :hash, value_type: :string, default: nil
# available options are:
# array: (1st) delimiter
# time : type[, format, timezone] -> type should be a valid "time_type"(string/unixtime/float)
# : format[, timezone]
config_param :time_key, :string, default: nil
config_param :null_value_pattern, :regexp, default: nil
config_param :null_empty_string, :bool, default: false
config_param :estimate_current_event, :bool, default: true
config_param :keep_time_key, :bool, default: false
config_param :timeout, :time, default: nil
AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array']
# for tests
attr_reader :type_converters
PARSER_TYPES = [:text_per_line, :text, :binary]
def parser_type
:text_per_line
end
def initialize
super
@timeout_checker = nil
end
def configure(conf)
super
@time_parser = time_parser_create
@type_converters = build_type_converters(@types)
@execute_convert_values = @type_converters || @null_value_pattern || @null_empty_string
@timeout_checker = if @timeout
class << self
alias_method :parse_orig, :parse
alias_method :parse, :parse_with_timeout
end
TimeoutChecker.new(@timeout)
else
nil
end
end
def start
super
@timeout_checker.start if @timeout_checker
end
def stop
super
@timeout_checker.stop if @timeout_checker
end
def parse(text, &block)
raise NotImplementedError, "Implement this method in child class"
end
def parse_with_timeout(text, &block)
@timeout_checker.execute {
parse_orig(text, &block)
}
rescue UncatchableError
log.warn "parsing timed out with #{self.class}: text = #{text}"
# Return nil instead of raising error. in_tail or other plugin can emit broken line.
yield nil, nil
end
def call(*a, &b)
# Keep backward compatibility for existing plugins
# TODO: warn when deprecated
parse(*a, &b)
end
def implement?(feature)
methods_of_plugin = self.class.instance_methods(false)
case feature
when :parse_io then methods_of_plugin.include?(:parse_io)
when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data)
else
raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
end
end
def parse_io(io, &block)
raise NotImplementedError, "Optional API #parse_io is not implemented"
end
def parse_partial_data(data, &block)
raise NotImplementedError, "Optional API #parse_partial_data is not implemented"
end
def parse_time(record)
if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
src = if @keep_time_key
record[@time_key]
else
record.delete(@time_key)
end
@time_parser.parse(src)
elsif @estimate_current_event
Fluent::EventTime.now
else
nil
end
rescue Fluent::TimeParser::TimeParseError => e
raise ParserError, e.message
end
# def parse(text, &block)
# time, record = convert_values(time, record)
# yield time, record
# end
def convert_values(time, record)
return time, record unless @execute_convert_values
record.each_key do |key|
value = record[key]
next unless value # nil/null value is always left as-is.
if value.is_a?(String) && string_like_null(value)
record[key] = nil
next
end
if @type_converters && @type_converters.has_key?(key)
record[key] = @type_converters[key].call(value)
end
end
return time, record
end
def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern)
null_empty_string && value.empty? || null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) }
end
TRUTHY_VALUES = ['true', 'yes', '1']
def build_type_converters(types)
return nil unless types
converters = {}
types.each_pair do |field_name, type_definition|
type, option = type_definition.split(":", 2)
unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
end
conv = case type
when 'string' then ->(v){ v.to_s }
when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i }
when 'float' then ->(v){ v.to_f rescue v.to_s.to_f }
when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) }
when 'time'
# comma-separated: time:[timezone:]time_format
# time_format is unixtime/float/string-time-format
timep = if option
time_type = 'string' # estimate
timezone, time_format = option.split(':', 2)
unless Fluent::Timezone.validate(timezone)
timezone, time_format = nil, option
end
if Fluent::TimeMixin::TIME_TYPES.include?(time_format)
time_type, time_format = time_format, nil # unixtime/float
end
time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone)
else
time_parser_create(type: :string, format: nil, timezone: nil)
end
->(v){ timep.parse(v) rescue nil }
when 'array'
delimiter = option ? option.to_s : ','
->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } }
else
raise "BUG: unknown type even after check: #{type}"
end
converters[field_name] = conv
end
converters
end
end
end
end