# brainopia/sidekiq-limit_fetch
#
# View on GitHub
# lib/sidekiq/limit_fetch/global/semaphore.rb
#
# Summary
#
# Maintainability
# B
# 4 hrs
# Test Coverage
# frozen_string_literal: true

module Sidekiq
  module LimitFetch
    module Global
      # Per-queue state holder. Limits, pause/block flags and the "busy" and
      # "probed" process lists live in Redis under keys shaped like
      # "limit_fetch:<kind>:<name>"; an additional busy counter is kept in
      # memory for the current Ruby process and guarded by a Mutex.
      class Semaphore
        PREFIX = 'limit_fetch'

        # In-memory counter maintained by #increase_local_busy and
        # #decrease_local_busy.
        attr_reader :local_busy

        # @param name [String] name of the queue; interpolated into every
        #   Redis key this object touches
        def initialize(name)
          @name = name
          @lock = Mutex.new
          @local_busy = 0
          # Initialised explicitly so #limit_changed? answers false (not nil)
          # before the first call to #limit=.
          @limit_changed = false
        end

        # @return [Integer, nil] the stored limit, or nil when no limit key exists
        def limit
          read_integer 'limit'
        end

        # Stores the limit, or deletes the key when +value+ is nil/false.
        # Also records that the limit was assigned through this instance.
        def limit=(value)
          @limit_changed = true
          write_or_delete 'limit', value
        end

        # @return [Boolean] true once #limit= has been called on this instance
        def limit_changed?
          @limit_changed
        end

        # @return [Integer, nil] the stored process limit, or nil when unset
        def process_limit
          read_integer 'process_limit'
        end

        # Stores the process limit, or deletes the key when +value+ is nil/false.
        def process_limit=(value)
          write_or_delete 'process_limit', value
        end

        # Asks Selector.acquire for this queue alone.
        #
        # @return [Boolean] true when Selector.acquire returned a non-empty result
        def acquire
          Selector.acquire([@name], namespace).size.positive?
        end

        # Removes one occurrence of the current process uuid from the probed list.
        def release
          redis { |it| it.lrem key('probed'), 1, Selector.uuid }
        end

        # @return [Integer] length of the busy list in Redis
        def busy
          redis { |it| it.llen key('busy') }
        end

        # @return [Array<String>] every entry of the busy list
        def busy_processes
          redis { |it| it.lrange key('busy'), 0, -1 }
        end

        # Bumps the in-memory counter, then appends the current process uuid
        # to the busy list.
        def increase_busy
          increase_local_busy
          redis { |it| it.rpush key('busy'), Selector.uuid }
        end

        # Lowers the in-memory counter, then removes one occurrence of the
        # current process uuid from the busy list.
        def decrease_busy
          decrease_local_busy
          redis { |it| it.lrem key('busy'), 1, Selector.uuid }
        end

        # @return [Integer] length of the probed list in Redis
        def probed
          redis { |it| it.llen key('probed') }
        end

        # @return [Array<String>] every entry of the probed list
        def probed_processes
          redis { |it| it.lrange key('probed'), 0, -1 }
        end

        # Sets the pause flag with no expiry.
        def pause
          redis { |it| it.set key('pause'), '1' }
        end

        # Sets the pause flag with a time-to-live.
        #
        # @param milliseconds [Integer] expiry passed to PSETEX
        def pause_for_ms(milliseconds)
          redis { |it| it.psetex key('pause'), milliseconds, 1 }
        end

        # Deletes the pause flag.
        def unpause
          redis { |it| it.del key('pause') }
        end

        # @return [Boolean] true when the pause key currently holds '1'
        def paused?
          redis { |it| it.get key('pause') } == '1'
        end

        # Sets the block key to '1'.
        def block
          redis { |it| it.set key('block'), '1' }
        end

        # Sets the block key to the comma-joined list of +queues+.
        #
        # @param queues [Array<String>] at least one queue name
        # @raise [ArgumentError] when called without any queue
        def block_except(*queues)
          raise ArgumentError, 'block_except requires at least one queue' if queues.empty?

          redis { |it| it.set key('block'), queues.join(',') }
        end

        # Deletes the block key.
        def unblock
          redis { |it| it.del key('block') }
        end

        # @return [Boolean] true only when the block key holds exactly '1'
        #
        # NOTE(review): after #block_except the key holds the joined queue
        # list, so this answers false in that mode - confirm that is intended.
        def blocking?
          redis { |it| it.get key('block') } == '1'
        end

        # Deletes every Redis key this object may have written for the queue.
        def clear_limits
          redis do |it|
            %w[block busy limit pause probed process_limit].each do |kind|
              it.del key(kind)
            end
          end
        end

        def increase_local_busy
          @lock.synchronize { @local_busy += 1 }
        end

        def decrease_local_busy
          @lock.synchronize { @local_busy -= 1 }
        end

        # @return [Boolean] true while the in-memory counter is above zero
        def local_busy?
          @local_busy.positive?
        end

        # Builds a human-readable report of the queue state.
        #
        # The squiggly heredoc strips the common indentation of the literal
        # text only, so interpolated values are emitted untouched.
        #
        # @return [String]
        def explain
          <<~INFO
            Current sidekiq process: #{Selector.uuid}

              All processes:
            #{Monitor.all_processes.join "\n"}

              Stale processes:
            #{Monitor.old_processes.join "\n"}

              Locked queue processes:
            #{probed_processes.sort.join "\n"}

              Busy queue processes:
            #{busy_processes.sort.join "\n"}

              Limit:
            #{limit.inspect}

              Process limit:
            #{process_limit.inspect}

              Blocking:
            #{blocking?}
          INFO
        end

        # Drops the probed/busy entries of every process found in the probed
        # list that is not in +processes+.
        #
        # @param processes [Array<String>] process identifiers to keep
        def remove_locks_except!(processes)
          stale = probed_processes.uniq - processes
          stale.each { |dead_process| remove_lock! dead_process }
        end

        # Removes every occurrence (LREM count 0) of +process+ from both the
        # probed and the busy list.
        def remove_lock!(process)
          redis do |it|
            it.lrem key('probed'), 0, process
            it.lrem key('busy'), 0, process
          end
        end

        private

        # Redis key for the given kind of state of this queue.
        def key(kind)
          "#{PREFIX}:#{kind}:#{@name}"
        end

        # Reads a key and converts it to an Integer; nil when the key is absent.
        def read_integer(kind)
          redis { |it| it.get key(kind) }&.to_i
        end

        # Writes +value+ under the key, or deletes the key for a nil/false value.
        def write_or_delete(kind, value)
          if value
            redis { |it| it.set key(kind), value }
          else
            redis { |it| it.del key(kind) }
          end
        end

        def redis(&block)
          Sidekiq.redis(&block)
        end

        def namespace
          Sidekiq::LimitFetch::Queues.namespace
        end
      end
    end
  end
end