mhenrixon/sidekiq-unique-jobs

View on GitHub
lib/sidekiq_unique_jobs/locksmith.rb

Summary

Maintainability
A
3 hrs
Test Coverage
A
98%
# frozen_string_literal: true

module SidekiqUniqueJobs
  # Lock manager class that handles all the various locks
  #
  # @author Mikael Henriksson <mikael@mhenrixon.com>
  class Locksmith # rubocop:disable Metrics/ClassLength
    # includes "SidekiqUniqueJobs::Connection"
    # @!parse include SidekiqUniqueJobs::Connection
    include SidekiqUniqueJobs::Connection

    # includes "SidekiqUniqueJobs::Logging"
    # @!parse include SidekiqUniqueJobs::Logging
    include SidekiqUniqueJobs::Logging

    # includes "SidekiqUniqueJobs::Reflectable"
    # @!parse include SidekiqUniqueJobs::Reflectable
    include SidekiqUniqueJobs::Reflectable

    # includes "SidekiqUniqueJobs::Timing"
    # @!parse include SidekiqUniqueJobs::Timing
    include SidekiqUniqueJobs::Timing

    # includes "SidekiqUniqueJobs::Script::Caller"
    # @!parse include SidekiqUniqueJobs::Script::Caller
    include SidekiqUniqueJobs::Script::Caller

    # includes "SidekiqUniqueJobs::JSON"
    # @!parse include SidekiqUniqueJobs::JSON
    include SidekiqUniqueJobs::JSON

    #
    # @return [Float] used to take into consideration the inaccuracy of redis timestamps
    CLOCK_DRIFT_FACTOR = 0.01
    NETWORK_FACTOR = 0.04

    #
    # @!attribute [r] key
    #   @return [Key] the key used for locking
    attr_reader :key
    #
    # @!attribute [r] job_id
    #   @return [String] a sidekiq JID
    attr_reader :job_id
    #
    # @!attribute [r] config
    #   @return [LockConfig] the configuration for this lock
    attr_reader :config
    #
    # @!attribute [r] item
    #   @return [Hash] a sidekiq job hash
    attr_reader :item

    #
    # Initialize a new Locksmith instance
    #
    # @param [Hash] item a Sidekiq job hash
    # @option item [Integer] :lock_ttl the configured expiration
    # @option item [String] :jid the sidekiq job id
    # @option item [String] :unique_digest the unique digest (See: {LockDigest#lock_digest})
    # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
    #
    def initialize(item, redis_pool = nil)
      @item        = item
      @key         = Key.new(item[LOCK_DIGEST] || item[UNIQUE_DIGEST]) # fallback until can be removed
      @job_id      = item[JID]
      @config      = LockConfig.new(item)
      @redis_pool  = redis_pool
    end

    #
    # Deletes the lock unless it has a pttl set
    #
    #
    def delete
      return if config.pttl.positive?

      delete!
    end

    #
    # Deletes the lock regardless of if it has a pttl set
    #
    def delete!
      call_script(:delete, key.to_a, argv).to_i.positive?
    end

    #
    # Create a lock for the Sidekiq job
    #
    # @return [String] the Sidekiq job_id that was locked/queued
    #
    def lock(wait: nil)
      method_name = wait ? :primed_async : :primed_sync
      redis(redis_pool) do |conn|
        lock!(conn, method(method_name), wait) do
          return job_id
        end
      end
    end

    def execute(&block)
      raise SidekiqUniqueJobs::InvalidArgument, "#execute needs a block" unless block

      redis(redis_pool) do |conn|
        lock!(conn, method(:primed_async), &block)
      end
    end

    #
    # Removes the lock keys from Redis if locked by the provided jid/token
    #
    # @return [false] unless locked?
    # @return [String] Sidekiq job_id (jid) if successful
    #
    def unlock(conn = nil)
      return false unless locked?(conn)

      unlock!(conn)
    end

    #
    # Removes the lock keys from Redis
    #
    # @return [false] unless locked?
    # @return [String] Sidekiq job_id (jid) if successful
    #
    def unlock!(conn = nil)
      call_script(:unlock, key.to_a, argv, conn) do |unlocked_jid|
        if unlocked_jid == job_id
          reflect(:debug, :unlocked, item, unlocked_jid)
          reflect(:unlocked, item)
        end

        unlocked_jid
      end
    end

    # Checks if this instance is considered locked
    #
    # @param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection
    #
    # @return [true, false] true when the :LOCKED hash contains the job_id
    #
    def locked?(conn = nil)
      return taken?(conn) if conn

      redis { |rcon| taken?(rcon) }
    end

    #
    # Nicely formatted string with information about self
    #
    #
    # @return [String]
    #
    def to_s
      "Locksmith##{object_id}(digest=#{key} job_id=#{job_id} locked=#{locked?})"
    end

    #
    # @see to_s
    #
    def inspect
      to_s
    end

    #
    # Compare this locksmith with another
    #
    # @param [Locksmith] other the locksmith to compare with
    #
    # @return [true, false]
    #
    def ==(other)
      key == other.key && job_id == other.job_id
    end

    private

    attr_reader :redis_pool

    #
    # Used to reduce some duplication from the two methods
    #
    # @see lock
    # @see execute
    #
    # @param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection
    # @param [Method] primed_method reference to the method to use for getting a primed token
    # @param [nil, Integer, Float] wait time to wait before timeout
    #
    # @yieldparam [string] job_id the sidekiq JID
    # @yieldreturn [void] whatever the calling block returns
    def lock!(conn, primed_method, wait = nil)
      return yield if locked?(conn)

      enqueue(conn) do |queued_jid|
        reflect(:debug, :queued, item, queued_jid)

        primed_method.call(conn, wait) do |primed_jid|
          reflect(:debug, :primed, item, primed_jid)
          locked_jid = call_script(:lock, key.to_a, argv, conn)

          if locked_jid
            reflect(:debug, :locked, item, locked_jid)
            return yield
          end
        end
      end
    end

    #
    # Prepares all the various lock data
    #
    # @param [Redis] conn a redis connection
    #
    # @return [nil] when redis was already prepared for this lock
    # @return [yield<String>] when successfully enqueued
    #
    def enqueue(conn)
      queued_jid, elapsed = timed do
        call_script(:queue, key.to_a, argv, conn)
      end

      return unless queued_jid
      return unless [job_id, "1"].include?(queued_jid)

      validity = config.pttl - elapsed - drift(config.pttl)
      return unless validity >= 0 || config.pttl.zero?

      write_lock_info(conn)
      yield job_id
    end

    #
    # Pops an enqueued token
    # @note Used for runtime locks to avoid problems with blocking commands
    #   in current thread
    #
    # @param [Redis] conn a redis connection
    #
    # @return [nil] when lock was not possible
    # @return [Object] whatever the block returns when lock was acquired
    #
    def primed_async(conn, wait = nil, &block) # rubocop:disable Metrics/MethodLength
      timeout = (wait || config.timeout).to_i
      timeout = 1 if timeout.zero?

      brpoplpush_timeout = timeout
      concurrent_timeout = add_drift(timeout)

      reflect(:debug, :timeouts, item,
        timeouts: {
          brpoplpush_timeout: brpoplpush_timeout,
          concurrent_timeout: concurrent_timeout,
        })

      # NOTE: When debugging, change .value to .value!
      primed_jid = Concurrent::Promises
        .future(conn) { |red_con| pop_queued(red_con, timeout) }
        .value

      handle_primed(primed_jid, &block)
    end

    #
    # Pops an enqueued token
    # @note Used for non-runtime locks
    #
    # @param [Redis] conn a redis connection
    #
    # @return [nil] when lock was not possible
    # @return [Object] whatever the block returns when lock was acquired
    #
    def primed_sync(conn, wait = nil, &block)
      primed_jid = pop_queued(conn, wait)
      handle_primed(primed_jid, &block)
    end

    def handle_primed(primed_jid)
      return yield job_id if [job_id, "1"].include?(primed_jid)

      reflect(:timeout, item) unless config.wait_for_lock?
    end

    #
    # Does the actual popping of the enqueued token
    #
    # @param [Redis] conn a redis connection
    #
    # @return [String] a previously enqueued token (now taken off the queue)
    #
    def pop_queued(conn, wait = 1)
      wait ||= config.timeout if config.wait_for_lock?

      if wait.nil?
        rpoplpush(conn)
      else
        brpoplpush(conn, wait)
      end
    end

    #
    # @api private
    #
    def brpoplpush(conn, wait)
      # passing timeout 0 to brpoplpush causes it to block indefinitely
      raise InvalidArgument, "wait must be an integer" unless wait.is_a?(Integer)

      conn.blmove(key.queued, key.primed, "RIGHT", "LEFT", wait)
    end

    #
    # @api private
    #
    def rpoplpush(conn)
      conn.lmove(key.queued, key.primed, "RIGHT", "LEFT")
    end

    #
    # Writes lock information to redis.
    #   The lock information contains information about worker, queue, limit etc.
    #
    #
    # @return [void]
    #
    def write_lock_info(conn)
      return unless config.lock_info?

      conn.set(key.info, lock_info)
    end

    #
    # Used to combat redis imprecision with ttl/pttl
    #
    # @param [Integer] val the value to compute drift for
    #
    # @return [Integer] a computed drift value
    #
    def drift(val)
      # Add 2 milliseconds to the drift to account for Redis expires
      # precision, which is 1 millisecond, plus 1 millisecond min drift
      # for small TTLs.
      (val + 2).to_f * CLOCK_DRIFT_FACTOR
    end

    def add_drift(val)
      val = val.to_f
      val + drift(val)
    end

    #
    # Checks if the lock has been taken
    #
    # @param [Redis] conn a redis connection
    #
    # @return [true, false]
    #
    def taken?(conn)
      conn.hexists(key.locked, job_id) != 0
    end

    def argv
      [job_id, config.pttl, config.type, config.limit, lock_score]
    end

    def lock_score
      item[AT].to_s
    end

    def lock_info
      @lock_info ||= dump_json(
        WORKER => item[CLASS],
        QUEUE => item[QUEUE],
        LIMIT => item[LOCK_LIMIT],
        TIMEOUT => item[LOCK_TIMEOUT],
        TTL => item[LOCK_TTL],
        TYPE => config.type,
        LOCK_ARGS => item[LOCK_ARGS],
        TIME => now_f,
        AT => item[AT],
      )
    end

    def redis_version
      @redis_version ||= SidekiqUniqueJobs.config.redis_version
    end
  end
end