# lib/miga/daemon.rb
# @package MiGA
# @license Artistic-2.0
require 'miga/project'
require 'miga/common/with_daemon'
require 'miga/daemon/base'
##
# MiGA Daemons handling job submissions.
class MiGA::Daemon < MiGA::MiGA
include MiGA::Daemon::Base
include MiGA::Common::WithDaemon
extend MiGA::Common::WithDaemonClass
class << self
  ##
  # Resolve the daemon's home folder. +project+ is either a String already
  # holding the full path to the project's 'daemon' folder (returned
  # untouched), or an object responding to +path+ (such as a MiGA::Project),
  # in which case the 'daemon' folder inside that path is returned
  def daemon_home(project)
    case project
    when String then project
    else File.join(project.path, 'daemon')
    end
  end
end
# MiGA::Project in which the daemon is running (set by #initialize)
attr_reader :project
# Options used to setup the daemon
# NOTE(review): @options is not assigned by #initialize, which stores the
# parsed settings through +runopts+ instead; confirm whether this reader is
# populated elsewhere (e.g., by an included module) or is vestigial
attr_reader :options
# Array of job Hashes queued to be executed next (not yet launched)
attr_reader :jobs_to_run
# Array of job Hashes currently running
attr_reader :jobs_running
##
# Initialize an inactive daemon for the MiGA::Project +project+. See #daemon
# to wake the daemon. If passed, +json+ must be the path to a daemon
# definition in json format. Otherwise, the project-stored daemon definition
# (+daemon/daemon.json+ inside the project path) is used. In either case,
# missing variables are taken from the +.miga_daemon.json+ file in the folder
# given by the +MIGA_HOME+ environment variable, whenever that file exists.
# Both job queues start empty.
def initialize(project, json = nil)
  @project = project
  @runopts = {}
  definition = json || File.join(project.path, 'daemon/daemon.json')
  fallback = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  fallback = nil unless File.exist?(fallback)
  settings = MiGA::Json.parse(definition, default: fallback)
  settings.each { |key, value| runopts(key, value) }
  # Translate legacy (format 0) templates once all options are registered
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end
##
# Path to the daemon home, as resolved by the class-level +daemon_home+ for
# this daemon's project
def daemon_home
  resolver = self.class
  resolver.daemon_home(project)
end
##
# Name of the daemon: the project name prefixed by +MiGA:+
def daemon_name
  project_name = project.name
  "MiGA:#{project_name}"
end
##
# Path of the daemon's project (same as +project.path+), provided for
# compatibility with lairs
def path
  current = project
  current.path
end
##
# Run only in the first loop: reports the launch banner and the current
# configuration options, restores the status saved by a previous instance
# (see #load_status), and forces the queueing of maintenance tasks
def daemon_first_loop
  banner = '-----------------------------------'
  say banner
  say format('MiGA:%s launched', project.name)
  say banner
  # When the log is not being shown, point the user to the log file
  miga_say "Saving log to: #{output_file}" unless show_log?
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  queue_maintenance(true)
end
##
# Run one loop step: reloads the project, checks datasets (and the project
# only if the dataset check returns a falsy value), launches queued jobs,
# periodically purges dead jobs and queues maintenance, saves the status,
# and sleeps for +latency+. Returns a Boolean indicating if the loop should
# continue: false once there is nothing left to do and the daemon is set to
# shut down when done, true otherwise
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  check_project unless check_datasets
  if shutdown_when_done? && [jobs_running, jobs_to_run].all?(&:empty?)
    say 'Nothing else to do, shutting down'
    exit_cleanup
    return false
  end

  flush!
  # Dead jobs are purged every 12th loop, and maintenance is queued on a
  # multiple of that period determined by +skip_maintenance+
  purge_period = 12
  if (loop_i % purge_period).zero?
    purge!
    maintenance_period = purge_period * (skip_maintenance + 1)
    queue_maintenance if (loop_i % maintenance_period).zero?
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
##
# Queue maintenance tasks as an analysis job. Nothing is queued if
# maintenance is bypassed, or if the daemon is set to shut down when done
# and +force+ is false
def queue_maintenance(force = false)
  return if bypass_maintenance?
  return unless force || !shutdown_when_done?

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end
##
# Remove temporary files on completion: deletes +status.json+ from the
# daemon home, ignoring the case in which the file does not exist
def exit_cleanup
  status_file = File.join(daemon_home, 'status.json')
  FileUtils.rm_f(status_file)
end
##
# Send +msg+ to +say+ as long as +level+ is at most +verbosity+; otherwise
# do nothing and return nil
def l_say(level, *msg)
  return unless verbosity >= level

  say(*msg)
