# lib/karafka/processing/strategies/dlq.rb
# frozen_string_literal: true
module Karafka
module Processing
module Strategies
# When using the dead letter queue, processing won't stop after the defined number of retries
# upon encountering non-critical errors. Instead, the message that keeps failing is dispatched
# with its raw payload to a separate topic, so it can be handled differently.
module Dlq
# Mix in the `Default` strategy. The marking overrides defined in this module call `super`,
# which presumably resolves to `Strategies::Default` (see the `@see` references below).
include Default
# Apply strategy when only dead letter queue is turned on
# Frozen list of feature symbols describing this strategy. How the list is matched against a
# topic's configuration is not visible in this file — NOTE(review): confirm against the
# strategy selector.
FEATURES = %i[
dead_letter_queue
].freeze
# Override of the standard `#mark_as_consumed` in order to handle the pause tracker
# reset in case DLQ is marked as fully independent. When DLQ is marked independent,
# any offset marking causes the pause count tracker to reset. This is useful when
# the error is not due to the collective batch operations state but due to intermediate
# "crawling" errors that move with it
#
# @see `Strategies::Default#mark_as_consumed` for more details
# @param message [Messages::Message]
def mark_as_consumed(message)
  # The pause tracker is reset only when we are in a retry flow AND the DLQ is configured as
  # independent. Both checks run before the parent marking.
  reset_after_marking = retrying? && topic.dead_letter_queue.independent?

  # Argument-less `super` forwards `message` to the parent implementation
  marked = super

  # Outside of the independent retry flow the parent result is handed back untouched
  return marked unless reset_after_marking
  # Marking did not succeed, so the pause tracker is left as it is
  return false unless marked

  coordinator.pause_tracker.reset
  true
end
# Override of the standard `#mark_as_consumed!`. Resets the pause tracker count in case
# DLQ was configured with the `independent` flag.
#
# @see `Strategies::Default#mark_as_consumed!` for more details
# @param message [Messages::Message]
def mark_as_consumed!(message)
  # Decide up front whether a successful marking should also reset the pause tracker: this is
  # the case only for a retry flow on a DLQ configured with the `independent` flag
  independent_retry = retrying? && topic.dead_letter_queue.independent?

  # Argument-less `super` forwards `message` to the parent implementation
  outcome = super

  # Without the independent retry flow we simply pass the parent result through
  return outcome unless independent_retry

  if outcome
    coordinator.pause_tracker.reset
    true
  else
    # We were not able to mark, so there is nothing to reset
    false
  end
end
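# Runs after a batch was consumed. On success it resets the pause tracker and marks the last
# message as consumed (unless the user paused manually). On failure it retries after a pause
# as long as the attempt count is within `max_retries`; otherwise it dispatches the skippable
# message to the DLQ, marks it and pauses before moving on.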
def handle_after_consume
  # Nothing to do when the assignment is already gone
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # A manual pause means the user controls the flow, so we do not mark on their behalf
    return coordinator.manual_pause? ? nil : mark_as_consumed(messages.last)
  end

  attempt = coordinator.pause_tracker.attempt
  max_retries = topic.dead_letter_queue.max_retries

  # Still within the retry budget: back off and process the same data again
  return retry_after_pause if attempt <= max_retries

  # Retries are exhausted. From here on we skip the first message that was not marked as
  # consumed and move it to the dead letter topic.
  # Resetting the pause tracker indicates that we now consider the flow as "ok".
  coordinator.pause_tracker.reset

  broken_message = find_skippable_message.first

  # Send the skippable message to the DLQ topic and then mark it, so we can move on
  dispatch_to_dlq(broken_message)
  mark_dispatched_to_dlq(broken_message)

  # Unless the assignment was lost in the meantime, pause once to back off, just in case
  pause(coordinator.seek_offset, nil, false) unless revoked?
end
# Finds the message we may want to skip (falls back to the first message of the batch)
# @private
# @return [Array<Karafka::Messages::Message, Boolean>] message we may want to skip and
# information if this message was from marked offset or figured out via mom flow
def find_skippable_message
  # Look for the message sitting exactly at the seek offset, but only when the coordinator
  # reports that something was marked
  candidate = messages.find do |message|
    next false unless coordinator.marked?

    message.offset == coordinator.seek_offset
  end

  # The second element tells whether the message came from a marked offset
  return [candidate, true] if candidate

  # No message matches the last committed offset. This means the user operates with manual
  # offsets and we are beyond the batch in which things broke for the first time. We then
  # skip the first message (as there are no markings) and move on one by one.
  [messages.first, false]
end
# Moves the broken message into a separate queue defined via the settings
# @private
# @param skippable_message [Karafka::Messages::Message] message we are skipping that also
# should go to the dlq topic
def dispatch_to_dlq(skippable_message)
  dlq_config = topic.dead_letter_queue

  # The producer method to use is taken from the DLQ settings. Only the raw payload of the
  # skipped message is sent to the DLQ topic.
  producer.public_send(
    dlq_config.dispatch_method,
    topic: dlq_config.topic,
    payload: skippable_message.raw_payload
  )

  # Let listeners on the events bus know that the message was dispatched
  Karafka.monitor.instrument('dead_letter_queue.dispatched', caller: self, message: skippable_message)
end
# Marks message that went to DLQ (if applicable) based on the requested method
# @param skippable_message [Karafka::Messages::Message]
def mark_dispatched_to_dlq(skippable_message)
  marking_method = topic.dead_letter_queue.marking_method

  # Only the two marking flavours below are supported
  return mark_as_consumed(skippable_message) if marking_method == :mark_as_consumed
  return mark_as_consumed!(skippable_message) if marking_method == :mark_as_consumed!

  # Reaching this point should never happen. It is a bug if encountered, please report it.
  raise Karafka::Errors::UnsupportedCaseError
end
end
end
end
end