cloudfoundry/cloud_controller_ng

View on GitHub
lib/cloud_controller/diego/reporters/instances_stats_reporter.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require 'logcache/client'
require 'cloud_controller/diego/reporters/reporter_mixins'

module VCAP::CloudController
  module Diego
    class InstancesStatsReporter
      include ReporterMixins

      def initialize(bbs_instances_client, logstats_client)
        @bbs_instances_client = bbs_instances_client
        @logstats_client = logstats_client
      end

      def stats_for_app(process)
        result       = {}
        current_time = Time.now.to_f
        formatted_current_time = Time.now.to_datetime.rfc3339

        logger.debug('stats_for_app.fetching_container_metrics', process_guid: process.guid)
        desired_lrp = bbs_instances_client.desired_lrp_instance(process)

        log_cache_data, log_cache_errors = envelopes(desired_lrp, process)
        stats = formatted_process_stats(log_cache_data, formatted_current_time)
        quota_stats = formatted_quota_stats(log_cache_data)

        bbs_instances_client.lrp_instances(process).each do |actual_lrp|
          index = actual_lrp.actual_lrp_key.index
          next unless index < process.instances

          info = {
            state: LrpStateTranslator.translate_lrp_state(actual_lrp),
            isolation_segment: desired_lrp.PlacementTags.first,
            stats: {
              name: process.name,
              uris: process.uris,
              host: actual_lrp.actual_lrp_net_info.address,
              port: get_default_port(actual_lrp.actual_lrp_net_info),
              net_info: actual_lrp.actual_lrp_net_info.to_h,
              uptime: nanoseconds_to_seconds((current_time * 1e9) - actual_lrp.since),
              fds_quota: process.file_descriptors
            }.merge(metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index))
          }
          info[:details] = actual_lrp.placement_error if actual_lrp.placement_error.present?

          info[:routable] = (actual_lrp.routable if actual_lrp.optional_routable)
          result[actual_lrp.actual_lrp_key.index] = info
        end

        fill_unreported_instances_with_down_instances(result, process, flat: false)

        warnings = [log_cache_errors].compact
        [result, warnings]
      rescue CloudController::Errors::NoRunningInstances => e
        logger.info('stats_for_app.error', error: e.to_s)
        [fill_unreported_instances_with_down_instances({}, process, flat: false), []]
      rescue StandardError => e
        logger.error('stats_for_app.error', error: e.to_s)
        raise e if e.is_a?(CloudController::Errors::ApiError) && e.name == 'ServiceUnavailable'

        exception = CloudController::Errors::InstancesUnavailable.new(e.message)
        exception.set_backtrace(e.backtrace)
        raise exception
      end

      private

      attr_reader :bbs_instances_client

      def metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index)
        if log_cache_errors.blank?
          {
            mem_quota: quota_stats[index]&.memory_bytes_quota,
            disk_quota: quota_stats[index]&.disk_bytes_quota,
            log_rate_limit: quota_stats[index]&.log_rate_limit,
            usage: stats[index] || missing_process_stats(formatted_current_time)
          }
        else
          {
            mem_quota: nil,
            disk_quota: nil,
            log_rate_limit: nil,
            usage: {}
          }
        end
      end

      def missing_process_stats(formatted_current_time)
        {
          time: formatted_current_time,
          cpu: 0,
          cpu_entitlement: nil,
          mem: 0,
          disk: 0,
          log_rate: 0
        }
      end

      def formatted_process_stats(log_cache_data, formatted_current_time)
        log_cache_data.
          map do |e|
            [
              e.instance_index,
              converted_container_metrics(e, formatted_current_time)
            ]
          end.to_h
      end

      def formatted_quota_stats(log_cache_data)
        log_cache_data.
          index_by(&:instance_index)
      end

      def envelopes(desired_lrp, process)
        if desired_lrp.metric_tags['process_id']
          filter = ->(envelope) { envelope.tags.any? { |key, value| key == 'process_id' && value == process.guid } }
          source_guid = process.app_guid
        else
          filter = ->(_) { true }
          source_guid = process.guid
        end

        [@logstats_client.container_metrics(
          source_guid: source_guid,
          logcache_filter: filter
        ), nil]
      rescue GRPC::BadStatus, CloudController::Errors::ApiError => e
        logger.error('stats_for_app.error', error: e.message, backtrace: e.backtrace.join("\n"))
        [[], 'Stats server temporarily unavailable.']
      end

      def logger
        @logger ||= Steno.logger('cc.diego.instances_reporter')
      end

      def converted_container_metrics(container_metrics, formatted_current_time)
        cpu = container_metrics.cpu_percentage
        cpu_entitlement = container_metrics.cpu_entitlement_percentage.nil? ? nil : container_metrics.cpu_entitlement_percentage / 100
        mem = container_metrics.memory_bytes
        disk = container_metrics.disk_bytes
        log_rate = container_metrics.log_rate

        if cpu.nil? || mem.nil? || disk.nil? || log_rate.nil?
          missing_process_stats(formatted_current_time)
        else
          {
            time: formatted_current_time,
            cpu: cpu / 100,
            cpu_entitlement: cpu_entitlement,
            mem: mem,
            disk: disk,
            log_rate: log_rate
          }
        end
      end

      def get_default_port(net_info)
        net_info.ports.each do |port_mapping|
          return port_mapping.host_port if port_mapping.container_port == DEFAULT_APP_PORT
        end

        0
      end
    end
  end
end