NullVoxPopuli/action_cable_client

View on GitHub
lib/action_cable_client.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

# required gems
require 'websocket-eventmachine-client'
require 'forwardable'
require 'json'

# local files
require 'action_cable_client/errors'
require 'action_cable_client/message_factory'
require 'action_cable_client/message'

class ActionCableClient
  extend Forwardable

  class Commands
    SUBSCRIBE = 'subscribe'
    MESSAGE = 'message'
  end

  attr_reader :_websocket_client, :_uri
  attr_reader :_message_factory
  # The queue should store entries in the format:
  # [ action, data ]
  attr_accessor :message_queue, :_subscribed
  attr_accessor :_subscribed_callback, :_rejected_callback, :_pinged_callback, :_connected_callback, :_disconnected_callback

  def_delegator :_websocket_client, :onerror, :errored
  def_delegator :_websocket_client, :send, :send_msg

  # @param [String] uri - e.g.: ws://domain:port
  # @param [String] params - the name of the channel on the Rails server
  #                          or params. This gets sent with every request.
  #                           e.g.: RoomChannel
  # @param [Boolean] connect_on_start - connects on init when true
  #                                   - otherwise manually call `connect!`
  # @param [Hash] headers - HTTP headers to use in the handshake
  # @param [Hash] tls - TLS options hash to be passed to EM start_tls
  def initialize(uri, params = '', connect_on_start = true, headers = {}, tls = {})
    @_uri = uri
    @message_queue = []
    @_subscribed = false

    @_message_factory = MessageFactory.new(params)

    connect!(headers, tls) if connect_on_start
  end

  def connect!(headers = {}, tls = {})
    # Quick Reference for WebSocket::EM::Client's api
    # - onopen - called after successfully connecting
    # - onclose - called after closing connection
    # - onmessage - called when client recives a message. on `message do |msg, type (text or binary)|``
    #             - also called when a ping is received
    # - onerror - called when client encounters an error
    # - onping - called when client receives a ping from the server
    # - onpong - called when client receives a pong from the server
    # - send - sends a message to the server (and also disables any metaprogramming shenanigans :-/)
    # - close - closes the connection and optionally sends close frame to server. `close(code, data)`
    # - ping - sends a ping
    # - pong - sends a pong
    @_websocket_client = WebSocket::EventMachine::Client.connect(uri: @_uri, headers: headers, tls: tls)

    @_websocket_client.onclose do
      self._subscribed = false
      _disconnected_callback&.call
    end
  end

  def reconnect!
    uri = URI(@_uri)
    EventMachine.reconnect uri.host, uri.port, @_websocket_client
    @_websocket_client.post_init
  end

  # @param [String] action - how the message is being sent
  # @param [Hash] data - the message to be sent to the channel
  def perform(action, data)
    dispatch_message(action, data)
  end

  # callback for received messages as well as
  # what triggers depleting the message queue
  #
  # @example
  #   client = ActionCableClient.new(uri, 'RoomChannel')
  #   client.received do |message|
  #     # the received message will be JSON
  #     puts message
  #   end
  def received
    _websocket_client.onmessage do |message, _type|
      handle_received_message(message) do |json|
        yield(json)
      end
    end
  end

  # callback when the client connects to the server
  #
  # @example
  #   client = ActionCableClient.new(uri, 'RoomChannel')
  #   client.connected do
  #     # do things after the client is connected to the server
  #   end
  def connected
    self._connected_callback = proc do |json|
      yield(json)
    end
  end

  # callback when the server rejects the subscription
  #
  # @example
  #   client = ActionCableClient.new(uri, 'RoomChannel')
  #   client.rejected do
  #     # do things after the server rejects the subscription
  #   end
  def rejected
    self._rejected_callback = proc do |json|
      yield(json)
    end
  end

  # callback when the client receives a confirm_subscription message
  # from the action_cable server.
  # This is only called once, and signifies that you can now send
  # messages on the channel
  #
  # @param [Proc] block - code to run after subscribing to the channel is confirmed
  #
  # @example
  #   client = ActionCableClient.new(uri, 'RoomChannel')
  #   client.connected {}
  #   client.subscribed do
  #     # do things after successful subscription confirmation
  #   end
  def subscribed(&block)
    self._subscribed_callback = block
  end

  # @return [Boolean] is the client subscribed to the channel?
  def subscribed?
    _subscribed
  end

  # callback when the server disconnects from the client.
  #
  # @example
  #   client = ActionCableClient.new(uri, 'RoomChannel')
  #   client.connected {}
  #   client.disconnected do
  #     # cleanup after the server disconnects from the client
  #   end
  def disconnected
    self._disconnected_callback = proc do
      yield
    end
  end

  def pinged(&block)
    self._pinged_callback = block
  end

  private

  # @param [String] message - the websockt message object
  def handle_received_message(message)
    return if message.empty?

    json = JSON.parse(message)

    if is_ping?(json)
      _pinged_callback&.call(json)
    elsif is_welcome?(json)
      subscribe
      _connected_callback&.call(json)
    elsif is_rejection?(json)
      _rejected_callback&.call(json)
    elsif !subscribed?
      check_for_subscribe_confirmation(json)
    else
      # TODO: do we want to yield any additional things?
      #       maybe just make it extensible?
      yield(json)
    end
  end

  # {"identifier" => "_ping","type" => "confirm_subscription"}
  def check_for_subscribe_confirmation(message)
    message_type = message[Message::TYPE_KEY]
    return unless Message::TYPE_CONFIRM_SUBSCRIPTION == message_type

    self._subscribed = true
    _subscribed_callback&.call
  end

  # {"identifier" => "_ping","message" => 1460201942}
  # {"identifier" => "_ping","type" => "confirm_subscription"}
  def is_ping?(message)
    message_identifier = message[Message::TYPE_KEY]
    Message::IDENTIFIER_PING == message_identifier
  end

  # {"type" => "welcome"}
  def is_welcome?(message)
    message_identifier = message[Message::TYPE_KEY]
    Message::IDENTIFIER_WELCOME == message_identifier
  end

  def is_rejection?(message)
    message_type = message[Message::TYPE_KEY]
    Message::TYPE_REJECT_SUBSCRIPTION == message_type
  end

  def subscribe
    msg = _message_factory.create(Commands::SUBSCRIBE)
    send_msg(msg.to_json)
  end

  def dispatch_message(action, data)
    # can't send messages if we aren't subscribed
    return unless subscribed?

    msg = _message_factory.create(Commands::MESSAGE, action, data)
    json = msg.to_json
    send_msg(json)
  end
end