mhenrixon/sidekiq-unique-jobs

View on GitHub
lib/sidekiq_unique_jobs/batch_delete.rb

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
# frozen_string_literal: true

module SidekiqUniqueJobs
  #
  # Class BatchDelete provides batch deletion of digests
  #
  # @author Mikael Henriksson <mikael@mhenrixon.com>
  #
  class BatchDelete
    #
    # @return [Integer] the default batch size
    BATCH_SIZE = 500

    #
    # @return [Array<String>] Supported key suffixes
    SUFFIXES = %w[
      QUEUED
      PRIMED
      LOCKED
      INFO
    ].freeze

    # includes "SidekiqUniqueJobs::Connection"
    # @!parse include SidekiqUniqueJobs::Connection
    include SidekiqUniqueJobs::Connection
    # includes "SidekiqUniqueJobs::Logging"
    # @!parse include SidekiqUniqueJobs::Logging
    include SidekiqUniqueJobs::Logging

    #
    # @!attribute [r] digests
    #   @return [Array<String>] a collection of digests to be deleted
    attr_reader :digests
    #
    # @!attribute [r] conn
    #   @return [Redis, RedisConnection, ConnectionPool] a redis connection
    attr_reader :conn

    #
    # Executes a batch deletion of the provided digests
    #
    # @param [Array<String>] digests the digests to delete
    # @param [Redis] conn the connection to use for deletion
    #
    # @return [Integer] the number of digests processed; when there is
    #   nothing to delete, the return value of the log call is returned instead
    #
    def self.call(digests, conn = nil)
      new(digests, conn).call
    end

    #
    # Initialize a new batch delete instance
    #
    # @note The given collection is copied; the caller's array is never
    #   mutated, so frozen arrays are accepted as well.
    #
    # @param [Array<String>, nil] digests the digests to delete (nil entries are dropped)
    # @param [Redis, nil] conn the connection to use for deletion
    #
    def initialize(digests, conn = nil)
      # Array() turns nil into [] and #compact returns a new array, leaving
      # the argument untouched.
      @digests = Array(digests).compact
      @conn    = conn
      redis_version # Avoid pipelined calling redis_version and getting a future.
    end

    #
    # Executes a batch deletion of the provided digests
    # @note Just wraps batch_delete to be able to provide no connection
    #
    # @return [Integer] the number of digests processed; when there is
    #   nothing to delete, the return value of the log call is returned instead
    #
    def call
      return log_info("Nothing to delete; exiting.") if digests.none?

      log_info("Deleting batch with #{digests.size} digests")
      return batch_delete(conn) if conn

      # No explicit connection was given; `redis` (from the Connection mixin)
      # is expected to yield one.
      redis { |rcon| batch_delete(rcon) }
    end

    private

    #
    # Does the actual batch deletion
    #
    # Digests are processed in slices of BATCH_SIZE, each slice in its own
    # pipeline. The count is computed per invocation, so calling this more
    # than once does not accumulate.
    #
    # @param [Redis] conn the connection to run the pipelines on
    #
    # @return [Integer] the number of digests processed
    #
    def batch_delete(conn)
      digests.each_slice(BATCH_SIZE).sum do |chunk|
        conn.pipelined do |pipeline|
          chunk.each do |digest|
            del_digest(pipeline, digest)
            pipeline.zrem(SidekiqUniqueJobs::DIGESTS, digest)
            pipeline.zrem(SidekiqUniqueJobs::EXPIRING_DIGESTS, digest)
          end
        end

        chunk.size
      end
    end

    #
    # Queues an unlink of every key belonging to the digest
    #
    # @param [Object] pipeline the pipeline yielded by conn.pipelined
    # @param [String] digest the digest whose keys should be removed
    #
    def del_digest(pipeline, digest)
      pipeline.unlink(*keys_for_digest(digest))
    end

    #
    # Builds the list of keys for a digest
    #
    # For both the digest and its ":RUN" variant this yields the bare key
    # followed by one key per entry in SUFFIXES.
    #
    # @param [String] digest the digest to expand
    #
    # @return [Array<String>] the keys to remove
    #
    def keys_for_digest(digest)
      [digest, "#{digest}:RUN"].flat_map do |key|
        [key, *SUFFIXES.map { |suffix| "#{key}:#{suffix}" }]
      end
    end

    #
    # Memoized redis version taken from the gem configuration
    #
    def redis_version
      @redis_version ||= SidekiqUniqueJobs.config.redis_version
    end
  end
end