karafka/karafka
lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb

# frozen_string_literal: true

require_relative 'base'

module Karafka
  module Instrumentation
    module Vendors
      # Namespace for Appsignal instrumentation
      module Appsignal
        # Listener that ships metrics to Appsignal
        class MetricsListener < Base
          def_delegators :config, :client, :rd_kafka_metrics, :namespace

          # Value object storing the publishing details of a single rdkafka metric
          RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)

          setting :namespace, default: 'karafka'

          setting :client, default: Client.new

          setting :rd_kafka_metrics, default: [
            # Broker metrics
            RdKafkaMetric.new(:count, :brokers, 'requests_retries', 'txretries_d'),
            RdKafkaMetric.new(:count, :brokers, 'transmission_errors', 'txerrs_d'),
            RdKafkaMetric.new(:count, :brokers, 'receive_errors', 'rxerrs_d'),
            RdKafkaMetric.new(:count, :brokers, 'connection_connects', 'connects_d'),
            RdKafkaMetric.new(:count, :brokers, 'connection_disconnects', 'disconnects_d'),
            RdKafkaMetric.new(:gauge, :brokers, 'network_latency_avg', %w[rtt avg]),
            RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p95', %w[rtt p95]),
            RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p99', %w[rtt p99]),

            # Topic partition metrics
            RdKafkaMetric.new(:gauge, :topics, 'consumer_lag', 'consumer_lag_stored'),
            RdKafkaMetric.new(:gauge, :topics, 'consumer_lag_delta', 'consumer_lag_stored_d')
          ].freeze

          # Metrics that sum values at the topic level rather than at the partition level
          setting :aggregated_rd_kafka_metrics, default: [
            # Topic aggregated metrics
            RdKafkaMetric.new(:gauge, :topics, 'consumer_aggregated_lag', 'consumer_lag_stored')
          ].freeze

          configure
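
          # A minimal usage sketch, assuming the standard Karafka monitor API: subscribing an
          # instance of this listener makes it receive the instrumentation events handled below.
          #
          #   Karafka.monitor.subscribe(
          #     ::Karafka::Instrumentation::Vendors::Appsignal::MetricsListener.new
          #   )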

          # Types of errors originating from user code in the consumer flow
          USER_CONSUMER_ERROR_TYPES = %w[
            consumer.consume.error
            consumer.revoked.error
            consumer.shutdown.error
            consumer.tick.error
          ].freeze

          private_constant :USER_CONSUMER_ERROR_TYPES

          # Before each consumption, we start a transaction associated with it.
          # We also set some basic metadata about the given consumption that can be useful for
          # debugging
          #
          # @param event [Karafka::Core::Monitoring::Event]
          def on_consumer_consume(event)
            consumer = event.payload[:caller]

            start_transaction(consumer, 'consume')

            client.metadata = {
              batch_size: consumer.messages.size,
              first_offset: consumer.messages.metadata.first_offset,
              last_offset: consumer.messages.metadata.last_offset,
              consumer_group: consumer.topic.consumer_group.id,
              topic: consumer.topic.name,
              partition: consumer.partition,
              attempt: consumer.coordinator.pause_tracker.attempt
            }
          end

          # Once we're done with consumption, we bump the related counters
          #
          # @param event [Karafka::Core::Monitoring::Event]
          def on_consumer_consumed(event)
            consumer = event.payload[:caller]
            messages = consumer.messages
            metadata = messages.metadata

            with_multiple_resolutions(consumer) do |tags|
              count('consumer_messages', messages.size, tags)
              count('consumer_batches', 1, tags)
              gauge('consumer_offsets', metadata.last_offset, tags)
            end

            stop_transaction
          end

          # Registers the minute-based probe only once the app is running. If we always
          # registered the minute probe, it would also report on processes that use Karafka
          # but do not run the consumption process
          #
          # @param _event [Karafka::Core::Monitoring::Event]
          def on_app_running(_event)
            return if @probe_registered

            @probe_registered = true

            # Registers the minute-based probe for once-per-minute metrics
            client.register_probe(:karafka, -> { minute_probe })
          end

          [
            %i[revoke revoked revoked],
            %i[shutting_down shutdown shutdown],
            %i[tick ticked tick]
          ].each do |before, after, name|
            class_eval <<~RUBY, __FILE__, __LINE__ + 1
              # Keeps track of user code execution
              #
              # @param event [Karafka::Core::Monitoring::Event]
              def on_consumer_#{before}(event)
                consumer = event.payload[:caller]
                start_transaction(consumer, '#{name}')
              end

              # Finishes the transaction
              #
              # @param _event [Karafka::Core::Monitoring::Event]
              def on_consumer_#{after}(_event)
                stop_transaction
              end
            RUBY
          end
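
          # For clarity, the %i[revoke revoked revoked] tuple above expands to methods
          # equivalent to the following:
          #
          #   def on_consumer_revoke(event)
          #     consumer = event.payload[:caller]
          #     start_transaction(consumer, 'revoked')
          #   end
          #
          #   def on_consumer_revoked(_event)
          #     stop_transaction
          #   end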

          # Counts DLQ dispatches
          #
          # @param event [Karafka::Core::Monitoring::Event]
          def on_dead_letter_queue_dispatched(event)
            consumer = event.payload[:caller]

            with_multiple_resolutions(consumer) do |tags|
              count('consumer_dead', 1, tags)
            end
          end

          # Reports on **any** error that occurs. This also includes non-user related errors
          # originating from the framework.
          #
          # @param event [Karafka::Core::Monitoring::Event] error event details
          def on_error_occurred(event)
            # If this is a user consumption-related error, we bump the error counters
            if USER_CONSUMER_ERROR_TYPES.include?(event[:type])
              consumer = event.payload[:caller]

              with_multiple_resolutions(consumer) do |tags|
                count('consumer_errors', 1, tags)
              end
            end

            stop_transaction
          end

          # Hooks up to Karafka instrumentation for emitted statistics
          #
          # @param event [Karafka::Core::Monitoring::Event]
          def on_statistics_emitted(event)
            statistics = event[:statistics]
            consumer_group_id = event[:consumer_group_id]

            rd_kafka_metrics.each do |metric|
              report_metric(metric, statistics, consumer_group_id)
            end

            report_aggregated_topics_metrics(statistics, consumer_group_id)
          end
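
          # For reference, an illustrative and heavily abridged shape of the emitted statistics
          # this listener digs into. Only keys referenced here and in #report_metric are shown
          # and all values are made up:
          #
          #   {
          #     'brokers' => {
          #       'localhost:9092/1' => {
          #         'nodeid' => 1,
          #         'nodename' => 'localhost:9092',
          #         'connects_d' => 0,
          #         'rtt' => { 'avg' => 5, 'p95' => 9, 'p99' => 12 }
          #       }
          #     },
          #     'topics' => {
          #       'events' => {
          #         'partitions' => {
          #           '0' => {
          #             'fetch_state' => 'active',
          #             'consumer_lag' => 3,
          #             'consumer_lag_stored' => 3
          #           }
          #         }
          #       }
          #     }
          #   }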

          # Reports a given metric's statistics to Appsignal
          #
          # @param metric [RdKafkaMetric] metric value object
          # @param statistics [Hash] hash with all the statistics emitted
          # @param consumer_group_id [String] id of the consumer group in the context of which
          #   we operate
          def report_metric(metric, statistics, consumer_group_id)
            case metric.scope
            when :root
              # Do nothing on the root metrics as the same metrics are reported in a granular
              # way from other places
              nil
            when :brokers
              statistics.fetch('brokers').each_value do |broker_statistics|
                # Skip bootstrap nodes
                # Bootstrap nodes have nodeid -1, other nodes have positive
                # node ids
                next if broker_statistics['nodeid'] == -1

                public_send(
                  metric.type,
                  metric.name,
                  broker_statistics.dig(*metric.key_location),
                  {
                    broker: broker_statistics['nodename']
                  }
                )
              end
            when :topics
              statistics.fetch('topics').each do |topic_name, topic_values|
                topic_values['partitions'].each do |partition_name, partition_statistics|
                  next if partition_name == '-1'
                  # Skip until lag info is available
                  next if partition_statistics['consumer_lag'] == -1
                  next if partition_statistics['consumer_lag_stored'] == -1

                  # Skip if we do not own the fetch assignment
                  next if partition_statistics['fetch_state'] == 'stopped'
                  next if partition_statistics['fetch_state'] == 'none'

                  public_send(
                    metric.type,
                    metric.name,
                    partition_statistics.dig(*metric.key_location),
                    {
                      consumer_group: consumer_group_id,
                      topic: topic_name,
                      partition: partition_name
                    }
                  )
                end
              end
            else
              raise ArgumentError, metric.scope
            end
          end

          # Publishes aggregated topic-level metrics that are sums of per-partition metrics
          #
          # @param statistics [Hash] hash with all the statistics emitted
          # @param consumer_group_id [String] id of the consumer group in the context of which
          #   we operate
          def report_aggregated_topics_metrics(statistics, consumer_group_id)
            config.aggregated_rd_kafka_metrics.each do |metric|
              statistics.fetch('topics').each do |topic_name, topic_values|
                sum = 0

                topic_values['partitions'].each do |partition_name, partition_statistics|
                  next if partition_name == '-1'
                  # Skip until lag info is available
                  next if partition_statistics['consumer_lag'] == -1
                  next if partition_statistics['consumer_lag_stored'] == -1

                  sum += partition_statistics.dig(*metric.key_location)
                end

                public_send(
                  metric.type,
                  metric.name,
                  sum,
                  {
                    consumer_group: consumer_group_id,
                    topic: topic_name
                  }
                )
              end
            end
          end
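
          # As a worked example: with two reportable partitions whose 'consumer_lag_stored'
          # values are 3 and 5, the aggregation above publishes consumer_aggregated_lag = 8,
          # tagged only with the consumer group and the topic name.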

          # Increments a counter with a namespaced key, value and tags
          #
          # @param key [String] key we want to use (without the namespace)
          # @param value [Integer] count value
          # @param tags [Hash] additional tags
          def count(key, value, tags)
            client.count(
              namespaced_metric(key),
              value,
              tags
            )
          end

          # Sets the gauge value
          #
          # @param key [String] key we want to use (without the namespace)
          # @param value [Integer] gauge value
          # @param tags [Hash] additional tags
          def gauge(key, value, tags)
            client.gauge(
              namespaced_metric(key),
              value,
              tags
            )
          end

          private

          # Wraps metric name in listener's namespace
          # @param metric_name [String] RdKafkaMetric name
          # @return [String]
          def namespaced_metric(metric_name)
            "#{namespace}_#{metric_name}"
          end

          # Starts the transaction for monitoring user code
          #
          # @param consumer [Karafka::BaseConsumer] karafka consumer instance
          # @param action_name [String] user lifecycle method name
          def start_transaction(consumer, action_name)
            client.start_transaction(
              "#{consumer.class}##{action_name}"
            )
          end

          # Stops the transaction wrapping user code
          def stop_transaction
            client.stop_transaction
          end

          # Yields tags for both the topic-level and the partition-level resolution so that
          # metrics can be reported at both granularities
          #
          # @param consumer [Karafka::BaseConsumer] Karafka consumer instance
          # @yieldparam tags [Hash] tags for the given resolution
          def with_multiple_resolutions(consumer)
            topic_name = consumer.topic.name
            consumer_group_id = consumer.topic.consumer_group.id
            partition = consumer.partition

            tags = {
              consumer_group: consumer_group_id,
              topic: topic_name
            }

            yield(tags)
            yield(tags.merge(partition: partition))
          end

          # Sends minute-based probe metrics
          def minute_probe
            concurrency = Karafka::App.config.concurrency

            count('processes_count', 1, {})
            count('threads_count', concurrency, {})
          end
        end
      end
    end
  end
end