karafka/karafka

View on GitHub
lib/karafka/instrumentation/vendors/datadog/logger_listener.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

module Karafka
  module Instrumentation
    # Namespace for vendor specific instrumentation
    module Vendors
      # Datadog specific instrumentation
      module Datadog
        # A karafka's logger listener for Datadog
        # It depends on the 'ddtrace' gem
        class LoggerListener
          include ::Karafka::Core::Configurable
          extend Forwardable

          def_delegators :config, :client, :service_name

          # `Datadog::Tracing` client that we should use to trace stuff
          setting :client

          # @see https://docs.datadoghq.com/tracing/trace_collection/dd_libraries/ruby
          setting :service_name, default: nil

          configure

          # Log levels that we use in this particular listener
          USED_LOG_LEVELS = %i[
            info
            error
            fatal
          ].freeze

          private_constant :USED_LOG_LEVELS

          # @param block [Proc] configuration block
          def initialize(&block)
            configure
            setup(&block) if block
          end

          # @param block [Proc] configuration block
          # @note We define this alias to be consistent with `WaterDrop#setup`
          def setup(&block)
            configure(&block)
          end

          # Prints info about the fact that a given job has started
          #
          # @param event [Karafka::Core::Monitoring::Event] event details including payload
          def on_worker_process(event)
            current_span = client.trace('karafka.consumer', service: service_name)
            push_tags

            job = event[:job]
            job_type = job.class.to_s.split('::').last
            consumer = job.executor.topic.consumer
            topic = job.executor.topic.name

            action = case job_type
                     when 'Periodic'
                       'tick'
                     when 'PeriodicNonBlocking'
                       'tick'
                     when 'Shutdown'
                       'shutdown'
                     when 'Revoked'
                       'revoked'
                     when 'RevokedNonBlocking'
                       'revoked'
                     when 'Idle'
                       'idle'
                     else
                       'consume'
                     end

            current_span.resource = "#{consumer}##{action}"
            info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} started"

            pop_tags
          end

          # Prints info about the fact that a given job has finished
          #
          # @param event [Karafka::Core::Monitoring::Event] event details including payload
          def on_worker_processed(event)
            push_tags

            job = event[:job]
            time = event[:time]
            job_type = job.class.to_s.split('::').last
            consumer = job.executor.topic.consumer
            topic = job.executor.topic.name

            info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} finished in #{time}ms"

            client.active_span&.finish

            pop_tags
          end

          # There are many types of errors that can occur in many places, but we provide a single
          # handler for all of them to simplify error instrumentation.
          # @param event [Karafka::Core::Monitoring::Event] event details including payload
          def on_error_occurred(event)
            push_tags

            error = event[:error]
            client.active_span&.set_error(error)

            case event[:type]
            when 'consumer.consume.error'
              error "Consumer consuming error: #{error}"
            when 'consumer.revoked.error'
              error "Consumer on revoked failed due to an error: #{error}"
            when 'consumer.before_schedule.error'
              error "Consumer before schedule failed due to an error: #{error}"
            when 'consumer.before_consume.error'
              error "Consumer before consume failed due to an error: #{error}"
            when 'consumer.after_consume.error'
              error "Consumer after consume failed due to an error: #{error}"
            when 'consumer.shutdown.error'
              error "Consumer on shutdown failed due to an error: #{error}"
            when 'consumer.tick.error'
              error "Consumer tick failed due to an error: #{error}"
            when 'worker.process.error'
              fatal "Worker processing failed due to an error: #{error}"
            when 'connection.listener.fetch_loop.error'
              error "Listener fetch loop error: #{error}"
            when 'runner.call.error'
              fatal "Runner crashed due to an error: #{error}"
            when 'app.stopping.error'
              error 'Forceful Karafka server stop'
            when 'swarm.supervisor.error'
              fatal "Swarm supervisor crashed due to an error: #{error}"
            when 'librdkafka.error'
              error "librdkafka internal error occurred: #{error}"
              # Those will only occur when retries in the client fail and when they did not stop
              # after back-offs
            when 'connection.client.poll.error'
              error "Data polling error occurred: #{error}"
            else
              pop_tags
              # This should never happen. Please contact the maintainers
              raise Errors::UnsupportedCaseError, event
            end

            pop_tags
          end

          USED_LOG_LEVELS.each do |log_level|
            define_method log_level do |*args|
              Karafka.logger.send(log_level, *args)
            end
          end

          # Pushes datadog's tags to the logger
          # This is required when tracing log lines asynchronously to correlate logs of the same
          # process together
          def push_tags
            return unless Karafka.logger.respond_to?(:push_tags)

            Karafka.logger.push_tags(client.log_correlation)
          end

          # Pops datadog's tags from the logger
          # This is required when tracing log lines asynchronously to avoid the logs of the
          # different processes to be correlated
          def pop_tags
            return unless Karafka.logger.respond_to?(:pop_tags)

            Karafka.logger.pop_tags
          end
        end
      end
    end
  end
end