# lib/resque/scheduler/server.rb
# vim:fileencoding=utf-8
require 'resque-scheduler'
require 'resque/server'
require 'tilt/erb'
require 'json'
# Extend Resque::Server to add tabs
module Resque
module Scheduler
module Server
# strftime pattern shared by HelperMethods#format_time and the messages
# built in ServerMethods#delayed_queue_now.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S %z'.freeze
# Directory holding the ERB views read by HelperMethods#scheduler_view.
# Only assigned when the constant has not already been defined.
unless defined?(::Resque::Scheduler::Server::VIEW_PATH)
VIEW_PATH = File.join(File.dirname(__FILE__), 'server', 'views')
end
# Hook invoked when this module is included into a class (see the
# Resque::Server.class_eval at the bottom of this file). Inside the
# including class it registers HelperMethods as helpers, mixes in
# ServerMethods, and declares one route per scheduler page/action; each
# route body simply delegates to the ServerMethods method of the same
# purpose.
def self.included(base)
base.class_eval do
helpers { include HelperMethods }
include ServerMethods
# Recurring schedule pages and actions
get('/schedule') { schedule }
post('/schedule/requeue') { schedule_requeue }
post('/schedule/requeue_with_params') do
schedule_requeue_with_params
end
delete('/schedule') { delete_schedule }
# Delayed job pages and actions
get('/delayed') { delayed }
get('/delayed/jobs/:klass') { delayed_jobs_klass }
post('/delayed/search') { delayed_search }
get('/delayed/:timestamp') { delayed_timestamp }
post('/delayed/queue_now') { delayed_queue_now }
post('/delayed/cancel_now') { delayed_cancel_now }
post('/delayed/clear') { delayed_clear }
end
end
module ServerMethods
# Renders the 'scheduler' template. When Resque::Scheduler.dynamic is
# truthy the schedule is reloaded first.
def schedule
  if Resque::Scheduler.dynamic
    Resque.reload_schedule!
  end

  page = scheduler_template('scheduler')
  erb(page)
end
# Requeues the scheduled job named by the 'job_name' request parameter.
# If the job's config declares parameters, the 'requeue-params' template
# is rendered instead of enqueueing; otherwise the job is enqueued from
# its config and the request is redirected to '/overview'.
def schedule_requeue
  @job_name = params['job_name'] || params[:job_name]
  job_config = Resque.schedule[@job_name]
  @parameters = job_config['parameters'] || job_config[:parameters]

  return erb(scheduler_template('requeue-params')) if @parameters

  Resque::Scheduler.enqueue_from_config(job_config)
  redirect u('/overview')
end
# Requeues a scheduled job using arguments submitted with the request.
# Every request parameter except the job name is merged over the job's
# configured 'args' hash (submitted values win), the job is enqueued from
# the resulting config, and the request is redirected to '/overview'.
def schedule_requeue_with_params
  name = params['job_name'] || params[:job_name]
  job_config = Resque.schedule[name]

  # Everything posted besides the job name is treated as a job argument.
  posted = params.reject { |key, _| key == 'job_name' || key == :job_name }

  # Start from the args already configured for the job, if any.
  configured = job_config['args'] || job_config[:args] || {}
  combined = configured.merge(posted)

  Resque::Scheduler.enqueue_from_config(job_config.merge('args' => combined))
  redirect u('/overview')
end
# Removes the schedule entry named by the 'job_name' request parameter,
# but only when Resque::Scheduler.dynamic is truthy. Always redirects to
# '/schedule' afterwards.
def delete_schedule
  Resque.remove_schedule(params['job_name'] || params[:job_name]) if Resque::Scheduler.dynamic

  redirect u('/schedule')
end
# Renders the 'delayed' template.
def delayed
  page = scheduler_template('delayed')
  erb(page)
