experiments/concurrency/stub.rb
require 'celluloid'
require 'coap'
require 'david'
class Listener
include David::Server::Respond
def initialize(mode, socket, cache)
@mode = mode
@socket = socket
@cache = cache
@app = Rack::HelloWorld.new
@options = David::AppConfig.new
end
def run
loop do
if defined?(JRuby) || defined?(Rubinius) || @mode == :prefork || @mode == :threaded
data, sender = @socket.recvfrom(1152)
port, _, host = sender[1..3]
else
begin
data, sender, _, anc = @socket.to_io.recvmsg_nonblock
rescue ::IO::WaitReadable
Celluloid::IO.wait_readable(@socket)
retry
end
host, port = sender.ip_address, sender.ip_port
end
message = CoAP::Message.parse(data)
exchange = David::Exchange.new(host, port, message, anc)
return if !exchange.non? && exchange.multicast?
key = exchange.key
cached = @cache[key]
if exchange.ack? && !cached.nil?
@cache.delete(key)
elsif exchange.request?
if exchange.con? && !cached.nil?
response = cached[0]
else
response, _ = respond(exchange)
end
end
unless response.nil?
@socket.send(response.to_wire, 0, host, port)
if !cached.nil?
cached[1] = Time.now.to_i
elsif exchange.reliable?
@cache[[host, response.mid]] = [response, Time.now.to_i]
end
end
end
end
end
trap('EXIT') { socket.close }
cache = {}
case ARGV[0]
when 'prefork'
# ~33000
socket = UDPSocket.new(Socket::AF_INET6)
socket.bind('::', 5683)
4.times { fork { Listener.new(:prefork, socket, cache).run } }
when 'threaded'
# ~17500
socket = UDPSocket.new(Socket::AF_INET6)
socket.bind('::', 5683)
Listener.send(:include, Celluloid)
Listener.pool(size: 8, args: [:threaded, socket, cache]).run
else
# ~17000
socket = Celluloid::IO::UDPSocket.new(Socket::AF_INET6)
socket.bind('::', 5683)
Listener.send(:include, Celluloid::IO)
Listener.new(:sped, socket, cache).run
end
Process.waitall