ManageIQ/manageiq

View on GitHub
app/models/metric/rollup.rb

Summary

Maintainability
A
1 hr
Test Coverage
A
90%
module Metric::Rollup
  # Columns that take part in rollups: every float column on Metric, every
  # Metric column whose name starts with "derived", plus the explicitly listed
  # container stat rate columns.
  ROLLUP_COLS  = (Metric.columns_hash.collect { |c, h| c.to_sym if h.type == :float || c.start_with?("derived") }.compact +
                  [:stat_container_group_create_rate,
                   :stat_container_group_delete_rate,
                   :stat_container_image_registration_rate]).freeze
  # The "derived_storage_*" columns on Metric.
  STORAGE_COLS = Metric.columns_hash.collect { |c, _h| c.to_sym if c.start_with?("derived_storage_") }.compact.freeze

  NON_STORAGE_ROLLUP_COLS = (ROLLUP_COLS - STORAGE_COLS).freeze
  CONTAINER_ROLLUP_COLS   = [:cpu_usage_rate_average, :derived_vm_numvcpus, :derived_memory_used, :net_usage_rate_average].freeze
  VM_ROLLUP_COLS          = [:cpu_usage_rate_average, :derived_memory_used, :disk_usage_rate_average, :net_usage_rate_average].freeze

  # Columns aggregated from child records into their parent for non-realtime
  # intervals, keyed by :"<parent base class>_<association>" (see
  # aggregate_columns).
  AGGREGATE_COLS = {
    :MiqEnterprise_miq_regions            => ROLLUP_COLS,
    :MiqRegion_ext_management_systems     => NON_STORAGE_ROLLUP_COLS,
    :MiqRegion_storages                   => STORAGE_COLS,
    :ExtManagementSystem_hosts            => NON_STORAGE_ROLLUP_COLS,
    :ExtManagementSystem_container_nodes  => NON_STORAGE_ROLLUP_COLS,
    :EmsCluster_hosts                     => [
      :cpu_ready_delta_summation,
      :cpu_system_delta_summation,
      :cpu_usage_rate_average,
      :cpu_usagemhz_rate_average,
      :cpu_used_delta_summation,
      :cpu_wait_delta_summation,
      :derived_vm_allocated_disk_storage,
      :derived_vm_numvcpus,
      :derived_vm_used_disk_storage,
      :disk_devicelatency_absolute_average,
      :disk_kernellatency_absolute_average,
      :disk_queuelatency_absolute_average,
      :disk_usage_rate_average,
      :mem_usage_absolute_average,
      :net_usage_rate_average,
    ].freeze,
    :Host_vms                             => [
      :cpu_ready_delta_summation,
      :cpu_system_delta_summation,
      :cpu_used_delta_summation,
      :cpu_wait_delta_summation,
      :derived_vm_allocated_disk_storage,
      :derived_vm_numvcpus,
      :derived_vm_used_disk_storage,
    ].freeze,
    :ContainerImage_containers             => CONTAINER_ROLLUP_COLS,
    :ContainerProject_all_container_groups => CONTAINER_ROLLUP_COLS,
    :ContainerService_container_groups     => CONTAINER_ROLLUP_COLS,
    :ContainerReplicator_container_groups  => CONTAINER_ROLLUP_COLS,
    :AvailabilityZone_vms                  => VM_ROLLUP_COLS,
    :HostAggregate_vms                     => VM_ROLLUP_COLS,
    :Service_vms                           => [
      :cpu_ready_delta_summation,
      :cpu_system_delta_summation,
      :cpu_usage_rate_average,
      :cpu_usagemhz_rate_average,
      :cpu_used_delta_summation,
      :cpu_wait_delta_summation,
      :derived_vm_allocated_disk_storage,
      :derived_vm_numvcpus,
      :derived_vm_used_disk_storage,
      :derived_memory_used,
      :derived_memory_available,
      :disk_devicelatency_absolute_average,
      :disk_kernellatency_absolute_average,
      :disk_queuelatency_absolute_average,
      :disk_usage_rate_average,
      :mem_usage_absolute_average,
      :net_usage_rate_average,
    ].freeze
  }.freeze

  # Realtime aggregation columns. aggregate_columns looks these constants up by
  # name as "<UNDERSCORED BASE CLASS NAME>_REALTIME_COLS" for the "realtime"
  # interval.
  VM_REALTIME_COLS = [
    # Collected from VC
    :cpu_ready_delta_summation,
    :cpu_system_delta_summation,
    :cpu_usage_rate_average,
    :cpu_usagemhz_rate_average,
    :cpu_used_delta_summation,
    :cpu_wait_delta_summation,
    :disk_usage_rate_average,
    :mem_swapin_absolute_average,
    :mem_swapout_absolute_average,
    :mem_swapped_absolute_average,
    :mem_swaptarget_absolute_average,
    :mem_usage_absolute_average,
    :mem_vmmemctl_absolute_average,
    :mem_vmmemctltarget_absolute_average,
    :net_usage_rate_average,
    :sys_uptime_absolute_latest,
    # Derived
    :v_derived_cpu_reserved_pct,
    :v_derived_memory_reserved_pct,
    :v_pct_cpu_ready_delta_summation,
    :v_pct_cpu_used_delta_summation,
    :v_pct_cpu_wait_delta_summation
  ].freeze

  HOST_REALTIME_COLS = [
    # Collected from VC
    :cpu_usage_rate_average,
    :cpu_usagemhz_rate_average,
    :cpu_used_delta_summation,
    :disk_devicelatency_absolute_average,
    :disk_kernellatency_absolute_average,
    :disk_queuelatency_absolute_average,
    :disk_usage_rate_average,
    :mem_swapin_absolute_average,
    :mem_swapout_absolute_average,
    :mem_usage_absolute_average,
    :mem_vmmemctl_absolute_average,
    :net_usage_rate_average,
    :sys_uptime_absolute_latest,
    # Derived
    :derived_cpu_available,
    :derived_memory_available,
    :derived_memory_used
  ].freeze

  # Clusters aggregate the same realtime columns as hosts (same object).
  EMS_CLUSTER_REALTIME_COLS = HOST_REALTIME_COLS

  # Columns that excluded_col_for_expression? never excludes.
  INFREQUENTLY_CHANGING_COLS = [
    :derived_cpu_available,
    :derived_cpu_reserved,
    :derived_host_count_off,
    :derived_host_count_on,
    :derived_memory_available,
    :derived_memory_reserved,
    :derived_vm_allocated_disk_storage,
    :derived_vm_count_off,
    :derived_vm_count_on,
    :derived_vm_numvcpus,
  ].freeze

  # True for non-storage rollup columns that are not listed in
  # INFREQUENTLY_CHANGING_COLS.
  def self.excluded_col_for_expression?(col)
    NON_STORAGE_ROLLUP_COLS.include?(col) && !INFREQUENTLY_CHANGING_COLS.include?(col)
  end

  # these columns will pass false for aggregate_only to Aggregation::Process.column
  # this means that when processing the totals for a parent rollup, the total
  # values will be averaged across the number of children
  AVG_VALUE_COLUMNS = [
    :cpu_usage_rate_average
  ].freeze

  DAILY_SUM_COLUMNS = [
    :stat_container_group_create_rate,
    :stat_container_group_delete_rate,
    :stat_container_image_registration_rate
  ].freeze

  # Columns whose min/max value and timestamp are tracked per realtime row for
  # VmOrTemplate objects in rollup_realtime_perfs (see rollup_burst).
  BURST_COLS = [
    :cpu_usage_rate_average,
    :cpu_usagemhz_rate_average,
    :disk_usage_rate_average,
    :mem_usage_absolute_average,
    :derived_memory_used,
    :net_usage_rate_average
  ].freeze
  # Burst kinds recorded by rollup_burst when no explicit types are given.
  BURST_TYPES = %w[min max].freeze

  # Association keys merged by rollup_assoc.
  ASSOC_KEYS = [:vms, :hosts].freeze

  TIMEOUT_PROCESS = 30.minutes.to_i
  # Base class names whose derived columns are not recomputed during realtime
  # and hourly rollups.
  DERIVED_COLS_EXCLUDED_CLASSES = %w[MiqRegion MiqEnterprise].freeze
  # Separator between tags in the strings merged by rollup_tags.
  TAG_SEP = "|".freeze

  # Columns to aggregate when rolling the +assoc+ children of +klass+ up into
  # their parent.
  #
  # @param klass [Class] parent class; its base_class selects the column list
  # @param assoc [Symbol, String] child association name (used only for non-realtime)
  # @param interval_name [String] "realtime" selects the matching *_REALTIME_COLS
  #   constant (NameError via const_get if none exists); anything else uses AGGREGATE_COLS
  # @return [Array<Symbol>, nil] nil when AGGREGATE_COLS has no matching entry
  def self.aggregate_columns(klass, assoc, interval_name)
    if interval_name == "realtime"
      const_get("#{klass.base_class.name.underscore.upcase}_REALTIME_COLS")
    else
      AGGREGATE_COLS["#{klass.base_class}_#{assoc}".to_sym]
    end
  end

  # Builds the realtime rollup for +obj+ at +rt_ts+.
  #
  # Merges the realtime aggregate of each association in
  # obj.class::PERF_ROLLUP_CHILDREN into +new_perf+, fills the remaining keys
  # from +orig_perf+, then merges the derived columns unless obj's base class
  # is in DERIVED_COLS_EXCLUDED_CLASSES.
  #
  # @return [Hash] +new_perf+, mutated in place
  def self.rollup_realtime(obj, rt_ts, _interval_name, _time_profile, new_perf, orig_perf)
    # Roll up realtime metrics from child objects
    obj.class::PERF_ROLLUP_CHILDREN.each do |assoc|
      new_perf.merge!(rollup_child_metrics(obj, rt_ts, 'realtime', assoc))
    end

    new_perf.reverse_merge!(orig_perf)
    new_perf.merge!(Metric::Processing.process_derived_columns(obj, new_perf, rt_ts)) unless DERIVED_COLS_EXCLUDED_CLASSES.include?(obj.class.base_class.name)

    new_perf
  end

  # Builds the hourly rollup for +obj+ for +hour+.
  #
  # Aggregates obj's own realtime rows within the hour, merges the hourly
  # aggregate of each association in obj.class::PERF_ROLLUP_CHILDREN, fills the
  # remaining keys from +orig_perf+, merges the derived columns (unless the
  # base class is excluded) and finally the Metric::Statistic stat columns.
  #
  # @return [Hash] +new_perf+, mutated in place
  def self.rollup_hourly(obj, hour, _interval_name, _time_profile, new_perf, orig_perf)
    # Roll up realtime metrics
    rt_perfs = Metric::Finders.find_all_by_hour(obj, hour, 'realtime')
    rollup_realtime_perfs(obj, rt_perfs, new_perf)

    # Roll up hourly metrics from child objects
    obj.class::PERF_ROLLUP_CHILDREN.each do |assoc|
      new_perf.merge!(rollup_child_metrics(obj, hour, 'hourly', assoc))
    end

    new_perf.reverse_merge!(orig_perf)
    new_perf.merge!(Metric::Processing.process_derived_columns(obj, new_perf, hour)) unless DERIVED_COLS_EXCLUDED_CLASSES.include?(obj.class.base_class.name)
    new_perf.merge!(Metric::Statistic.calculate_stat_columns(obj, hour))

    new_perf
  end

  class << self
    # Historical rollups use exactly the hourly logic.
    alias_method :rollup_historical, :rollup_hourly
  end

  # Builds the daily rollup for +obj+ on +day+ from its hourly rows within the
  # given time profile.
  #
  # @return [Hash, nil] +new_perf+ (mutated in place), or nil after logging when
  #   the time profile can no longer be found
  def self.rollup_daily(obj, day, interval_name, time_profile, new_perf, orig_perf)
    tp = TimeProfile.extract_objects(time_profile)
    if tp.nil?
      _log.info("Skipping [#{interval_name}] Rollup for #{obj.class.name} name: [#{obj.name}], id: [#{obj.id}] for time: [#{day}] since the time profile no longer exists.")
      return
    end

    hr_perfs = Metric::Finders.find_all_by_day(obj, day, 'hourly', tp)
    daily_perfs = VimPerformanceDaily.process_hourly_for_one_day(hr_perfs, :time_profile => tp, :save => false)

    new_perf.merge!(daily_perfs.first) unless daily_perfs.first.nil?
    new_perf.reverse_merge!(orig_perf)

    new_perf
  end

  # Aggregates a collection of realtime metric rows into +new_perf+.
  #
  # Every column in Metric::Capture.capture_cols is accumulated with
  # Metric::Aggregation::Aggregate.column, then every key of +new_perf+ is
  # finalized with Metric::Aggregation::Process.column. For VmOrTemplate
  # objects the min/max burst values of BURST_COLS are also tracked under
  # new_perf[:min_max]. :intervals_in_rollup is set from the per-column counts.
  #
  # @return [Hash] +new_perf+, mutated in place
  def self.rollup_realtime_perfs(obj, rt_perfs, new_perf = {})
    new_perf_counts = {}

    rt_perfs.each do |rt|
      Metric::Capture.capture_cols.each do |col|
        new_perf[col] ||= 0
        new_perf_counts[col] ||= 0

        value = rt.send(col)
        Metric::Aggregation::Aggregate.column(col, nil, new_perf, new_perf_counts, value)
      end

      next unless obj.kind_of?(VmOrTemplate)

      new_perf[:min_max] ||= {}
      BURST_COLS.each do |col|
        value = rt.send(col)
        rollup_burst(col, new_perf[:min_max], rt.timestamp, value)
      end
    end

    new_perf.each_key do |col|
      Metric::Aggregation::Process.column(col, nil, new_perf, new_perf_counts)
    end

    new_perf[:intervals_in_rollup] = Metric::Helper.max_count(new_perf_counts)

    new_perf
  end

  # Aggregates the +interval_name+ metrics of the records in +obj+'s +assoc+
  # association at a single timestamp into one hash for +obj+.
  #
  # Children without a metric row for that interval and timestamp are skipped.
  #
  # @param obj parent object
  # @param timestamp [Time, String] for hourly, time is at the beginning of the hour;
  #   a String is parsed and converted to UTC
  # @param interval_name  ["realtime", "hourly", "historical"]
  # @param assoc [Symbol] name of the association to rollup
  # @return [Hash] aggregated value for each column returned by aggregate_columns
  def self.rollup_child_metrics(obj, timestamp, interval_name, assoc)
    timestamp = Time.parse(timestamp).utc if timestamp.kind_of?(String)
    ts = timestamp.utc.iso8601
    recs = obj.vim_performance_state_association(timestamp, assoc).to_a

    result = {}
    counts = {}

    agg_cols = aggregate_columns(obj.class, assoc, interval_name)
    agg_cols.each do |c|
      # Initialize aggregation col values and counts to zero before starting
      counts[c] = 0
      result[c] = 0
    end

    perf_recs = Metric::Finders.hash_by_capture_interval_name_and_timestamp(recs, ts, ts, interval_name)

    # Preload states for perf timestamp and the current hour.
    #   in a single query bring back all relevant performance states
    #   preload puts the states into rec's association where vim_performance_state_for_ts can find it
    #
    #   The scope passed in gets applied by rails to the rec.vim_performance_states
    #
    #   We are only concerned with the timestamp of interest.
    #   If the record for that time are not found, use the current performance_state (can't capture in the past)
    MiqPreloader.preload(recs, :vim_performance_states, VimPerformanceState.where(:timestamp => [ts, Metric::Helper.nearest_hourly_timestamp(Time.now.utc)])) unless recs.empty?

    recs.each do |rec|
      perf = perf_recs.fetch_path(rec.class.base_class.name, rec.id, interval_name, ts)
      next unless perf

      state = rec.vim_performance_state_for_ts(timestamp)
      agg_cols.each do |c|
        # NOTE(review): defensive - keys were zeroed above, so these only matter
        # if Aggregate.column can leave a nil behind; verify before removing.
        result[c] ||= 0
        counts[c] ||= 0
        Metric::Aggregation::Aggregate.column(c, state, result, counts, perf.send(c), :average)
      end
    end

    # The parent's state does not depend on the column, so look it up once.
    parent_state = obj.vim_performance_state_for_ts(timestamp)
    agg_cols.each do |c|
      aggregate_only = !AVG_VALUE_COLUMNS.include?(c)
      Metric::Aggregation::Process.column(c, parent_state, result, counts, aggregate_only, :average)
    end

    result
  end

  # Keys under which a burst of +type+ ("min"/"max") for +col+ is stored.
  #
  # @return [Array(Symbol, Symbol)] e.g. [:abs_max_<col>_timestamp, :abs_max_<col>_value]
  def self.burst_col_names(type, col)
    prefix = "abs_#{type}_#{col}"
    ["#{prefix}_timestamp".to_sym, "#{prefix}_value".to_sym]
  end

  # Records +value+ and +timestamp+ in +result+ for each burst type (default
  # BURST_TYPES) for which +value+ is a new extreme per new_min_max?.
  def self.rollup_burst(c, result, timestamp, value, types = nil)
    Array.wrap(types || BURST_TYPES).each do |type|
      ts_key, val_key = burst_col_names(type, c)

      if new_min_max?(result[val_key], value, type)
        result[ts_key] = timestamp
        result[val_key] = value
      end
    end
  end

  # Whether +new_value+ should replace +existing+ for burst +type+: always when
  # nothing is recorded yet; otherwise only when +new_value+ is non-nil and is
  # strictly smaller ('min') or larger ('max'). Unknown types return false.
  def self.new_min_max?(existing, new_value, type)
    case type
    when 'min' then existing.nil? || (new_value && new_value < existing)
    when 'max' then existing.nil? || (new_value && new_value > existing)
    else false
    end
  end

  # Keeps the smallest value seen for +c+ in result[:"min_<c>"].
  def self.rollup_min(c, result, value)
    key = "min_#{c}".to_sym
    result[key] = value if new_min_max?(result[key], value, 'min')
  end

  # Keeps the largest value seen for +c+ in result[:"max_<c>"].
  def self.rollup_max(c, result, value)
    key = "max_#{c}".to_sym
    result[key] = value if new_min_max?(result[key], value, 'max')
  end

  # Merges the :on/:off lists for each ASSOC_KEYS entry of +value+ into
  # result[c], removing duplicates. A nil +value+ is ignored.
  def self.rollup_assoc(c, result, value)
    return if value.nil?

    ASSOC_KEYS.each do |assoc|
      next if value[assoc].nil?

      result[c] ||= {}
      result[c][assoc] ||= {}

      [:on, :off].each do |mode|
        next if value[assoc][mode].nil?

        result[c][assoc][mode] ||= []
        result[c][assoc][mode].concat(value[assoc][mode]).uniq!
      end
    end
  end

  # Merges the TAG_SEP-separated tags in +value+ into result[c], keeping the
  # first occurrence of each tag. A blank +value+ is ignored.
  def self.rollup_tags(c, result, value)
    return if value.blank?

    result[c] ||= ""
    result[c] = result[c].split(TAG_SEP).concat(value.split(TAG_SEP)).uniq.join(TAG_SEP)
  end

  #
  # Gap collection
  #

  # Queues (at LOW priority) a rollup over [start_time, end_time] for every
  # resource that has Metric or MetricRollup rows in this region.
  def self.perf_rollup_gap(start_time, end_time, interval_name, time_profile_id = nil)
    targets = find_distinct_resources
    return if targets.empty?

    _log.info("Queueing #{interval_name} rollups for range: [#{start_time} - #{end_time}]...")
    targets.each { |t| t.perf_rollup_range_queue(start_time, end_time, interval_name, time_profile_id, MiqQueue::LOW_PRIORITY) }
    _log.info("Queueing #{interval_name} rollups for range: [#{start_time} - #{end_time}]...Complete")
  end

  # Queues a HIGH priority perf_rollup_gap call on this module via
  # MiqQueue.put_unless_exists.
  def self.perf_rollup_gap_queue(start_time, end_time, interval_name, time_profile_id = nil)
    MiqQueue.put_unless_exists(
      :class_name  => name,
      :method_name => "perf_rollup_gap",
      :priority    => MiqQueue::HIGH_PRIORITY,
      :args        => [start_time, end_time, interval_name, time_profile_id]
    )
  end

  # Loads every resource referenced by Metric or MetricRollup rows in this
  # region, issuing one query per distinct resource_type.
  #
  # @return [Array] the resource records
  def self.find_distinct_resources
    metrics        = Metric.in_my_region.select("DISTINCT resource_type, resource_id")
    metric_rollups = MetricRollup.in_my_region.select("DISTINCT resource_type, resource_id")

    ids_by_type = (metrics + metric_rollups).group_by(&:resource_type).transform_values do |rows|
      rows.collect(&:resource_id).uniq
    end

    ids_by_type.flat_map { |klass, ids| klass.constantize.where(:id => ids).to_a }
  end
end