lib/bluepill/process_journal.rb
require 'bluepill/system'
module Bluepill
module ProcessJournal
module_function
class << self
attr_reader :logger
attr_reader :journal_base_dir
def logger=(new_logger)
@logger ||= new_logger
end
def base_dir=(base_dir)
@journal_base_dir ||= File.join(base_dir, 'journals')
FileUtils.mkdir_p(@journal_base_dir) unless File.exist?(@journal_base_dir)
FileUtils.chmod(0777, @journal_base_dir)
end
end
def skip_pid?(pid)
!pid.is_a?(Integer) || pid <= 1
end
def skip_pgid?(pgid)
!pgid.is_a?(Integer) || pgid <= 1
end
# atomic operation on POSIX filesystems, since
# f.flock(File::LOCK_SH) is not available on all platforms
def acquire_atomic_fs_lock(name)
times = 0
name += '.lock'
Dir.mkdir name, 0700
logger.debug("Acquired lock #{name}")
yield
rescue Errno::EEXIST
times += 1
logger.debug("Waiting for lock #{name}")
sleep 1
if times < 10
retry
else
logger.info("Timeout waiting for lock #{name}")
raise "Timeout waiting for lock #{name}"
end
ensure
clear_atomic_fs_lock(name)
end
def clear_all_atomic_fs_locks
Dir['.*.lock'].each do |f|
System.delete_if_exists(f) if File.directory?(f)
end
end
def pid_journal_filename(journal_name)
File.join(@journal_base_dir, ".bluepill_pids_journal.#{journal_name}")
end
def pgid_journal_filename(journal_name)
File.join(@journal_base_dir, ".bluepill_pgids_journal.#{journal_name}")
end
def pid_journal(filename)
logger.debug("pid journal file: #{filename}")
result = File.open(filename, 'r').readlines.collect(&:to_i).reject { |pid| skip_pid?(pid) }
logger.debug("pid journal = #{result.join(' ')}")
result
rescue Errno::ENOENT
[]
end
def pgid_journal(filename)
logger.debug("pgid journal file: #{filename}")
result = File.open(filename, 'r').readlines.collect(&:to_i).reject { |pgid| skip_pgid?(pgid) }
logger.debug("pgid journal = #{result.join(' ')}")
result
rescue Errno::ENOENT
[]
end
def clear_atomic_fs_lock(name)
return unless File.directory?(name)
Dir.rmdir(name)
logger.debug("Cleared lock #{name}")
end
def kill_all_from_all_journals
pids = Dir['.bluepill_pids_journal.*'].collect { |p| p.sub(/^\.bluepill_pids_journal\./, '') }
pids.reject! { |p| p =~ /\.lock$/ }
pids.each do |journal_name|
kill_all_from_journal(journal_name)
end
end
def kill_all_from_journal(journal_name)
kill_all_pids_from_journal(journal_name)
kill_all_pgids_from_journal(journal_name)
end
def kill_all_pgids_from_journal(journal_name)
filename = pgid_journal_filename(journal_name)
j = pgid_journal(filename)
if !j.empty?
acquire_atomic_fs_lock(filename) do
j.each do |pgid|
begin
::Process.kill('TERM', -pgid)
logger.info("Termed old process group #{pgid}")
rescue Errno::ESRCH
logger.debug("Unable to term missing process group #{pgid}")
end
end
if j.count { |pgid| System.pid_alive?(pgid) } > 1
sleep(1)
j.each do |pgid|
begin
::Process.kill('KILL', -pgid)
logger.info("Killed old process group #{pgid}")
rescue Errno::ESRCH
logger.debug("Unable to kill missing process group #{pgid}")
end
end
end
System.delete_if_exists(filename) # reset journal
logger.debug('Journal cleanup completed')
end
else
logger.debug('No previous process journal - Skipping cleanup')
end
end
def kill_all_pids_from_journal(journal_name)
filename = pid_journal_filename(journal_name)
j = pid_journal(filename)
if !j.empty?
acquire_atomic_fs_lock(filename) do
j.each do |pid|
begin
::Process.kill('TERM', pid)
logger.info("Termed old process #{pid}")
rescue Errno::ESRCH
logger.debug("Unable to term missing process #{pid}")
end
end
if j.count { |pid| System.pid_alive?(pid) } > 1
sleep(1)
j.each do |pid|
begin
::Process.kill('KILL', pid)
logger.info("Killed old process #{pid}")
rescue Errno::ESRCH
logger.debug("Unable to kill missing process #{pid}")
end
end
end
System.delete_if_exists(filename) # reset journal
logger.debug('Journal cleanup completed')
end
else
logger.debug('No previous process journal - Skipping cleanup')
end
end
def append_pgid_to_journal(journal_name, pgid)
if skip_pgid?(pgid)
logger.debug("Skipping invalid pgid #{pgid} (class #{pgid.class})")
return
end
filename = pgid_journal_filename(journal_name)
acquire_atomic_fs_lock(filename) do
if pgid_journal(filename).include?(pgid)
logger.debug("Skipping duplicate pgid #{pgid} already in journal #{journal_name}")
else
logger.debug("Saving pgid #{pgid} to process journal #{journal_name}")
File.open(filename, 'a+', 0600) { |f| f.puts(pgid) }
logger.info("Saved pgid #{pgid} to journal #{journal_name}")
logger.debug("Journal now = #{File.open(filename, 'r').read}")
end
end
end
def append_pid_to_journal(journal_name, pid)
begin
append_pgid_to_journal(journal_name, ::Process.getpgid(pid))
rescue Errno::ESRCH
end
if skip_pid?(pid)
logger.debug("Skipping invalid pid #{pid} (class #{pid.class})")
return
end
filename = pid_journal_filename(journal_name)
acquire_atomic_fs_lock(filename) do
if pid_journal(filename).include?(pid)
logger.debug("Skipping duplicate pid #{pid} already in journal #{journal_name}")
else
logger.debug("Saving pid #{pid} to process journal #{journal_name}")
File.open(filename, 'a+', 0600) { |f| f.puts(pid) }
logger.info("Saved pid #{pid} to journal #{journal_name}")
logger.debug("Journal now = #{File.open(filename, 'r').read}")
end
end
end
end
end