resque/resque

View on GitHub
lib/resque.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require 'mono_logger'
require 'redis/namespace'
require 'forwardable'

require 'resque/version'

require 'resque/errors'

require 'resque/failure'
require 'resque/failure/base'

require 'resque/helpers'
require 'resque/stat'
require 'resque/logging'
require 'resque/log_formatters/quiet_formatter'
require 'resque/log_formatters/verbose_formatter'
require 'resque/log_formatters/very_verbose_formatter'
require 'resque/job'
require 'resque/worker'
require 'resque/plugin'
require 'resque/data_store'
require 'resque/thread_signal'

require 'resque/vendor/utf8_util'

module Resque
  include Helpers
  extend self

  # Given a Ruby object, returns a string suitable for storage in a
  # queue.
  def encode(object)
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.dump object
    else
      MultiJson.encode object
    end
  end

  # Given a string, returns a Ruby object.
  def decode(object)
    return unless object

    begin
      if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
        MultiJson.load object
      else
        MultiJson.decode object
      end
    rescue ::MultiJson::DecodeError => e
      raise Helpers::DecodeException, e.message, e.backtrace
    end
  end

  # Given a word with dashes, returns a camel cased version of it.
  #
  # classify('job-name') # => 'JobName'
  def classify(dashed_word)
    dashed_word.split('-').map(&:capitalize).join
  end

  # Tries to find a constant with the name specified in the argument string:
  #
  # constantize("Module") # => Module
  # constantize("Test::Unit") # => Test::Unit
  #
  # The name is assumed to be the one of a top-level constant, no matter
  # whether it starts with "::" or not. No lexical context is taken into
  # account:
  #
  # C = 'outside'
  # module M
  #   C = 'inside'
  #   C # => 'inside'
  #   constantize("C") # => 'outside', same as ::C
  # end
  #
  # NameError is raised when the constant is unknown.
  def constantize(camel_cased_word)
    camel_cased_word = camel_cased_word.to_s

    if camel_cased_word.include?('-')
      camel_cased_word = classify(camel_cased_word)
    end

    names = camel_cased_word.split('::')
    names.shift if names.empty? || names.first.empty?

    constant = Object
    names.each do |name|
      args = Module.method(:const_get).arity != 1 ? [false] : []

      if constant.const_defined?(name, *args)
        constant = constant.const_get(name)
      else
        constant = constant.const_missing(name)
      end
    end
    constant
  end

  extend ::Forwardable

  # Accepts:
  #   1. A 'hostname:port' String
  #   2. A 'hostname:port:db' String (to select the Redis db)
  #   3. A 'hostname:port/namespace' String (to set the Redis namespace)
  #   4. A Redis URL String 'redis://host:port'
  #   5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
  #      or `Redis::Namespace`.
  #   6. An Hash of a redis connection {:host => 'localhost', :port => 6379, :db => 0}
  def redis=(server)
    case server
    when String
      if server =~ /rediss?\:\/\//
        redis = Redis.new(:url => server, :thread_safe => true)
      else
        server, namespace = server.split('/', 2)
        host, port, db = server.split(':')
        redis = Redis.new(:host => host, :port => port,
          :thread_safe => true, :db => db)
      end
      namespace ||= :resque

      @data_store = Resque::DataStore.new(Redis::Namespace.new(namespace, :redis => redis))
    when Redis::Namespace
      @data_store = Resque::DataStore.new(server)
    when Resque::DataStore
      @data_store = server
    when Hash
      @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => Redis.new(server)))
    else
      @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => server))
    end
  end

  # Returns the current Redis connection. If none has been created, will
  # create a new one.
  def redis
    return @data_store if @data_store
    self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
    self.redis
  end
  alias :data_store :redis

  def redis_id
    data_store.identifier
  end

  # Set the data store for the processed and failed statistics.
  #
  # By default it uses the same as `Resque.redis`, but different stores can be used.
  #
  # A custom store needs to obey the following API to work correctly
  #
  # class NullDataStore
  #   # Returns the current value for the given stat.
  #   def stat(stat)
  #   end
  #
  #   # Increments the stat by the given value.
  #   def increment_stat(stat, by)
  #   end
  #
  #   # Decrements the stat by the given value.
  #   def decrement_stat(stat, by)
  #   end
  #
  #   # Clear the values for the given stat.
  #   def clear_stat(stat)
  #   end
  # end
  def stat_data_store=(stat_data_store)
    Resque::Stat.data_store = stat_data_store
  end

  # Returns the data store for the statistics module.
  def stat_data_store
    Resque::Stat.data_store
  end

  # Set or retrieve the current logger object
  attr_accessor :logger

  DEFAULT_HEARTBEAT_INTERVAL = 60
  DEFAULT_PRUNE_INTERVAL = DEFAULT_HEARTBEAT_INTERVAL * 5

  # Defines how often a Resque worker updates the heartbeat key. Must be less
  # than the prune interval.
  attr_writer :heartbeat_interval
  def heartbeat_interval
    if defined? @heartbeat_interval
      @heartbeat_interval
    else
      DEFAULT_HEARTBEAT_INTERVAL
    end
  end

  # Defines how often Resque checks for dead workers.
  attr_writer :prune_interval
  def prune_interval
    if defined? @prune_interval
      @prune_interval
    else
      DEFAULT_PRUNE_INTERVAL
    end
  end

  # By default, jobs are pushed to the back of the queue and popped from
  # the front, resulting in "first in, first out" (FIFO) execution order.
  # Set to true to push jobs to the front of the queue instead, resulting
  # in "last in, first out" (LIFO) execution order.
  attr_writer :enqueue_front
  def enqueue_front
    if defined? @enqueue_front
      @enqueue_front
    else
      @enqueue_front = false
    end
  end

  # The `before_first_fork` hook will be run in the **parent** process
  # only once, before forking to run the first job. Be careful- any
  # changes you make will be permanent for the lifespan of the
  # worker.
  #
  # Call with a block to register a hook.
  # Call with no arguments to return all registered hooks.
  def before_first_fork(&block)
    block ? register_hook(:before_first_fork, block) : hooks(:before_first_fork)
  end

  # Register a before_first_fork proc.
  def before_first_fork=(block)
    register_hook(:before_first_fork, block)
  end

  # The `before_fork` hook will be run in the **parent** process
  # before every job, so be careful- any changes you make will be
  # permanent for the lifespan of the worker.
  #
  # Call with a block to register a hook.
  # Call with no arguments to return all registered hooks.
  def before_fork(&block)
    block ? register_hook(:before_fork, block) : hooks(:before_fork)
  end

  # Register a before_fork proc.
  def before_fork=(block)
    register_hook(:before_fork, block)
  end

  # The `after_fork` hook will be run in the child process and is passed
  # the current job. Any changes you make, therefore, will only live as
  # long as the job currently being processed.
  #
  # Call with a block to register a hook.
  # Call with no arguments to return all registered hooks.
  def after_fork(&block)
    block ? register_hook(:after_fork, block) : hooks(:after_fork)
  end

  # Register an after_fork proc.
  def after_fork=(block)
    register_hook(:after_fork, block)
  end

  # The `before_pause` hook will be run in the parent process before the
  # worker has paused processing (via #pause_processing or SIGUSR2).
  def before_pause(&block)
    block ? register_hook(:before_pause, block) : hooks(:before_pause)
  end

  # Register a before_pause proc.
  def before_pause=(block)
    register_hook(:before_pause, block)
  end

  # The `after_pause` hook will be run in the parent process after the
  # worker has paused (via SIGCONT).
  def after_pause(&block)
    block ? register_hook(:after_pause, block) : hooks(:after_pause)
  end

  # Register an after_pause proc.
  def after_pause=(block)
    register_hook(:after_pause, block)
  end

  def to_s
    "Resque Client connected to #{redis_id}"
  end

  attr_accessor :inline

  # If 'inline' is true Resque will call #perform method inline
  # without queuing it into Redis and without any Resque callbacks.
  # The 'inline' is false Resque jobs will be put in queue regularly.
  alias :inline? :inline

  #
  # queue manipulation
  #

  # Pushes a job onto a queue. Queue name should be a string and the
  # item should be any JSON-able Ruby object.
  #
  # Resque works generally expect the `item` to be a hash with the following
  # keys:
  #
  #   class - The String name of the job to run.
  #    args - An Array of arguments to pass the job. Usually passed
  #           via `class.to_class.perform(*args)`.
  #
  # Example
  #
  #   Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
  #
  # Returns nothing
  def push(queue, item)
    data_store.push_to_queue(queue,encode(item))
  end

  # Pops a job off a queue. Queue name should be a string.
  #
  # Returns a Ruby object.
  def pop(queue)
    decode(data_store.pop_from_queue(queue))
  end

  # Returns an integer representing the size of a queue.
  # Queue name should be a string.
  def size(queue)
    data_store.queue_size(queue)
  end

  # Returns an array of items currently queued. Queue name should be
  # a string.
  #
  # start and count should be integer and can be used for pagination.
  # start is the item to begin, count is how many items to return.
  #
  # To get the 3rd page of a 30 item, paginatied list one would use:
  #   Resque.peek('my_list', 59, 30)
  def peek(queue, start = 0, count = 1)
    results = data_store.peek_in_queue(queue,start,count)
    if count == 1
      decode(results)
    else
      results.map { |result| decode(result) }
    end
  end

  # Does the dirty work of fetching a range of items from a Redis list
  # and converting them into Ruby objects.
  def list_range(key, start = 0, count = 1)
    results = data_store.list_range(key, start, count)
    if count == 1
      decode(results)
    else
      results.map { |result| decode(result) }
    end
  end

  # Returns an array of all known Resque queues as strings.
  def queues
    data_store.queue_names
  end

  # Given a queue name, completely deletes the queue.
  def remove_queue(queue)
    data_store.remove_queue(queue)
  end

  # Used internally to keep track of which queues we've created.
  # Don't call this directly.
  def watch_queue(queue)
    data_store.watch_queue(queue)
  end


  #
  # job shortcuts
  #

  # This method can be used to conveniently add a job to a queue.
  # It assumes the class you're passing it is a real Ruby class (not
  # a string or reference) which either:
  #
  #   a) has a @queue ivar set
  #   b) responds to `queue`
  #
  # If either of those conditions are met, it will use the value obtained
  # from performing one of the above operations to determine the queue.
  #
  # If no queue can be inferred this method will raise a `Resque::NoQueueError`
  #
  # Returns true if the job was queued, nil if the job was rejected by a
  # before_enqueue hook.
  #
  # This method is considered part of the `stable` API.
  def enqueue(klass, *args)
    enqueue_to(queue_from_class(klass), klass, *args)
  end

  # Just like `enqueue` but allows you to specify the queue you want to
  # use. Runs hooks.
  #
  # `queue` should be the String name of the queue you're targeting.
  #
  # Returns true if the job was queued, nil if the job was rejected by a
  # before_enqueue hook.
  #
  # This method is considered part of the `stable` API.
  def enqueue_to(queue, klass, *args)
    # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
    before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
      klass.send(hook, *args)
    end
    return nil if before_hooks.any? { |result| result == false }

    Job.create(queue, klass, *args)

    Plugin.after_enqueue_hooks(klass).each do |hook|
      klass.send(hook, *args)
    end

    return true
  end

  # This method can be used to conveniently remove a job from a queue.
  # It assumes the class you're passing it is a real Ruby class (not
  # a string or reference) which either:
  #
  #   a) has a @queue ivar set
  #   b) responds to `queue`
  #
  # If either of those conditions are met, it will use the value obtained
  # from performing one of the above operations to determine the queue.
  #
  # If no queue can be inferred this method will raise a `Resque::NoQueueError`
  #
  # If no args are given, this method will dequeue *all* jobs matching
  # the provided class. See `Resque::Job.destroy` for more
  # information.
  #
  # Returns the number of jobs destroyed.
  #
  # Example:
  #
  #   # Removes all jobs of class `UpdateNetworkGraph`
  #   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph)
  #
  #   # Removes all jobs of class `UpdateNetworkGraph` with matching args.
  #   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph, 'repo:135325')
  #
  # This method is considered part of the `stable` API.
  def dequeue(klass, *args)
    # Perform before_dequeue hooks. Don't perform dequeue if any hook returns false
    before_hooks = Plugin.before_dequeue_hooks(klass).collect do |hook|
      klass.send(hook, *args)
    end
    return if before_hooks.any? { |result| result == false }

    destroyed = Job.destroy(queue_from_class(klass), klass, *args)

    Plugin.after_dequeue_hooks(klass).each do |hook|
      klass.send(hook, *args)
    end

    destroyed
  end

  # Given a class, try to extrapolate an appropriate queue based on a
  # class instance variable or `queue` method.
  def queue_from_class(klass)
    (klass.instance_variable_defined?(:@queue) && klass.instance_variable_get(:@queue)) ||
      (klass.respond_to?(:queue) and klass.queue)
  end

  # This method will return a `Resque::Job` object or a non-true value
  # depending on whether a job can be obtained. You should pass it the
  # precise name of a queue: case matters.
  #
  # This method is considered part of the `stable` API.
  def reserve(queue)
    Job.reserve(queue)
  end

  # Validates if the given klass could be a valid Resque job
  #
  # If no queue can be inferred this method will raise a `Resque::NoQueueError`
  #
  # If given klass is nil this method will raise a `Resque::NoClassError`
  def validate(klass, queue = nil)
    queue ||= queue_from_class(klass)

    if !queue
      raise NoQueueError.new("Jobs must be placed onto a queue. No queue could be inferred for class #{klass}")
    end

    if klass.to_s.empty?
      raise NoClassError.new("Jobs must be given a class.")
    end
  end


  #
  # worker shortcuts
  #

  # A shortcut to Worker.all
  def workers
    Worker.all
  end

  # A shortcut to Worker.working
  def working
    Worker.working
  end

  # A shortcut to unregister_worker
  # useful for command line tool
  def remove_worker(worker_id)
    worker = Resque::Worker.find(worker_id)
    worker.unregister_worker
  end

  #
  # stats
  #

  # Returns a hash, similar to redis-rb's #info, of interesting stats.
  def info
    return {
      :pending   => queue_sizes.inject(0) { |sum, (_queue_name, queue_size)| sum + queue_size },
      :processed => Stat[:processed],
      :queues    => queues.size,
      :workers   => workers.size.to_i,
      :working   => working.size,
      :failed    => data_store.num_failed,
      :servers   => [redis_id],
      :environment  => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
    }
  end

  # Returns an array of all known Resque keys in Redis. Redis' KEYS operation
  # is O(N) for the keyspace, so be careful - this can be slow for big databases.
  def keys
    data_store.all_resque_keys
  end

  # Returns a hash, mapping queue names to queue sizes
  def queue_sizes
    queue_names = queues

    sizes = redis.pipelined do
      queue_names.each do |name|
        redis.llen("queue:#{name}")
      end
    end

    Hash[queue_names.zip(sizes)]
  end

  # Returns a hash, mapping queue names to (up to `sample_size`) samples of jobs in that queue
  def sample_queues(sample_size = 1000)
    queue_names = queues

    samples = redis.pipelined do
      queue_names.each do |name|
        key = "queue:#{name}"
        redis.llen(key)
        redis.lrange(key, 0, sample_size - 1)
      end
    end

    hash = {}

    queue_names.zip(samples.each_slice(2).to_a) do |queue_name, (queue_size, serialized_samples)|
      samples = serialized_samples.map do |serialized_sample|
        Job.decode(serialized_sample)
      end

      hash[queue_name] = {
        :size => queue_size,
        :samples => samples
      }
    end

    hash
  end

  private

  @hooks = Hash.new { |h, k| h[k] = [] }

  # Register a new proc as a hook. If the block is nil this is the
  # equivalent of removing all hooks of the given name.
  #
  # `name` is the hook that the block should be registered with.
  def register_hook(name, block)
    return clear_hooks(name) if block.nil?

    block = Array(block)
    @hooks[name].concat(block)
  end

  # Clear all hooks given a hook name.
  def clear_hooks(name)
    @hooks[name] = []
  end

  # Retrieve all hooks of a given name.
  def hooks(name)
    @hooks[name]
  end
end

# Log to STDOUT by default
Resque.logger           = MonoLogger.new(STDOUT)
Resque.logger.formatter = Resque::QuietFormatter.new