lib/concurrent_worker/worker.rb
module ConcurrentWorker
class Worker
# Worker : worker class
# +cncr_block : concurrent processing block : thread(as ConcurrentThread)/process(as ConcurrentProcess)
# +base_block : user defined preparation to exec 'work block'
# +loop_block : loop of receiving request and exec 'work block'
# +work_block : user requested work
#
# These blocks are executed with 'instance_exec' method of worker,
# so that they can share instance variables:@xxxx.
#
attr_reader :req_counter, :snd_queue_max, :undone_requests
def queue_closed?
@req_counter.closed?
end
def queue_empty?
!queue_closed? && @req_counter.size == 0
end
def initialize(*args, **options, &work_block)
@args = args
@options = options
set_block(:work_block, &work_block) if work_block
@state = :idle
@result_callbacks = []
@retired_callbacks = []
@snd_queue_max = @options[:snd_queue_max] || 2
@req_mutex = Mutex.new
@req_counter = RequestCounter.new
@options[:result_callback_interrupt] ||= :immediate
@options[:retired_callback_interrupt] ||= :immediate
@undone_requests = []
case @options[:type]
when :process
class << self
include ConcurrentProcess
end
when :thread
class << self
include ConcurrentThread
end
else
class << self
include ConcurrentThread
end
end
end
def add_callback(&callback)
raise "block is nil" unless callback
@result_callbacks.push(callback)
end
def clear_callbacks
@result_callbacks.clear
end
def call_result_callbacks(args)
Thread.handle_interrupt(Object => :never) do
Thread.handle_interrupt(Object => @options[:result_callback_interrupt]) do
@result_callbacks.each do |callback|
callback.call(*args)
end
end
@req_counter.pop
end
end
def add_retired_callback(&callback)
raise "block is nil" unless callback
@retired_callbacks.push(callback)
end
def clear_retired_callbacks
@retired_callbacks.clear
end
def call_retired_callbacks
Thread.handle_interrupt(Object => @options[:retired_callback_interrupt]) do
@retired_callbacks.each do |callback|
callback.call
end
end
end
def req_counter_close
@req_counter.close
until @req_counter.empty?
@undone_requests.push(@req_counter.pop)
end
end
def result_handle_thread(&recv_block)
Thread.new do
Thread.handle_interrupt(Object => :never) do
begin
Thread.handle_interrupt(Object => :immediate) do
recv_block.call
end
ensure
req_counter_close
channel_close
call_retired_callbacks
end
end
end
end
def define_block(symbol,&block)
worker_block = Proc.new do |*args|
self.instance_exec(*args, &block)
end
instance_variable_set("@" + symbol.to_s, worker_block)
end
def define_block_yield(symbol)
define_singleton_method("yield_" + symbol.to_s) do |*args|
blk = instance_variable_get("@" + symbol.to_s)
if blk
blk.call(*args)
else
raise "block " + symbol.to_s + " is not defined"
end
end
end
def set_block(symbol, &block)
raise "block is nil" unless block
unless [:base_block, :loop_block, :work_block].include?(symbol)
raise symbol.to_s + " is not used as worker block"
end
define_block(symbol,&block)
define_block_yield(symbol)
end
def set_default_loop_block
set_block(:loop_block) do
while req = receive_req
(args, work_block) = req
if work_block
set_block(:work_block, &work_block)
end
send_res([yield_work_block(*args)])
end
end
end
def set_default_base_block
set_block(:base_block) do
yield_loop_block
end
end
def run
@state = :run
set_default_loop_block unless defined?(@loop_block) && @loop_block
set_default_base_block unless defined?(@base_block) && @base_block
cncr_block
end
def req(*args, &work_block)
@req_mutex.synchronize do
unless @state == :run
run
end
@req_counter.wait_until_less_than(@snd_queue_max) if @snd_queue_max > 0
begin
@req_counter.push([args, work_block])
send_req([args, work_block])
true
rescue ClosedQueueError, IOError
false
end
end
end
def quit
unless @state == :run
return
end
begin
send_req(nil)
true
rescue ClosedQueueError, IOError
false
end
end
def join
unless @state == :run
return true
end
@req_counter.wait_until_less_than(1)
quit
wait_cncr_proc
true
end
end
module ConcurrentThread
def cncr_block
@thread_channel = Queue.new
@thread = result_handle_thread do
yield_base_block
end
end
def send_req(args)
@thread_channel.push(args)
end
def receive_req
@thread_channel.pop
end
def send_res(args)
call_result_callbacks(args)
end
def channel_close
@thread_channel.close
end
def wait_cncr_proc
@thread && @thread.join
end
end
module ConcurrentProcess
def ipc_recv_loop
while result = @ipc_channel.recv
raise result if result.kind_of?(Exception)
call_result_callbacks(result)
end
end
def cncr_block
@ipc_channel = IPCDuplexChannel.new
@c_pid = fork do
@ipc_channel.choose_io
Thread.handle_interrupt(Object => :never) do
begin
Thread.handle_interrupt(Object => :immediate) do
yield_base_block
end
rescue
@ipc_channel.send($!)
ensure
@ipc_channel.send(nil)
end
end
end
@ipc_channel.choose_io
@recv_thread = result_handle_thread do
ipc_recv_loop
end
end
def send_req(args)
#called from main process only
@ipc_channel.send(args)
end
def receive_req
#called from worker process only
@ipc_channel.recv
end
def send_res(args)
#called from worker process only
@ipc_channel.send(args)
end
def channel_close
@ipc_channel.close
end
def wait_cncr_proc
begin
Process.waitpid(@c_pid)
rescue Errno::ECHILD
end
@recv_thread && @recv_thread.join
end
end
end