karafka/karafka

View on GitHub
lib/karafka/pro/processing/strategies/dlq/default.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      module Strategies
        # Namespace for all the strategies starting with DLQ
        module Dlq
          # Only dead letter queue enabled
          module Default
            include Strategies::Default

            # Features for this strategy
            FEATURES = %i[
              dead_letter_queue
            ].freeze

            # Override of the standard `#mark_as_consumed` in order to handle the pause tracker
            # reset in case DLQ is marked as fully independent. When we are retrying and the DLQ
            # is marked independent, every successful offset marking resets the pause count
            # tracker. This is useful when the error is not due to the collective batch
            # operations state but due to intermediate "crawling" errors that move with it.
            #
            # @see `Strategies::Default#mark_as_consumed` for more details
            # @param message [Messages::Message]
            # @param offset_metadata [String, nil]
            # @return [Boolean] result of the parent marking; `true` after a successful marking
            #   that also reset the pause tracker
            def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
              # Outside of the "retrying with independent DLQ" case we behave like the parent
              return super unless retrying? && topic.dead_letter_queue.independent?
              return false unless super

              coordinator.pause_tracker.reset

              true
            ensure
              # Clear stored metadata so it is not reused as the default by subsequent markings
              @_current_offset_metadata = nil
            end

            # Override of the standard `#mark_as_consumed!`. Resets the pause tracker count after
            # a successful marking when we are retrying and the DLQ was configured with the
            # `independent` flag.
            #
            # @see `Strategies::Default#mark_as_consumed!` for more details
            # @param message [Messages::Message]
            # @param offset_metadata [String, nil]
            # @return [Boolean] result of the parent marking; `true` after a successful marking
            #   that also reset the pause tracker
            def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
              # Outside of the "retrying with independent DLQ" case we behave like the parent
              return super unless retrying? && topic.dead_letter_queue.independent?
              return false unless super

              coordinator.pause_tracker.reset

              true
            ensure
              # Clear stored metadata so it is not reused as the default by subsequent markings
              @_current_offset_metadata = nil
            end

            # Finalizes processing once the coordinator reports it as finished
            # (via `coordinator.on_finished`, which yields the last message of the group).
            #
            # - When the partition was revoked, nothing is done.
            # - On success the pause tracker is reset and, unless the user paused manually, the
            #   last message of the group is marked as consumed.
            # - On failure the DLQ flow decides whether to retry, or to dispatch/skip the broken
            #   (non-recoverable) message and move on.
            def handle_after_consume
              coordinator.on_finished do |last_group_message|
                return if revoked?

                if coordinator.success?
                  coordinator.pause_tracker.reset

                  return if coordinator.manual_pause?

                  mark_as_consumed(last_group_message)
                else
                  apply_dlq_flow do
                    dispatch_if_needed_and_mark_as_consumed
                  end
                end
              end
            end

            # Finds the message we may want to skip.
            #
            # When the coordinator has a marked offset and a message with exactly that offset
            # is present in the current batch, that message is picked. Otherwise we fall back
            # to the first message of the batch.
            #
            # @private
            # @return [Array<Karafka::Messages::Message, Boolean>] message we may want to skip and
            #   information if this message was found via the marked offset (`true`) or picked
            #   as the first message of the batch in the manual offset management flow (`false`)
            def find_skippable_message
              skippable_message = messages.find do |msg|
                coordinator.marked? && msg.offset == coordinator.seek_offset
              end

              # If we don't have the message matching the last committed offset, it means that
              # user operates with manual offsets and we're beyond the batch in which things
              # broke for the first time. Then we skip the first (as no markings) and we
              # move on one by one.
              skippable_message ? [skippable_message, true] : [messages.first, false]
            end

            # Moves the broken message into a separate queue defined via the settings and
            # publishes the `dead_letter_queue.dispatched` event on the monitor.
            #
            # @param skippable_message [Karafka::Messages::Message] message we want to dispatch
            #   to the DLQ
            # @raise [Cleaner::Errors::MessageCleanedError] when the message was already cleaned
            def dispatch_to_dlq(skippable_message)
              # DLQ should never try to dispatch a message that was cleaned. If a message was
              # cleaned, we will not have all the needed data. If you see this error, it means
              # that your processing flow is not as expected and you have cleaned a message that
              # should not be cleaned as it should go to the DLQ
              raise(Cleaner::Errors::MessageCleanedError) if skippable_message.cleaned?

              # The producer method (sync or async variant) is taken from the DLQ settings
              producer.public_send(
                topic.dead_letter_queue.dispatch_method,
                build_dlq_message(skippable_message)
              )

              # Notify about dispatch on the events bus
              Karafka.monitor.instrument(
                'dead_letter_queue.dispatched',
                caller: self,
                message: skippable_message
              )
            end

            # Dispatches the message to the DLQ (when needed and when applicable based on settings)
            # and marks this message as consumed using the configured marking method.
            #
            # If the producer is transactional and the DLQ config allows it, both the dispatch
            # and the marking run inside a single transaction.
            def dispatch_if_needed_and_mark_as_consumed
              skippable_message, = find_skippable_message

              dispatch = lambda do
                dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
                mark_dispatched_to_dlq(skippable_message)
              end

              if dispatch_in_a_transaction?
                transaction { dispatch.call }
              else
                dispatch.call
              end
            end

            # Builds the hash that will be produced into the DLQ topic.
            #
            # The original headers are preserved and extended with `original_*` headers that
            # describe where the message came from. If the consumer defines (possibly privately)
            # `#enhance_dlq_message`, it is invoked with the hash so it can be modified in place.
            #
            # @param skippable_message [Karafka::Messages::Message] message going to the DLQ
            # @return [Hash] dispatch DLQ message
            def build_dlq_message(skippable_message)
              original_partition = skippable_message.partition.to_s

              dlq_message = {
                topic: topic.dead_letter_queue.topic,
                key: original_partition,
                payload: skippable_message.raw_payload,
                headers: skippable_message.headers.merge(
                  'original_topic' => topic.name,
                  'original_partition' => original_partition,
                  'original_offset' => skippable_message.offset.to_s,
                  'original_consumer_group' => topic.consumer_group.id,
                  'original_attempts' => attempt.to_s
                )
              }

              # Optional method user can define in consumer to enhance the dlq message hash with
              # some extra details if needed or to replace payload, etc
              if respond_to?(:enhance_dlq_message, true)
                enhance_dlq_message(
                  dlq_message,
                  skippable_message
                )
              end

              dlq_message
            end

            # @return [Boolean] should we dispatch the message to DLQ or not. When the dispatch
            #   topic is set to false, we will skip the dispatch, effectively ignoring the broken
            #   message without taking any action. The same happens when the DLQ strategy
            #   decided to `:skip` the message.
            def dispatch_to_dlq?
              return false unless topic.dead_letter_queue.topic
              return false unless @_dispatch_to_dlq

              true
            end

            # @return [Boolean] should we use a transaction to move the data to the DLQ.
            #   We can do it only when producer is transactional and configuration for DLQ
            #   transactional dispatches is not set to false.
            def dispatch_in_a_transaction?
              producer.transactional? && topic.dead_letter_queue.transactional?
            end

            # Runs the DLQ strategy and, based on its result, performs certain operations.
            #
            # In case of `:skip` and `:dispatch` it runs the exact flow provided in a block,
            # resets the pause tracker and then pauses (backs off) on the current seek offset.
            # In case of `:retry` always `#retry_after_pause` is applied and the block is not run.
            #
            # @yield flow to execute when the strategy returned `:skip` or `:dispatch`
            # @raise [Karafka::Errors::UnsupportedCaseError] when the strategy returns any other
            #   value
            def apply_dlq_flow
              flow = topic.dead_letter_queue.strategy.call(errors_tracker, attempt)

              case flow
              when :retry
                retry_after_pause

                return
              when :skip
                @_dispatch_to_dlq = false
              when :dispatch
                @_dispatch_to_dlq = true
              else
                # The error class lives in the `Karafka::Errors` namespace
                raise Karafka::Errors::UnsupportedCaseError, flow
              end

              yield

              # We reset the pause to indicate we will now consider it as "ok".
              coordinator.pause_tracker.reset

              # Always backoff after DLQ dispatch even on skip to prevent overloads on errors
              pause(coordinator.seek_offset, nil, false)
            end

            # Marks message that went to DLQ (if applicable) based on the requested method
            #
            # @param skippable_message [Karafka::Messages::Message]
            # @raise [Karafka::Errors::UnsupportedCaseError] when the configured marking method
            #   is neither `:mark_as_consumed` nor `:mark_as_consumed!`
            def mark_dispatched_to_dlq(skippable_message)
              marking_method = topic.dead_letter_queue.marking_method

              case marking_method
              when :mark_as_consumed
                mark_as_consumed(skippable_message)
              when :mark_as_consumed!
                mark_as_consumed!(skippable_message)
              else
                # This should never happen. Bug if encountered. Please report
                raise Karafka::Errors::UnsupportedCaseError, marking_method
              end
            end
          end
        end
      end
    end
  end
end