# lib/bluepill/process.rb
# fixes problem with loading on systems with rubyist-aasm installed
gem 'state_machine'
require 'state_machine'
require 'daemons'
require 'bluepill/system'
require 'bluepill/process_journal'
module Bluepill
class Process
# Options that may be supplied in the +options+ hash passed to #initialize.
# Each entry gets a plain attr_accessor (below). After setting its own defaults,
# #initialize copies every key of +options+ that appears in this list onto the
# matching writer, so these values override the defaults.
CONFIGURABLE_ATTRIBUTES = [
:pre_start_command,
:start_command,
:stop_command,
:restart_command,
:stdout,
:stderr,
:stdin,
:daemonize,
:pid_file,
:working_dir,
:environment,
:start_grace_time,
:stop_grace_time,
:restart_grace_time,
:uid,
:gid,
:cache_actual_pid,
:monitor_children,
:child_process_factory,
:pid_command,
:auto_start,
:supplementary_groups,
:stop_signals,
:on_start_timeout,
:group_start_noblock,
:group_restart_noblock,
:group_stop_noblock,
:group_unmonitor_noblock,
].freeze
# Runtime state and collaborators that are not part of CONFIGURABLE_ATTRIBUTES.
# :process_running exposes the value memoized by process_running?. The generated
# logger= writer is replaced by a custom logger= later in this class, which also
# pushes the logger to every watch and trigger.
attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until, :process_running
attr_accessor(*CONFIGURABLE_ATTRIBUTES)
# children: child-process wrappers maintained by refresh_children!.
# statistics: the ProcessStatistics object that dispatch! records events into.
attr_reader :children, :statistics
# Lifecycle state machine (state_machine gem DSL). A new process starts out
# :unmonitored. #tick fires the :tick event to reconcile the recorded state with
# reality. The other events are fired by name, typically through #dispatch!
# (user commands, watches, timeouts).
state_machine initial: :unmonitored do
# These are the idle states, i.e. only an event (either external or internal) will trigger a transition.
# The distinction between down and unmonitored is that down
# means we know it is not running and unmonitored is that we don't care if it's running.
state :unmonitored, :up, :down
# These are transitionary states, we expect the process to change state after a certain period of time.
state :starting, :stopping, :restarting
# Periodic reconciliation. Each source state below has a complementary
# if/unless pair on process_running?, so exactly one of its transitions
# applies. No tick transition is defined from :unmonitored.
event :tick do
transition starting: :up, if: :process_running?
transition starting: :down, unless: :process_running?
transition up: :up, if: :process_running?
transition up: :down, unless: :process_running?
# The process failed to die after entering the stopping state. Change the state to reflect
# reality.
transition stopping: :up, if: :process_running?
transition stopping: :down, unless: :process_running?
# A process known to be down is started again (entering :starting runs start_process).
transition down: :up, if: :process_running?
transition down: :starting, unless: :process_running?
transition restarting: :up, if: :process_running?
transition restarting: :down, unless: :process_running?
end
event :start do
transition [:unmonitored, :down] => :starting
end
# Only a process that is currently up can be stopped through the state machine.
event :stop do
transition up: :stopping
end
event :unmonitor do
transition any => :unmonitored
end
event :restart do
transition [:up, :down] => :restarting
end
# Callbacks:
# - Triggers are notified before every transition (record_transition's loopback
#   check shows that loopbacks reach these any => any hooks too).
# - Leaving :stopping kills any outstanding stop-signal threads.
# - Entering a transitional state runs the matching action.
# - record_transition runs after every transition.
before_transition any => any, do: :notify_triggers
before_transition stopping: any, do: :clean_threads
after_transition any => :starting, do: :start_process
after_transition any => :stopping, do: :stop_process
after_transition any => :restarting, do: :restart_process
after_transition any => any, do: :record_transition
end
# Builds a process definition.
#
# @param process_name [String] the process name (also the ProcessJournal key)
# @param checks [Hash] check name => check options. Names that Trigger[] knows
#   become triggers; all others become condition watches.
# @param options [Hash] any CONFIGURABLE_ATTRIBUTES key, plus :logger and :actual_pid
def initialize(process_name, checks, options = {})
  @name = process_name
  @event_mutex = Monitor.new
  # The collections must exist before the logger is assigned, because logger= walks them.
  @watches = []
  @triggers = []
  @children = []
  @threads = []
  @statistics = ProcessStatistics.new
  @actual_pid = options[:actual_pid]
  self.logger = options[:logger]

  checks.each do |check_name, check_options|
    Trigger[check_name] ? add_trigger(check_name, check_options) : add_watch(check_name, check_options)
  end

  # Defaults. Any matching key in +options+ overrides them just below.
  @monitor_children = false
  @cache_actual_pid = true
  @start_grace_time = 3
  @stop_grace_time = 3
  @restart_grace_time = 3
  @environment = {}
  @on_start_timeout = 'start'
  @group_start_noblock = true
  @group_stop_noblock = true
  @group_restart_noblock = true
  @group_unmonitor_noblock = true

  CONFIGURABLE_ATTRIBUTES.each do |attribute|
    next unless options.key?(attribute)
    send("#{attribute}=", options[attribute])
  end

  # state_machine's own initializer must run last and receives no arguments.
  super()
