dcadenas/preforker

View on GitHub
lib/preforker.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'socket'
require 'fcntl'
require 'preforker/pid_manager'
require 'preforker/worker'
require 'preforker/signal_processor'
require 'logger'

class Preforker
  attr_reader :timeout, :app_name, :logger
  attr_accessor :number_of_workers

  def initialize(options = {}, &worker_block)
    @app_name = options[:app_name] || "Preforker"
    default_log_file = "#{@app_name.downcase}.log"
    @options = {
      :timeout => 5,
      :workers => 10,
      :app_name => "Preforker",
      :stdout_path => default_log_file,
      :stderr_path => default_log_file
    }.merge(options)

    @logger = @options[:logger] || Logger.new(default_log_file)

    @timeout = @options[:timeout]
    @number_of_workers = @options[:workers]
    @worker_block = worker_block || lambda {}

    @workers = {}
    $0 = "#@app_name Master"
  end

  def run(ready_write = nil)
    $stdin.reopen("/dev/null")
    set_stdout_path(@options[:stdout_path])
    set_stderr_path(@options[:stderr_path])

    logger.info "Master started"

    pid_path = @options[:pid_path] || "./#{@app_name.downcase}.pid"
    @pid_manager = PidManager.new(pid_path)
    @signal_processor = SignalProcessor.new(self)

    spawn_missing_workers do
      ready_write.close if ready_write
    end

    #tell parent we are ready
    if ready_write
      ready_write.syswrite($$.to_s)
      ready_write.close rescue nil
    end

    @signal_processor.start_signal_loop
  end

  def start
    launch do |ready_write|
      run(ready_write)
    end
  end

  def launch(&block)
     puts "Starting server"

     ready_read, ready_write = IO.pipe
     [ready_read, ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }

     fork do
       ready_read.close

       Process.setsid
       fork do
         block.call(ready_write)
       end
     end

     ready_write.close
     master_pid = (ready_read.readpartial(16) rescue nil).to_i
     ready_read.close
     if master_pid <= 1
       warn "Master failed to start, check stderr log for details"
       exit!(1)
     else
       puts "Server started successfuly"
     end
  end

  def close_resources_worker_wont_use
    @signal_processor.reset
    @workers.values.each { |other| other.tmp.close rescue nil }
    @workers.clear
  end

  # Terminates all workers, but does not exit master process
  def stop(graceful = true)
    limit = Time.now + @timeout
    until @workers.empty? || Time.now > limit
      signal_each_worker(graceful ? :QUIT : :TERM)
      sleep(0.1)
      reap_all_workers
    end
    signal_each_worker(:KILL)
  end

  def quit(graceful = true)
    stop(graceful)
    @pid_manager.unlink
  end

  def reap_all_workers
    begin
      loop do
        worker_pid, status = Process.waitpid2(-1, Process::WNOHANG)
        break unless worker_pid
        worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
        logger.info "reaped #{status.inspect}"
      end
    rescue Errno::ECHILD
    end
  end

  def spawn_missing_workers(new_workers_count = @number_of_workers, &init_block)
    new_workers_count.times do
      worker = Worker.new(@worker_block, self)
      worker_pid = fork do
        close_resources_worker_wont_use
        init_block.call if init_block
        worker.work
      end

      worker.pid = worker_pid
      @workers[worker_pid] = worker
    end
  end

  def maintain_worker_count
    number_of_missing_workers = @number_of_workers - @workers.size
    return if number_of_missing_workers == 0
    return spawn_missing_workers(number_of_missing_workers) if number_of_missing_workers > 0
    @workers.values[0..(-number_of_missing_workers - 1)].each do |unneeded_worker|
      signal_worker(:QUIT, unneeded_worker.pid) rescue nil
    end
  end

  # forcibly terminate all workers that haven't checked in in timeout
  # seconds.  The timeout is implemented using an unlinked File
  # shared between the parent process and each worker.  The worker
  # runs File#chmod to modify the ctime of the File.  If the ctime
  # is stale for >timeout seconds, then we'll kill the corresponding
  # worker.
  def murder_lazy_workers
    @workers.dup.each_pair do |worker_pid, worker|
      stat = worker.tmp.stat
      # skip workers that disable fchmod or have never fchmod-ed
      next if stat.mode == 0100600
      next if (diff = (Time.now - stat.ctime)) <= @timeout
      logger.error "Worker=#{worker_pid} timeout (#{diff}s > #{@timeout}s), killing"
      signal_worker(:KILL, worker_pid) # take no prisoners for timeout violations
    end
  end

  # delivers a signal to a worker and fails gracefully if the worker
  # is no longer running.
  def signal_worker(signal, worker_pid)
    begin
      Process.kill(signal, worker_pid)
    rescue Errno::ESRCH
      worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
    end
  end

  # delivers a signal to each worker
  def signal_each_worker(signal)
    @workers.keys.each { |worker_pid| signal_worker(signal, worker_pid) }
  end

  def signal_quit
    signal_worker(:QUIT, @pid_manager.pid)
  end

  def set_stdout_path(path)
    redirect_io($stdout, path)
  end

  def set_stderr_path(path)
    redirect_io($stderr, path)
  end

  def redirect_io(io, path)
    File.open(path, 'ab') { |fp| io.reopen(fp) } if path
    io.sync = true
  end
end