lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb
# frozen_string_literal: true

require_relative 'base'

module Karafka
module Instrumentation
module Vendors
# Namespace for AppSignal instrumentation
module Appsignal
# Listener that ships metrics to AppSignal
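#
# @example Subscribing the listener to the Karafka monitor (a minimal sketch;
#   assumes AppSignal itself is already configured in the process)
#   Karafka.monitor.subscribe(
#     Karafka::Instrumentation::Vendors::Appsignal::MetricsListener.new
#   )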
class MetricsListener < Base
def_delegators :config, :client, :rd_kafka_metrics, :namespace

# Value object for storing a single rdkafka metric publishing details
RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)

setting :namespace, default: 'karafka'
setting :client, default: Client.new
setting :rd_kafka_metrics, default: [
# Broker metrics
RdKafkaMetric.new(:count, :brokers, 'requests_retries', 'txretries_d'),
RdKafkaMetric.new(:count, :brokers, 'transmission_errors', 'txerrs_d'),
RdKafkaMetric.new(:count, :brokers, 'receive_errors', 'rxerrs_d'),
RdKafkaMetric.new(:count, :brokers, 'connection_connects', 'connects_d'),
RdKafkaMetric.new(:count, :brokers, 'connection_disconnects', 'disconnects_d'),
RdKafkaMetric.new(:gauge, :brokers, 'network_latency_avg', %w[rtt avg]),
RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p95', %w[rtt p95]),
RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p99', %w[rtt p99]),
# Topics partitions metrics
RdKafkaMetric.new(:gauge, :topics, 'consumer_lag', 'consumer_lag_stored'),
RdKafkaMetric.new(:gauge, :topics, 'consumer_lag_delta', 'consumer_lag_stored_d')
].freeze

# Metrics that are sums of per-partition values, reported on the topic level rather
# than for each partition separately
setting :aggregated_rd_kafka_metrics, default: [
# Topic aggregated metrics
RdKafkaMetric.new(:gauge, :topics, 'consumer_aggregated_lag', 'consumer_lag_stored')
].freeze

configure
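
# @example Adjusting defaults during instantiation (illustrative; relies on the
#   configuration block support inherited from the vendors Base listener)
#   listener = MetricsListener.new do |config|
#     config.namespace = 'my_app'
#   end
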
# Types of errors originating from user code in the consumer flow
USER_CONSUMER_ERROR_TYPES = %w[
consumer.consume.error
consumer.revoked.error
consumer.shutdown.error
consumer.tick.error
].freeze

private_constant :USER_CONSUMER_ERROR_TYPES

# Before each consumption process, let's start a transaction associated with it
# We also set some basic metadata about the given consumption that can be useful for
# debugging
#
# @param event [Karafka::Core::Monitoring::Event]
def on_consumer_consume(event)
consumer = event.payload[:caller]
start_transaction(consumer, 'consume')
client.metadata = {
batch_size: consumer.messages.size,
first_offset: consumer.messages.metadata.first_offset,
last_offset: consumer.messages.metadata.last_offset,
consumer_group: consumer.topic.consumer_group.id,
topic: consumer.topic.name,
partition: consumer.partition,
attempt: consumer.coordinator.pause_tracker.attempt
}
end

# Once we're done with consumption, we bump counters about that
#
# @param event [Karafka::Core::Monitoring::Event]
def on_consumer_consumed(event)
consumer = event.payload[:caller]
messages = consumer.messages
metadata = messages.metadata
with_multiple_resolutions(consumer) do |tags|
count('consumer_messages', messages.size, tags)
count('consumer_batches', 1, tags)
gauge('consumer_offsets', metadata.last_offset, tags)
end
stop_transaction
end

# Registers the minute-based probe, but only once the app is running. If we always
# registered the probe, it would also report from processes that merely load Karafka
# but do not run the consumption process
#
# @param _event [Karafka::Core::Monitoring::Event]
def on_app_running(_event)
return if @probe_registered
@probe_registered = true
# Registers the minutely probe for one-every-minute metrics
client.register_probe(:karafka, -> { minute_probe })
end
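
# For the remaining user-facing consumer lifecycle events we generate the handlers,
# since they only start and stop a transaction. Each triple holds the event suffix
# that opens the transaction, the suffix that closes it and the action name used in
# the transaction label, as visible in the generated code below.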
[
%i[revoke revoked revoked],
%i[shutting_down shutdown shutdown],
%i[tick ticked tick]
].each do |before, after, name|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
# Keeps track of user code execution
#
# @param event [Karafka::Core::Monitoring::Event]
def on_consumer_#{before}(event)
consumer = event.payload[:caller]
start_transaction(consumer, '#{name}')
end

# Finishes the transaction
#
# @param _event [Karafka::Core::Monitoring::Event]
def on_consumer_#{after}(_event)
stop_transaction
end
RUBY
end

# Counts DLQ dispatches
#
# @param event [Karafka::Core::Monitoring::Event]
def on_dead_letter_queue_dispatched(event)
consumer = event.payload[:caller]
with_multiple_resolutions(consumer) do |tags|
count('consumer_dead', 1, tags)
end
end

# Reports on **any** error that occurs. This also includes non-user related errors
# originating from the framework.
#
# @param event [Karafka::Core::Monitoring::Event] error event details
def on_error_occurred(event)
# If this is a user consumption related error, we bump the counters for metrics
if USER_CONSUMER_ERROR_TYPES.include?(event[:type])
consumer = event.payload[:caller]
with_multiple_resolutions(consumer) do |tags|
count('consumer_errors', 1, tags)
end
end
stop_transaction
end

# Hooks up to Karafka instrumentation for emitted statistics
#
# @param event [Karafka::Core::Monitoring::Event]
def on_statistics_emitted(event)
statistics = event[:statistics]
consumer_group_id = event[:consumer_group_id]
rd_kafka_metrics.each do |metric|
report_metric(metric, statistics, consumer_group_id)
end
report_aggregated_topics_metrics(statistics, consumer_group_id)
end

# Reports a given metric's statistics to AppSignal
#
# @param metric [RdKafkaMetric] metric value object
# @param statistics [Hash] hash with all the statistics emitted
# @param consumer_group_id [String] consumer group in the context of which we operate
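#
# @example How a brokers-scoped metric is resolved (illustrative; key names follow
#   the librdkafka statistics schema)
#   RdKafkaMetric.new(:count, :brokers, 'connection_connects', 'connects_d')
#   # For each broker entry it reads broker_statistics['connects_d'] and reports
#   # it as 'karafka_connection_connects', tagged with the broker 'nodename'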
def report_metric(metric, statistics, consumer_group_id)
case metric.scope
when :root
# Do nothing on the root metrics as the same metrics are reported in a granular
# way from other places
nil
when :brokers
statistics.fetch('brokers').each_value do |broker_statistics|
# Skip bootstrap nodes
# Bootstrap nodes have nodeid -1, other nodes have positive
# node ids
next if broker_statistics['nodeid'] == -1
public_send(
metric.type,
metric.name,
broker_statistics.dig(*metric.key_location),
{
broker: broker_statistics['nodename']
}
)
end
when :topics
statistics.fetch('topics').each do |topic_name, topic_values|
topic_values['partitions'].each do |partition_name, partition_statistics|
next if partition_name == '-1'
# Skip until lag info is available
next if partition_statistics['consumer_lag'] == -1
next if partition_statistics['consumer_lag_stored'] == -1
# Skip if we do not own the fetch assignment
next if partition_statistics['fetch_state'] == 'stopped'
next if partition_statistics['fetch_state'] == 'none'
public_send(
metric.type,
metric.name,
partition_statistics.dig(*metric.key_location),
{
consumer_group: consumer_group_id,
topic: topic_name,
partition: partition_name
}
)
end
end
else
raise ArgumentError, metric.scope
end
end

# Publishes aggregated topic-level metrics that are sums of the per-partition metrics
#
# @param statistics [Hash] hash with all the statistics emitted
# @param consumer_group_id [String] consumer group in the context of which we operate
def report_aggregated_topics_metrics(statistics, consumer_group_id)
config.aggregated_rd_kafka_metrics.each do |metric|
statistics.fetch('topics').each do |topic_name, topic_values|
sum = 0
topic_values['partitions'].each do |partition_name, partition_statistics|
next if partition_name == '-1'
# Skip until lag info is available
next if partition_statistics['consumer_lag'] == -1
next if partition_statistics['consumer_lag_stored'] == -1
sum += partition_statistics.dig(*metric.key_location)
end
public_send(
metric.type,
metric.name,
sum,
{
consumer_group: consumer_group_id,
topic: topic_name
}
)
end
end
end

# Increments a counter with a namespaced key, value and tags
#
# @param key [String] key we want to use (without the namespace)
# @param value [Integer] count value
# @param tags [Hash] additional extra tags
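#
# @example Resulting metric name under the default namespace (illustrative)
#   count('consumer_batches', 1, { topic: 'events' })
#   # increments 'karafka_consumer_batches' on the AppSignal client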
def count(key, value, tags)
client.count(
namespaced_metric(key),
value,
tags
)
end

# Sets the gauge value
#
# @param key [String] key we want to use (without the namespace)
# @param value [Integer] gauge value
# @param tags [Hash] additional extra tags
def gauge(key, value, tags)
client.gauge(
namespaced_metric(key),
value,
tags
)
end

private

# Wraps the metric name in the listener's namespace
#
# @param metric_name [String] RdKafkaMetric name
# @return [String]
def namespaced_metric(metric_name)
"#{namespace}_#{metric_name}"
end

# Starts the transaction for monitoring user code
#
# @param consumer [Karafka::BaseConsumer] karafka consumer instance
# @param action_name [String] lifecycle user method name
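#
# @example Transaction naming (illustrative; EventsConsumer is a made-up class)
#   start_transaction(events_consumer, 'consume')
#   # starts an AppSignal transaction named 'EventsConsumer#consume'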
def start_transaction(consumer, action_name)
client.start_transaction(
"#{consumer.class}##{action_name}"
)
end

# Stops the transaction wrapping user code
def stop_transaction
client.stop_transaction
end

# Runs the given block twice: once with topic-scoped tags and once with
# partition-scoped tags, so each metric is reported in both resolutions
#
# @param consumer [Karafka::BaseConsumer] Karafka consumer instance
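#
# @example Tags yielded for a consumer of topic 'events', partition 0 (illustrative)
#   { consumer_group: 'app', topic: 'events' }
#   { consumer_group: 'app', topic: 'events', partition: 0 }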
def with_multiple_resolutions(consumer)
topic_name = consumer.topic.name
consumer_group_id = consumer.topic.consumer_group.id
partition = consumer.partition
tags = {
consumer_group: consumer_group_id,
topic: topic_name
}
yield(tags)
yield(tags.merge(partition: partition))
end

# Sends minute-based probing metrics
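# Under the default namespace these arrive as 'karafka_processes_count' and
# 'karafka_threads_count', with the thread count taken from Karafka's concurrency
# setting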
def minute_probe
concurrency = Karafka::App.config.concurrency
count('processes_count', 1, {})
count('threads_count', concurrency, {})
end
end
end
end
end
end