end
##
# Keep the original (inherited) +say+ available as +miga_say+, allowing
# external reporting since MiGA::Daemon overrides +say+. The alias must be
# declared before +say+ is redefined in this class so that it captures the
# inherited implementation
alias_method :miga_say, :say
##
# Report +msg+ through the inherited +say+, passing +logfh+ as the first
# argument. Nothing is reported when +verbosity+ is below 1, which makes
# this the same as +l_say+ with +level = 1+
def say(*msg)
  return if verbosity < 1

  super(logfh, *msg)
end
##
# Reload the project's metadata by calling +load+ on the project, reporting
# it at verbosity level 2
def reload_project
  notice = 'Reloading project'
  l_say(2, notice)
  project.load
end
##
# Report status in a JSON file: writes the running and queued jobs to
# +status.json+ in the daemon home
def save_status
  l_say(2, 'Saving current status')
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  target = File.join(daemon_home, 'status.json')
  MiGA::Json.generate(snapshot, target)
end
##
# Load the status of a previous instance from +status.json+ in the daemon
# home, if that file exists and is not empty. For every saved job, the
# dataset is retrieved again from the project by name (except for jobs
# without a dataset or named 'miga-project') and the job key is turned back
# into a Symbol. Jobs that are no longer alive are then purged.
def load_status
  status_file = File.join(daemon_home, 'status.json')
  return unless File.size?(status_file)

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(status_file)
  status.each_value do |jobs|
    jobs.each do |job|
      if !job[:ds].nil? && job[:ds_name] != 'miga-project'
        job[:ds] = project.dataset(job[:ds_name])
      end
      job[:job] = job[:job].to_sym unless job[:job].nil?
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
##
# Traverse datasets queueing a dataset job (+:d+) for each incomplete one
# that still has a pending preprocessing step. Returns a Boolean indicating
# if any of the datasets queued is a reference dataset. Unless the log is
# being shown, it also reports the progress as the number of datasets
# without a job in the queues (a maintenance job is not counted).
def check_datasets
  l_say(2, 'Checking datasets')
  pending_ref = false
  project.each_dataset do |ds|
    if ds.status == :incomplete && !ds.next_preprocessing(false).nil?
      pending_ref = true if ds.ref?
      queue_job(:d, ds)
    end
  end
  return pending_ref if show_log?

  total = project.dataset_names.count
  queued = jobs_to_run.size + jobs_running.size
  queued -= 1 unless get_job(:maintenance).nil?
  advance('Datasets:', total - queued, total, false)
  miga_say if queued == 0
  pending_ref
end
##
# Check if all reference datasets are pre-processed. If yes, check the
# project-level tasks, queueing a project job (+:p+) whenever the project
# has a next task. Projects without datasets are ignored.
def check_project
  l_say(2, 'Checking project')
  # Only projects with datasets, all of them done with preprocessing, can
  # proceed to project-level tasks
  ready = !project.dataset_names.empty? && project.done_preprocessing?
  return unless ready

  queue_job(:p) unless project.next_task(nil, false).nil?
end
##
# Add the task to the internal queue with symbol key +job+. If the task is
# dataset-specific, +ds+ specifies the dataset; otherwise the task is
# registered under the name 'miga-project'. Returns nil without queueing if
# the same task is already queued or running. To submit jobs to the
# scheduler (or to bash or ssh) see #flush!
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?

  target = ds.nil? ? 'miga-project' : ds.name
  # Task names use at most the first 10 characters of the project name
  prefix = project.metadata[:name][0..9]
  entry = {
    ds: ds, ds_name: target, job: job, task_name: "#{prefix}:#{job}:#{target}"
  }
  say format('Queueing %s:%s', entry[:ds_name], entry[:job])
  @jobs_to_run << entry
end
##
# Construct the command for the given job definition +to_run+ with current
# daemon settings. The job is treated as dataset-specific when it has a
# dataset (+:ds+), and as project-wide otherwise. The folder holding the
# job's log (+daemon/JOB+ inside the project path) is created if necessary.
# Returns the +cmd+ template with all its variables filled in.
def job_cmd(to_run)
  dataset = to_run[:ds]
  scope = dataset.nil? ? :project : :dataset
  # Environment variables to be passed to the job script
  env = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, scope),
    'CORES' => ppn(scope),
    'MIGA' => MiGA::MiGA.root_path
  }
  env['DATASET'] = dataset.name unless dataset.nil?
  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)
  script = MiGA::MiGA.script_path(
    to_run[:job], miga: env['MIGA'], project: project
  )
  assignments = env.map do |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  end
  template_values = {
    script: script,
    vars: assignments.join(runopts_for(:varsep, scope)),
    cpus: ppn(scope),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: to_run[:task_name],
    # Simplified task name using only letters, digits, underscores, and dashes
    task_name_simple: to_run[:task_name].gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, scope).miga_variables(template_values)
