actioncable/lib/action_cable/subscription_adapter/redis.rb
# frozen_string_literal: true
# :markup: markdown
gem "redis", ">= 4", "< 6"
require "redis"
require "active_support/core_ext/hash/except"
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
prepend ChannelPrefix
# Overwrite this factory method for Redis connections if you want to use a
# different Redis library than the redis gem. This is needed, for example, when
# using Makara proxies for distributed Redis.
cattr_accessor :redis_connector, default: ->(config) do
::Redis.new(config.except(:adapter, :channel_prefix))
end
def initialize(*)
super
@listener = nil
@redis_connection_for_broadcasts = nil
end
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
def subscribe(channel, callback, success_callback = nil)
listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
listener.remove_subscriber(channel, callback)
end
def shutdown
@listener.shutdown if @listener
end
def redis_connection_for_subscriptions
redis_connection
end
private
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, config_options, @server.event_loop) }
end
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
@redis_connection_for_broadcasts ||= redis_connection
end
end
def redis_connection
self.class.redis_connector.call(config_options)
end
def config_options
@config_options ||= @server.config.cable.deep_symbolize_keys.merge(id: identifier)
end
class Listener < SubscriberMap
def initialize(adapter, config_options, event_loop)
super()
@adapter = adapter
@event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@reconnect_attempt = 0
# Use the same config as used by Redis conn
@reconnect_attempts = config_options.fetch(:reconnect_attempts, 1)
@reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer)
@subscribed_client = nil
@when_connected = []
@thread = nil
end
def listen(conn)
conn.without_reconnect do
original_client = extract_subscribed_client(conn)
conn.subscribe("_action_cable_internal") do |on|
on.subscribe do |chan, count|
@subscription_lock.synchronize do
if count == 1
@reconnect_attempt = 0
@subscribed_client = original_client
until @when_connected.empty?
@when_connected.shift.call
end
end
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
@event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
end
on.message do |chan, message|
broadcast(chan, message)
end
on.unsubscribe do |chan, count|
if count == 0
@subscription_lock.synchronize do
@subscribed_client = nil
end
end
end
end
end
end
def shutdown
@subscription_lock.synchronize do
return if @thread.nil?
when_connected do
@subscribed_client.unsubscribe
@subscribed_client = nil
end
end
Thread.pass while @thread.alive?
end
def add_channel(channel, on_success)
@subscription_lock.synchronize do
ensure_listener_running
@subscribe_callbacks[channel] << on_success
when_connected { @subscribed_client.subscribe(channel) }
end
end
def remove_channel(channel)
@subscription_lock.synchronize do
when_connected { @subscribed_client.unsubscribe(channel) }
end
end
def invoke_callback(*)
@event_loop.post { super }
end
private
def ensure_listener_running
@thread ||= Thread.new do
Thread.current.abort_on_exception = true
begin
conn = @adapter.redis_connection_for_subscriptions
listen conn
rescue ConnectionError
reset
if retry_connecting?
when_connected { resubscribe }
retry
end
end
end
end
def when_connected(&block)
if @subscribed_client
block.call
else
@when_connected << block
end
end
def retry_connecting?
@reconnect_attempt += 1
return false if @reconnect_attempt > @reconnect_attempts.size
sleep_t = @reconnect_attempts[@reconnect_attempt - 1]
sleep(sleep_t) if sleep_t > 0
true
end
def resubscribe
channels = @sync.synchronize do
@subscribers.keys
end
@subscribed_client.subscribe(*channels) unless channels.empty?
end
def reset
@subscription_lock.synchronize do
@subscribed_client = nil
@subscribe_callbacks.clear
@when_connected.clear
end
end
if ::Redis::VERSION < "5"
ConnectionError = ::Redis::BaseConnectionError
class SubscribedClient
def initialize(raw_client)
@raw_client = raw_client
end
def subscribe(*channel)
send_command("subscribe", *channel)
end
def unsubscribe(*channel)
send_command("unsubscribe", *channel)
end
private
def send_command(*command)
@raw_client.write(command)
very_raw_connection =
@raw_client.connection.instance_variable_defined?(:@connection) &&
@raw_client.connection.instance_variable_get(:@connection)
if very_raw_connection && very_raw_connection.respond_to?(:flush)
very_raw_connection.flush
end
nil
end
end
def extract_subscribed_client(conn)
SubscribedClient.new(conn._client)
end
else
ConnectionError = RedisClient::ConnectionError
def extract_subscribed_client(conn)
conn
end
end
end
end
end
end