lib/nio/websocket.rb
require "nio/websocket/version"
require "websocket/driver"
require "nio"
require "socket"
require "uri"
require "openssl"
require "logger"
require "nio/websocket/reactor"
require "nio/websocket/adapter/client"
require "nio/websocket/adapter/server"
require "nio/websocket/adapter/proxy"
module NIO
module WebSocket
class << self
# Returns the current logger, or creates one at level ERROR if one has not been assigned
# @return [Logger] the current logger instance
def logger
@logger ||= begin
logger = Logger.new(STDERR, progname: "WebSocket", level: Logger::ERROR)
logger.level = Logger::ERROR
logger
end
end
attr_writer :logger
# Should raw traffic be logged through the logger? Disabled by default for security reasons
# @param enable [Boolean]
def log_traffic=(enable)
@log_traffic = enable
logger.level = Logger::DEBUG if enable
end
# Should raw traffic be logged through the logger? Disabled by default for security reasons
def log_traffic?
@log_traffic
end
# Create and return a websocket client that communicates either over the given IO object (upgrades the connection),
# or we'll create a new connection to url if io is not supplied
# @param [String] url ws:// or wss:// location to connect
# @param [Hash] options
# @param [IO] io (DI) raw IO object to use in lieu of opening a new connection to url
# @option options [Hash] :websocket_options Hash to pass to the ::WebSocket::Driver.client
# @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object
# @yield [::WebSocket::Driver]
# @return [::WebSocket::Driver]
def connect(url, options = {}, io = nil)
io ||= open_socket(url, options)
adapter = CLIENT_ADAPTER.new(url, io, options)
yield(adapter.driver, adapter) if block_given?
Reactor.queue_task do
adapter.add_to_reactor
end
Reactor.start
logger.info "Client #{io} connected to #{url}"
adapter.driver
end
# Establish a proxy host listening on the given port and address, that marshalls all data to/from a new connection on remote
# @param [Hash] options
# @param remote [String] remote server in "hostname_or_ip:port" format
# @option options [Integer] :port required: Port on which to listen for incoming connections
# @option options [String] :address optional: Specific Address on which to bind the TCPServer
# @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object
# @yield [::WebSocket::Driver]
# @return server, as passed in, or a new TCPServer if no server was specified
def proxy(remote, options = {})
server = create_server(options)
host, port, extra = remote.split(":", 3)
raise "Specify the remote parameter in 'hostname_or_ip:port' format" if extra || port.to_i == 0 || host.empty?
Reactor.queue_task do
monitor = Reactor.selector.register(server, :r)
monitor.value = proc do
accept_socket server, options do |client|
srv = open_socket "tcp://#{remote}", options
adapter = PROXY_ADAPTER.new(srv, client, options)
Reactor.queue_task do
adapter.add_to_reactor
end
logger.info "Proxy connection established between #{srv} and #{client}"
end
end
end
logger.info "Proxy Host listening for new connections on port " + options[:port].to_s
Reactor.start
server
end
# Start handling new connections, passing each through the supplied block
# @param [Hash] options
# @param server [TCPServer] (DI) TCPServer-like object to use in lieu of starting a new server
# @option options [Integer] :port required: Port on which to listen for incoming connections
# @option options [String] :address optional: Specific Address on which to bind the TCPServer
# @option options [Hash] :websocket_options Hash to pass to the ::WebSocket::Driver.server
# @option options [Hash] :ssl_context Hash from which to create the OpenSSL::SSL::SSLContext object
# @yield [::WebSocket::Driver]
# @return server, as passed in, or a new TCPServer if no server was specified
def listen(options = {}, server = nil)
server ||= create_server(options)
Reactor.queue_task do
monitor = Reactor.selector.register(server, :r)
monitor.value = proc do
accept_socket server, options do |io| # this next block won't run until ssl (if enabled) has started
adapter = SERVER_ADAPTER.new(io, options)
yield(adapter.driver, adapter) if block_given?
Reactor.queue_task do
adapter.add_to_reactor
end
logger.info "Host accepted client connection #{io} on port #{options[:port]}"
end
end
end
Reactor.start
logger.info "Host listening for new connections on port " + options[:port].to_s
server
end
SERVER_ADAPTER = NIO::WebSocket::Adapter::Server
CLIENT_ADAPTER = NIO::WebSocket::Adapter::Client
PROXY_ADAPTER = NIO::WebSocket::Adapter::Proxy
# Resets this API to a fresh state
def reset
logger.info "Resetting reactor subsystem"
Reactor.reset
end
# @!endgroup
private
# return an open socket given the url and options
def open_socket(url, options)
uri = URI(url)
port = uri.port || (uri.scheme == "wss" ? 443 : 80) # redundant? test uri.port if port is unspecified but because ws: & wss: aren't default protocols we'll maybe still need this(?)
logger.debug "Opening Connection to #{uri.hostname} on port #{port}"
io = TCPSocket.new uri.hostname, port
return io unless uri.scheme == "wss"
logger.debug "Upgrading Connection #{io} to ssl"
ssl = upgrade_to_ssl(io, options).connect
logger.info "Connection #{io} upgraded to #{ssl}"
ssl
end
def create_server(options)
options[:address] ? TCPServer.new(options[:address], options[:port]) : TCPServer.new(options[:port])
end
# supply a block to run after protocol negotiation
def accept_socket(server, options)
waiting = accept_nonblock server
if [:r, :w].include? waiting
logger.warn "Expected to receive new connection, but the server is not quite ready"
return
end
logger.debug "Receiving new connection #{waiting} on port #{options[:port]}"
if options[:ssl_context]
logger.debug "Upgrading Connection #{waiting} to ssl"
ssl = upgrade_to_ssl(waiting, options)
try_accept_nonblock ssl do
logger.info "Incoming connection #{waiting} upgraded to #{ssl}"
yield ssl
end
else
yield waiting
end
end
def try_accept_nonblock(io)
waiting = accept_nonblock io
if [:r, :w].include? waiting
# Only happens on server side ssl negotiation
Reactor.queue_task do
monitor = Reactor.selector.register(io, :rw)
monitor.value = proc do
waiting = accept_nonblock io
unless [:r, :w].include? waiting
monitor.close
yield waiting
end
end
end
else
yield waiting
end
end
def accept_nonblock(io)
io.accept_nonblock
rescue IO::WaitReadable
:r
rescue IO::WaitWritable
:w
end
def upgrade_to_ssl(io, options)
store = OpenSSL::X509::Store.new
store.set_default_paths
ctx = OpenSSL::SSL::SSLContext.new
{ cert_store: store, verify_mode: OpenSSL::SSL::VERIFY_PEER }.merge(options[:ssl_context] || {}).each do |k, v|
ctx.send "#{k}=", v if ctx.respond_to? k
end
OpenSSL::SSL::SSLSocket.new(io, ctx)
end
end
end
end