bluepill-rb/bluepill

View on GitHub
lib/bluepill/process.rb

Summary

Maintainability
D
2 days
Test Coverage
# Activate the state_machine gem explicitly; this fixes a loading problem on systems that also have rubyist-aasm installed.
gem 'state_machine'

require 'state_machine'
require 'daemons'
require 'bluepill/system'
require 'bluepill/process_journal'

module Bluepill
  # One supervised OS process: its configuration, its lifecycle state machine,
  # the condition watches and triggers attached to it, and (when
  # monitor_children is enabled) wrappers for the child processes it spawns.
  class Process
    # Options accepted by #initialize. Each one gets a plain attr_accessor and
    # is copied from the options hash only when the key is present.
    CONFIGURABLE_ATTRIBUTES = [
      :pre_start_command,
      :start_command,
      :stop_command,
      :restart_command,

      :stdout,
      :stderr,
      :stdin,

      :daemonize,
      :pid_file,
      :working_dir,
      :environment,

      :start_grace_time,
      :stop_grace_time,
      :restart_grace_time,

      :uid,
      :gid,

      :cache_actual_pid,

      :monitor_children,
      :child_process_factory,

      :pid_command,
      :auto_start,

      :supplementary_groups,

      :stop_signals,

      :on_start_timeout,

      :group_start_noblock,
      :group_restart_noblock,
      :group_stop_noblock,
      :group_unmonitor_noblock,

    ].freeze

    attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until, :process_running
    attr_accessor(*CONFIGURABLE_ATTRIBUTES)
    attr_reader :children, :statistics

    state_machine initial: :unmonitored do
      # These are the idle states, i.e. only an event (either external or internal) will trigger a transition.
      # The distinction between down and unmonitored is that down
      # means we know it is not running and unmonitored is that we don't care if it's running.
      state :unmonitored, :up, :down

      # These are transitionary states, we expect the process to change state after a certain period of time.
      state :starting, :stopping, :restarting

      event :tick do
        transition starting: :up, if: :process_running?
        transition starting: :down, unless: :process_running?

        transition up: :up, if: :process_running?
        transition up: :down, unless: :process_running?

        # The process failed to die after entering the stopping state. Change the state to reflect
        # reality.
        transition stopping: :up, if: :process_running?
        transition stopping: :down, unless: :process_running?

        transition down: :up, if: :process_running?
        transition down: :starting, unless: :process_running?

        transition restarting: :up, if: :process_running?
        transition restarting: :down, unless: :process_running?
      end

      event :start do
        transition [:unmonitored, :down] => :starting
      end

      event :stop do
        transition up: :stopping
      end

      event :unmonitor do
        transition any => :unmonitored
      end

      event :restart do
        transition [:up, :down] => :restarting
      end

      before_transition any => any, do: :notify_triggers
      before_transition stopping: any, do: :clean_threads

      after_transition any => :starting, do: :start_process
      after_transition any => :stopping, do: :stop_process
      after_transition any => :restarting, do: :restart_process

      after_transition any => any, do: :record_transition
    end

    # @param process_name [String] process name; also the key used for
    #   ProcessJournal entries
    # @param checks [Hash] check name => options. Names known to Trigger
    #   become triggers; every other name becomes a ConditionWatch.
    # @param options [Hash] any of CONFIGURABLE_ATTRIBUTES, plus :logger and
    #   :actual_pid (an initial pid to cache)
    def initialize(process_name, checks, options = {})
      @name = process_name
      # Monitor (not Mutex) because it is reentrant: transition callbacks run
      # inside dispatch! and may themselves call dispatch! (see with_timeout).
      @event_mutex = Monitor.new
      @watches = []
      @triggers = []
      @children = []
      @threads = []
      @statistics = ProcessStatistics.new
      @actual_pid = options[:actual_pid]
      self.logger = options[:logger]

      checks.each do |name, opts|
        if Trigger[name]
          add_trigger(name, opts)
        else
          add_watch(name, opts)
        end
      end

      # These defaults are overriden below if it's configured to be something else.
      @monitor_children = false
      @cache_actual_pid = true
      @start_grace_time = @stop_grace_time = @restart_grace_time = 3
      @environment = {}
      @on_start_timeout = 'start'
      @group_start_noblock = @group_stop_noblock = @group_restart_noblock = @group_unmonitor_noblock = true

      CONFIGURABLE_ATTRIBUTES.each do |attribute_name|
        send("#{attribute_name}=", options[attribute_name]) if options.key?(attribute_name)
      end

      # Let state_machine do its initialization stuff
      super() # no arguments intentional
    end

    # Advances this process by one step. Unless inside a skip window, this:
    # - resets the per-tick process_running? cache;
    # - fires the state machine's tick event;
    # - runs the watches when up;
    # - refreshes and ticks the children when monitor_children is set.
    def tick
      return if skipping_ticks?
      self.skip_ticks_until = nil

      # clear the memoization per tick
      @process_running = nil

      # Deal with thread cleanup here since the stopping state isn't used
      clean_threads if unmonitored?

      # run state machine transitions
      super

      return unless up?
      run_watches

      return unless monitor_children?
      refresh_children!
      children.each(&:tick)
    end

    # Sets the logger and propagates it to every existing watch and trigger.
    def logger=(logger)
      @logger = logger
      watches.each { |w| w.logger = logger }
      triggers.each { |t| t.logger = logger }
    end

    # State machine methods

    # Records +event+ (and an optional +reason+) in the statistics, then fires
    # the state machine event of that name. Dispatches are serialized by
    # @event_mutex.
    def dispatch!(event, reason = nil)
      @event_mutex.synchronize do
        @statistics.record_event(event, reason)
        send(event.to_s)
      end
    end

    # after_transition callback. It ignores loopbacks. On a real transition it
    # flags the change for run_watches, clears watch history, drops the child
    # list (when monitoring children) and logs the move.
    def record_transition(transition)
      return if transition.loopback?
      @transitioned = true

      # When a process changes state, we should clear the memory of all the watches
      watches.each(&:clear_history!)

      # Also, when a process changes state, we should re-populate its child list
      if monitor_children?
        logger.warning 'Clearing child list'
        children.clear
      end
      logger.info "Going from #{transition.from_name} => #{transition.to_name}"
    end

    # before_transition callback: passes the transition to every trigger.
    # A failing trigger has its backtrace logged, and the error is re-raised.
    def notify_triggers(transition)
      triggers.each do |trigger|
        begin
          trigger.notify(transition)
        rescue => e
          logger.err e.backtrace
          raise
        end
      end
    end

    # Watch related methods

    # Adds a ConditionWatch that shares this process's logger.
    def add_watch(name, options = {})
      watches << ConditionWatch.new(name, options.merge(logger: logger))
    end

    # Adds a trigger of the class registered under +name+ in Trigger.
    def add_trigger(name, options = {})
      triggers << Trigger[name].new(self, options.merge(logger: logger))
    end

    # Runs every watch concurrently (one thread per watch) and then dispatches
    # the events they produced, in watch order. Dispatching stops as soon as
    # one event causes a real (non-loopback) state transition.
    def run_watches
      now = Time.now.to_i
      # Resolve the pid once. Every watch then samples the same process, and
      # a configured pid_command runs once instead of once per watch thread.
      pid = actual_pid

      threads = watches.map do |watch|
        [watch, Thread.new { watch.run(pid, now) }]
      end

      # record_transition sets this on any non-loopback transition.
      @transitioned = false

      pending = []
      threads.each do |watch, thread|
        # Thread#value joins the thread and re-raises any exception the watch raised.
        watch_events = thread.value
        next if watch_events.empty?
        logger.info "#{watch.name} dispatched: #{watch_events.join(',')}"
        watch_events.each { |event| pending << [event, watch.to_s] }
      end

      pending.each do |event, reason|
        break if @transitioned
        dispatch!(event, reason)
      end
    end

    # Sets the initial state:
    # - 'up' if the process responds to signal 0;
    # - otherwise 'unmonitored' when auto_start is explicitly false;
    # - otherwise 'down'.
    def determine_initial_state
      if process_running?(true)
        self.state = 'up'
        return
      end
      self.state = (auto_start == false) ? 'unmonitored' : 'down' # we need to check for false value
    end

    # Handles a user command ('start', 'stop', 'restart', 'unmonitor'). Any
    # other value is ignored.
    def handle_user_command(cmd)
      case cmd
      when 'start'
        if process_running?(true)
          logger.warning('Refusing to re-run start command on an already running process.')
        else
          dispatch!(:start, 'user initiated')
        end
      when 'stop'
        stop_process
        dispatch!(:unmonitor, 'user initiated')
      when 'restart'
        restart_process
      when 'unmonitor'
        # When the user issues an unmonitor cmd, reset any triggers so that
        # scheduled events gets cleared
        triggers.each(&:reset!)
        dispatch!(:unmonitor, 'user initiated')
      end
    end

    # System Process Methods

    # True when the process accepts signal 0.
    # - The result is memoized until #tick resets it or +force+ is true.
    # - When the process is not running, the cached pid is cleared so that the
    #   next lookup re-reads it.
    def process_running?(force = false)
      @process_running = nil if force # clear existing state if forced

      # Memoize false as well as true. With `||=` a down process would be
      # re-signalled, and its failure re-logged, on every guard check.
      @process_running = signal_process(0) if @process_running.nil?
      # the process isn't running, so we should clear the PID
      clear_pid unless @process_running
      @process_running
    end

    # Kills leftovers recorded in the journal, runs pre_start_command, then
    # starts the process. With daemonize, System.daemonize launches it and the
    # returned id is journaled. Otherwise start_command runs blocking, bounded
    # by start_grace_time. Ticks are then skipped for start_grace_time.
    def start_process
      ProcessJournal.kill_all_from_journal(name) # be sure nothing else is running from previous runs
      pre_start_process
      logger.warning "Executing start command: #{start_command}"
      if daemonize?
        daemon_id = System.daemonize(start_command, system_command_options)
        if daemon_id
          ProcessJournal.append_pid_to_journal(name, daemon_id)
          if monitor_children?
            children.each { |child| ProcessJournal.append_pid_to_journal(name, child.actual_pid) }
          end
        end
      else
        # This is a self-daemonizing process
        with_timeout(start_grace_time, on_start_timeout) do
          result = System.execute_blocking(start_command, system_command_options)
          warn_unless_success('Start', result)
        end
      end

      skip_ticks_for(start_grace_time)
    end

    # Runs pre_start_command (if configured) and warns on a non-zero exit.
    def pre_start_process
      return unless pre_start_command
      logger.warning "Executing pre start command: #{pre_start_command}"
      result = System.execute_blocking(pre_start_command, system_command_options)
      warn_unless_success('Pre start', result)
    end

    # Stops the process, in order of preference:
    # 1. stop_command ({{PID}} substituted, bounded by stop_grace_time);
    # 2. stop_signals, sent from a background thread;
    # 3. a plain TERM.
    # Journaled pids are then killed and the pid file removed. Ticks are
    # skipped for stop_grace_time.
    def stop_process
      if monitor_children?
        System.get_children(actual_pid).each do |child_pid|
          ProcessJournal.append_pid_to_journal(name, child_pid)
        end
      end

      if stop_command
        cmd = prepare_command(stop_command)
        logger.warning "Executing stop command: #{cmd}"

        with_timeout(stop_grace_time, 'stop') do
          result = System.execute_blocking(cmd, system_command_options)
          warn_unless_success('Stop', result)
        end
        cleanup_process
      elsif stop_signals
        # issue stop signals with configurable delay between each
        logger.warning "Sending stop signals to #{actual_pid}"
        @threads << Thread.new(self, stop_signals.clone) do |process, signals|
          signal = signals.shift
          logger.info "Sending signal #{signal} to #{process.actual_pid}"
          process.signal_process(signal) # send first signal

          until signals.empty?
            # Remaining items alternate delay, signal, ... (an odd total length
            # is assumed to have been validated elsewhere)
            delay = signals.shift
            signal = signals.shift

            logger.debug "Sleeping for #{delay} seconds"
            sleep delay
            # Stop early once the process can no longer be reached.
            unless process.signal_process(0)
              logger.debug 'Process has terminated.'
              break
            end
            logger.info "Sending signal #{signal} to #{process.actual_pid}"
            process.signal_process(signal)
          end
          cleanup_process
        end
      else
        logger.warning "Executing default stop command. Sending TERM signal to #{actual_pid}"
        signal_process('TERM')
        cleanup_process
      end

      skip_ticks_for(stop_grace_time)
    end

    # Runs restart_command (bounded by restart_grace_time) when configured;
    # otherwise falls back to stop_process followed by start_process.
    def restart_process
      if restart_command
        cmd = prepare_command(restart_command)

        logger.warning "Executing restart command: #{cmd}"

        with_timeout(restart_grace_time, 'restart') do
          result = System.execute_blocking(cmd, system_command_options)
          warn_unless_success('Restart', result)
        end

        skip_ticks_for(restart_grace_time)
      else
        logger.warning 'No restart_command specified. Must stop and start to restart'
        stop_process
        start_process
      end
    end

    # Kills every pid recorded in the journal for this process and removes the pid file.
    def cleanup_process
      ProcessJournal.kill_all_from_journal(name) # finish cleanup
      unlink_pid # TODO: we only write the pid file if we daemonize, should we only unlink it if we daemonize?
    end

    # Kills and forgets any background stop-signal threads.
    def clean_threads
      @threads.each(&:kill)
      @threads.clear
    end

    def daemonize?
      !!daemonize
    end

    def monitor_children?
      !!monitor_children
    end

    # Sends +code+ (a signal name, upcased, or a number) to actual_pid.
    # Returns true on success. On any StandardError (including a nil pid),
    # logs it and returns false.
    def signal_process(code)
      code = code.to_s.upcase if code.is_a?(String) || code.is_a?(Symbol)
      ::Process.kill(code, actual_pid)
      true
    rescue => e
      logger.err "Failed to signal process #{actual_pid} with code #{code}: #{e}"
      false
    end

    def cache_actual_pid?
      !!@cache_actual_pid
    end

    # The pid from pid_command when one is configured, otherwise from pid_file.
    def actual_pid
      pid_command ? pid_from_command : pid_from_file
    end

    # Reads the pid from pid_file. The value is cached in @actual_pid when
    # cache_actual_pid is set. Returns nil when there is no pid file, it
    # cannot be read, or its content is not a positive integer.
    def pid_from_file
      return @actual_pid if cache_actual_pid? && @actual_pid
      @actual_pid = read_pid_file
    end

    # Runs pid_command through the shell. Returns its output as a pid, or nil
    # unless the output is a positive integer.
    def pid_from_command
      parse_pid(`#{pid_command}`)
    end

    # Records +pid+ in the process journal and caches it.
    def actual_pid=(pid)
      ProcessJournal.append_pid_to_journal(name, pid) # be sure to always log the pid
      @actual_pid = pid
    end

    def clear_pid
      @actual_pid = nil
    end

    def unlink_pid
      System.delete_if_exists(pid_file)
    end

    # Internal State Methods

    # Suppresses #tick for +seconds+. Skips are additive: a pending skip is
    # extended, so two calls for 5 and 10 skip for 15. A deadline that has
    # already passed is ignored, so the skip always lasts at least +seconds+
    # from now.
    def skip_ticks_for(seconds)
      now = Time.now.to_i
      base = skip_ticks_until && skip_ticks_until > now ? skip_ticks_until : now
      self.skip_ticks_until = base + seconds.to_i
    end

    def skipping_ticks?
      skip_ticks_until && skip_ticks_until > Time.now.to_i
    end

    # Drops children that no longer respond. Then wraps any newly found child
    # pids (via child_process_factory) and, when daemonized, journals them.
    def refresh_children!
      # First prune the list of dead children
      @children.delete_if { |child| !child.process_running?(true) }

      # Add new found children to the list
      existing_pids = @children.map(&:actual_pid)
      new_children_pids = System.get_children(actual_pid) - existing_pids

      unless new_children_pids.empty?
        logger.info "Existing children: #{existing_pids.join(',')}. Got new children: #{new_children_pids.inspect} for #{actual_pid}"
      end

      # Construct a new process wrapper for each new found children
      new_children_pids.each do |child_pid|
        ProcessJournal.append_pid_to_journal(name, child_pid) if daemonize?
        child_name = "<child(pid:#{child_pid})>"
        child_logger = logger.prefix_with(child_name)

        @children << child_process_factory.create_child_process(child_name, child_pid, child_logger)
      end
    end

    # Substitutes the current pid for every {{PID}} placeholder in +command+.
    def prepare_command(command)
      command.to_s.gsub('{{PID}}', actual_pid.to_s)
    end

    # Options hash passed to System.execute_blocking / System.daemonize.
    def system_command_options
      {
        uid: uid,
        gid: gid,
        working_dir: working_dir,
        environment: environment,
        pid_file: pid_file,
        logger: logger,
        stdin: stdin,
        stdout: stdout,
        stderr: stderr,
        supplementary_groups: supplementary_groups,
      }
    end

    # Runs the block with a +secs+ time limit. On timeout it logs and, when
    # +next_state+ is given, dispatches that event.
    def with_timeout(secs, next_state = nil, &blk)
      Timeout.timeout(secs.to_f, &blk)
    rescue Timeout::Error
      logger.err 'Execution is taking longer than expected.'
      logger.err 'Did you forget to tell bluepill to daemonize this process?'
      dispatch!(next_state) if next_state
    end

    private

    # Reads and parses pid_file. Returns nil when no pid_file is configured.
    # Open or read failures (missing file, permissions, a race with deletion)
    # are logged and also yield nil.
    def read_pid_file
      return nil unless pid_file
      parse_pid(File.read(pid_file))
    rescue SystemCallError
      logger.warning("pid_file #{pid_file} does not exist or cannot be read")
      nil
    end

    # Parses +str+ as a pid. Returns nil unless the stripped text is a
    # strictly positive integer. Returning 0 would be dangerous: Process.kill
    # with pid 0 signals the caller's whole process group.
    def parse_pid(str)
      text = str.to_s.strip
      return nil unless text =~ /\A\d+\z/
      pid = text.to_i
      pid > 0 ? pid : nil
    end

    # Logs "<label> command execution returned non-zero exit code:" followed
    # by the raw result hash when the command did not exit with 0.
    def warn_unless_success(label, result)
      return if result[:exit_code].zero?
      logger.warning "#{label} command execution returned non-zero exit code:"
      logger.warning result.inspect
    end
  end
end