ManageIQ/manageiq
app/models/metric/ci_mixin/processing.rb

module Metric::CiMixin::Processing
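  # Processes captured metrics for the given resources over [start_time, end_time]
  # and writes them through ActiveMetrics. Illustrative call; the counters_data
  # shape below is a sketch inferred from how process_counter_values reads it,
  # and the counter id/key values are hypothetical:
  #
  #   counters_data = {
  #     vm => {
  #       :counters       => {
  #         "cpu_usage_rate_average" => {
  #           :counter_key           => "cpu_usage_rate_average",
  #           :capture_interval      => 20,
  #           :capture_interval_name => "realtime",
  #           :instance              => "",
  #           :precision             => 0.01,
  #           :unit_key              => "percent",
  #           :rollup                => "average"
  #         }
  #       },
  #       :counter_values => {
  #         "2023-01-01T00:00:20Z" => {"cpu_usage_rate_average" => 4250}
  #       }
  #     }
  #   }
  #   vm.perf_process("realtime", start_time, end_time, counters_data)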
  def perf_process(interval_name, start_time, end_time, counters_data)
    unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
      raise ArgumentError, _("invalid interval_name '%{name}'") % {:name => interval_name}
    end

    log_header = "[#{interval_name}]"

    interval_orig = interval_name
    interval_name = 'hourly' if interval_name == 'historical'

    resources = counters_data.keys

    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...")

    _dummy, t = Benchmark.realtime_block(:total_time) do
      # Take the raw metrics and create hashes out of them
      rt_rows = {}

      counters_data.each do |resource, data|
        process_counter_values(resource, data, interval_name, rt_rows) # NOTE: rt_rows is mutated in place
      end

      parameters = convert_metrics(resources, interval_name, start_time, end_time, rt_rows)
      write_metrics(parameters)
      update_and_publish_metrics(interval_orig, start_time, end_time, resources, rt_rows)
    end
    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...Complete - Timings: #{t.inspect}")
  end

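  # Turns one resource's captured counters into row hashes, one per
  # [timestamp, resource] pair, accumulated into rt_rows (mutated in place).
  # A sketch of an accumulated row, continuing the example above (values illustrative):
  #
  #   rt_rows = {}
  #   process_counter_values(vm, counters_data[vm], "realtime", rt_rows)
  #   rt_rows[["2023-01-01T00:00:20Z", vm]]
  #   # => {:capture_interval_name  => "realtime",
  #   #     :capture_interval       => 20,
  #   #     :resource               => vm,
  #   #     :resource_name          => "vm1",
  #   #     :timestamp              => "2023-01-01T00:00:20Z",
  #   #     :cpu_usage_rate_average => 42.5}
  #
  # When a column has several instances and no aggregate ("") instance, the
  # per-instance values are summed before normalization.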
  def process_counter_values(resource, data, interval_name, rt_rows)
    log_header = "[#{interval_name}]"

    counters       = data[:counters]
    counter_values = data[:counter_values]

    Benchmark.realtime_block(:process_counter_values) do
      counter_values.each do |ts, cv|
        ts = Metric::Helper.nearest_realtime_timestamp(ts) if interval_name == 'realtime'

        col_vals = {}
        cv.each do |counter_id, value|
          counter = counters[counter_id]
          next if counter.nil? || counter[:capture_interval_name] != interval_name

          col = counter[:counter_key].to_sym
          unless Metric.column_names_symbols.include?(col)
            _log.debug("#{log_header} Column [#{col}] is not defined, skipping")
            next
          end

          col_vals.store_path(col, counter[:instance], [value, counter])
        end

        col_vals.each do |col, values_by_instance|
          # If there are multiple instances for a column, use the aggregate
          #   instance, if available, otherwise roll it up ourselves.
          value, counter = values_by_instance[""]
          if value.nil?
            value = 0
            counter = nil
            values_by_instance.each_value do |v, c|
              value += v
              counter = c
            end
          end

          # Create hashes for the rows
          rt = (rt_rows[[ts, resource]] ||= {
            :capture_interval_name => interval_name,
            :capture_interval      => counter[:capture_interval],
            :resource              => resource,
            :resource_name         => resource.name,
            :timestamp             => ts
          })
          rt[col], message = normalize_value(value, counter)
          _log.warn("#{log_header} #{log_target} Timestamp: [#{ts}], Column [#{col}]: '#{message}'") if message
        end
      end
    end
  end

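  # Chooses the parameter layout for the configured ActiveMetrics backend.
  # Dispatch sketch (adapter values other than "miq_postgres" are illustrative):
  #
  #   ActiveMetrics::Base.connection_config[:adapter]
  #   # "miq_postgres" -> transform_parameters_row_with_all_metrics (rows stay grouped by timestamp)
  #   # anything else  -> transform_parameters_row_per_metric (one entry per metric value)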
  def convert_metrics(resources, interval_name, start_time, end_time, rt_rows)
    if ActiveMetrics::Base.connection_config[:adapter] == "miq_postgres"
      # We can pass the original data to PG with metrics grouped by timestamp, since that is how
      # we store them in PG now. This saves considerable memory and time by not converting to
      # row-per-metric and then back to the original format.
      transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
    else
      transform_parameters_row_per_metric(resources, interval_name, start_time, end_time, rt_rows)
    end
  end

  def write_metrics(parameters)
    ActiveMetrics::Base.connection.write_multiple(parameters)
  end

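  # Post-write bookkeeping: advances last_perf_capture_on, queues a
  # <class>_perf_complete alert event for each resource (for a Vm this resolves
  # to something like "vm_perf_complete"), rolls the interval up to parent
  # resources, and optionally publishes the rows to the metrics topic.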
  def update_and_publish_metrics(interval_orig, start_time, end_time, resources, rt_rows)
    resources.each do |resource|
      resource.update_attribute(:last_perf_capture_on, end_time) if resource.last_perf_capture_on.nil? || resource.last_perf_capture_on.utc.iso8601 < end_time
    end

    # Raise a <class>_perf_complete alert event for each resource so alerts can be evaluated.
    resources.each do |resource|
      MiqEvent.raise_evm_alert_event_queue(resource, MiqEvent.event_name_for_target(resource, "perf_complete"))
    end

    resources.each do |resource|
      resource.perf_rollup_to_parents(interval_orig, start_time, end_time)
    end

    publish_metrics(rt_rows) if syndicate_metrics?
  end

  private

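  # Flattens rt_rows into one hash per metric value for backends that want
  # row-per-metric input. Output sketch for a single captured column (values
  # illustrative; the tag/field split mirrors the slice/except below):
  #
  #   [{
  #     :timestamp   => "2023-01-01T00:00:20Z",
  #     :metric_name => :cpu_usage_rate_average,
  #     :value       => 42.5,
  #     :resource    => vm,
  #     :tags        => {:capture_interval_name => "realtime",
  #                      :capture_interval      => 20,
  #                      :resource_name         => "vm1"}
  #   }]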
  def transform_parameters_row_per_metric(_resources, interval_name, _start_time, _end_time, rt_rows)
    rt_rows.flat_map do |(ts, _resource), rt|
      rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
      rt.delete_nils
      rt_tags   = rt.slice(:capture_interval_name, :capture_interval, :resource_name).symbolize_keys
      rt_fields = rt.except(:capture_interval_name,
                            :capture_interval,
                            :resource_name,
                            :timestamp,
                            :instance_id,
                            :class_name,
                            :resource,
                            :resource_type,
                            :resource_id)

      rt_fields.map do |k, v|
        {
          :timestamp   => ts,
          :metric_name => k,
          :value       => v,
          :resource    => rt[:resource],
          :tags        => rt_tags
        }
      end
    end
  end

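  # PG path: enriches each rt_rows entry in place with any previously stored
  # metric row for the same [resource_type, resource_id, interval, timestamp]
  # plus the derived columns, then returns the arguments that write_metrics
  # hands to write_multiple. The obj_perfs lookup is keyed like (illustrative):
  #
  #   obj_perfs[["VmOrTemplate", 42, "realtime", "2023-01-01T00:00:20Z"]]
  #   # => {:cpu_usage_rate_average => 41.0, ...}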
  def transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
    obj_perfs, = Benchmark.realtime_block(:db_find_prev_perfs) do
      Metric::Finders.find_all_by_range(resources, start_time, end_time, interval_name).find_each.each_with_object({}) do |p, h|
        data, = Benchmark.realtime_block(:get_attributes) do
          # TODO(lsmola) calling .attributes takes more time than actually saving all the samples;
          # try to fetch plain arrays from PG instead
          p.attributes.delete_nils
        end
        h.store_path([p.resource_type, p.resource_id, p.capture_interval_name, p.timestamp.utc.iso8601], data.symbolize_keys)
      end
    end

    Benchmark.realtime_block(:preload_vim_performance_state_for_ts) do
      # Make sure we preload all vim_performance_state_for_ts to avoid n+1 queries
      condition = if start_time.nil?
                    nil
                  elsif start_time == end_time
                    {:timestamp => start_time}
                  elsif end_time.nil?
                    VimPerformanceState.arel_table[:timestamp].gteq(start_time)
                  else
                    {:timestamp => start_time..end_time}
                  end

      resources.each { |r| r.preload_vim_performance_state_for_ts_iso8601(condition) }
    end

    Benchmark.realtime_block(:process_perfs) do
      rt_rows.each do |(ts, _resource), rt|
        rt[:resource_id]   = rt[:resource].id
        rt[:resource_type] = rt[:resource].class.base_class.name

        if (perf = obj_perfs.fetch_path([rt[:resource_type], rt[:resource_id], interval_name, ts]))
          rt.reverse_merge!(perf)
          rt.delete(:id) # Remove protected attributes
        end

        rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
      end
    end
    # Assign nil so GC can clean it up
    obj_perfs = nil

    return resources, interval_name, start_time, end_time, rt_rows
  end

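  # Scales a raw counter value by its precision and clamps percent counters to
  # 100. Worked examples (the counter hashes are illustrative):
  #
  #   normalize_value(4250, :precision => 0.01, :unit_key => "percent")
  #   # => [42.5, nil]
  #   normalize_value(10_500, :precision => 0.01, :unit_key => "percent")
  #   # => [100.0, "percent value 105.0 is out of range, resetting to 100.0"]
  #   normalize_value(-1, :rollup => "latest")
  #   # => nil  (negative values are dropped; 0 is returned for other rollups)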
  def normalize_value(value, counter)
    return counter[:rollup] == 'latest' ? nil : 0 if value < 0

    value = value.to_f * counter[:precision]

    message = nil
    percent_norm = 100.0
    if counter[:unit_key] == "percent" && value > percent_norm
      message = "percent value #{value} is out of range, resetting to #{percent_norm}"
      value = percent_norm
    end
    return value, message
  end

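  # Publishes each processed row to the "manageiq.metrics" topic, enriched with
  # identifiers for the resource and its provider. Payload sketch for a single
  # row (field values are illustrative):
  #
  #   {
  #     :capture_interval_name  => "realtime",
  #     :capture_interval       => 20,
  #     :resource_name          => "vm1",
  #     :timestamp              => "2023-01-01T00:00:20Z",
  #     :cpu_usage_rate_average => 42.5,
  #     :parent_ems_type        => "...",
  #     :parent_ems_uid         => "...",
  #     :resource_type          => "VmOrTemplate",
  #     :resource_id            => 42,
  #     :resource_manager_ref   => "vm-101",
  #     :resource_manager_uid   => "..."
  #   }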
  def publish_metrics(metrics)
    metrics.each_value do |metric|
      resource = metric.delete(:resource)

      metric[:parent_ems_type] = resource.ext_management_system&.class&.ems_type
      metric[:parent_ems_uid]  = resource.ext_management_system&.uid_ems

      metric[:resource_type] = resource.class.base_class.name
      metric[:resource_id]   = resource.id

      metric[:resource_manager_ref] = resource.ems_ref if resource.respond_to?(:ems_ref)
      metric[:resource_manager_uid] = resource.uid_ems if resource.respond_to?(:uid_ems)

      MiqQueue.messaging_client("metrics_capture")&.publish_topic(
        :service => "manageiq.metrics",
        :sender  => resource.ext_management_system&.id,
        :event   => "metrics",
        :payload => metric
      )
    end
  rescue => err
    _log.warn("Failed to publish metrics: #{err}")
    _log.log_backtrace(err)
  end

  def syndicate_metrics?
    Settings.performance.syndicate_metrics && MiqQueue.messaging_type != "miq_queue"
  end
end