lib/stepladder/worker.rb
require 'ostruct'
module Stepladder
class Worker
attr_reader :supply
def initialize(p={}, &block)
@supply = p[:supply]
@task = block || p[:task]
@context = p[:context] || OpenStruct.new
# don't define default task here
# because we want to allow for
# an initialized worker to have
# a task injected, including
# method-based tasks.
end
def shift
ensure_ready_to_work!
workflow.resume
end
def ready_to_work?
@task && (supply || !task_accepts_a_value?)
end
def supplies(subscribing_party)
subscribing_party.supply = self
subscribing_party
end
alias_method :"|", :supplies
def supply=(supplier)
raise WorkerError.new("Worker is a source, and cannot accept a supply") unless suppliable?
@supply = supplier
end
def suppliable?
@task && @task.arity > 0
end
private
def ensure_ready_to_work!
@task ||= default_task
unless ready_to_work?
raise "This worker's task expects to receive a value from a supplier, but has no supply."
end
end
def workflow
@my_little_machine ||= Fiber.new do
loop do
value = supply && supply.shift
Fiber.yield @task.call(value, supply, @context)
end
end
end
def default_task
Proc.new { |value| value }
end
def task_accepts_a_value?
@task.arity > 0
end
def task_method_exists?
self.methods.include? :task
end
def task_method_accepts_a_value?
self.method(:task).arity > 0
end
end
class WorkerError < StandardError; end
end