mhenrixon/sidekiq-unique-jobs

View on GitHub
lib/sidekiq_unique_jobs/lua/lock.lua

Summary

Maintainability
Test Coverage
-------- BEGIN keys ---------
local digest           = KEYS[1]
local queued           = KEYS[2]
local primed           = KEYS[3]
local locked           = KEYS[4]
local info             = KEYS[5]
local changelog        = KEYS[6]
local digests          = KEYS[7]
local expiring_digests = KEYS[8]
-------- END keys ---------


-------- BEGIN lock arguments ---------
local job_id       = ARGV[1]
local pttl         = tonumber(ARGV[2])
local lock_type    = ARGV[3]
local limit        = tonumber(ARGV[4])
local lock_score   = ARGV[5]
-------- END lock arguments -----------


--------  BEGIN injected arguments --------
local current_time = tonumber(ARGV[6])
local debug_lua    = tostring(ARGV[7]) == "1"
local max_history  = tonumber(ARGV[8])
local script_name  = tostring(ARGV[9]) .. ".lua"
local redisversion = ARGV[10]
---------  END injected arguments ---------


--------  BEGIN local functions --------
<%= include_partial "shared/_common.lua" %>
----------  END local functions ----------


---------  BEGIN lock.lua ---------
log_debug("BEGIN lock digest:", digest, "job_id:", job_id)

if redis.call("HEXISTS", locked, job_id) == 1 then
  log_debug(locked, "already locked with job_id:", job_id)
  log("Duplicate")

  log_debug("LREM", queued, -1, job_id)
  redis.call("LREM", queued, -1, job_id)

  log_debug("LREM", primed, 1, job_id)
  redis.call("LREM", primed, 1, job_id)

  return job_id
end

local locked_count   = redis.call("HLEN", locked)
local within_limit   = limit > locked_count
local limit_exceeded = not within_limit

if limit_exceeded then
  log_debug("Limit exceeded:", digest, "(",  locked_count, "of", limit, ")")
  log("Limited")
  return nil
end

if lock_type == "until_expired" and pttl and pttl > 0 then
  log_debug("ZADD", expiring_digests, current_time + pttl, digest)
  redis.call("ZADD", expiring_digests, current_time + pttl, digest)
else
  local score

  if #lock_score == 0 then
    score = current_time
  else
    score = lock_score
  end

  log_debug("ZADD", digests, score, digest)
  redis.call("ZADD", digests, score, digest)
end

log_debug("HSET", locked, job_id, current_time)
redis.call("HSET", locked, job_id, current_time)

log_debug("LREM", queued, -1, job_id)
redis.call("LREM", queued, -1, job_id)

log_debug("LREM", primed, 1, job_id)
redis.call("LREM", primed, 1, job_id)

-- The Sidekiq client sets pttl
if pttl and pttl > 0 then
  log_debug("PEXPIRE", digest, pttl)
  redis.call("PEXPIRE", digest, pttl)

  log_debug("PEXPIRE", locked, pttl)
  redis.call("PEXPIRE", locked, pttl)

  log_debug("PEXPIRE", info, pttl)
  redis.call("PEXPIRE", info, pttl)
end

log_debug("PEXPIRE", queued, 1000)
redis.call("PEXPIRE", queued, 1000)

log_debug("PEXPIRE", primed, 1000)
redis.call("PEXPIRE", primed, 1000)

log("Locked")
log_debug("END lock digest:", digest, "job_id:", job_id)
return job_id
----------  END lock.lua  ----------