lib/rukawa/job.rb
require 'concurrent'
require 'rukawa/abstract_job'
require 'rukawa/dependency'
require 'rukawa/state'
require 'active_support/core_ext/class'
require 'active_support/callbacks'
module Rukawa
class Job < AbstractJob
include ActiveSupport::Callbacks
define_callbacks :run,
terminator: ->(_,result) { result == false },
skip_after_callbacks_if_terminated: true,
scope: [:kind, :name],
only: [:before, :around, :after]
define_callbacks :fail,
terminator: ->(_,result) { result == false },
skip_after_callbacks_if_terminated: true,
scope: [:kind, :name],
only: [:after]
attr_accessor :in_comings, :out_goings
attr_reader :state, :started_at, :finished_at, :variables
class_attribute :retryable, :retry_limit, :retry_exception_type, :retry_wait, instance_writer: false
class_attribute :dependency_type, instance_writer: false
class_attribute :resource_count, instance_reader: false, instance_writer: false
self.dependency_type = Dependency::AllSuccess
self.resource_count = 1
class << self
def set_retryable(limit: 8, type: nil, wait: nil)
self.retryable = true
self.retry_limit = limit
self.retry_exception_type = type
self.retry_wait = wait
end
def set_dependency_type(name)
self.dependency_type = Rukawa::Dependency.get(name)
end
def set_resource_count(count)
self.resource_count = count
end
def before_run(*args, **options, &block)
set_callback :run, :before, *args, **options, &block
end
def after_run(*args, **options, &block)
options[:prepend] = true
conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
v != false
}
options[:if] = Array(options[:if]) << conditional
set_callback :run, :after, *args, **options, &block
end
def after_fail(*args, **options, &block)
options[:prepend] = true
conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
v != false
}
options[:if] = Array(options[:if]) << conditional
set_callback :fail, :after, *args, **options, &block
end
def around_run(*args, **options, &block)
set_callback :run, :around, *args, **options, &block
end
def wrappers
@@wrappers ||= {}
end
def wrapper_for(*classes)
classes.each do |c|
raise "Wrapper for #{c} is already defined" if wrappers[c]
wrappers[c] = self
end
end
end
around_run do |_, blk|
Rukawa.logger.info("Start #{self.class}")
blk.call
Rukawa.logger.info("Finish #{self.class}")
end
around_run do |_, blk|
set_state(:running)
blk.call
set_state(:finished)
end
def initialize(variables: {}, context: Context.new, parent_job_net: nil)
@parent_job_net = parent_job_net
@variables = variables
@context = context
@in_comings = Set.new
@out_goings = Set.new
@retry_count = 0
@retry_wait = 1
set_state(:waiting)
end
def set_state(name)
@state = Rukawa::State.get(name)
end
def root?
in_comings.select { |edge| edge.cluster == @parent_job_net }.empty?
end
def leaf?
out_goings.select { |edge| edge.cluster == @parent_job_net }.empty?
end
def dataflow
return @dataflow if @dataflow
return @dataflow = bypass_dataflow if @state.bypassed?
@dataflow = Concurrent.dataflow_with(@context.executor, *depend_dataflows) do |*results|
acquire_resource do
do_run(*results)
@state
end
end
end
def run
end
private def do_run(*results)
@started_at = Time.now
if skip?
Rukawa.logger.info("Skip #{self.class}")
set_state(:skipped)
else
check_dependencies(results)
run_callbacks :run do
run
end
end
rescue => e
run_callbacks :fail
handle_error(e)
Rukawa.logger.error("Retry #{self.class}")
retry
ensure
@finished_at = Time.now
end
def jobs_as_from
[self]
end
alias :jobs_as_to :jobs_as_from
def to_dot_def
if state == Rukawa::State::Waiting
"\"#{name}\";\n"
else
"\"#{name}\" [style = filled,fillcolor = #{state.color}];\n"
end
end
def resource_count
[self.class.resource_count, Rukawa.config.concurrency].min
end
private
def depend_dataflows
in_comings.map { |edge| edge.from.dataflow }
end
def bypass_dataflow
Concurrent.dataflow_with(@context.executor, *depend_dataflows) do
Rukawa.logger.info("Skip #{self.class}")
@state
end
end
def dependency_type
self.class.dependency_type
end
def check_dependencies(results)
dependency = dependency_type.new(*results)
unless dependency.resolve
set_state(:aborted)
raise DependencyUnsatisfied
end
end
def handle_error(e)
Rukawa.logger.error("Error #{self.class} by #{e}")
if retry?(e)
@retry_count += 1
set_state(:waiting)
sleep @retry_wait
@retry_wait = self.class.retry_wait ? self.class.retry_wait : @retry_wait * 2
else
set_state(:error) unless e.is_a?(DependencyUnsatisfied)
raise e
end
end
def retry?(e)
return false unless self.class.retryable
type_condition = case self.class.retry_exception_type
when Array
self.class.retry_exception_type.include?(e.class)
when Class
e.is_a?(self.class.retry_exception_type)
when nil
!e.is_a?(DependencyUnsatisfied)
end
type_condition && (self.class.retry_limit.nil? || self.class.retry_limit == 0 || @retry_count < self.class.retry_limit)
end
def store(key, value)
@context.store[self.class] ||= Concurrent::Hash.new
@context.store[self.class][key] = value
end
def fetch(job, key)
job = job.is_a?(String) ? Object.const_get(job) : job
raise TypeError, "job must be a Class" unless job.is_a?(Class)
if @context.store[job]
@context.store[job][key]
end
end
def acquire_resource
@context.semaphore.acquire(resource_count) if resource_count > 0
yield
ensure
@context.semaphore.release(resource_count) if resource_count > 0
end
end
end