fluent/fluentd

View on GitHub
lib/fluent/plugin/parser.rb

Summary

Maintainability
C
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'

require 'fluent/error'
require 'fluent/mixin' # for TypeConverter
require 'fluent/time'
require 'fluent/plugin/string_util'

require 'serverengine/blocking_flag'

module Fluent
  module Plugin
    class Parser < Base
      class TimeoutChecker
        # Interrupts threads that spend longer than +timeout+ seconds inside #execute.
        #
        # A watcher thread (see #start) scans every CHECK_INTERVAL seconds and raises
        # UncatchableError in any thread whose #execute block has run past the timeout.
        # Start times are taken from the monotonic clock so that wall-clock adjustments
        # (NTP, manual changes) can neither fire nor suppress a timeout.
        #
        # The registry of running threads is guarded by a mutex because parsers are
        # typically used by input plugins and may be called from several threads. If this
        # becomes a bottleneck under high concurrency, consider concurrent-ruby's Map instead.

        # Seconds between registry scans; a timed-out parse may therefore be interrupted
        # up to this long after its deadline.
        CHECK_INTERVAL = 0.5

        # @param timeout [Numeric] seconds a single #execute block is allowed to run
        def initialize(timeout)
          @map = {}
          @flag = ServerEngine::BlockingFlag.new
          @mutex = Mutex.new
          @timeout = timeout
          @thread = nil
        end

        # Spawn the watcher thread; it keeps scanning until #stop sets the flag.
        def start
          @thread = ::Thread.new {
            until @flag.wait_for_set(CHECK_INTERVAL)
              now = monotonic_now
              @mutex.synchronize {
                # Drop overdue entries while interrupting their threads, so a thread is
                # never raised on twice for the same #execute call.
                @map.delete_if { |th, started_at|
                  timed_out = now - started_at > @timeout
                  th.raise UncatchableError, "parsing timed out" if timed_out
                  timed_out
                }
              }
            end
          }
        end

        # Signal the watcher thread to finish and wait for it. Safe to call even when
        # #start was never called.
        def stop
          @flag.set!
          @thread.join if @thread
        end

        # Run the given block under the timeout and return its value. If it runs too
        # long, the watcher thread (once started) raises UncatchableError in this thread.
        def execute
          th = Thread.current
          @mutex.synchronize { @map[th] = monotonic_now }
          yield
        ensure
          # Always unregister: a stale entry would make the watcher interrupt later,
          # unrelated work running on this thread.
          @mutex.synchronize { @map.delete(th) }
        end

        private

        # Seconds from a clock that never jumps backwards or forwards with wall time.
        def monotonic_now
          Process.clock_gettime(Process::CLOCK_MONOTONIC)
        end
      end

      include OwnedByMixin
      # NOTE(review): presumably provides #time_parser_create, which #configure and
      # #build_type_converters call below; confirm in fluent/time.
      include TimeMixin::Parser

      # Error type for parse failures; #parse_time re-raises
      # Fluent::TimeParser::TimeParseError as this class.
      class ParserError < StandardError; end

      # NOTE(review): presumably means these parameters are read from a <parse> section.
      configured_in :parse

      ### types can be specified as string-based hash style
      # field1:type, field2:type, field3:type:option, field4:type:option
      ### or, JSON format
      # {"field1":"type", "field2":"type", "field3":"type:option", "field4":"type:option"}
      # Each type must be one of AVAILABLE_PARSER_VALUE_TYPES; #configure turns this
      # hash into converter lambdas via #build_type_converters.
      config_param :types, :hash, value_type: :string, default: nil

      # available options are:
      # array: the delimiter used to split the value (default: ",")
      # time : [timezone:]time_format (colon-separated). If the leading segment is not
      #        a valid timezone, the whole option is used as the format. A format that is
      #        listed in Fluent::TimeMixin::TIME_TYPES (e.g. unixtime/float) selects that
      #        time type instead. With no option the value is parsed as a string time.

      # Record field holding the event time. #parse_time parses it and removes it from
      # the record unless keep_time_key is true.
      config_param :time_key, :string, default: nil
      # String values matching this pattern are replaced with nil by #convert_values.
      config_param :null_value_pattern, :regexp, default: nil
      # When true, empty string values are replaced with nil by #convert_values.
      config_param :null_empty_string, :bool, default: false
      # When the record carries no time field, #parse_time returns the current
      # Fluent::EventTime if this is true, nil otherwise.
      config_param :estimate_current_event, :bool, default: true
      # Keep time_key in the record after #parse_time has read it.
      config_param :keep_time_key, :bool, default: false
      # When set, #configure reroutes #parse through #parse_with_timeout and a
      # TimeoutChecker interrupts parses that run longer than this many seconds.
      config_param :timeout, :time, default: nil

      # Type names accepted in the types parameter (validated by #build_type_converters).
      AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array']

      # for tests
      # Field name => converter lambda, or nil when types is not configured.
      attr_reader :type_converters

      # Every input granularity a parser may report from #parser_type.
      PARSER_TYPES = %i[text_per_line text binary]

      # Input granularity this parser expects. The base implementation answers
      # :text_per_line (presumably one line of text per #parse call); a parser that
      # expects other input can override it with another PARSER_TYPES member.
      def parser_type
        :text_per_line
      end

      # Set up the plugin. There is no timeout checker until #configure finds a
      # timeout setting.
      def initialize
        super
        @timeout_checker = nil # replaced in #configure when timeout is set
      end

      # Configure the parser from +conf+.
      #
      # After the standard parameter handling (super), this builds the record time
      # parser, builds the per-field type converters, and records whether #convert_values
      # has any work to do. When timeout is set, #parse on this instance is rerouted
      # through #parse_with_timeout (the original stays reachable as #parse_orig) and a
      # TimeoutChecker is created; otherwise @timeout_checker is nil.
      def configure(conf)
        super

        @time_parser = time_parser_create
        @type_converters = build_type_converters(@types)
        # Truthy only when types, null_value_pattern or null_empty_string is configured.
        @execute_convert_values = @type_converters || @null_value_pattern || @null_empty_string
        @timeout_checker = if @timeout
                             # Alias only once per instance. Repeating it on a second
                             # #configure would point parse_orig at parse_with_timeout
                             # itself, so every #parse call would recurse until
                             # SystemStackError.
                             unless singleton_methods.include?(:parse_orig)
                               class << self
                                 alias_method :parse_orig, :parse
                                 alias_method :parse, :parse_with_timeout
                               end
                             end
                             TimeoutChecker.new(@timeout)
                           else
                             nil
                           end
      end

      # Start the plugin and then, if a timeout is configured, the checker's watcher thread.
      def start
        super
        return unless @timeout_checker

        @timeout_checker.start
      end

      # Stop the plugin and then the checker's watcher thread, if there is one.
      def stop
        super
        return unless @timeout_checker

        @timeout_checker.stop
      end

      # Parse +text+ and yield (time, record) to the block. Parser plugins must
      # override this; the base version always raises NotImplementedError.
      def parse(text, &block)
        message = "Implement this method in child class"
        raise NotImplementedError, message
      end

      # Replacement for #parse installed by #configure when timeout is set.
      #
      # Runs the original #parse (aliased as #parse_orig) under the TimeoutChecker.
      # On timeout, logs a warning and yields (nil, nil) to the block instead of
      # raising.
      def parse_with_timeout(text, &block)
        begin
          @timeout_checker.execute { parse_orig(text, &block) }
        rescue UncatchableError
          log.warn "parsing timed out with #{self.class}: text = #{text}"
          # Returning nil instead of raising lets in_tail and other callers still
          # emit the broken line.
          yield nil, nil
        end
      end

      # Legacy entry point kept for backward compatibility with existing plugins.
      # It forwards all arguments and the block to #parse unchanged.
      # TODO: warn when deprecated
      def call(*args, &blk)
        parse(*args, &blk)
      end

      # Report whether this plugin provides one of the optional parser APIs.
      #
      # @param feature [Symbol] :parse_io or :parse_partial_data
      # @return [Boolean] true when the plugin's own class defines that method. Only
      #   public/protected methods defined directly on self.class count
      #   (instance_methods(false)), so an implementation inherited from an
      #   intermediate parser class is not detected.
      # @raise [ArgumentError] for any other feature
      def implement?(feature)
        optional_apis = [:parse_io, :parse_partial_data]
        unless optional_apis.include?(feature)
          raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
        end

        self.class.instance_methods(false).include?(feature)
      end

      # Optional API: parse data read from +io+. Plugins that support it define their
      # own version (see #implement?); this placeholder always raises.
      def parse_io(io, &block)
        message = "Optional API #parse_io is not implemented"
        raise NotImplementedError, message
      end

      # Optional API: parse a partial chunk of +data+. Plugins that support it define
      # their own version (see #implement?); this placeholder always raises.
      def parse_partial_data(data, &block)
        message = "Optional API #parse_partial_data is not implemented"
        raise NotImplementedError, message
      end

      # Work out the event time for +record+.
      #
      # If time_key is set and +record+ responds to has_key? and contains that key, the
      # value is parsed with @time_parser. The field is removed from +record+ unless
      # keep_time_key is true. Otherwise this returns Fluent::EventTime.now when
      # estimate_current_event is true, and nil when it is false.
      #
      # @raise [ParserError] when the time parser raises Fluent::TimeParser::TimeParseError
      def parse_time(record)
        carries_time = @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
        unless carries_time
          return @estimate_current_event ? Fluent::EventTime.now : nil
        end

        raw_time = @keep_time_key ? record[@time_key] : record.delete(@time_key)
        @time_parser.parse(raw_time)
      rescue Fluent::TimeParser::TimeParseError => e
        raise ParserError, e.message
      end

      # Apply null handling and per-field type conversion to +record+ in place.
      #
      # Meant to be called from a parser's #parse before yielding, e.g.:
      #
      #   def parse(text, &block)
      #     time, record = convert_values(time, record)
      #     yield time, record
      #   end
      #
      # When none of types, null_value_pattern or null_empty_string is configured,
      # [time, record] is returned untouched. Otherwise, for each field:
      # * nil and false values are left as-is;
      # * String values that #string_like_null considers null become nil;
      # * any other value with a converter in @type_converters is replaced by the
      #   converter's result.
      # +time+ is always passed through unchanged.
      #
      # @return [Array] [time, record]
      def convert_values(time, record)
        return [time, record] unless @execute_convert_values

        record.each_key do |field|
          current = record[field]
          next unless current

          if current.is_a?(String) && string_like_null(current)
            record[field] = nil
          elsif @type_converters && @type_converters.has_key?(field)
            record[field] = @type_converters[field].call(current)
          end
        end

        [time, record]
      end

      # Decide whether a String field value should be treated as null.
      #
      # The value counts as null when +null_empty_string+ is truthy and the value is
      # empty, or when +null_value_regexp+ matches it. The match runs inside
      # string_safe_encoding. The defaults come from the null_empty_string and
      # null_value_pattern parameters.
      #
      # @return a truthy value for null-like strings (true for the empty-string case,
      #   otherwise the match result returned through string_safe_encoding), falsy otherwise
      def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern)
        empty_as_null = null_empty_string && value.empty?
        return empty_as_null if empty_as_null

        null_value_regexp && string_safe_encoding(value) { |s| null_value_regexp.match(s) }
      end

      # Strings (compared after to_s.downcase) that the 'bool' converter maps to true;
      # every other value converts to false.
      TRUTHY_VALUES = ['true', 'yes', '1']

      # Turn the +types+ parameter into per-field converter lambdas.
      #
      # Each entry maps a field name to "type" or "type:option", where type must be one
      # of AVAILABLE_PARSER_VALUE_TYPES:
      # * string  - v.to_s
      # * integer - v.to_i, falling back to v.to_s.to_i when v.to_i raises
      # * float   - v.to_f, falling back to v.to_s.to_f when v.to_f raises
      # * bool    - true iff v.to_s.downcase is in TRUTHY_VALUES
      # * time    - option is [timezone:]time_format (colon-separated). If the leading
      #             segment is not a valid timezone, the whole option is the format
      #             (formats may contain colons). A format listed in
      #             Fluent::TimeMixin::TIME_TYPES (e.g. unixtime/float) selects that time
      #             type instead. Values for which the time parser raises become nil.
      # * array   - v.to_s split on option (default ",")
      #
      # @param types [Hash, nil] field name => type definition string
      # @return [Hash, nil] field name => converter lambda; nil when +types+ is nil
      # @raise [Fluent::ConfigError] when a type is not supported
      def build_type_converters(types)
        return nil unless types

        types.each_with_object({}) do |(field_name, type_definition), converters|
          type, option = type_definition.split(":", 2)
          unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
            raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
          end

          converters[field_name] =
            case type
            when 'string'  then ->(v){ v.to_s }
            when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i }
            when 'float'   then ->(v){ v.to_f rescue v.to_s.to_f }
            when 'bool'    then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) }
            when 'time'
              field_time_parser =
                if option
                  value_type = 'string' # until the format names a non-string time type
                  timezone, time_format = option.split(':', 2)
                  unless Fluent::Timezone.validate(timezone)
                    # Not a timezone prefix, so the whole option is the format.
                    timezone, time_format = nil, option
                  end
                  if Fluent::TimeMixin::TIME_TYPES.include?(time_format)
                    value_type, time_format = time_format, nil
                  end
                  time_parser_create(type: value_type.to_sym, format: time_format, timezone: timezone)
                else
                  time_parser_create(type: :string, format: nil, timezone: nil)
                end
              ->(v){ field_time_parser.parse(v) rescue nil }
            when 'array'
              delimiter = option ? option.to_s : ','
              ->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } }
            else
              raise "BUG: unknown type even after check: #{type}"
            end
        end
      end
    end
  end
end