fluent/fluentd

View on GitHub
lib/fluent/agent.rb

Summary

Maintainability
A
3 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/configurable'
require 'fluent/plugin'
require 'fluent/output'
require 'fluent/match'

module Fluent
  #
  # Agent is a resource unit who manages emittable plugins
  #
  # Next step: `fluentd/root_agent.rb`
  # Next step: `fluentd/label.rb`
  #
  class Agent
    include Configurable

    def initialize(log:)
      super()

      @context = nil
      @outputs = []
      @filters = []

      @lifecycle_control_list = nil
      # lifecycle_control_list is the list of plugins in this agent, and ordered
      # from plugins which DOES emit, then DOESN'T emit
      # (input -> output w/ router -> filter -> output w/o router)
      # for start: use this order DESC
      #   (because plugins which appears later in configurations will receive events from plugins which appears earlier)
      # for stop/before_shutdown/shutdown/after_shutdown/close/terminate: use this order ASC
      @lifecycle_cache = nil

      @log = log
      @event_router = EventRouter.new(NoMatchMatch.new(log), self)
      @error_collector = nil
    end

    attr_reader :log
    attr_reader :outputs
    attr_reader :filters
    attr_reader :context
    attr_reader :event_router
    attr_reader :error_collector

    def configure(conf)
      super

      # initialize <match> and <filter> elements
      conf.elements('filter', 'match').each { |e|
        if !Fluent::Engine.supervisor_mode && e.for_another_worker?
          next
        end
        pattern = e.arg.empty? ? '**' : e.arg
        type = e['@type']
        raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
        if e.name == 'filter'
          add_filter(type, pattern, e)
        else
          add_match(type, pattern, e)
        end
      }
    end

    def lifecycle_control_list
      return @lifecycle_control_list if @lifecycle_control_list

      lifecycle_control_list = {
        input: [],
        output_with_router: [],
        filter: [],
        output: [],
      }
      if self.respond_to?(:inputs)
        inputs.each do |i|
          lifecycle_control_list[:input] << i
        end
      end
      outputs.each do |o|
        if o.has_router?
          lifecycle_control_list[:output_with_router] << o
        else
          lifecycle_control_list[:output] << o
        end
      end
      filters.each do |f|
        lifecycle_control_list[:filter] << f
      end

      @lifecycle_control_list = lifecycle_control_list
    end

    def lifecycle(desc: false)
      kind_list = if desc
                    [:output, :filter, :output_with_router]
                  else
                    [:output_with_router, :filter, :output]
                  end
      kind_list.each do |kind|
        list = if desc
                 lifecycle_control_list[kind].reverse
               else
                 lifecycle_control_list[kind]
               end
        display_kind = (kind == :output_with_router ? :output : kind)
        list.each do |instance|
          yield instance, display_kind
        end
      end
    end

    def add_match(type, pattern, conf)
      log_type = conf.for_this_worker? ? :default : :worker0
      log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

      output = Plugin.new_output(type)
      output.context_router = @event_router
      output.configure(conf)
      @outputs << output
      if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
        # TODO: ruby 2.3 or later: replace `output.respond_to?(:multi_output?) && output.multi_output?` with output&.multi_output?
        outputs = if output.respond_to?(:static_outputs)
                    output.static_outputs
                  else
                    output.outputs
                  end
        @outputs.push(*outputs)
      end
      @event_router.add_rule(pattern, output)

      output
    end

    def add_filter(type, pattern, conf)
      log_type = conf.for_this_worker? ? :default : :worker0
      log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

      filter = Plugin.new_filter(type)
      filter.context_router = @event_router
      filter.configure(conf)
      @filters << filter
      @event_router.add_rule(pattern, filter)

      filter
    end

    # For handling invalid record
    def emit_error_event(tag, time, record, error)
    end

    def handle_emits_error(tag, es, error)
    end
  end
end