lib/concurrent-ruby-edge/concurrent/channel/selector.rb
require 'concurrent/maybe'
require 'concurrent/channel/selector/after_clause'
require 'concurrent/channel/selector/default_clause'
require 'concurrent/channel/selector/put_clause'
require 'concurrent/channel/selector/take_clause'
module Concurrent
class Channel
# @!visibility private
class Selector
def initialize
@clauses = []
@error_handler = nil
end
def case(channel, action, message = nil, &block)
if [:take, :poll, :receive, :~].include?(action)
take(channel, &block)
elsif [:put, :offer, :send, :<<].include?(action)
put(channel, message, &block)
else
raise ArgumentError.new('invalid action')
end
end
def take(channel, &block)
raise ArgumentError.new('no block given') unless block_given?
@clauses << TakeClause.new(channel, block)
end
alias_method :receive, :take
def put(channel, message, &block)
@clauses << PutClause.new(channel, message, block)
end
alias_method :send, :put
def after(seconds, &block)
@clauses << AfterClause.new(seconds, block)
end
alias_method :timeout, :after
def default(&block)
raise ArgumentError.new('no block given') unless block_given?
@clauses << DefaultClause.new(block)
end
def error(&block)
raise ArgumentError.new('no block given') unless block_given?
raise ArgumentError.new('only one error handler allowed') if @error_handler
@error_handler = block
end
def execute
raise Channel::Error.new('no clauses given') if @clauses.empty?
loop do
done = @clauses.each do |clause|
result = clause.execute
break result if result.just?
end
break done.value if done.is_a?(Concurrent::Maybe)
Thread.pass
end
rescue => ex
if @error_handler
@error_handler.call(ex)
else
raise ex
end
end
end
end
end