end
# Looks up the timestamps at which a job of the class named by
# params[:klass], with the JSON-encoded and URL-escaped arguments in
# params[:args], is scheduled, then renders the 'delayed_schedules'
# template. Sets @args and @timestamps for the template.
#
# Any StandardError raised while resolving the class, decoding the
# arguments or querying the schedule results in an empty @timestamps list
# (best effort, as before).
def delayed_jobs_klass
  begin
    klass = Resque::Scheduler::Util.constantize(params[:klass])
    raw_args = CGI.unescape(params[:args])
    # The arguments come straight from the request, so JSON additions are
    # switched off: a "json_class" key must not be able to instantiate
    # classes. JSON.load is kept (rather than JSON.parse) so blank input
    # still decodes to nil as it did previously.
    @args = JSON.load(raw_args, nil, { create_additions: false })
    @timestamps = Resque.scheduled_at(klass, *@args)
  rescue StandardError
    @timestamps = []
  end

  erb scheduler_template('delayed_schedules')
end
# Runs find_job with the :search request parameter, stores the result in
# @jobs and renders the 'search' template.
def delayed_search
  term = params[:search]
  @jobs = find_job(term)

  erb(scheduler_template('search'))
end
# Renders the 'delayed_timestamp' template.
def delayed_timestamp
  page = scheduler_template('delayed_timestamp')
  erb(page)
end
# Asks Resque::Scheduler to enqueue the next item for the timestamp given
# in the 'timestamp' request parameter, then renders the 'delayed'
# template. @error_message is set when the timestamp is not a positive
# integer or when enqueue_next_item returns a falsy value.
def delayed_queue_now
  ts = params['timestamp'].to_i
  pretty = Time.at(ts).strftime(::Resque::Scheduler::Server::TIMESTAMP_FORMAT)

  if ts <= 0
    @error_message = "Incorrect timestamp #{pretty}"
  elsif !Resque::Scheduler.enqueue_next_item(ts)
    @error_message = "Unable to remove item at #{pretty}"
  end

  erb scheduler_template('delayed')
end
# Removes one delayed job, identified by the 'klass', 'timestamp' and
# 'args' request parameters, from its timestamp and redirects to
# '/delayed'. 'args' is decoded with Resque.decode and splatted into the
# removal call; 'timestamp' is passed through as received.
def delayed_cancel_now
  job_class = Resque::Scheduler::Util.constantize(params['klass'])
  at = params['timestamp']
  job_args = Resque.decode(params['args'])

  Resque.remove_delayed_job_from_timestamp(at, job_class, *job_args)
  redirect(u('/delayed'))
end
# Resets the delayed queue via Resque.reset_delayed_queue and redirects
# to the 'delayed' page.
def delayed_clear
  Resque.reset_delayed_queue

  target = u('delayed')
  redirect(target)
end
end
module HelperMethods
# Formats +t+ (anything responding to strftime) using
# Resque::Scheduler::Server::TIMESTAMP_FORMAT.
def format_time(t)
  pattern = ::Resque::Scheduler::Server::TIMESTAMP_FORMAT
  t.strftime(pattern)
end
# Returns the #inspect form of each job argument, one per line. +args+ is
# coerced with Array(), so nil yields an empty string and a single value
# yields a single line.
def show_job_arguments(args)
  lines = Array(args).map { |argument| argument.inspect }
  lines.join("\n")
end
# Resolves +class_name+ to a constant and returns whatever
# Resque.queue_from_class reports for it.
def queue_from_class_name(class_name)
  job_class = Resque::Scheduler::Util.constantize(class_name)
  Resque.queue_from_class(job_class)
end
# Searches for jobs whose class name contains +worker+ (compared
# case-insensitively) among working jobs, delayed jobs and every queue.
# Matching delayed jobs are tagged with 'where_at' => 'delayed'; matching
# queued jobs are tagged with their 'queue' and 'where_at' => 'queued'.
# The tags are merged into the job hashes in place. Returns the combined
# list: working matches first, then delayed, then queued in queue order.
def find_job(worker)
  needle = worker.downcase
  found = working_jobs_for_worker(needle)

  delayed_hits = delayed_jobs_for_worker(needle).select do |job|
    job['class'].downcase.include?(needle)
  end
  delayed_hits.each { |job| job.merge!('where_at' => 'delayed') }
  found += delayed_hits

  Resque.queues.each do |queue_name|
    entries = Resque.peek(queue_name, 0, Resque.size(queue_name))
    # A non-Array result is wrapped so it can be filtered like a list.
    entries = [entries] unless entries.is_a?(Array)

    hits = entries.select { |job| job['class'].downcase.include?(needle) }
    hits.each do |job|
      job.merge!('queue' => queue_name, 'where_at' => 'queued')
    end
    found += hits
  end

  found
