resque/resque-scheduler

View on GitHub
lib/resque/scheduler/server.rb

Summary

Maintainability
A
0 mins
Test Coverage
# vim:fileencoding=utf-8
require 'resque-scheduler'
require 'resque/server'
require 'tilt/erb'
require 'json'

# Extend Resque::Server to add tabs
module Resque
  module Scheduler
    module Server
      TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S %z'.freeze

      unless defined?(::Resque::Scheduler::Server::VIEW_PATH)
        VIEW_PATH = File.join(File.dirname(__FILE__), 'server', 'views')
      end

      def self.included(base)
        base.class_eval do
          helpers { include HelperMethods }
          include ServerMethods

          get('/schedule') { schedule }
          post('/schedule/requeue') { schedule_requeue }
          post('/schedule/requeue_with_params') do
            schedule_requeue_with_params
          end
          delete('/schedule') { delete_schedule }
          get('/delayed') { delayed }
          get('/delayed/jobs/:klass') { delayed_jobs_klass }
          post('/delayed/search') { delayed_search }
          get('/delayed/:timestamp') { delayed_timestamp }
          post('/delayed/queue_now') { delayed_queue_now }
          post('/delayed/cancel_now') { delayed_cancel_now }
          post('/delayed/clear') { delayed_clear }
        end
      end

      module ServerMethods
        def schedule
          Resque.reload_schedule! if Resque::Scheduler.dynamic
          erb scheduler_template('scheduler')
        end

        def schedule_requeue
          @job_name = params['job_name'] || params[:job_name]
          config = Resque.schedule[@job_name]
          @parameters = config['parameters'] || config[:parameters]
          if @parameters
            erb scheduler_template('requeue-params')
          else
            Resque::Scheduler.enqueue_from_config(config)
            redirect u('/overview')
          end
        end

        def schedule_requeue_with_params
          job_name = params['job_name'] || params[:job_name]
          config = Resque.schedule[job_name]
          # Build args hash from post data (removing the job name)
          submitted_args = params.reject do |key, _value|
            key == 'job_name' || key == :job_name
          end

          # Merge constructed args hash with existing args hash for
          # the job, if it exists
          config_args = config['args'] || config[:args] || {}
          config_args = config_args.merge(submitted_args)

          # Insert the args hash into config and queue the resque job
          config = config.merge('args' => config_args)
          Resque::Scheduler.enqueue_from_config(config)
          redirect u('/overview')
        end

        def delete_schedule
          if Resque::Scheduler.dynamic
            job_name = params['job_name'] || params[:job_name]
            Resque.remove_schedule(job_name)
          end
          redirect u('/schedule')
        end

        def delayed
          erb scheduler_template('delayed')
        end

        def delayed_jobs_klass
          begin
            klass = Resque::Scheduler::Util.constantize(params[:klass])
            @args = JSON.load(CGI.unescape(params[:args]))
            @timestamps = Resque.scheduled_at(klass, *@args)
          rescue
            @timestamps = []
          end

          erb scheduler_template('delayed_schedules')
        end

        def delayed_search
          @jobs = find_job(params[:search])
          erb scheduler_template('search')
        end

        def delayed_timestamp
          erb scheduler_template('delayed_timestamp')
        end

        def delayed_queue_now
          timestamp = params['timestamp'].to_i
          formatted_time = Time.at(timestamp).strftime(
            ::Resque::Scheduler::Server::TIMESTAMP_FORMAT
          )

          if timestamp > 0
            unless Resque::Scheduler.enqueue_next_item(timestamp)
              @error_message = "Unable to remove item at #{formatted_time}"
            end
          else
            @error_message = "Incorrect timestamp #{formatted_time}"
          end

          erb scheduler_template('delayed')
        end

        def delayed_cancel_now
          klass = Resque::Scheduler::Util.constantize(params['klass'])
          timestamp = params['timestamp']
          args = Resque.decode params['args']
          Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args)
          redirect u('/delayed')
        end

        def delayed_clear
          Resque.reset_delayed_queue
          redirect u('delayed')
        end
      end

      module HelperMethods
        def format_time(t)
          t.strftime(::Resque::Scheduler::Server::TIMESTAMP_FORMAT)
        end

        def show_job_arguments(args)
          Array(args).map(&:inspect).join("\n")
        end

        def queue_from_class_name(class_name)
          Resque.queue_from_class(
            Resque::Scheduler::Util.constantize(class_name)
          )
        end

        def find_job(worker)
          worker = worker.downcase
          results = working_jobs_for_worker(worker)

          dels = delayed_jobs_for_worker(worker)
          results += dels.select do |j|
            j['class'].downcase.include?(worker) &&
              j.merge!('where_at' => 'delayed')
          end

          Resque.queues.each do |queue|
            queued = Resque.peek(queue, 0, Resque.size(queue))
            queued = [queued] unless queued.is_a?(Array)
            results += queued.select do |j|
              j['class'].downcase.include?(worker) &&
                j.merge!('queue' => queue, 'where_at' => 'queued')
            end
          end

          results
        end

        def schedule_interval(config)
          if config['every']
            schedule_interval_every(config['every'])
          elsif config['cron']
            'cron: ' + config['cron'].to_s
          else
            'Not currently scheduled'
          end
        end

        def schedule_interval_every(every)
          every = [*every]
          s = 'every: ' << every.first

          return s unless every.length > 1

          s << ' ('
          meta = every.last.map do |key, value|
            "#{key.to_s.tr('_', ' ')} #{value}"
          end
          s << meta.join(', ') << ')'
        end

        def schedule_class(config)
          if config['class'].nil? && !config['custom_job_class'].nil?
            config['custom_job_class']
          else
            config['class']
          end
        end

        def scheduler_template(name)
          File.read(
            File.expand_path("../server/views/#{name}.erb", __FILE__)
          )
        end

        def scheduled_in_this_env?(name)
          return true if rails_env(name).nil?
          rails_env(name).split(/[\s,]+/).include?(Resque::Scheduler.env)
        end

        def rails_env(name)
          Resque.schedule[name]['rails_env'] || Resque.schedule[name]['env']
        end

        def scheduler_view(filename, options = {}, locals = {})
          source = File.read(File.join(VIEW_PATH, "#{filename}.erb"))
          erb source, options, locals
        end

        private

        def working_jobs_for_worker(worker)
          [].tap do |results|
            working = [*Resque.working]
            work = working.select do |w|
              w.job && w.job['payload'] &&
                w.job['payload']['class'].downcase.include?(worker)
            end
            work.each do |w|
              results += [
                w.job['payload'].merge(
                  'queue' => w.job['queue'], 'where_at' => 'working'
                )
              ]
            end
          end
        end

        def delayed_jobs_for_worker(_worker)
          [].tap do |dels|
            schedule_size = Resque.delayed_queue_schedule_size
            Resque.delayed_queue_peek(0, schedule_size).each do |d|
              Resque.delayed_timestamp_peek(
                d, 0, Resque.delayed_timestamp_size(d)
              ).each do |j|
                dels << j.merge!('timestamp' => d)
              end
            end
          end
        end
      end
    end
  end
end

Resque::Server.tabs << 'Schedule'
Resque::Server.tabs << 'Delayed'

Resque::Server.class_eval do
  include Resque::Scheduler::Server
end