actioncable/lib/action_cable/channel/base.rb
# frozen_string_literal: true
# :markup: markdown
require "set"
require "active_support/rescuable"
require "active_support/parameter_filter"
module ActionCable
module Channel
# # Action Cable Channel Base
#
# The channel provides the basic structure of grouping behavior into logical
# units when communicating over the WebSocket connection. You can think of a
# channel like a form of controller, but one that's capable of pushing content
# to the subscriber in addition to simply responding to the subscriber's direct
# requests.
#
# Channel instances are long-lived. A channel object will be instantiated when
# the cable consumer becomes a subscriber, and then lives until the consumer
# disconnects. This may be seconds, minutes, hours, or even days. That means you
# have to take special care not to do anything silly in a channel that would
# balloon its memory footprint or whatever. The references are forever, so they
# won't be released as is normally the case with a controller instance that gets
# thrown away after every request.
#
# Long-lived channels (and connections) also mean you're responsible for
# ensuring that the data is fresh. If you hold a reference to a user record, but
# the name is changed while that reference is held, you may be sending stale
# data if you don't take precautions to avoid it.
#
# The upside of long-lived channel instances is that you can use instance
# variables to keep reference to objects that future subscriber requests can
# interact with. Here's a quick example:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# end
#
# def speak(data)
# @room.speak data, user: current_user
# end
# end
#
# The #speak action simply uses the Chat::Room object that was created when the
# channel was first subscribed to by the consumer when that subscriber wants to
# say something in the room.
#
# ## Action processing
#
# Unlike subclasses of ActionController::Base, channels do not follow a RESTful
# constraint form for their actions. Instead, Action Cable operates through a
# remote-procedure call model. You can declare any public method on the channel
# (optionally taking a `data` argument), and this method is automatically
# exposed as callable to the client.
#
# Example:
#
# class AppearanceChannel < ApplicationCable::Channel
# def subscribed
# @connection_token = generate_connection_token
# end
#
# def unsubscribed
# current_user.disappear @connection_token
# end
#
# def appear(data)
# current_user.appear @connection_token, on: data['appearing_on']
# end
#
# def away
# current_user.away @connection_token
# end
#
# private
# def generate_connection_token
# SecureRandom.hex(36)
# end
# end
#
# In this example, the subscribed and unsubscribed methods are not callable
# methods, as they were already declared in ActionCable::Channel::Base, but
# `#appear` and `#away` are. `#generate_connection_token` is also not callable,
# since it's a private method. You'll see that appear accepts a data parameter,
# which it then uses as part of its model call. `#away` does not, since it's
# simply a trigger action.
#
# Also note that in this example, `current_user` is available because it was
# marked as an identifying attribute on the connection. All such identifiers
# will automatically create a delegation method of the same name on the channel
# instance.
#
# ## Rejecting subscription requests
#
# A channel can reject a subscription request in the #subscribed callback by
# invoking the #reject method:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# reject unless current_user.can_access?(@room)
# end
# end
#
# In this example, the subscription will be rejected if the `current_user` does
# not have access to the chat room. On the client-side, the `Channel#rejected`
# callback will get invoked when the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
include Streams
include Naming
include Broadcasting
include ActiveSupport::Rescuable
attr_reader :params, :connection, :identifier
delegate :logger, to: :connection
class << self
# A list of method names that should be considered actions. This includes all
# public instance methods on a channel, less any internal methods (defined on
# Base), adding back in any methods that are internal, but still exist on the
# class itself.
#
# #### Returns
# * `Set` - A set of all methods that should be considered actions.
def action_methods
@action_methods ||= begin
# All public instance methods of this class, including ancestors
methods = (public_instance_methods(true) -
# Except for public instance methods of Base and its ancestors
ActionCable::Channel::Base.public_instance_methods(true) +
# Be sure to include shadowed public instance methods of this class
public_instance_methods(false)).uniq.map(&:to_s)
methods.to_set
end
end
private
# action_methods are cached and there is sometimes need to refresh them.
# ::clear_action_methods! allows you to do that, so next time you run
# action_methods, they will be recalculated.
def clear_action_methods! # :doc:
@action_methods = nil
end
# Refresh the cached action_methods when a new action_method is added.
def method_added(name) # :doc:
super
clear_action_methods!
end
end
def initialize(connection, identifier, params = {})
@connection = connection
@identifier = identifier
@params = params
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
#
# The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
end
# Extract the action name from the passed data and process it via the channel.
# The process will ensure that the action requested is a public method on the
# channel declared by the user (so not one of the callbacks like #subscribed).
def perform_action(data)
action = extract_action(data)
if processable_action?(action)
payload = { channel_class: self.class.name, action: action, data: data }
ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
dispatch_action(action, data)
end
else
logger.error "Unable to process #{action_signature(action, data)}"
end
end
# This method is called after subscription has been added to the connection and
# confirms or rejects the subscription.
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
end
reject_subscription if subscription_rejected?
ensure_confirmation_sent
end
# Called by the cable connection when it's cut, so the channel has a chance to
# cleanup with callbacks. This method is not intended to be called directly by
# the user. Instead, override the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc:
run_callbacks :unsubscribe do
unsubscribed
end
end
private
# Called once a consumer has become a subscriber of the channel. Usually the
# place to set up any streams you want this channel to be sending to the
# subscriber.
def subscribed # :doc:
# Override in subclasses
end
# Called once a consumer has cut its cable connection. Can be used for cleaning
# up connections or marking users as offline or the like.
def unsubscribed # :doc:
# Override in subclasses
end
# Transmit a hash of data to the subscriber. The hash will automatically be
# wrapped in a JSON envelope with the proper channel identifier marked as the
# recipient.
def transmit(data, via: nil) # :doc:
logger.debug do
status = "#{self.class.name} transmitting #{data.inspect.truncate(300)}"
status += " (via #{via})" if via
status
end
payload = { channel_class: self.class.name, data: data, via: via }
ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
connection.transmit identifier: @identifier, message: data
end
end
def ensure_confirmation_sent # :doc:
return if subscription_rejected?
@defer_subscription_confirmation_counter.decrement
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
def defer_subscription_confirmation! # :doc:
@defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation? # :doc:
@defer_subscription_confirmation_counter.value > 0
end
def subscription_confirmation_sent? # :doc:
@subscription_confirmation_sent
end
def reject # :doc:
@reject_subscription = true
end
def subscription_rejected? # :doc:
@reject_subscription
end
def delegate_connection_identifiers
connection.identifiers.each do |identifier|
define_singleton_method(identifier) do
connection.send(identifier)
end
end
end
def extract_action(data)
(data["action"].presence || :receive).to_sym
end
def processable_action?(action)
self.class.action_methods.include?(action.to_s) unless subscription_rejected?
end
def dispatch_action(action, data)
logger.debug action_signature(action, data)
if method(action).arity == 1
public_send action, data
else
public_send action
end
rescue Exception => exception
rescue_with_handler(exception) || raise
end
def action_signature(action, data)
(+"#{self.class.name}##{action}").tap do |signature|
arguments = data.except("action")
if arguments.any?
arguments = parameter_filter.filter(arguments)
signature << "(#{arguments.inspect})"
end
end
end
def parameter_filter
@parameter_filter ||= ActiveSupport::ParameterFilter.new(connection.config.filter_parameters)
end
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.debug "#{self.class.name} is transmitting the subscription confirmation"
ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
@subscription_confirmation_sent = true
end
end
end
def reject_subscription
connection.subscriptions.remove_subscription self
transmit_subscription_rejection
end
def transmit_subscription_rejection
logger.debug "#{self.class.name} is transmitting the subscription rejection"
ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
end
end
end
end
end
ActiveSupport.run_load_hooks(:action_cable_channel, ActionCable::Channel::Base)