end
# One monitoring cycle: honour any skip window, then run the :tick transitions.
# If the process ends up :up, evaluate its watches and, when children are
# monitored, refresh and tick every child.
def tick
  return if skipping_ticks?

  self.skip_ticks_until = nil
  # process_running? memoizes per tick, so drop the previous tick's answer.
  @process_running = nil
  # A user 'stop' goes straight to :unmonitored without leaving :stopping, so
  # threads started by stop_process are reaped here instead.
  clean_threads if unmonitored?
  # Fire the state_machine :tick event.
  super

  if up?
    run_watches
    if monitor_children?
      refresh_children!
      children.each { |child| child.tick }
    end
  end
end
# Sets the logger and hands the same logger to every watch and trigger.
def logger=(new_logger)
  @logger = new_logger
  watches.each { |watch| watch.logger = new_logger }
  triggers.each { |trigger| trigger.logger = new_logger }
end
# State machine methods
# Fires state machine +event+ while holding the per-process event lock, after
# recording the event (and optional +reason+) in statistics.
#
# @return whatever the event method returns
def dispatch!(event, reason = nil)
  event_method = event.to_s
  @event_mutex.synchronize do
    @statistics.record_event(event, reason)
    send(event_method)
  end
end
# After-transition hook. On a real (non-loopback) state change it flags the
# change for run_watches, resets watch history, drops the tracked children and
# logs the move.
def record_transition(transition)
  return if transition.loopback?

  @transitioned = true
  # A new state invalidates what the watches have observed so far...
  watches.each { |watch| watch.clear_history! }
  # ...and the child list, which refresh_children! repopulates on a later tick.
  if monitor_children?
    logger.warning 'Clearing child list'
    children.clear
  end
  logger.info "Going from #{transition.from_name} => #{transition.to_name}"
end
# Before-transition hook: passes +transition+ to every trigger in order.
# If a trigger raises, its backtrace is logged and the same exception is
# re-raised, which stops the remaining triggers from being notified.
def notify_triggers(transition)
  triggers.each do |trigger|
    begin
      trigger.notify(transition)
    rescue => error
      logger.err error.backtrace
      raise
    end
  end
end
# Watch related methods
# Registers a condition watch named +name+. The watch gets this process's logger.
def add_watch(name, options = {})
  watch_options = options.merge(logger: logger)
  watches.push(ConditionWatch.new(name, watch_options))
end

# Registers an instance of the trigger class that Trigger[] returns for +name+,
# bound to this process and sharing its logger.
def add_trigger(name, options = {})
  trigger_class = Trigger[name]
  triggers.push(trigger_class.new(self, options.merge(logger: logger)))
end
# Evaluates every condition watch concurrently, one thread per watch, against
# the current pid and a single shared timestamp. It then dispatches the events
# they return, in watch order. Dispatching stops as soon as one event causes a
# real state change (record_transition sets @transitioned for non-loopback
# transitions), so later events do not act on a process whose state has
# already moved on.
def run_watches
now = Time.now.to_i
threads = watches.collect do |watch|
[watch, Thread.new { Thread.current[:events] = watch.run(actual_pid, now) }]
end
# Reset before dispatching; record_transition flips this during dispatch!.
@transitioned = false
# Join each watch thread and flatten its events into [event, reason] pairs.
# The reason is the watch's string form, which dispatch! records in statistics.
threads.each_with_object([]) do |(watch, thread), events|
thread.join
next if thread[:events].size.zero?
logger.info "#{watch.name} dispatched: #{thread[:events].join(',')}"
thread[:events].each do |event|
events << [event, watch.to_s]
end
end.each do |event, reason| # rubocop:disable Style/MultilineBlockChain
break if @transitioned
dispatch!(event, reason)
end
end
# Seeds the state without firing events. A live process is 'up'. Otherwise it
# is 'down', unless auto_start was explicitly set to false, in which case it
# is 'unmonitored'.
#
# @return [nil, String] nil when the process is up, otherwise the state assigned
def determine_initial_state
  if process_running?(true)
    self.state = 'up'
    nil
  else
    # Compare against false on purpose: an unset (nil) auto_start still means "start it".
    self.state = auto_start == false ? 'unmonitored' : 'down'
  end
