fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/compat_parameters.rb

Summary

Maintainability
D
2 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/config/types'
require 'fluent/config/element'

module Fluent
  module PluginHelper
    module CompatParameters
      # This plugin helper is to bring old-fashioned buffer/other
      # configuration parameters to v0.14 plugin API configurations.
      # This helper is mainly to convert plugins from v0.12 API
      # to v0.14 API safely, without breaking user deployment.

      BUFFER_PARAMS = {
        "buffer_type" => "@type",
        "buffer_path" => "path",
        "num_threads"                 => "flush_thread_count",
        "flush_interval"              => "flush_interval",
        "try_flush_interval"          => "flush_thread_interval",
        "queued_chunk_flush_interval" => "flush_thread_burst_interval",
        "disable_retry_limit" => "retry_forever",
        "retry_limit"         => "retry_max_times",
        "max_retry_wait"      => "retry_max_interval",
        "buffer_chunk_limit"  => "chunk_limit_size",
        "buffer_queue_limit"  => "queue_limit_length",
        "buffer_queue_full_action" => "overflow_action",
        "flush_at_shutdown" => "flush_at_shutdown",
      }

      BUFFER_TIME_SLICED_PARAMS = {
        "time_slice_format" => nil,
        "time_slice_wait" => "timekey_wait",
        "timezone" => "timekey_zone",
      }

      PARSER_PARAMS = {
        "format" => nil,
        "types" => nil,
        "types_delimiter" => nil,
        "types_label_delimiter" => nil,
        "keys" => "keys", # CSVParser, TSVParser (old ValuesParser)
        "time_key"    => "time_key",
        "time_format" => "time_format",
        "localtime" => nil,
        "utc" => nil,
        "delimiter"   => "delimiter",
        "keep_time_key" => "keep_time_key",
        "null_empty_string" => "null_empty_string",
        "null_value_pattern" => "null_value_pattern",
        "json_parser"      => "json_parser", # JSONParser
        "label_delimiter"  => "label_delimiter", # LabeledTSVParser
        "format_firstline" => "format_firstline", # MultilineParser
        "message_key"      => "message_key", # NoneParser
        "with_priority"       => "with_priority", # SyslogParser
        "message_format"      => "message_format", # SyslogParser
        "rfc5424_time_format" => "rfc5424_time_format", # SyslogParser
        # There has been no parsers which can handle timezone in v0.12
      }

      INJECT_PARAMS = {
        "include_time_key" => nil,
        "time_key"    => "time_key",
        "time_format" => "time_format",
        "timezone"    => "timezone",
        "include_tag_key" => nil,
        "tag_key" => "tag_key",
        "localtime" => nil,
        "utc" => nil,
      }

      EXTRACT_PARAMS = {
        "time_key"    => "time_key",
        "time_format" => "time_format",
        "timezone"    => "timezone",
        "tag_key" => "tag_key",
        "localtime" => nil,
        "utc" => nil,
      }

      FORMATTER_PARAMS = {
        "format" => "@type",
        "delimiter" => "delimiter",
        "force_quotes" => "force_quotes", # CsvFormatter
        "keys" => "keys", # TSVFormatter
        "fields" => "fields", # CsvFormatter
        "json_parser" => "json_parser", # JSONFormatter
        "label_delimiter" => "label_delimiter", # LabeledTSVFormatter
        "output_time" => "output_time", # OutFileFormatter
        "output_tag"  => "output_tag",  # OutFileFormatter
        "localtime"   => "localtime",   # OutFileFormatter
        "utc"         => "utc",         # OutFileFormatter
        "timezone"    => "timezone",    # OutFileFormatter
        "message_key" => "message_key", # SingleValueFormatter
        "add_newline" => "add_newline", # SingleValueFormatter
        "output_type" => "output_type", # StdoutFormatter
      }

      def compat_parameters_convert(conf, *types, **kwargs)
        types.each do |type|
          case type
          when :buffer
            compat_parameters_buffer(conf, **kwargs)
          when :inject
            compat_parameters_inject(conf)
          when :extract
            compat_parameters_extract(conf)
          when :parser
            compat_parameters_parser(conf)
          when :formatter
            compat_parameters_formatter(conf)
          else
            raise "BUG: unknown compat_parameters type: #{type}"
          end
        end

        conf
      end

      def compat_parameters_buffer(conf, default_chunk_key: '')
        # return immediately if <buffer> section exists, or any buffer-related parameters don't exist
        return unless conf.elements('buffer').empty?
        return if (BUFFER_PARAMS.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) }

        # TODO: warn obsolete parameters if these are deprecated
        buffer_params = BUFFER_PARAMS.merge(BUFFER_TIME_SLICED_PARAMS)
        hash = compat_parameters_copy_to_subsection_attributes(conf, buffer_params) do |compat_key, value|
          if compat_key == 'buffer_queue_full_action' && value == 'exception'
            'throw_exception'
          else
            value
          end
        end

        chunk_key = default_chunk_key

        if conf.has_key?('time_slice_format')
          chunk_key = 'time'
          hash['timekey'] = case conf['time_slice_format']
                            when /\%S/ then 1
                            when /\%M/ then 60
                            when /\%H/ then 3600
                            when /\%d/ then 86400
                            else
                              raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
                            end
          if conf.has_key?('localtime') || conf.has_key?('utc')
            if conf.has_key?('localtime') && conf.has_key?('utc')
              raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them"
            elsif conf.has_key?('localtime')
              hash['timekey_use_utc'] = !(Fluent::Config.bool_value(conf['localtime']))
            elsif conf.has_key?('utc')
              hash['timekey_use_utc'] = Fluent::Config.bool_value(conf['utc'])
            end
          end
        else
          if chunk_key == 'time'
            hash['timekey'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d'
          end
        end

        e = Fluent::Config::Element.new('buffer', chunk_key, hash, [])
        conf.elements << e

        conf
      end

      def compat_parameters_inject(conf)
        return unless conf.elements('inject').empty?
        return if INJECT_PARAMS.keys.all?{|k| !conf.has_key?(k) }

        # TODO: warn obsolete parameters if these are deprecated
        hash = compat_parameters_copy_to_subsection_attributes(conf, INJECT_PARAMS)

        if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key'])
          hash['time_key'] ||= 'time'
          hash['time_type'] ||= 'string'
        end
        if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch'])
          hash['time_key'] ||= 'time'
          hash['time_type'] = 'unixtime'
        end
        if conf.has_key?('localtime') || conf.has_key?('utc')
          utc = to_bool(conf['utc'])
          localtime = to_bool(conf['localtime'])
          if conf.has_key?('localtime') && conf.has_key?('utc') && !(localtime ^ utc)
            raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them"
          elsif conf.has_key?('localtime')
            hash['localtime'] = Fluent::Config.bool_value(conf['localtime'])
          elsif conf.has_key?('utc')
            hash['localtime'] = !(Fluent::Config.bool_value(conf['utc']))
            # Specifying "localtime false" means using UTC in TimeFormatter
            # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC).
            # There are difference between "Z" and "+0000" in timezone formatting.
            # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way
          end
        end

        if conf.has_key?('include_tag_key') && Fluent::Config.bool_value(conf['include_tag_key'])
          hash['tag_key'] ||= 'tag'
        end

        e = Fluent::Config::Element.new('inject', '', hash, [])
        conf.elements << e

        conf
      end

      def to_bool(v)
        if  v.is_a?(FalseClass) || v == 'false' || v.nil?
          false
        else
          true
        end
      end

      def compat_parameters_extract(conf)
        return unless conf.elements('extract').empty?
        return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } && !conf.has_key?('format')

        # TODO: warn obsolete parameters if these are deprecated
        hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS)

        if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch'])
          hash['time_key'] ||= 'time'
          hash['time_type'] = 'unixtime'
        elsif conf.has_key?('format') && conf["format"].start_with?("/") && conf["format"].end_with?("/") # old-style regexp parser
          hash['time_key'] ||= 'time'
          hash['time_type'] ||= 'string'
        end
        if conf.has_key?('localtime') || conf.has_key?('utc')
          if conf.has_key?('localtime') && conf.has_key?('utc')
            raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them"
          elsif conf.has_key?('localtime')
            hash['localtime'] = Fluent::Config.bool_value(conf['localtime'])
          elsif conf.has_key?('utc')
            hash['localtime'] = !(Fluent::Config.bool_value(conf['utc']))
            # Specifying "localtime false" means using UTC in TimeFormatter
            # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC).
            # There are difference between "Z" and "+0000" in timezone formatting.
            # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way
          end
        end

        e = Fluent::Config::Element.new('extract', '', hash, [])
        conf.elements << e

        conf
      end

      def compat_parameters_parser(conf)
        return unless conf.elements('parse').empty?
        return if PARSER_PARAMS.keys.all?{|k| !conf.has_key?(k) }

        # TODO: warn obsolete parameters if these are deprecated
        hash = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS)

        if conf["format"]
          if conf["format"].start_with?("/") && conf["format"].end_with?("/")
            hash["@type"] = "regexp"
            hash["expression"] = conf["format"][1..-2]
          else
            hash["@type"] = conf["format"]
          end
        end

        if conf["types"]
          delimiter = conf["types_delimiter"] || ','
          label_delimiter = conf["types_label_delimiter"] || ':'
          types = {}
          conf['types'].split(delimiter).each do |pair|
            key, value = pair.split(label_delimiter, 2)
            types[key] = value
          end
          hash["types"] = JSON.dump(types)
        end
        if conf.has_key?('localtime') || conf.has_key?('utc')
          if conf.has_key?('localtime') && conf.has_key?('utc')
            raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them"
          elsif conf.has_key?('localtime')
            hash['localtime'] = Fluent::Config.bool_value(conf['localtime'])
          elsif conf.has_key?('utc')
            hash['localtime'] = !(Fluent::Config.bool_value(conf['utc']))
            # Specifying "localtime false" means using UTC in TimeFormatter
            # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC).
            # There are difference between "Z" and "+0000" in timezone formatting.
            # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way
          end
        end

        e = Fluent::Config::Element.new('parse', '', hash, [])
        conf.elements << e

        conf
      end

      def compat_parameters_formatter(conf)
        return unless conf.elements('format').empty?
        return if FORMATTER_PARAMS.keys.all?{|k| !conf.has_key?(k) }

        # TODO: warn obsolete parameters if these are deprecated
        hash = compat_parameters_copy_to_subsection_attributes(conf, FORMATTER_PARAMS)

        if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch'])
          hash['time_type'] = 'unixtime'
        end

        e = Fluent::Config::Element.new('format', '', hash, [])
        conf.elements << e

        conf
      end

      def compat_parameters_copy_to_subsection_attributes(conf, params, &block)
        hash = {}
        params.each do |compat, current|
          next unless current
          if conf.has_key?(compat)
            if block_given?
              hash[current] = block.call(compat, conf[compat])
            else
              hash[current] = conf[compat]
            end
          end
        end
        hash
      end
    end
  end
end