lib/sidekiq_unique_jobs/lock.rb
# frozen_string_literal: true
module SidekiqUniqueJobs
#
# Class Lock provides access to information about a lock
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
#
class Lock # rubocop:disable Metrics/ClassLength
# includes "SidekiqUniqueJobs::Connection"
# @!parse include SidekiqUniqueJobs::Connection
include SidekiqUniqueJobs::Connection
# includes "SidekiqUniqueJobs::Timing"
# @!parse include SidekiqUniqueJobs::Timing
include SidekiqUniqueJobs::Timing
# includes "SidekiqUniqueJobs::JSON"
# @!parse include SidekiqUniqueJobs::JSON
include SidekiqUniqueJobs::JSON
#
# @!attribute [r] key
# @return [String] the entity redis key
attr_reader :key
#
# Initialize a locked lock
#
# @param [String] digest a unique digest
# @param [String] job_id a sidekiq JID
# @param [Hash] lock_info information about the lock
#
# @return [Lock] a newly lock that has been locked
#
def self.create(digest, job_id, lock_info: {}, time: Timing.now_f, score: nil)
lock = new(digest, time: time)
lock.lock(job_id, lock_info, score)
lock
end
#
# Initialize a new lock
#
# @param [String, Key] key either a digest or an instance of a {Key}
# @param [Timstamp, Float] time nil optional timestamp to initiate this lock with
#
def initialize(key, time: nil)
@key = get_key(key)
time = time.to_f unless time.is_a?(Float)
return unless time.nonzero?
@created_at = time
end
#
# Locks a job_id
#
# @note intended only for testing purposes
#
# @param [String] job_id a sidekiq JID
# @param [Hash] lock_info information about the lock
#
# @return [void]
#
def lock(job_id, lock_info = {}, score = nil)
score ||= now_f
redis do |conn|
conn.multi do |pipeline|
pipeline.set(key.digest, job_id)
pipeline.hset(key.locked, job_id, now_f)
info.set(lock_info, pipeline)
add_digest_to_set(pipeline, lock_info, score)
pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked"))
end
end
end
#
# Create the :QUEUED key
#
# @note intended only for testing purposes
#
# @param [String] job_id a sidekiq JID
#
# @return [void]
#
def queue(job_id)
redis do |conn|
conn.lpush(key.queued, job_id)
end
end
#
# Create the :PRIMED key
#
# @note intended only for testing purposes
#
# @param [String] job_id a sidekiq JID
#
# @return [void]
#
def prime(job_id)
redis do |conn|
conn.lpush(key.primed, job_id)
end
end
#
# Unlock a specific job_id
#
# @param [String] job_id a sidekiq JID
#
# @return [true] when job_id was removed
# @return [false] when job_id wasn't locked
#
def unlock(job_id)
locked.del(job_id)
end
#
# Deletes all the redis keys for this lock
#
#
# @return [Integer] the number of keys deleted in redis
#
def del
redis do |conn|
conn.multi do |pipeline|
pipeline.zrem(DIGESTS, key.digest)
pipeline.del(key.digest, key.queued, key.primed, key.locked, key.info)
end
end
end
#
# Returns either the time the lock was initialized with or
# the first changelog entry's timestamp
#
#
# @return [Float] a floaty timestamp represantation
#
def created_at
@created_at ||= changelogs.first&.[]("time")
end
#
# Returns all job_id's for this lock
#
# @note a JID can be present in 3 different places
#
#
# @return [Array<String>] an array with JIDs
#
def all_jids
(queued_jids + primed_jids + locked_jids).uniq
end
#
# Returns a collection of locked job_id's
#
# @param [true, false] with_values false provide the timestamp for the lock
#
# @return [Hash<String, Float>] when given `with_values: true`
# @return [Array<String>] when given `with_values: false`
#
def locked_jids(with_values: false)
locked.entries(with_values: with_values)
end
#
# Returns the queued JIDs
#
#
# @return [Array<String>] an array with queued job_ids
#
def queued_jids
queued.entries
end
#
# Returns the primed JIDs
#
#
# @return [Array<String>] an array with primed job_ids
#
def primed_jids
primed.entries
end
#
# Returns all matching changelog entries for this lock
#
#
# @return [Array<Hash>] an array with changelogs
#
def changelogs
changelog.entries(pattern: "*#{key.digest}*")
end
#
# The digest key
#
# @note Used for exists checks to avoid enqueuing
# the same lock twice
#
#
# @return [Redis::String] a string representation of the key
#
def digest
@digest ||= Redis::String.new(key.digest)
end
#
# The queued list
#
#
# @return [Redis::List] for queued JIDs
#
def queued
@queued ||= Redis::List.new(key.queued)
end
#
# The primed list
#
#
# @return [Redis::List] for primed JIDs
#
def primed
@primed ||= Redis::List.new(key.primed)
end
#
# The locked hash
#
#
# @return [Redis::Hash] for locked JIDs
#
def locked
@locked ||= Redis::Hash.new(key.locked)
end
#
# Information about the lock
#
#
# @return [Redis::Hash] with lock information
#
def info
@info ||= LockInfo.new(key.info)
end
#
# A sorted set with changelog entries
#
# @see Changelog for more information
#
#
# @return [Changelog]
#
def changelog
@changelog ||= Changelog.new
end
#
# A nicely formatted string with information about this lock
#
#
# @return [String]
#
def to_s
<<~MESSAGE
Lock status for #{key}
value: #{digest.value}
info: #{info.value}
queued_jids: #{queued_jids}
primed_jids: #{primed_jids}
locked_jids: #{locked_jids}
changelogs: #{changelogs}
MESSAGE
end
#
# @see to_s
#
def inspect
to_s
end
private
#
# Ensure the key is a {Key}
#
# @param [String, Key] key
#
# @return [Key]
#
def get_key(key)
if key.is_a?(SidekiqUniqueJobs::Key)
key
else
SidekiqUniqueJobs::Key.new(key)
end
end
#
# Generate a changelog entry for the given arguments
#
# @param [String] job_id a sidekiq JID
# @param [String] script the name of the script generating this entry
# @param [String] message a descriptive message for later review
#
# @return [String] a JSON string matching the Lua script structure
#
def changelog_json(job_id, script, message)
dump_json(
digest: key.digest,
job_id: job_id,
script: script,
message: message,
time: now_f,
)
end
#
# Add the digest to the correct sorted set
#
# @param [Object] pipeline a redis pipeline object for issue commands
# @param [Hash] lock_info the lock info relevant to the digest
#
# @return [nil]
#
def add_digest_to_set(pipeline, lock_info, score = nil)
score ||= now_f
digest_string = key.digest
if lock_info["lock"] == :until_expired
pipeline.zadd(key.expiring_digests, score + lock_info["ttl"], digest_string)
else
pipeline.zadd(key.digests, score, digest_string)
end
end
end
end