end
##
# Get the task with key symbol +job+ in dataset +ds+, searching first among
# the jobs to run and then among the running jobs. For project-wide tasks
# let +ds+ be nil. Datasets are compared by name. Returns the job Hash, or
# nil if there is no such task.
def get_job(job, ds = nil)
  (jobs_to_run + jobs_running).find do |entry|
    owner = entry[:ds]
    same_owner = ds.nil? ? owner.nil? : (!owner.nil? && owner.name == ds.name)
    same_owner && entry[:job] == job
  end
end
##
# Remove finished jobs from the internal queue and launch as many as
# possible respecting #maxjobs or #nodelist (if set). Returns nil.
#
# A running job is considered finished when: for dataset jobs (+d+), the
# dataset is missing or has no pending preprocessing step; for project jobs
# (+p+), the project has no next task; for any other job, the corresponding
# result can be registered.
#
# Each queued job is attempted at most once per call. Jobs that fail to
# launch are placed back in the queue by #launch_job, and are only retried
# in the next call; otherwise, a persistently failing submission would keep
# this method looping forever.
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    if job[:ds].nil?
      1
    elsif job[:ds_name] =~ /^qG_/
      2
    else
      3
    end
  end
  # Launch as many +jobs_to_run+ as possible, trying each of the jobs
  # currently queued only once
  jobs_to_run.size.times do
    hostk = next_host
    break unless hostk

    launch_job(@jobs_to_run.shift, hostk)
  end
  nil
end
##
# In SSH daemons, retrieve the index of the first node in +nodelist+ not
# used by any running job, nil if none. In any other daemons, returns a
# Boolean indicating if #maxjobs has not been reached
def next_host
  return jobs_running.size < maxjobs unless runopts(:type) == 'ssh'

  busy = jobs_running.map { |job| job[:hostk] }
  available = (0...nodelist.size).to_a - busy
  available.first
end
##
# Remove dead jobs: runs the +alive+ command for the pid of each running
# job, and keeps only those jobs for which the output is +1+
def purge!
  say 'Probing running jobs'
  @jobs_running.select! do |job|
    probe = runopts(:alive).miga_variables(pid: job[:pid])
    output = MiGA::MiGA.run_cmd(probe, return: :output)
    output.chomp.to_i == 1
  end
end
##
# Launch the job described by Hash +job+ to +hostk+-th host. The command is
# built with #job_cmd and stored in the job together with the pid (or job
# identifier) obtained. In +ssh+ daemons the command is completed with the
# host from +nodelist+ and spawned; in +bash+ daemons it is spawned as is;
# in any other case it is run as a scheduler submission, taking its output
# as identifier. Jobs without a valid pid are sent back to the queue,
# all others are registered as running.
def launch_job(job, hostk = nil)
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG("CMD: #{job[:cmd]}")
  # Values of pid indicating that the job was not registered
  unregistered = [nil, '', 0]
  run_type = runopts(:type)
  if %w[ssh bash].include?(run_type)
    if run_type == 'ssh'
      # Remote job: bind it to the host before spawning
      job[:hostk] = hostk
      job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    end
    job[:pid] = spawn(job[:cmd])
    MiGA::MiGA.DEBUG("Detaching PID: #{job[:pid]}")
    Process.detach(job[:pid]) unless unregistered.include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  if unregistered.include?(job[:pid])
    job[:pid] = nil
    @jobs_to_run << job
    return say("Unsuccessful #{job[:task_name]}, rescheduling")
  end

  @jobs_running << job
  host_note = job[:hostk] ? " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" : nil
  say "Spawned pid:#{job[:pid]}#{host_note} for #{job[:task_name]}"
end
##
# Update from daemon JSON format 0 to the latest version. The options
# +cmd+, +var+, +alive+, and +kill+ that still contain printf-like
# placeholders (such as +%s+, +%d+, or +%1$s+) are rewritten replacing them
# by the corresponding named variables (such as <tt>{{script}}</tt>).
# Finally, the format version is set to 1.
def update_format_0
  legacy_fields = {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }
  legacy_fields.each do |option, fields|
    template = runopts(option)
    next if template.nil?
    next unless template =~ /%(\d+\$)?[ds]/

    placeholders = fields.map { |field| "{{#{field}}}" }
    # Integer placeholders are turned into strings so they accept the names
    runopts(option, template.gsub(/%(\d+\$)?d/, '%\\1s') % placeholders)
  end
  runopts(:format_version, 1)
end
end