fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/parser.rb

Summary

Maintainability
C
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/parser'
require 'fluent/config/element'
require 'fluent/configurable'

module Fluent
  module PluginHelper
    module Parser
      def parser_create(usage: '', type: nil, conf: nil, default_type: nil)
        parser = @_parsers[usage]
        return parser if parser && !type && !conf

        type = if type
                 type
               elsif conf && conf.respond_to?(:[])
                 raise Fluent::ConfigError, "@type is required in <parse>" unless conf['@type']
                 conf['@type']
               elsif default_type
                 default_type
               else
                 raise ArgumentError, "BUG: both type and conf are not specified"
               end
        parser = Fluent::Plugin.new_parser(type, parent: self)
        config = case conf
                 when Fluent::Config::Element
                   conf
                 when Hash
                   # in code, programmer may use symbols as keys, but Element needs strings
                   conf = Hash[conf.map{|k,v| [k.to_s, v]}]
                   Fluent::Config::Element.new('parse', usage, conf, [])
                 when nil
                   Fluent::Config::Element.new('parse', usage, {}, [])
                 else
                   raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
                 end
        parser.configure(config)
        if @_parsers_started
          parser.start
        end

        @_parsers[usage] = parser
        parser
      end

      module ParserParams
        include Fluent::Configurable
        # minimum section definition to instantiate parser plugin instances
        config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
          config_argument :usage, :string, default: ''
          config_param    :@type, :string # config_set_default required for :@type
        end
      end

      def self.included(mod)
        mod.include ParserParams
      end

      attr_reader :_parsers # for tests

      def initialize
        super
        @_parsers_started = false
        @_parsers = {} # usage => parser
      end

      def configure(conf)
        super

        @parser_configs.each do |section|
          if @_parsers[section.usage]
            raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}"
          end
          parser = Plugin.new_parser(section[:@type], parent: self)
          parser.configure(section.corresponding_config_element)
          @_parsers[section.usage] = parser
        end
      end

      def start
        super
        @_parsers_started = true
        @_parsers.each_pair do |usage, parser|
          parser.start
        end
      end

      def parser_operate(method_name, &block)
        @_parsers.each_pair do |usage, parser|
          begin
            parser.__send__(method_name)
            block.call(parser) if block_given?
          rescue => e
            log.error "unexpected error while #{method_name}", usage: usage, parser: parser, error: e
          end
        end
      end

      def stop
        super
        parser_operate(:stop)
      end

      def before_shutdown
        parser_operate(:before_shutdown)
        super
      end

      def shutdown
        parser_operate(:shutdown)
        super
      end

      def after_shutdown
        parser_operate(:after_shutdown)
        super
      end

      def close
        parser_operate(:close)
        super
      end

      def terminate
        parser_operate(:terminate)
        @_parsers_started = false
        @_parsers = {}
        super
      end
    end
  end
end