NexusSW/nio4r-websocket

View on GitHub
lib/nio/websocket.rb

Summary

Maintainability
A
35 mins
Test Coverage
A
94%
require "nio/websocket/version"
require "websocket/driver"
require "nio"
require "socket"
require "uri"
require "openssl"
require "logger"
require "nio/websocket/reactor"
require "nio/websocket/adapter/client"
require "nio/websocket/adapter/server"
require "nio/websocket/adapter/proxy"

module NIO
  module WebSocket
    class << self
      # Memoized accessor for the module-wide logger. When no logger has been assigned yet, one is
      # created that writes to STDERR with progname "WebSocket" at level ERROR.
      # @return [Logger] the current logger instance
      def logger
        @logger ||= Logger.new(STDERR, progname: "WebSocket", level: Logger::ERROR).tap do |log|
          log.level = Logger::ERROR
        end
      end

      # Replaces the logger instance that #logger returns (stored in @logger)
      # @param value [Logger] presumably any Logger-compatible object; #log_traffic= calls level= on it
      attr_writer :logger

      # Turns logging of raw traffic on or off.  Off by default for security reasons.
      # Enabling it also drops the logger level to DEBUG; disabling it leaves the logger level as it is.
      # @param enable [Boolean]
      def log_traffic=(enable)
        @log_traffic = enable
        return unless enable

        logger.level = Logger::DEBUG
      end

      # Should raw traffic be logged through the logger?  Disabled by default for security reasons
      # @return [Boolean] true when traffic logging was enabled via #log_traffic=, false otherwise
      #   (including when it was never set)
      def log_traffic?
        # Normalize so the predicate never leaks nil or the raw assigned object
        @log_traffic ? true : false
      end

      # Builds a websocket client.  The client talks over the supplied IO object when one is given;
      # otherwise a new connection to url is opened first.
      # @param [String] url ws:// or wss:// location to connect
      # @param [Hash] options
      # @param [IO] io (DI) raw IO object to use in lieu of opening a new connection to url
      # @option options [Hash] :websocket_options Hash to pass to the ::WebSocket::Driver.client
      # @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object
      # @yield [driver, adapter] the client driver and its adapter, before the adapter is queued on the reactor
      # @return [::WebSocket::Driver]
      def connect(url, options = {}, io = nil)
        io ||= open_socket(url, options)
        client = CLIENT_ADAPTER.new(url, io, options)
        yield(client.driver, client) if block_given?
        Reactor.queue_task { client.add_to_reactor }
        Reactor.start
        logger.info "Client #{io} connected to #{url}"
        client.driver
      end

      # Establish a proxy host listening on the given port and address, that marshalls all data to/from a new connection on remote
      # A new connection to remote is opened for every accepted client.
      # @param remote [String] remote server in "hostname_or_ip:port" format
      # @param [Hash] options
      # @option options [Integer] :port required: Port on which to listen for incoming connections
      # @option options [String] :address optional: Specific Address on which to bind the TCPServer
      # @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object;
      #   when present, incoming connections are upgraded to ssl before being proxied
      # @return [TCPServer] the newly created listening server
      # @raise [RuntimeError] if remote is not in "hostname_or_ip:port" format
      def proxy(remote, options = {})
        host, port, extra = remote.split(":", 3)
        # Validate before binding: a malformed remote must not leave a listening socket behind
        raise "Specify the remote parameter in 'hostname_or_ip:port' format" if extra || port.to_i == 0 || host.empty?
        server = create_server(options)
        Reactor.queue_task do
          monitor = Reactor.selector.register(server, :r)
          # Invoked through the monitor's value whenever the listening socket is readable
          monitor.value = proc do
            accept_socket server, options do |client|
              srv = open_socket "tcp://#{remote}", options
              adapter = PROXY_ADAPTER.new(srv, client, options)
              Reactor.queue_task do
                adapter.add_to_reactor
              end
              logger.info "Proxy connection established between #{srv} and #{client}"
            end
          end
        end
        logger.info "Proxy Host listening for new connections on port #{options[:port]}"
        Reactor.start
        server
      end

      # Accept incoming connections and hand each one to the supplied block
      # @param [Hash] options
      # @param server [TCPServer] (DI) TCPServer-like object to use in lieu of starting a new server
      # @option options [Integer] :port required: Port on which to listen for incoming connections
      # @option options [String] :address optional: Specific Address on which to bind the TCPServer
      # @option options [Hash] :websocket_options Hash to pass to the ::WebSocket::Driver.server
      # @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object
      # @yield [driver, adapter] the server-side driver and its adapter for every accepted connection
      # @return server, as passed in, or a new TCPServer if no server was specified
      def listen(options = {}, server = nil)
        server ||= create_server(options)
        Reactor.queue_task do
          acceptor = Reactor.selector.register(server, :r)
          acceptor.value = proc do
            # the inner block only runs once ssl negotiation (if enabled) has completed
            accept_socket(server, options) do |io|
              adapter = SERVER_ADAPTER.new(io, options)
              yield(adapter.driver, adapter) if block_given?
              Reactor.queue_task { adapter.add_to_reactor }
              logger.info "Host accepted client connection #{io} on port #{options[:port]}"
            end
          end
        end
        Reactor.start
        logger.info "Host listening for new connections on port #{options[:port]}"
        server
      end

      # Adapter classes instantiated by #listen, #connect and #proxy respectively.
      # The methods reference these constants rather than naming the adapter classes directly.
      SERVER_ADAPTER = NIO::WebSocket::Adapter::Server
      CLIENT_ADAPTER = NIO::WebSocket::Adapter::Client
      PROXY_ADAPTER = NIO::WebSocket::Adapter::Proxy

      # Returns this API to a fresh state: logs the reset at info level, then delegates to Reactor.reset
      # @return whatever Reactor.reset returns
      def reset
        logger.info("Resetting reactor subsystem")
        Reactor.reset
      end

      # @!endgroup

      private

      # Opens a TCP connection to the host named in url and, for wss:// urls, negotiates ssl over it
      # @param url [String] location to connect to; only the "wss" scheme triggers the ssl upgrade
      # @param options [Hash] handed to upgrade_to_ssl, which reads :ssl_context from it
      # @return [TCPSocket, OpenSSL::SSL::SSLSocket] the connected socket
      # @raise [StandardError] whatever the ssl upgrade raises; the TCP socket is closed before re-raising
      def open_socket(url, options)
        uri = URI(url)
        host = uri.hostname
        # Fallback for URI implementations that report no port for ws:/wss: (may be redundant on
        # newer Rubies -- TODO confirm)
        port = uri.port || (uri.scheme == "wss" ? 443 : 80)
        logger.debug "Opening Connection to #{host} on port #{port}"
        io = TCPSocket.new host, port
        return io unless uri.scheme == "wss"
        logger.debug "Upgrading Connection #{io} to ssl"
        begin
          ssl = upgrade_to_ssl(io, options)
          # Send the server name (SNI) so name-based virtual hosts can pick the right certificate.
          # IP literals are skipped: SNI carries DNS names only.
          skip_sni = host.nil? || host.include?(":") || host =~ /\A\d{1,3}(\.\d{1,3}){3}\z/
          ssl.hostname = host if !skip_sni && ssl.respond_to?(:hostname=)
          # NOTE(review): nothing here checks that the peer certificate matches the host name.  With
          # the hostname set above, passing verify_hostname: true in :ssl_context should enable that
          # check -- confirm and consider making it the default.
          ssl.connect
        rescue StandardError
          io.close # do not leak the TCP socket when the upgrade fails
          raise
        end
        logger.info "Connection #{io} upgraded to #{ssl}"
        ssl
      end

      # Creates the listening TCPServer described by options
      # @param options [Hash] :port is passed to TCPServer.new; :address, when present, is passed as the bind address
      # @return [TCPServer]
      def create_server(options)
        address = options[:address]
        port = options[:port]
        return TCPServer.new(port) unless address

        TCPServer.new(address, port)
      end

      # Accepts one pending connection from server and yields it to the block once it is usable.
      # When options[:ssl_context] is set, the connection is upgraded to ssl first and the block only
      # runs after protocol negotiation; without it the raw connection is yielded right away.
      # Returns nil without yielding if the server has no connection ready.
      def accept_socket(server, options)
        client = accept_nonblock(server)
        if %i[r w].include?(client)
          logger.warn "Expected to receive new connection, but the server is not quite ready"
          return
        end
        logger.debug "Receiving new connection #{client} on port #{options[:port]}"
        return yield(client) unless options[:ssl_context]

        logger.debug "Upgrading Connection #{client} to ssl"
        secured = upgrade_to_ssl(client, options)
        try_accept_nonblock(secured) do
          logger.info "Incoming connection #{client} upgraded to #{secured}"
          yield secured
        end
      end

      # Runs a non-blocking accept on io and yields the accepted object once the accept completes.
      # If the accept would block (only happens on server side ssl negotiation), io is registered with
      # the reactor and the accept is retried each time the monitor's value proc is invoked.
      # @param io an object accepted by #accept_nonblock
      # @yield the object returned by the completed accept
      def try_accept_nonblock(io)
        result = accept_nonblock io
        if [:r, :w].include? result
          Reactor.queue_task do
            # Wait only for the direction the accept asked for.  Registering for :rw would wake the
            # selector continuously, because a socket waiting on a read is normally still writable.
            monitor = Reactor.selector.register(io, result)
            monitor.value = proc do
              progress = accept_nonblock io
              if [:r, :w].include? progress
                # still negotiating: follow whichever direction is needed next
                # (relies on NIO::Monitor#interests= -- confirm against the nio4r version in use)
                monitor.interests = progress
              else
                monitor.close
                yield progress
              end
            end
          end
        else
          yield result
        end
      end

      # Calls io.accept_nonblock and translates "would block" into a symbol instead of an exception
      # @return the accepted object, :r if the accept must wait for readability, or :w if it must
      #   wait for writability
      def accept_nonblock(io)
        io.accept_nonblock
      rescue IO::WaitReadable, IO::WaitWritable => blocked
        blocked.is_a?(IO::WaitReadable) ? :r : :w
      end

      # Wraps io in an OpenSSL::SSL::SSLSocket.  The context starts from a cert store loaded with the
      # default paths and verify_mode VERIFY_PEER; entries in options[:ssl_context] override or extend it.
      # Keys for which the context has no public setter are ignored.
      # @param io the raw IO object to wrap
      # @param options [Hash] only :ssl_context is read
      # @return [OpenSSL::SSL::SSLSocket] not yet connected/accepted
      def upgrade_to_ssl(io, options)
        store = OpenSSL::X509::Store.new
        store.set_default_paths
        ctx = OpenSSL::SSL::SSLContext.new
        defaults = { cert_store: store, verify_mode: OpenSSL::SSL::VERIFY_PEER }
        defaults.merge(options[:ssl_context] || {}).each do |key, value|
          setter = "#{key}="
          # Probe for the writer rather than the reader: attributes such as min_version are
          # write-only and were silently skipped when the reader was probed instead
          ctx.public_send(setter, value) if ctx.respond_to?(setter)
        end
        OpenSSL::SSL::SSLSocket.new(io, ctx)
      end
    end
  end
end