lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb
# frozen_string_literal: true
module Karafka
module Instrumentation
# Namespace for vendor specific instrumentation
module Vendors
# Datadog specific instrumentation
module Datadog
# Listener that can be used to subscribe to Karafka to receive stats via StatsD
# and/or Datadog
#
# @note You need to set up the `dogstatsd-ruby` client and assign it
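#
# @example A minimal setup sketch (host, port and tags are illustrative):
#   require 'datadog/statsd'
#
#   listener = ::Karafka::Instrumentation::Vendors::Datadog::MetricsListener.new do |config|
#     config.client = Datadog::Statsd.new('localhost', 8125)
#     config.default_tags = ["host:#{Socket.gethostname}"]
#   end
#
#   Karafka.monitor.subscribe(listener)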
class MetricsListener
include ::Karafka::Core::Configurable
extend Forwardable
def_delegators :config, :client, :rd_kafka_metrics, :namespace,
:default_tags, :distribution_mode
# Value object for storing a single rdkafka metric's publishing details
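# @example The fields are: metric type, statistics scope, published name and
#   the key (or nested key path) inside the statistics hash:
#   RdKafkaMetric.new(:gauge, :brokers, 'network.latency.avg', %w[rtt avg])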
RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)
# Namespace under which the DD metrics should be published
setting :namespace, default: 'karafka'
# Datadog client that we should use to publish the metrics
setting :client
# Default tags we want to publish (for example hostname)
# Format as follows (hostname example): `["host:#{Socket.gethostname}"]`
setting :default_tags, default: []
# All the rdkafka metrics we want to publish
#
# By default we publish quite a lot, so this can be tuned
# Note that the ones with a `_d` suffix are deltas computed by Karafka, not
# values taken directly from rdkafka or Kafka
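#
# @example Publishing only the client-level counters (a tuning sketch):
#   listener.setup do |config|
#     config.rd_kafka_metrics = [
#       RdKafkaMetric.new(:count, :root, 'messages.consumed', 'rxmsgs_d')
#     ]
#   end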
setting :rd_kafka_metrics, default: [
# Client metrics
RdKafkaMetric.new(:count, :root, 'messages.consumed', 'rxmsgs_d'),
RdKafkaMetric.new(:count, :root, 'messages.consumed.bytes', 'rxmsg_bytes'),
# Broker metrics
RdKafkaMetric.new(:count, :brokers, 'consume.attempts', 'txretries_d'),
RdKafkaMetric.new(:count, :brokers, 'consume.errors', 'txerrs_d'),
RdKafkaMetric.new(:count, :brokers, 'receive.errors', 'rxerrs_d'),
RdKafkaMetric.new(:count, :brokers, 'connection.connects', 'connects_d'),
RdKafkaMetric.new(:count, :brokers, 'connection.disconnects', 'disconnects_d'),
RdKafkaMetric.new(:gauge, :brokers, 'network.latency.avg', %w[rtt avg]),
RdKafkaMetric.new(:gauge, :brokers, 'network.latency.p95', %w[rtt p95]),
RdKafkaMetric.new(:gauge, :brokers, 'network.latency.p99', %w[rtt p99]),
# Topics metrics
RdKafkaMetric.new(:gauge, :topics, 'consumer.lags', 'consumer_lag_stored'),
RdKafkaMetric.new(:gauge, :topics, 'consumer.lags_delta', 'consumer_lag_stored_d')
].freeze
# Whether histogram metrics should be sent as distributions or histograms.
# Distribution metrics are aggregated globally rather than agent-side,
# providing more accurate percentiles when consumers run on multiple hosts.
#
# Learn more at https://docs.datadoghq.com/metrics/types/?tab=distribution#metric-types
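#
# @example Opting into globally aggregated distributions (a sketch):
#   listener.setup { |config| config.distribution_mode = :distribution }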
setting :distribution_mode, default: :histogram
configure
# @param block [Proc] configuration block
def initialize(&block)
configure
setup(&block) if block
end
# @param block [Proc] configuration block
# @note We define this alias to be consistent with `WaterDrop#setup`
def setup(&block)
configure(&block)
end
# Hooks up to Karafka instrumentation for emitted statistics
#
# @param event [Karafka::Core::Monitoring::Event]
def on_statistics_emitted(event)
statistics = event[:statistics]
consumer_group_id = event[:consumer_group_id]
base_tags = default_tags + ["consumer_group:#{consumer_group_id}"]
rd_kafka_metrics.each do |metric|
report_metric(metric, statistics, base_tags)
end
end
# Increases the error count by 1
#
# @param event [Karafka::Core::Monitoring::Event]
def on_error_occurred(event)
extra_tags = ["type:#{event[:type]}"]
if event.payload[:caller].respond_to?(:messages)
extra_tags += consumer_tags(event.payload[:caller])
end
count('error_occurred', 1, tags: default_tags + extra_tags)
end
# Reports how many messages we've polled and how much time we spent on it
#
# @param event [Karafka::Core::Monitoring::Event]
def on_connection_listener_fetch_loop_received(event)
time_taken = event[:time]
messages_count = event[:messages_buffer].size
consumer_group_id = event[:subscription_group].consumer_group.id
extra_tags = ["consumer_group:#{consumer_group_id}"]
histogram('listener.polling.time_taken', time_taken, tags: default_tags + extra_tags)
histogram('listener.polling.messages', messages_count, tags: default_tags + extra_tags)
end
# Here we report the majority of processing-related metrics, as we have access
# to the consumer
# @param event [Karafka::Core::Monitoring::Event]
def on_consumer_consumed(event)
consumer = event.payload[:caller]
messages = consumer.messages
metadata = messages.metadata
tags = default_tags + consumer_tags(consumer)
count('consumer.messages', messages.count, tags: tags)
count('consumer.batches', 1, tags: tags)
gauge('consumer.offset', metadata.last_offset, tags: tags)
histogram('consumer.consumed.time_taken', event[:time], tags: tags)
histogram('consumer.batch_size', messages.count, tags: tags)
histogram('consumer.processing_lag', metadata.processing_lag, tags: tags)
histogram('consumer.consumption_lag', metadata.consumption_lag, tags: tags)
end
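# Maps the remaining consumer lifecycle events to their metric names and defines
# an `on_consumer_<action>` handler for each, so e.g. a revocation reports a
# `consumer.revoked` count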
{
revoked: :revoked,
shutdown: :shutdown,
ticked: :tick
}.each do |after, name|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
# Keeps track of user code execution
#
# @param event [Karafka::Core::Monitoring::Event]
def on_consumer_#{after}(event)
tags = default_tags + consumer_tags(event.payload[:caller])
count('consumer.#{name}', 1, tags: tags)
end
RUBY
end
# Worker-related metrics
# @param event [Karafka::Core::Monitoring::Event]
def on_worker_process(event)
jq_stats = event[:jobs_queue].statistics
gauge('worker.total_threads', Karafka::App.config.concurrency, tags: default_tags)
histogram('worker.processing', jq_stats[:busy], tags: default_tags)
histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: default_tags)
end
# We report this metric both before and after processing for higher accuracy.
# Without this, the utilization would not be fully reflected
# @param event [Karafka::Core::Monitoring::Event]
def on_worker_processed(event)
jq_stats = event[:jobs_queue].statistics
histogram('worker.processing', jq_stats[:busy], tags: default_tags)
end
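# Generates a simple pass-through reporter for each metric type listed below.
# Each defined method namespaces the key and delegates to the configured client,
# equivalent to (using `count` as an example):
#
#   def count(key, *args)
#     client.count(namespaced_metric(key), *args)
#   end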
%i[
count
gauge
increment
decrement
].each do |metric_type|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
def #{metric_type}(key, *args)
client.#{metric_type}(
namespaced_metric(key),
*args
)
end
RUBY
end
# Selects the configured distribution mode and uses it to report to the DD client
# @param key [String] non-namespaced key
# @param args [Array] extra arguments to pass to the client
def histogram(key, *args)
case distribution_mode
when :histogram
client.histogram(
namespaced_metric(key),
*args
)
when :distribution
client.distribution(
namespaced_metric(key),
*args
)
else
raise(
::ArgumentError,
'distribution_mode setting value must be either :histogram or :distribution'
)
end
end
private
# Wraps the metric name in the listener's namespace
# @param metric_name [String] RdKafkaMetric name
# @return [String]
def namespaced_metric(metric_name)
"#{namespace}.#{metric_name}"
end
# Reports a given metric's statistics to Datadog
# @param metric [RdKafkaMetric] metric value object
# @param statistics [Hash] hash with all the statistics emitted
# @param base_tags [Array<String>] base tags we want to start with
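# @note The statistics hash follows the librdkafka STATISTICS.md layout. A
#   simplified sketch of the parts used here (the `_d` delta keys are added by
#   Karafka's statistics decorator):
#     {
#       'rxmsgs_d' => 42,
#       'brokers' => { 'host:9092/1' => { 'nodeid' => 1, 'rtt' => { 'avg' => 2 } } },
#       'topics' => { 'events' => { 'partitions' => { '0' => { 'consumer_lag' => 5 } } } }
#     }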
def report_metric(metric, statistics, base_tags)
case metric.scope
when :root
public_send(
metric.type,
metric.name,
statistics.fetch(*metric.key_location),
tags: base_tags
)
when :brokers
statistics.fetch('brokers').each_value do |broker_statistics|
# Skip bootstrap nodes: they have nodeid -1, while other nodes have
# positive node ids
next if broker_statistics['nodeid'] == -1
public_send(
metric.type,
metric.name,
broker_statistics.dig(*metric.key_location),
tags: base_tags + ["broker:#{broker_statistics['nodename']}"]
)
end
when :topics
statistics.fetch('topics').each do |topic_name, topic_values|
topic_values['partitions'].each do |partition_name, partition_statistics|
next if partition_name == '-1'
# Skip until lag info is available
next if partition_statistics['consumer_lag'] == -1
next if partition_statistics['consumer_lag_stored'] == -1
# Skip if we do not own the fetch assignment
next if partition_statistics['fetch_state'] == 'stopped'
next if partition_statistics['fetch_state'] == 'none'
public_send(
metric.type,
metric.name,
partition_statistics.dig(*metric.key_location),
tags: base_tags + [
"topic:#{topic_name}",
"partition:#{partition_name}"
]
)
end
end
else
raise ArgumentError, metric.scope
end
end
# Builds basic per-consumer tags for publication
#
# @param consumer [Karafka::BaseConsumer]
# @return [Array<String>]
def consumer_tags(consumer)
messages = consumer.messages
metadata = messages.metadata
consumer_group_id = consumer.topic.consumer_group.id
[
"topic:#{metadata.topic}",
"partition:#{metadata.partition}",
"consumer_group:#{consumer_group_id}"
]
end
end
end
end
end
end