# routemaster/models/batch.rb
require 'securerandom'

require 'routemaster/models'
require 'routemaster/models/message'
require 'routemaster/models/queue'
require 'routemaster/models/subscriber'
require 'routemaster/mixins/redis'
require 'routemaster/mixins/assert'
require 'routemaster/mixins/log'
require 'routemaster/mixins/counters'
require 'routemaster/services/codec'
module Routemaster
module Models
# Abstracts an ordered list of Message
class Batch
include Mixins::Redis
include Mixins::Assert
include Mixins::Log
include Mixins::Counters
Inconsistency = Class.new(RuntimeError)
# Number of metadata items (subscriber name, creation time, attempt count)
# stored at the head of a batch list, before the payloads.
PREFIX_COUNT = 3
attr_reader :uid, :deadline
# Builds an in-memory handle on a batch; nothing is read from Redis here.
#
# uid        - identifier of the batch (used to derive its Redis key).
# deadline   - optional deadline value, exposed via the `deadline` reader.
# subscriber - optional, already-loaded Subscriber (skips the lazy lookup).
# length     - optional, already-known event count (pre-seeds `length`).
def initialize(uid:, deadline: nil, subscriber: nil, length: nil)
  @uid, @deadline = uid, deadline
  @subscriber, @_length = subscriber, length
end
# The Subscriber owning this batch (memoised once found).
#
# Returns nil when the batch has no subscriber name, or when
# `Subscriber.find` returns nil.
def subscriber
  return @subscriber if @subscriber

  name = subscriber_name
  return if name.nil?

  @subscriber = Subscriber.find(name)
end
# Name of the owning subscriber: the first item (index 0) of the batch list.
# Memoised once a non-nil value has been read.
def subscriber_name
  return @_subscriber_name if @_subscriber_name

  @_subscriber_name = _redis.lindex(_batch_key, 0)
end
# Number of events in the batch, i.e. the Redis list length minus the
# PREFIX_COUNT metadata items. Memoised once a non-zero count is known.
#
# Returns 0 (without memoising) when the list is missing or empty.
# Raises Inconsistency when the list is shorter than PREFIX_COUNT.
def length
  return @_length if @_length

  raw = _redis.llen(_batch_key)
  return 0 unless raw&.nonzero?
  raise Inconsistency, @uid if raw < PREFIX_COUNT

  @_length = raw - PREFIX_COUNT
end
# Does this batch still exist?
#
# Always returns true or false. Depending on the client version,
# `Redis#exists` replies with either a boolean or an Integer key count;
# since 0 is truthy in Ruby, an Integer reply is compared against zero
# rather than returned as-is.
def exists?
  reply = _redis.exists(_batch_key)
  return reply > 0 if reply.is_a?(Integer)

  reply ? true : false
end
# Is this batch deliverable?
#
# Truthy only when a subscriber can be resolved and the batch holds at
# least one truthy payload. Returns the falsy subscriber value when there
# is no subscriber, and nil when `data` is nil.
def valid?
  owner = subscriber
  return owner unless owner

  payloads = data
  payloads&.any?
end
# Has the batch reached capacity?
#
# Truthy when the event count is at least the subscriber's `max_events`.
# Returns nil when the subscriber cannot be resolved.
#
# `length` is read exactly once: it is not memoised for a missing or empty
# batch, so reading it twice could issue two LLENs and compare a different
# value from the one that was checked.
def full?
  count = length
  owner = count && subscriber
  owner && count >= owner.max_events
end
# Is this the batch currently being filled for its subscriber?
# Checks whether the batch UID is a member of the set stored at the
# subscriber's "current batch" reference key.
def current?
  client = _redis
  client.sismember(_batch_ref_key, @uid)
end
# Number of times delivery was attempted: the list item at index 2,
# converted to an Integer (a nil reply becomes 0). Memoised.
def attempts
  return @_attempts if @_attempts

  @_attempts = _redis.lindex(_batch_key, 2).to_i
