ParentSquare/faulty

View on GitHub
lib/faulty/circuit.rb

Summary

Maintainability
B
6 hrs
Test Coverage
A
100%
# frozen_string_literal: true

class Faulty
  # Runs code protected by a circuit breaker
  #
  # https://www.martinfowler.com/bliki/CircuitBreaker.html
  #
  # A circuit is intended to protect against repeated calls to a failing
  # external dependency. For example, a vendor API may be failing continuously.
  # In that case, we trip the circuit breaker and stop calling that API for
  # a specified cool-down period.
  #
  # Once the cool-down passes, we try the API again, and if it succeeds, we reset
  # the circuit.
  #
  # Why isn't there a timeout option?
  # -----------------
  # Timeout is inherently unsafe, and
  # should not be used blindly.
  # See [Why Ruby's timeout is Dangerous](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying).
  #
  # You should prefer a network timeout like `open_timeout` and `read_timeout`, or
  # write your own code to periodically check how long it has been running.
  # If you're sure you want ruby's generic Timeout, you can apply it yourself
  # inside the circuit run block.
  class Circuit
    CACHE_REFRESH_SUFFIX = '.faulty_refresh'

    attr_reader :name

    # Options for {Circuit}
    #
    # @!attribute [r] cache_expires_in
    #   @return [Integer, nil] The number of seconds to keep
    #     cached results. A value of nil will keep the cache indefinitely.
    #     Default `86400`.
    # @!attribute [r] cache_refreshes_after
    #   @return [Integer, nil] The number of seconds after which we attempt
    #     to refresh the cache even if it's not expired. If the circuit fails,
    #     we continue serving the value from cache until `cache_expires_in`.
    #     A value of `nil` disables cache refreshing.
    #     Default `900`.
    # @!attribute [r] cache_refresh_jitter
    #   @return [Integer] The maximum number of seconds to randomly add or
    #     subtract from `cache_refreshes_after` when determining whether to
    #     refresh the cache.  A non-zero value helps reduce a "thundering herd"
    #     cache refresh in most scenarios. Set to `0` to disable jitter.
    #     Default `0.2 * cache_refreshes_after`.
    # @!attribute [r] cool_down
    #   @return [Integer] The number of seconds the circuit will
    #     stay open after it is tripped. Default 300.
    # @!attribute [r] error_mapper
    #   @return [Module, #call] Used by patches to set the namespace module for
    #     the faulty errors that will be raised. Should be a module or a callable.
    #     If given a module, the circuit assumes the module has error classes
    #     in that module. If given an object that responds to `#call` (a proc
    #     or lambda), the return value of the callable will be used. The callable
    #     is called with (`error_name`, `cause_error`, `circuit`). Default `Faulty`
    # @!attribute [r] evaluation_window
    #   @return [Integer] The number of seconds of history that
    #     will be evaluated to determine the failure rate for a circuit.
    #     Default `60`.
    # @!attribute [r] rate_threshold
    #   @return [Float] The minimum failure rate required to trip
    #     the circuit. For example, `0.5` requires at least a 50% failure rate to
    #     trip. Default `0.5`.
    # @!attribute [r] sample_threshold
    #   @return [Integer] The minimum number of runs required before
    #     a circuit can trip. A value of 1 means that the circuit will trip
    #     immediately when a failure occurs. Default `3`.
    # @!attribute [r] errors
    #   @return [Error, Array<Error>] An array of errors that are considered circuit
    #     failures. Default `[StandardError]`.
    # @!attribute [r] exclude
    #   @return [Error, Array<Error>] An array of errors that will not be
    #     captured by Faulty. These errors will not be considered circuit
    #     failures. Default `[]`.
    # @!attribute [r] cache
    #   @return [Cache::Interface] The cache backend. Default
    #   `Cache::Default.new`. Unlike {Faulty#initialize}, this is not wrapped in
    #   {Cache::AutoWire} by default.
    # @!attribute [r] notifier
    #   @return [Events::Notifier] A Faulty notifier. Default `Events::Notifier.new`
    # @!attribute [r] storage
    #   @return [Storage::Interface] The storage backend. Default
    #   `Storage::Memory.new`. Unlike {Faulty#initialize}, this is not wrapped
    #    in {Storage::AutoWire} by default.
    # @!attribute [r] registry
    #   @return [CircuitRegistry] For use by {Faulty} instances to facilitate
    #   memoization of circuits.
    Options = Struct.new(
      :cache_expires_in,
      :cache_refreshes_after,
      :cache_refresh_jitter,
      :cool_down,
      :evaluation_window,
      :rate_threshold,
      :sample_threshold,
      :errors,
      :error_mapper,
      :exclude,
      :cache,
      :notifier,
      :storage,
      :registry
    ) do
      include ImmutableOptions

      # Build the subset of options that is persisted in the storage backend
      #
      # @return [Hash] A hash of stored options keyed by option name
      def for_storage
        stored_keys = %i[cool_down evaluation_window rate_threshold sample_threshold]
        stored_keys.each_with_object({}) do |option, stored|
          stored[option] = public_send(option)
        end
      end

      # Default option values
      #
      # `cache_refresh_jitter`, `cache`, `notifier` and `storage` have no entry
      # here; `finalize` fills those in.
      #
      # NOTE(review): how these defaults are merged with the given options is
      # defined by ImmutableOptions, which is not visible here - confirm there.
      #
      # @return [Hash] Option names mapped to their default values
      def defaults
        {
          cache_expires_in: 86_400,
          cache_refreshes_after: 900,
          cool_down: 300,
          errors: [StandardError],
          error_mapper: Faulty,
          exclude: [],
          evaluation_window: 60,
          rate_threshold: 0.5,
          sample_threshold: 3
        }
      end

      # Names of the options that are listed as required
      #
      # The cache timing options (`cache_expires_in`, `cache_refreshes_after`,
      # `cache_refresh_jitter`) and `registry` are not listed.
      #
      # NOTE(review): presumably ImmutableOptions rejects a nil value for any of
      # these - confirm against that module.
      #
      # @return [Array<Symbol>] The required option names
      def required
        %i[
          cache
          cool_down
          errors
          error_mapper
          exclude
          evaluation_window
          rate_threshold
          sample_threshold
          notifier
          storage
        ]
      end

      # Fill in derived and fallback option values
      #
      # - `cache`, `notifier` and `storage` fall back to fresh default instances
      # - a single `errors` / `exclude` class is wrapped in an array
      # - `cache_refresh_jitter` defaults to 20% of `cache_refreshes_after`, but
      #   only when the caller did not set it. An explicit value (including `0`,
      #   which disables jitter) is kept as given.
      #
      # @return [void]
      def finalize
        self.cache ||= Cache::Default.new
        self.notifier ||= Events::Notifier.new
        self.storage ||= Storage::Memory.new
        self.errors = [errors] if errors && !errors.is_a?(Array)
        self.exclude = [exclude] if exclude && !exclude.is_a?(Array)

        # When refreshing is disabled there is nothing to derive the jitter from,
        # so it is left as given. `0` is truthy in Ruby, so `||=` preserves it.
        self.cache_refresh_jitter ||= 0.2 * cache_refreshes_after unless cache_refreshes_after.nil?
      end
    end

    # Describe the circuit by name, current state, and its tunable options
    #
    # Backend objects (cache, storage, notifier, ...) are left out of the output.
    #
    # @return [String] Text representation of the circuit
    def inspect
      shown = %i[
        cache_expires_in
        cache_refreshes_after
        cache_refresh_jitter
        cool_down
        evaluation_window
        rate_threshold
        sample_threshold
        errors
        exclude
      ]
      options_text = options.each_pair
        .select { |key, _value| shown.include?(key) }
        .map { |key, value| "#{key}: #{value}" }
        .join(', ')
      %(#<#{self.class.name} name: #{name}, state: #{status.state}, options: { #{options_text} }>)
    end

    # Create a circuit
    #
    # Nothing is pushed to circuit storage here; that happens on the first {#run}.
    #
    # @param name [String] The name of the circuit
    # @param options [Hash] Attributes for {Options}
    # @yield [Options] For setting options in a block
    # @raise [ArgumentError] If name is not a String
    def initialize(name, **options, &block)
      unless name.is_a?(String)
        raise ArgumentError, 'name must be a String'
      end

      @name = name
      @given_options = Options.new(options, &block)
      @options_pushed = false
      @pulled_options = nil
    end

    # Get the options for this circuit
    #
    # If this circuit has been run, these will be the options exactly as given
    # to {.new}. However, if this circuit has not yet been run, these options
    # will be supplemented by the last-known options from the circuit storage.
    #
    # Once a circuit is run, the given options are pushed to circuit storage to
    # be persisted.
    #
    # This is to allow circuit objects to behave as expected in contexts where
    # the exact options for a circuit are not known such as an admin dashboard
    # or in a debug console.
    #
    # Note that this distinction isn't usually important unless using
    # distributed circuit storage like the Redis storage backend.
    #
    # @example
    #   Faulty.circuit('api', cool_down: 5).run { api.users }
    #   # This status will be calculated using the cool_down of 5 because
    #   # the circuit was already run
    #   Faulty.circuit('api').status
    #
    # @example
    #   # This status will be calculated using the cool_down in circuit storage
    #   # if it is available instead of using the default value.
    #   Faulty.circuit('api').status
    #
    # @example
    #   # For typical usage, this behaves as expected, but note that it's
    #   # possible to run into some unexpected behavior when creating circuits
    #   # in unusual ways.
    #
    #   # For example, this status will be calculated using the cool_down in
    #   # circuit storage if it is available despite the given value of 5.
    #   Faulty.circuit('api', cool_down: 5).status
    #   Faulty.circuit('api').run { api.users }
    #   # However now, after the circuit is run, status will be calculated
    #   # using the given cool_down of 5 and the value of 5 will be pushed
    #   # permanently to circuit storage
    #   Faulty.circuit('api').status
    #
    # @return [Options] The resolved options
    def options
      # Once pushed, the given options are authoritative
      return @given_options if @options_pushed

      # Memoized so storage is consulted at most once until the options are
      # pushed or the circuit is reset
      @pulled_options ||= begin
        stored = @given_options.storage.get_options(self)
        stored ? @given_options.dup_with(stored) : @given_options
      end
    end

    # Run the circuit as with {#run}, but return a {Result}
    #
    # This is syntax sugar for running a circuit and rescuing an error
    #
    # @example
    #   result = Faulty.circuit(:api).try_run do
    #     api.get
    #   end
    #
    #   response = if result.ok?
    #     result.get
    #   else
    #     { error: result.error.message }
    #   end
    #
    # @example
    #   # The Result object has a fetch method that can return a default value
    #   # if an error occurs
    #   result = Faulty.circuit(:api).try_run do
    #     api.get
    #   end.fetch({})
    #
    # @param (see #run)
    # @yield (see #run)
    # @raise If the block raises an error not in the error list, or if the error
    #   is excluded.
    # @return [Result<Object, Error>] A result where the ok value is the return
    #   value of the block, or the error value is an error captured by the
    #   circuit.
    def try_run(**options, &block)
      value = run(**options, &block)
      Result.new(ok: value)
    rescue FaultyError => error
      # Only errors captured by the circuit are wrapped; anything else propagates
      Result.new(error: error)
    end

    # Run a block protected by this circuit
    #
    # If the circuit is closed, the block will run. Any exceptions raised inside
    # the block will be checked against the error and exclude options to determine
    # whether that error should be captured. If the error is captured, this
    # run will be recorded as a failure.
    #
    # If the circuit exceeds the failure conditions, this circuit will be tripped
    # and marked as open. Any future calls to run will not execute the block, but
    # instead wait for the cool down period. Once the cool down period passes,
    # the circuit transitions to half-open, and the block will be allowed to run.
    #
    # If the circuit fails again while half-open, the circuit will be reopened for
    # another cool down period. However, if the circuit completes successfully,
    # the circuit will be closed and reset to its initial state.
    #
    # When this is run, the given options are persisted to the storage backend.
    #
    # @param cache [String, nil] A cache key, or nil if caching is not desired
    # @yield The block to protect with this circuit
    # @raise If the block raises an error not in the error list, or if the error
    #   is excluded.
    # @raise {OpenCircuitError} if the circuit is open
    # @raise {CircuitTrippedError} if this run causes the circuit to trip. It's
    #   possible for concurrent runs to simultaneously trip the circuit if the
    #   storage engine is not concurrency-safe.
    # @raise {CircuitFailureError} if this run fails, but doesn't cause the
    #   circuit to trip
    # @return The return value of the block
    def run(cache: nil, &block)
      push_options

      # Serve straight from cache when a value exists and is not due for refresh
      cached_value = cache_read(cache)
      serve_cached = !cached_value.nil? && !cache_should_refresh?(cache)
      return cached_value if serve_cached

      current_status = status
      if current_status.can_run?
        run_exec(current_status, cached_value, cache, &block)
      else
        run_skipped(cached_value)
      end
    end

    # Lock the circuit in the open state via the storage backend
    #
    # The lock stays in place until {#unlock!} is called.
    #
    # @return [self]
    def lock_open!
      tap { storage.lock(self, :open) }
    end

    # Lock the circuit in the closed state via the storage backend
    #
    # The lock stays in place until {#unlock!} is called.
    #
    # @return [self]
    def lock_closed!
      tap { storage.lock(self, :closed) }
    end

    # Clear an open or closed lock through the storage backend
    #
    # @return [self]
    def unlock!
      tap { storage.unlock(self) }
    end

    # Reset this circuit to its initial state
    #
    # Asks the storage backend to reset the circuit (state, history, and locks)
    # and forgets any pushed or pulled options held by this instance, so they
    # are resolved again on next use.
    #
    # @return [self]
    def reset!
      @pulled_options = nil
      @options_pushed = false
      tap { storage.reset(self) }
    end

    # Get the current status of the circuit
    #
    # The status is fetched from the storage backend on every call.
    #
    # This method is not safe for concurrent operations, so it's unsafe
    # to check this method and make runtime decisions based on that. However,
    # it's useful for getting a non-synchronized snapshot of a circuit.
    #
    # @return [Status]
    def status
      storage.status(self)
    end

    # Get the history of runs of this circuit
    #
    # The history is fetched from the storage backend. It is an array of
    # tuples where the first value is the run time, and the second value is
    # a boolean which is true if the run was successful.
    #
    # @return [Array<Array>] An array of tuples of [run_time, is_success]
    def history
      storage.history(self)
    end

    private

    # Make the given options the current options and persist them
    #
    # Runs at most once per circuit instance until {#reset!} is called. If the
    # registry resolves a different circuit instance for this name, that
    # circuit's options are adopted and nothing is written to storage.
    #
    # @return [void]
    def push_options
      return if @options_pushed

      @pulled_options = nil
      @options_pushed = true

      registry = options.registry
      resolved = registry.resolve(self) unless registry.nil?
      if resolved
        # Another instance already owns this circuit: behave as if we had been
        # given its options rather than storing ours
        @given_options = resolved.options
      else
        storage.set_options(self, @given_options.for_storage)
      end
    end

    # Handle a run that was skipped because the circuit cannot run
    #
    # A skip notification is always emitted first.
    #
    # @param cached_value The cached value if one is available
    # @raise The mapped `OpenCircuitError` if no cached value is available
    # @return The result from cache if available
    def run_skipped(cached_value)
      skipped!
      return cached_value unless cached_value.nil?

      raise map_error(:OpenCircuitError)
    end

    # Execute the protected block and record the outcome
    #
    # @param status [Status] The circuit status read before this run
    # @param cached_value The cached value if one is available
    # @param cache_key [String, nil] The cache key if one is given
    # @yield The block protected by the circuit
    # @raise The original error if it matches one of the `exclude` options
    # @raise The mapped `CircuitTrippedError` or `CircuitFailureError` if the
    #   run fails and no cached value is available
    # @return The run result, or the cached value if the run failed
    def run_exec(status, cached_value, cache_key)
      outcome = yield
      success!(status)
      cache_write(cache_key, outcome)
      outcome
    rescue *options.errors => e
      # Excluded errors pass through untouched and are not recorded as failures
      raise if options.exclude.any? { |excluded| e.is_a?(excluded) }

      tripped = failure!(status, e)
      # A cached value masks the failure from the caller
      return cached_value unless cached_value.nil?

      raise map_error(tripped ? :CircuitTrippedError : :CircuitFailureError, e)
    end

    # Record a successful run, closing the circuit if it was half-open
    #
    # @param status [Status] The circuit status read before the run
    # @return [Boolean, nil] True if the circuit transitioned to closed; nil
    #   when the circuit was not half-open
    def success!(status)
      storage.entry(self, Faulty.current_time, true, nil)
      closed = status.half_open? ? close! : nil

      options.notifier.notify(:circuit_success, circuit: self)
      closed
    end

    # Record a failed run and open or reopen the circuit when warranted
    #
    # The decision is based on the status returned by the storage backend after
    # the failure has been recorded, not on the status passed in.
    #
    # @param status [Status] The circuit status read before the run
    # @param error [Exception] The error that caused the failure
    # @return [Boolean] True if the circuit transitioned to open
    def failure!(status, error)
      latest = storage.entry(self, Faulty.current_time, false, status)
      options.notifier.notify(:circuit_failure, circuit: self, status: latest, error: error)

      return reopen!(error, latest.opened_at) if latest.half_open?
      return open!(error) if latest.fails_threshold?

      false
    end

    # Emit a `:circuit_skipped` notification for this circuit
    #
    # @return The return value of the notifier
    def skipped!
      options.notifier.notify(:circuit_skipped, circuit: self)
    end

    # Open the circuit in storage, notifying only if this call opened it
    #
    # @param error [Exception] The error that caused the circuit to open
    # @return [Boolean] True if the circuit transitioned from closed to open
    def open!(error)
      storage.open(self, Faulty.current_time).tap do |opened|
        options.notifier.notify(:circuit_opened, circuit: self, error: error) if opened
      end
    end

    # Reopen the circuit in storage, notifying only if this call reopened it
    #
    # @param error [Exception] The error that caused the circuit to reopen
    # @param previous_opened_at The opened-at value from the status, passed
    #   through to the storage backend
    # @return [Boolean] True if the circuit was reopened
    def reopen!(error, previous_opened_at)
      storage.reopen(self, Faulty.current_time, previous_opened_at).tap do |reopened|
        options.notifier.notify(:circuit_reopened, circuit: self, error: error) if reopened
      end
    end

    # Close the circuit in storage, notifying only if this call closed it
    #
    # @return [Boolean] True if the circuit transitioned from half-open to closed
    def close!
      storage.close(self).tap do |closed|
        options.notifier.notify(:circuit_closed, circuit: self) if closed
      end
    end

    # Look up a key in the cache and report a hit or miss to the notifier
    #
    # Nothing is read and no event is emitted when no key is given.
    #
    # @param key The key to read from the cache
    # @return The cached value, or nil if not present
    def cache_read(key)
      return unless key

      cached = options.cache.read(key.to_s)
      hit = !cached.nil?
      options.notifier.notify(hit ? :circuit_cache_hit : :circuit_cache_miss, circuit: self, key: key)
      cached
    end

    # Store a value in the cache, plus its refresh timestamp when refreshing
    # is enabled
    #
    # Nothing is written and no event is emitted when no key is given. The
    # refresh timestamp is stored under a companion key with the same expiry
    # as the value itself.
    #
    # @param key The key to write to the cache
    # @param value The value to cache
    # @return [void]
    def cache_write(key, value)
      return unless key

      options.notifier.notify(:circuit_cache_write, circuit: self, key: key)

      cache_key = key.to_s
      ttl = options.cache_expires_in
      options.cache.write(cache_key, value, expires_in: ttl)
      return if options.cache_refreshes_after.nil?

      options.cache.write(cache_refresh_key(cache_key), next_refresh_time, expires_in: ttl)
    end

    # Check whether the cache should be refreshed
    #
    # Should be called only if cache is present.
    #
    # When `cache_refreshes_after` is nil, refreshing is disabled and this
    # always returns false, so the cached value is served until it expires.
    # Otherwise the stored refresh timestamp, shifted by a random amount of up
    # to `cache_refresh_jitter` seconds in either direction, is compared with
    # the current time. A missing refresh timestamp counts as due.
    #
    # @param key The cache key whose refresh timestamp is checked
    # @return [Boolean] true if the cache should be refreshed
    def cache_should_refresh?(key)
      # With refreshing disabled no refresh timestamp is written and the jitter
      # may be unset; multiplying by a nil jitter would raise a TypeError
      return false if options.cache_refreshes_after.nil?

      refresh_at = options.cache.read(cache_refresh_key(key.to_s)).to_i
      jitter = options.cache_refresh_jitter || 0
      refresh_at + (((rand * 2) - 1) * jitter) < Faulty.current_time
    end

    # Compute when a cache entry written now becomes due for refresh
    #
    # This is the current time plus `cache_refreshes_after`, rounded down.
    #
    # @return [Integer] The timestamp to refresh at
    def next_refresh_time
      refresh_at = Faulty.current_time + options.cache_refreshes_after
      refresh_at.floor
    end

    # Derive the companion cache key that holds the refresh timestamp
    #
    # The companion entry lets a cached value be refreshed before it expires.
    #
    # @param key The cache key of the value
    # @return [String] The cache refresh key
    def cache_refresh_key(key)
      key.to_s + CACHE_REFRESH_SUFFIX
    end

    # Get a random float for use with cache jitter
    #
    # Shadows `Kernel#rand` inside this class so that the jitter is drawn
    # from `SecureRandom`.
    #
    # @return [Float] A random number `x` where `0.0 <= x < 1.0`
    def rand
      SecureRandom.random_number
    end

    # Resolve the storage engine to use for this circuit
    #
    # While Faulty is disabled, a fresh null storage is returned on every call.
    # Otherwise this is always the storage from the given options, never from
    # options pulled out of storage.
    #
    # @return [Storage::Interface]
    def storage
      if Faulty.disabled?
        Faulty::Storage::Null.new
      else
        @given_options.storage
      end
    end

    # Build the error object to raise for a circuit failure
    #
    # A callable `error_mapper` is invoked with the error name, the cause, and
    # this circuit. Otherwise the mapper is treated as a namespace: the error
    # class is looked up by name and built from the cause's message and this
    # circuit.
    #
    # @param error_name [Symbol] The error class name, e.g. `:OpenCircuitError`
    # @param cause [Exception, nil] The underlying error, if any
    # @return The error produced by the mapper
    def map_error(error_name, cause = nil)
      mapper = options.error_mapper
      return mapper.call(error_name, cause, self) if mapper.respond_to?(:call)

      error_class = mapper.const_get(error_name)
      error_class.new(cause&.message, self)
    end
  end
end