lib/quebert/backend/beanstalk.rb
require "beaneater"
require "forwardable"
module Quebert
module Backend
# Manage jobs on a Beanstalk queue out of process
class Beanstalk
extend Forwardable
include Logging
# A buffer time in seconds added to the Beanstalk TTR for Quebert to do
# its own job cleanup The job will perform based on the Beanstalk TTR,
# but Beanstalk hangs on to the job just a little longer so that Quebert
# can bury the job or schedule a retry with the appropriate delay
TTR_BUFFER = 5
attr_reader :host, :queue
attr_writer :queues
def initialize(host, queue)
@host = host
@queue = queue
@queues = []
end
def self.configure(opts = {})
new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue))
end
def reserve_without_controller(timeout=nil)
watch_tubes
beanstalkd_tubes.reserve(timeout)
end
def reserve(timeout=nil)
Controller::Beanstalk.new(reserve_without_controller(timeout))
end
# For testing purposes... I think there's a better way to do this though.
def drain!
while peek(:ready) do
reserve_without_controller.delete
end
while peek(:delayed) do
reserve_without_controller.delete
end
while peek(:buried) do
kick
reserve_without_controller.delete
end
end
# TODO add a queue param?
def_delegators :default_tube, :peek, :kick
def put(job)
tube = beanstalkd_tubes[job.queue || queue]
tube.put(job.to_json,
:pri => job.priority,
:delay => job.delay,
:ttr => job.ttr + TTR_BUFFER)
end
private
def default_tube
@default_tube ||= beanstalkd_tubes[queue]
end
def beanstalkd_connection
@beanstalkd_connection ||= Beaneater.new(host)
end
def beanstalkd_tubes
beanstalkd_connection.tubes
end
def watch_tubes
if queues != @watched_tube_names
@watched_tube_names = queues
logger.info "Watching beanstalkd queues #{@watched_tube_names.inspect}"
beanstalkd_tubes.watch!(*@watched_tube_names)
end
end
def queues
@queues.empty? ? [queue] : @queues
end
end
end
end