end
# Creation time of the batch: the list item at index 1, converted to an
# Integer (a nil reply becomes 0). Memoised.
# NOTE(review): presumably the `now` value handed to the ingest script —
# confirm against the Lua script.
def created_at
  return @_created_at if @_created_at

  @_created_at = _redis.lindex(_batch_key, 1).to_i
end
# Load batch data and increment the attempt counter, by running the
# 'batch_load_and_count' script against the batch key.
#
# On success, primes the memoised subscriber name, creation time, attempt
# count and payloads from the script's reply and returns self.
# Returns nil when the reply is missing or shorter than PREFIX_COUNT.
def load_and_count
  reply = _redis_lua_run('batch_load_and_count', keys: [_batch_key])
  return if !reply || reply.length < PREFIX_COUNT

  # The first three items are metadata; everything after is payload.
  name, created, tries, *payloads = reply
  @_subscriber_name = name
  @_created_at      = created.to_i
  @_attempts        = tries.to_i
  @_data            = payloads
  self
end
# The (serialised) payloads held in the batch: every list item after the
# PREFIX_COUNT metadata items. Memoised.
#
# It is not an error if the batch no longer exists.
def data
  return @_data if @_data

  @_data = _redis.lrange(_batch_key, PREFIX_COUNT, -1)
end
# Returns a fresh instance for the same UID, carrying none of the memoised
# state (nor the deadline, subscriber or length given at construction).
def reload
  klass = self.class
  klass.new(uid: @uid)
end
# Two batches are equal when they share the same UID; anything that is not
# a Batch is never equal.
def ==(other)
  return false unless other.kind_of?(Batch)

  other.uid == @uid
end
# Transitions a batch out of the "current" state, by running the
# 'batch_promote' script with the subscriber's reference key and this UID.
# A non-current batch will not have data added to it.
#
# It is not an error if the batch is not current, or no longer exists.
# Returns self.
def promote
  ref_keys = [_batch_ref_key]
  _redis_lua_run('batch_promote', keys: ref_keys, argv: [@uid])
  self
end
# Removes all references to the batch (it's been delivered, or autodropped).
#
# Runs the 'batch_delete' script, then adds the script's return value to
# the 'events.removed' counter for the owning subscriber. Returns self.
def delete
  keys = [_batch_key, _index_key, _batch_ref_key, _batch_gauge_key, _event_gauge_key]
  argv = [@uid, PREFIX_COUNT, subscriber_name]
  removed = _redis_lua_run('batch_delete', keys: keys, argv: argv)
  _counters.incr('events.removed', queue: subscriber_name, count: removed)
  self
end
module ClassMethods
include Mixins::Counters
# Add the data to the subscriber's current batch. A new batch will be
# created as needed. The batch will be promoted if it's full.
#
# data       - the (serialised) payload to append.
# timestamp  - added to the subscriber's timeout to compute the deadline.
# subscriber - the Subscriber receiving the data.
#
# Yields (if a block is given) between reading the current batch UID and
# running the script; used in tests only, to simulate concurrency.
#
# Returns a Batch for the UID and length reported by the script.
def ingest(data:, timestamp:, subscriber:)
  ref_key  = _batch_ref_key(subscriber.name)
  now      = Routemaster.now
  deadline = timestamp + subscriber.timeout

  # Ingestion may need to create a new batch: either there is no current
  # batch, or the current batch (the one `ref_key` points to) changes
  # between the SRANDMEMBER and the EVAL. To this effect, an alternate
  # batch UID is supplied, to be used if a batch gets created.
  current_uid  = _redis.srandmember(ref_key)
  fallback_uid = _generate_uid

  yield if block_given? # tests only: inject behaviour to simulate concurrency

  keys = [
    ref_key,
    _batch_key(current_uid),
    _batch_key(fallback_uid),
    _index_key,
    _batch_gauge_key,
    _event_gauge_key
  ]
  argv = [
    current_uid,
    fallback_uid,
    data,
    subscriber.name,
    PREFIX_COUNT,
    subscriber.max_events,
    now
  ]
  actual_uid, length = _redis_lua_run('batch_ingest', keys: keys, argv: argv)

  _counters.incr('events.added', queue: subscriber.name)
  new(subscriber: subscriber, uid: actual_uid, deadline: deadline, length: length)
