lib/simpler_workflow/workflow.rb
module SimplerWorkflow
class Workflow
include OptionsAsMethods
attr_reader :task_list, :domain, :name, :version, :options, :initial_activity_type
def initialize(domain, name, version, options = {})
Workflow.workflows[[name, version]] ||= begin
default_options = {
:default_task_list => name,
:default_task_start_to_close_timeout => 2 * 60,
:default_execution_start_to_close_timeout => 2 * 60,
:default_child_policy => :terminate
}
@options = default_options.merge(options)
@domain = domain
@name = name
@version = version
self
end
end
def initial_activity(name, version = nil)
activity = Activity[domain, name.to_sym, version]
@initial_activity_type = activity.to_activity_type
end
def decision_loop
SimplerWorkflow.child_processes << fork do
$0 = "Workflow: #{name} #{version}"
Signal.trap('QUIT') do
# Don't log in trap, ruby 2 complains
# since we need to exit quickly, only delay quit
# if we are in the middle of a task
if @in_task
@time_to_exit = true
else
Process.exit 0
end
end
Signal.trap('INT') do
# Don't log in trap, ruby 2 complains
Process.exit!(0)
end
if SimplerWorkflow.after_fork
SimplerWorkflow.after_fork.call
end
loop do
begin
logger.info("Waiting for a decision task for #{name.to_s}, #{version} listening to #{task_list}")
domain.decision_tasks.poll_for_single_task(task_list) do |decision_task|
@in_task = true # lock for TERM signal handling
handle_decision_task(decision_task)
end
Process.exit 0 if @time_to_exit
rescue Timeout::Error => e
if @time_to_exit
Process.exit 0
else
retry
end
rescue => e
context = {
:workflow => to_workflow_type
}
SimplerWorkflow.exception_reporter.report(e, context)
raise e
ensure
@in_task = false
end
end
end
end
def task_list
options[:default_task_list][:name].to_s
end
def to_workflow_type
{ :name => name, :version => version }
end
def start_workflow(input, options = {})
options[:input] = input
domain.workflow_types[name.to_s, version].start_execution(options)
end
def on_start_execution(&block)
event_handlers['WorkflowExecutionStarted'] = block
end
def on_activity_completed(&block)
event_handlers['ActivityTaskCompleted'] = block
end
def on_activity_failed(&block)
event_handlers['ActivityTaskFailed'] = block
end
def on_activity_timed_out(&block)
event_handlers['ActivityTaskTimedOut'] = block
end
def self.[](name, version)
workflows[[name, version]]
end
def self.register(name, version, workflow)
workflows[[name, version]] = workflow
end
def scheduled_event(decision_task, event)
decision_task.scheduled_event(event)
end
def last_activity(decision_task, event)
scheduled_event(decision_task, event).attributes.activity_type
end
def last_input(decision_task, event)
scheduled_event(decision_task, event).attributes.input
end
protected
def self.workflows
@workflows ||= {}
end
def self.swf
SimplerWorkflow.swf
end
def logger
SimplerWorkflow.logger
end
def handle_decision_task(decision_task)
decision_task.extend AWS::SimpleWorkflow::DecisionTaskAdditions
logger.info("Received decision task")
decision_task.new_events.each do |event|
logger.info("Processing #{event.event_type}")
event_handlers.fetch(event.event_type, DefaultEventHandler.new(self)).call(decision_task, event)
end
end
def event_handlers
@event_handlers ||= Map[
:WorkflowExecutionStarted , WorkflowExecutionStartedHandler.new(self) ,
:ActivityTaskCompleted , ActivityTaskCompletedHandler.new(self) ,
:ActivityTaskFailed , ActivityTaskFailedHandler.new(self) ,
:ActivityTaskTimedOut , ActivityTaskTimedOutHandler.new(self)
]
end
class DefaultEventHandler
attr_accessor :workflow
def initialize(workflow)
@workflow = workflow
end
def scheduled_event(*args)
workflow.scheduled_event(*args)
end
def domain
workflow.domain
end
def last_activity(*args)
workflow.last_activity(*args)
end
def last_input(*args)
workflow.last_input(*args)
end
def initial_activity_type
workflow.initial_activity_type
end
def call(*args); end
end
class ActivityTaskTimedOutHandler < DefaultEventHandler
def call(decision_task, event)
case event.attributes.timeoutType
when 'START_TO_CLOSE', 'SCHEDULE_TO_START', 'SCHEDULE_TO_CLOSE'
last_activity_type = last_activity(decision_task, event)
SimplerWorkflow.logger.info("Retrying activity #{last_activity_type.name} #{last_activity_type.version} due to timeout.")
decision_task.schedule_activity_task last_activity_type, :input => last_input(decision_task, event)
when 'HEARTBEAT'
decision_task.fail_workflow_execution
end
end
end
class ActivityTaskFailedHandler < DefaultEventHandler
def call(decision_task, event)
last_activity_type = last_activity(decision_task, event)
failed_activity = domain.activities[last_activity_type]
case failed_activity.failure_policy
when :abort, :cancel
SimplerWorkflow.logger.info("Cancelling workflow execution.")
decision_task.cancel_workflow_execution
when :retry
SimplerWorkflow.logger.info("Retrying activity #{last_activity_type.name} #{last_activity_type.version}")
decision_task.schedule_activity_task last_activity_type, :input => last_input(decision_task, event)
else
SimplerWorkflow.logger.info("Failing the workflow execution.")
decision_task.fail_workflow_execution
end
end
end
class ActivityTaskCompletedHandler < DefaultEventHandler
def call(decision_task, event)
last_activity_type = last_activity(decision_task, event)
completed_activity = domain.activities[last_activity_type]
if next_activity = completed_activity.next_activity
activity_type = domain.activity_types[next_activity.name, next_activity.version]
decision_task.schedule_activity_task activity_type, input: scheduled_event(decision_task, event).attributes.input
else
decision_task.complete_workflow_execution(result: 'success')
end
end
end
class WorkflowExecutionStartedHandler < DefaultEventHandler
def call(decision_task, event)
decision_task.schedule_activity_task initial_activity_type, input: event.attributes.input
end
end
end
end