ManageIQ/manageiq

View on GitHub
app/models/vm_scan/dispatcher.rb

Summary

Maintainability
B
5 hrs
Test Coverage
A
91%
class VmScan
  class Dispatcher < Job::Dispatcher
    # Resets the per-dispatcher state.
    #
    # @all_busy_by_host_id_storage_id caches "host_storage" keys for which #dispatch
    # found no free proxy. @vm holds the VM of the job currently being processed and
    # @active_vm_scans_by_zone is filled lazily by #active_vm_scans_by_zone.
    # NOTE(review): @zone is not read in this file; presumably used by the parent class.
    def initialize
      @all_busy_by_host_id_storage_id = {}
      @vm = @active_vm_scans_by_zone = @zone = nil
    end

    # Dispatches pending VM scan jobs to available proxies.
    #
    # Loads the pending jobs, bulk-loads their target VMs and, for each job, starts it
    # on the first eligible proxy that is not busy (unless the embedded resource limit
    # for the VM's host/EMS is exceeded). Processing stops early once the zone's
    # :concurrent_vm_scans setting (when greater than zero) is reached. Jobs whose VM
    # no longer exists, or whose proxy lookup raises, are aborted and skipped.
    # Timings are collected through Benchmark and logged when the run completes.
    def dispatch
      _, total_time = Benchmark.realtime_block(:total_time) do
        jobs_to_dispatch, = Benchmark.realtime_block(:pending_vm_jobs) { pending_jobs }
        Benchmark.current_realtime[:vm_jobs_to_dispatch_count] = jobs_to_dispatch.length

        # Skip work if there are no jobs to dispatch
        unless jobs_to_dispatch.empty?
          # Populate the memoized counters up front so each one is timed separately
          Benchmark.realtime_block(:active_vm_scans) { active_vm_scans_by_zone }
          Benchmark.realtime_block(:busy_proxies) { busy_proxies }
          Benchmark.realtime_block(:busy_resources_for_embedded_scanning) { busy_resources_for_embedded_scanning }

          vms_for_jobs = jobs_to_dispatch.collect(&:target_id)
          @vms_for_dispatch_jobs, = Benchmark.realtime_block(:vm_find) do
            VmOrTemplate.where(:id => vms_for_jobs)
                        .includes(:ext_management_system => :zone, :storage => :hosts)
                        .order(:id)
          end
          # Index once so each job's target lookup is a hash access rather than a linear scan
          vms_by_id = @vms_for_dispatch_jobs.index_by(&:id)

          concurrent_vm_scans_limit = zone.settings.blank? ? 0 : zone.settings[:concurrent_vm_scans].to_i

          jobs_to_dispatch.each do |job|
            if concurrent_vm_scans_limit > 0 && active_vm_scans_by_zone[zone_name] >= concurrent_vm_scans_limit
              _log.warn("SKIPPING remaining Vm Or Template jobs in dispatch since there are [%d] active scans in the zone [%s]" %
                        [active_vm_scans_by_zone[zone_name], zone_name])
              break
            end
            @vm = vms_by_id[job.target_id]
            if @vm.nil? # Handle job for VM that was deleted
              _log.warn("VM with id: [#{job.target_id}] no longer exists, aborting job [#{job.guid}]")
              job.signal(:abort, "VM with id: [#{job.target_id}] no longer exists, job aborted.", "warn")
              next
            end

            proxy = nil
            if @all_busy_by_host_id_storage_id["#{@vm.host_id}_#{@vm.storage_id}"]
              _log.debug("Skipping job id [#{job.id}] guid [#{job.guid}] for vm: [#{@vm.id}] in this dispatch since a prior job with the same host [#{@vm.host_id}] and storage [#{@vm.storage_id}] determined that all resources are busy.")
              next
            end

            begin
              eligible_proxies, = Benchmark.realtime_block(:get_eligible_proxies_for_job) { get_eligible_proxies_for_job(job) }
              proxy = eligible_proxies.detect do |p|
                Benchmark.current_realtime[:busy_proxy_count] += 1
                busy, = Benchmark.realtime_block(:busy_proxy) { busy_proxy?(p, job) }
                !busy
              end
            rescue => err
              _log.warn("#{err}, attempting to dispatch job [#{job.guid}], aborting job")
              job.signal(:abort, "Error [#{err}], attempting to dispatch, aborting job [#{job.guid}].", "error")
              # The job failed with an error, not because proxies were busy: do not fall
              # through and cache this host/storage pair as "all busy" for later jobs.
              next
            end

            if proxy
              # Skip this embedded scan if the host/vc we'd need has already exceeded the limit
              next if embedded_resource_limit_exceeded?(job)

              _log.info("STARTING job: [#{job.guid}] on proxy: [#{proxy.name}]")
              Benchmark.current_realtime[:start_job_on_proxy_count] += 1
              Benchmark.realtime_block(:start_job_on_proxy) { start_job_on_proxy(job, proxy) }
            elsif @vm.host_id && @vm.storage_id && !@vm.template?
              _log.debug("Skipping job id [#{job.id}] guid [#{job.guid}] for vm: [#{@vm.id}] in this dispatch since no proxies/servers are available. Caching result for Vm's host [#{@vm.host_id}] and storage [#{@vm.storage_id}].")
              @all_busy_by_host_id_storage_id["#{@vm.host_id}_#{@vm.storage_id}"] = true
            end
          end
        end
      end

      _log.info("Complete - Timings: #{total_time.inspect}")
    end

    # Queues a signal for the given job through MiqQueue.put_unless_exists.
    #
    # job     - the job to signal; its id, guid and (when set) zone go into the queue options.
    # options - queue options merged over the defaults; :args holds the signal name
    #           followed by its arguments. Nothing is queued when options is blank.
    #
    # An :abort signal is queued as "signal_abort" with the :abort marker removed from
    # the args. The caller's :args array is left untouched.
    def queue_signal(job, options)
      Benchmark.current_realtime[:queue_signal_count] += 1
      Benchmark.realtime_block(:queue_signal) do
        return if options.blank?

        default_opts = {
          :class_name  => "Job",
          :method_name => "signal",
          :instance_id => job.id,
          :task_id     => job.guid,
          :priority    => MiqQueue::HIGH_PRIORITY,
          :role        => "smartstate"
        }

        default_opts[:zone] = job.zone if job.zone
        options = default_opts.merge(options)

        # special case signal(:abort) - so we can easily pull off the queue
        sig = options[:args].first
        if sig == :abort
          # merge is shallow, so :args is still the caller's array: build a new array
          # without :abort instead of shifting it in place
          options[:args] = options[:args].drop(1)
          options[:method_name] = "signal_abort"
        end
        MiqQueue.put_unless_exists(options) do |msg|
          _log.warn("Previous Job signal [#{sig}] for Job: [#{job.guid}] is still running, skipping...") unless msg.nil?
        end
      end
    end

    # Assigns the proxy to the job, bumps the active scan count for this server's
    # zone and queues the "start" signal for the job on that proxy.
    #
    # job   - the scan job to start
    # proxy - the server chosen to run it; its guid is used as the queue's :server_guid
    def start_job_on_proxy(job, proxy)
      assign_proxy_to_job(proxy, job)
      _log.info("Job #{job.attributes_log}")
      my_zone = MiqServer.my_zone
      job_options = {:args => ["start"], :zone => my_zone, :server_guid => proxy.guid, :role => "smartproxy"}
      # Go through the memoizing reader: the ivar is nil until something has loaded it
      active_vm_scans_by_zone[my_zone] += 1
      queue_signal(job, job_options)
    end

    # Marks the job as actively running on the given proxy, saves it, and updates the
    # in-memory busy counters.
    #
    # proxy - the server taking the job; its id is stored on the job
    # job   - the job being assigned
    #
    # Returns the new embedded-scan count for the current VM's resource, or nil when
    # the VM has no trackable resource.
    def assign_proxy_to_job(proxy, job)
      job.miq_server_id   = proxy.id
      job.started_on      = Time.now.utc
      job.dispatch_status = "active"
      job.save

      # One more active job for this proxy
      proxy_key = "MiqServer_#{job.miq_server_id}"
      busy_proxies[proxy_key] = (busy_proxies[proxy_key] || 0) + 1

      # Embedded scans are limited per host/vc, so count the resource this VM uses
      resource_key = embedded_scan_resource(@vm)
      return unless resource_key

      in_use = busy_resources_for_embedded_scanning
      in_use[resource_key] = (in_use[resource_key] || 0) + 1
    end

    # Tells whether the proxy has no capacity left for another job.
    #
    # proxy - the candidate proxy; looked up in #busy_proxies by "<class>_<id>"
    # _job  - unused
    #
    # Returns false when the proxy has no active jobs recorded, true otherwise when
    # its concurrent job maximum is 1 or less or its active job count has reached
    # that maximum.
    def busy_proxy?(proxy, _job)
      running = busy_proxies["#{proxy.class}_#{proxy.id}"]

      # Nothing running on this proxy, so nothing else to check
      return false if running.nil? || running.zero?

      limit, = Benchmark.realtime_block(:busy_proxy__concurrent_job_max) { concurrent_job_max_by_proxy(proxy) }

      # A single-slot proxy with something running is busy; otherwise compare to the cap
      limit <= 1 || running >= limit
    end

    # Returns proxy.concurrent_job_max, cached per "<class name>_<id>" for the life of
    # this dispatcher. A nil result is not treated as cached and is looked up again on
    # the next call.
    def concurrent_job_max_by_proxy(proxy)
      cache = (@max_concurrent_job_hash ||= {})
      cache_key = "#{proxy.class.name}_#{proxy.id}"

      cached = cache[cache_key]
      return cached unless cached.nil?

      cache[cache_key] = proxy.concurrent_job_max
    end

    # Memoized hash of "MiqServer_<miq_server_id>" => number of jobs of job_class that
    # have dispatch_status "active" and are not in the "finished" state.
    def busy_proxies
      return @busy_proxies if @busy_proxies

      active_jobs = job_class.where(:dispatch_status => "active")
                             .where.not(:state => "finished")
                             .select([:miq_server_id])

      @busy_proxies = active_jobs.each_with_object({}) do |active_job, counts|
        server_key = "MiqServer_#{active_job.miq_server_id}"
        counts[server_key] = counts.fetch(server_key, 0) + 1
      end
    end

    # Builds a zero-defaulted hash with one entry for the current zone (zone_name).
    #
    # job_class - class queried for jobs in this zone with dispatch_status "active"
    #             that are not in the "finished" state
    # count     - when true (default) the entry is the number of such jobs, otherwise
    #             the query object itself
    def active_scans_by_zone(job_class, count = true)
      current_zone = zone_name
      active_jobs = job_class.where(:zone => current_zone, :dispatch_status => "active")
                             .where.not(:state => "finished")

      Hash.new(0).tap do |by_zone|
        by_zone[current_zone] = count ? active_jobs.count : active_jobs
      end
    end

    # Memoized result of #active_scans_by_zone for job_class.
    def active_vm_scans_by_zone
      return @active_vm_scans_by_zone if @active_vm_scans_by_zone

      @active_vm_scans_by_zone = active_scans_by_zone(job_class)
    end

    # Memoized hash of embedded scan resource key => number of distinct VMs that are
    # the target of an active, unfinished Job with target_class "VmOrTemplate".
    # Resource keys come from #embedded_scan_resource; VMs without one are not counted.
    # Returns an empty hash when no such jobs exist.
    def busy_resources_for_embedded_scanning
      cached = @busy_resources_for_embedded_scanning_hash
      return cached unless cached.nil?

      _log.debug("Initializing busy_resources_for_embedding_scanning hash")
      @busy_resources_for_embedded_scanning_hash = {}

      active_target_ids = Job.where(:dispatch_status => "active")
                             .where(:target_class => "VmOrTemplate")
                             .where.not(:state => "finished")
                             .pluck(:target_id)
                             .compact
                             .uniq
      return @busy_resources_for_embedded_scanning_hash if active_target_ids.blank?

      zero_default = Hash.new { |hash, resource| hash[resource] = 0 }
      @busy_resources_for_embedded_scanning_hash =
        VmOrTemplate.where(:id => active_target_ids).each_with_object(zero_default) do |scanned_vm, tally|
          resource = embedded_scan_resource(scanned_vm)
          tally[resource] += 1 if resource
        end
    end

    # Returns the key of the resource an embedded scan of the VM is counted against:
    # "ExtManagementSystem_<ems_id>" when the VM scans via its EMS, otherwise
    # "Host_<host_id>". Returns nil when the relevant id is nil.
    def embedded_scan_resource(vm)
      prefix, resource_id =
        if vm.scan_via_ems?
          ["ExtManagementSystem", vm.ems_id]
        else
          ["Host", vm.host_id]
        end

      "#{prefix}_#{resource_id}" unless resource_id.nil?
    end

    # Tells whether the job must be skipped because the host/EMS its VM would be
    # scanned through already has the maximum number of concurrent embedded scans.
    #
    # job - the job being dispatched; only jobs targeting "VmOrTemplate" are limited
    #
    # The limit comes from Settings.coresident_miqproxy (concurrent_per_ems when the VM
    # scans via its EMS, concurrent_per_host otherwise); a non-positive value is
    # treated as 1. When the current VM (@vm) is missing, the job is aborted and true
    # is returned so the caller does not go on to start it.
    #
    # Returns true or false.
    def embedded_resource_limit_exceeded?(job)
      return false unless job.target_class == "VmOrTemplate"

      if @vm.nil?
        job.signal(:abort, "Unable to find vm [#{job.target_id}], aborting job [#{job.guid}].", "error")
        # The job was just aborted: report it as not dispatchable
        return true
      end

      if @vm.scan_via_ems?
        count_allowed = ::Settings.coresident_miqproxy.concurrent_per_ems.to_i
      else
        return false if @vm.host_id.nil? # e.g. EC2 images do not have hosts

        count_allowed = ::Settings.coresident_miqproxy.concurrent_per_host.to_i
      end
      # A zero or negative setting would otherwise block every scan
      count_allowed = 1 unless count_allowed.positive?

      return false if busy_resources_for_embedded_scanning.blank?

      begin
        target_resource = embedded_scan_resource(@vm)
        # fetch avoids inserting the key through the hash's default block
        busy_resources_for_embedded_scanning.fetch(target_resource, 0) >= count_allowed
      rescue => err
        # Best effort: a failure while checking must not stop the dispatch loop
        _log.warn("#{err}, checking embedded scan limit for job [#{job.guid}], treating limit as not exceeded")
        false
      end
    end

    # Returns the proxies that may run the job for the current VM (@vm).
    #
    # When the VM is missing, does not support smartstate analysis, or has no eligible
    # proxies, an :abort signal is queued for the job and an empty array is returned.
    def get_eligible_proxies_for_job(job)
      Benchmark.current_realtime[:get_eligible_proxies_for_job_count] += 1

      # Queues the abort with the given reason and evaluates to the "no proxies" result
      abort_with = lambda do |msg|
        queue_signal(job, {:args => [:abort, msg, "error"]})
        []
      end

      return abort_with.call("Unable to find vm [#{job.target_id}], aborting job [#{job.guid}].") if @vm.nil?
      return abort_with.call(@vm.unsupported_reason(:smartstate_analysis)) unless @vm.supports?(:smartstate_analysis)

      vm_proxies, = Benchmark.realtime_block(:get_eligible_proxies_for_job__proxies4job) { @vm.proxies4job(job) }
      candidates = vm_proxies[:proxies]
      return candidates unless candidates.empty?

      abort_with.call("No eligible proxies for VM :[#{@vm.path}] - [#{vm_proxies[:message]}], aborting job [#{job.guid}].")
    end
  end
end