# lib/sucker_punch/queue.rb
require 'forwardable'
module SuckerPunch
class Queue < Concurrent::Synchronization::LockableObject
# Forwardable supplies def_delegators, used further down to expose pool methods.
extend Forwardable
include Concurrent::ExecutorService
# max_queue value used by find_or_create when the caller gives no job limit.
DEFAULT_MAX_QUEUE_SIZE = 0 # Unlimited
# Baseline thread-pool options. find_or_create overrides the two thread
# counts and adds :max_queue before building the executor.
DEFAULT_EXECUTOR_OPTIONS = {
min_threads: 2,
max_threads: 2,
idletime: 60, # 1 minute
auto_terminate: false # Let shutdown modes handle thread termination
}.freeze
# Registry mapping each queue name to the thread pool created for it.
QUEUES = Concurrent::Map.new
# Returns a Queue wrapper around the pool registered under +name+,
# building and registering a new pool first when none exists yet.
#
# name         - registry key for the queue
# num_workers  - used as both the min and max thread count of a new pool
# num_jobs_max - max_queue of a new pool; nil falls back to
#                DEFAULT_MAX_QUEUE_SIZE
#
# The sizing arguments are only consulted when the pool is created; an
# already registered pool is reused as-is.
def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
  executor = QUEUES.fetch_or_store(name) do
    overrides = {
      min_threads: num_workers,
      max_threads: num_workers,
      max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
    }
    settings = DEFAULT_EXECUTOR_OPTIONS.merge(overrides)
    Concurrent::ThreadPoolExecutor.new(**settings)
  end

  new(name, executor)
end
# Builds a fresh Queue wrapper for every registered name/pool pair and
# returns them in a Concurrent::Array.
def self.all
  Concurrent::Array.new.tap do |wrappers|
    QUEUES.each_pair do |queue_name, executor|
      wrappers.push(new(queue_name, executor))
    end
  end
end
# Empties the registry, resets the three counters and kills every pool
# that was registered at the time of the call. Returns the collection of
# queues that were killed.
def self.clear
  # susceptible to race conditions--only use in testing
  previous = all
  QUEUES.clear

  [
    SuckerPunch::Counter::Busy,
    SuckerPunch::Counter::Processed,
    SuckerPunch::Counter::Failed
  ].each(&:clear)

  previous.each(&:kill)
end
# Returns a Hash keyed by queue name. Each value holds a "workers" Hash
# (total / busy / idle) and a "jobs" Hash (processed / failed / enqueued)
# read from the corresponding queue.
def self.stats
  all.each_with_object({}) do |queue, summary|
    workers = {
      "total" => queue.total_workers,
      "busy" => queue.busy_workers,
      "idle" => queue.idle_workers,
    }
    jobs = {
      "processed" => queue.processed_jobs,
      "failed" => queue.failed_jobs,
      "enqueued" => queue.enqueued_jobs,
    }
    summary[queue.name] = { "workers" => workers, "jobs" => jobs }
  end
end
# Interval passed to sleep by shutdown_all and wait between idle checks;
# shorter when STDOUT is attached to a terminal.
PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5
# Shuts down every registered queue, giving outstanding jobs until
# SuckerPunch.shutdown_timeout (seconds) has elapsed to finish before the
# pools are killed.
#
# Only acts when SuckerPunch::RUNNING.make_false returns a truthy value
# (presumably "the flag was still true" -- confirm against the flag's
# class), so repeated calls do not shut down twice.
#
# Returns nil when nothing had to be killed, otherwise the collection of
# queues that were killed.
def self.shutdown_all
  # Track the timeout on the monotonic clock so that system clock
  # adjustments (e.g. NTP corrections) cannot stretch or shrink it.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + SuckerPunch.shutdown_timeout
  return unless SuckerPunch::RUNNING.make_false

  # If a job is enqueued right before the script exits
  # (command line, rake task, etc.), the system needs an
  # interval to allow the enqueued jobs to make it into the system,
  # otherwise the queue will look idle
  sleep PAUSE_TIME

  queues = all

  # Issue shutdown to each queue and let them wrap up their work. This
  # prevents new jobs from being enqueued and lets the pool clean up
  # after itself
  queues.each { |queue| queue.shutdown }

  # return if every queue is empty and workers in every queue are idle
  return if queues.all? { |queue| queue.idle? }

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  # Keep polling the queues while more than one pause interval remains
  # before the deadline
  while deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) > PAUSE_TIME
    return if queues.all? { |queue| queue.idle? }
    sleep PAUSE_TIME
  end

  # Work may have completed during the final sleep; check once more so a
  # queue that already finished is not reported as timed out and killed.
  return if queues.all? { |queue| queue.idle? }

  # Queues haven't finished work. Aggressively kill them.
  SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
  queues.each { |queue| queue.kill }
