resque/resque

View on GitHub
lib/resque/worker.rb

Summary

Maintainability
F
4 days
Test Coverage
require 'time'
require 'set'
require 'redis/distributed'

module Resque
  # A Resque Worker processes jobs. On platforms that support fork(2),
  # the worker will fork off a child to process each job. This ensures
  # a clean slate when beginning the next job and cuts down on gradual
  # memory growth as well as low level failures.
  #
  # It also ensures workers are always listening to signals from you,
  # their master, and can react accordingly.
  class Worker
    include Resque::Helpers
    extend Resque::Helpers
    include Resque::Logging

    @@all_heartbeat_threads = []
    def self.kill_all_heartbeat_threads
      @@all_heartbeat_threads.each(&:kill).each(&:join)
      @@all_heartbeat_threads = []
    end

    def redis
      Resque.redis
    end
    alias :data_store :redis

    def self.redis
      Resque.redis
    end

    def self.data_store
      self.redis
    end

    # Given a Ruby object, returns a string suitable for storage in a
    # queue.
    def encode(object)
      Resque.encode(object)
    end

    # Given a string, returns a Ruby object.
    def decode(object)
      Resque.decode(object)
    end

    attr_accessor :term_timeout

    attr_accessor :pre_shutdown_timeout

    attr_accessor :term_child_signal

    # decide whether to use new_kill_child logic
    attr_accessor :term_child

    # should term kill workers gracefully (vs. immediately)
    # Makes SIGTERM work like SIGQUIT
    attr_accessor :graceful_term

    # When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been
    # registered in the application. Otherwise, forked workers exit with `exit!`
    attr_accessor :run_at_exit_hooks

    attr_writer :fork_per_job
    attr_writer :hostname
    attr_writer :to_s
    attr_writer :pid

    # Returns an array of all worker objects.
    def self.all
      data_store.worker_ids.map { |id| find(id, :skip_exists => true) }.compact
    end

    # Returns an array of all worker objects currently processing
    # jobs.
    def self.working
      names = all
      return [] unless names.any?

      reportedly_working = {}

      begin
        reportedly_working = data_store.workers_map(names).reject do |key, value|
          value.nil? || value.empty?
        end
      rescue Redis::Distributed::CannotDistribute
        names.each do |name|
          value = data_store.get_worker_payload(name)
          reportedly_working[name] = value unless value.nil? || value.empty?
        end
      end

      reportedly_working.keys.map do |key|
        worker = find(key.sub("worker:", ''), :skip_exists => true)
        worker.job = worker.decode(reportedly_working[key])
        worker
      end.compact
    end

    # Returns a single worker object. Accepts a string id.
    def self.find(worker_id, options = {})
      skip_exists = options[:skip_exists]

      if skip_exists || exists?(worker_id)
        host, pid, queues_raw = worker_id.split(':', 3)
        queues = queues_raw.split(',')
        worker = new(*queues)
        worker.hostname = host
        worker.to_s = worker_id
        worker.pid = pid.to_i
        worker
      else
        nil
      end
    end

    # Alias of `find`
    def self.attach(worker_id)
      find(worker_id)
    end

    # Given a string worker id, return a boolean indicating whether the
    # worker exists
    def self.exists?(worker_id)
      data_store.worker_exists?(worker_id)
    end

    # Workers should be initialized with an array of string queue
    # names. The order is important: a Worker will check the first
    # queue given for a job. If none is found, it will check the
    # second queue name given. If a job is found, it will be
    # processed. Upon completion, the Worker will again check the
    # first queue given, and so forth. In this way the queue list
    # passed to a Worker on startup defines the priorities of queues.
    #
    # If passed a single "*", this Worker will operate on all queues
    # in alphabetical order. Queues can be dynamically added or
    # removed without needing to restart workers using this method.
    #
    # Workers should have `#prepare` called after they are initialized
    # if you are running work on the worker.
    def initialize(*queues)
      @shutdown = nil
      @paused = nil
      @before_first_fork_hook_ran = false

      @heartbeat_thread = nil
      @heartbeat_thread_signal = nil

      @last_state = :idle

      verbose_value = ENV['LOGGING'] || ENV['VERBOSE']
      self.verbose = verbose_value if verbose_value
      self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE']
      self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f
      self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
      self.term_child = ENV['TERM_CHILD']
      self.graceful_term = ENV['GRACEFUL_TERM']
      self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']

      self.queues = queues
    end

    # Daemonizes the worker if ENV['BACKGROUND'] is set and writes
    # the process id to ENV['PIDFILE'] if set. Should only be called
    # once per worker.
    def prepare
      if ENV['BACKGROUND']
        Process.daemon(true)
      end

      if ENV['PIDFILE']
        File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
      end

      self.reconnect if ENV['BACKGROUND']
    end

    WILDCARDS = ['*', '?', '{', '}', '[', ']'].freeze

    def queues=(queues)
      queues = (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') if queues.empty?
      queues = queues.map { |queue| queue.to_s.strip }

      @skip_queues, @queues = queues.partition { |queue| queue.start_with?('!') }
      @skip_queues.map! { |queue| queue[1..-1] }

      # The behavior of `queues` is dependent on the value of `@has_dynamic_queues: if it's true, the method returns the result of filtering @queues with `glob_match`
      # if it's false, the method returns @queues directly. Since `glob_match` will cause skipped queues to be filtered out, we want to make sure it's called if we have @skip_queues.any?
      @has_dynamic_queues =
        @skip_queues.any? || WILDCARDS.any? { |char| @queues.join.include?(char) }

      validate_queues
    end

    # A worker must be given a queue, otherwise it won't know what to
    # do with itself.
    #
    # You probably never need to call this.
    def validate_queues
      if @queues.nil? || @queues.empty?
        raise NoQueueError.new("Please give each worker at least one queue.")
      end
    end

    # Returns a list of queues to use when searching for a job.
    # A splat ("*") means you want every queue (in alpha order) - this
    # can be useful for dynamically adding new queues.
    def queues
      if @has_dynamic_queues
        current_queues = Resque.queues
        @queues.map { |queue| glob_match(current_queues, queue) }.flatten.uniq
      else
        @queues
      end
    end

    def glob_match(list, pattern)
      list.select do |queue|
        File.fnmatch?(pattern, queue) &&
          @skip_queues.none? { |skip_pattern| File.fnmatch?(skip_pattern, queue) }
      end.sort
    end

    # This is the main workhorse method. Called on a Worker instance,
    # it begins the worker life cycle.
    #
    # The following events occur during a worker's life cycle:
    #
    # 1. Startup:   Signals are registered, dead workers are pruned,
    #               and this worker is registered.
    # 2. Work loop: Jobs are pulled from a queue and processed.
    # 3. Teardown:  This worker is unregistered.
    #
    # Can be passed a float representing the polling frequency.
    # The default is 5 seconds, but for a semi-active site you may
    # want to use a smaller value.
    #
    # Also accepts a block which will be passed the job as soon as it
    # has completed processing. Useful for testing.
    def work(interval = 5.0, &block)
      interval = Float(interval)
      startup

      loop do
        break if shutdown?

        unless work_one_job(&block)
          state_change
          break if interval.zero?
          log_with_severity :debug, "Sleeping for #{interval} seconds"
          procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
          sleep interval
        end
      end

      unregister_worker
      run_hook :worker_exit
    rescue Exception => exception
      return if exception.class == SystemExit && !@child && run_at_exit_hooks
      log_with_severity :error, "Failed to start worker : #{exception.inspect}"
      unregister_worker(exception)
      run_hook :worker_exit
    end

    def work_one_job(job = nil, &block)
      return false if paused?
      return false unless job ||= reserve

      working_on job
      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"

      log_with_severity :info, "got: #{job.inspect}"
      job.worker = self

      if fork_per_job?
        perform_with_fork(job, &block)
      else
        perform(job, &block)
      end

      done_working
      true
    end

    # DEPRECATED. Processes a single job. If none is given, it will
    # try to produce one. Usually run in the child.
    def process(job = nil, &block)
      return unless job ||= reserve

      job.worker = self
      working_on job
      perform(job, &block)
    ensure
      done_working
    end

    # Reports the exception and marks the job as failed
    def report_failed_job(job,exception)
      log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"
      begin
        job.fail(exception)
      rescue Object => exception
        log_with_severity :error, "Received exception when reporting failure: #{exception.inspect}"
      end
      begin
        failed!
      rescue Object => exception
        log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
      end
    end


    # Processes a given job in the child.
    def perform(job)
      begin
        if fork_per_job?
          reconnect
          run_hook :after_fork, job
        end
        job.perform
      rescue Object => e
        report_failed_job(job,e)
      else
        log_with_severity :info, "done: #{job.inspect}"
      ensure
        yield job if block_given?
      end
    end

    # Attempts to grab a job off one of the provided queues. Returns
    # nil if no job can be found.
    def reserve
      queues.each do |queue|
        log_with_severity :debug, "Checking #{queue}"
        if job = Resque.reserve(queue)
          log_with_severity :debug, "Found job on #{queue}"
          return job
        end
      end

      nil
    rescue Exception => e
      log_with_severity :error, "Error reserving job: #{e.inspect}"
      log_with_severity :error, e.backtrace.join("\n")
      raise e
    end

    # Reconnect to Redis to avoid sharing a connection with the parent,
    # retry up to 3 times with increasing delay before giving up.
    def reconnect
      tries = 0
      begin
        data_store.reconnect
      rescue Redis::BaseConnectionError
        if (tries += 1) <= 3
          log_with_severity :error, "Error reconnecting to Redis; retrying"
          sleep(tries)
          retry
        else
          log_with_severity :error, "Error reconnecting to Redis; quitting"
          raise
        end
      end
    end

    # Runs all the methods needed when a worker begins its lifecycle.
    def startup
      $0 = "resque: Starting"

      enable_gc_optimizations
      register_signal_handlers
      start_heartbeat
      prune_dead_workers
      run_hook :before_first_fork
      register_worker

      # Fix buffering so we can `rake resque:work > resque.log` and
      # get output from the child in there.
      $stdout.sync = true
    end

    # Enables GC Optimizations if you're running REE.
    # http://www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
    def enable_gc_optimizations
      if GC.respond_to?(:copy_on_write_friendly=)
        GC.copy_on_write_friendly = true
      end
    end

    # Registers the various signal handlers a worker responds to.
    #
    # TERM: Shutdown immediately, stop processing jobs.
    #  INT: Shutdown immediately, stop processing jobs.
    # QUIT: Shutdown after the current job has finished processing.
    # USR1: Kill the forked child immediately, continue processing jobs.
    # USR2: Don't process any new jobs
    # CONT: Start processing jobs again after a USR2
    def register_signal_handlers
      trap('TERM') { graceful_term ? shutdown : shutdown!  }
      trap('INT')  { shutdown!  }

      begin
        trap('QUIT') { shutdown   }
        if term_child
          trap('USR1') { new_kill_child }
        else
          trap('USR1') { kill_child }
        end
        trap('USR2') { pause_processing }
        trap('CONT') { unpause_processing }
      rescue ArgumentError
        log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
      end

      log_with_severity :debug, "Registered signals"
    end

    def unregister_signal_handlers
      trap('TERM') do
        trap('TERM') do
          # Ignore subsequent term signals
        end

        raise TermException.new("SIGTERM")
      end

      trap('INT', 'DEFAULT')

      begin
        trap('QUIT', 'DEFAULT')
        trap('USR1', 'DEFAULT')
        trap('USR2', 'DEFAULT')
      rescue ArgumentError
      end
    end

    # Schedule this worker for shutdown. Will finish processing the
    # current job.
    def shutdown
      log_with_severity :info, 'Exiting...'
      @shutdown = true
    end

    # Kill the child and shutdown immediately.
    # If not forking, abort this process.
    def shutdown!
      shutdown
      if term_child
        if fork_per_job?
          new_kill_child
        else
          # Raise TermException in the same process
          trap('TERM') do
            # ignore subsequent terms
          end
          raise TermException.new("SIGTERM")
        end
      else
        kill_child
      end
    end

    # Should this worker shutdown as soon as current job is finished?
    def shutdown?
      @shutdown
    end

    # Kills the forked child immediately, without remorse. The job it
    # is processing will not be completed.
    def kill_child
      if @child
        log_with_severity :debug, "Killing child at #{@child}"
        if `ps -o pid,state -p #{@child}`
          Process.kill("KILL", @child) rescue nil
        else
          log_with_severity :debug, "Child #{@child} not found, restarting."
          shutdown
        end
      end
    end

    def heartbeat
      data_store.heartbeat(self)
    end

    def remove_heartbeat
      data_store.remove_heartbeat(self)
    end

    def heartbeat!(time = data_store.server_time)
      data_store.heartbeat!(self, time)
    end

    def self.all_heartbeats
      data_store.all_heartbeats
    end

    # Returns a list of workers that have sent a heartbeat in the past, but which
    # already expired (does NOT include workers that have never sent a heartbeat at all).
    def self.all_workers_with_expired_heartbeats
      # Use `Worker.all_heartbeats` instead of `Worker.all`
      # to prune workers which haven't been registered but have set a heartbeat.
      # https://github.com/resque/resque/pull/1751
      heartbeats = Worker.all_heartbeats
      now = data_store.server_time

      heartbeats.select do |id, heartbeat|
        if heartbeat
          seconds_since_heartbeat = (now - Time.parse(heartbeat)).to_i
          seconds_since_heartbeat > Resque.prune_interval
        else
          false
        end
      end.each_key.map do |id|
        # skip_exists must be true to include not registered workers
        find(id, :skip_exists => true)
      end
    end

    def start_heartbeat
      remove_heartbeat

      @heartbeat_thread_signal = Resque::ThreadSignal.new

      @heartbeat_thread = Thread.new do
        loop do
          heartbeat!
          signaled = @heartbeat_thread_signal.wait_for_signal(Resque.heartbeat_interval)
          break if signaled
        end
      end

      @@all_heartbeat_threads << @heartbeat_thread
    end

    # Kills the forked child immediately with minimal remorse. The job it
    # is processing will not be completed. Send the child a TERM signal,
    # wait <term_timeout> seconds, and then a KILL signal if it has not quit
    # If pre_shutdown_timeout has been set to a positive number, it will allow
    # the child that many seconds before sending the aforementioned TERM and KILL.
    def new_kill_child
      if @child
        unless child_already_exited?
          if pre_shutdown_timeout && pre_shutdown_timeout > 0.0
            log_with_severity :debug, "Waiting #{pre_shutdown_timeout.to_f}s for child process to exit"
            return if wait_for_child_exit(pre_shutdown_timeout)
          end

          log_with_severity :debug, "Sending TERM signal to child #{@child}"
          Process.kill("TERM", @child)

          if wait_for_child_exit(term_timeout)
            return
          else
            log_with_severity :debug, "Sending KILL signal to child #{@child}"
            Process.kill("KILL", @child)
          end
        else
          log_with_severity :debug, "Child #{@child} already quit."
        end
      end
    rescue SystemCallError
      log_with_severity :error, "Child #{@child} already quit and reaped."
    end

    def child_already_exited?
      Process.waitpid(@child, Process::WNOHANG)
    end

    def wait_for_child_exit(timeout)
      (timeout * 10).round.times do |i|
        sleep(0.1)
        return true if child_already_exited?
      end
      false
    end

    # are we paused?
    def paused?
      @paused || redis.get('pause-all-workers').to_s.strip.downcase == 'true'
    end

    # Stop processing jobs after the current one has completed (if we're
    # currently running one).
    def pause_processing
      log_with_severity :info, "USR2 received; pausing job processing"
      run_hook :before_pause, self
      @paused = true
    end

    # Start processing jobs again after a pause
    def unpause_processing
      log_with_severity :info, "CONT received; resuming job processing"
      @paused = false
      run_hook :after_pause, self
    end

    # Looks for any workers which should be running on this server
    # and, if they're not, removes them from Redis.
    #
    # This is a form of garbage collection. If a server is killed by a
    # hard shutdown, power failure, or something else beyond our
    # control, the Resque workers will not die gracefully and therefore
    # will leave stale state information in Redis.
    #
    # By checking the current Redis state against the actual
    # environment, we can determine if Redis is old and clean it up a bit.
    def prune_dead_workers
      return unless data_store.acquire_pruning_dead_worker_lock(self, Resque.heartbeat_interval)

      all_workers = Worker.all

      known_workers = worker_pids
      all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
      all_workers_with_expired_heartbeats.each do |worker|
        # If the worker hasn't sent a heartbeat, remove it from the registry.
        #
        # If the worker hasn't ever sent a heartbeat, we won't remove it since
        # the first heartbeat is sent before the worker is registred it means
        # that this is a worker that doesn't support heartbeats, e.g., another
        # client library or an older version of Resque. We won't touch these.
        log_with_severity :info, "Pruning dead worker: #{worker}"

        job_class = worker.job(false)['payload']['class'] rescue nil
        worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s, job_class))
      end

      all_workers.each do |worker|
        if all_workers_with_expired_heartbeats.include?(worker)
          next
        end

        host, pid, worker_queues_raw = worker.id.split(':')
        worker_queues = worker_queues_raw.split(",")
        unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
          # If the worker we are trying to prune does not belong to the queues
          # we are listening to, we should not touch it.
          # Attempt to prune a worker from different queues may easily result in
          # an unknown class exception, since that worker could easily be even
          # written in different language.
          next
        end

        next unless host == hostname
        next if known_workers.include?(pid)

        log_with_severity :debug, "Pruning dead worker: #{worker}"
        worker.unregister_worker
      end
    end

    # Registers ourself as a worker. Useful when entering the worker
    # lifecycle on startup.
    def register_worker
      data_store.register_worker(self)
    end

    # Runs a named hook, passing along any arguments.
    def run_hook(name, *args)
      hooks = Resque.send(name)
      return if hooks.empty?
      return if name == :before_first_fork && @before_first_fork_hook_ran
      msg = "Running #{name} hooks"
      msg << " with #{args.inspect}" if args.any?
      log_with_severity :info, msg

      hooks.each do |hook|
        args.any? ? hook.call(*args) : hook.call
        @before_first_fork_hook_ran = true if name == :before_first_fork
      end
    end

    def kill_background_threads
      if @heartbeat_thread
        @heartbeat_thread_signal.signal
        @heartbeat_thread.join
      end
    end

    # Unregisters ourself as a worker. Useful when shutting down.
    def unregister_worker(exception = nil)
      # If we're still processing a job, make sure it gets logged as a
      # failure.
      if (hash = processing) && !hash.empty?
        job = Job.new(hash['queue'], hash['payload'])
        # Ensure the proper worker is attached to this job, even if
        # it's not the precise instance that died.
        job.worker = self
        begin
          job.fail(exception || DirtyExit.new("Job still being processed"))
        rescue RuntimeError => e
          log_with_severity :error, e.message
        end
      end

      kill_background_threads

      data_store.unregister_worker(self) do |**opts|
        Stat.clear("processed:#{self}", **opts)
        Stat.clear("failed:#{self}",    **opts)
      end
    rescue Exception => exception_while_unregistering
      message = exception_while_unregistering.message
      if exception
        message += "\nOriginal Exception (#{exception.class}): #{exception.message}"
        message += "\n  #{exception.backtrace.join("  \n")}" if exception.backtrace
      end
      fail(exception_while_unregistering.class,
           message,
           exception_while_unregistering.backtrace)
    end

    # Given a job, tells Redis we're working on it. Useful for seeing
    # what workers are doing and when.
    def working_on(job)
      data = encode \
        :queue   => job.queue,
        :run_at  => Time.now.utc.iso8601,
        :payload => job.payload
      data_store.set_worker_payload(self,data)
      state_change
    end

    # Called when we are done working - clears our `working_on` state
    # and tells Redis we processed a job.
    def done_working
      data_store.worker_done_working(self) do |**opts|
        processed!(**opts)
      end
    end

    def state_change
      current_state = state
      if current_state != @last_state
        run_hook :queue_empty if current_state == :idle
        @last_state = current_state
      end
    end

    # How many jobs has this worker processed? Returns an int.
    def processed
      Stat["processed:#{self}"]
    end

    # Tell Redis we've processed a job.
    def processed!(**opts)
      Stat.incr("processed",         1, **opts)
      Stat.incr("processed:#{self}", 1, **opts)
    end

    # How many failed jobs has this worker seen? Returns an int.
    def failed
      Stat["failed:#{self}"]
    end

    # Tells Redis we've failed a job.
    def failed!
      Stat << "failed"
      Stat << "failed:#{self}"
    end

    # What time did this worker start? Returns an instance of `Time`
    def started
      data_store.worker_start_time(self)
    end

    # Tell Redis we've started
    def started!
      data_store.worker_started(self)
    end

    # Returns a hash explaining the Job we're currently processing, if any.
    def job(reload = true)
      @job = nil if reload
      @job ||= decode(data_store.get_worker_payload(self)) || {}
    end
    attr_writer :job
    alias_method :processing, :job

    # Boolean - true if working, false if not
    def working?
      state == :working
    end

    # Boolean - true if idle, false if not
    def idle?
      state == :idle
    end

    def fork_per_job?
      return @fork_per_job if defined?(@fork_per_job)
      @fork_per_job = ENV["FORK_PER_JOB"] != 'false' && Kernel.respond_to?(:fork)
    end

    # Returns a symbol representing the current worker state,
    # which can be either :working or :idle
    def state
      data_store.get_worker_payload(self) ? :working : :idle
    end

    # Is this worker the same as another worker?
    def ==(other)
      to_s == other.to_s
    end

    def inspect
      "#<Worker #{to_s}>"
    end

    # The string representation is the same as the id for this worker
    # instance. Can be used with `Worker.find`.
    def to_s
      @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
    end
    alias_method :id, :to_s

    # chomp'd hostname of this worker's machine
    def hostname
      @hostname ||= Socket.gethostname
    end

    # Returns Integer PID of running worker
    def pid
      @pid ||= Process.pid
    end

    # Returns an Array of string pids of all the other workers on this
    # machine. Useful when pruning dead workers on startup.
    def worker_pids
      if RUBY_PLATFORM =~ /solaris/
        solaris_worker_pids
      elsif RUBY_PLATFORM =~ /mingw32/
        windows_worker_pids
      else
        linux_worker_pids
      end
    end

    # Returns an Array of string pids of all the other workers on this
    # machine. Useful when pruning dead workers on startup.
    def windows_worker_pids
      tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
      tasklist_output.split($/).select { |line| line =~ /^PID:/ }.collect { |line| line.gsub(/PID:\s+/, '') }
    end

    # Find Resque worker pids on Linux and OS X.
    #
    def linux_worker_pids
      `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque:\sStarting|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
        line.split(' ')[0]
      end
    end

    # Find Resque worker pids on Solaris.
    #
    # Returns an Array of string pids of all the other workers on this
    # machine. Useful when pruning dead workers on startup.
    def solaris_worker_pids
      `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
        real_pid = line.split(' ')[0]
        pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
        if pargs_command.split(':')[1] == " resque-#{Resque::VERSION}"
          real_pid
        end
      end.compact
    end

    # Given a string, sets the procline ($0) and logs.
    # Procline is always in the format of:
    #   RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
    def procline(string)
      $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::VERSION}: #{string}"
      log_with_severity :debug, $0
    end

    def log(message)
      info(message)
    end

    def log!(message)
      debug(message)
    end


    attr_reader :verbose, :very_verbose

    def verbose=(value);
      if value && !very_verbose
        Resque.logger.formatter = VerboseFormatter.new
        Resque.logger.level = Logger::INFO
      elsif !value
        Resque.logger.formatter = QuietFormatter.new
      end

      @verbose = value
    end

    def very_verbose=(value)
      if value
        Resque.logger.formatter = VeryVerboseFormatter.new
        Resque.logger.level = Logger::DEBUG
      elsif !value && verbose
        Resque.logger.formatter = VerboseFormatter.new
        Resque.logger.level = Logger::INFO
      else
        Resque.logger.formatter = QuietFormatter.new
      end

      @very_verbose = value
    end

    private

    def perform_with_fork(job, &block)
      run_hook :before_fork, job

      begin
        @child = fork do
          unregister_signal_handlers if term_child
          perform(job, &block)
          exit! unless run_at_exit_hooks
        end
      rescue NotImplementedError
        @fork_per_job = false
        perform(job, &block)
        return
      end

      srand # Reseeding
      procline "Forked #{@child} at #{Time.now.to_i}"

      begin
        Process.waitpid(@child)
      rescue SystemCallError
        nil
      end

      job.fail(DirtyExit.new("Child process received unhandled signal #{$?}", $?)) if $?.signaled?
      @child = nil
    end

    def log_with_severity(severity, message)
      Logging.log(severity, message)
    end
  end
end