lib/quebert/controller/beanstalk.rb
require 'benchmark'
module Quebert
module Controller
# Handle interactions between a job and a Beanstalk queue.
class Beanstalk < Base
include Logging
attr_reader :beanstalk_job, :job
MAX_TIMEOUT_RETRY_DELAY = 300
TIMEOUT_RETRY_DELAY_SEED = 2
TIMEOUT_RETRY_GROWTH_RATE = 3
def initialize(beanstalk_job)
@beanstalk_job = beanstalk_job
@job = Job.from_json(beanstalk_job.body)
rescue => e
beanstalk_job.bury
logger.error "Error caught on initialization. #{e.inspect}"
raise
end
def perform
logger.error(job) { "Performing #{job.class.name}." }
logger.error(job) { "Beanstalk Job Stats: #{beanstalk_job.stats.inspect}" }
result = false
time = Benchmark.realtime do
result = job.perform!
delete
end
logger.info(job) { "Completed in #{(time*1000*1000).to_i/1000.to_f} ms\n" }
result
rescue Job::Delete
logger.info(job) { "Deleting job" }
delete
logger.info(job) { "Job deleted" }
rescue Job::Release
logger.info(job) { "Releasing with priority: #{job.priority} and delay: #{job.delay}" }
release(pri: job.priority, delay: job.delay)
logger.info(job) { "Job released" }
rescue Job::Bury
logger.info(job) { "Burying job" }
bury
logger.info(job) { "Job buried" }
rescue Job::Timeout => e
logger.info(job) { "Job timed out. Retrying with delay. #{e.inspect} #{e.backtrace.join("\n")}" }
retry_with_delay
raise
rescue Job::Retry
# The difference between the Retry and Timeout class is that
# Retry does not logger.error(job) { an exception where as Timeout does }
logger.info(job) { "Manually retrying with delay" }
retry_with_delay
rescue => e
logger.error(job) { "Error caught on perform. Burying job. #{e.inspect} #{e.backtrace.join("\n")}" }
bury
logger.error(job) { "Job buried" }
raise
end
protected
def retry_with_delay
delay = TIMEOUT_RETRY_DELAY_SEED + TIMEOUT_RETRY_GROWTH_RATE**beanstalk_job.stats["releases"].to_i
if delay > MAX_TIMEOUT_RETRY_DELAY
logger.error(job) { "Max retry delay exceeded. Burying job" }
bury
logger.error(job) { "Job buried" }
else
logger.error(job) { "TTR exceeded. Releasing with priority: #{job.priority} and delay: #{delay}" }
release(pri: job.priority, delay: delay)
logger.error(job) { "Job released" }
end
rescue ::Beaneater::NotFoundError
logger.error(job) { "Job ran longer than allowed. Beanstalk already deleted it!!!!" }
# Sometimes the timer doesn't behave correctly and this job actually runs longer than
# allowed. At that point the beanstalk job no longer exists anymore. Lets let it go and don't blow up.
end
def bury
job.around_bury do
beanstalk_job.bury
end
end
def release(opts)
job.around_release do
beanstalk_job.release(opts)
end
end
def delete
job.around_delete do
beanstalk_job.delete
end
end
end
end
end