fluent/fluentd

View on GitHub
lib/fluent/root_agent.rb

Summary

Maintainability
F
4 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'delegate'

require 'fluent/config/error'
require 'fluent/agent'
require 'fluent/label'
require 'fluent/plugin'
require 'fluent/system_config'
require 'fluent/time'
require 'fluent/source_only_buffer_agent'

module Fluent
  #
  # Fluentd forms a tree structure to manage plugins:
  #
  #                      RootAgent
  #                          |
  #             +------------+-------------+-------------+
  #             |            |             |             |
  #          <label>      <source>      <filter>      <match>
  #             |
  #        +----+----+
  #        |         |
  #     <filter>   <match>
  #
  # Relation:
  # * RootAgent has many <label>, <source>, <filter> and <match>
  # * <label>   has many <match> and <filter>
  #
  # Next step: `fluentd/agent.rb`
  # Next step: 'fluentd/label.rb'
  #
  class RootAgent < Agent
    ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label

    class SourceOnlyMode
      DISABLED = 0
      NORMAL = 1
      ONLY_ZERO_DOWNTIME_RESTART_READY = 2

      def initialize(with_source_only, start_in_parallel)
        if start_in_parallel
          @mode = ONLY_ZERO_DOWNTIME_RESTART_READY
        elsif with_source_only
          @mode = NORMAL
        else
          @mode = DISABLED
        end
      end

      def enabled?
        @mode != DISABLED
      end

      def only_zero_downtime_restart_ready?
        @mode == ONLY_ZERO_DOWNTIME_RESTART_READY
      end

      def disable!
        @mode = DISABLED
      end
    end

    def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
      super(log: log)

      @labels = {}
      @inputs = []
      @suppress_emit_error_log_interval = 0
      @next_emit_error_log_time = nil
      @without_source = system_config.without_source || false
      @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
      @source_only_buffer_agent = nil
      @enable_input_metrics = system_config.enable_input_metrics || false

      suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
    end

    attr_reader :inputs
    attr_reader :labels

    def source_only_router
      raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled?
      @source_only_buffer_agent.event_router
    end

    def configure(conf)
      used_worker_ids = []
      available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
      # initialize <worker> elements
      conf.elements(name: 'worker').each do |e|
        target_worker_id_str = e.arg
        if target_worker_id_str.empty?
          raise Fluent::ConfigError, "Missing worker id on <worker> directive"
        end

        target_worker_ids = target_worker_id_str.split("-")
        if target_worker_ids.size == 2
          first_worker_id = target_worker_ids.first.to_i
          last_worker_id = target_worker_ids.last.to_i
          if first_worker_id > last_worker_id
            raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
          end
          target_worker_ids = []
          first_worker_id.step(last_worker_id, 1) do |worker_id|
            target_worker_id = worker_id.to_i
            target_worker_ids << target_worker_id

            if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
              raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
            end
            available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
            if used_worker_ids.include?(target_worker_id)
              raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
            end
            used_worker_ids << target_worker_id

            e.elements.each do |elem|
              unless ['source', 'match', 'filter', 'label'].include?(elem.name)
                raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
              end
            end

            unless target_worker_ids.empty?
              e.set_target_worker_ids(target_worker_ids.uniq)
            end
          end
        else
          target_worker_id = target_worker_id_str.to_i
          if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
            raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
          end

          e.elements.each do |elem|
            unless ['source', 'match', 'filter', 'label'].include?(elem.name)
              raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
            end
            elem.set_target_worker_id(target_worker_id)
          end
        end
        conf += e
      end
      conf.elements.delete_if{|e| e.name == 'worker'}

      error_label_config = nil

      # initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
      label_configs = {}
      conf.elements(name: 'label').each { |e|
        if !Fluent::Engine.supervisor_mode && e.for_another_worker?
          next
        end
        name = e.arg
        raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?
        raise ConfigError, "@ROOT for <label> is not permitted, reserved for getting root router" if name == '@ROOT'

        if name == ERROR_LABEL
          error_label_config = e
        else
          add_label(name)
          label_configs[name] = e
        end
      }
      # Call 'configure' here to avoid 'label not found'
      label_configs.each { |name, e| @labels[name].configure(e) }
      setup_error_label(error_label_config) if error_label_config

      super

      setup_source_only_buffer_agent if @source_only_mode.enabled?

      # initialize <source> elements
      if @without_source
        log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
      else
        conf.elements(name: 'source').each { |e|
          if !Fluent::Engine.supervisor_mode && e.for_another_worker?
            next
          end
          type = e['@type']
          raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
          add_source(type, e)
        }
      end
    end

    def setup_error_label(e)
      error_label = add_label(ERROR_LABEL)
      error_label.configure(e)
      @error_collector = error_label.event_router
    end

    def setup_source_only_buffer_agent(flush: false)
      @source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config)
      @source_only_buffer_agent.configure(flush: flush)
    end

    def cleanup_source_only_buffer_agent
      @source_only_buffer_agent&.cleanup
    end

    def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
      only_zero_downtime_restart_ready = false

      unless kind_or_agent_list
        if @source_only_mode.enabled?
          kind_or_agent_list = [:input, @source_only_buffer_agent]
          only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready?
        elsif @source_only_buffer_agent
          # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
          kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
        else
          kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten
        end

        kind_or_agent_list.reverse! if desc
      end

      kind_or_agent_list.each do |kind|
        if kind.respond_to?(:lifecycle)
          agent = kind
          agent.lifecycle(desc: desc) do |plugin, display_kind|
            yield plugin, display_kind
          end
        else
          list = if desc
                   lifecycle_control_list[kind].reverse
                 else
                   lifecycle_control_list[kind]
                 end
          display_kind = (kind == :output_with_router ? :output : kind)
          list.each do |instance|
            if only_zero_downtime_restart_ready
              next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready?
            end
            yield instance, display_kind
          end
        end
        if kind_callback
          kind_callback.call
        end
      end
    end

    def start(kind_or_agent_list: nil)
      lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance
        i.start unless i.started?
        # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case
        # and it causes deadlock for small buffer/queue output. To avoid such problem,
        # buffer related output threads should be run before `Input#start`.
        # This is why after_start should be called immediately after start call.
        # This depends on `desc: true` because calling plugin order of `desc: true` is
        # Output, Filter, Label, Output with Router, then Input.
        i.after_start unless i.after_started?
      end
    end

    def flush!
      log.info "flushing all buffer forcedly"
      flushing_threads = []
      lifecycle(desc: true) do |instance|
        if instance.respond_to?(:force_flush)
          t = Thread.new do
            Thread.current.abort_on_exception = true
            begin
              instance.force_flush
            rescue => e
              log.warn "unexpected error while flushing buffer", plugin: instance.class, plugin_id: instance.plugin_id, error: e
              log.warn_backtrace
            end
          end
          flushing_threads << t
        end
      end
      flushing_threads.each{|t| t.join }
    end

    def cancel_source_only!
      unless @source_only_mode.enabled?
        log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
        return
      end

      log.info "cancel with-source-only mode and start the other plugins"
      all_plugins = [:input, :output_with_router, @labels.values, :filter, :output].flatten.reverse
      start(kind_or_agent_list: all_plugins)

      lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only)

      # Want to make sure that the source_only_router finishes all process before
      # shutting down the agent.
      # Strictly speaking, it would be necessary to have exclusive lock between
      # EventRouter and the shutting down process of this agent.
      # However, adding lock to EventRouter would worsen its performance, and
      # the entire shutting down process does not care about it either.
      # So, sleep here just in case.
      sleep 1

      shutdown(kind_or_agent_list: [@source_only_buffer_agent])
      @source_only_buffer_agent = nil

      # This agent can stop after flushing its all buffer, but it is not implemented for now.
      log.info "starts the loading agent for with-source-only"
      setup_source_only_buffer_agent(flush: true)
      start(kind_or_agent_list: [@source_only_buffer_agent])

      @source_only_mode.disable!
    end

    def shutdown(kind_or_agent_list: nil)
      # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
      # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
      # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
      # Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

      lifecycle_safe_sequence = ->(method, checker) {
        lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind|
          begin
            log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
            instance.__send__(method) unless instance.__send__(checker)
          rescue Exception => e
            log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
            log.warn_backtrace
          end
        end
      }

      lifecycle_unsafe_sequence = ->(method, checker) {
        operation = case method
                    when :shutdown then "shutting down"
                    when :close    then "closing"
                    else
                      raise "BUG: unknown method name '#{method}'"
                    end
        operation_threads = []
        callback = ->(){
          operation_threads.each{|t| t.join }
          operation_threads.clear
        }
        lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind|
          t = Thread.new do
            Thread.current.abort_on_exception = true
            begin
              if method == :shutdown
                # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
                # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
                operation = "preparing shutdown" # for logging
                log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
                begin
                  instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
                rescue Exception => e
                  log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
                  log.warn_backtrace
                end
                operation = "shutting down"
                log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
                instance.__send__(:shutdown) unless instance.__send__(:shutdown?)
              else
                log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
                instance.__send__(method) unless instance.__send__(checker)
              end
            rescue Exception => e
              log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
              log.warn_backtrace
            end
          end
          operation_threads << t
        end
      }

      lifecycle_safe_sequence.call(:stop, :stopped?)

      # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
      lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)

      lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?)

      lifecycle_unsafe_sequence.call(:close, :closed?)

      lifecycle_safe_sequence.call(:terminate, :terminated?)

      cleanup_source_only_buffer_agent unless kind_or_agent_list
    end

    def suppress_interval(interval_time)
      @suppress_emit_error_log_interval = interval_time
      @next_emit_error_log_time = Time.now.to_i
    end

    def add_source(type, conf)
      log_type = conf.for_this_worker? ? :default : :worker0
      log.info log_type, "adding source", type: type

      input = Plugin.new_input(type)
      # <source> emits events to the top-level event router (RootAgent#event_router).
      # Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
      # See also 'fluentd/plugin/input.rb'
      input.context_router = @event_router
      input.configure(conf)
      input.event_emitter_apply_source_only if @source_only_mode.enabled?
      if @enable_input_metrics
        @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
      end
      @inputs << input

      input
    end

    def add_label(name)
      label = Label.new(name, log: log)
      raise ConfigError, "Section <label #{name}> appears twice" if @labels[name]
      label.root_agent = self
      @labels[name] = label
    end

    def find_label(label_name)
      if label = @labels[label_name]
        label
      else
        raise ArgumentError, "#{label_name} label not found"
      end
    end

    def emit_error_event(tag, time, record, error)
      error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag, time: time}
      if @error_collector
        # A record is not included in the logs because <@ERROR> handles it. This warn is for the notification
        log.warn "send an error event to @ERROR:", error_info
        @error_collector.emit(tag, time, record)
      else
        error_info[:record] = record
        log.warn "dump an error event:", error_info
      end
    end

    def handle_emits_error(tag, es, error)
      error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag}
      if @error_collector
        log.warn "send an error event stream to @ERROR:", error_info
        @error_collector.emit_stream(tag, es)
      else
        now = Time.now.to_i
        if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time
          log.warn "emit transaction failed:", error_info
          log.warn_backtrace
          @next_emit_error_log_time = now + @suppress_emit_error_log_interval
        end
        raise error
      end
    end
  end
end