lib/celluloid_pubsub/reactor.rb
# encoding: utf-8
# frozen_string_literal: true
require_relative './registry'
require_relative './helper'
module CelluloidPubsub
# The reactor handles new connections. Based on what the client sends it either subscribes to a channel
# or will publish to a channel or just dispatch to the server if command is neither subscribe, publish or unsubscribe
#
# @!attribute websocket
# @return [Reel::WebSocket] websocket connection
#
# @!attribute server
# @return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to
#
# @!attribute channels
# @return [Array] array of channels to which the current reactor has subscribed to
class Reactor
include CelluloidPubsub::BaseActor
# available actions that can be delegated
AVAILABLE_ACTIONS = %w(unsubscribe_clients unsubscribe subscribe publish unsubscribe_all).freeze
# The websocket connection received from the server
# @return [Reel::WebSocket] websocket connection
attr_accessor :websocket
# The server instance to which this reactor is linked to
# @return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to
attr_accessor :server
# The channels to which this reactor has subscribed to
# @return [Array] array of channels to which the current reactor has subscribed to
attr_accessor :channels
finalizer :shutdown
# rececives a new socket connection from the server
# and listens for messages
#
# @param [Reel::WebSocket] websocket
#
# @return [void]
#
# @api public
def work(websocket, server)
@server = server
@channels = []
@websocket = websocket
log_debug "#{self.class} Streaming changes for #{websocket.url}"
async.run
end
# the method will return true if debug is enabled
#
#
# @return [Boolean] returns true if debug is enabled otherwise false
#
# @api public
def debug_enabled?
!@server.dead? && @server.debug_enabled?
end
# reads from the socket the message
# and dispatches it to the handle_websocket_message method
# @see #handle_websocket_message
#
# @return [void]
#
# @api public
#
# :nocov:
def run
loop do
break if Actor.current.dead? || @websocket.closed? || @server.dead?
message = try_read_websocket
handle_websocket_message(message) if message.present?
end
end
# will try to read the message from the websocket
# and if it fails will log the exception if debug is enabled
#
# @return [void]
#
# @api public
#
# :nocov:
def try_read_websocket
@websocket.closed? ? nil : @websocket.read
rescue
nil
end
# the method will return the reactor's class name used in debug messages
#
#
# @return [Class] returns the reactor's class name used in debug messages
#
# @api public
def reactor_class
self.class
end
# method used to parse a JSON object into a Hash object
#
# @param [JSON] message
#
# @return [Hash]
#
# @api public
def parse_json_data(message)
JSON.parse(message)
rescue => exception
log_debug "#{reactor_class} could not parse #{message} because of #{exception.inspect}"
message
end
# method that handles the message received from the websocket connection
# first will try to parse the message {#parse_json_data} and then it will dispatch
# it to another method that will decide depending the message what action
# should the reactor take {#handle_parsed_websocket_message}
#
# @see #parse_json_data
# @see #handle_parsed_websocket_message
#
# @param [JSON] message
#
# @return [void]
#
# @api public
def handle_websocket_message(message)
log_debug "#{reactor_class} read message #{message}"
json_data = parse_json_data(message)
handle_parsed_websocket_message(json_data)
end
# method that checks if the data is a Hash
#
# if the data is a hash then will stringify the keys and will call the method {#delegate_action}
# that will handle the message, otherwise will call the method {#handle_unknown_action}
#
# @see #delegate_action
# @see #handle_unknown_action
#
# @param [Hash] json_data
#
# @return [void]
#
# @api public
def handle_parsed_websocket_message(json_data)
data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
log_debug "#{self.class} finds actions for #{json_data}"
delegate_action(data) if data['client_action'].present?
else
handle_unknown_action(data['channel'], json_data)
end
end
# method that checks if the data is a Hash
#
# if the data is a hash then will stringify the keys and will call the method {#delegate_action}
# that will handle the message, otherwise will call the method {#handle_unknown_action}
#
# @see #delegate_action
# @see #handle_unknown_action
#
# @param [Hash] json_data
# @option json_data [String] :client_action The action based on which the reactor will decide what action should make
#
# Possible values are:
# unsubscribe_all
# unsubscribe_clients
# unsubscribe
# subscribe
# publish
#
#
# @return [void]
#
# @api public
def delegate_action(json_data)
async.send(json_data['client_action'], json_data['channel'], json_data)
end
# the method will delegate the message to the server in an asyncronous way by sending the current actor and the message
# @see {CelluloidPubsub::WebServer#handle_dispatched_message}
#
# @param [Hash] json_data
#
# @return [void]
#
# @api public
def handle_unknown_action(channel, json_data)
log_debug "Trying to dispatch to server #{json_data} on channel #{channel}"
@server.async.handle_dispatched_message(Actor.current, json_data)
end
# if the reactor has unsubscribed from all his channels will close the websocket connection,
# otherwise will delete the channel from his channel list
#
# @param [String] channel The channel that needs to be deleted from the reactor's list of subscribed channels
#
# @return [void]
#
# @api public
def forget_channel(channel)
if @channels.blank?
@websocket.close
else
@channels.delete(channel)
end
end
# the method will unsubscribe a client by closing the websocket connection if has unscribed from all channels
# and deleting the reactor from the channel list on the server
#
# @param [String] channel
#
# @return [void]
#
# @api public
def unsubscribe(channel, _json_data)
log_debug "#{self.class} runs 'unsubscribe' method with #{channel}"
return unless channel.present?
forget_channel(channel)
delete_server_subscribers(channel)
end
# the method will delete the reactor from the channel list on the server
#
# @param [String] channel
#
# @return [void]
#
# @api public
def delete_server_subscribers(channel)
@server.mutex.synchronize do
(@server.subscribers[channel] || []).delete_if do |hash|
hash[:reactor] == Actor.current
end
end
end
# the method will unsubscribe all clients subscribed to a channel by closing the
#
# @param [String] channel
#
# @return [void]
#
# @api public
def unsubscribe_clients(channel, _json_data)
log_debug "#{self.class} runs 'unsubscribe_clients' method with #{channel}"
return if channel.blank?
unsubscribe_from_channel(channel)
@server.subscribers[channel] = []
end
# the method will terminate the current actor
#
#
# @return [void]
#
# @api public
def shutdown
debug "#{self.class} tries to 'shudown'"
@websocket.close if @websocket.present? && !@websocket.closed?
terminate
end
# this method will add the current actor to the list of the subscribers {#add_subscriber_to_channel}
# and will write to the socket a message for succesful subscription
#
# @see #add_subscriber_to_channel
#
# @param [String] channel
# @param [Object] message
#
# @return [void]
#
# @api public
def subscribe(channel, message)
return unless channel.present?
add_subscriber_to_channel(channel, message)
log_debug "#{self.class} subscribed to #{channel} with #{message}"
@websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end
# this method will return a list of all subscribers to a particular channel or a empty array
#
#
# @param [String] channel The channel that will be used to fetch all subscribers from this channel
#
# @return [Array] returns a list of all subscribers to a particular channel or a empty array
#
# @api public
def channel_subscribers(channel)
@server.subscribers[channel] || []
end
# adds the curent actor the list of the subscribers for a particular channel
# and registers the new channel
#
# @param [String] channel
# @param [Object] message
#
# @return [void]
#
# @api public
def add_subscriber_to_channel(channel, message)
registry_channels = CelluloidPubsub::Registry.channels
@channels << channel
registry_channels << channel unless registry_channels.include?(channel)
@server.mutex.synchronize do
@server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
end
end
# method for publishing data to a channel
#
# @param [String] current_topic The Channel to which the reactor instance {CelluloidPubsub::Reactor} will publish the message to
# @param [Object] json_data The additional data that contains the message that needs to be sent
#
# @return [void]
#
# @api public
def publish(current_topic, json_data)
message = json_data['data'].to_json
return if current_topic.blank? || message.blank?
server_pusblish_event(current_topic, message)
rescue => exception
log_debug("could not publish message #{message} into topic #{current_topic} because of #{exception.inspect}")
end
# the method will publish to all subsribers of a channel a message
#
# @param [String] current_topic
# @param [#to_s] message
#
# @return [void]
#
# @api public
def server_pusblish_event(current_topic, message)
@server.mutex.synchronize do
(@server.subscribers[current_topic].dup || []).pmap do |hash|
hash[:reactor].websocket << message
end
end
end
# unsubscribes all actors from all channels and terminates the curent actor
#
# @param [String] _channel NOT USED - needed to maintain compatibility with the other methods
# @param [Object] _json_data NOT USED - needed to maintain compatibility with the other methods
#
# @return [void]
#
# @api public
def unsubscribe_all(_channel, json_data)
log_debug "#{self.class} runs 'unsubscribe_all' method"
CelluloidPubsub::Registry.channels.dup.pmap do |channel|
unsubscribe_clients(channel, json_data)
end
log_debug 'clearing connections'
shutdown
end
# unsubscribes all actors from the specified chanel
#
# @param [String] channel
# @return [void]
#
# @api public
def unsubscribe_from_channel(channel)
log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
server_kill_reactors(channel)
end
# kills all reactors registered on a channel and closes their websocket connection
#
# @param [String] channel
# @return [void]
#
# @api public
def server_kill_reactors(channel)
@server.mutex.synchronize do
(@server.subscribers[channel].dup || []).pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
end
end
end
end
end