mhenrixon/sidekiq-unique-jobs

View on GitHub
lib/sidekiq_unique_jobs/digests.rb

Summary

Maintainability
A
25 mins
Test Coverage
A
100%
# frozen_string_literal: true

module SidekiqUniqueJobs
  #
  # Class Digests provides access to the sorted set that holds the unique digests
  #
  # @author Mikael Henriksson <mikael@mhenrixon.com>
  #
  class Digests < Redis::SortedSet
    #
    # @return [Integer] the default +count+ option sent along with ZSCAN
    DEFAULT_COUNT = 1_000
    #
    # @return [String] the default match pattern used when scanning
    SCAN_PATTERN  = "*"
    #
    # @return [Array(String, String, String, String)] four blank placeholders that stand in
    #   for the queuetime or runtime keys when that group of keys is skipped
    EMPTY_KEYS_SEGMENT = ["", "", "", ""].freeze

    #
    # Builds the sorted set wrapper
    #
    # @param [String] digests_key the key handed to the parent constructor (DIGESTS unless given)
    #
    def initialize(digests_key = DIGESTS)
      super(digests_key)
    end

    #
    # Adds a digest to the sorted set, scored with the current value of +now_f+
    #
    # @param [String] digest the digest to add
    #
    # @return whatever ZADD replies with, as passed back by +redis+
    #
    def add(digest)
      redis do |conn|
        conn.zadd(key, now_f, digest)
      end
    end

    #
    # Deletes unique digests by pattern
    #
    # Looks the matching digests up through {#entries} and passes them to
    # +BatchDelete+; the elapsed time is written to the log afterwards.
    #
    # @param [String] pattern a key pattern to match with
    # @param [Integer] count forwarded to {#entries} as the scan count
    #
    # @return the result of +BatchDelete.call+ (presumably unwrapped by +timed+)
    #
    def delete_by_pattern(pattern, count: DEFAULT_COUNT)
      outcome, duration = timed do
        matching = entries(pattern: pattern, count: count).keys

        redis do |conn|
          BatchDelete.call(matching, conn)
        end
      end

      log_info("#{__method__}(#{pattern}, count: #{duration ? count : count}) completed in #{duration}ms")

      outcome
    end

    #
    # Deletes a single unique digest
    #
    # Runs the +:delete_by_digest+ script with the digest's queuetime keys,
    # its runtime keys and the key of this sorted set, in that order. A group
    # that is switched off is replaced by {EMPTY_KEYS_SEGMENT}, so the script
    # always receives the same number of keys.
    #
    # @param [String] digest a unique digest to delete
    # @param queuetime [Boolean] Whether to delete queue locks.
    # @param runtime [Boolean] Whether to delete run locks.
    #
    # @return the result of the script call (presumably unwrapped by +timed+)
    #
    def delete_by_digest(digest, queuetime: true, runtime: true)
      outcome, duration = timed do
        queue_digest = queuetime ? digest : nil
        run_digest   = runtime ? digest : nil
        script_keys  = [*queuetime_keys(queue_digest), *runtime_keys(run_digest), key]

        call_script(:delete_by_digest, script_keys)
      end

      log_info("#{__method__}(#{digest}) completed in #{duration}ms")

      outcome
    end

    #
    # The entries in this sorted set
    #
    # @param [String] pattern SCAN_PATTERN the match pattern to search for
    # @param [Integer] count DEFAULT_COUNT passed to ZSCAN as its +count+ option
    #
    # @return [Hash] the scan result converted with +to_h+; presumably digest => score
    #
    def entries(pattern: SCAN_PATTERN, count: DEFAULT_COUNT)
      pairs = redis do |conn|
        conn.zscan(key, match: pattern, count: count).to_a
      end

      pairs.to_h
    end

    #
    # Returns one page of locks together with the paging information
    #
    # ZCARD and ZSCAN are issued inside a single MULTI block.
    #
    # @param [Integer] cursor the cursor for this iteration
    # @param [String] pattern SCAN_PATTERN the match pattern to search for
    # @param [Integer] page_size 100 the size per page
    #
    # @return [Array(Integer, Integer, Array<Lock>)] total_size, next_cursor, locks
    #
    def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100)
      redis do |conn|
        total_size, scan_reply = conn.multi do |pipeline|
          pipeline.zcard(key)
          pipeline.zscan(key, cursor, match: pattern, count: page_size)
        end

        total       = total_size.to_i
        next_cursor = scan_reply[0].to_i
        # The scan reply is consumed as a flat list: digest, score, digest, score, ...
        locks = scan_reply[1].each_slice(2).map do |digest, score|
          Lock.new(digest, time: score)
        end

        # NOTE: When debugging, check the last item in the returned array.
        [total, next_cursor, locks]
      end
    end

    private

    # @param digest [String, nil] The digest to form runtime keys from.
    # @return [Array(String, String, String, String)] The runtime keys, or
    #   {EMPTY_KEYS_SEGMENT} when +digest+ is +nil+.
    def runtime_keys(digest)
      return EMPTY_KEYS_SEGMENT unless digest

      ["", ":QUEUED", ":PRIMED", ":LOCKED"].map { |suffix| "#{digest}:RUN#{suffix}" }
    end

    # @param digest [String, nil] The digest to form queuetime keys from.
    # @return [Array(String, String, String, String)] The digest itself followed by its
    #   suffixed keys, or {EMPTY_KEYS_SEGMENT} when +digest+ is +nil+.
    def queuetime_keys(digest)
      return EMPTY_KEYS_SEGMENT unless digest

      [digest, *%w[QUEUED PRIMED LOCKED].map { |state| "#{digest}:#{state}" }]
    end
  end
end