salsify/delayed_job_worker_pool

View on GitHub
lib/delayed_job_worker_pool/worker_pool.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true

require 'fcntl'
require 'socket'

module DelayedJobWorkerPool
  # Master process for a pool of forked workers.
  #
  # The master forks the configured number of workers for each worker group,
  # forks a replacement whenever a registered worker exits, and relays
  # TERM/INT signals to every registered worker. Once a signal has been
  # relayed no further replacements are forked, and the master exits when the
  # registry has no workers left.
  class WorkerPool

    # Signals the master traps and relays to its workers.
    SIGNALS = ['TERM', 'INT'].map(&:freeze).freeze
    # Worker count used for a group whose #workers is nil.
    DEFAULT_WORKER_COUNT = 1

    # @param options [Hash] pool configuration. Keys read by this class:
    #   * :worker_groups - required; iterated as name/group pairs where each
    #     group responds to #workers and #dj_worker_options
    #   * :preload_app - optional, defaults to false
    #   * :after_preload_app, :on_worker_boot, :after_worker_boot,
    #     :after_worker_shutdown - optional objects responding to #call
    def initialize(options = {})
      @options = options
      @registry = Registry.new
      @pending_signals = []
      # Wake-up pipe: the signal handlers write to it and the monitor loop
      # selects on it, so a signal interrupts the monitor's wait.
      @pending_signal_read_pipe, @pending_signal_write_pipe = create_pipe(inheritable: false)
      # Liveness pipe: each worker closes its copy of the write end and
      # selects on the read end to notice when the master has gone away.
      @master_alive_read_pipe, @master_alive_write_pipe = create_pipe(inheritable: true)
      self.shutting_down = false
    end

    # Runs the master: installs signal handlers, optionally preloads the
    # application, forks all workers, monitors them until none remain and then
    # exits the process. Both ends of the liveness pipe are closed on the way
    # out, whether the method finishes normally or raises.
    #
    # @return [void] does not return normally; ends with Kernel#exit
    def run
      log("Starting master #{Process.pid}")

      install_signal_handlers

      if preload_app?
        load_app
        invoke_callback(:after_preload_app)
      end

      log_uninheritable_threads

      fork_workers

      monitor_workers

      exit
    ensure
      master_alive_write_pipe.close if master_alive_write_pipe
      master_alive_read_pipe.close if master_alive_read_pipe
    end

    private

    # Internal state set up in #initialize.
    attr_reader :options, :registry, :master_alive_read_pipe, :master_alive_write_pipe,
                :pending_signals, :pending_signal_read_pipe, :pending_signal_write_pipe
    # True once a signal has been relayed to the workers (see #shutdown).
    attr_accessor :shutting_down

    # Traps every signal in SIGNALS. The handler only records the signal and
    # writes one byte to the wake-up pipe; the signal is acted upon later by
    # #monitor_workers.
    def install_signal_handlers
      SIGNALS.each do |signal|
        trap(signal) do
          pending_signals << signal
          pending_signal_write_pipe.write_nonblock('.')
        end
      end
    end

    # Restores the default handler for every signal in SIGNALS. Called in a
    # freshly forked worker so it does not keep the master's handlers.
    def uninstall_signal_handlers
      SIGNALS.each do |signal|
        trap(signal, 'DEFAULT')
      end
    end

    # Logs a warning for every thread other than the current one, including
    # the first backtrace line when the thread exposes a backtrace.
    def log_uninheritable_threads
      Thread.list.each do |t|
        next if t == Thread.current

        if t.respond_to?(:backtrace)
          log("WARNING: Thread will not be inherited by workers: #{t.inspect} - " \
              "#{t.backtrace ? t.backtrace.first : ''}")
        else
          log("WARNING: Thread will not be inherited by workers: #{t.inspect}")
        end
      end
    end

    # Loads the application via DelayedJobWorkerPool::Application.
    def load_app
      DelayedJobWorkerPool::Application.load
    end

    # Marks the pool as shutting down and sends +signal+ to every registered
    # worker process.
    #
    # @param signal [String] signal name to relay, e.g. 'TERM'
    def shutdown(signal)
      log("Shutting down master #{Process.pid} with signal #{signal}")
      self.shutting_down = true
      registry.worker_pids.each do |child_pid|
        group = registry.group(child_pid)
        log("Telling worker #{child_pid} from group #{group} to shutdown with signal #{signal}")
        Process.kill(signal, child_pid)
      end
    end

    # Main loop of the master. While the registry still has workers it, in
    # order of priority: relays a pending signal, reaps one exited child
    # without blocking, or waits up to one second for a signal to arrive.
    def monitor_workers
      while workers?
        if pending_signal?
          shutdown(pending_signals.pop)
        elsif (wait_result = Process.wait2(-1, Process::WNOHANG))
          handle_dead_worker(wait_result.first, wait_result.last)
        else
          wait_for_signal(1)
        end
      end
    end

    # Handles a reaped child process. Children that are not registered workers
    # are ignored. For a registered worker the :after_worker_shutdown callback
    # is invoked, the worker is removed from the registry and, unless the pool
    # is shutting down, a replacement is forked in the same group.
    #
    # @param worker_pid [Integer] pid of the reaped child
    # @param status [Process::Status] exit status returned by Process.wait2
    def handle_dead_worker(worker_pid, status)
      return unless registry.include_worker?(worker_pid)

      log("Worker #{worker_pid} exited with status #{status.to_i}")

      group = registry.group(worker_pid)
      invoke_callback(:after_worker_shutdown, worker_info(worker_pid, group))

      registry.remove_worker(worker_pid)
      fork_worker(group) unless shutting_down
    end

    # @return [Boolean] whether the registry still tracks any worker
    def workers?
      registry.workers?
    end

    # @return [Boolean] whether a trapped signal is waiting to be relayed
    def pending_signal?
      !pending_signals.empty?
    end

    # Calls the callable stored under +callback_name+ in the options, if any.
    #
    # @param callback_name [Symbol] options key of the callback
    # @param args [Array] arguments passed through to the callback
    def invoke_callback(callback_name, *args)
      options[callback_name].call(*args) if options[callback_name]
    end

    # Registers every configured worker group and forks its workers. A group
    # whose #workers is nil gets DEFAULT_WORKER_COUNT workers.
    def fork_workers
      options.fetch(:worker_groups).each do |name, group|
        workers = group.workers || DEFAULT_WORKER_COUNT

        registry.add_group(name, group.dj_worker_options)

        workers.times { fork_worker(name) }
      end
    end

    # Forks one worker for +group+, records its pid in the registry and
    # invokes the :after_worker_boot callback in the master.
    #
    # @param group name of the worker group the new worker belongs to
    def fork_worker(group)
      worker_pid = Kernel.fork { run_worker(group) }
      log("Started worker in group #{group}: #{worker_pid}")

      registry.add_worker(group, worker_pid)

      invoke_callback(:after_worker_boot, worker_info(worker_pid, group))
    end

    # Body of a forked worker process. Any StandardError is logged and the
    # worker exits with status 1.
    #
    # @param group name of the worker group this worker belongs to
    def run_worker(group)
      # Drop the worker's copy of the write end so that only the master keeps
      # the liveness pipe open for writing.
      master_alive_write_pipe.close

      uninstall_signal_handlers

      # Watchdog: the read end only becomes readable once the master's write
      # end is gone, at which point the worker shuts itself down.
      Thread.new do
        IO.select([master_alive_read_pipe])
        log('Detected dead master. Shutting down worker.')
        exit(1)
      end

      load_app unless preload_app?

      invoke_callback(:on_worker_boot, worker_info(Process.pid, group))

      DelayedJobWorkerPool::Worker.run(worker_options(Process.pid, group))
    rescue StandardError => e
      log("Worker failed with error: #{e.message}\n#{e.backtrace.join("\n")}")
      exit(1)
    end

    # Builds the WorkerInfo value handed to the worker callbacks.
    #
    # @param worker_pid [Integer] pid of the worker
    # @param group name of the worker's group
    # @return [DelayedJobWorkerPool::WorkerInfo]
    def worker_info(worker_pid, group)
      DelayedJobWorkerPool::WorkerInfo.new(
        name: worker_name(worker_pid, group),
        process_id: worker_pid,
        worker_group: group
      )
    end

    # @param worker_pid [Integer] pid of the worker
    # @param group name of the worker's group
    # @return [String] worker name made of host name, pid and group
    def worker_name(worker_pid, group)
      "host:#{Socket.gethostname} pid:#{worker_pid} group:#{group}"
    end

    # @return [Object] the :preload_app option, false when it was not given
    def preload_app?
      options.fetch(:preload_app, false)
    end

    # @param worker_pid [Integer] pid of the worker
    # @param group name of the worker's group
    # @return the options registered for +group+ merged with the worker's name
    def worker_options(worker_pid, group)
      registry.options(group).merge(name: worker_name(worker_pid, group))
    end

    # Creates a pipe and returns its two ends.
    #
    # @param inheritable [Boolean] when false, both ends are explicitly
    #   flagged close-on-exec; when true, the ends are left as IO.pipe
    #   created them
    # @return [Array<IO>] the read end followed by the write end
    def create_pipe(inheritable: true)
      read, write = IO.pipe
      unless inheritable
        make_file_descriptor_uninheritable(read)
        make_file_descriptor_uninheritable(write)
      end
      [read, write]
    end

    # Sets the close-on-exec flag on +io+ so the descriptor is not handed to
    # programs started through exec.
    #
    # @param io [IO] descriptor to flag
    def make_file_descriptor_uninheritable(io)
      # The flag value must be passed explicitly: IO#fcntl defaults its
      # argument to 0, which would clear FD_CLOEXEC rather than set it.
      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end

    # Waits until a signal handler has written to the wake-up pipe or the
    # timeout elapses, then empties the pipe if anything was written.
    #
    # @param timeout [Numeric] maximum number of seconds to wait
    def wait_for_signal(timeout)
      drain_pipe(pending_signal_read_pipe) if IO.select([pending_signal_read_pipe], [], [], timeout)
    end

    # Reads and discards everything currently buffered in +pipe+ without
    # blocking.
    #
    # @param pipe [IO] readable end of a pipe
    def drain_pipe(pipe)
      loop { pipe.read_nonblock(16) }
    rescue IO::WaitReadable
      # We've drained the pipe
    end

    # Writes +message+ to standard output.
    def log(message)
      puts(message)
    end
  end
end