ManageIQ/manageiq-gems-pending

View on GitHub
lib/gems/pending/util/duplicate_blocker/dedup_handler.rb

Summary

Maintainability
A
35 mins
Test Coverage
A
100%
module DuplicateBlocker
  class DedupHandler
    #
    # The number of duplicates in a preset time window needed to trip the breaker.
    #
    attr_accessor :duplicate_threshold

    #
    # The time window in seconds
    #
    attr_accessor :duplicate_window

    #
    # The width of a slot in seconds. The time window is divided into many slots.
    #
    attr_accessor :window_slot_width

    #
    # Report the total number of blocked calls for every this many duplicates
    #
    attr_accessor :progress_threshold

    #
    # Flag whether throw an exception when a call is blocked due to too duplications
    #
    attr_accessor :throw_exception_when_blocked

    #
    # A hash history of duplicates.
    # Key is the hashcode of the key object generated by the key generator
    # Value is a History instance
    #
    attr_reader :histories

    #
    # Every this many seconds should a cleaning of history hash occurs
    #
    attr_accessor :purging_period

    #
    # Optional logger.
    #
    attr_accessor :logger

    DEFAULT_DUPLICATE_THRESHOLD = 120
    DEFAULT_DUPLICATE_WINDOW    = 60
    DEFAULT_SLOT_WIDTH          = 0.1
    DEFAULT_PROGRESS_THRESHOLD  = 500
    DEFAULT_THROW_EXECEPTION    = true
    DEFAULT_PURGING_PERIOD      = 300

    def initialize(logger = nil)
      @logger = logger
      @duplicate_threshold          = DEFAULT_DUPLICATE_THRESHOLD
      @duplicate_window             = DEFAULT_DUPLICATE_WINDOW
      @window_slot_width            = DEFAULT_SLOT_WIDTH
      @progress_threshold           = DEFAULT_PROGRESS_THRESHOLD
      @throw_exception_when_blocked = DEFAULT_THROW_EXECEPTION
      @purging_period               = DEFAULT_PURGING_PERIOD
      @histories = {}
      @last_purged = Time.new.utc
    end

    #
    # Handles the method covered by the duplicate blocker.
    #
    def handle(method, *args)
      key = key_generator.call(method, *args)
      entry = update_history(key.hash)
      entry.description = descriptor.call(method, *args)

      begin
        tripped?(entry) ? on_blocking_call(entry) : method[*args]
      ensure
        # dereferencing the description can help saving memory used by the hash
        entry.description = nil
      end
    end

    def default_key_generator(meth, *args)
      [meth, *args]
    end

    def key_generator
      @key_generator ||= method(:default_key_generator)
    end

    #
    # Allow the user to provide a proc to generate a key based on the method and parameters
    # The generator proc will receive argument (meth, *args)
    #
    attr_writer :key_generator

    def default_descriptor(meth, *args)
      "#{meth.owner.name}.#{meth.name} with arguments #{args}"
    end

    def descriptor
      @descriptor ||= method(:default_descriptor)
    end

    #
    # Allow the user to provide a proc to generate a description based on the method and parameters
    # The descriptor proc will receive argument (meth, *args)
    #
    attr_writer :descriptor

    #
    # Remove outdated histories from the hash.
    #
    def purge_histories(now)
      histories.delete_if { |_key, value| now - value.last_updated >= duplicate_window }
      @last_purged = now
    end

    TimeSlot = Struct.new(:timestamp, :count)

    class History
      attr_accessor :blocked_count
      attr_reader   :dup_count
      attr_reader   :last_updated
      attr_accessor :description

      def initialize(init_time)
        @slots = [TimeSlot.new(init_time, 1)]
        @last_updated = init_time
        @dup_count = 0
        @blocked_count = 0
      end

      # return total count change in slots (not including the current slot)
      def update_slots(handler, now)
        @last_updated = now

        slot_width = handler.window_slot_width
        last_timestamp = @slots[-1].timestamp
        if now - last_timestamp < slot_width
          @slots[-1].count += 1
          return 0
        end

        new_timestamp = last_timestamp + ((now - last_timestamp) / slot_width).to_int * slot_width

        # seal the tail slot
        delta = @slots[-1].count

        # drop slots (from head) that are outdated
        @slots = @slots.drop_while { |s| delta -= s.count if new_timestamp - s.timestamp > handler.duplicate_window }

        @dup_count += delta

        # append a new slot
        @slots << TimeSlot.new(new_timestamp, 1)

        delta
      end
    end

    private

    def tripped?(entry)
      if entry.dup_count < duplicate_threshold
        reset(entry) if entry.blocked_count > 0
        return false
      end

      if entry.blocked_count == 0
        trip(entry)
      else
        entry.blocked_count += 1
        report_tripping_still_on(entry) if entry.blocked_count % progress_threshold == 0
      end
      true
    end

    # state from normal to tripped
    def trip(entry)
      @logger.warn("Breaker for #{entry.description} is tripped. Further calls are blocked.") if @logger
      # other notification can be added here

      entry.blocked_count = 1
    end

    # state from tripped to normal
    def reset(entry)
      @logger.info("Tripped condition for #{entry.description} is now reset. " \
        "Total #{entry.blocked_count} calls were blocked.") if @logger
      # other notification can be added here

      entry.blocked_count = 0
    end

    def report_tripping_still_on(entry)
      @logger.warn("Breaker for #{entry.description} is still tripped. " \
        "Total #{entry.blocked_count} calls have been blocked.") if @logger
      # other notification can be added here
    end

    def update_history(key)
      now = Time.new.utc
      purge_histories(now) if now - @last_purged > purging_period

      history = histories[key]
      if history.nil?
        history = History.new(now)
        histories[key] = history
      else
        history.update_slots(self, now)
      end
      history
    end

    #
    # Called when a call is blocked. Raises a DuplicateFoundException exception if necessary.
    #
    def on_blocking_call(entry)
      @logger.debug("#{entry.description} is blocked because it is duplicated.") if @logger

      raise(DuplicateFoundException, entry.description, caller) if throw_exception_when_blocked
    end
  end
end