# cloudfoundry/collector
#
# View on GitHub
# lib/collector/handler.rb
#
# Summary
#
# Maintainability
# A
# 35 mins
# Test Coverage
require "vcap/common"

module Collector

  # Value object bundling the per-collection data handed to a {Handler}:
  # the job index, the collection timestamp, and the fetched /varz hash.
  class HandlerContext
    # @return [Fixnum] the job index
    # @return [Fixnum] timestamp of when the varz was collected
    # @return [Hash] the values fetched from the remote /varz endpoint
    attr_reader :index, :now, :varz

    # @param [Fixnum] index the job index
    # @param [Fixnum] now the collection timestamp
    # @param [Hash] varz the collected varz values
    def initialize(index, now, varz)
      @index = index
      @now = now
      @varz = varz
    end

    # Two contexts are equal when index, timestamp and varz all match.
    def ==(other)
      [other.index, other.now, other.varz] == [index, now, varz]
    end
  end

  # Varz metric handler
  #
  # It's used for processing varz from jobs and publishing them to the metric collector (Historian)
  # server
  class Handler
    # Class-level instance variable (not a @@class var): cache of handler
    # instances, keyed by handler class.
    @instance_map = {}

    class << self
      # @return [Hash<Class, Handler>] cache of instantiated handlers keyed
      #   by handler class
      attr_accessor :instance_map

      # NOTE: a previous `attr_accessor :handler_map` was removed — its reader
      # was shadowed by this method and its writer set an ivar nothing reads.
      #
      # @return [Hash<String, Class>] hash of job names to {Handler} subclasses
      def handler_map
        Components::HANDLERS
      end

      # Retrieves a {Handler} for the job type. Will default to the generic
      # one if the job does not have a handler registered. Instances are
      # cached per handler class, so repeated calls return the same object.
      #
      # @param [Collector::Historian] historian the historian to use for
      #   writing metrics
      # @param [String] job the job name
      # @return [Handler] the handler for this job from the handler map or the
      #   default one
      def handler(historian, job)
        handler_class = Handler.handler_map.fetch(job, Handler)
        @instance_map[handler_class] ||= handler_class.new(historian, job)
      end
    end

    # @return [String] job name
    attr_reader :job

    # Creates a new varz handler
    #
    # @param [Collector::Historian] historian the historian used to publish
    #   metrics
    # @param [String] job the job for this varz
    def initialize(historian, job)
      @historian = historian
      @job = job
    end

    # Processes varz in the context of the collection. Subclasses
    # should override this. The default implementation is a no-op.
    #
    # @param [HandlerContext] context index, timestamp and varz of this collection
    def process(context)
    end

    # Subclasses can override this to add additional tags to the metrics
    # submitted.
    #
    # @param [HandlerContext] context index, timestamp and varz of this collection
    # @return [Hash] the key/value tags that will be added to the submission
    def additional_tags(context)
      {}
    end

    MEM_AND_CPU_STATS = %w(mem_bytes mem_used_bytes mem_free_bytes cpu_load_avg).freeze

    RECORDED_LOG_LEVELS = %w(fatal error warn).freeze

    # Called by the collector to process the varz. Processes common
    # metric data (memory/CPU stats, uptime, log counts) and then calls
    # process() to add subclass behavior.
    #
    # @param [HandlerContext] context index, timestamp and varz of this collection
    def do_process(context)
      varz = context.varz

      MEM_AND_CPU_STATS.each { |stat| send_metric(stat, varz[stat], context) if varz[stat] }

      send_metric("uptime_in_seconds", VCAP.uptime_string_to_seconds(varz["uptime"]), context) if varz["uptime"]

      # Log counts in varz look like: { log_counts: { "error": 2, "warn": 1 }}
      # Only fatal/error/warn levels are recorded; others are skipped.
      varz.fetch("log_counts", {}).each do |level, count|
        next unless RECORDED_LOG_LEVELS.include?(level)
        send_metric("log_count", count, context, {"level" => level})
      end

      process(context)
    end

    # Sends the metric to the metric collector (historian). Nil values are
    # logged and dropped rather than submitted.
    #
    # @param [String] name the metric name
    # @param [String, Fixnum] value the metric value
    # @param [HandlerContext] context index, timestamp and varz of this collection
    # @param [Hash] tags_provided extra tags merged over the base tags
    def send_metric(name, value, context, tags_provided = {})
      if value.nil?
        Config.logger.warn("Received no value for #{name}")
        return
      end

      tags_provided = symbolize_keys(tags_provided)
      tags = base_tags(context).merge(tags_provided)
      # Default the :name tag to "<job>/<index>" unless the caller supplied one.
      tags[:name] ||= "#{tags[:job]}/#{tags[:index]}"

      @historian.send_data(key: name,
                           timestamp: context.now,
                           value: value,
                           tags: tags)
    end

    # Sends an averaged latency metric (value / samples) to the historian.
    # Does nothing when the value is missing or reports no samples.
    #
    # @param [String] name the metric name
    # @param [Hash] value the latency metric value, with "samples"/"value"
    #   keys (string or symbol)
    # @param [HandlerContext] context index, timestamp and varz of this collection
    # @param [Hash] tags extra tags for the submission
    def send_latency_metric(name, value, context, tags = {})
      return unless value

      samples = value[:samples] || value["samples"]
      # Guard against a missing "samples" key: previously `nil > 0` raised
      # NoMethodError when the hash was present but had no samples.
      return unless samples && samples > 0

      total = value[:value] || value["value"]
      send_metric(name, total / samples, context, tags)
    end

    private

    # Returns a copy of the hash with every key converted to a Symbol.
    def symbolize_keys(hash)
      hash.each_with_object({}) { |(k, v), memo| memo[k.to_sym] = v }
    end

    # Tags attached to every metric: subclass-provided tags, component tags
    # for the job, and the job/index/deployment identifiers (which win on
    # key collisions).
    def base_tags(context)
      tags = additional_tags(context)
      tags.merge!(Components.get_job_tags(@job))
      tags.merge!(job: @job, index: context.index, deployment: Config.deployment_name)
    end
  end
end