# lib/baw-workers/partial_payload.rb
module BawWorkers
# Raised by PartialPayload.resolve and PartialPayload.delete_recursive when the base payload
# stored under a referenced Redis key cannot be retrieved.
class PartialPayloadMissingError < StandardError
end
# Raised by PartialPayload.create_or_validate when a base payload already exists under the
# given key and differs from the base payload being supplied.
class InconsistentBasePayloadError < StandardError
end
# Stores Redis string payloads in multiple parts.
# Useful for splitting apart large payloads that are stuck in, for example, Redis LISTs.
# In the common scenario of using a LIST to store job payloads for a processing system, the large invariant parts
# of payloads can be split apart and stored separately. We experience 92-96% less memory usage using this technique
# on very large payloads for large job sets (e.g. 1000 or more).
# This technique should be used when you have at least 100 payloads which are largely identical.
class PartialPayload
  class << self
    # The name of the field to embed in the variant payload. `payload_base` should suggest to the uninitiated
    # that there is a base payload somewhere else that needs to be resolved.
    PARTIAL_PAYLOAD_KEY = 'payload_base'.freeze
    REDIS_NAMESPACE = 'partial_payload'.freeze
    BASE_PAYLOAD_EXPIRE = 86_400 * 365 # 1 year

    # Store a base payload that should be merged with a partial payload.
    # See the tests for clear examples.
    # @param [Hash] base_payload - the base payload to store
    # @param [String] key - a unique key used to retrieve the payload
    # @return [Hash] a partial payload hash. This hash should be merged with your actual payload.
    # @raise [RuntimeError] if the communicator reports that the payload was not stored
    def create(base_payload, key)
      key = add_namespace(key)

      # set a very long expire. We don't care if these payloads exist for a very long time.
      out_opts = { expire_seconds: BASE_PAYLOAD_EXPIRE }
      success = communicator.set(key, base_payload, out_opts)
      raise 'PartialPayload creation failed' unless success

      # return a hash with the absolute redis_key that can be used to retrieve the base payload.
      # NOTE(review): relies on the communicator writing the absolute key back into `out_opts[:key]`
      # during `set` - confirm against the communicator implementation.
      { PARTIAL_PAYLOAD_KEY.to_sym => out_opts[:key] }
    end

    # Store a base payload that should be merged with a partial payload, OR if it already exists, validates that the
    # two payloads are identical.
    # NOTE(review): the existence check and the create are two separate calls, so this is presumably not
    # atomic across concurrent writers - confirm whether that matters for callers.
    # @param [Hash] base_payload - the base payload to store
    # @param [String] key - a unique key used to retrieve the payload
    # @return [Hash] a partial payload hash. This hash should be merged with your actual payload.
    # @raise [InconsistentBasePayloadError] if a different base payload is already stored under `key`
    def create_or_validate(base_payload, key)
      existing_base = get(key)
      return create(base_payload, key) unless existing_base

      # the stored payload is compared against the normalised form of the supplied payload
      if existing_base != BawWorkers::ResqueJobId.normalise(base_payload)
        raise InconsistentBasePayloadError,
              "Base payload already stored for `#{key}` differs from the supplied base payload"
      end

      # return a hash with the absolute redis_key that can be used to retrieve the base payload
      { PARTIAL_PAYLOAD_KEY.to_sym => communicator.add_namespace(add_namespace(key)) }
    end

    # Reconstruct a payload by combining a partial payload with its base payload.
    # resolve can reconstruct several nested payload partials via recursion. Its only limitations
    # are callstack size and Redis performance.
    # Payloads that are not hashes or are hashes without the partial payload key, are ignored and returned unmodified.
    # The supplied payload is never modified; a new Hash is returned when a base payload is merged in.
    # @param [Hash] payload
    # @return [Hash] the reconstructed payload
    # @raise [PartialPayloadMissingError] if a referenced base payload can not be found
    def resolve(payload)
      link = has_base_payload(payload)
      return payload unless link

      # we assume the full redis key (with namespaces) has been stored
      redis_key = payload[link]
      base = communicator.get(redis_key, { no_namespace: true })
      if base.nil?
        raise PartialPayloadMissingError, "Could not retrieve partial payload `#{redis_key}`"
      end

      # Work on a copy so the caller's hash (which may be frozen, or be the job arguments that get
      # re-queued on failure) keeps its link to the base payload.
      variant = payload.dup
      variant.delete(link)

      # RECURSIVE - allow the base payload to have its own base payload.
      # Values in the variant payload win over values in the base payload.
      resolve(base).merge(variant)
    end

    # Gets a base payload if it exists
    # @param [String] key - the key for the base payload to retrieve
    # @return [Hash] returns a Hash if found, otherwise nil
    def get(key)
      communicator.get(add_namespace(key))
    end

    # Delete the base payload.
    # Does not cascade and delete other linked base payloads.
    # @param [String] key - the key to delete
    # @return [Boolean] True if the delete succeeded.
    def delete(key)
      communicator.delete(add_namespace(key))
    end

    # Delete the base payload.
    # *Does* cascade and delete other linked base payloads.
    # Throws `PartialPayloadMissingError` if a linked base payload can not be found.
    # @param [String] key - the key to delete
    # @return [Boolean] True if the delete succeeded.
    # @raise [PartialPayloadMissingError] if `key` or any linked base payload can not be found
    def delete_recursive(key)
      # A key that already contains `:partial_payload:` (i.e. something precedes our namespace) is
      # treated as an absolute redis key and is passed to the communicator untouched.
      has_namespace = key.include?(":#{REDIS_NAMESPACE}:")
      key = add_namespace(key) unless has_namespace

      # first get the partial payload
      partial = communicator.get(key, { no_namespace: has_namespace })
      if partial.nil?
        # So the stash is in an inconsistent state? what is there to do? little. At least throwing an exception means
        # we will know about it.
        raise PartialPayloadMissingError, "Could not retrieve partial payload `#{key}` during recursive delete"
      end

      # now recurse through linked list
      # RECURSIVE - allow the base payload to have its own base payload
      link = has_base_payload(partial)
      delete_recursive(partial[link]) if link

      # finally delete
      communicator.delete(key, { no_namespace: has_namespace })
    end

    private

    # The Redis communicator used for all storage operations.
    # Looked up on first use (and then memoised) rather than when this class is loaded, so the class
    # can be required before BawWorkers::Config has been set up.
    def communicator
      @communicator ||= BawWorkers::Config.redis_communicator
    end

    # Finds which form of the partial payload key (String first, then Symbol) is present.
    # @param [Object] hash - the payload to inspect
    # @return [String, Symbol, nil] the key that is present, or nil if `hash` is not a Hash or has no link
    def has_base_payload(hash)
      return nil unless hash.is_a?(Hash)
      return PARTIAL_PAYLOAD_KEY if hash.key?(PARTIAL_PAYLOAD_KEY)
      return PARTIAL_PAYLOAD_KEY.to_sym if hash.key?(PARTIAL_PAYLOAD_KEY.to_sym)

      nil
    end

    # Prefixes a key with this class's Redis namespace.
    # @param [String] key
    # @return [String] the key in the form `partial_payload:<key>`
    def add_namespace(key)
      REDIS_NAMESPACE + ':' + key
    end
  end
end
end