end
# Applies a command issued by the user: 'start', 'stop', 'restart' or
# 'unmonitor'. Any other value is ignored and returns nil.
def handle_user_command(cmd)
  case cmd
  when 'start'
    # Re-check liveness (forced) so a running process is not started twice.
    return dispatch!(:start, 'user initiated') unless process_running?(true)
    logger.warning('Refusing to re-run start command on an already running process.')
  when 'stop'
    # Stop directly, then leave the process unmonitored so ticks won't restart it.
    stop_process
    dispatch!(:unmonitor, 'user initiated')
  when 'restart'
    restart_process
  when 'unmonitor'
    # When the user issues an unmonitor cmd, reset any triggers so that
    # scheduled events get cleared
    triggers.each { |trigger| trigger.reset! }
    dispatch!(:unmonitor, 'user initiated')
  end
end
# System Process Methods
# Reports whether the managed process is alive by sending it signal 0.
# The answer (true or false) is memoized in @process_running until #tick
# resets it or +force+ is passed. Whenever the answer is false, the remembered
# pid is cleared.
#
# @param force [Boolean] discard the memoized answer and probe again
# @return [Boolean] the (possibly memoized) liveness of actual_pid
def process_running?(force = false)
  @process_running = nil if force # clear existing state if forced
  # Use an explicit nil check instead of ||= so that a negative (false) answer is
  # memoized as well. With ||=, every call for a dead process re-signals it and
  # logs another "Failed to signal process" error, several times per tick.
  @process_running = signal_process(0) if @process_running.nil?
  # the process isn't running, so we should clear the PID
  clear_pid unless @process_running
  @process_running
end
# Starts the process.
# - Pids journaled by earlier runs are killed first, then the optional
#   pre-start command runs.
# - Daemonized processes are spawned via System.daemonize. Their pid (plus the
#   known children, when monitoring children) is journaled.
# - Self-daemonizing processes run start_command synchronously under
#   start_grace_time. If it overruns, with_timeout dispatches on_start_timeout.
# In both cases, ticks are then skipped for start_grace_time.
def start_process
  # be sure nothing else is running from previous runs
  ProcessJournal.kill_all_from_journal(name)
  pre_start_process
  logger.warning "Executing start command: #{start_command}"

  if daemonize?
    daemon_id = System.daemonize(start_command, system_command_options)
    if daemon_id
      ProcessJournal.append_pid_to_journal(name, daemon_id)
      if monitor_children?
        children.each { |child| ProcessJournal.append_pid_to_journal(name, child.actual_pid) }
      end
    end
  else
    # This is a self-daemonizing process
    with_timeout(start_grace_time, on_start_timeout) do
      result = System.execute_blocking(start_command, system_command_options)
      next if result[:exit_code].zero?
      logger.warning 'Start command execution returned non-zero exit code:'
      logger.warning result.inspect
    end
  end

  skip_ticks_for(start_grace_time)
end
# Runs the optional pre_start_command synchronously. A non-zero exit status is
# logged but otherwise ignored. Does nothing when no pre_start_command is set.
def pre_start_process
  command = pre_start_command
  return unless command

  logger.warning "Executing pre start command: #{command}"
  outcome = System.execute_blocking(command, system_command_options)
  unless outcome[:exit_code].zero?
    logger.warning 'Pre start command execution returned non-zero exit code:'
    logger.warning outcome.inspect
  end
