lib/rodbot/dispatcher.rb
# frozen-string-literal: true
module Rodbot
# Dispatcher infrastructure to run and supervise tasks
class Dispatcher
# Which signals detached processes trap in order to exit
TRAPS = %w(INT TERM).freeze
# @return [String] name of the group of tasks
attr_reader :group
# @return [String] registered tasks
attr_reader :tasks
# @param group [String] name of the group of tasks
# @param refork_delay [Integer] seconds to wait before re-forking dead tasks
def initialize(group, refork_delay: 5)
@group, @refork_delay = group, refork_delay
@tasks = {}
end
# Register a task
#
# @param task [String] task name
# @yield block for the task to run
# @return self
def register(task)
tasks[task] = Proc.new do
detach task
unless Rodbot::Log.std?
logger = Rodbot::Log.logger("dispatcher #{group}.#{task}]")
$stdout = Rodbot::Log::LoggerIO.new(logger, Logger::INFO)
$stderr = Rodbot::Log::LoggerIO.new(logger, Logger::WARN)
$stdin.reopen(File::NULL)
end
yield
end
self
end
# Run the registered tasks
#
# @param daemonize [Boolean] whether to run and supervise the tasks in
# the background
def run(daemonize: false)
if daemonize
Process.daemon(false, true)
detach 'supervisor'
dispatch
supervise
else
Process.setproctitle("#{group}.supervisor")
dispatch
sleep
end
ensure
cleanup
end
# Interrupt the registered tasks
def interrupt
Process.kill('INT', pid_file('supervisor').read.to_i)
rescue Errno::ESRCH
end
private
# Dispatch all registered tasks
def dispatch
tasks.each_value { fork(&_1) }
end
# Supervise all dispatched tasks
def supervise
loop do
pid = Process.wait
sleep @refork_delay
fork(&tasks[task(pid)])
end
end
# Remove all artefacts
def cleanup
Rodbot.env.tmp.glob("#{group}.*.pid").each do |pid_file|
pid = pid_file.read.to_i
Process.kill('INT', pid) unless pid == Process.pid
rescue Errno::ESRCH
ensure
pid_file.delete
end
end
# Perform operations to properly detach the task
#
# @param task [String] task name
def detach(task)
pid_file(task).write Process.pid
Process.setproctitle("#{group}.#{task}")
TRAPS.each { trap(_1) { exit } }
end
# PID file of the given task should be
#
# @param task [String] task name
# @return [Pathname] PID file
def pid_file(task)
Rodbot.env.tmp.join("#{group}.#{task}.pid")
end
# Fetch a task name for a process ID from the PID files
#
# @param pid [Integer] process ID
# @return [String] task name
def task(pid)
Rodbot.env.tmp.glob("#{group}.*.pid").find do |pid_file|
pid_file.read.to_i == pid
end.then do |pid_file|
pid_file.basename.to_s.split('.')[1] if pid_file
end
end
end
end