end
# Current gauge values, read from the two gauge hashes in Redis.
#
# Returns { batches: Hash, events: Hash }. Each inner hash maps the hash
# field name to an Integer and defaults to 0 for unknown fields.
# NOTE(review): fields are presumably subscriber names — confirm against
# the Lua scripts that write the gauges.
#
# Built with core Ruby only: `Hash#map_values` is not a core method and
# raises NoMethodError unless a core extension defining it is loaded.
def gauges
  read_gauge = lambda do |key|
    _redis.hgetall(key).each_with_object(Hash.new(0)) do |(field, value), counts|
      counts[field] = value.to_i
    end
  end

  {
    batches: read_gauge.call(_batch_gauge_key),
    events:  read_gauge.call(_event_gauge_key),
  }
end
# Returns a new Iterator (an Enumerable of Batch instances) built with its
# default scan batch size.
def all
Iterator.new
end
private
# Returns a random URL-safe Base64 string generated from 15 random bytes.
# Used by `ingest` as the alternate UID for a batch that may need creating.
def _generate_uid
SecureRandom.urlsafe_base64(15)
end
# Redis key of the "current batch" reference for the named subscriber.
# Returns nil when no name is given.
def _batch_ref_key(name)
  return nil unless name

  "batches:current:#{name}"
end
# Redis key of the hash holding event gauges (read by `gauges` under
# :events, and handed to the ingest/delete scripts).
def _event_gauge_key
'batches:gauges:event'
end
# Redis key of the hash holding batch gauges (read by `gauges` under
# :batches, and handed to the ingest/delete scripts).
def _batch_gauge_key
'batches:gauges:batch'
end
# Redis key of the sorted set indexing all batch UIDs (scanned by
# Iterator#each, and handed to the ingest/delete scripts).
def _index_key
'batches:index'
end
# Redis key of the list holding the batch with the given UID.
# Returns nil when no UID is given.
def _batch_key(uid)
  return nil unless uid

  "batches:#{uid}"
end
end
extend ClassMethods
private
# Reference key for this batch's subscriber, built by the class-level
# helper of the same name (private there, hence `send`).
def _batch_ref_key
  owner = self.class
  owner.send(:_batch_ref_key, subscriber_name)
end
# Redis key of this batch's list, built from its UID by the class-level
# helper of the same name (private there, hence `send`).
def _batch_key
  owner = self.class
  owner.send(:_batch_key, @uid)
end
# Generic forwarder to a class-level method of the same name.
#
# `__callee__` is the name the method was invoked under, so when called
# through one of the aliases below it is the alias name (e.g. :_index_key),
# and the call is forwarded to the class method with that name. `send` is
# used because those class-level helpers are private.
def _class_method_delegate(*args)
self.class.send(__callee__, *args)
end
# Instance-level access to the class-level key helpers.
alias_method :_index_key, :_class_method_delegate
alias_method :_event_gauge_key, :_class_method_delegate
alias_method :_batch_gauge_key, :_class_method_delegate
class Iterator
include Enumerable
include Mixins::Redis
# batch_size - value passed as the `count:` option to each scan of the
#              batch index (default: 100).
def initialize(batch_size: 100)
@batch_size = batch_size
end
# Yields all known batches, as Batch instances built from each index
# entry (the member is the UID; the score is passed as the deadline).
#
# Returns an Enumerator when called without a block, instead of raising
# LocalJumpError on the first element.
#
# NOTE(review): the original comment promised "creation order", but Redis
# documents no ordering guarantee for ZSCAN — confirm before relying on it.
def each
  return enum_for(:each) unless block_given?

  _redis.zscan_each(Batch.send(:_index_key), count: @batch_size) do |uid, score|
    yield Batch.new(uid: uid, deadline: score)
  end
end
end
end
end
end