end
# Stops the process with the most specific mechanism configured:
#   1. stop_command ({{PID}} expanded), run synchronously under stop_grace_time;
#   2. stop_signals, sent from a background thread as
#      signal[, delay, signal, delay, signal ...];
#   3. otherwise a single TERM signal.
# With monitor_children enabled, the current child pids are journaled first.
# Presumably this lets cleanup_process's ProcessJournal.kill_all_from_journal
# reap them too (TODO confirm in ProcessJournal).
# In every case, ticks are skipped for stop_grace_time afterwards.
def stop_process
if monitor_children
System.get_children(actual_pid).each do |child_pid|
ProcessJournal.append_pid_to_journal(name, child_pid)
end
end
if stop_command
cmd = prepare_command(stop_command)
logger.warning "Executing stop command: #{cmd}"
with_timeout(stop_grace_time, 'stop') do
result = System.execute_blocking(cmd, system_command_options)
unless result[:exit_code].zero?
logger.warning 'Stop command execution returned non-zero exit code:'
logger.warning result.inspect
end
end
cleanup_process
elsif stop_signals
# issue stop signals with configurable delay between each
logger.warning "Sending stop signals to #{actual_pid}"
# The thread works on a clone, so the shifts below leave the configured list
# intact. The thread is kept in @threads so clean_threads can kill it (on
# leaving :stopping, or on the next tick once unmonitored).
@threads << Thread.new(self, stop_signals.clone) do |process, stop_signals|
signal = stop_signals.shift
logger.info "Sending signal #{signal} to #{process.actual_pid}"
process.signal_process(signal) # send first signal
until stop_signals.empty?
# we already checked to make sure stop_signals had an odd number of items
# (NOTE(review): that validation is not in this file — confirm where it happens)
delay = stop_signals.shift
signal = stop_signals.shift
logger.debug "Sleeping for #{delay} seconds"
sleep delay
# break unless signal_process(0) #break unless the process can be reached
unless process.signal_process(0)
logger.debug 'Process has terminated.'
break
end
logger.info "Sending signal #{signal} to #{process.actual_pid}"
process.signal_process(signal)
end
# Runs whether the signal sequence completed or the process exited early.
cleanup_process
end
else
logger.warning "Executing default stop command. Sending TERM signal to #{actual_pid}"
signal_process('TERM')
cleanup_process
end
skip_ticks_for(stop_grace_time)
end
# Restarts the process. When restart_command is set, it runs ({{PID}} expanded)
# under restart_grace_time and ticks are then skipped for restart_grace_time.
# Otherwise the process is stopped and then started again.
def restart_process
  unless restart_command
    logger.warning 'No restart_command specified. Must stop and start to restart'
    stop_process
    return start_process
  end

  cmd = prepare_command(restart_command)
  logger.warning "Executing restart command: #{cmd}"
  with_timeout(restart_grace_time, 'restart') do
    result = System.execute_blocking(cmd, system_command_options)
    next if result[:exit_code].zero?
    logger.warning 'Restart command execution returned non-zero exit code:'
    logger.warning result.inspect
  end
  skip_ticks_for(restart_grace_time)
end
# Final step of stopping: asks ProcessJournal to kill every pid journaled under
# this process's name, then deletes the pid file.
def cleanup_process
  ProcessJournal.kill_all_from_journal(name) # finish cleanup
  # TODO: we only write the pid file if we daemonize, should we only unlink it if we daemonize?
  unlink_pid
end

# Kills and forgets every helper thread (e.g. stop-signal senders) started for
# this process.
def clean_threads
  @threads.each { |thread| thread.kill }
  @threads.clear
end

# @return [Boolean] whether bluepill daemonizes start_command itself (false means
#   the command is expected to daemonize on its own)
def daemonize?
  daemonize ? true : false
end

# @return [Boolean] whether child processes are tracked and ticked
def monitor_children?
  monitor_children ? true : false
end
# Sends +code+ to actual_pid. +code+ may be a signal name (String or Symbol,
# upcased before use) or a number; process_running? uses 0 as a liveness probe.
# Any failure is logged and reported as false.
#
# @return [Boolean] true if ::Process.kill succeeded
def signal_process(code)
  case code
  when String, Symbol
    code = code.to_s.upcase
  end
  ::Process.kill(code, actual_pid)
  true
rescue => error
  logger.err "Failed to signal process #{actual_pid} with code #{code}: #{error}"
  false
end
# @return [Boolean] whether pid_from_file may reuse a previously read pid
def cache_actual_pid?
  @cache_actual_pid ? true : false
end

# The pid of the managed process. It comes from pid_command's output when a
# pid_command is configured; otherwise it is read from pid_file.
def actual_pid
  return pid_from_command if pid_command
  pid_from_file
end
# Reads the managed process's pid from +pid_file+. The result is stored in
# @actual_pid, and that stored value is returned directly on later calls when
# cache_actual_pid? is enabled.
#
# @return [Integer, nil] the pid, or nil when no pid_file is configured, the
#   file is missing or unreadable, or its contents are not a positive integer
def pid_from_file
  return @actual_pid if cache_actual_pid? && @actual_pid
  @actual_pid =
    if pid_file
      begin
        contents = File.read(pid_file).strip
        pid = contents.to_i
        # Accept only a bare positive integer (the same digits-only check that
        # pid_from_command applies). String#to_i turns "", "\n" or garbage into 0
        # and "-5" into -5, and kill(2) with pid 0 or a negative pid signals a
        # whole process group instead of the managed process.
        pid if contents =~ /\A\d+\z/ && pid > 0
      rescue SystemCallError
        # Covers a missing file as well as one that vanished, is a directory or
        # is unreadable (ENOENT, EISDIR, EACCES, ...).
        logger.warning("pid_file #{pid_file} does not exist or cannot be read")
        nil
      end
    end
