karafka/karafka

View on GitHub
lib/karafka/processing/strategies/dlq.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

module Karafka
  module Processing
    module Strategies
      # When using the dead letter queue, processing won't stop after the defined number of
      # retries upon encountering non-critical errors. Instead, the messages that caused the
      # errors will be moved to a separate topic with their payload and metadata, so they can
      # be handled differently.
      module Dlq
        include Default

        # Apply strategy when only dead letter queue is turned on
        FEATURES = %i[
          dead_letter_queue
        ].freeze

        # Marks the given message as consumed via the parent strategy and, when we are retrying
        # and the DLQ is configured as `independent`, additionally resets the pause tracker
        # after a successful marking. With an independent DLQ, any successful offset marking
        # resets the pause count, which is useful when the error is not caused by the state of
        # the whole batch but by intermediate "crawling" errors that move along with it.
        #
        # @see `Strategies::Default#mark_as_consumed` for more details
        # @param message [Messages::Message] message we want to mark as consumed
        # @return [Boolean] result of the parent marking when no reset logic applies; otherwise
        #   `true` when the marking succeeded (and the tracker was reset) and `false` when not
        def mark_as_consumed(message)
          # Both predicates are evaluated before delegating to the parent implementation and
          # `independent?` is only consulted when we are actually retrying
          reset_after_marking = retrying? && topic.dead_letter_queue.independent?
          marked = super

          # Not retrying or DLQ not independent: behave exactly like the parent strategy
          return marked unless reset_after_marking
          # We were not able to mark, so there is nothing to reset
          return false unless marked

          coordinator.pause_tracker.reset

          true
        end

        # Blocking-variant counterpart of `#mark_as_consumed` for the DLQ strategy. Delegates
        # the marking to the parent strategy and resets the pause tracker after a successful
        # marking, but only when we are retrying and the DLQ uses the `independent` flag.
        #
        # @see `Strategies::Default#mark_as_consumed!` for more details
        # @param message [Messages::Message] message we want to mark as consumed
        # @return [Boolean] result of the parent marking when no reset logic applies; otherwise
        #   `true` when the marking succeeded (and the tracker was reset) and `false` when not
        def mark_as_consumed!(message)
          # Decide upfront whether a successful marking should reset the pause tracker.
          # `independent?` is checked only when we are retrying
          independent_retry = retrying? && topic.dead_letter_queue.independent?
          marked = super

          if independent_retry && marked
            coordinator.pause_tracker.reset

            true
          elsif independent_retry
            # We were not able to mark, so there is nothing to reset
            false
          else
            # Regular flow, the parent result goes back to the caller untouched
            marked
          end
        end

        # Post-consumption flow of the DLQ strategy. On success it resets the pause tracker and
        # marks the last message of the batch as consumed (unless a manual pause was requested).
        # On failure it retries after a pause for as long as the attempt count does not exceed
        # `max_retries`; after that it dispatches the skippable message to the DLQ topic, marks
        # it and pauses before moving on. Nothing happens when the assignment was revoked.
        def handle_after_consume
          return if revoked?

          if coordinator.success?
            coordinator.pause_tracker.reset

            # Nothing is marked automatically when the coordinator reports a manual pause
            return if coordinator.manual_pause?

            return mark_as_consumed(messages.last)
          end

          within_retry_limit =
            coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries

          return retry_after_pause if within_retry_limit

          # We've used all the retries we could, so we skip the first message that was not
          # marked as consumed, move it to the dead topic, pause and continue.
          # We reset the pause to indicate we will now consider it as "ok".
          coordinator.pause_tracker.reset

          broken_message = find_skippable_message.first

          # Send the skippable message to the DLQ topic
          dispatch_to_dlq(broken_message)

          # We mark the broken message as consumed and move on
          mark_dispatched_to_dlq(broken_message)

          # We pause to backoff once just in case (unless we lost the assignment meanwhile)
          pause(coordinator.seek_offset, nil, false) unless revoked?
        end

        # Finds the message we may want to skip. When the coordinator has a marked offset, this
        # is the message from the current batch whose offset equals the seek offset; otherwise
        # (or when no message matches) it is the first message of the batch.
        # @private
        # @return [Array<Karafka::Messages::Message, Boolean>] message we may want to skip and
        #   information if this message was from marked offset or figured out via mom flow
        def find_skippable_message
          # Both the marked state and the seek offset do not depend on the inspected message, so
          # we read them once instead of re-evaluating them for every message in the batch
          marked_message =
            if coordinator.marked?
              seek_offset = coordinator.seek_offset
              messages.find { |msg| msg.offset == seek_offset }
            end

          # If we don't have the message matching the last committed offset, it means that
          # user operates with manual offsets and we're beyond the batch in which things
          # broke for the first time. Then we skip the first (as no markings) and we
          # move on one by one.
          marked_message ? [marked_message, true] : [messages.first, false]
        end

        # Dispatches the broken message to the DLQ topic defined via the settings, using the
        # configured dispatch method of the producer, and publishes a
        # `dead_letter_queue.dispatched` event afterwards. Only the raw payload of the message
        # is passed to the producer here.
        # @private
        # @param skippable_message [Karafka::Messages::Message] message we are skipping that also
        #   should go to the dlq topic
        def dispatch_to_dlq(skippable_message)
          dlq_settings = topic.dead_letter_queue

          producer.public_send(
            dlq_settings.dispatch_method,
            topic: dlq_settings.topic,
            payload: skippable_message.raw_payload
          )

          # Let the listeners on the events bus know about the dispatch
          event_payload = { caller: self, message: skippable_message }

          Karafka.monitor.instrument('dead_letter_queue.dispatched', **event_payload)
        end

        # Marks the message that went to the DLQ as consumed, using the marking method
        # configured for the dead letter queue of this topic.
        # @param skippable_message [Karafka::Messages::Message] message that was dispatched
        # @raise [Karafka::Errors::UnsupportedCaseError] when the configured marking method is
        #   neither `:mark_as_consumed` nor `:mark_as_consumed!`
        def mark_dispatched_to_dlq(skippable_message)
          marking_method = topic.dead_letter_queue.marking_method

          return mark_as_consumed(skippable_message) if marking_method == :mark_as_consumed
          return mark_as_consumed!(skippable_message) if marking_method == :mark_as_consumed!

          # This should never happen. Bug if encountered. Please report
          raise Karafka::Errors::UnsupportedCaseError
        end
      end
    end
  end
end