QutBioacoustics/baw-workers

View on GitHub
lib/baw-workers/resque_api.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module BawWorkers
  # Custom api for access to resque information.
  class ResqueApi
    class << self

      # Is this type of job with these args currently queued?
      # @param [Class] klass
      # @param [Hash] args
      # @return [Boolean]
      def job_queued?(klass, args = {})
        job_queued_in?(Resque.queue_from_class(klass), klass, args)
      end

      # Does this queue have this type of job with these args?
      # @param [String] queue
      # @param [Class] klass
      # @param [Hash] args
      # @return [Boolean]
      def job_queued_in?(queue, klass, args = {})
        # from https://github.com/neighborland/resque_solo/blob/master/lib/resque_ext/resque.rb
        item = BawWorkers::ResqueJobId.create_payload(klass, args)
        return nil unless ResqueSolo::Queue.is_unique?(item)
        ResqueSolo::Queue.queued?(queue, item)
      end

      # Get all currently queued jobs.
      # @return [Array<Hash>]
      def jobs_queued
        jobs = []
        Resque.queues.each do |queue|
          jobs.push(jobs_queued_in(queue))
        end
        jobs.flatten
      end

      # Get jobs in this queue.
      # @param [String] queue
      # @return [Array<Hash>]
      def jobs_queued_in(queue)
        # from http://blog.mojotech.com/hello-resque-whats-happening-under-the-hood/
        payloads = []
        index = 0
        while payload = Resque.redis.lindex("queue:#{queue}", index) do
          payloads << Resque.decode(payload).merge('queue' => queue)
          index += 1
        end
        payloads
      end

      # Get jobs in this queue of this type.
      # @param [String] queue
      # @param [Class] klass
      # @return [Array<Hash>]
      def jobs_queued_in_of(queue, klass)
        match_klass = klass.to_s
        jobs_queued_in(queue).select { |job| job['class'] == match_klass }
      end

      # Get jobs in this queue of this type with these args.
      # @param [String] queue
      # @param [Class] klass
      # @param [Hash] args
      # @return [Array<Hash>]
      def jobs_queued_in_of_with(queue, klass, args = {})
        item = BawWorkers::ResqueJobId.create_payload(klass, args)
        jobs_queued_in(queue).select { |job| job['class'] == item['class'] && compare_args(job['args'], item['args']) }
      end

      # Get queued jobs of this type.
      # @param [Class] klass
      # @return [Array<Hash>]
      def jobs_queued_of(klass)
        match_klass = klass.to_s
        jobs_queued.select { |job| job['class'] == match_klass }
      end

      # Get queued jobs of this type with these args.
      # @param [Class] klass
      # @param [Hash] args
      # @return [Array<Hash>]
      def jobs_queued_of_with(klass, args = {})
        item = BawWorkers::ResqueJobId.create_payload(klass, args)
        jobs_queued.select { |job| job['class'] == item['class'] && compare_args(job['args'], item['args']) }
      end

      # Get the currently running jobs.
      # @return [Array<Hash>]
      def jobs_running
        # from http://blog.mojotech.com/hello-resque-whats-happening-under-the-hood/
        # payload_class, args, queue
        Resque::Worker.working.map(&:job)
      end

      # Get the currently running jobs.
      # @param [Class] klass
      # @return [Array<Hash>]
      def jobs_running_of(klass)
        match_klass = klass.to_s
        jobs_running.select { |job| job['class'] == match_klass }
      end

      # Get the currently running jobs.
      # @param [Class] klass
      # @param [Hash] args
      # @return [Array<Hash>]
      def jobs_running_of_with(klass, args = {})
        item = BawWorkers::ResqueJobId.create_payload(klass, args)
        jobs_running.select { |job| job['class'] == item['class'] && compare_args(job['args'], item['args']) }
      end

      # Get all jobs.
      # @return [Array<Hash>]
      def jobs
        jobs_queued + jobs_running
      end

      # Get all jobs of this type.
      # @param [Class] klass
      # @return [Array<Hash>]
      def jobs_of(klass)
        jobs_queued_of(klass) + jobs_running_of(klass)
      end

      # Get all jobs of this type with these args.
      # @param [Class] klass
      # @param [Hash] args
      # @return [Array<Hash>]
      def jobs_of_with(klass, args = {})
        jobs_queued_of_with(klass, args) + jobs_running_of_with(klass, args)
      end

      def compare_args(a, b)
        # test payload hash first - must match to continue
        return false unless a[0].is_a?(String) && b[0].is_a?(String) && a[0] == b[0]

        # convert array a to the same format as array b
        a_array = []
        a[1].each_pair do |key, value|
          a_array.push([key, value])
        end

        BawWorkers::Validation.compare(a_array, b[1..-1])
      end

      # List all running workers.
      # @return [Array] details of running workers
      def workers_running
        workers = Resque.workers
        running_workers = []

        if workers.size > 0

          BawWorkers::Config.logger_worker.info(self.name) {
            "There are #{workers.size} Resque workers currently running."
          }

          workers.each do |worker|
            running_workers.push(worker.to_s)
          end

          BawWorkers::Config.logger_worker.info(self.name) {
            worker_details = running_workers.map { |worker|
              host, pid, queues = worker.split(':')
              {host: host, pid: pid, queues: queues}
            }.join(',')

            "Resque worker details: #{worker_details}."
          }

        else
          BawWorkers::Config.logger_worker.info(self.name) {
            'No Resque workers currently running.'
          }
        end

        running_workers
      end

      # Quit all running workers.
      # @return [Array<Integer>] worker pids
      def workers_stop_all
        pids = []
        Resque.workers.each do |worker|
          pids.concat(worker.worker_pids)
        end

        pids = pids.uniq

        BawWorkers::Config.logger_worker.info(self.name) {
          "Pids of running Resque workers: '#{pids.join(',')}'."
        }

        unless pids.empty?
          syscmd = "kill -s QUIT #{pids.join(' ')}"

          BawWorkers::Config.logger_worker.warn(self.name) {
            "Running syscmd to kill all workers: '#{syscmd}'"
          }

          system(syscmd)
        end

        pids
      end

      # Clear a queue
      # @see https://gist.github.com/denmarkin/1228863
      # @param [String] queue
      # @return [void]
      def clear_queue(queue)
        BawWorkers::Config.logger_worker.warn(self.name) {
          "Clearing queue #{queue}..."
        }
        Resque.remove_queue_with_cleanup(queue)
      end

      # Clear resque stats
      # @see https://gist.github.com/denmarkin/1228863
      # @return [void]
      def clear_stats
        BawWorkers::Config.logger_worker.warn(self.name) {
          'Clearing stats...'
        }
        Resque.redis.set 'stat:failed', 0
        Resque.redis.set 'stat:processed', 0
      end

      # Retry failed jobs
      # @see https://gist.github.com/CharlesP/1818418754aec03403b3
      def retry_failed
        redis = Resque.redis
        failure_count = Resque::Failure.count
        retried_count = 0

        BawWorkers::Config.logger_worker.warn(self.name) {
          "Retrying failed jobs (total : #{failure_count})."
        }

        (0...failure_count).each do |i|
          serialized_job = redis.lindex(:failed, i)
          job = Resque.decode(serialized_job)

          next if job.nil?
          if job['exception'] == 'Resque::DirtyExit'
            retried_count = retried_count +1
            BawWorkers::Config.logger_worker.warn(self.name) {
              "Retrying job  #{job['payload']['class']}..."
            }
            Resque::Failure.requeue(i)
            Resque::Failure.remove(i)
          end
        end

        BawWorkers::Config.logger_worker.warn(self.name) {
          "Retried #{retried_count} failed jobs."
        }
      end

      # Get a Resque::Status hash for the matching action job and payload.
      # Required when you need to regenerate a deterministic key.
      # @param [Class] action_class
      # @param [Hash] args
      # @return [Resque::Plugins::Status::Hash] status
      def status(action_class, args = {})
        job_id = BawWorkers::ResqueJobId.create_id_props(action_class, args)
        Resque::Plugins::Status::Hash.get(job_id)
      end

      # Get a Resque::Status hash for the provided unique key.
      # @param [String] unique_key - the key that uniquely identifies this action. Typically a uuid.
      # @return [Resque::Plugins::Status::Hash] status
      def status_by_key(unique_key)
        Resque::Plugins::Status::Hash.get(unique_key)
      end

      # Get a resque:status' key expire time
      def status_ttl(unique_key)
        key = Resque::Plugins::Status::Hash.status_key(unique_key)
        Resque::Plugins::Status::Hash.redis.ttl(key)
      end

    end
  end
end