fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/formatter.rb

Summary

Maintainability
C
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/formatter'
require 'fluent/config/element'
require 'fluent/configurable'

module Fluent
  module PluginHelper
    module Formatter
      def formatter_create(usage: '', type: nil, conf: nil, default_type: nil)
        formatter = @_formatters[usage]
        return formatter if formatter && !type && !conf

        type = if type
                 type
               elsif conf && conf.respond_to?(:[])
                 raise Fluent::ConfigError, "@type is required in <format>" unless conf['@type']
                 conf['@type']
               elsif default_type
                 default_type
               else
                 raise ArgumentError, "BUG: both type and conf are not specified"
               end
        formatter = Fluent::Plugin.new_formatter(type, parent: self)
        config = case conf
                 when Fluent::Config::Element
                   conf
                 when Hash
                   # in code, programmer may use symbols as keys, but Element needs strings
                   conf = Hash[conf.map{|k,v| [k.to_s, v]}]
                   Fluent::Config::Element.new('format', usage, conf, [])
                 when nil
                   Fluent::Config::Element.new('format', usage, {}, [])
                 else
                   raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
                 end
        formatter.configure(config)
        if @_formatters_started
          formatter.start
        end

        @_formatters[usage] = formatter
        formatter
      end

      module FormatterParams
        include Fluent::Configurable
        # minimum section definition to instantiate formatter plugin instances
        config_section :format, required: false, multi: true, init: true, param_name: :formatter_configs do
          config_argument :usage, :string, default: ''
          config_param    :@type, :string # config_set_default required for :@type
        end
      end

      def self.included(mod)
        mod.include FormatterParams
      end

      attr_reader :_formatters # for tests

      def initialize
        super
        @_formatters_started = false
        @_formatters = {} # usage => formatter
      end

      def configure(conf)
        super

        @formatter_configs.each do |section|
          if @_formatters[section.usage]
            raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}"
          end
          formatter = Plugin.new_formatter(section[:@type], parent: self)
          formatter.configure(section.corresponding_config_element)
          @_formatters[section.usage] = formatter
        end
      end

      def start
        super
        @_formatters_started = true
        @_formatters.each_pair do |usage, formatter|
          formatter.start
        end
      end

      def formatter_operate(method_name, &block)
        @_formatters.each_pair do |usage, formatter|
          begin
            formatter.__send__(method_name)
            block.call(formatter) if block_given?
          rescue => e
            log.error "unexpected error while #{method_name}", usage: usage, formatter: formatter, error: e
          end
        end
      end

      def stop
        super
        formatter_operate(:stop)
      end

      def before_shutdown
        formatter_operate(:before_shutdown)
        super
      end

      def shutdown
        formatter_operate(:shutdown)
        super
      end

      def after_shutdown
        formatter_operate(:after_shutdown)
        super
      end

      def close
        formatter_operate(:close)
        super
      end

      def terminate
        formatter_operate(:terminate)
        @_formatters_started = false
        @_formatters = {}
        super
      end
    end
  end
end