end
# Runs pid_command through the shell and interprets its trimmed output as a pid.
#
# @return [Integer, nil] nil unless the trimmed output consists only of digits
def pid_from_command
  output = `#{pid_command}`.strip
  return nil unless output =~ /\A\d+\z/
  output.to_i
end
# Records +pid+ in the process journal and then remembers it as the current pid.
def actual_pid=(pid)
  # be sure to always log the pid
  @actual_pid = pid.tap { |journaled| ProcessJournal.append_pid_to_journal(name, journaled) }
end

# Forgets the remembered pid, so pid_from_file reads the pid file again.
def clear_pid
  @actual_pid = nil
end

# Deletes the pid file via System.delete_if_exists.
def unlink_pid
  System.delete_if_exists(pid_file)
end
# Internal State Methods
# Suppresses ticks for +seconds+ more seconds. The time is added to any existing
# deadline (even one that has already passed) rather than counted from now.
# TODO: should this be additive or longest wins?
# i.e. if two calls for skip_ticks_for come in for 5 and 10, should it skip for 10 or 15?
def skip_ticks_for(seconds)
  window_start = skip_ticks_until || Time.now.to_i
  self.skip_ticks_until = window_start + seconds.to_i
end

# @return [Boolean, nil] truthy while the skip deadline is still in the future;
#   nil when no deadline is set
def skipping_ticks?
  deadline = skip_ticks_until
  deadline && Time.now.to_i < deadline
end
# Re-syncs @children with the system. Children that are no longer running are
# dropped, and every child pid that System.get_children reports but is not yet
# tracked gets wrapped in a new child process object.
def refresh_children!
  # Forget children that have exited (each one is probed with a forced check).
  @children.keep_if { |child| child.process_running?(true) }

  # Any child the system reports that we don't track yet is new.
  new_children_pids = System.get_children(actual_pid) - @children.map(&:actual_pid)
  unless new_children_pids.empty?
    logger.info "Existing children: #{@children.map(&:actual_pid).join(',')}. Got new children: #{new_children_pids.inspect} for #{actual_pid}"
  end

  # Wrap each newly found child in a process object with a prefixed logger.
  new_children_pids.each do |child_pid|
    ProcessJournal.append_pid_to_journal(name, child_pid) if daemonize?
    child_name = "<child(pid:#{child_pid})>"
    child_logger = logger.prefix_with(child_name)
    @children << child_process_factory.create_child_process(child_name, child_pid, child_logger)
  end
end
# Replaces every {{PID}} placeholder in +command+ with the current pid. The
# placeholder becomes an empty string when no pid is known.
def prepare_command(command)
  pid_text = actual_pid.to_s
  command.to_s.gsub('{{PID}}', pid_text)
end
# Options hash handed to System.daemonize / System.execute_blocking. Keys appear
# in a fixed order, and each value is read from the same-named accessor.
def system_command_options
  [
    :uid,
    :gid,
    :working_dir,
    :environment,
    :pid_file,
    :logger,
    :stdin,
    :stdout,
    :stderr,
    :supplementary_groups,
  ].each_with_object({}) { |key, options| options[key] = send(key) }
end
# Runs the block with a time limit of +secs+ seconds (coerced with #to_f; per
# Timeout.timeout, 0 disables the limit). If the block overruns, two errors are
# logged and, when +next_state+ is given, that event is dispatched.
#
# @param secs [Numeric, String] time limit in seconds
# @param next_state [String, Symbol, nil] event to dispatch on timeout; nil
#   means log and carry on
# @return the block's value; on timeout, the dispatch result (or nil)
def with_timeout(secs, next_state = nil, &blk)
  # Attempt to execute the passed block. If the block takes
  # too long, transition to the indicated next state.
  Timeout.timeout(secs.to_f, &blk)
rescue Timeout::Error
  logger.err 'Execution is taking longer than expected.'
  logger.err 'Did you forget to tell bluepill to daemonize this process?'
  # The default nil would reach dispatch! as send(""), which raises, so only
  # dispatch when a follow-up state was actually requested.
  dispatch!(next_state) if next_state
end
end
end