lib/faulty/storage/redis.rb
# frozen_string_literal: true
class Faulty
module Storage
# A storage backend for storing circuit state in Redis.
#
# When using this or any networked backend, be sure to evaluate the risk,
# and set conservative timeouts so that the circuit storage does not cause
# cascading failures in your application when evaluating circuits. Always
# wrap this backend with a {FaultTolerantProxy} to limit the effect of
# these types of events.
class Redis
# Separates the time/status for history entry strings
ENTRY_SEPARATOR = ':'
attr_reader :options
# Options for {Redis}
#
# @!attribute [r] client
# @return [Redis,ConnectionPool] The Redis instance or a ConnectionPool
# used to connect to Redis. Default `::Redis.new`
# @!attribute [r] key_prefix
# @return [String] A string prepended to all Redis keys used to store
# circuit state. Default `faulty`.
# @!attribute [r] key_separator
# @return [String] A string used to separate the parts of the Redis keys
# used to store circuit state. Default `:`.
# @!attribute [r] max_sample_size
# @return [Integer] The number of cache run entries to keep in memory
# for each circuit. Default `100`.
# @!attribute [r] sample_ttl
# @return [Integer] The maximum number of seconds to store a
# circuit run history entry. Default `100`.
# @!attribute [r] circuit_ttl
# @return [Integer] The maximum number of seconds to keep a circuit.
# A value of `nil` disables circuit expiration. This does not apply to
# locks, which have an indefinite storage time.
# Default `604_800` (1 week).
# @!attribute [r] list_granularity
# @return [Integer] The number of seconds after which a new set is
# created to store circuit names. The old set is kept until
# circuit_ttl expires. Default `3600` (1 hour).
# @!attribute [r] disable_warnings
# @return [Boolean] By default, this class warns if the client options
# are outside the recommended values. Set to true to disable these
# warnings.
Options = Struct.new(
:client,
:key_prefix,
:key_separator,
:max_sample_size,
:sample_ttl,
:circuit_ttl,
:list_granularity,
:disable_warnings
) do
include ImmutableOptions
def defaults
{
key_prefix: 'faulty',
key_separator: ':',
max_sample_size: 100,
sample_ttl: 1800,
circuit_ttl: 604_800,
list_granularity: 3600,
disable_warnings: false
}
end
def required
%i[list_granularity]
end
def finalize
self.client = ::Redis.new(timeout: 1) unless client
end
end
# @param options [Hash] Attributes for {Options}
# @yield [Options] For setting options in a block
def initialize(**options, &block)
@options = Options.new(options, &block)
# Ensure JSON is available since we don't explicitly require it
JSON # rubocop:disable Lint/Void
check_client_options!
end
# Get the options stored for circuit
#
# @see Interface#get_options
# @param (see Interface#get_options)
# @return (see Interface#get_options)
def get_options(circuit)
json = redis { |r| r.get(options_key(circuit.name)) }
return if json.nil?
JSON.parse(json, symbolize_names: true)
end
# Store the options for a circuit
#
# These will be serialized as JSON
#
# @see Interface#set_options
# @param (see Interface#set_options)
# @return (see Interface#set_options)
def set_options(circuit, stored_options)
redis do |r|
r.set(options_key(circuit.name), JSON.dump(stored_options), ex: options.circuit_ttl)
end
end
# Add an entry to storage
#
# @see Interface#entry
# @param (see Interface#entry)
# @return (see Interface#entry)
def entry(circuit, time, success, status)
key = entries_key(circuit.name)
result = pipe do |r|
r.call([:sadd, list_key, circuit.name])
r.expire(list_key, options.circuit_ttl + options.list_granularity) if options.circuit_ttl
r.lpush(key, "#{time}#{ENTRY_SEPARATOR}#{success ? 1 : 0}")
r.ltrim(key, 0, options.max_sample_size - 1)
r.expire(key, options.sample_ttl) if options.sample_ttl
r.lrange(key, 0, -1) if status
end
Status.from_entries(map_entries(result.last), **status.to_h) if status
end
# Mark a circuit as open
#
# @see Interface#open
# @param (see Interface#open)
# @return (see Interface#open)
def open(circuit, opened_at)
key = state_key(circuit.name)
ex = options.circuit_ttl
result = watch_exec(key, ['closed', nil]) do |m|
m.set(key, 'open', ex: ex)
m.set(opened_at_key(circuit.name), opened_at, ex: ex)
end
result && result[0] == 'OK'
end
# Mark a circuit as reopened
#
# @see Interface#reopen
# @param (see Interface#reopen)
# @return (see Interface#reopen)
def reopen(circuit, opened_at, previous_opened_at)
key = opened_at_key(circuit.name)
result = watch_exec(key, [previous_opened_at.to_s]) do |m|
m.set(key, opened_at, ex: options.circuit_ttl)
end
result && result[0] == 'OK'
end
# Mark a circuit as closed
#
# @see Interface#close
# @param (see Interface#close)
# @return (see Interface#close)
def close(circuit)
key = state_key(circuit.name)
ex = options.circuit_ttl
result = watch_exec(key, ['open']) do |m|
m.set(key, 'closed', ex: ex)
m.del(entries_key(circuit.name))
end
result && result[0] == 'OK'
end
# Lock a circuit open or closed
#
# The circuit_ttl does not apply to locks
#
# @see Interface#lock
# @param (see Interface#lock)
# @return (see Interface#lock)
def lock(circuit, state)
redis { |r| r.set(lock_key(circuit.name), state) }
end
# Unlock a circuit
#
# @see Interface#unlock
# @param (see Interface#unlock)
# @return (see Interface#unlock)
def unlock(circuit)
redis { |r| r.del(lock_key(circuit.name)) }
end
# Reset a circuit
#
# @see Interface#reset
# @param (see Interface#reset)
# @return (see Interface#reset)
def reset(circuit)
name = circuit.is_a?(Circuit) ? circuit.name : circuit
pipe do |r|
r.del(
entries_key(name),
opened_at_key(name),
lock_key(name),
options_key(name)
)
r.set(state_key(name), 'closed', ex: options.circuit_ttl)
end
end
# Get the status of a circuit
#
# @see Interface#status
# @param (see Interface#status)
# @return (see Interface#status)
def status(circuit)
futures = {}
pipe do |r|
futures[:state] = r.get(state_key(circuit.name))
futures[:lock] = r.get(lock_key(circuit.name))
futures[:opened_at] = r.get(opened_at_key(circuit.name))
futures[:entries] = r.lrange(entries_key(circuit.name), 0, -1)
end
state = futures[:state].value&.to_sym || :closed
opened_at = futures[:opened_at].value ? Float(futures[:opened_at].value) : nil
opened_at = Faulty.current_time - options.circuit_ttl if state == :open && opened_at.nil?
Faulty::Status.from_entries(
map_entries(futures[:entries].value),
state: state,
lock: futures[:lock].value&.to_sym,
opened_at: opened_at,
options: circuit.options
)
end
# Get the circuit history up to `max_sample_size`
#
# @see Interface#history
# @param (see Interface#history)
# @return (see Interface#history)
def history(circuit)
entries = redis { |r| r.lrange(entries_key(circuit.name), 0, -1) }
map_entries(entries).reverse
end
# List all unexpired circuits
#
# @return (see Interface#list)
def list
redis { |r| r.sunion(*all_list_keys) }
end
# Reset all circuits
#
# This does not empty the list of circuits as returned by {#list}. This is
# because that would be a thread-usafe operation that could result in
# circuits not being in the list.
#
# This implmenentation resets circuits individually, and will be very
# slow for large numbers of circuits. It should not be used in production
# code.
#
# @return [void]
def clear
list.each { |c| reset(c) }
end
# Redis storage is not fault-tolerant
#
# @return [true]
def fault_tolerant?
false
end
private
# Generate a key from its parts
#
# @return [String] The key
def key(*parts)
[options.key_prefix, *parts].join(options.key_separator)
end
def ckey(circuit_name, *parts)
key('circuit', circuit_name, *parts)
end
# @return [String] The key for circuit options
def options_key(circuit_name)
ckey(circuit_name, 'options')
end
# @return [String] The key for circuit state
def state_key(circuit_name)
ckey(circuit_name, 'state')
end
# @return [String] The key for circuit run history entries
def entries_key(circuit_name)
ckey(circuit_name, 'entries')
end
# @return [String] The key for circuit locks
def lock_key(circuit_name)
ckey(circuit_name, 'lock')
end
# @return [String] The key for circuit opened_at
def opened_at_key(circuit_name)
ckey(circuit_name, 'opened_at')
end
# Get the current key to add circuit names to
def list_key
key('list', current_list_block)
end
# Get all active circuit list keys
#
# We use a rolling list of redis sets to store circuit names. This way we
# can maintain this index, while still using Redis to expire old circuits.
# Whenever we add a circuit to the list, we add it to the current set. A
# new set is created every `options.list_granularity` seconds.
#
# When reading the list, we union all sets together, which gets us the
# full list.
#
# Each set has its own expiration, so that the oldest sets will
# automatically be deleted from Redis after `options.circuit_ttl`.
#
# It is possible for a single circuit name to be a part of many of these
# sets. This is the space trade-off we make in exchange for automatic
# expiration.
#
# @return [Array<String>] An array of redis keys for circuit name sets
def all_list_keys
num_blocks = (options.circuit_ttl.to_f / options.list_granularity).floor + 1
start_block = current_list_block - num_blocks + 1
num_blocks.times.map do |i|
key('list', start_block + i)
end
end
# Get the block number for the current list set
#
# @return [Integer] The current block number
def current_list_block
(Faulty.current_time / options.list_granularity).floor
end
# Watch a Redis key and exec commands only if the key matches the expected
# value. Internally this uses Redis transactions with WATCH/MULTI/EXEC.
#
# @param key [String] The redis key to watch
# @param old [Array<String>] A list of previous values. The block will be
# run only if key is one of these values.
# @yield [Redis] A redis client. Commands executed using this client
# will be executed inside the MULTI context and will only be run if
# the watch succeeds and the comparison passes
# @return [Array] An array of Redis results from the commands executed
# inside the block
def watch_exec(key, old, &block)
redis do |r|
r.watch(key) do
if old.include?(r.get(key))
r.multi(&block)
else
r.unwatch
nil
end
end
end
end
# Yield a Redis connection
#
# @yield [Redis] Yields the connection to the block
# @return The value returned from the block
def redis(&block)
if options.client.respond_to?(:with)
options.client.with(&block)
else
yield options.client
end
end
# Yield a pipelined Redis connection
#
# @yield [Redis::Pipeline] Yields the connection to the block
# @return [void]
def pipe(&block)
redis { |r| r.pipelined(&block) }
end
# Map raw Redis history entries to Faulty format
#
# @see Storage::Interface
# @param raw_entries [Array<String>] The raw Redis entries
# @return [Array<Array>] The Faulty-formatted entries
def map_entries(raw_entries)
raw_entries.map do |e|
time, state = e.split(ENTRY_SEPARATOR)
[Float(time), state == '1']
end
end
def check_client_options!
return if options.disable_warnings
check_redis_options!
check_pool_options!
rescue StandardError => e
warn "Faulty error while checking client options: #{e.message}"
end
def check_redis_options!
gte5 = ::Redis::VERSION.to_f >= 5
method = gte5 ? :config : :options
ropts = redis do |r|
r.instance_variable_get(:@client).public_send(method)
end
bad_timeouts = {}
%i[connect_timeout read_timeout write_timeout].each do |time_opt|
value = gte5 ? ropts.public_send(time_opt) : ropts[time_opt]
bad_timeouts[time_opt] = value if value > 2
end
unless bad_timeouts.empty?
warn <<~MSG
Faulty recommends setting Redis timeouts <= 2 to prevent cascading
failures when evaluating circuits. Your options are:
#{bad_timeouts}
MSG
end
gt1_retry = gte5 ? ropts.retry_connecting?(1, nil) : ropts[:reconnect_attempts] > 1
if gt1_retry
warn <<~MSG
Faulty recommends setting Redis reconnect_attempts to <= 1 to
prevent cascading failures. Your setting is larger.
MSG
end
end
def check_pool_options!
if options.client.class.name == 'ConnectionPool'
timeout = options.client.instance_variable_get(:@timeout)
warn(<<~MSG) if timeout > 2
Faulty recommends setting ConnectionPool timeouts <= 2 to prevent
cascading failures when evaluating circuits. Your setting is #{timeout}
MSG
end
end
end
end
end