end
# Blocks until every queue registered at call time reports idle?, polling
# once per PAUSE_TIME. There is no timeout. Returns nil.
def self.wait
  queues = all

  # nothing to wait for when every queue is already empty and idle
  return if queues.all?(&:idle?)

  SuckerPunch.logger.info("Pausing to allow workers to finish...")
  sleep PAUSE_TIME until queues.all?(&:idle?)
end
# Name this wrapper was created with.
attr_reader :name
# Forward the listed calls straight to the wrapped @pool.
def_delegators :@pool,
:max_length,
:min_length,
:max_queue,
:length,
:queue_length,
:wait_for_termination#,
# The entries below are commented out, so no delegators are generated
# for them.
#:idletime,
#:max_queue,
#:largest_length,
#:scheduled_task_count,
#:completed_task_count,
#:can_overflow?,
#:remaining_capacity,
#:running?,
#:shuttingdown?
# Worker/job-oriented names for the delegated pool readers; these are the
# names used by stats, idle? and idle_workers.
alias_method :total_workers, :length
alias_method :enqueued_jobs, :queue_length
# Wraps +pool+ under +name+. A new wrapper starts in the running state,
# i.e. post forwards jobs to the pool until shutdown is called on it.
def initialize(name, pool)
  super()
  @name = name
  @pool = pool
  @running = true
end
# Current value of the @running flag, read inside synchronize.
def running?
  synchronize do
    @running
  end
end
# True when no jobs are enqueued and no worker is busy. The busy counter
# is only consulted when the queue is empty.
def idle?
  return false unless enqueued_jobs == 0

  busy_workers == 0
end
# Two queues are equal when they wrap the same pool.
#
# Returns false (instead of raising NoMethodError) when +other+ is not an
# instance of this object's class, e.g. nil or an unrelated object, so
# the wrapper can be compared safely or searched for with include?/index.
def ==(other)
  other.is_a?(self.class) && pool == other.pool
end
# Current value of the Busy counter kept under this queue's name.
def busy_workers
  counter = SuckerPunch::Counter::Busy.new(name)
  counter.value
end
# Number of workers not currently busy: total_workers minus busy_workers.
def idle_workers
  total = total_workers
  total - busy_workers
end
# Current value of the Processed counter kept under this queue's name.
def processed_jobs
  counter = SuckerPunch::Counter::Processed.new(name)
  counter.value
end
# Current value of the Failed counter kept under this queue's name.
def failed_jobs
  counter = SuckerPunch::Counter::Failed.new(name)
  counter.value
end
# Hands a job to the wrapped pool while this wrapper is running.
#
# Returns whatever @pool.post returns, or false without touching the pool
# once shutdown has been called on this wrapper. The flag check and the
# hand-off both happen inside synchronize.
def post(*args, &block)
  synchronize do
    next false unless @running

    @pool.post(*args, &block)
  end
end
# Mark post with ruby2_keywords where the running Ruby provides it, so
# keywords captured in *args are passed on as keywords.
ruby2_keywords(:post) if respond_to?(:ruby2_keywords, true)
# Calls kill on the wrapped pool and returns its result. Unlike shutdown,
# this leaves the @running flag untouched.
def kill
  pool.kill
end
# Stops this wrapper from accepting jobs, then asks the wrapped pool to
# shut down. Returns the pool's shutdown result.
def shutdown
  # flip the flag under the lock first so post starts returning false
  synchronize do
    @running = false
  end

  pool.shutdown
end
protected

# The wrapped pool. Protected so that == can read another Queue's pool
# without making it part of the public interface.
attr_reader :pool
end
end