nathan-v/resque-state

View on GitHub
lib/resque/plugins/state.rb

Summary

Maintainability
B
5 hrs
Test Coverage
# Resque root module
module Resque
  # Resque::Plugins root module
  module Plugins
    # Resque::Plugins::State is a module your jobs will include.
    # It provides helper methods for updating the status/etc from within an
    # instance as well as class methods for creating and queuing the jobs.
    #
    # All you have to do to get this functionality is include
    # Resque::Plugins::State and then implement a <tt>perform</tt> method.
    #
    # For example
    #
    #       class ExampleJob
    #         include Resque::Plugins::State
    #
    #         def perform
    #           num = options['num']
    #           i = 0
    #           while i < num
    #             i += 1
    #             at(i, num)
    #           end
    #           completed("Finished!")
    #         end
    #
    #       end
    #
    # This job would iterate num times updating the status as it goes. At the
    # end we update the status telling anyone listening to this job that it's
    # complete.
    module State
      # Plugin version string.
      VERSION = '1.1.1'.freeze

      # Every state a job can report. Each state is also exposed as its own
      # STATUS_* constant; they are defined inline here so the list and the
      # individual constants always stay in step.
      STATUSES = [
        STATUS_QUEUED = 'queued'.freeze,
        STATUS_WORKING = 'working'.freeze,
        STATUS_COMPLETED = 'completed'.freeze,
        STATUS_FAILED = 'failed'.freeze,
        STATUS_KILLED = 'killed'.freeze,
        STATUS_PAUSED = 'paused'.freeze,
        STATUS_WAITING = 'waiting'.freeze,
        STATUS_REVERTING = 'reverting'.freeze,
        STATUS_REVERTED = 'reverted'.freeze
      ].freeze

      autoload :Hash, 'resque/plugins/state/hash'

      # Raised by kill! to abort a job that has been flagged for killing
      # (pause! also raises it when @testing is set).
      class Killed < RuntimeError
      end

      # Raised by revert!; safe_perform! rescues it and runs the job's
      # on_revert handler.
      class Revert < RuntimeError
      end

      # Raised by at() when the supplied total is zero, negative or does not
      # convert to a number.
      class NotANumber < RuntimeError
      end

      # The job's UUID and the options it was created with; both are assigned
      # in #initialize.
      attr_reader :uuid, :options

      # Hook run when this module is included into a job class: extends the
      # including class with ClassMethods so the queueing helpers (create,
      # enqueue, perform, ...) are available on the class itself.
      def self.included(base)
        base.extend(ClassMethods)
      end

      # Methods required for launching a state-ready job
      module ClassMethods
        # The default queue is :statused. This can be overridden in the
        # specific job class to put the jobs on a specific worker queue.
        def queue
          :statused
        end

        # Used when displaying the job in the resque-web UI and identifying the
        # job type by status. By default this is the name of the job class, but
        # it can be overridden in the specific job class to present a more
        # user-friendly job name.
        def name
          to_s
        end

        # Create is the primary method for adding jobs to the queue. This would
        # be called on the job class to create a job of that type. Any options
        # passed are passed to the Job instance as a hash of options.
        #
        # Delegates to <tt>enqueue</tt> with the receiving class, so it returns
        # the UUID of the job, or nil if the job was not queued.
        #
        # == Example:
        #
        #       class ExampleJob
        #         include Resque::Plugins::State
        #
        #         def perform
        #           job_status "Hey I'm a job num #{options['num']}"
        #         end
        #
        #       end
        #
        #       job_id = ExampleJob.create(:num => 100)
        #
        def create(options = {})
          enqueue(self, options)
        end

        # Queues a job of type <tt>klass</tt> with <tt>options</tt>. The target
        # queue is whatever Resque resolves for <tt>klass</tt>, falling back to
        # this class's own <tt>queue</tt> when Resque finds none.
        #
        # Returns the UUID of the job if the job was queued, or nil if the job
        # was rejected by a before_enqueue hook.
        def enqueue(klass, options = {})
          target_queue = Resque.queue_from_class(klass) || queue
          enqueue_to(target_queue, klass, options)
        end

        # Queues a job of type <tt>klass</tt> on the given <tt>queue</tt> with
        # <tt>options</tt>. A status record is created under a fresh UUID
        # before the job is handed to Resque, and removed again if Resque does
        # not accept the job.
        #
        # Returns the UUID of the job if the job was queued, or nil if the job
        # was rejected by a before_enqueue hook.
        def enqueue_to(queue, klass, options = {})
          state = Resque::Plugins::State::Hash
          job_id = state.generate_uuid
          state.create(job_id, options: options)

          return job_id if Resque.enqueue_to(queue, klass, job_id, options)

          # Not queued: drop the status record created above.
          state.remove(job_id)
          nil
        end

        # Removes a job of type <tt>klass</tt> from the queue.
        #
        # The options the job was created with are read back from its status
        # record, because Resque needs them to find the matching queue entry.
        # Returns whatever Resque.dequeue returns.
        def dequeue(klass, uuid)
          original_options = Resque::Plugins::State::Hash.get(uuid).options
          Resque.dequeue(klass, uuid, original_options)
        end

        # This is the method called by Resque::Worker when processing jobs. It
        # builds a new instance of the job class from the uuid and options
        # (generating a uuid when none is given), runs it through
        # <tt>safe_perform!</tt> and returns the instance.
        #
        # You should not override this method, rather the <tt>perform</tt>
        # instance method.
        def perform(uuid = nil, options = {})
          job_id = uuid || Resque::Plugins::State::Hash.generate_uuid
          new(job_id, options).tap(&:safe_perform!)
        end

        # Wrapper API to forward a Resque::Job creation API call into a
        # Resque::Plugins::State call.
        # This is needed to be used with resque scheduler
        # http://github.com/bvandenbos/resque-scheduler
        #
        # The <tt>_klass</tt> argument is ignored; the job is always queued for
        # the receiving class via <tt>enqueue_to</tt>.
        def scheduled(queue, _klass, *args)
          enqueue_to(queue, self, *args)
        end
      end

      # Create a new instance with <tt>uuid</tt> and <tt>options</tt>.
      # The instance starts outside of revert mode and logs through
      # Resque.logger.
      def initialize(uuid, options = {})
        @reverting = false # set to true by revert!
        @uuid      = uuid
        @options   = options
        @logger    = Resque.logger
      end

      # Run by the Resque::Worker when processing this job. It wraps the
      # <tt>perform</tt> method ensuring that the final status of the job is set
      # regardless of error.
      #
      # Outcomes:
      # * perform left the status failed  -> on_failure(message) if defined
      # * perform finished otherwise       -> completed (unless the status is
      #   already completed), then on_success if defined
      # * Killed raised                    -> status marked killed, on_killed
      #   if defined
      # * Revert raised                    -> on_revert, status set to reverted
      # * any other StandardError          -> status set to failed; the error
      #   is passed to on_failure if defined, otherwise re-raised
      def safe_perform!
        job_status('status' => STATUS_WORKING)
        @logger.info("#{@uuid}: Job starting")
        perform

        # Read the stored status once. Re-fetching it for every check costs
        # extra lookups and can blow up with NoMethodError if the record
        # disappears between the nil check and the method call.
        final_status = status
        if final_status && final_status.failed?
          on_failure(final_status.message) if respond_to?(:on_failure)
          return
        end

        completed if final_status && !final_status.completed?
        on_success if respond_to?(:on_success)
      rescue Killed
        Resque::Plugins::State::Hash.killed(uuid)
        on_killed if respond_to?(:on_killed)
      rescue Revert
        Resque::Plugins::State::Hash.revert(uuid)
        on_revert
        job_status('status' => STATUS_REVERTED,
                   'message' => "Reverted at #{Time.now}")
      rescue => e
        @logger.error("Job #{@uuid}: Failed at #{Time.now}: #{e.message}")
        failed("The task failed because of an error: #{e}")
        raise e unless respond_to?(:on_failure)
        on_failure(e)
      end

      # Set the job's status. Can take an array of strings or hashes that are
      # merged (in order) into a final status hash. The array is splatted into
      # Resque::Plugins::State::Hash.set after the job's uuid.
      def status=(new_status)
        Resque::Plugins::State::Hash.set(uuid, *new_status)
      end

      # Get the Resque::Plugins::State::Hash object for the current uuid.
      # Every call performs a fresh lookup; nothing is cached on the instance.
      def status
        Resque::Plugins::State::Hash.get(uuid)
      end

      # Display name of this job instance: the class-level name followed by the
      # inspected options in parentheses (empty parentheses when there are no
      # options).
      def name
        details = options.empty? ? '' : options.inspect
        "#{self.class.name}(#{details})"
      end

      # Checks against the kill list if this specific job instance should be
      # killed on the next iteration
      def should_kill?
        Resque::Plugins::State::Hash.should_kill?(uuid)
      end

      # Checks against the pause list if this specific job instance should be
      # paused on the next iteration
      def should_pause?
        Resque::Plugins::State::Hash.should_pause?(uuid)
      end

      # Checks against the revert list if this specific job instance should be
      # reverted on the next iteration
      def should_revert?
        Resque::Plugins::State::Hash.should_revert?(uuid)
      end

      # Checks against the lock list whether <tt>key</tt> is currently locked,
      # i.e. whether a job using that key should wait before starting
      def locked?(key)
        Resque::Plugins::State::Hash.locked?(key)
      end

      # Set the status of the job for the current iteration. <tt>num</tt> and
      # <tt>total</tt> are passed to the status as well as any messages.
      # The update goes through <tt>tick</tt>, so this will kill the job if it
      # has been added to the kill list with
      # <tt>Resque::Plugins::State::Hash.kill()</tt>
      #
      # Raises NotANumber when <tt>total.to_f</tt> is zero or negative.
      def at(num, total, *messages)
        invalid_total = total.to_f <= 0.0
        if invalid_total
          raise NotANumber,
                "Called at() with total=#{total} which is not a number"
        end

        progress = { 'num' => num, 'total' => total }
        tick(progress, *messages)
      end

      # Sets the status of the job for the current iteration. You should use
      # the <tt>at</tt> method if you have actual numbers to track the iteration
      # count. This will kill or pause the job if it has been added to either
      # list with <tt>Resque::Plugins::State::Hash.kill()</tt> or
      # <tt>Resque::Plugins::State::Hash.pause()</tt> respectively.
      #
      # When a revert has been requested and the job is not yet reverting,
      # control passes to <tt>revert!</tt>; once it is reverting, the status is
      # reported as 'reverting' instead of 'working'.
      def tick(*messages)
        kill! if should_kill?
        return pause! if should_pause?

        revert_requested = should_revert?
        return revert! if revert_requested && !@reverting

        phase = revert_requested ? STATUS_REVERTING : STATUS_WORKING
        job_status({ 'status' => phase }, *messages)
        @logger.info("Job #{@uuid}: #{messages.join(' ')}")
      end

      # Set the status to 'failed', passing along any additional messages, and
      # log the messages at error level.
      def failed(*messages)
        failure = { 'status' => STATUS_FAILED }
        job_status(failure, *messages)

        summary = messages.join(' ')
        @logger.error("Job #{@uuid}: #{summary}")
      end

      # Set the status to 'reverted', passing along any additional messages,
      # and log the messages at error level.
      def reverted(*messages)
        outcome = { 'status' => STATUS_REVERTED }
        job_status(outcome, *messages)

        summary = messages.join(' ')
        @logger.error("Job #{@uuid}: #{summary}")
      end

      # Set the status to 'completed', passing along any additional messages.
      # A default "Completed at <time>" message is stored first, so messages
      # supplied by the caller are applied after it.
      #
      # The log line carries the caller's messages, or the default completion
      # message when none were given (the path safe_perform! takes), so it is
      # never an empty "Job <uuid>: " entry.
      def completed(*messages)
        finished_at = "Completed at #{Time.now}"
        job_status({
                     'status' => STATUS_COMPLETED,
                     'message' => finished_at
                   }, *messages)
        log_text = messages.empty? ? finished_at : messages.join(' ')
        @logger.info("Job #{@uuid}: #{log_text}")
      end

      # Kill the current job: set the status to 'killed' with a timestamped
      # message, log it at error level and raise <tt>Killed</tt>.
      def kill!
        notice = "Killed at #{Time.now}"
        job_status('status' => STATUS_KILLED, 'message' => notice)
        @logger.error("Job #{@uuid}: #{notice}")
        raise Killed
      end

      # Revert the current job. If the job defines <tt>on_revert</tt>, any
      # pause flag is cleared, the status is set to 'reverting' and
      # <tt>Revert</tt> is raised so safe_perform! can run the handler.
      #
      # Jobs without <tt>on_revert</tt> cannot be reverted: the attempt is
      # logged, reported via Resque::Plugins::State::Hash.no_revert and the job
      # is paused instead.
      def revert!
        unless respond_to?(:on_revert)
          @logger.error("Job #{@uuid}: Attempted revert on job with no " \
                        'revert support')
          Resque::Plugins::State::Hash.no_revert(@uuid)
          return pause!('This job does not support revert functionality')
        end

        notice = "Reverting at #{Time.now}"
        Resque::Plugins::State::Hash.unpause(uuid) if should_pause?
        @reverting = true
        job_status('status' => STATUS_REVERTING, 'message' => notice)
        @logger.info("Job #{@uuid}: #{notice}")
        raise Revert
      end

      # Pause the current job: flag it as paused, set the status to 'paused'
      # and then block, re-checking every 10 seconds, for as long as the job
      # stays on the pause list. Kill and revert requests are honoured on each
      # check. <tt>pause_text</tt> is appended to the status message.
      def pause!(pause_text = nil)
        Resque::Plugins::State::Hash.pause(uuid)

        notice = "Paused at #{Time.now} #{pause_text}"
        job_status('status' => STATUS_PAUSED, 'message' => notice)

        # Don't loop or complete during testing
        raise Killed if @testing

        @logger.info("Job #{@uuid}: #{notice}")
        while should_pause?
          kill! if should_kill?
          revert! if should_revert?
          sleep 10
        end
      end

      # Lock against a provided or automatic key to prevent duplicate jobs.
      # The automatic key is the SHA1 of the job's options as JSON; it is only
      # computed when no explicit <tt>key</tt> is given.
      #
      # If the key is already locked the status is set to 'waiting' and the
      # job blocks, re-checking every 10 seconds (honouring kill and pause
      # requests), until the key is free. The lock is then taken in every
      # case; previously a job that had waited proceeded without holding the
      # lock, so a later duplicate could run alongside it.
      #
      # NOTE(review): locked? and lock are separate calls, so two waiters may
      # still race for the key; closing that gap would presumably need an
      # atomic acquire from Resque::Plugins::State::Hash - confirm.
      def lock!(key = nil)
        lock = key || Digest::SHA1.hexdigest(@options.to_json)
        if locked?(lock)
          notice = "Waiting at #{Time.now} due to existing job"
          job_status('status' => STATUS_WAITING, 'message' => notice)
          while locked?(lock)
            kill! if should_kill?
            pause! if should_pause?
            sleep 10
          end
        end
        Resque::Plugins::State::Hash.lock(lock)
      end

      # Unlock the provided or automatic key at the end of a job. The automatic
      # key is the SHA1 of the job's options as JSON, matching lock!; it is
      # only computed when no explicit <tt>key</tt> is given, so an explicit
      # key works even if the options cannot be serialised.
      def unlock!(key = nil)
        lock = key || Digest::SHA1.hexdigest(@options.to_json)
        Resque::Plugins::State::Hash.unlock(lock)
      end

      private

      # Builds the list of status updates - the currently stored status, the
      # job's display name, then the given arguments (nested arrays flattened)
      # - and assigns it through status=.
      def job_status(*args)
        updates = [status, { 'name' => name }, *args]
        self.status = updates.flatten
      end
    end
  end
end