lib/tumugi/executor/local_executor.rb
require 'much-timeout'
require 'concurrent'
require 'tumugi'
require 'tumugi/error'
module Tumugi
module Executor
class LocalExecutor
def initialize(dag, worker_num: 1, run_all: false)
@dag = dag
@main_task = dag.tsort.last
@worker_num = worker_num
@run_all = run_all
@mutex = Mutex.new
end
def execute
pool = Concurrent::ThreadPoolExecutor.new(
min_threads: @worker_num,
max_threads: @worker_num
)
setup_task_queue(@dag)
loop do
task = dequeue_task
break if task.nil?
Concurrent::Future.execute(executor: pool) do
if !task.runnable?(Time.now)
logger.trace { "task_not_runnable: #{task.id}" }
enqueue_task(task)
else
begin
logger.info { "task_start: #{task.id}" }
task.trigger!(:start)
MuchTimeout.optional_timeout(task_timeout(task), Tumugi::TimeoutError) do
task.run
end
task.trigger!(:complete)
logger.info { "task_#{task.state}: #{task.id}, elapsed_time: #{task.elapsed_time}" }
task.on_success
rescue => e
handle_error(task, e)
end
end
end
end
pool.shutdown
pool.wait_for_termination
@dag.tsort.all? { |t| t.success? }
end
private
def task_timeout(task)
timeout = task.timeout || Tumugi.config.timeout
timeout = nil if !timeout.nil? && timeout == 0 # for backward compatibility
timeout
end
def setup_task_queue(dag)
@queue = []
dag.tsort.each { |t| enqueue_task(t) }
@queue
end
def dequeue_task
loop do
task = @mutex.synchronize {
logger.trace { "task_queue_dump: #{@queue.map(&:id)}" } unless @queue.empty?
@queue.shift
}
if task.nil?
if @main_task.finished?
break nil
else
sleep(0.1)
end
else
logger.trace { "task_queue_dequeue: #{task.id}" }
if task.requires_failed?
task.trigger!(:requires_fail)
logger.info { "task_#{task.state}: #{task.id} has failed requires task, elapsed_time: #{task.elapsed_time}" }
elsif task.completed? && !@run_all
task.trigger!(:skip)
logger.info { "task_#{task.state}: #{task.id} is already completed, elapsed_time: #{task.elapsed_time}" }
else
break task
end
end
end
end
def enqueue_task(task)
logger.trace { "task_queue_enqueue: #{task.id}" }
@mutex.synchronize { @queue.push(task) }
end
def handle_error(task, err)
if task.retry
task.trigger!(:pend)
logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" }
logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and wait #{task.retry_interval} seconds until the next try." }
enqueue_task(task)
task.on_retry
else
task.trigger!(:fail)
logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" }
logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and reached max retry count, so task #{task.id} failed." }
task.on_failure
end
logger.error { err.backtrace.join("\n") }
end
def logger
@logger ||= Tumugi::ScopedLogger.new("tumugi-executor")
end
end
end
end