lib/concurrent_worker/common.rb
module ConcurrentWorker
require 'thread'
class RequestCounter
def initialize
@count = Queue.new
@com = Queue.new
end
def push(args)
@count.push(args)
end
def pop
Thread.handle_interrupt(Object => :never) do
r = @count.pop
@com.push(true)
r
end
end
def wait_until_less_than(n)
return if @count.size < n
while @com.pop
break if @count.size < n
end
end
def empty?
@count.empty?
end
def size
@count.size
end
def close
@count.close
end
def closed?
@count.closed?
end
end
class IPCDuplexChannel
def initialize
@p_pid = Process.pid
@p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
@c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
end
def choose_io
w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
@wio, @rio = w_pipe[1], r_pipe[0]
[w_pipe[0], r_pipe[1]].map(&:close)
end
def send(obj)
begin
Marshal.dump(obj, @wio)
end
end
def recv
begin
Marshal.load(@rio)
rescue IOError
raise StopIteration
end
end
def close
[@wio, @rio].map(&:close)
end
end
end