fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/retry_state.rb

Summary

Maintainability
C
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

module Fluent
  module PluginHelper
    module RetryState
      def retry_state_create(
          title, retry_type, wait, timeout,
          forever: false, max_steps: nil, backoff_base: 2, max_interval: nil, randomize: true, randomize_width: 0.125,
          secondary: false, secondary_threshold: 0.8
      )
        case retry_type
        when :exponential_backoff
          ExponentialBackOffRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, backoff_base, max_interval, secondary, secondary_threshold)
        when :periodic
          PeriodicRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold)
        else
          raise "BUG: unknown retry_type specified: '#{retry_type}'"
        end
      end

      class RetryStateMachine
        attr_reader :title, :start, :steps, :next_time, :timeout_at, :current, :secondary_transition_at, :secondary_transition_steps

        def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold)
          @title = title

          @start = current_time
          @steps = 0
          @next_time = nil # should be initialized for first retry by child class

          @timeout = timeout
          @timeout_at = @start + timeout
          @has_reached_timeout = false
          @has_timed_out = false
          @current = :primary

          if randomize_width < 0 || randomize_width > 0.5
            raise "BUG: randomize_width MUST be between 0 and 0.5"
          end

          @randomize = randomize
          @randomize_width = randomize_width

          @forever = forever
          @max_steps = max_steps

          @secondary = secondary
          @secondary_threshold = secondary_threshold
          if @secondary
            raise "BUG: secondary_transition_threshold MUST be between 0 and 1" if @secondary_threshold <= 0 || @secondary_threshold >= 1
            max_retry_timeout = timeout
            if max_steps
              timeout_by_max_steps = calc_max_retry_timeout(max_steps)
              max_retry_timeout = timeout_by_max_steps if timeout_by_max_steps < max_retry_timeout
            end
            @secondary_transition_at = @start + max_retry_timeout * @secondary_threshold
            @secondary_transition_steps = nil
          end
        end

        def current_time
          Time.now
        end

        def randomize(interval)
          return interval unless @randomize

          interval + (interval * @randomize_width * (2 * rand - 1.0))
        end

        def calc_next_time
          if @forever || !@secondary # primary
            naive = naive_next_time(@steps)
            if @forever
              naive
            elsif naive >= @timeout_at
              @timeout_at
            else
              naive
            end
          elsif @current == :primary && @secondary
            naive = naive_next_time(@steps)
            if naive >= @secondary_transition_at
              @secondary_transition_at
            else
              naive
            end
          elsif @current == :secondary
            naive = naive_next_time(@steps - @secondary_transition_steps)
            if naive >= @timeout_at
              @timeout_at
            else
              naive
            end
          else
            raise "BUG: it's out of design"
          end
        end

        def naive_next_time(retry_times)
          raise NotImplementedError
        end

        def secondary?
          !@forever && @secondary && (@current == :secondary || current_time >= @secondary_transition_at)
        end

        def step
          @steps += 1
          if !@forever && @secondary && @current != :secondary && current_time >= @secondary_transition_at
            @current = :secondary
            @secondary_transition_steps = @steps
          end

          @next_time = calc_next_time

          if @has_reached_timeout
            @has_timed_out = @next_time >= @timeout_at
          else
            @has_reached_timeout = @next_time >= @timeout_at
          end

          nil
        end

        def recalc_next_time
          @next_time = calc_next_time
        end

        def limit?
          if @forever
            false
          else
            @has_timed_out || !!(@max_steps && @steps >= @max_steps)
          end
        end
      end

      class ExponentialBackOffRetry < RetryStateMachine
        def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, backoff_base, max_interval, secondary, secondary_threshold)
          @constant_factor = wait
          @backoff_base = backoff_base
          @max_interval = max_interval

          super(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold)

          @next_time = @start + @constant_factor
        end

        def naive_next_time(retry_next_times)
          intr = calc_interval(retry_next_times)
          current_time + randomize(intr)
        end

        def calc_max_retry_timeout(max_steps)
          result = 0
          max_steps.times { |i|
            result += calc_interval(i)
          }
          result
        end

        def calc_interval(num)
          interval = raw_interval(num)
          if @max_interval && interval > @max_interval
            @max_interval
          else
            if interval.finite?
              interval
            else
              # Calculate previous finite value to avoid inf related errors. If this re-computing is heavy, use cache.
              until interval.finite?
                num -= 1
                interval = raw_interval(num)
              end
              interval
            end
          end
        end

        def raw_interval(num)
          @constant_factor.to_f * (@backoff_base ** (num))
        end
      end

      class PeriodicRetry < RetryStateMachine
        def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold)
          @retry_wait = wait

          super(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold)

          @next_time = @start + @retry_wait
        end

        def naive_next_time(retry_next_times)
          current_time + randomize(@retry_wait)
        end

        def calc_max_retry_timeout(max_steps)
          @retry_wait * max_steps
        end
      end
    end
  end
end