cloudfoundry/cloud_controller_ng

View on GitHub
lib/cloud_controller/metrics/periodic_updater.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'cloud_controller/metrics/statsd_updater'
require 'vcap/stats'

module VCAP::CloudController::Metrics
  class PeriodicUpdater
    # Stores the collaborators used by the update_* methods and seeds the
    # registry of known job queues with the local queue at a count of zero.
    #
    # @param start_time [#to_i] process start time; converted with #to_i when vitals are reported
    # @param log_counter [#counts] source of the per-level log counts
    # @param logger [#info] receives the errors rescued by #catch_error
    # @param statsd_updater receiver of the statsd flavour of each metric
    # @param prometheus_updater receiver of the Prometheus flavour of the metrics
    def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updater)
      @start_time = start_time
      @log_counter = log_counter
      @logger = logger
      @statsd_updater = statsd_updater
      @prometheus_updater = prometheus_updater

      # Every queue-based metric always reports the local queue, even when it
      # currently holds no jobs, so it is registered up front.
      local_queue = VCAP::CloudController::Jobs::Queues.local(VCAP::CloudController::Config.config)
      @known_job_queues = { local_queue.to_sym => 0 }
    end

    # Runs every updater once right away (errors from this first pass are NOT
    # rescued), then registers one EventMachine periodic timer per updater.
    # Each timer invocation is wrapped in #catch_error so a failing updater
    # is logged rather than raised out of the timer block.
    #
    # @return the value returned by the last EM.add_periodic_timer call
    def setup_updates
      update!

      # [interval in seconds, updater method], registered in this order.
      schedule = [
        [600, :update_user_count],
        [30, :update_job_queue_length],
        [30, :update_job_queue_load],
        [30, :update_thread_info],
        [30, :update_failed_job_count],
        [30, :update_vitals],
        [30, :update_log_counts],
        [30, :update_task_stats],
        [30, :update_deploying_count],
        [30, :update_webserver_stats]
      ]

      timers = schedule.map do |interval, updater|
        EM.add_periodic_timer(interval) { catch_error { send(updater) } }
      end
      timers.last
    end

    # Runs every metric updater once, in a fixed order. Nothing is rescued
    # here: the first updater that raises aborts the remaining ones.
    #
    # @return the result of the final updater (update_webserver_stats)
    def update!
      steps = %i[
        update_user_count
        update_job_queue_length
        update_job_queue_load
        update_thread_info
        update_failed_job_count
        update_vitals
        update_log_counts
        update_task_stats
        update_deploying_count
        update_webserver_stats
      ]

      steps.map { |step| send(step) }.last
    end

    # Yields to the given block and, if it raises a StandardError, hands the
    # exception to the logger at info level instead of letting it propagate.
    # Exceptions outside the StandardError hierarchy are not rescued.
    #
    # @yield the work to guard
    # @return the block's value, or the logger's return value after a rescue
    def catch_error
      yield
    rescue StandardError => error
      @logger.info(error)
    end

    # Reports the number of tasks in the running state and their summed
    # memory. The statsd updater receives the memory sum as stored
    # (memory_in_mb); the Prometheus updater receives it multiplied by
    # 1024 * 1024.
    def update_task_stats
      running_state = VCAP::CloudController::TaskModel::RUNNING_STATE
      dataset = VCAP::CloudController::TaskModel.where(state: running_state)

      task_count = dataset.count
      # The sum may come back as nil (handled here as zero).
      memory_mb = dataset.sum(:memory_in_mb) || 0

      @statsd_updater.update_task_stats(task_count, memory_mb)
      @prometheus_updater.update_task_stats(task_count, memory_mb * 1024 * 1024)
    end

    # Sends the statsd updater one count per Steno log level. The log counter
    # keys its counts by level name as a String; levels it has no entry for
    # are reported as 0. Only the statsd updater is notified.
    def update_log_counts
      recorded = @log_counter.counts

      counts_by_level = Steno::Logger::LEVELS.keys.each_with_object({}) do |level, result|
        result[level] = recorded.fetch(level.to_s, 0)
      end

      @statsd_updater.update_log_counts(counts_by_level)
    end

    # Reads DeploymentModel.deploying_count once and forwards that value to
    # the statsd updater and then to the Prometheus updater.
    def update_deploying_count
      in_flight = VCAP::CloudController::DeploymentModel.deploying_count

      [@statsd_updater, @prometheus_updater].each do |updater|
        updater.update_deploying_count(in_flight)
      end
    end

    # Reads User.count once and forwards that value to the statsd updater
    # and then to the Prometheus updater.
    def update_user_count
      total_users = VCAP::CloudController::User.count

      [@statsd_updater, @prometheus_updater].each do |updater|
        updater.update_user_count(total_users)
      end
    end

    # Reports, per queue, the number of Delayed::Job rows with attempts == 0.
    # Every queue seen here is remembered in @known_job_queues so that it keeps
    # being reported (as 0) once it has no matching rows. The statsd updater
    # also receives the total across queues.
    def update_job_queue_length
      rows = Delayed::Job.where(attempts: 0).group_and_count(:queue)

      overall = 0
      counts = {}
      rows.each do |row|
        queue = row[:queue].to_sym
        @known_job_queues[queue] = 0
        overall += row[:count]
        counts[queue] = row[:count]
      end

      # Known queues provide the zero defaults (and the key order); counts
      # gathered above take precedence.
      by_queue = @known_job_queues.merge(counts)
      @statsd_updater.update_job_queue_length(by_queue, overall)
      @prometheus_updater.update_job_queue_length(by_queue)
    end

    # Reports, per queue, the number of Delayed::Job rows whose run_at is not
    # in the future and whose failed_at is NULL. Every queue seen here is
    # remembered in @known_job_queues so that it keeps being reported (as 0)
    # once it has no matching rows. The statsd updater also receives the total
    # across queues.
    def update_job_queue_load
      due_jobs = Delayed::Job.where(Sequel.lit('run_at <= ?', Time.now))
      rows = due_jobs.where(Sequel.lit('failed_at IS NULL')).group_and_count(:queue)

      overall = 0
      counts = {}
      rows.each do |row|
        queue = row[:queue].to_sym
        @known_job_queues[queue] = 0
        overall += row[:count]
        counts[queue] = row[:count]
      end

      # Known queues provide the zero defaults (and the key order); counts
      # gathered above take precedence.
      by_queue = @known_job_queues.merge(counts)
      @statsd_updater.update_job_queue_load(by_queue, overall)
      @prometheus_updater.update_job_queue_load(by_queue)
    end

    # Forwards the snapshot built by #thread_info_thin to both updaters.
    # Does nothing unless the configured :webserver is 'thin'.
    def update_thread_info
      webserver = VCAP::CloudController::Config.config.get(:webserver)
      return unless webserver == 'thin'

      snapshot = thread_info_thin
      [@statsd_updater, @prometheus_updater].each do |updater|
        updater.update_thread_info_thin(snapshot)
      end
    end

    # Reports, per queue, the number of Delayed::Job rows whose failed_at is
    # set. Every queue seen here is remembered in @known_job_queues so that it
    # keeps being reported (as 0) once it has no matching rows. The statsd
    # updater also receives the total across queues.
    def update_failed_job_count
      rows = Delayed::Job.where(Sequel.lit('failed_at IS NOT NULL')).group_and_count(:queue)

      overall = 0
      counts = {}
      rows.each do |row|
        queue = row[:queue].to_sym
        @known_job_queues[queue] = 0
        overall += row[:count]
        counts[queue] = row[:count]
      end

      # Known queues provide the zero defaults (and the key order); counts
      # gathered above take precedence.
      by_queue = @known_job_queues.merge(counts)
      @statsd_updater.update_failed_job_count(by_queue, overall)
      @prometheus_updater.update_failed_job_count(by_queue)
    end

    # Collects process and host vitals from VCAP::Stats / VCAP::HostSystem and
    # reports them to both updaters.
    #
    # The statsd updater receives the full set. The Prometheus updater receives
    # the same set without :uptime and :cpu, plus :started_at (the start time
    # as an integer).
    def update_vitals
      rss_bytes, pcpu = VCAP::Stats.process_memory_bytes_and_cpu
      started_at = @start_time.to_i

      statsd_vitals = {
        uptime: Time.now.utc.to_i - started_at,
        cpu: pcpu.to_f,
        mem_bytes: rss_bytes.to_i,
        cpu_load_avg: VCAP::Stats.cpu_load_average,
        mem_used_bytes: VCAP::Stats.memory_used_bytes,
        mem_free_bytes: VCAP::Stats.memory_free_bytes,
        num_cores: VCAP::HostSystem.new.num_cores
      }
      @statsd_updater.update_vitals(statsd_vitals)

      prometheus_vitals = statsd_vitals.reject { |name, _value| %i[uptime cpu].include?(name) }
      prometheus_vitals[:started_at] = @start_time.to_i
      @prometheus_updater.update_vitals(prometheus_vitals)
    end

    # Reports Puma worker statistics to the Prometheus updater. Does nothing
    # unless the configured :webserver is 'puma'.
    #
    # For each entry of the stats hash's :worker_status list this reports the
    # start time (epoch seconds, UTC), index, pid, and the :running /
    # :backlog values of the worker's :last_status.
    #
    # A stats hash without :worker_status / :booted_workers is reported as
    # zero workers with an empty per-worker list instead of raising.
    # NOTE(review): Puma's stats docs list these keys for cluster mode only —
    # confirm that reporting 0 workers is the desired signal otherwise.
    def update_webserver_stats
      return unless VCAP::CloudController::Config.config.get(:webserver) == 'puma'

      local_stats = Puma.stats_hash
      worker_count = local_stats[:booted_workers] || 0

      # Array() turns a missing :worker_status (nil) into an empty list.
      worker_stats = Array(local_stats[:worker_status]).map do |worker_status|
        {
          started_at: Time.parse(worker_status[:started_at]).utc.to_i,
          index: worker_status[:index],
          pid: worker_status[:pid],
          # dig tolerates a worker whose :last_status is absent or empty.
          thread_count: worker_status.dig(:last_status, :running),
          backlog: worker_status.dig(:last_status, :backlog)
        }
      end

      @prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats)
    end

    # Builds a snapshot of thread and EventMachine state: the number of Ruby
    # threads, EventMachine's connection count, and the size / num_waiting of
    # the objects held in EM's @threadqueue and @resultqueue instance
    # variables.
    #
    # When either instance variable is unset an empty Array stands in for it;
    # Arrays are reported with num_waiting 0 because they have no such method.
    #
    # @return [Hash] nested hash with :thread_count and :event_machine keys
    def thread_info_thin
      threadqueue, resultqueue = %i[@threadqueue @resultqueue].map do |ivar|
        EM.instance_variable_get(ivar) || []
      end

      summarize = lambda do |queue|
        {
          size: queue.size,
          num_waiting: queue.is_a?(Array) ? 0 : queue.num_waiting
        }
      end

      {
        thread_count: Thread.list.size,
        event_machine: {
          connection_count: EventMachine.connection_count,
          threadqueue: summarize.call(threadqueue),
          resultqueue: summarize.call(resultqueue)
        }
      }
    end
  end
end