slack-ruby/slack-ruby-client

View on GitHub
lib/slack/real_time/concurrency/async.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true
require 'async/websocket'
require 'async/notification'
require 'async/clock'

module Slack
  module RealTime
    module Concurrency
      module Async
        # WebSocket client that exposes +on+, +text+, +binary+ and +emit+ by
        # forwarding each of them to the object held in @driver.
        class Client < ::Async::WebSocket::Client
          extend ::Forwardable

          %i[on text binary emit].each do |forwarded|
            def_delegator :@driver, forwarded
          end
        end

        class Socket < Slack::RealTime::Socket
          attr_reader :client

          # Starts the reactor for +client+ and blocks by calling +wait+ on the
          # object that #start_reactor returns; the result of +wait+ is returned.
          def start_sync(client)
            reactor_task = start_reactor(client)
            reactor_task.wait
          end

          # Runs #start_reactor for +client+ on a freshly spawned Thread and
          # returns that Thread without waiting for it.
          def start_async(client)
            Thread.new { start_reactor(client) }
          end

          # Boots an Async reactor that drives +client+.
          #
          # Inside the reactor:
          # * when +client.run_ping?+ is truthy, a keep-alive task repeatedly sleeps
          #   for +client.websocket_ping_timer+ and then calls +client.run_ping!+;
          # * the main loop (re)starts +client.run_loop+ in a child task and then
          #   waits on @restart, so every signal on @restart replaces the run-loop.
          #
          # Both loops end once @restart has been cleared (see #disconnect!).
          # Returns whatever the +Async+ call returns.
          def start_reactor(client)
            Async do |reactor|
              @restart = ::Async::Notification.new

              if client.run_ping?
                @ping_task = reactor.async do |keep_alive|
                  keep_alive.annotate "#{client} keep-alive"

                  # This task ends by itself once @restart has been cleared.
                  while @restart
                    keep_alive.sleep client.websocket_ping_timer
                    break unless @restart

                    client.run_ping!
                  end
                end
              end

              while @restart
                # Retire the previous run-loop task, if any, before starting a new one.
                @client_task.stop if @client_task

                @client_task = reactor.async do |runner|
                  runner.annotate "#{client} run-loop"
                  client.run_loop
                rescue ::Async::Wrapper::Cancelled
                  # Will get restarted by ping worker.
                rescue StandardError => e
                  client.logger.error(runner.to_s) { e.message }
                end

                # Park here until #restart_async or #disconnect! signals.
                @restart.wait
              end

              @ping_task.stop if @ping_task
            end
          end

          # Records +new_url+ in @url, stamps @last_message_at with #current_time and,
          # when a restart notification exists, signals it. The first argument is
          # accepted but not used.
          #
          # Returns the result of the signal call, or nil when there is no @restart.
          def restart_async(_client, new_url)
            @url = new_url
            @last_message_at = current_time

            notification = @restart
            notification.signal if notification
          end

          # Passes the given block to +::Async.run+ and returns that call's result.
          def run_async(&block)
            ::Async.run(&block)
          end

          # Returns the current reading of +::Async::Clock.now+; #restart_async stores
          # this value in @last_message_at.
          # NOTE(review): presumably a monotonic timestamp in seconds — confirm against
          # the async gem's Clock implementation.
          def current_time
            ::Async::Clock.now
          end

          # Performs the parent class's +connect!+ and then enters #run_loop, so this
          # call keeps consuming driver events until that loop returns.
          def connect!
            super
            run_loop
          end

          # Runs the parent class's +disconnect!+ and, whether or not it raises,
          # shuts down the restart/ping loops: @restart is cleared first and the
          # notification it held is then signalled so any waiter wakes up.
          def disconnect!
            super
          ensure
            notification = @restart
            if notification
              @restart = nil
              notification.signal
            end
          end

          # Runs the parent class's +close+ and, whether or not it raises, closes the
          # underlying socket (when one is held) and then forgets it.
          def close
            super
          ensure
            socket = @socket
            if socket
              socket.close
              @socket = nil
            end
          end

          # Keeps asking the driver for its next event until +next_event+ returns a
          # falsy value or @driver has been cleared. The events themselves are not
          # inspected here. Always returns nil.
          def run_loop
            nil while @driver&.next_event
          end

          protected

          # Builds the TLS context used for secure endpoints.
          #
          # Peer certificates are verified and TLS 1.2 is the lowest protocol version
          # accepted; newer versions may be negotiated when both sides support them.
          # The context is created without a version-specific method symbol (that
          # form is deprecated and pins the connection to a single protocol version).
          # The floor is passed through +set_params+ because that call also applies
          # OpenSSL's default parameters.
          # NOTE(review): those defaults appear to include a lower +min_version+ in
          # current openssl releases — confirm against the installed gem.
          #
          # @return [OpenSSL::SSL::SSLContext]
          def build_ssl_context
            OpenSSL::SSL::SSLContext.new.tap do |ctx|
              ctx.set_params(
                verify_mode: OpenSSL::SSL::VERIFY_PEER,
                min_version: OpenSSL::SSL::TLS1_2_VERSION
              )
            end
          end

          # Returns the endpoint to connect to: a plain TCP endpoint for +addr+ and
          # +port+, wrapped in an SSL endpoint (using #build_ssl_context) when
          # +secure?+ is truthy.
          def build_endpoint
            tcp_endpoint = ::Async::IO::Endpoint.tcp(addr, port)
            return tcp_endpoint unless secure?

            ::Async::IO::SSLEndpoint.new(tcp_endpoint, ssl_context: build_ssl_context)
          end

          # Opens a connection on the endpoint produced by #build_endpoint and returns
          # the result of its +connect+ call (#connect stores it in @socket).
          def connect_socket
            build_endpoint.connect
          end

          # Opens the socket via #connect_socket, keeps it in @socket and wraps it,
          # together with +url+, in a Client that becomes @driver. Returns the driver.
          def connect
            socket = connect_socket
            @socket = socket
            @driver = Client.new(socket, url)
          end
        end
      end
    end
  end
end

# Fail at load time when the installed async-websocket is 0.9.0 or newer; the
# error message tells the user which version constraint to put in the Gemfile.
unless Gem::Version.new(Async::WebSocket::VERSION) < Gem::Version.new('0.9.0')
  raise "Incompatible version of async-websocket, #{Async::WebSocket::VERSION}, " \
        "use \"gem 'async-websocket', '~> 0.8.0'\"."
end