#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/env'
require 'fluent/error'
require 'fluent/plugin/base'
require 'fluent/plugin/buffer'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/msgpack_factory'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
require 'fluent/timezone'
require 'fluent/unique_id'
require 'fluent/clock'
require 'fluent/ext_monitor_require'

require 'time'

module Fluent
  module Plugin
    class Output < Base
      include PluginId
      include PluginLoggerMixin
      include PluginHelper::Mixin
      include UniqueId::Mixin

      helpers_internal :thread, :retry_state, :metrics

      CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
      CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{([-_.@$a-zA-Z0-9]+)\}/
      CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[-?\d+\])?)\}/
      CHUNK_ID_PLACEHOLDER_PATTERN = /\$\{chunk_id\}/

      CHUNKING_FIELD_WARN_NUM = 4

      config_param :time_as_integer, :bool, default: false
      desc 'The threshold (in seconds) to show slow flush logs'
      config_param :slow_flush_log_threshold, :float, default: 20.0

      # `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
      config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
        config_argument :chunk_keys, :array, value_type: :string, default: []
        config_param :@type, :string, default: 'memory', alias: :type

        config_param :timekey, :time, default: nil # range size to be used: `time.to_i / @timekey`
        config_param :timekey_wait, :time, default: 600
        # These are for #extract_placeholders
        config_param :timekey_use_utc, :bool, default: false # default is localtime
        config_param :timekey_zone, :string, default: Time.now.strftime('%z') # e.g., "-0700" or "Asia/Tokyo"

        desc 'If true, plugin will try to flush buffer just before shutdown.'
        config_param :flush_at_shutdown, :bool, default: nil # change default by buffer_plugin.persistent?

        desc 'How to enqueue chunks to be flushed. "interval" flushes per flush_interval, "immediate" flushes just after event arrival.'
        config_param :flush_mode, :enum, list: [:default, :lazy, :interval, :immediate], default: :default
        config_param :flush_interval, :time, default: 60, desc: 'The interval between buffer chunk flushes.'

        config_param :flush_thread_count, :integer, default: 1, desc: 'The number of threads to flush the buffer.'

        config_param :flush_thread_interval, :float, default: 1.0, desc: 'Seconds to sleep between checks for buffer flushes in flush threads.'
        config_param :flush_thread_burst_interval, :float, default: 1.0, desc: 'Seconds to sleep between flushes when many buffer chunks are queued.'

        config_param :delayed_commit_timeout, :time, default: 60, desc: 'Seconds of timeout for buffer chunks to be committed by plugins later.'

        config_param :overflow_action, :enum, list: [:throw_exception, :block, :drop_oldest_chunk], default: :throw_exception, desc: 'The action when the size of buffer exceeds the limit.'

        config_param :retry_forever, :bool, default: false, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry flushing forever.'
        config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry to flush while failing, until plugin discards buffer chunks.'
        # 72 hours == 17 retries with exponential backoff (kept so as not to change the default behavior)
        config_param :retry_max_times, :integer, default: nil, desc: 'The maximum number of times to retry to flush while failing.'

        config_param :retry_secondary_threshold, :float, default: 0.8, desc: 'The ratio of retry_timeout elapsed at which to switch to the secondary while failing.'
        # the exponential backoff sequence is re-initialized when this threshold is crossed

        desc 'How to wait for the next retry to flush the buffer.'
        config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
        ### Periodic -> fixed :retry_wait
        ### Exponential backoff: k is the number of retries so far
        # c: constant factor, @retry_wait
        # b: base factor, @retry_exponential_backoff_base
        # k: number of retries
        # total retry time: c + c*b + ... + c*b^k = c * (b^(k+1) - 1) / (b - 1)
        #   e.g., with c = 1 and b = 2 (the defaults): 2^(k+1) - 1 seconds
        config_param :retry_wait, :time, default: 1, desc: 'Seconds to wait before next retry to flush, or constant factor of exponential backoff.'
        config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
        config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'

        config_param :retry_randomize, :bool, default: true, desc: 'If true, the output plugin will retry after a randomized interval, to avoid burst retries.'
      end
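
      # An illustrative configuration for the parameters above (plugin name,
      # path and values are examples only, not taken from this file):
      #
      #   <match mytag.**>
      #     @type my_buffered_output
      #     <buffer tag,time>
      #       @type file
      #       path /var/log/fluent/buffer   # example path
      #       timekey 3600                  # hourly chunks
      #       timekey_wait 600              # wait 10 min for late events
      #       flush_mode interval
      #       flush_interval 60s
      #       retry_max_times 10
      #     </buffer>
      #   </match>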

      config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
        config_param :@type, :string, default: nil, alias: :type
        config_section :buffer, required: false, multi: false do
          # dummy section to detect an invalid <buffer> specified here
        end
        config_section :secondary, required: false, multi: false do
          # dummy section to detect an invalid <secondary> specified here
        end
      end
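
      # An illustrative <secondary> configuration (values are examples only);
      # the secondary is used once elapsed retry time exceeds
      # retry_secondary_threshold * retry_timeout:
      #
      #   <secondary>
      #     @type secondary_file
      #     directory /var/log/fluent/error
      #   </secondary>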

      def process(tag, es)
        raise NotImplementedError, "BUG: output plugins MUST implement this method"
      end

      def write(chunk)
        raise NotImplementedError, "BUG: output plugins MUST implement this method"
      end

      def try_write(chunk)
        raise NotImplementedError, "BUG: output plugins MUST implement this method"
      end

      def format(tag, time, record)
        # standard msgpack_event_stream chunk will be used if this method is not implemented in plugin subclass
        raise NotImplementedError, "BUG: output plugins MUST implement this method"
      end

      def formatted_to_msgpack_binary?
        # Indicates whether the custom format method (#format) returns msgpack binary.
        # If #format returns msgpack binary, override this method to return true.
        false
      end

      # Compatibility for existing plugins
      def formatted_to_msgpack_binary
        formatted_to_msgpack_binary?
      end

      def prefer_buffered_processing
        # override this method to return false only when all of these are true:
        #  * plugin has both implementation for buffered and non-buffered methods
        #  * plugin is expected to work as non-buffered plugin if no `<buffer>` sections specified
        true
      end

      def prefer_delayed_commit
        # override this method to decide which of #write or #try_write is used when both are implemented
        true
      end

      def multi_workers_ready?
        false
      end
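
      # A minimal sketch of a plugin built on the hooks above (class name and
      # method bodies are hypothetical, for illustration only):
      #
      #   class MyBufferedOutput < Fluent::Plugin::Output
      #     Fluent::Plugin.register_output('my_buffered', self)
      #
      #     def format(tag, time, record)
      #       # custom format: one JSON line per event
      #       "#{record.to_json}\n"
      #     end
      #
      #     def write(chunk)
      #       # chunk contains events already formatted by #format
      #       post_to_backend(chunk.read)   # hypothetical helper
      #     end
      #   end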

      # Internal states
      FlushThreadState = Struct.new(:thread, :next_clock, :mutex, :cond_var)
      DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
        def expired?
          time + timeout < Time.now
        end
      end

      attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone

      # for tests
      attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag
      attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex
      # output_enqueue_thread_waiting: for test of output.rb itself
      attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true

      def num_errors
        @num_errors_metrics.get
      end

      def emit_count
        @emit_count_metrics.get
      end

      def emit_size
        @emit_size_metrics.get
      end

      def emit_records
        @emit_records_metrics.get
      end

      def write_count
        @write_count_metrics.get
      end

      def rollback_count
        @rollback_count_metrics.get
      end

      def initialize
        super
        @counter_mutex = Mutex.new
        @flush_thread_mutex = Mutex.new
        @buffering = false
        @delayed_commit = false
        @as_secondary = false
        @primary_instance = nil

        # TODO: well organized counters
        @num_errors_metrics = nil
        @emit_count_metrics = nil
        @emit_records_metrics = nil
        @emit_size_metrics = nil
        @write_count_metrics = nil
        @rollback_count_metrics = nil
        @flush_time_count_metrics = nil
        @slow_flush_count_metrics = nil
        @enable_size_metrics = false

        # How to process events is tentatively decided here, but the final decision is made lazily in #configure & #start
        if implement?(:synchronous)
          if implement?(:buffered) || implement?(:delayed_commit)
            @buffering = nil # do #configure or #start to determine this for full-featured plugins
          else
            @buffering = false
          end
        else
          @buffering = true
        end
        @custom_format = implement?(:custom_format)
        @enable_msgpack_streamer = false # decided later

        @buffer = nil
        @secondary = nil
        @retry = nil
        @dequeued_chunks = nil
        @dequeued_chunks_mutex = nil
        @output_enqueue_thread = nil
        @output_flush_threads = nil
        @output_flush_thread_current_position = 0

        @simple_chunking = nil
        @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil
        @flush_mode = nil
        @timekey_zone = nil

        @retry_for_error_chunk = false
      end

      def acts_as_secondary(primary)
        @as_secondary = true
        @primary_instance = primary
        @chunk_keys = @primary_instance.chunk_keys || []
        @chunk_key_tag = @primary_instance.chunk_key_tag || false
        if @primary_instance.chunk_key_time
          @chunk_key_time = @primary_instance.chunk_key_time
          @timekey_zone = @primary_instance.timekey_zone
          @output_time_formatter_cache = {}
        end
        self.context_router = primary.context_router

        singleton_class.module_eval do
          define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
          define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry) }
        end
      end

      def configure(conf)
        unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
          raise "BUG: output plugin must implement some methods. see developer documents."
        end

        has_buffer_section = (conf.elements(name: 'buffer').size > 0)
        has_flush_interval = conf.has_key?('flush_interval')

        super

        @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of errors")
        @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of emit calls")
        @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emitted records")
        @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emitted events")
        @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of write operations")
        @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollback operations")
        @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Total flush time in milliseconds")
        @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurrences")

        if has_buffer_section
          unless implement?(:buffered) || implement?(:delayed_commit)
            raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
          end
          @buffering = true
        else # no buffer sections
          if implement?(:synchronous)
            if !implement?(:buffered) && !implement?(:delayed_commit)
              if @as_secondary
                raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
              end
              @buffering = false
            else
              if @as_secondary
                # secondary plugin always works as buffered plugin without buffer instance
                @buffering = true
              else
                # @buffering.nil? means that whether to enable buffering is decided lazily in #start
                @buffering = nil
              end
            end
          else # buffered or delayed_commit is supported by `unless` of first line in this method
            @buffering = true
          end
        end
        # Whether to update record size metrics or not
        @enable_size_metrics = !!system_config.enable_size_metrics

        if @as_secondary
          if !@buffering && !@buffering.nil?
            raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
          end
        end

        if (@buffering || @buffering.nil?) && !@as_secondary
          # When @buffering.nil?, @buffer_config was initialized with default values for all parameters.
          # If so, this configuration MUST succeed.
          @chunk_keys = @buffer_config.chunk_keys.dup
          @chunk_key_time = !!@chunk_keys.delete('time')
          @chunk_key_tag = !!@chunk_keys.delete('tag')
          if @chunk_keys.any? { |key|
              begin
                k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
                if k.is_a?(String)
                  k !~ CHUNK_KEY_PATTERN
                else
                  if key.start_with?('$[')
                    raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
                  else
                    false
                  end
                end
              rescue => e
                raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
              end
            }
            raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
          else
            @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
          end

          if @chunk_key_time
            raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
            Fluent::Timezone.validate!(@buffer_config.timekey_zone)
            @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
            @timekey = @buffer_config.timekey
            if @timekey <= 0
              raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}"
            end
            @timekey_use_utc = @buffer_config.timekey_use_utc
            @offset = Fluent::Timezone.utc_offset(@timekey_zone)
            @calculate_offset = @offset.respond_to?(:call) ? @offset : nil
            @output_time_formatter_cache = {}
          end

          if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
            log.warn "many chunk keys specified, and it may cause too many chunks on your system."
          end

          # no chunk keys, or tag only (chunking can be done without iterating the event stream)
          @simple_chunking = !@chunk_key_time && @chunk_keys.empty?

          @flush_mode = @buffer_config.flush_mode
          if @flush_mode == :default
            if has_flush_interval
              log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour"
              @flush_mode = :interval
            else
              @flush_mode = (@chunk_key_time ? :lazy : :interval)
            end
          end

          buffer_type = @buffer_config[:@type]
          buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
          @buffer = Plugin.new_buffer(buffer_type, parent: self)
          @buffer.configure(buffer_conf)
          keep_buffer_config_compat
          @buffer.enable_update_timekeys if @chunk_key_time

          @flush_at_shutdown = @buffer_config.flush_at_shutdown
          if @flush_at_shutdown.nil?
            @flush_at_shutdown = if @buffer.persistent?
                                   false
                                 else
                                   true # flush_at_shutdown is true by default for in-memory buffers
                                 end
          elsif !@flush_at_shutdown && !@buffer.persistent?
            buf_type = Plugin.lookup_type_from_class(@buffer.class)
            log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
            log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
          end

          if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval')
            if buffer_conf.has_key?('flush_mode')
              raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'"
            else
              log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
            end
          end

          if @buffer.queued_chunks_limit_size.nil?
            @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
          end
        end

        if @secondary_config
          raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
          raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
          raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
          if @buffer_config.retry_forever
            log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary"
          end

          secondary_type = @secondary_config[:@type]
          unless secondary_type
            secondary_type = conf['@type'] # primary plugin type
          end
          secondary_conf = conf.elements(name: 'secondary').first
          @secondary = Plugin.new_output(secondary_type)
          unless @secondary.respond_to?(:acts_as_secondary)
            raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output"
          end
          @secondary.acts_as_secondary(self)
          @secondary.configure(secondary_conf)
          if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") &&
             (self.class != @secondary.class) &&
             (@custom_format || @secondary.implement?(:custom_format))
            log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s
          end
        else
          @secondary = nil
        end

        self
      end

      def keep_buffer_config_compat
        # Need this to call `@buffer_config.disable_chunk_backup` just as before,
        # since some plugins may use this option in this way.
        @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup
      end

      def start
        super

        if @buffering.nil?
          @buffering = prefer_buffered_processing
          if !@buffering && @buffer
            @buffer.terminate # it's not started, so terminate will be enough
            # At this point, this plugin works as a non-buffered plugin.
            # Un-assign @buffer so buffering metrics are not shown (e.g., in in_monitor_agent)
            @buffer = nil
          end
        end

        if @buffering
          m = method(:emit_buffered)
          singleton_class.module_eval do
            define_method(:emit_events, m)
          end

          @custom_format = implement?(:custom_format)
          @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
          @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
                              prefer_delayed_commit
                            else
                              implement?(:delayed_commit)
                            end
          @delayed_commit_timeout = @buffer_config.delayed_commit_timeout
        else # !@buffering
          m = method(:emit_sync)
          singleton_class.module_eval do
            define_method(:emit_events, m)
          end
        end

        if @buffering && !@as_secondary
          @retry = nil
          @retry_mutex = Mutex.new

          @buffer.start

          @output_enqueue_thread = nil
          @output_enqueue_thread_running = true

          @output_flush_threads = []
          @output_flush_threads_mutex = Mutex.new
          @output_flush_threads_running = true

          # mainly for tests: detect that enqueue works, with code like:
          #   @output.interrupt_flushes
          #   # emits
          #   @output.enqueue_thread_wait
          @output_flush_interrupted = false
          @output_enqueue_thread_mutex = Mutex.new
          @output_enqueue_thread_waiting = false

          @dequeued_chunks = []
          @dequeued_chunks_mutex = Mutex.new

          @output_flush_thread_current_position = 0
          @buffer_config.flush_thread_count.times do |i|
            thread_title = "flush_thread_#{i}".to_sym
            thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
            thread = thread_create(thread_title) do
              flush_thread_run(thread_state)
            end
            thread_state.thread = thread
            @output_flush_threads_mutex.synchronize do
              @output_flush_threads << thread_state
            end
          end

          if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
            @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
          end
        end
        @secondary.start if @secondary
      end

      def after_start
        super
        @secondary.after_start if @secondary
      end

      def stop
        @secondary.stop if @secondary
        @buffer.stop if @buffering && @buffer

        super
      end

      def before_shutdown
        @secondary.before_shutdown if @secondary

        if @buffering && @buffer
          if @flush_at_shutdown
            force_flush
          end
          @buffer.before_shutdown
          # Need to ensure that enqueueing stops ... after #shutdown, we cannot write any data
          @output_enqueue_thread_running = false
          if @output_enqueue_thread && @output_enqueue_thread.alive?
            @output_enqueue_thread.wakeup
            @output_enqueue_thread.join
          end
        end

        super
      end

      def shutdown
        @secondary.shutdown if @secondary
        @buffer.shutdown if @buffering && @buffer

        super
      end

      def after_shutdown
        try_rollback_all if @buffering && !@as_secondary # rollback regardless of @delayed_commit, because the secondary may do it
        @secondary.after_shutdown if @secondary

        if @buffering && @buffer
          @buffer.after_shutdown

          @output_flush_threads_running = false
          if @output_flush_threads && !@output_flush_threads.empty?
            @output_flush_threads.each do |state|
              # wake up the thread and make it stop by itself
              state.mutex.synchronize {
                if state.thread && state.thread.status
                  state.next_clock = 0
                  state.cond_var.signal
                end
              }
              Thread.pass
              state.thread.join
            end
          end
        end

        super
      end

      def close
        @buffer.close if @buffering && @buffer
        @secondary.close if @secondary

        super
      end

      def terminate
        @buffer.terminate if @buffering && @buffer
        @secondary.terminate if @secondary

        super
      end

      def actual_flush_thread_count
        return 0 unless @buffering
        return @buffer_config.flush_thread_count unless @as_secondary
        @primary_instance.buffer_config.flush_thread_count
      end

      # Ensures `path` (filename or filepath) is processable
      # only by the current thread in the current process.
      # For multiple workers, the lock is shared if `path` is the same value.
      # For multiple threads, the lock is shared by all threads in the same process.
      def synchronize_path(path)
        synchronize_path_in_workers(path) do
          synchronize_in_threads do
            yield
          end
        end
      end
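
      # For example, a plugin flushing chunks to files might guard each
      # destination path (sketch; @path_template and file handling are
      # illustrative):
      #
      #   def write(chunk)
      #     path = extract_placeholders(@path_template, chunk)
      #     synchronize_path(path) do
      #       File.open(path, 'ab') { |f| chunk.write_to(f) }
      #     end
      #   end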

      def synchronize_path_in_workers(path)
        need_worker_lock = system_config.workers > 1
        if need_worker_lock
          acquire_worker_lock(path) { yield }
        else
          yield
        end
      end

      def synchronize_in_threads
        need_thread_lock = actual_flush_thread_count > 1
        if need_thread_lock
          @flush_thread_mutex.synchronize { yield }
        else
          yield
        end
      end

      def support_in_v12_style?(feature)
        # for plugins written in v0.12 styles
        case feature
        when :synchronous    then false
        when :buffered       then false
        when :delayed_commit then false
        when :custom_format  then false
        else
          raise ArgumentError, "unknown feature: #{feature}"
        end
      end

      def implement?(feature)
        methods_of_plugin = self.class.instance_methods(false)
        case feature
        when :synchronous    then methods_of_plugin.include?(:process) || support_in_v12_style?(:synchronous)
        when :buffered       then methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered)
        when :delayed_commit then methods_of_plugin.include?(:try_write)
        when :custom_format  then methods_of_plugin.include?(:format) || support_in_v12_style?(:custom_format)
        else
          raise ArgumentError, "Unknown feature for output plugin: #{feature}"
        end
      end
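
      # For example, a hypothetical subclass defining both #process and #write
      # implements both features; which one is actually used is decided by
      # #prefer_buffered_processing and the presence of a <buffer> section:
      #
      #   class MyOutput < Fluent::Plugin::Output   # hypothetical
      #     def process(tag, es); end   # implement?(:synchronous) #=> true
      #     def write(chunk); end       # implement?(:buffered)    #=> true
      #   end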

      def placeholder_validate!(name, str)
        placeholder_validators(name, str).each do |v|
          v.validate!
        end
      end

      def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
        validators = []

        sec, title, example = get_placeholders_time(str)
        if sec || time_key
          validators << PlaceholderValidator.new(name, str, :time, {sec: sec, title: title, example: example, timekey: time_key})
        end

        parts = get_placeholders_tag(str)
        if tag_key || !parts.empty?
          validators << PlaceholderValidator.new(name, str, :tag, {parts: parts, tagkey: tag_key})
        end

        keys = get_placeholders_keys(str)
        if chunk_keys && !chunk_keys.empty? || !keys.empty?
          validators << PlaceholderValidator.new(name, str, :keys, {keys: keys, chunkkeys: chunk_keys})
        end

        validators
      end

      class PlaceholderValidator
        attr_reader :name, :string, :type, :argument

        def initialize(name, str, type, arg)
          @name = name
          @string = str
          @type = type
          raise ArgumentError, "invalid type:#{type}" if @type != :time && @type != :tag && @type != :keys
          @argument = arg
        end

        def time?
          @type == :time
        end

        def tag?
          @type == :tag
        end

        def keys?
          @type == :keys
        end

        def validate!
          case @type
          when :time then validate_time!
          when :tag  then validate_tag!
          when :keys then validate_keys!
          end
        end

        def validate_time!
          sec = @argument[:sec]
          title = @argument[:title]
          example = @argument[:example]
          timekey = @argument[:timekey]
          if !sec && timekey
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have timestamp placeholders for timekey #{timekey.to_i}"
          end
          if sec && !timekey
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has timestamp placeholders, but chunk key 'time' is not configured"
          end
          if sec && timekey && timekey < sec
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have timestamp placeholder for #{title}('#{example}') for timekey #{timekey.to_i}"
          end
        end

        def validate_tag!
          parts = @argument[:parts]
          tagkey = @argument[:tagkey]
          if tagkey && parts.empty?
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have tag placeholder"
          end
          if !tagkey && !parts.empty?
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has tag placeholders, but chunk key 'tag' is not configured"
          end
        end

        def validate_keys!
          keys = @argument[:keys]
          chunk_keys = @argument[:chunkkeys]
          if (chunk_keys - keys).size > 0
            not_specified = (chunk_keys - keys).sort
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have enough placeholders for keys #{not_specified.join(',')}"
          end
          if (keys - chunk_keys).size > 0
            not_satisfied = (keys - chunk_keys).sort
            raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has placeholders, but chunk keys doesn't have keys #{not_satisfied.join(',')}"
          end
        end
      end

      TIME_KEY_PLACEHOLDER_THRESHOLDS = [
        [1, :second, '%S'],
        [60, :minute, '%M'],
        [3600, :hour, '%H'],
        [86400, :day, '%d'],
      ]
      TIMESTAMP_CHECK_BASE_TIME = Time.parse("2016-01-01 00:00:00 UTC")
      # using a timekey larger than 1 day is not validated
      def get_placeholders_time(str)
        base_str = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
        TIME_KEY_PLACEHOLDER_THRESHOLDS.each do |triple|
          sec = triple.first
          return triple if (TIMESTAMP_CHECK_BASE_TIME + sec).strftime(str) != base_str
        end
        nil
      end

      # -1 means whole tag
      def get_placeholders_tag(str)
        # [["tag"],["tag[0]"]]
        parts = []
        str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map(&:first).each do |ph|
          if ph == "tag"
            parts << -1
          elsif ph =~ /^tag\[(-?\d+)\]$/
            parts << $1.to_i
          end
        end
        parts.sort
      end

      def get_placeholders_keys(str)
        str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map(&:first).reject{|s| (s == "tag") || (s == 'chunk_id') }.sort
      end
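
      # Illustrative return values for the three placeholder helpers above:
      #
      #   get_placeholders_time('%Y%m%d/%H')        #=> [3600, :hour, '%H']
      #   get_placeholders_tag('${tag[0]}/${tag}')  #=> [-1, 0]
      #   get_placeholders_keys('${key1}/${tag}')   #=> ["key1"]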

      # TODO: optimize this code
      def extract_placeholders(str, chunk)
        metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk)
                     chunk_passed = true
                     chunk.metadata
                   else
                     chunk_passed = false
                     # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk
                     chunk
                   end
        if metadata.empty?
          str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
            if chunk_passed
              dump_unique_id_hex(chunk.unique_id)
            else
              log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
            end
          }
        else
          rvalue = str.dup
          # strftime formatting
          if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
            @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
            rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
          end
          # ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]}
          if @chunk_key_tag
            if str.include?('${tag}')
              rvalue = rvalue.gsub('${tag}', metadata.tag)
            end
            if CHUNK_TAG_PLACEHOLDER_PATTERN.match?(str)
              hash = {}
              tag_parts = metadata.tag.split('.')
              tag_parts.each_with_index do |part, i|
                hash["${tag[#{i}]}"] = part
                hash["${tag[#{i-tag_parts.size}]}"] = part
              end
              rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash)
            end
            if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN
              log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}"
            end
          end

          # First we replace ${chunk_id} with chunk.unique_id (hexlified).
          rvalue = rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
            if chunk_passed
              dump_unique_id_hex(chunk.unique_id)
            else
              log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
            end
          }

          # Then, replace other ${chunk_key}s.
          if !@chunk_keys.empty? && metadata.variables
            hash = {'${tag}' => '${tag}'} # not to erase this wrongly
            @chunk_keys.each do |key|
              hash["${#{key}}"] = metadata.variables[key.to_sym]
            end

            rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN) do |matched|
              hash.fetch(matched) do
                log.warn "chunk key placeholder '#{matched[2..-2]}' not replaced. template:#{str}"
                ''
              end
            end
          end

          if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
            log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}"
          end

          rvalue
        end
      end
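
      # An illustrative expansion, assuming chunk keys "time,tag,key1" with
      # timekey 3600, and a chunk whose metadata holds the timekey for
      # 2016-01-01 01:00:00 UTC, tag "app.web" and variables {key1: "x"}:
      #
      #   extract_placeholders('logs/${tag[1]}/%Y%m%d_%H/${key1}', chunk)
      #   #=> "logs/web/20160101_01/x"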

      def emit_events(tag, es)
        # actually this method will be overwritten in #start
        if @buffering
          emit_buffered(tag, es)
        else
          emit_sync(tag, es)
        end
      end

      def emit_sync(tag, es)
        @emit_count_metrics.inc
        begin
          process(tag, es)
          @emit_records_metrics.add(es.size)
          @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
        rescue
          @num_errors_metrics.inc
          raise
        end
      end

      def emit_buffered(tag, es)
        @emit_count_metrics.inc
        begin
          execute_chunking(tag, es, enqueue: (@flush_mode == :immediate))
          if !@retry && @buffer.queued?(nil, optimistic: true)
            submit_flush_once
          end
        rescue
          # TODO: separate number of errors into emit errors and write/flush errors
          @num_errors_metrics.inc
          raise
        end
      end

      # TODO: optimize this code
      def metadata(tag, time, record)
        # these arguments are ordered per the output plugin convention;
        # Metadata's argument order is different (timekey, tag, variables)

        raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
        raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
        raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

        if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?
          # for tests
          return Struct.new(:timekey, :tag, :variables).new
        end

        # timekey is an integer from the epoch, and `timekey - timekey % 60` is assumed to match the 0th second of each minute.
        # That is wrong if the configured timezone supports leap seconds, but such timezones are very rare and
        # we can ignore them (especially in production systems).
        if @chunk_keys.empty?
          if !@chunk_key_time && !@chunk_key_tag
            @buffer.metadata()
          elsif @chunk_key_time && @chunk_key_tag
            timekey = calculate_timekey(time)
            @buffer.metadata(timekey: timekey, tag: tag)
          elsif @chunk_key_time
            timekey = calculate_timekey(time)
            @buffer.metadata(timekey: timekey)
          else
            @buffer.metadata(tag: tag)
          end
        else
          timekey = if @chunk_key_time
                      calculate_timekey(time)
                    else
                      nil
                    end
          pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }]
          @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs)
        end
      end

      def calculate_timekey(time)
        time_int = time.to_i
        if @timekey_use_utc
          (time_int - (time_int % @timekey)).to_i
        else
          offset = @calculate_offset ? @calculate_offset.call(time) : @offset
          (time_int - ((time_int + offset) % @timekey)).to_i
        end
      end
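
      # Worked example: with timekey = 3600 (hourly chunks) and
      # timekey_use_utc, a UTC time of 12:34:56 gives
      # time_int % 3600 == 2096, so the chunk timekey is 12:00:00 of the
      # same day (time_int - 2096).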

      def chunk_for_test(tag, time, record)
        require 'fluent/plugin/buffer/memory_chunk'

        m = metadata(tag, time, record)
        Fluent::Plugin::Buffer::MemoryChunk.new(m)
      end

      def execute_chunking(tag, es, enqueue: false)
        if @simple_chunking
          handle_stream_simple(tag, es, enqueue: enqueue)
        elsif @custom_format
          handle_stream_with_custom_format(tag, es, enqueue: enqueue)
        else
          handle_stream_with_standard_format(tag, es, enqueue: enqueue)
        end
      end
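
      # Which path is taken above (illustrative): with no chunk keys, or 'tag'
      # only, chunking needs no iteration of the event stream
      # (handle_stream_simple); if the plugin defines #format, each event is
      # formatted here (handle_stream_with_custom_format); otherwise events
      # are stored as standard msgpack event streams
      # (handle_stream_with_standard_format).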

      def write_guard(&block)
        begin
          block.call
        rescue Fluent::Plugin::Buffer::BufferOverflowError
          log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action
          case @buffer_config.overflow_action
          when :throw_exception
            raise
          when :block
            log.debug "buffer.write is now blocking"
            until @buffer.storable?
              if self.stopped?
                log.error "breaking block behavior to shutdown Fluentd"
                # to break infinite loop to exit Fluentd process
                raise
              end
              log.trace "sleeping until buffer can store more data"
              sleep 1
            end
            log.debug "retrying buffer.write after blocked operation"
            retry
          when :drop_oldest_chunk
            begin
              oldest = @buffer.dequeue_chunk
              if oldest
                log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)
                @buffer.purge_chunk(oldest.unique_id)
              else
                log.error "no queued chunks to be dropped for drop_oldest_chunk"
              end
            rescue
              # ignore any errors
            end
            raise unless @buffer.storable?
            retry
          else
            raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'"
          end
        end
      end

      FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }

      def generate_format_proc
        if @buffer && @buffer.compress == :gzip
          @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
        else
          @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
        end
      end

      # metadata_and_data is a Hash of:
      #  (standard format) metadata => event stream
      #  (custom format)   metadata => array of formatted event
      # For standard format, formatting should be done for whole event stream, but
      #   "whole event stream" may be a split of "es" here when it's bigger than chunk_limit_size.
      #   `@buffer.write` will do this splitting.
      # For custom format, formatting will be done here. Custom formatting always requires
      #   iteration of event stream, and it should be done just once even if total event stream size
      #   is bigger than chunk_limit_size because of performance.
      def handle_stream_with_custom_format(tag, es, enqueue: false)
        meta_and_data = {}
        records = 0
        es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
          meta = metadata(tag, time, record)
          meta_and_data[meta] ||= []
          res = format(tag, time, record)
          if res
            meta_and_data[meta] << res
            records += 1
          end
        end
        write_guard do
          @buffer.write(meta_and_data, enqueue: enqueue)
        end
        @emit_records_metrics.add(es.size)
        @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
        true
      end

      def handle_stream_with_standard_format(tag, es, enqueue: false)
        format_proc = generate_format_proc
        meta_and_data = {}
        records = 0
        es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
          meta = metadata(tag, time, record)
          meta_and_data[meta] ||= MultiEventStream.new
          meta_and_data[meta].add(time, record)
          records += 1
        end
        write_guard do
          @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
        end
        @emit_records_metrics.add(es.size)
        @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
        true
      end

      def handle_stream_simple(tag, es, enqueue: false)
        format_proc = nil
        meta = metadata((@chunk_key_tag ? tag : nil), nil, nil)
        records = es.size
        if @custom_format
          records = 0
          data = []
          es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
            res = format(tag, time, record)
            if res
              data << res
              records += 1
            end
          end
        else
          format_proc = generate_format_proc
          data = es
        end
        write_guard do
          @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
        end
        @emit_records_metrics.add(es.size)
        @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
        true
      end

      def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
        log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed }

        if delayed
          @dequeued_chunks_mutex.synchronize do
            @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
          end
        end
        @buffer.purge_chunk(chunk_id)

        @retry_mutex.synchronize do
          if @retry # success to flush chunks in retries
            if secondary
              log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id)
            else
              log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id)
            end
            @retry = nil
          end
        end
      end
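
      # A sketch of how an async plugin drives the delayed commit protocol
      # (@client and its #send_async are hypothetical):
      #
      #   def try_write(chunk)
      #     chunk_id = chunk.unique_id
      #     @client.send_async(chunk.read) do |success|
      #       success ? commit_write(chunk_id) : rollback_write(chunk_id)
      #     end
      #   end
      #
      # If neither callback fires within delayed_commit_timeout, the chunk is
      # rolled back automatically by #try_rollback_write.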

      # The update_retry parameter prevents a busy loop caused by async writes.
      # We will remove this parameter after re-designing retry_state management between threads.
      def rollback_write(chunk_id, update_retry: true)
        # This API is to rollback chunks explicitly from plugins.
        # 3rd party plugins can instead rely on the automatic rollback of #try_rollback_write
        @dequeued_chunks_mutex.synchronize do
          @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
        end
        # returns true if the chunk was rolled back as expected,
        #         false if the chunk was already flushed and unexpectedly could not be rolled back
        # in many cases, false can be just ignored
        if @buffer.takeback_chunk(chunk_id)
          @rollback_count_metrics.inc
          if update_retry
            primary = @as_secondary ? @primary_instance : self
            primary.update_retry_state(chunk_id, @as_secondary)
          end
          true
        else
          false
        end
      end

      def try_rollback_write
        @dequeued_chunks_mutex.synchronize do
          while @dequeued_chunks.first && @dequeued_chunks.first.expired?
            info = @dequeued_chunks.shift
            if @buffer.takeback_chunk(info.chunk_id)
              @rollback_count_metrics.inc
              log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
              primary = @as_secondary ? @primary_instance : self
              primary.update_retry_state(info.chunk_id, @as_secondary)
            end
          end
        end
      end

      def try_rollback_all
        return unless @dequeued_chunks
        @dequeued_chunks_mutex.synchronize do
          until @dequeued_chunks.empty?
            info = @dequeued_chunks.shift
            if @buffer.takeback_chunk(info.chunk_id)
              @rollback_count_metrics.inc
              log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
              primary = @as_secondary ? @primary_instance : self
              primary.update_retry_state(info.chunk_id, @as_secondary)
            end
          end
        end
      end

      def next_flush_time
        if @buffer.queued?
          @retry_mutex.synchronize do
            @retry ? @retry.next_time : Time.now + @buffer_config.flush_thread_burst_interval
          end
        else
          Time.now + @buffer_config.flush_thread_interval
        end
      end

      UNRECOVERABLE_ERRORS = [Fluent::UnrecoverableError, TypeError, ArgumentError, NoMethodError, MessagePack::UnpackError, EncodingError]

      def try_flush
        chunk = @buffer.dequeue_chunk
        return unless chunk

        log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) }

        output = self
        using_secondary = false
        if @retry_mutex.synchronize{ @retry && @retry.secondary? }
          output = @secondary
          using_secondary = true
        end

        if @enable_msgpack_streamer
          chunk.extend ChunkMessagePackEventStreamer
        end

        begin
          chunk_write_start = Fluent::Clock.now

          if output.delayed_commit
            log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
            @write_count_metrics.inc
            @dequeued_chunks_mutex.synchronize do
              # delayed_commit_timeout for the secondary is configured in <buffer> of the primary (<secondary> doesn't get <buffer>)
              @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
            end

            output.try_write(chunk)
            check_slow_flush(chunk_write_start)
          else # output plugin without delayed purge
            chunk_id = chunk.unique_id
            dump_chunk_id = dump_unique_id_hex(chunk_id)
            log.trace "adding write count", instance: self.object_id
            @write_count_metrics.inc
            log.trace "executing sync write", chunk: dump_chunk_id

            output.write(chunk)
            check_slow_flush(chunk_write_start)

            log.trace "write operation done, committing", chunk: dump_chunk_id
            commit_write(chunk_id, delayed: false, secondary: using_secondary)
            log.trace "done to commit a chunk", chunk: dump_chunk_id
          end
        rescue *UNRECOVERABLE_ERRORS => e
          if @secondary
            if using_secondary
              log.warn "got unrecoverable error in secondary.", error: e
              log.warn_backtrace
              backup_chunk(chunk, using_secondary, output.delayed_commit)
            else
              if (self.class == @secondary.class)
                log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
                log.warn_backtrace
                backup_chunk(chunk, using_secondary, output.delayed_commit)
              else
                # Call secondary output directly without retry update.
                # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
                if @secondary.delayed_commit
                  log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
                  log.warn_backtrace
                  backup_chunk(chunk, using_secondary, output.delayed_commit)
                else
                  log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
                  log.warn_backtrace
                  begin
                    @secondary.write(chunk)
                    commit_write(chunk.unique_id, delayed: output.delayed_commit, secondary: true)
                  rescue => e
                    log.warn "got an error in secondary for unrecoverable error", error: e
                    log.warn_backtrace
                    backup_chunk(chunk, using_secondary, output.delayed_commit)
                  end
                end
              end
            end
          else
            log.warn "got unrecoverable error in primary and no secondary", error: e
            log.warn_backtrace
            backup_chunk(chunk, using_secondary, output.delayed_commit)
          end
        rescue => e
          log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
          if output.delayed_commit
            @dequeued_chunks_mutex.synchronize do
              @dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
            end
          end

          if @buffer.takeback_chunk(chunk.unique_id)
            @rollback_count_metrics.inc
          end

          update_retry_state(chunk.unique_id, using_secondary, e)

          raise if @under_plugin_development && !@retry_for_error_chunk
        end
      end

      def backup_chunk(chunk, using_secondary, delayed_commit)
        if @buffer.disable_chunk_backup
          log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
        else
          @buffer.backup(chunk.unique_id) { |f|
            chunk.write_to(f)
          }
        end
        commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
      end

      def check_slow_flush(start)
        elapsed_time = Fluent::Clock.now - start
        elapsed_millsec = (elapsed_time * 1000).to_i
        @flush_time_count_metrics.add(elapsed_millsec)
        if elapsed_time > @slow_flush_log_threshold
          @slow_flush_count_metrics.inc
          log.warn "buffer flush took longer time than slow_flush_log_threshold:",
                   elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
        end
      end

      def update_retry_state(chunk_id, using_secondary, error = nil)
        @retry_mutex.synchronize do
          @num_errors_metrics.inc
          chunk_id_hex = dump_unique_id_hex(chunk_id)

          unless @retry
            @retry = retry_state(@buffer_config.retry_randomize)

            if @retry.limit?
              handle_limit_reached(error)
            elsif error
              log_retry_error(error, chunk_id_hex, using_secondary)
            end

            return
          end

          # @retry exists

          # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
          # @retry.step is called almost as many times as the number of flush threads in a short time.
          if Time.now >= @retry.next_time
            @retry.step
          else
            @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
          end

          if @retry.limit?
            handle_limit_reached(error)
          elsif error
            log_retry_error(error, chunk_id_hex, using_secondary)
          end
        end
      end

      def log_retry_error(error, chunk_id_hex, using_secondary)
        return unless error
        if using_secondary
          msg = "failed to flush the buffer with secondary output."
        else
          msg = "failed to flush the buffer."
        end
        log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error)
        log.warn_backtrace(error.backtrace)
      end

      def handle_limit_reached(error)
        if error
          records = @buffer.queued_records
          msg = "Hit limit for retries. dropping all chunks in the buffer queue."
          log.error msg, retry_times: @retry.steps, records: records, error: error
          log.error_backtrace error.backtrace
        end
        @buffer.clear_queue!
        log.debug "buffer queue cleared"
        @retry = nil
      end

      def retry_state(randomize)
        if @secondary
          retry_state_create(
            :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
            forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
            max_interval: @buffer_config.retry_max_interval,
            secondary: true, secondary_threshold: @buffer_config.retry_secondary_threshold,
            randomize: randomize
          )
        else
          retry_state_create(
            :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
            forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
            max_interval: @buffer_config.retry_max_interval,
            randomize: randomize
          )
        end
      end
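
      # Illustrative retry timing with the defaults (retry_wait 1, base 2,
      # randomization ignored here): waits of roughly 1s, 2s, 4s, 8s, ...,
      # capped by retry_max_interval if set, until retry_timeout (72h) or
      # retry_max_times is reached.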

      def submit_flush_once
        # Without locks: rough, but enough for selecting the "next" writer
        @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
        state = @output_flush_threads[@output_flush_thread_current_position]
        state.mutex.synchronize {
          if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
            state.next_clock = 0
            state.cond_var.signal
          else
            log.warn "thread is already dead"
          end
        }
        Thread.pass
      end

      def force_flush
        if @buffering
          @buffer.enqueue_all(true)
          submit_flush_all
        end
      end

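      # Keep kicking flush threads until the queue drains, sleeping
      # flush_thread_burst_interval between kicks; stops early if a retry state
      # appears, to avoid busy-looping against a failing destination.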
      def submit_flush_all
        while !@retry && @buffer.queued?
          submit_flush_once
          sleep @buffer_config.flush_thread_burst_interval
        end
      end

      # only for tests of output plugins
      def interrupt_flushes
        @output_flush_interrupted = true
      end

      # only for tests of output plugins
      def enqueue_thread_wait
        @output_enqueue_thread_mutex.synchronize do
          @output_flush_interrupted = false
          @output_enqueue_thread_waiting = true
        end
        require 'timeout'
        Timeout.timeout(10) do
          Thread.pass while @output_enqueue_thread_waiting
        end
      end

      # only for tests of output plugins
      def flush_thread_wakeup
        @output_flush_threads.each do |state|
          state.mutex.synchronize {
            if state.thread && state.thread.status
              state.next_clock = 0
              state.cond_var.signal
            end
          }
          Thread.pass
        end
      end

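      # Body of the background enqueue thread: periodically move staged chunks
      # into the flush queue, based on flush_interval and/or timekey boundaries,
      # until the plugin stops.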
      def enqueue_thread_run
        value_for_interval = nil
        if @flush_mode == :interval
          value_for_interval = @buffer_config.flush_interval
        end
        if @chunk_key_time
          if !value_for_interval || @buffer_config.timekey < value_for_interval
            value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min
          end
        end
        unless value_for_interval
          raise "BUG: both flush_interval and timekey are disabled"
        end
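        # Wake up several times within the shortest configured window (the divisor
        # is a heuristic), but never more often than flush_thread_interval allows.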
        interval = value_for_interval / 11.0
        if interval < @buffer_config.flush_thread_interval
          interval = @buffer_config.flush_thread_interval
        end

        while !self.after_started? && !self.stopped?
          sleep 0.5
        end
        log.debug "enqueue_thread actually running"

        begin
          while @output_enqueue_thread_running
            now_int = Time.now.to_i
            if @output_flush_interrupted
              sleep interval
              next
            end

            @output_enqueue_thread_mutex.lock
            begin
              if @flush_mode == :interval
                flush_interval = @buffer_config.flush_interval.to_i
                # This comparison must use integer values.
                # If both flush_interval and flush_thread_interval are 1s, comparing float
                # timestamps would flush at ~1.5s on average; comparing integer values
                # keeps the actual flush timing at the expected 1.0s.
                @buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int }
              end

              if @chunk_key_time
                timekey_unit = @buffer_config.timekey
                timekey_wait = @buffer_config.timekey_wait
                current_timekey = now_int - now_int % timekey_unit
                @buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int }
              end
            rescue => e
              raise if @under_plugin_development
              log.error "unexpected error while checking chunks to flush. ignored.", error: e
              log.error_backtrace
            ensure
              @output_enqueue_thread_waiting = false
              @output_enqueue_thread_mutex.unlock
            end
            sleep interval
          end
        rescue => e
          # normal errors are rescued by the inner begin-rescue clause above.
          log.error "error on enqueue thread", error: e
          log.error_backtrace
          raise
        end
      end

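      # Body of each flush thread: sleep until the next scheduled flush (or the
      # next retry time), call #try_flush, and roll back expired delayed-commit
      # chunks. Runs until the plugin shuts down.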
      def flush_thread_run(state)
        flush_thread_interval = @buffer_config.flush_thread_interval

        state.next_clock = Fluent::Clock.now + flush_thread_interval

        while !self.after_started? && !self.stopped?
          sleep 0.5
        end
        log.debug "flush_thread actually running"

        state.mutex.lock
        begin
          # This thread doesn't use `thread_current_running?` because it must keep running through the `before_shutdown` phase
          while @output_flush_threads_running
            current_clock = Fluent::Clock.now
            next_retry_time = nil

            @retry_mutex.synchronize do
              next_retry_time = @retry ? @retry.next_time : nil
            end

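            # Three cases: (1) the next scheduled flush is still in the future:
            # sleep until then; (2) a retry is pending: sleep until its next_time;
            # (3) otherwise: flush now and schedule the next wakeup.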
            if state.next_clock > current_clock
              interval = state.next_clock - current_clock
            elsif next_retry_time && next_retry_time > Time.now
              interval = next_retry_time.to_f - Time.now.to_f
            else
              state.mutex.unlock
              begin
                try_flush
                # next_flush_time is based on flush_thread_interval or flush_thread_burst_interval (or the retry schedule when retrying)
                interval = next_flush_time.to_f - Time.now.to_f
                # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
                #       because @retry still exists (#commit_write is not called yet in #try_flush)
                #       @retry should be cleared if delayed commit is enabled? Or any other solution?
                state.next_clock = Fluent::Clock.now + interval
              ensure
                state.mutex.lock
              end
            end

            if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
              unless @output_flush_interrupted
                state.mutex.unlock
                begin
                  try_rollback_write
                ensure
                  state.mutex.lock
                end
              end
            end

            state.cond_var.wait(state.mutex, interval) if interval > 0
          end
        rescue => e
          # normal errors are rescued by output plugins in #try_flush
          # so this rescue section is for critical & unrecoverable errors
          log.error "error on output thread", error: e
          log.error_backtrace
          raise
        ensure
          state.mutex.unlock
        end
      end

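      # Buffer statistics are exported under keys prefixed with "buffer_"
      # (for example, a buffer key "queue_length" becomes "buffer_queue_length").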
      BUFFER_STATS_KEYS = {}
      Fluent::Plugin::Buffer::STATS_KEYS.each { |key|
        BUFFER_STATS_KEYS[key] = "buffer_#{key}"
      }

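      # Snapshot of this plugin's counters (as consumed by in_monitor_agent),
      # merged with the underlying buffer's statistics. A sketch of the shape,
      # with illustrative values:
      #   { 'output' => { 'emit_records' => 100, ..., 'buffer_queue_length' => 0 } }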
      def statistics
        stats = {
          'emit_records' => @emit_records_metrics.get,
          'emit_size' => @emit_size_metrics.get,
          # Respect original name
          # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
          'retry_count' => @num_errors_metrics.get,
          'emit_count' => @emit_count_metrics.get,
          'write_count' => @write_count_metrics.get,
          'rollback_count' => @rollback_count_metrics.get,
          'slow_flush_count' => @slow_flush_count_metrics.get,
          'flush_time_count' => @flush_time_count_metrics.get,
        }

        if @buffer && @buffer.respond_to?(:statistics)
          (@buffer.statistics['buffer'] || {}).each do |k, v|
            stats[BUFFER_STATS_KEYS[k]] = v
          end
        end

        { 'output' => stats }
      end
    end
  end
end