QutBioacoustics/baw-workers

View on GitHub
lib/baw-workers/action_base.rb

Summary

Maintainability
A
0 mins
Test Coverage
module BawWorkers
  # Common functionality for resque actions.
  class ActionBase

    # Ensure that there is only one job with the same payload per queue.
    # The default method to create a job ID from these parameters is to
    # do some normalization on the payload and then md5'ing it
    include Resque::Plugins::UniqueJob

    # a set of keys starting with 'stats:jobs:queue_name' inside your Resque redis namespace
    # Jobs performed
    # Jobs enqueued
    # Jobs failed
    # Duration of last x jobs completed
    # Average job duration over last 100 jobs completed
    # Longest job duration over last 100 jobs completed
    # Jobs enqueued as timeseries data (minute, hour, day)
    # Jobs performed as timeseries data (minute, hour, day)
    extend Resque::Plugins::JobStats

    # track specific job instances and their status.
    # resque-status achieves this by giving job instances UUID's
    # and allowing the job instances to report their
    # status from within their iterations.
    # WARNING: our own monkey patch is included in `resque_status_custom_expire.rb`.
    include Resque::Plugins::Status

    # Class methods
    class << self

      # Override: Get the key for resque_solo to use in redis.
      def redis_key(payload)
        BawWorkers::ResqueJobId.create_id_payload(payload)
      end

      # Overrides method used by resque-status.
      # Uses resque_solo redis key instead of random uuid.
      # Adds a job of type <tt>klass<tt> to a specified queue with <tt>options<tt>.
      #
      # Returns the UUID of the job if the job was queued, or nil if the job was
      # rejected by a before_enqueue hook.
      def enqueue_to(queue, klass, options = {})
        uuid = BawWorkers::ResqueJobId.create_id_props(klass, options)
        Resque::Plugins::Status::Hash.create uuid, :options => options

        if Resque.enqueue_to(queue, klass, uuid, options)
          uuid
        else
          Resque::Plugins::Status::Hash.remove(uuid)
          nil
        end
      end

      # By default, lock_after_execution_period is 0 and enqueued? becomes
      # false as soon as the job is being worked on.
      # The lock_after_execution_period setting can be used to delay when
      # the unique job key is deleted (i.e. when enqueued? becomes false).
      # For example, if you have a long-running unique job that takes around
      # 10 seconds, and you don't want to requeue another job until you are
      # sure it is done, you could set lock_after_execution_period = 20.
      # Or if you never want to run a long running job more than once per
      # minute, set lock_after_execution_period = 60.
      # @return [Fixnum]
      def lock_after_execution_period
        # 2 minutes
        120
      end

      # Get the queue for this action. Used by Resque. Overrides resque-status class method.
      # @return [Symbol] The queue.
      def queue
        fail NotImplementedError, 'Must implement `queue` method.'
      end

      # Name for logging.
      def logger_name
        self.name
      end
    end

    #
    # Instance methods
    #

    # inherited fields from Resque::Plugins::Status
    # @options, @uuid, status, name

    # Get the keys for the perform options hash.
    # Order is important
    # @return [Array<String>]
    def perform_options_keys
      fail NotImplementedError, 'You must implement `perform_options_keys` in your base class.'
    end

    # Perform method used by resque-status.
    def perform
      # To be ***painstakingly clear***: the values we're using in the perform method are actually coming from
      # the resque:status' payload.options field.
      # The resque payload itself is not actually used!

      values = perform_options_keys.map { |k| options[k] }

      # kill the job if it is in resque:status's kill list
      at(0, 1, "Begin action")

      # resolve partial payloads
      values = values.map { |value| BawWorkers::PartialPayload.resolve(value) }

      begin
        self.class.action_perform(*values)
      rescue BawWorkers::Exceptions::ActionCancelledError => ace
        # catch an action cancelled exception,
        # transform into resque:status killed exception
        kill!
      end
    end


    # Produces a sensible friendly name for this payload.
    # Should be unique but does not need to be. Has no operational effect.
    # This value is only used when the status is updated by resque:status.
    #
    # We've purposely broken resque:status' implementation because it does a `.inspect` on the object which leads
    # to all the publicly available fields being serialized. This also means that job payloads are effectively
    # duplicated in a serialized form into the name field.
    # https://github.com/QutBioacoustics/baw-workers/issues/41
    def name
      fail NotImplementedError, 'You must implement `name` in your base class.'
    end

  end
end