lib/sidekiq/limit_fetch/global/semaphore.rb
# frozen_string_literal: true
module Sidekiq
module LimitFetch
module Global
class Semaphore
PREFIX = 'limit_fetch'
attr_reader :local_busy
def initialize(name)
@name = name
@lock = Mutex.new
@local_busy = 0
end
def limit
value = redis { |it| it.get "#{PREFIX}:limit:#{@name}" }
value&.to_i
end
def limit=(value)
@limit_changed = true
if value
redis { |it| it.set "#{PREFIX}:limit:#{@name}", value }
else
redis { |it| it.del "#{PREFIX}:limit:#{@name}" }
end
end
def limit_changed?
@limit_changed
end
def process_limit
value = redis { |it| it.get "#{PREFIX}:process_limit:#{@name}" }
value&.to_i
end
def process_limit=(value)
if value
redis { |it| it.set "#{PREFIX}:process_limit:#{@name}", value }
else
redis { |it| it.del "#{PREFIX}:process_limit:#{@name}" }
end
end
def acquire
Selector.acquire([@name], namespace).size.positive?
end
def release
redis { |it| it.lrem "#{PREFIX}:probed:#{@name}", 1, Selector.uuid }
end
def busy
redis { |it| it.llen "#{PREFIX}:busy:#{@name}" }
end
def busy_processes
redis { |it| it.lrange "#{PREFIX}:busy:#{@name}", 0, -1 }
end
def increase_busy
increase_local_busy
redis { |it| it.rpush "#{PREFIX}:busy:#{@name}", Selector.uuid }
end
def decrease_busy
decrease_local_busy
redis { |it| it.lrem "#{PREFIX}:busy:#{@name}", 1, Selector.uuid }
end
def probed
redis { |it| it.llen "#{PREFIX}:probed:#{@name}" }
end
def probed_processes
redis { |it| it.lrange "#{PREFIX}:probed:#{@name}", 0, -1 }
end
def pause
redis { |it| it.set "#{PREFIX}:pause:#{@name}", '1' }
end
def pause_for_ms(milliseconds)
redis { |it| it.psetex "#{PREFIX}:pause:#{@name}", milliseconds, 1 }
end
def unpause
redis { |it| it.del "#{PREFIX}:pause:#{@name}" }
end
def paused?
redis { |it| it.get "#{PREFIX}:pause:#{@name}" } == '1'
end
def block
redis { |it| it.set "#{PREFIX}:block:#{@name}", '1' }
end
def block_except(*queues)
raise ArgumentError if queues.empty?
redis { |it| it.set "#{PREFIX}:block:#{@name}", queues.join(',') }
end
def unblock
redis { |it| it.del "#{PREFIX}:block:#{@name}" }
end
def blocking?
redis { |it| it.get "#{PREFIX}:block:#{@name}" } == '1'
end
def clear_limits
redis do |it|
%w[block busy limit pause probed process_limit].each do |key|
it.del "#{PREFIX}:#{key}:#{@name}"
end
end
end
def increase_local_busy
@lock.synchronize { @local_busy += 1 }
end
def decrease_local_busy
@lock.synchronize { @local_busy -= 1 }
end
def local_busy?
@local_busy.positive?
end
def explain
<<-INFO.gsub(/^ {8}/, '')
Current sidekiq process: #{Selector.uuid}
All processes:
#{Monitor.all_processes.join "\n"}
Stale processes:
#{Monitor.old_processes.join "\n"}
Locked queue processes:
#{probed_processes.sort.join "\n"}
Busy queue processes:
#{busy_processes.sort.join "\n"}
Limit:
#{limit.inspect}
Process limit:
#{process_limit.inspect}
Blocking:
#{blocking?}
INFO
end
def remove_locks_except!(processes)
locked_processes = probed_processes.uniq
(locked_processes - processes).each do |dead_process|
remove_lock! dead_process
end
end
def remove_lock!(process)
redis do |it|
it.lrem "#{PREFIX}:probed:#{@name}", 0, process
it.lrem "#{PREFIX}:busy:#{@name}", 0, process
end
end
private
def redis(&block)
Sidekiq.redis(&block)
end
def namespace
Sidekiq::LimitFetch::Queues.namespace
end
end
end
end
end