lib/tamashii/server/connection/stream_event_loop.rb
# frozen_string_literal: true
# Process-wide setting (not scoped to this file): when any thread dies from
# an unhandled exception, that exception is re-raised in the main thread
# instead of only terminating the failing thread.
Thread.abort_on_exception = true
module Tamashii
  module Server
    module Connection
      # Event loop that multiplexes stream I/O on one background thread using
      # an NIO selector.
      #
      # Changes to the selector (attach, detach, interest updates) are never
      # applied by the caller directly: they are queued as lambdas on @todo
      # and executed by the loop thread, which is woken up afterwards.
      # The loop thread and the selector are created lazily on first wakeup.
      # :nodoc:
      class StreamEventLoop
        def initialize
          @nio = @thread = nil
          @stopping = false
          # io => selector monitor returned by @nio.register
          @streams = {}
          # Lambdas waiting to be executed on the loop thread.
          @todo = Queue.new
          @spawn_mutex = Mutex.new
        end

        # Creates a Concurrent::TimerTask that runs the block every
        # +interval+, starts it, and returns the task.
        def timer(interval, &block)
          Concurrent::TimerTask.new(
            execution_interval: interval, &block
          ).tap(&:execute)
        end

        # Submits +task+ (or the given block when +task+ is nil) to
        # Concurrent.global_io_executor.
        def post(task = nil, &block)
          task ||= block
          Concurrent.global_io_executor << task
        end

        # Queues registration of +io+ for read readiness, with +stream+ stored
        # as the monitor's value, then wakes the loop.
        def attach(io, stream)
          @todo << lambda do
            @streams[io] = @nio.register(io, :r)
            @streams[io].value = stream
          end
          wakeup
        end

        # Queues deregistration and closing of +io+, then wakes the loop.
        # The second argument is accepted but unused.
        def detach(io, _stream)
          @todo << lambda do
            @nio.deregister io
            @streams.delete io
            io.close
          end
          wakeup
        end

        # Queues a switch of +io+'s monitor to read+write interest, then wakes
        # the loop. Does nothing for an io that is not in @streams.
        def writes_pending(io)
          @todo << lambda do
            monitor = @streams[io]
            monitor&.interests = :rw
          end
          wakeup
        end

        # Flags the loop as stopping; the loop thread closes the selector and
        # exits on its next iteration. The wakeup is skipped when no selector
        # was ever created.
        def stop
          @stopping = true
          wakeup if @nio
        end

        # Returns true once #stop has been called.
        def stopped?
          @stopping
        end

        private

        # Starts the loop thread (creating the selector if needed) unless one
        # is already alive. Returns true only when a new thread was started,
        # nil otherwise. The liveness check is repeated inside the mutex so
        # that concurrent callers start at most one thread.
        def spawn
          return if @thread && @thread.status
          @spawn_mutex.synchronize do
            return if @thread && @thread.status
            @nio ||= NIO::Selector.new
            @thread = Thread.new { run }
            return true
          end
        end

        # Ensures the loop is running; a loop that was already running is
        # interrupted out of its blocking select.
        def wakeup
          spawn || @nio.wakeup
        end

        # Body of the loop thread: drain queued work, wait on the selector,
        # service the ready monitors; repeat until stopped.
        def run
          loop do
            if stopped?
              @nio.close
              break
            end
            @todo.pop(true).call until @todo.empty?
            monitors = @nio.select
            process(monitors) if monitors
          end
        end

        # Services every ready monitor. A failure on one stream is contained
        # by #dispatch and does not prevent the remaining monitors from being
        # serviced.
        def process(monitors)
          monitors.each { |monitor| dispatch(monitor) }
        end

        # Flushes pending writes and/or reads incoming data for one monitor.
        #
        # Both the write path and the read path are covered by the same
        # rescue: any StandardError raised while servicing the stream results
        # in the stream being closed rather than in the loop thread dying.
        def dispatch(monitor)
          io = monitor.io
          stream = monitor.value
          begin
            if monitor.writable?
              # A truthy flush result means nothing is left to write, so go
              # back to read-only interest.
              monitor.interests = :r if stream.flush_write_buffer
              return unless monitor.readable?
            end
            read(io, stream)
          rescue StandardError
            try_close(io, stream)
          end
        end

        # Reads up to 4096 bytes without blocking and hands them to the
        # stream. Returns false when no data is available yet; closes the
        # stream when the read yields nil. Exceptions propagate to the caller
        # (#dispatch), which closes the stream.
        def read(io, stream)
          incoming = io.read_nonblock(4096, exception: false)
          case incoming
          when :wait_readable then false
          when nil then stream.close
          else stream.receive(incoming)
          end
        end

        # Closes +stream+; if closing itself raises, falls back to removing
        # +io+ from the selector and from @streams directly.
        def try_close(io, stream)
          stream.close
        rescue StandardError
          @nio.deregister io
          @streams.delete io
        end
      end
    end
  end
end