librato/librato-rack

View on GitHub
lib/librato/collector/aggregator.rb

Summary

Maintainability
B
5 hrs
Test Coverage
require 'hetchy'

module Librato
  class Collector
    # maintains storage of timing and measurement type measurements
    #
    class Aggregator
      SEPARATOR = "$$"

      extend Forwardable

      def_delegators :@cache, :empty?, :prefix, :prefix=

      attr_reader :default_tags

      def initialize(options={})
        @cache = Librato::Metrics::Aggregator.new(prefix: options[:prefix])
        @percentiles = {}
        @lock = Mutex.new
        @default_tags = options.fetch(:default_tags, {})
      end

      def [](key)
        fetch(key)
      end

      # retrieve current value of a metric/source/percentage. this exists
      # primarily for debugging/testing and isn't called routinely.
      def fetch(key, options={})
        return nil if @cache.empty?
        return fetch_percentile(key, options) if options[:percentile]
        measurements = nil
        tags = options[:tags] || @default_tags
        @lock.synchronize { measurements = @cache.queued[:measurements] }
        measurements.each do |metric|
          if metric[:name] == key.to_s
            return metric if !tags && !metric[:tags]
            return metric if tags == metric[:tags]
          end
        end
        nil
      end

      # clear all stored values
      def delete_all
        @lock.synchronize { clear_storage }
      end

      # transfer all measurements to queue and reset internal status
      def flush_to(queue, opts={})
        queued = nil
        @lock.synchronize do
          return if @cache.empty?
          queued = @cache.queued
          flush_percentiles(queue, opts) unless @percentiles.empty?
          clear_storage unless opts[:preserve]
        end
        queue.merge!(queued) if queued
      end

      # @example Simple measurement
      #   measure 'sources_returned', sources.length
      #
      # @example Simple timing in milliseconds
      #   timing 'twitter.lookup', 2.31
      #
      # @example Block-based timing
      #   timing 'db.query' do
      #     do_my_query
      #   end
      #
      # @example Custom source
      #    measure 'user.all_orders', user.order_count, :source => user.id
      #
      def measure(*args, &block)
        options = {}
        event = args[0].to_s
        returned = nil

        # handle block or specified argument
        if block_given?
          start = Time.now
          returned = yield
          value = ((Time.now - start) * 1000.0).to_i
        elsif args[1]
          value = args[1]
        else
          raise "no value provided"
        end

        # detect options hash if present
        if args.length > 1 and args[-1].respond_to?(:each)
          options = args[-1]
        end

        percentiles = Array(options[:percentile])
        source = options[:source]
        tags_option = options[:tags]
        tags_option = { source: source } if source && !tags_option
        tags =
          if tags_option && options[:inherit_tags]
            @default_tags.merge(tags_option)
          elsif tags_option
            tags_option
          else
            @default_tags
          end

        @lock.synchronize do
          payload = { value: value }
          payload.merge!({ tags: tags }) if tags
          @cache.add event => payload

          percentiles.each do |perc|
            store = fetch_percentile_store(event, payload)
            store[:reservoir] << value
            track_percentile(store, perc)
          end
        end
        returned
      end
      alias :timing :measure

      private

      def clear_storage
        @cache.clear
        @percentiles.each do |key, val|
          val[:reservoir].clear
          val[:percs].clear
        end
      end

      def fetch_percentile(key, options)
        store = fetch_percentile_store(key, options)
        return nil unless store
        store[:reservoir].percentile(options[:percentile])
      end

      def fetch_percentile_store(event, options)
        keyname = event

        if options[:tags]
          keyname = Librato::Metrics::Util.build_key_for(keyname, options[:tags])
        end

        @percentiles[keyname] ||= {
          name: event,
          reservoir: Hetchy::Reservoir.new(size: 1000),
          percs: Set.new
        }
        @percentiles[keyname].merge!({ tags: options[:tags] }) if options && options[:tags]
        @percentiles[keyname]
      end

      def flush_percentiles(queue, opts)
        @percentiles.each do |key, val|
          val[:percs].each do |perc|
            perc_name = perc.to_s[0,5].gsub('.','')
            payload =
              if val[:tags]
                { value: val[:reservoir].percentile(perc), tags: val[:tags] }
              else
                val[:reservoir].percentile(perc)
              end
            queue.add "#{val[:name]}.p#{perc_name}" => payload
          end
        end
      end

      def track_percentile(store, perc)
        if perc < 0.0 || perc > 100.0
          raise InvalidPercentile, "Percentiles must be between 0.0 and 100.0"
        end
        store[:percs].add(perc)
      end

    end

  end
end