ManageIQ/manageiq
app/models/manageiq/providers/base_manager/metrics_capture.rb

class ManageIQ::Providers::BaseManager::MetricsCapture
  include Vmdb::Logging

  attr_reader :target, :ems

  # @param target [Array<Host,Vm>,Vm,Host] object(s) that need perf capture
  def initialize(target, ems = nil)
    @target = target
    @ems = ems
  end

  def zone
    ems.zone
  end

  def my_zone
    ems.zone.name
  end

  def targets
    Array(target)
  end

  # Queue capture of all metrics for an ems
  def perf_capture_all_queue
    perf_capture_health_check
    @target = filter_perf_capture_now(capture_ems_targets)
    perf_capture_queue("realtime", :rollups => true)
  end

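  # Queue historical captures to fill a metrics gap between start_time and end_time (storages excluded)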
  def perf_capture_gap(start_time, end_time)
    @target = capture_ems_targets(:exclude_storages => true)
    perf_capture_queue('historical', :start_time => start_time.utc, :end_time => end_time.utc)
  end

  def perf_capture_realtime_queue
    perf_capture_queue('realtime')
  end

  # @param [String] interval "realtime" or "historical". Storage always uses "hourly"
  # @param [Boolean] rollups roll host metrics up into their cluster (true for a full ems capture)
  def perf_capture_queue(interval, start_time: nil, end_time: nil, rollups: false)
    if interval == "realtime" && Metric::Capture.historical_days != 0
      historical_start = Metric::Capture.historical_start_time
      historical_end   = 1.day.from_now.utc.beginning_of_day
    end

    targets_by_class = Array(@target).group_by { |t| t.class.base_class.name }
    targets_by_class.each do |class_name, class_targets|
      class_interval = class_name == "Storage" ? "hourly" : interval

      if class_name == "Host" && rollups # class_interval == "realtime" (the only possible value when rollups == true)
        perf_capture_queue_targets_hosts(class_targets, class_interval, :start_time => start_time, :end_time => end_time)
      elsif class_interval == "historical"
        perf_capture_queue_targets_hist(class_targets, class_interval, :start_time => start_time, :end_time => end_time)
      else # class_interval == "realtime" or "hourly" (Storage)
        perf_capture_queue_targets(class_targets, class_interval, :start_time => start_time, :end_time => end_time)
      end

      # detect gaps and add "historical" captures for a "realtime" capture
      if class_interval == "realtime" # implied: class_name != "Storage"
        perf_capture_queue_gap(class_targets, :historical_start => historical_start, :historical_end => historical_end)
      end
    end
  end

  # @param [Date, nil] historical_start nil if no historical capture is needed.
  # @param [Date, nil] historical_end
  # TODO: group historicals by last_perf_capture_on groupings (by hour?)
  # NOTE: if metrics collection is paused for a long time, we will send a lot of captures
  def perf_capture_queue_gap(targets, historical_start:, historical_end:)
    targets_already_captured, targets_not_captured = targets.partition(&:last_perf_capture_on)

    # targets with no captured metrics
    if historical_start
      perf_capture_queue_targets_hist(targets_not_captured, "historical", :start_time => historical_start, :end_time => historical_end)
    end

    # targets that haven't captured realtime metrics in a while
    # TODO: send these in batches instead of one-offs (use split_capture_intervals logic to send 1 day at a time)
    # TODO: if it has been months, do we want a limit to the total time duration of these captures?
    realtime_cut_off = 4.hours.ago.utc.beginning_of_day
    gapped_targets = targets_already_captured.select { |t| t.last_perf_capture_on < realtime_cut_off }
    gapped_targets.each do |target|
      split_capture_intervals(target.last_perf_capture_on, realtime_cut_off).each do |s, e|
        perf_capture_queue_target(target, "historical", :start_time => s, :end_time => e)
      end
    end
  end

  # @param [String] interval only ever "historical"
  def perf_capture_queue_targets_hist(targets, interval, start_time:, end_time:)
    split_capture_intervals(start_time, end_time).each do |st, ed|
      perf_capture_queue_targets(targets, interval, :start_time => st, :end_time => ed)
    end
  end

  # @param [Array<Host>] targets for collection
  # @param [String] interval only ever "realtime"
  def perf_capture_queue_targets_hosts(targets, interval, start_time:, end_time:)
    targets.group_by(&:ems_cluster).each do |ems_cluster, hosts|
      perf_capture_queue_targets(hosts, interval, :start_time => start_time, :end_time => end_time, :parent => ems_cluster)
    end
  end

  # private, but called by ci_mixin/capture.rb
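  # Summarizes the capture targets for log messages: class, id, and name for a single target,
  # or the distinct class names and a count for multiple targets.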
  def log_targets
    if targets.size == 1
      "[#{targets.first.class.name}], [#{targets.first.id}], [#{targets.first.name}]"
    else
      "[#{targets.map { |obj| obj.class.name }.uniq.join(", ")}], [#{targets.size} targets]"
    end
  end

  private

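  # Log how many capture messages of each interval are waiting on this zone's metrics collector queue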
  def perf_capture_health_check
    q_items = MiqQueue.select(:method_name, :created_on).order(:created_on)
                      .where(:state       => "ready",
                             :role        => "ems_metrics_collector",
                             :method_name => %w[perf_capture perf_capture_realtime perf_capture_hourly perf_capture_historical],
                             :zone        => my_zone)
    items_by_interval = q_items.group_by(&:method_name)
    items_by_interval.reverse_merge!("perf_capture_realtime" => [], "perf_capture_hourly" => [], "perf_capture_historical" => [])
    items_by_interval.each do |method_name, items|
      interval = method_name.sub("perf_capture_", "")
      msg = "#{items.length} #{interval.inspect} captures on the queue for zone [#{my_zone}]"
      msg << " - oldest: [#{items.first.created_on.utc.iso8601}], recent: [#{items.last.created_on.utc.iso8601}]" if items.present?
      _log.info(msg)
    end
  end

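  # Drop targets whose metrics were captured recently enough that another capture is not yet needed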
  def filter_perf_capture_now(targets)
    targets.select do |target|
      if perf_capture_now?(target)
        true
      else
        _log.debug do
          "Skipping capture of #{target.log_target} - " +
            "Performance last captured on [#{target.last_perf_capture_on}] is within threshold"
        end
        false
      end
    end
  end

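  # Determine the ems's objects to capture metrics for; provider subclasses must implement this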
  def capture_ems_targets(_options = {})
    raise(NotImplementedError, _("must be implemented in subclass"))
  end

  # if it has not been run, or it was run a very long time ago, just run it
  # if it has been run very recently (even too recently for realtime) then skip it
  # otherwise, run it only if an alert needs realtime metrics for the target
  # assumes alert capture threshold <= standard capture threshold
  def perf_capture_now?(target)
    return true  if target.last_perf_capture_on.nil?
    return true  if target.last_perf_capture_on < Metric::Capture.standard_capture_threshold(target)
    return false if target.last_perf_capture_on >= Metric::Capture.alert_capture_threshold(target)

    MiqAlert.target_needs_realtime_capture?(target)
  end

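  # Create a task that groups the realtime captures for a cluster's hosts so they can be rolled up together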
  def create_rollup_task_for_cluster(ems_cluster, hosts)
    return unless ems_cluster

    pkey = "#{ems_cluster.class.name}:#{ems_cluster.id}"
    prev_task = MiqTask.where(:identifier => pkey).order("id DESC").first
    task_start_time = prev_task ? prev_task.context_data[:end] : 1.hour.ago.utc.iso8601
    task_end_time = Time.now.utc.iso8601

    MiqTask.create(
      :name         => "Performance rollup for #{pkey}",
      :identifier   => pkey,
      :state        => MiqTask::STATE_QUEUED,
      :status       => MiqTask::STATUS_OK,
      :message      => "Task has been queued",
      :context_data => {
        :start    => task_start_time,
        :end      => task_end_time,
        :parent   => pkey,
        :targets  => hosts.map { |target| "#{target.class}:#{target.id}" },
        :complete => [],
        :interval => "realtime"
      }
    ).tap do |task|
      _log.info("Created task id: [#{task.id}] for: [#{pkey}] with targets: #{hosts.inspect} for time range: [#{task_start_time} - #{task_end_time}]")
    end
  end

  # number of records to send in a single capture message
  # @return [Integer] batch size; 1 means no batching (the default)
  def concurrent_requests(interval_name)
    @concurrent_requests ||= ::Settings.dig(:ems, "ems_#{ems.provider_name.underscore}", :concurrent_requests) || {}
    ret = @concurrent_requests[interval_name].to_i
    ret == 0 ? 1 : ret
  end

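  # Queue capture messages for targets, batching when concurrent_requests allows it (a cluster's hosts are kept in one batch)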
  def perf_capture_queue_targets(targets, interval, start_time:, end_time:, parent: nil)
    batch_size = concurrent_requests(interval)
    if batch_size > 1
      # send all of the hosts in a cluster in one message, ignoring the batch size.
      #   Hosts roll up into the cluster, so the entire batch must stay together;
      #   otherwise we would need a create_rollup_task_for_cluster
      current_batch_size = parent ? targets.size : batch_size
      rollup = parent.present?
      targets.each_slice(current_batch_size) do |targets_batch|
        perf_capture_queue_target(targets_batch, interval, :start_time => start_time, :end_time => end_time, :rollup => rollup)
      end
    else # send individual messages
      task = create_rollup_task_for_cluster(parent, targets) if parent
      perf_capture_queue_targets_individually(targets, interval, :start_time => start_time, :end_time => end_time, :task_id => task&.id)
    end
  end

  # standard one message per object to collect
  def perf_capture_queue_targets_individually(targets, interval, start_time:, end_time:, task_id: nil)
    targets.each do |target|
      perf_capture_queue_target(target, interval, :start_time => start_time, :end_time => end_time, :task_id => task_id)
    rescue => err
      _log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
    end
  end

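  # Queue a single capture message for a target (or a batch of targets of the same class),
  # merging with an already-queued "ready" message and its callback task ids when one exists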
  def perf_capture_queue_target(target, interval_name, start_time:, end_time:, task_id: nil, rollup: false)
    if target.kind_of?(Array)
      target_ids = target.map(&:id) if target.size > 1
      target = target.first
    end

    # Queue up the actual items
    queue_item = {
      :class_name  => target.class.name,
      :method_name => "perf_capture_#{interval_name}",
      :priority    => Metric::Capture.interval_priority(interval_name),
      :instance_id => target.id,
      :role        => 'ems_metrics_collector',
      :queue_name  => ems.metrics_collector_queue_name,
      :zone        => my_zone,
      :state       => ['ready', 'dequeue'],
    }
    queue_item[:args] = [start_time, end_time, target_ids, rollup] if start_time || target_ids.present? || rollup

    return MiqQueue.put_unless_exists(queue_item) unless task_id

    # cb is the callback used to group cluster realtime metrics under the task
    cb = {:class_name => target.class.name, :instance_id => target.id, :method_name => :perf_capture_callback, :args => [[task_id]]}

    # MiqQueue#miq_task_id is set so the MiqTask#started_on column is initialized when the message is delivered.
    MiqQueue.create_with(:miq_task_id => task_id, :miq_callback => cb).put_or_update(queue_item) do |msg, qi|
      if msg.nil?
        # no message on the queue; put one on, but with the default state
        qi.delete(:state)
        qi
      elsif msg.state == "ready"
        # rerun the job
        qi.delete(:state)
        existing_tasks = ((msg.miq_callback || {})[:args] || []).first || []
        qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
        qi
      else # state == "dequeue"
        _log.debug("Skipping capture of #{target.log_target} - Performance capture for interval #{interval_name} is still running")
        # NOTE: do not update the message queue
        nil
      end
    end
  end

  # split the capture interval for historical queue captures (so each fetch is not too big)
  # returns an array of ordered pairs from start_time to end_time, partitioned by day (most recent day first / reverse chronological)
  #
  # example:
  #
  #  start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
  #  [[2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
  #   [2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
  #   [2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
  def split_capture_intervals(start_time, end_time, threshold = 1.day)
    return [] unless start_time

    (start_time.utc..end_time.utc).step_value(threshold).each_cons(2).collect do |s_time, e_time|
      [s_time, e_time]
    end.reverse
  end
end