app/models/metric/ci_mixin/processing.rb
module Metric::CiMixin::Processing
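
  # Entry point for turning captured counter data into stored metric rows.
  # Validates the interval, builds per-[timestamp, resource] rows, writes them
  # through ActiveMetrics, and then performs the post-write bookkeeping
  # (last_perf_capture_on, alert events, rollups, optional publishing).
  # Note that 'historical' data is processed with the 'hourly' interval, while
  # rollups still use the original interval name.
  #
  # Illustrative sketch of the expected counters_data shape (keys inferred from
  # how process_counter_values reads it; the resource object and the counter
  # values below are made-up examples, not real data):
  #
  #   {
  #     vm => {
  #       :counters       => {
  #         counter_id => {
  #           :counter_key           => "cpu_usage_rate_average",
  #           :capture_interval_name => "realtime",
  #           :capture_interval      => 20,
  #           :instance              => "",
  #           :precision             => 0.01,
  #           :unit_key              => "percent",
  #           :rollup                => "average"
  #         }
  #       },
  #       :counter_values => {
  #         "2024-01-01T00:00:00Z" => {counter_id => 4_200}
  #       }
  #     }
  #   }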
  def perf_process(interval_name, start_time, end_time, counters_data)
    unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
      raise ArgumentError, _("invalid interval_name '%{name}'") % {:name => interval_name}
    end

    log_header = "[#{interval_name}]"

    interval_orig = interval_name
    interval_name = 'hourly' if interval_name == 'historical'

    resources = counters_data.keys

    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...")

    _dummy, t = Benchmark.realtime_block(:total_time) do
      # Take the raw metrics and create hashes out of them
      rt_rows = {}
      counters_data.each do |resource, data|
        process_counter_values(resource, data, interval_name, rt_rows) # rt_rows is mutated in place
      end

      parameters = convert_metrics(resources, interval_name, start_time, end_time, rt_rows)
      write_metrics(parameters)
      update_and_publish_metrics(interval_orig, start_time, end_time, resources, rt_rows)
    end

    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...Complete - Timings: #{t.inspect}")
  end
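
  # Builds one row per [timestamp, resource] pair in rt_rows from the raw
  # counters/counter_values data. Counters that don't match the requested
  # interval, or whose counter_key is not a Metric column, are skipped. When a
  # counter reports multiple instances, the aggregate ("") instance is
  # preferred; otherwise the per-instance values are summed.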
  def process_counter_values(resource, data, interval_name, rt_rows)
    log_header = "[#{interval_name}]"

    counters       = data[:counters]
    counter_values = data[:counter_values]

    Benchmark.realtime_block(:process_counter_values) do
      counter_values.each do |ts, cv|
        ts = Metric::Helper.nearest_realtime_timestamp(ts) if interval_name == 'realtime'

        col_vals = {}
        cv.each do |counter_id, value|
          counter = counters[counter_id]
          next if counter.nil? || counter[:capture_interval_name] != interval_name

          col = counter[:counter_key].to_sym
          unless Metric.column_names_symbols.include?(col)
            _log.debug("#{log_header} Column [#{col}] is not defined, skipping")
            next
          end

          col_vals.store_path(col, counter[:instance], [value, counter])
        end

        col_vals.each do |col, values_by_instance|
          # If there are multiple instances for a column, use the aggregate
          # instance, if available, otherwise roll it up ourselves.
          value, counter = values_by_instance[""]
          if value.nil?
            value = 0
            counter = nil
            values_by_instance.each_value do |v, c|
              value += v
              counter = c
            end
          end

          # Create hashes for the rows
          rt = (rt_rows[[ts, resource]] ||= {
            :capture_interval_name => interval_name,
            :capture_interval      => counter[:capture_interval],
            :resource              => resource,
            :resource_name         => resource.name,
            :timestamp             => ts
          })
          rt[col], message = normalize_value(value, counter)
          _log.warn("#{log_header} #{log_target} Timestamp: [#{ts}], Column [#{col}]: '#{message}'") if message
        end
      end
    end
  end
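
  # Converts the per-[timestamp, resource] rows into the parameter layout
  # expected by the configured ActiveMetrics adapter: the miq_postgres adapter
  # takes rows with all metric columns grouped by timestamp, while other
  # adapters take one entry per individual metric.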
  def convert_metrics(resources, interval_name, start_time, end_time, rt_rows)
    if ActiveMetrics::Base.connection_config[:adapter] == "miq_postgres"
      # We can just pass the original data to PG, with metrics grouped by timestamp, since that is how
      # we store to PG now. This spares quite a bit of memory and time by not converting it to
      # row_per_metric and then back to the original format.
      transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
    else
      transform_parameters_row_per_metric(resources, interval_name, start_time, end_time, rt_rows)
    end
  end
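
  # Hands the prepared parameters to the ActiveMetrics connection in a single
  # bulk write.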
  def write_metrics(parameters)
    ActiveMetrics::Base.connection.write_multiple(parameters)
  end
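
  # Post-write bookkeeping: advances each resource's last_perf_capture_on,
  # queues a <class>_perf_complete alert event for each resource so alerts can
  # be evaluated, rolls the data up to parent resources using the original
  # interval name, and publishes the rows to the messaging bus when
  # syndication is enabled.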
  def update_and_publish_metrics(interval_orig, start_time, end_time, resources, rt_rows)
    resources.each do |resource|
      resource.update_attribute(:last_perf_capture_on, end_time) if resource.last_perf_capture_on.nil? || resource.last_perf_capture_on.utc.iso8601 < end_time
    end

    # Raise <class>_perf_complete alert event if realtime so alerts can be evaluated.
    resources.each do |resource|
      MiqEvent.raise_evm_alert_event_queue(resource, MiqEvent.event_name_for_target(resource, "perf_complete"))
    end

    resources.each do |resource|
      resource.perf_rollup_to_parents(interval_orig, start_time, end_time)
    end

    publish_metrics(rt_rows) if syndicate_metrics?
  end

  private
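
  # Produces one hash per metric column, tagged with the capture interval and
  # resource name. A rough sketch of a single produced entry (values are
  # illustrative, not real data):
  #
  #   {
  #     :timestamp   => "2024-01-01T00:00:00Z",
  #     :metric_name => :cpu_usage_rate_average,
  #     :value       => 42.0,
  #     :resource    => vm,
  #     :tags        => {:capture_interval_name => "realtime",
  #                      :capture_interval      => 20,
  #                      :resource_name         => "my-vm"}
  #   }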
  def transform_parameters_row_per_metric(_resources, interval_name, _start_time, _end_time, rt_rows)
    rt_rows.flat_map do |(ts, _resource), rt|
      rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
      rt.delete_nils

      rt_tags   = rt.slice(:capture_interval_name, :capture_interval, :resource_name).symbolize_keys
      rt_fields = rt.except(:capture_interval_name,
                            :capture_interval,
                            :resource_name,
                            :timestamp,
                            :instance_id,
                            :class_name,
                            :resource,
                            :resource_type,
                            :resource_id)

      rt_fields.map do |k, v|
        {
          :timestamp   => ts,
          :metric_name => k,
          :value       => v,
          :resource    => rt[:resource],
          :tags        => rt_tags
        }
      end
    end
  end
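
  # Keeps the rows grouped by timestamp (the layout the miq_postgres adapter
  # stores directly). Previously stored rows in the same range are fetched up
  # front and merged underneath the new values, vim_performance_state records
  # are preloaded to avoid N+1 queries, and derived columns are computed for
  # each row before the original arguments are returned as the write
  # parameters.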
  def transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
    obj_perfs, = Benchmark.realtime_block(:db_find_prev_perfs) do
      Metric::Finders.find_all_by_range(resources, start_time, end_time, interval_name).find_each.each_with_object({}) do |p, h|
        data, = Benchmark.realtime_block(:get_attributes) do
          # TODO(lsmola) calling .attributes takes more time than actually saving all the samples, try to fetch pure
          # arrays from the PG
          p.attributes.delete_nils
        end

        h.store_path([p.resource_type, p.resource_id, p.capture_interval_name, p.timestamp.utc.iso8601], data.symbolize_keys)
      end
    end

    Benchmark.realtime_block(:preload_vim_performance_state_for_ts) do
      # Make sure we preload all vim_performance_state_for_ts to avoid n+1 queries
      condition = if start_time.nil?
                    nil
                  elsif start_time == end_time
                    {:timestamp => start_time}
                  elsif end_time.nil?
                    VimPerformanceState.arel_table[:timestamp].gteq(start_time)
                  else
                    {:timestamp => start_time..end_time}
                  end

      resources.each { |r| r.preload_vim_performance_state_for_ts_iso8601(condition) }
    end

    Benchmark.realtime_block(:process_perfs) do
      rt_rows.each do |(ts, _resource), rt|
        rt[:resource_id]   = rt[:resource].id
        rt[:resource_type] = rt[:resource].class.base_class.name

        if (perf = obj_perfs.fetch_path([rt[:resource_type], rt[:resource_id], interval_name, ts]))
          rt.reverse_merge!(perf)
          rt.delete(:id) # Remove protected attributes
        end

        rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
      end
    end

    # Assign nil so GC can clean it up
    obj_perfs = nil

    return resources, interval_name, start_time, end_time, rt_rows
  end
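
  # Applies the counter's precision to a raw value and clamps percent values
  # to 100. Negative raw values are treated as missing data: nil for 'latest'
  # rollups, 0 otherwise. Returns [value, message], where message is non-nil
  # only when the value was clamped. An illustrative call (the counter hash is
  # a made-up example):
  #
  #   normalize_value(15_000, :precision => 0.01, :unit_key => "percent", :rollup => "average")
  #   # => [100.0, "percent value 150.0 is out of range, resetting to 100.0"]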
  def normalize_value(value, counter)
    return counter[:rollup] == 'latest' ? nil : 0 if value < 0

    value = value.to_f * counter[:precision]

    message = nil
    percent_norm = 100.0
    if counter[:unit_key] == "percent" && value > percent_norm
      message = "percent value #{value} is out of range, resetting to #{percent_norm}"
      value = percent_norm
    end

    return value, message
  end
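
  # Publishes each row to the "manageiq.metrics" topic on the metrics_capture
  # messaging client, after replacing the in-memory resource object with
  # provider/resource identifiers. Publishing failures are logged and
  # swallowed so metric processing itself is not affected.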
  def publish_metrics(metrics)
    metrics.each_value do |metric|
      resource = metric.delete(:resource)

      metric[:parent_ems_type]      = resource.ext_management_system&.class&.ems_type
      metric[:parent_ems_uid]       = resource.ext_management_system&.uid_ems
      metric[:resource_type]        = resource.class.base_class.name
      metric[:resource_id]          = resource.id
      metric[:resource_manager_ref] = resource.ems_ref if resource.respond_to?(:ems_ref)
      metric[:resource_manager_uid] = resource.uid_ems if resource.respond_to?(:uid_ems)

      MiqQueue.messaging_client("metrics_capture")&.publish_topic(
        :service => "manageiq.metrics",
        :sender  => resource.ext_management_system&.id,
        :event   => "metrics",
        :payload => metric
      )
    end
  rescue => err
    _log.warn("Failed to publish metrics: #{err}")
    _log.log_backtrace(err)
  end
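
  # Metrics are syndicated only when the performance.syndicate_metrics setting
  # is enabled and an external messaging backend (anything other than the
  # internal miq_queue) is configured.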
  def syndicate_metrics?
    Settings.performance.syndicate_metrics && MiqQueue.messaging_type != "miq_queue"
  end
end