lib/websocket_rails/connection_adapters.rb
module WebsocketRails
module ConnectionAdapters
attr_reader :adapters
module_function :adapters
def self.register(adapter)
@adapters ||= []
@adapters.unshift adapter
end
def self.establish_connection(request, dispatcher)
adapter = adapters.detect { |a| a.accepts?(request.env) } || raise(InvalidConnectionError)
adapter.new request, dispatcher
end
class Base
include Logging
def self.accepts?(env)
false
end
def self.inherited(adapter)
ConnectionAdapters.register adapter
end
attr_reader :dispatcher, :queue, :env, :request, :data_store
# The ConnectionManager will set the connection ID when the
# connection is opened.
attr_accessor :id
def initialize(request, dispatcher)
@env = request.env.dup
@request = request
@dispatcher = dispatcher
@connected = true
@queue = EventQueue.new
@data_store = DataStore::Connection.new(self)
@delegate = WebsocketRails::DelegationController.new
@delegate.instance_variable_set(:@_env, request.env)
@delegate.instance_variable_set(:@_request, request)
start_ping_timer
end
def on_open(data=nil)
event = Event.new_on_open( self, data )
dispatch event
trigger event
end
def on_message(encoded_data)
event = Event.new_from_json( encoded_data, self )
dispatch event
end
def on_close(data=nil)
@ping_timer.try(:cancel)
dispatch Event.new_on_close( self, data )
close_connection
end
def on_error(data=nil)
event = Event.new_on_error( self, data )
dispatch event
on_close event.data
end
def enqueue(event)
@queue << event
end
def trigger(event)
send "[#{event.serialize}]"
end
def flush
message = []
@queue.flush do |event|
message << event.as_json
end
send message.to_json
end
def send_message(event_name, data = {}, options = {})
options.merge! :user_id => user_identifier, :connection => self
options[:data] = data
event = Event.new(event_name, options)
event.trigger
end
def send(message)
raise NotImplementedError, "Override this method in the connection specific adapter class"
end
def rack_response
[ -1, {}, [] ]
end
def controller_delegate
@delegate
end
def connected?
true & @connected
end
def inspect
"#<Connection::#{id}>"
end
def to_s
inspect
end
def user_connection?
not user_identifier.nil?
end
def user
return unless user_connection?
controller_delegate.current_user
end
def user_identifier
@user_identifier ||= begin
identifier = WebsocketRails.config.user_identifier
return unless current_user_responds_to?(identifier)
controller_delegate.current_user.send(identifier)
end
end
def ping_interval
@ping_interval ||= WebsocketRails.config.default_ping_interval
end
def ping_interval=(interval)
@ping_interval = interval.to_i
@ping_timer.try(:cancel)
start_ping_timer
end
private
def dispatch(event)
dispatcher.dispatch event
end
def connection_manager
dispatcher.connection_manager
end
def close_connection
@data_store.destroy!
@ping_timer.try(:cancel)
dispatcher.connection_manager.close_connection self
end
def current_user_responds_to?(identifier)
controller_delegate &&
controller_delegate.respond_to?(:current_user) &&
controller_delegate.current_user &&
controller_delegate.current_user.respond_to?(identifier)
end
attr_accessor :pong
public :pong, :pong=
def start_ping_timer
@pong = true
# Set negative interval to nil to deactivate periodic pings
if ping_interval > 0
@ping_timer = EM::PeriodicTimer.new(ping_interval) do
if pong == true
self.pong = false
ping = Event.new_on_ping self
trigger ping
else
@ping_timer.cancel
on_error
end
end
end
end
end
end
end