actioncable/lib/action_cable/connection/base.rb

Summary

Maintainability
B
4 hrs
Test Coverage
# frozen_string_literal: true

# :markup: markdown

require "action_dispatch"
require "active_support/rescuable"

module ActionCable
  module Connection
    # # Action Cable Connection Base
    #
    # For every WebSocket connection the Action Cable server accepts, a Connection
    # object will be instantiated. This instance becomes the parent of all of the
    # channel subscriptions that are created from there on. Incoming messages are
    # then routed to these channel subscriptions based on an identifier sent by the
    # Action Cable consumer. The Connection itself does not deal with any specific
    # application logic beyond authentication and authorization.
    #
    # Here's a basic example:
    #
    #     module ApplicationCable
    #       class Connection < ActionCable::Connection::Base
    #         identified_by :current_user
    #
    #         def connect
    #           self.current_user = find_verified_user
    #           logger.add_tags current_user.name
    #         end
    #
    #         def disconnect
    #           # Any cleanup work needed when the cable connection is cut.
    #         end
    #
    #         private
    #           def find_verified_user
    #             User.find_by_identity(cookies.encrypted[:identity_id]) ||
    #               reject_unauthorized_connection
    #           end
    #       end
    #     end
    #
    # First, we declare that this connection can be identified by its current_user.
    # This allows us to later be able to find all connections established for that
    # current_user (and potentially disconnect them). You can declare as many
    # identification indexes as you like. Declaring an identification means that an
    # attr_accessor is automatically set for that key.
    #
    # Second, we rely on the fact that the WebSocket connection is established with
    # the cookies from the domain being sent along. This makes it easy to use signed
    # cookies that were set when logging in via a web interface to authorize the
    # WebSocket connection.
    #
    # Finally, we add a tag to the connection-specific logger with the name of the
    # current user to easily distinguish their messages in the log.
    #
    # Pretty simple, eh?
    class Base
      include Identification
      include InternalChannel
      include Authorization
      include Callbacks
      include ActiveSupport::Rescuable

      attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
      delegate :event_loop, :pubsub, :config, to: :server

      def initialize(server, env, coder: ActiveSupport::JSON)
        @server, @env, @coder = server, env, coder

        @worker_pool = server.worker_pool
        @logger = new_tagged_logger

        @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
        @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
        @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

        @_internal_subscriptions = nil
        @started_at = Time.now
      end

      # Called by the server when a new WebSocket connection is established. This
      # configures the callbacks intended for overwriting by the user. This method
      # should not be called directly -- instead rely upon on the #connect (and
      # #disconnect) callbacks.
      def process # :nodoc:
        logger.info started_request_message

        if websocket.possible? && allow_request_origin?
          respond_to_successful_request
        else
          respond_to_invalid_request
        end
      end

      # Decodes WebSocket messages and dispatches them to subscribed channels.
      # WebSocket message transfer encoding is always JSON.
      def receive(websocket_message) # :nodoc:
        send_async :dispatch_websocket_message, websocket_message
      end

      def dispatch_websocket_message(websocket_message) # :nodoc:
        if websocket.alive?
          handle_channel_command decode(websocket_message)
        else
          logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
        end
      end

      def handle_channel_command(payload)
        run_callbacks :command do
          subscriptions.execute_command payload
        end
      end

      def transmit(cable_message) # :nodoc:
        websocket.transmit encode(cable_message)
      end

      # Close the WebSocket connection.
      def close(reason: nil, reconnect: true)
        transmit(
          type: ActionCable::INTERNAL[:message_types][:disconnect],
          reason: reason,
          reconnect: reconnect
        )
        websocket.close
      end

      # Invoke a method on the connection asynchronously through the pool of thread
      # workers.
      def send_async(method, *arguments)
        worker_pool.async_invoke(self, method, *arguments)
      end

      # Return a basic hash of statistics for the connection keyed with `identifier`,
      # `started_at`, `subscriptions`, and `request_id`. This can be returned by a
      # health check against the connection.
      def statistics
        {
          identifier: connection_identifier,
          started_at: @started_at,
          subscriptions: subscriptions.identifiers,
          request_id: @env["action_dispatch.request_id"]
        }
      end

      def beat
        transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
      end

      def on_open # :nodoc:
        send_async :handle_open
      end

      def on_message(message) # :nodoc:
        message_buffer.append message
      end

      def on_error(message) # :nodoc:
        # log errors to make diagnosing socket errors easier
        logger.error "WebSocket error occurred: #{message}"
      end

      def on_close(reason, code) # :nodoc:
        send_async :handle_close
      end

      def inspect # :nodoc:
        "#<#{self.class.name}:#{'%#016x' % (object_id << 1)}>"
      end

      private
        attr_reader :websocket
        attr_reader :message_buffer

        # The request that initiated the WebSocket connection is available here. This
        # gives access to the environment, cookies, etc.
        def request # :doc:
          @request ||= begin
            environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
            ActionDispatch::Request.new(environment || env)
          end
        end

        # The cookies of the request that initiated the WebSocket connection. Useful for
        # performing authorization checks.
        def cookies # :doc:
          request.cookie_jar
        end

        def encode(cable_message)
          @coder.encode cable_message
        end

        def decode(websocket_message)
          @coder.decode websocket_message
        end

        def handle_open
          @protocol = websocket.protocol
          connect if respond_to?(:connect)
          subscribe_to_internal_channel
          send_welcome_message

          message_buffer.process!
          server.add_connection(self)
        rescue ActionCable::Connection::Authorization::UnauthorizedError
          close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive?
        end

        def handle_close
          logger.info finished_request_message

          server.remove_connection(self)

          subscriptions.unsubscribe_from_all
          unsubscribe_from_internal_channel

          disconnect if respond_to?(:disconnect)
        end

        def send_welcome_message
          # Send welcome message to the internal connection monitor channel. This ensures
          # the connection monitor state is reset after a successful websocket connection.
          transmit type: ActionCable::INTERNAL[:message_types][:welcome]
        end

        def allow_request_origin?
          return true if server.config.disable_request_forgery_protection

          proto = Rack::Request.new(env).ssl? ? "https" : "http"
          if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
            true
          elsif Array(server.config.allowed_request_origins).any? { |allowed_origin|  allowed_origin === env["HTTP_ORIGIN"] }
            true
          else
            logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
            false
          end
        end

        def respond_to_successful_request
          logger.info successful_request_message
          websocket.rack_response
        end

        def respond_to_invalid_request
          close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive?

          logger.error invalid_request_message
          logger.info finished_request_message
          [ 404, { Rack::CONTENT_TYPE => "text/plain; charset=utf-8" }, [ "Page not found" ] ]
        end

        # Tags are declared in the server but computed in the connection. This allows us
        # per-connection tailored tags.
        def new_tagged_logger
          TaggedLoggerProxy.new server.logger,
            tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
        end

        def started_request_message
          'Started %s "%s"%s for %s at %s' % [
            request.request_method,
            request.filtered_path,
            websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
            request.ip,
            Time.now.to_s ]
        end

        def finished_request_message
          'Finished "%s"%s for %s at %s' % [
            request.filtered_path,
            websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
            request.ip,
            Time.now.to_s ]
        end

        def invalid_request_message
          "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
            env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
          ]
        end

        def successful_request_message
          "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
            env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
          ]
        end
    end
  end
end

ActiveSupport.run_load_hooks(:action_cable_connection, ActionCable::Connection::Base)