librato/librato-rack
lib/librato/rack/tracker.rb

require 'forwardable'
require 'socket'

module Librato
  class Rack
    class Tracker
      extend Forwardable

      def_delegators :collector, :increment, :measure, :timing, :group
      def_delegators :logger, :log

      attr_reader :config
      attr_accessor :on_heroku

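      # wire this tracker to the given config: set the collector's
      # metric prefix and subscribe to future config changes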
      def initialize(config)
        @config = config
        collector.prefix = config.prefix
        config.register_listener(collector)
      end

      # check to see if we should start a worker for this process.
      # if you are calling this from outside the library, use #start!
      # instead, as this method may change
      def check_worker
        return if @worker # already running
        return unless start_worker?
        log(:debug) { "config: #{config.dump}" }
        @pid = $$
        log(:debug) { ">> starting up worker for pid #{@pid}..." }

        @worker = Worker.new(timer: config.event_mode)
        @worker.run_periodically(config.flush_interval) do
          flush
        end

        config.deprecations.each { |d| deprecate(d) }
      end

      # primary collector object used by this tracker
      def collector
        @collector ||= Librato::Collector.new(tags: tags)
      end

      # log a deprecation message
      def deprecate(message)
        log :warn, "DEPRECATION: #{message}"
      end

      # send all current data to Librato Metrics
      def flush
        log :debug, "flushing pid #{@pid} (#{Time.now}).."
        start = Time.now
        # thread safety is handled internally for stores
        queue = build_flush_queue(collector)
        queue.submit unless queue.empty?
        log(:trace) { "flushed pid #{@pid} in #{(Time.now - start)*1000.to_f}ms" }
      rescue Exception => error
        log :error, "submission failed permanently: #{error}"
      end

      # current local instrumentation to be sent on next flush.
      # this is for debugging; don't call it rapidly in production
      # as it may introduce latency
      def queued
        build_flush_queue(collector, true).queued
      end

      # given current state, should the tracker start a reporter thread?
      def should_start?
        if !config.user || !config.token
          # don't show this unless we're debugging, expected behavior
          log :debug, 'halting: credentials not present.'
        elsif config.autorun == false
          log :debug, 'halting: LIBRATO_AUTORUN disabled startup'
        elsif tags.any? { |k,v| k.to_s !~ ValidatingQueue::TAGS_KEY_REGEX || v.to_s !~ ValidatingQueue::TAGS_VALUE_REGEX }
          log :warn, "halting: '#{tags}' are invalid tags."
        elsif tags.keys.length > ValidatingQueue::DEFAULT_TAGS_LIMIT
          log :warn, "halting: cannot exceed default tags limit of #{ValidatingQueue::DEFAULT_TAGS_LIMIT} tag names per measurement."
        elsif on_heroku && !config.has_tags?
          log :warn, 'halting: tags must be provided in configuration.'
        else
          return true
        end
        false
      end

      # start worker thread, one per process.
      # if this process has been forked from one with an active
      # worker thread we don't need to worry about cleanup, the
      # worker thread does not survive the fork
      def start!
        check_worker if should_start?
      end

      # change output stream for logging
      def update_log_target(target)
        logger.outlet = target
      end

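      # whether the given metric suite is enabled in the current config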
      def suite_enabled?(suite)
        config.metric_suites.include?(suite.to_sym)
      end

      private

      # access to client instance
      def client
        @client ||= prepare_client
      end

      # use a custom faraday adapter if running in an evented context
      def custom_adapter
        case config.event_mode
        when :eventmachine
          :em_http
        when :synchrony
          :em_synchrony
        else
          nil
        end
      end

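      # assemble a queue of all current metrics for submission,
      # optionally preserving collector state (used by #queued)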
      def build_flush_queue(collector, preserve=false)
        queue = ValidatingQueue.new(client: client,
          prefix: config.prefix, skip_measurement_times: true)
        [collector.counters, collector.aggregate].each do |cache|
          cache.flush_to(queue, preserve: preserve)
        end
        if suite_enabled?(:rack)
          queue.add 'rack.processes' => { value: 1, tags: tags }
        end
        trace_queued(queue.queued)
        queue
      end

      # trace metrics being sent
      def trace_queued(queued)
        require 'pp'
        log(:trace) { "Queued: " + queued.pretty_inspect }
      end

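      # memoized logger writing to the configured target with the
      # configured level and prefix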
      def logger
        return @logger if @logger
        @logger = Logger.new(config.log_target)
        @logger.log_level = config.log_level
        @logger.prefix    = config.log_prefix
        @logger
      end

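      # build an authenticated Librato::Metrics::Client from the
      # current config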
      def prepare_client
        client = Librato::Metrics::Client.new
        client.authenticate config.user, config.token
        client.api_endpoint = config.api_endpoint
        client.proxy = config.proxy
        client.custom_user_agent = user_agent
        if custom_adapter
          client.faraday_adapter = custom_adapter
        end
        client
      end

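      # running ruby implementation, e.g. 'ruby' or 'jruby'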
      def ruby_engine
        return RUBY_ENGINE if Object.constants.include?(:RUBY_ENGINE)
        RUBY_DESCRIPTION.split[0]
      end

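      # tags applied to every outgoing measurement; defaults to the
      # local hostname when none are configured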
      def tags
        @tags ||= config.has_tags? ? config.tags : { host: Socket.gethostname.downcase }
      end

      # should we spin up a worker? wrap this in a process check
      # so we only actually check once per process. this allows us
      # to check again if the process forks.
      def start_worker?
        if @pid_checked == $$
          false
        else
          @pid_checked = $$
          should_start?
        end
      end

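      # identify this library and the ruby runtime to the API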
      def user_agent
        ua_chunks = [version_string]
        ua_chunks << "(#{ruby_engine}; #{RUBY_VERSION}p#{RUBY_PATCHLEVEL}; #{RUBY_PLATFORM})"
        ua_chunks.join(' ')
      end

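      # library identifier used in the user agent string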
      def version_string
        "librato-rack/#{Librato::Rack::VERSION}"
      end
    end
  end
end
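
# A minimal usage sketch, assuming Librato::Rack::Configuration is the
# config object the middleware builds (credentials and other settings
# are read from it as shown above); metric names are illustrative:
#
#   config  = Librato::Rack::Configuration.new
#   tracker = Librato::Rack::Tracker.new(config)
#
#   tracker.start!                      # boots the reporter worker if should_start?
#   tracker.increment 'requests'        # delegated to the collector
#   tracker.timing 'request.time', 200.4
#   tracker.flush                       # normally driven by the worker's timer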