lib/firehose/server/metrics.rb
require "set"
module Firehose::Server
module Metrics
class TimeSeries
attr_reader :series
def initialize(seconds: 5, keep_buckets: 2, gc: true)
@seconds = seconds.to_i
if @seconds < 1
raise ArgumentError, "TimeSeries interval must be >= 1"
end
@keep_buckets = keep_buckets
@track_gc_metrics = gc
clear!
end
def method_missing(method, *args)
current.send(method, *args)
end
def clear!
@series = Hash.new do |h, k|
bucket = bucket(k)
h[bucket] = Firehose::Server::Metrics::Buffer.new(bucket, gc: @track_gc_metrics)
end
end
def clear_old!
# keep latest @keep_buckets buckets
buckets = []
@keep_buckets.times do |i|
bucket = bucket(Time.now) - (i * @seconds)
buckets << [bucket, @series[bucket]]
end
clear!
# use reverse_each to keep insertion order correct
# (Hash keys are insertion ordered)
buckets.reverse_each do |(bucket, buffer)|
@series[bucket] = buffer
end
@series
end
def to_json
JSON.generate @series.values.map(&:to_hash)
end
def empty?
@series.empty?
end
# private
def current
@series[bucket(Time.now)]
end
def bucket(time)
secs = time.to_i
secs - (secs % @seconds)
end
end
class Buffer
def initialize(time_bucket, gc: true)
@time_bucket = time_bucket
@active_channels = Set.new
@global = Hash.new { 0 }
@channel_metrics = Hash.new
if gc
@gc_metrics = GC.stat
end
end
# metric handlers
def message_published!(channel, message = nil)
@active_channels << channel
incr_global! :published
incr_channel! channel, :published
cm = channel_metrics(channel)
if message
incr_channel! channel, :total_size, message.size
end
end
def channel_subscribed!(channel)
@active_channels << channel
incr_global! :subscribed
incr_channel! channel, :subscribed
end
def channels_subscribed_multiplexed_ws!(channels)
channels.each do |channel|
@active_channels << channel
incr_global! :subscribed_multiplexed_ws
incr_channel! channel, :subscribed_multiplexed_ws
end
end
def channels_subscribed_multiplexed_long_polling!(channels)
channels.each do |channel|
@active_channels << channel
incr_global! :subscribed_multiplexed_long_polling
incr_channel! channel, :subscribed_multiplexed_long_polling
end
end
def channels_subscribed_multiplexed_ws_dynamic!(subscriptions)
incr_global! :subscribed_multiplexed_ws_dynamic, subscriptions.size
end
def duplicate_multiplex_ws_subscription!
incr_global! :duplicate_multiplex_subscription
end
def new_connection!
incr_global! :connections
incr_global! :connections_opened
end
def connection_closed!
incr_global! :connections_closed
decr_global! :connections
end
def error!(error_tag, channel = nil)
incr_global! :errors
if channel
incr_channel! channel, "error_#{error_tag}"
else
incr_global! "error_#{error_tag}"
end
end
def timeout!(tag, channel = nil)
incr_global! :timeouts
if channel
incr_channel! channel, "timeout_#{tag}"
else
incr_global! "timeout_#{tag}"
end
end
# serialization helpers (used to store metrics to redis)
def to_hash
h = {
time: @time_bucket,
global: @global.merge(active_channels: @active_channels.size),
channels: @channel_metrics
}
h.merge!(gc: @gc_metrics) if @gc_metrics
h
end
def to_json
JSON.generate self.to_hash
end
def == other
self.to_hash == other.to_hash
end
private
def incr_global!(name, increment = 1)
@global[name] += increment
end
def decr_global!(name, decrement = 1)
@global[name] -= decrement
end
def incr_channel!(channel, counter, increment = 1)
channel_metrics(channel)[counter] += increment
end
def decr_channel!(channel, counter, decrement = 1)
channel_metrics(channel)[counter] -= decrement
end
def channel_metrics(channel)
@channel_metrics[channel] ||= Hash.new { 0 }
end
end
end
end