end
# Describes how a schedule entry recurs: delegates to
# schedule_interval_every when config['every'] is set, otherwise reports
# the cron expression, otherwise 'Not currently scheduled'.
def schedule_interval(config)
  return schedule_interval_every(config['every']) if config['every']
  return "cron: #{config['cron']}" if config['cron']

  'Not currently scheduled'
end
# Describes an `every` schedule setting as text.
#
# +every+ may be a single interval (e.g. '1h' or 30) or a list whose
# first element is the interval and whose last element is a hash of
# options. Option keys have underscores replaced by spaces.
#
#   schedule_interval_every('1h')                      # => "every: 1h"
#   schedule_interval_every(['1h', { first_in: '5s' }])
#   # => "every: 1h (first in 5s)"
#
# The text is built with interpolation rather than by appending to a
# string literal: appending an Integer interval with << would add a
# codepoint instead of its digits, and appending to a literal fails when
# string literals are frozen.
def schedule_interval_every(every)
  every = [*every]
  description = "every: #{every.first}"
  return description unless every.length > 1

  options = every.last.map do |key, value|
    "#{key.to_s.tr('_', ' ')} #{value}"
  end
  "#{description} (#{options.join(', ')})"
end
# Returns the job class configured for a schedule entry: config['class']
# when it is not nil, otherwise config['custom_job_class'] (which may
# itself be nil).
def schedule_class(config)
  primary = config['class']
  return primary unless primary.nil?

  config['custom_job_class']
end
# Reads and returns the contents of server/views/<name>.erb, resolved
# relative to the directory containing this file.
def scheduler_template(name)
  template_path = File.expand_path("../server/views/#{name}.erb", __FILE__)
  File.read(template_path)
end
# True when the named schedule entry has no environment restriction, or
# when Resque::Scheduler.env is one of the environments listed for it
# (the list is split on whitespace and commas).
def scheduled_in_this_env?(name)
  if rails_env(name).nil?
    true
  else
    environments = rails_env(name).split(/[\s,]+/)
    environments.include?(Resque::Scheduler.env)
  end
end
# Returns the environment restriction configured for the named schedule
# entry: its 'rails_env' value, falling back to 'env'.
def rails_env(name)
  from_rails_key = Resque.schedule[name]['rails_env']
  return from_rails_key if from_rails_key

  Resque.schedule[name]['env']
end
# Reads <filename>.erb from VIEW_PATH and renders it with erb, passing
# +options+ and +locals+ through unchanged.
def scheduler_view(filename, options = {}, locals = {})
  view_file = File.join(VIEW_PATH, "#{filename}.erb")
  erb(File.read(view_file), options, locals)
end
private
def working_jobs_for_worker(worker)
[].tap do |results|
working = [*Resque.working]
work = working.select do |w|
w.job && w.job['payload'] &&
w.job['payload']['class'].downcase.include?(worker)
end
work.each do |w|
results += [
w.job['payload'].merge(
'queue' => w.job['queue'], 'where_at' => 'working'
)
]
end
end
end
# Returns every delayed job across all delayed timestamps, each tagged in
# place with the 'timestamp' it was read from. The +_worker+ argument is
# not used here; callers filter the result themselves.
def delayed_jobs_for_worker(_worker)
  collected = []
  total = Resque.delayed_queue_schedule_size

  Resque.delayed_queue_peek(0, total).each do |timestamp|
    count = Resque.delayed_timestamp_size(timestamp)
    Resque.delayed_timestamp_peek(timestamp, 0, count).each do |job|
      collected << job.merge!('timestamp' => timestamp)
    end
  end

  collected
end
end
end
end
end
# Add the scheduler tabs to the Resque web UI, then mix the scheduler
# routes and helpers into Resque::Server.
%w[Schedule Delayed].each { |tab| Resque::Server.tabs << tab }
Resque::Server.send(:include, Resque::Scheduler::Server)