mender/protein

View on GitHub
lib/protein/process.rb

Summary

Maintainability
B
6 hrs
Test Coverage
# -*- encoding : utf-8 -*-
module Protein
  class Process
    delegate :config, :logger, :redis, :callbacks, :to => :Protein
    attr_accessor :running
    attr_accessor :can_fork

    def initialize
      self.can_fork = config.can_fork
      self.running  = false
    end

    def running?
      !!self.running
    end

    def startup
      #trap_signals
      enable_gc_optimizations
      self.running = true
      $stdout.sync = true
      logger.debug "Process started up"
    end

    def stop
      logger.info "Process termination ..."
      self.running = false
    end

    def trap_signals
      signals.trap("TERM") { stop }
      signals.trap("INT")  { stop }
      signals.trap("HUP")  { logger.debug 'SIGHUP received' }
      if block_given?
        begin
          yield
        ensure
          release_signals
        end
      end
      nil
    end

    def release_signals
      signals.release("TERM")
      signals.release("INT")
      signals.release("HUP")
      nil
    end

    def pid
      ::Process.pid
    end

    def app_name
      $0
    end

    def app_name=(name)
      $0 = name
    end

    def change_app_name(name)
      name = "Protein: #{name}"
      if block_given?
        begin
          self.app_name, previous = name, self.app_name
          yield
        ensure
          self.app_name = previous
        end
      else
        self.app_name = name
      end
    end

    def can_fork?
      !!self.can_fork
    end

    def fork?
      !!@is_fork
    end

    def fork
      before_spawn
      if fork = kernel_fork
        # in parent
        after_spawn(fork)
        fork
      else
        # in child
        after_fork if can_fork?
        if block_given?
          yield
        else
          nil
        end
      end
    end

    def reconnect
      logger.reopen
      redis.reconnect
    end

    def exit
      begin
        callbacks.fire(:before_exit)
      rescue => e
        logger.error(e)
      end
      begin
        logger.debug "Exit"
        logger.close
      rescue
      end
    ensure
      ::Process.exit!(true)
    end

    def tools
      @tools ||= Protein::ProcessTools.new.tap do |tools|
        tools.logger       = self.logger
        tools.term_timeout = config.process_term_timeout 
        tools.kill_timeout = config.process_kill_timeout 
      end
    end

    def signals
      @signals ||= Protein::ProcessSignals.new.tap do |signals|
        signals.logger = self.logger
      end
    end

    # def daemonize
    #   if ::Process.respond_to?(:daemon)
    #     ::Process.daemon
    #   else
    #     null_io
    #     reset_dir
    #   end
    # end

    protected

    # Platform dependent
    def kernel_fork
      return unless can_fork?

      if Kernel.respond_to?(:fork)
        Kernel.fork
      else
        self.can_fork = false
        nil
      end
    end

    def enable_gc_optimizations
      if GC.respond_to?(:copy_on_write_friendly=)
        GC.copy_on_write_friendly = true
      end
    end

    def mark_as_fork
      @is_fork = true
    end

    def null_io
      null_file = "/dev/null"
      [STDIN, STDOUT, STDERR].each do |io|
        io.reopen(null_file) rescue nil
      end
    end

    def reset_dir
      ::Dir.chdir("/")
    end

    def after_fork
      mark_as_fork
      null_io
      reset_dir
      reconnect
      startup
      callbacks.fire(:after_fork)
      logger.debug "Succesfully forked"
    end

    def before_spawn
      logger.debug "Creating fork ..."
      logger.flush
    end

    def after_spawn(fork)
      logger.info "Process with pid #{fork} has been created"
      srand # Split rand streams between spawning and forked process
      ::Process.detach(fork)
    end
  end

  class ProcessSignals
    attr_writer :logger

    def logger
      @logger ||= Logger.new(STDOUT)
    end

    def trap(signal, &block)
      logger.debug("Trap signal #{signal}")
      handlers[signal].push(::Signal.trap(signal, &block))
      nil
    end

    def release(signal)
      logger.debug("Restore previous signal handler for #{signal}")
      handler = handlers[signal].pop
      if handler.present?
        ::Signal.trap(signal, handler)
      else
        ::Signal.trap(signal, "DEFAULT")
      end
      nil
    end

    def handlers
      @handlers ||= Hash.new do |hash, name| 
        hash[name] = []
      end
    end
  end

  class ProcessTools
    require 'timeout'
    attr_writer :logger
    attr_writer :term_timeout
    attr_writer :kill_timeout
    
    def exists?(pid)
      pid = pid.to_i
      (pid > 0) && (::Process.kill(0, pid) > 0) rescue false
    end

    def stop(pid)
      return false unless l_exists?(pid)
      term(pid)
    end

    def stop!(pid)
      return false unless l_exists?(pid)
      term(pid) or kill(pid)
    end

    def stop_all(pids)
      return false if pids.blank?

      threaded_map(pids){ |pid| stop(pid) }.all?
    end

    def stop_all!(pids)
      return false if pids.blank?

      threaded_map(pids){ |pid| stop!(pid) }.all?
    end

    def threaded_map(values)
      threads = values.map do |value|
        Thread.new { yield(value) }
      end

      threads.map {|t| t.value}
    end

    def term(pid)
      send_stop_signal('TERM', pid, term_timeout)
    end

    def kill(pid)
      send_stop_signal('KILL', pid, kill_timeout)
    end

    def logger
      @logger ||= Logger.new(STDOUT)
    end

    def term_timeout
      @term_timeout ||= 10.seconds
    end

    def kill_timeout
      @kill_timeout ||= 20.seconds
    end

    protected

    def send_signal(signal, pid, seconds = 10.seconds)
      return false unless l_exists?(pid)

      handle_errors do
        timeout(seconds) do
          logger.info "Send #{signal} signal to process with pid #{pid}..."
          ::Process.kill(signal, pid)
          yield if block_given?
          return true
        end
      end

      false
    end

    def send_stop_signal(signal, pid, seconds = 10.seconds)
      result = send_signal(signal, pid, seconds) do
        sleep(1) while exists?(pid)
      end

      logger.info "Process with pid #{pid} successfully stopped" if result
      logger.info "Unable to stop process with pid #{pid}"   unless result
      result
    end

    def l_exists?(pid)
      unless exists?(pid)
        logger.info("Process with pid #{pid} doesn't exists")
        return false
      end
      true
    end

    def handle_errors
      yield if block_given?
    rescue => e
      logger.error(e)
      nil
    end

    def timeout(seconds)
      Timeout::timeout(seconds) do
        yield if block_given?
      end
    rescue Timeout::Error
      nil
    end
  end

end