# frozen_string_literal: true

# lib/karafka/pro/processing/strategies/vp/default.rb
# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.
module Karafka
module Pro
module Processing
module Strategies
# VP starting strategies
module Vp
# Just Virtual Partitions enabled
module Default
# This flow is exactly the same as the default one because the default one is wrapper
# with `coordinator#on_finished`
include Strategies::Default
# Features for this strategy
FEATURES = %i[
virtual_partitions
].freeze
# @param message [Karafka::Messages::Message] marks message as consumed
# @param offset_metadata [String, nil]
# @note This virtual offset management uses a regular default marking API underneath.
# We do not alter the "real" marking API, as VPs are just one of many cases we want
# to support and we do not want to impact them with collective offsets management
def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
if @_in_transaction && !collapsed?
mark_in_transaction(message, offset_metadata, true)
elsif collapsed?
super
else
manager = coordinator.virtual_offset_manager
coordinator.synchronize do
manager.mark(message, offset_metadata)
# If this is last marking on a finished flow, we can use the original
# last message and in order to do so, we need to mark all previous messages as
# consumed as otherwise the computed offset could be different
# We mark until our offset just in case of a DLQ flow or similar, where we do not
# want to mark all but until the expected location
manager.mark_until(message, offset_metadata) if coordinator.finished?
return revoked? unless manager.markable?
manager.markable? ? super(*manager.markable) : revoked?
end
end
ensure
@_current_offset_metadata = nil
end
# @param message [Karafka::Messages::Message] blocking marks message as consumed
# @param offset_metadata [String, nil]
def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
if @_in_transaction && !collapsed?
mark_in_transaction(message, offset_metadata, false)
elsif collapsed?
super
else
manager = coordinator.virtual_offset_manager
coordinator.synchronize do
manager.mark(message, offset_metadata)
manager.mark_until(message, offset_metadata) if coordinator.finished?
manager.markable? ? super(*manager.markable) : revoked?
end
end
ensure
@_current_offset_metadata = nil
end
# Stores the next offset for processing inside of the transaction when collapsed and
# accumulates marking as consumed in the local buffer.
#
# Due to nature of VPs we cannot provide full EOS support but we can simulate it,
# making sure that no offset are stored unless transaction is finished. We do it by
# accumulating the post-transaction marking requests and after it is successfully done
# we mark each as consumed. This effectively on errors "rollbacks" the state and
# prevents offset storage.
#
# Since the EOS here is "weak", we do not have to worry about the race-conditions and
# we do not have to have any mutexes.
#
# @param message [Messages::Message] message we want to commit inside of a transaction
# @param offset_metadata [String, nil] offset metadata or nil if none
# @param async [Boolean] should we mark in async or sync way (applicable only to post
# transaction state synchronization usage as within transaction it is always sync)
def mark_in_transaction(message, offset_metadata, async)
raise Errors::TransactionRequiredError unless @_in_transaction
# Prevent from attempts of offset storage when we no longer own the assignment
raise Errors::AssignmentLostError if revoked?
return super if collapsed?
@_transaction_marked << [message, offset_metadata, async]
end
# @return [Boolean] is the virtual processing collapsed in the context of given
# consumer.
def collapsed?
coordinator.collapsed?
end
# @param offset [Integer] first offset from which we should not operate in a collapsed
# mode.
# @note Keep in mind, that if a batch contains this but also messages earlier messages
# that should be collapsed, all will continue to operate in a collapsed mode until
# first full batch with only messages that should not be collapsed.
def collapse_until!(offset)
coordinator.collapse_until!(offset)
end
# @return [Boolean] true if any of virtual partition we're operating in the entangled
# mode has already failed and we know we are failing collectively.
# Useful for early stop to minimize number of things processed twice.
#
# @note We've named it `#failing?` instead of `#failure?` because it aims to be used
# from within virtual partitions where we want to have notion of collective failing
# not just "local" to our processing. We "are" failing with other virtual partitions
# raising an error, but locally we are still processing.
def failing?
coordinator.failure?
end
# Allows for cross-virtual-partition consumers locks
#
# This is not needed in the non-VP flows except LRJ because there is always only one
# consumer per partition at the same time, so no coordination is needed directly for
# the end users. With LRJ it is needed and provided in the `LRJ::Default` strategy,
# because lifecycle events on revocation can run in parallel to the LRJ job as it is
# non-blocking.
#
# @param block [Proc] block we want to run in a mutex to prevent race-conditions
def synchronize(&block)
coordinator.shared_mutex.synchronize(&block)
end
private
# Prior to adding work to the queue, registers all the messages offsets into the
# virtual offset group.
#
# @note This can be done without the mutex, because it happens from the same thread
# for all the work (listener thread)
def handle_before_schedule_consume
super
# We should not register offsets in virtual manager when in collapse as virtual
# manager is not used then for offsets materialization.
#
# If we would do so, it would cause increased storage in cases of endless errors
# that are being retried in collapse without a DLQ.
return if collapsed?
coordinator.virtual_offset_manager.register(
messages.map(&:offset)
)
end
end
end
end
end
end
end