# ManageIQ/manageiq-providers-vmware
#
# View on GitHub
# app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
#
# Summary
#
# Maintainability: A (0 mins)
# Test Coverage: D (68%)
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
  include PropertyCollector
  include Vmdb::Logging

  # Prepares a collector for a single EMS. No connection is made and no
  # thread is started here; see #start and #refresh.
  #
  # @param ems the provider record this collector will refresh
  def initialize(ems)
    @ems = ems
    @exit_requested = false
    @vim_thread = nil
    @cache = cache_klass.new
    @saver = saver_klass.new
  end

  # Runs the collector in the calling thread. The exit flag is raised before
  # the collector starts, so its update loop is skipped once the initial
  # full refresh has completed.
  #
  # @return the result of #vim_collector
  def refresh
    self.exit_requested = true
    vim_collector
  end

  # Starts the collector in a background thread and keeps a handle to that
  # thread so that #running? and #stop can act on it.
  def start
    self.vim_thread = vim_collector_thread
  end

  # Reports whether the background collector thread is alive.
  #
  # @return [Boolean, nil] nil when no thread has been started yet
  def running?
    thread = vim_thread
    thread.alive? unless thread.nil?
  end

  # Asks the collector thread to exit and waits for it to do so, killing it
  # if it does not finish in time. The exit flag is lowered again afterwards.
  #
  # @param join_timeout how long to wait for a clean exit (converted with
  #   #to_f before being handed to Thread#join)
  def stop(join_timeout = 2.minutes)
    _log.info("#{log_header} Monitor updates thread exiting...")

    # The collector only looks at this flag between WaitForUpdatesEx calls,
    # and the WaitOptions for that call set maxWaitSeconds to 60 seconds.
    self.exit_requested = true

    collector = vim_thread
    if collector
      # Thread#join returns nil on timeout. Kill the thread in that case so
      # we never end up with multiple collector threads running.
      exited = collector.join(join_timeout.to_f)
      collector.kill unless exited
    end

    self.exit_requested = false
  end

  # Stops the current collector thread (if any) and starts a new one.
  #
  # @param join_timeout passed through to #stop
  def restart(join_timeout = 2.minutes)
    stop(join_timeout)
    start
  end

  # Publicly accessible collector state:
  #   cache    - created in #initialize, maintained by the process_object_update_* methods
  #   ca_file  - set by #vim_connect and cleared by #disconnect
  #   categories_by_id / tags_by_id / tag_ids_by_attached_object -
  #              filled by #collect_cis_taggings and reset by #clear_cis_taggings
  attr_accessor :cache, :categories_by_id, :ca_file, :tags_by_id, :tag_ids_by_attached_object

  private

  # Collaborators assigned once in #initialize
  attr_reader   :ems, :saver
  # Run state of the collector loop (exit flag, background thread, time of
  # the last completed full refresh)
  attr_accessor :exit_requested, :vim_thread, :last_full_refresh

  # Spawns the thread that runs the collector loop.
  #
  # @return [Thread]
  def vim_collector_thread
    Thread.new do
      vim_collector
    end
  end

  # Collector main loop: connects to the provider, creates the property
  # filter, performs an initial full refresh and then keeps refreshing (full
  # or targeted) until an exit is requested.
  #
  # Any StandardError is logged and recorded on the EMS as the last refresh
  # error. The property filter and the connection are always released on the
  # way out.
  def vim_collector
    _log.info("#{log_header} Monitor updates thread started")

    vim = vim_connect
    property_filter = create_property_filter(vim, ems_inventory_filter_spec(vim))

    _log.info("#{log_header} Refreshing initial inventory")
    version = full_refresh(vim, property_filter)
    _log.info("#{log_header} Refreshing initial inventory...Complete")

    until exit_requested
      version = full_refresh_needed? ? full_refresh(vim, property_filter) : targeted_refresh(vim, property_filter, version)
    end

    _log.info("#{log_header} Monitor updates thread exited")
  rescue => err
    _log.error("#{log_header} Refresh failed")
    _log.log_backtrace(err)

    ems.update(:last_refresh_error => err.to_s, :last_refresh_date => Time.now.utc)
  ensure
    # Nested ensure: if destroying the property filter raises, we must still
    # disconnect so that the session and CA file are not leaked. The error
    # from destroy_property_filter still propagates as before.
    begin
      destroy_property_filter(property_filter)
    ensure
      disconnect(vim)
    end
  end

  # Collects and saves the complete inventory.
  #
  # Passing an empty version string to #monitor_updates requests the current
  # state of every object matched by the property filter. CIS taggings and
  # content libraries are only collected when a CIS connection is available.
  #
  # @return the update version to pass to the next #monitor_updates call
  def full_refresh(vim, property_filter)
    inventory = full_persister_klass.new(ems)
    parser    = parser_klass.new(self, inventory)

    version, updated_objects = monitor_updates(vim, property_filter, "")

    cis_client  = cis_connect(vim)
    cis_enabled = cis_client.present?

    collect_cis_taggings(cis_client) if cis_enabled

    parse_updates(vim, parser, updated_objects)
    parse_storage_profiles(vim, parser)
    parse_content_libraries(cis_client, parser) if cis_enabled

    save_inventory(vim, inventory)

    self.last_full_refresh = Time.now.utc

    # Taggings are only needed while parsing a full refresh
    clear_cis_taggings

    version
  end

  # Waits for updates newer than +version+ and, when there are any, parses
  # and saves just those objects.
  #
  # @return the update version to pass to the next #monitor_updates call
  def targeted_refresh(vim, property_filter, version)
    started_at = Time.now.utc

    version, updated_objects = monitor_updates(vim, property_filter, version)
    return version unless updated_objects.any?

    inventory = targeted_persister_klass.new(ems)
    parser    = parser_klass.new(self, inventory)

    parse_updates(vim, parser, updated_objects)
    save_inventory(vim, inventory)

    # Avoid "spinning" on WaitForUpdatesEx in a tight loop when updates are
    # constantly available: pad each cycle out to the poll interval so more
    # updates get batched together, which makes saving more efficient and
    # lowers the API call load on the VC.
    elapsed   = Time.now.utc - started_at
    remaining = refresh_settings.update_poll_interval.to_i_with_method - elapsed

    sleep(remaining) if remaining > 0

    version
  end

  # Drains pending updates from the property collector, starting after
  # +version+. It keeps calling #wait_for_updates while the returned update
  # set is flagged as truncated.
  #
  # @return [Array] two elements: the latest version seen and the list of
  #   processed object updates (possibly empty)
  def monitor_updates(vim, property_filter, version)
    updated_objects = []

    loop do
      update_set = wait_for_updates(vim, version)
      break if update_set.nil?

      version = update_set.version

      # process_update_set may return nil (e.g. nothing for our filter in
      # this update set); nil.to_a is [] so concat never sees a nil.
      updated_objects.concat(process_update_set(property_filter, update_set).to_a)
      break unless update_set.truncated
    end

    return version, updated_objects
  end

  # Opens a VIM (SOAP) connection to the EMS and logs in.
  #
  # As a side effect the CA file built from the EMS certificate authority is
  # stored in #ca_file. The connection starts at API rev 6.5 and is then
  # switched to the apiVersion the server reports.
  #
  # @return the logged-in RbVmomi::VIM connection
  def vim_connect
    hostname = ems.hostname
    api_port = ems.port || 443
    username, password = ems.auth_user_pwd

    skip_verify = ems.verify_ssl == OpenSSL::SSL::VERIFY_NONE

    _log.info("#{log_header} Connecting to #{username}@#{hostname}...")

    self.ca_file = build_ca_file

    connect_opts = {
      :ns       => 'urn:vim25',
      :host     => hostname,
      :ssl      => true,
      :insecure => skip_verify,
      :ca_file  => ca_file&.path,
      :path     => '/sdk',
      :port     => api_port,
      :rev      => '6.5',
    }

    require 'rbvmomi'
    connection     = RbVmomi::VIM.new(connect_opts)
    connection.rev = connection.serviceContent.about.apiVersion
    connection.serviceContent.sessionManager.Login(:userName => username, :password => password)

    _log.info("#{log_header} Connected")
    connection
  end

  # Writes the EMS certificate authority to a temporary file so it can be
  # handed to the connection by path.
  #
  # @return [Tempfile, nil] the closed tempfile, or nil when the EMS has no
  #   certificate authority configured
  def build_ca_file
    ca_pem = ems.certificate_authority
    return if ca_pem.blank?

    file = Tempfile.new
    file.write(ca_pem)
    file.close
    file
  end

  # Opens a PBM connection on top of an existing VIM connection, using the
  # same HTTP port. Used by #parse_storage_profiles.
  #
  # NOTE(review): :insecure is hard-coded to true here, whereas #vim_connect
  # derives it from ems.verify_ssl — confirm this is intentional.
  def pbm_connect(vim)
    # Required on demand rather than at file load time
    require "rbvmomi/pbm"
    RbVmomi::PBM.connect(vim, :port => vim.http.port, :insecure => true)
  end

  # Opens a CIS connection through the EMS, but only when the endpoint is a
  # VirtualCenter with API revision 6.0 or newer.
  #
  # The revision is compared as a version number, not as a string, so that
  # e.g. "10.0" is correctly treated as newer than "6.0".
  #
  # @return the CIS connection, or nil when CIS is not applicable
  def cis_connect(vim)
    return if Gem::Version.new(vim.rev) < Gem::Version.new('6.0')
    return if vim.serviceContent.about.apiType != 'VirtualCenter'

    ems.connect(:service => :cis)
  end

  # Closes the VIM connection and removes the temporary CA file. Failures
  # are logged as warnings and never raised to the caller.
  #
  # @param vim the connection to close; nil is ignored
  def disconnect(vim)
    return if vim.nil?

    # sessionManager.Logout and close the http connection. A failure here
    # must not prevent the CA file cleanup below.
    begin
      vim.close
    rescue => err
      _log.warn("Failed to logout of session: #{err}")
    end

    # Cleanup the certificate authority file if it exists
    temp_ca = ca_file
    return unless temp_ca

    temp_ca.close
    temp_ca.unlink
    self.ca_file = nil
  rescue => err
    _log.warn("Failed to disconnect: #{err}")
  end

  # Blocks on the property collector until updates newer than +version+ are
  # available, for at most 60 seconds.
  #
  # @return the result of WaitForUpdatesEx (callers treat nil as "no updates")
  def wait_for_updates(vim, version)
    # Cap the wait at 60 seconds so that, when no updates arrive, the caller
    # gets control back and can check whether it is supposed to exit.
    wait_options = RbVmomi::VIM.WaitOptions(:maxWaitSeconds => 60)
    collector    = vim.propertyCollector

    collector.WaitForUpdatesEx(:version => version, :options => wait_options)
  end

  # Processes the part of an update set that belongs to our property filter.
  #
  # Always returns an Array so the result can be concatenated by the caller:
  # an empty one when the update set has nothing for +property_filter+ or
  # its object set is blank.
  #
  # @return [Array] one entry per object update, as built by
  #   #process_object_update_set
  def process_update_set(property_filter, update_set)
    property_filter_update = update_set.filterSet.to_a.detect { |update| update.filter == property_filter }
    return [] if property_filter_update.nil?

    object_update_set = property_filter_update.objectSet
    return [] if object_update_set.blank?

    _log.info("#{log_header} Processing #{object_update_set.count} updates...")

    updates = process_object_update_set(object_update_set)

    _log.info("#{log_header} Processing #{object_update_set.count} updates...Complete")

    updates
  end

  # Feeds the EMS itself and then every updated object to the parser.
  #
  # For each object the cached properties are merged with the properties
  # that are fetched on demand (see #retrieve_uncached_props). A parse
  # failure is logged and re-raised.
  #
  # @param updated_objects [Array] tuples of managed object, update kind and
  #   cached properties
  def parse_updates(vim, parser, updated_objects)
    parser.parse_ext_management_system(ems, vim.serviceContent.about)

    updated_objects.each do |managed_object, update_kind, cached_props|
      # Nothing extra is fetched for objects that left the inventory
      extra_props = update_kind == "leave" ? nil : retrieve_uncached_props(managed_object)
      props       = extra_props.present? ? cached_props.deep_merge(extra_props) : cached_props

      retrieve_extra_props(managed_object, props)

      clear_managed_object_connection!(managed_object)

      parser.parse(managed_object, update_kind, props)
    rescue => err
      _log.warn("Failed to parse #{managed_object.class.wsdl_name}:#{managed_object._ref}: #{err}")
      _log.log_backtrace(err)
      raise
    end
  end

  # Runs #process_object_update over every entry of the object update set.
  #
  # @return [Array] one result per object update, in the same order
  def process_object_update_set(object_update_set)
    object_update_set.map { |object_update| process_object_update(object_update) }
  end

  # Applies a single object update to the cache, dispatching on its kind
  # ("enter", "modify" or "leave").
  #
  # @return [Array] the managed object, the update kind and the properties
  #   returned by the kind-specific handler (nil for an unknown kind)
  def process_object_update(object_update)
    managed_object = object_update.obj

    log_object_update(object_update)

    kind = object_update.kind
    props =
      case kind
      when "enter"  then process_object_update_enter(managed_object, object_update.changeSet, object_update.missingSet)
      when "modify" then process_object_update_modify(managed_object, object_update.changeSet, object_update.missingSet)
      when "leave"  then process_object_update_leave(managed_object)
      end

    [managed_object, kind, props]
  end

  # Handles an "enter" update: stores the object's initial properties in the
  # cache. The missing set is accepted but not used.
  #
  # @return the result of cache.insert
  def process_object_update_enter(obj, change_set, _missing_set = [])
    initial_props = process_change_set(change_set)
    cache.insert(obj, initial_props)
  end

  # Handles a "modify" update: applies the change set on top of the object's
  # cached properties. The missing set is accepted but not used.
  #
  # @return the result of cache.update
  def process_object_update_modify(obj, change_set, _missing_set = [])
    cache.update(obj) { |cached_props| process_change_set(change_set, cached_props) }
  end

  # Handles a "leave" update by removing the object from the cache.
  #
  # @return the result of cache.delete
  def process_object_update_leave(obj)
    cache.delete(obj)
  end

  # Fetches the properties that are deliberately not kept in the cache (see
  # #uncached_prop_set) straight from the provider.
  #
  # @return [Hash, nil] the properties placed under their tags, or nil when
  #   there is nothing to fetch for this object type, nothing was returned,
  #   or the provider raised a fault (which is logged, not raised)
  def retrieve_uncached_props(obj)
    prop_set = uncached_prop_set(obj)
    return if prop_set.nil?

    props = obj.collect!(*prop_set)
    return if props.nil?

    props.each_with_object({}) do |(name, val), result|
      h, prop_str = hash_target(result, name)
      tag, _key   = tag_and_key(prop_str)

      h[tag] = val
    end
  rescue RbVmomi::Fault => err
    _log.warn("Unable to retrieve uncached properties for #{obj.class.wsdl_name}:#{obj._ref}: #{err}")
    # Pass the error along, as the other log_backtrace call sites do;
    # calling it without an argument would raise from inside this handler.
    _log.log_backtrace(err)
    nil
  end

  # Looks up which property paths must be fetched on demand for the given
  # object, keyed by the object's WSDL type name. The lookup table is built
  # once and memoized.
  #
  # @return [Array<String>, nil] nil for a nil object or a type without
  #   uncached properties
  def uncached_prop_set(obj)
    @uncached_prop_set ||= {
      "HostSystem" => %w[
        config.storageDevice.hostBusAdapter
        config.storageDevice.scsiLun
        config.storageDevice.scsiTopology.adapter
      ]
    }.freeze

    obj.nil? ? nil : @uncached_prop_set[obj.class.wsdl_name]
  end

  # Enriches the cached properties with data that needs extra API calls.
  # Currently only CustomizationSpecManager objects get any extra data.
  #
  # @return nil for every other object type
  def retrieve_extra_props(obj, cached_props)
    return unless obj.class.wsdl_name == "CustomizationSpecManager"

    retrieve_customization_spec(obj, cached_props)
  end

  # Fetches the full customization spec for every entry under
  # cached_props[:info] and stores it in that entry's props under :spec.
  #
  # A fault on one spec is logged and skipped so the rest still get fetched.
  def retrieve_customization_spec(spec_manager, cached_props)
    cached_props[:info].to_a.each do |spec_info|
      item = spec_manager.GetCustomizationSpec(:name => spec_info.name)
      spec_info.props[:spec] = item&.spec
    rescue RbVmomi::Fault => err
      # Don't fail the refresh for issues with specific items
      _log.warn("Failed to get customization spec for [#{spec_info.name}]: #{err}")
    end
  end

  # Walks every content library and hands each library item to the parser.
  #
  # API errors and timeouts are logged as a warning and abort only the
  # content library collection, not the refresh.
  def parse_content_libraries(api_client, parser)
    require 'vsphere-automation-content'

    libraries = VSphereAutomation::Content::LibraryApi.new(api_client)
    items     = VSphereAutomation::Content::LibraryItemApi.new(api_client)

    # A nil response or nil value is treated as an empty list
    libraries.list&.value.to_a.each do |library_id|
      items.list(library_id)&.value.to_a.each do |item_id|
        item = items.get(item_id)&.value
        next unless item

        parser.parse_content_library_item(item)
      end
    end
  rescue VSphereAutomation::ApiError, Timeout::Error => err
    _log.warn("Failed to collect Content Libraries: #{err}")
  end

  # Collects tagging categories, tags and tag attachments through the CIS
  # API and stores them on the collector:
  #   categories_by_id           - category id => category
  #   tags_by_id                 - tag id => tag
  #   tag_ids_by_attached_object - object type => object id => list of tag ids
  #
  # API errors and timeouts are logged as a warning and not raised.
  def collect_cis_taggings(api_client)
    category_api    = VSphereAutomation::CIS::TaggingCategoryApi.new(api_client)
    tag_api         = VSphereAutomation::CIS::TaggingTagApi.new(api_client)
    association_api = VSphereAutomation::CIS::TaggingTagAssociationApi.new(api_client)

    category_ids = category_api.list&.value
    tag_ids      = tag_api.list&.value

    # Entries whose lookup yields no value are dropped
    categories = category_ids.to_a.map { |id| category_api.get(id)&.value }.compact
    tags       = tag_ids.to_a.map { |id| tag_api.get(id)&.value }.compact

    self.categories_by_id = categories.index_by(&:id)
    self.tags_by_id       = tags.index_by(&:id)

    # Two-level hash that creates the inner hash and the id list on demand
    self.tag_ids_by_attached_object = Hash.new { |h, k| h[k] = Hash.new { |h1, k1| h1[k1] = [] } }

    tags.each do |tag|
      attached = association_api.list_attached_objects(tag.id)&.value.to_a
      attached.each { |obj| tag_ids_by_attached_object[obj.type][obj.id] << tag.id }
    end
  rescue VSphereAutomation::ApiError, Timeout::Error => err
    _log.warn("Failed to collect Taggings: #{err}")
  end

  # Taggings are collected for full refreshes only. Dropping them once the
  # full refresh is done frees the memory and keeps targeted refreshes from
  # trying to map labels.
  def clear_cis_taggings
    self.tag_ids_by_attached_object = nil
    self.tags_by_id                 = nil
    self.categories_by_id           = nil
  end

  # Resets the @connection and @soap instance variables of the managed
  # object to nil.
  #
  # @return [nil]
  def clear_managed_object_connection!(managed_object)
    %i[@connection @soap].each { |ivar| managed_object.instance_variable_set(ivar, nil) }
    nil
  end

  # Collects the storage profiles through PBM, together with the datastores
  # that match each profile, and hands them to the parser.
  #
  # Provider faults and XML syntax errors are logged as a warning and abort
  # only the storage profile collection, not the refresh.
  def parse_storage_profiles(vim, parser)
    pbm             = pbm_connect(vim)
    profile_manager = pbm.serviceContent.profileManager

    storage_type = RbVmomi::PBM::PbmProfileResourceType(:resourceType => "STORAGE")
    profile_ids  = profile_manager.PbmQueryProfile(:resourceType => storage_type)

    return if profile_ids.empty?

    profile_manager.PbmRetrieveContent(:profileIds => profile_ids).to_a.each do |profile|
      persister_storage_profile = parser.parse(profile, "enter", profile.props)

      hubs = pbm.serviceContent.placementSolver.PbmQueryMatchingHub(:profile => profile.profileId)
      hubs.to_a.each do |hub|
        # Only datastore hubs are parsed
        next unless hub.hubType == "Datastore"

        parser.parse_pbm_placement_hub(persister_storage_profile, hub, "enter", hub.props)
      end
    end
  rescue RbVmomi::Fault, Nokogiri::SyntaxError => err
    _log.warn("#{log_header} Unable to collect storage profiles: #{err}")
  end

  # Saves the persister's inventory, wrapped in #with_heartbeat for the
  # given connection.
  #
  # @return the result of #with_heartbeat
  def save_inventory(vim, persister)
    with_heartbeat(vim) { saver.save_inventory(persister) }
  end

  # Runs the given block while a background thread periodically makes a
  # cheap API call (serviceInstance.CurrentTime) on +vim+.
  #
  # When the block finishes (normally or by raising) the heartbeat thread is
  # told to stop, given 5 seconds to do so and killed otherwise.
  #
  # @return the value of the block
  def with_heartbeat(vim)
    # should_exit tells the heartbeat thread to stop; wakeup cuts its wait
    # short so that it notices right away
    should_exit = Concurrent::AtomicBoolean.new
    wakeup      = Concurrent::Event.new

    heartbeat_thread = Thread.new do
      loop do
        # Check should_exit before making the API call in case save_inventory
        # doesn't take a long time we don't want to waste an API call
        wakeup.wait(2.minutes)
        break if should_exit.true?

        # Run an API call periodically to ensure vCenter doesn't
        # timeout our session out from under us
        vim.serviceInstance.CurrentTime
      end
    end

    begin
      yield
    ensure
      # Raise the flag before waking the thread so it sees the flag as soon
      # as its wait returns
      should_exit.make_true
      wakeup.set

      # Give the thread a short duration to exit cleanly before
      # killing it and moving on
      heartbeat_thread.join(5) || heartbeat_thread.kill
    end
  end

  # Prefix identifying the EMS (name and id) in this collector's log lines.
  #
  # @return [String]
  def log_header
    name = ems.name
    id   = ems.id

    "EMS: [#{name}], id: [#{id}]"
  end

  # Writes a debug line describing an object update. For "modify" updates
  # the names of up to five changed properties are listed, followed by
  # "..." when there are more.
  #
  # The message is built inside the block passed to _log.debug.
  def log_object_update(object_update)
    _log.debug do
      target  = object_update.obj
      message = "#{log_header} Object: [#{target.class.wsdl_name}:#{target._ref}] Kind: [#{object_update.kind}]"

      if object_update.kind == "modify"
        changes = object_update.changeSet
        shown   = changes.first(5).map(&:name).join(", ")
        shown  += ", ..." if changes.length > 5

        message += " Props: [#{shown}]"
      end

      message
    end
  end

  # True once more time than the full refresh interval has passed since the
  # last full refresh.
  #
  # @return [Boolean]
  def full_refresh_needed?
    elapsed = Time.now.utc - last_full_refresh
    elapsed > full_refresh_interval
  end

  # Interval between full refreshes: the provider-specific setting when
  # present, otherwise the general EMS refresh setting, converted with
  # #to_i_with_method.
  def full_refresh_interval
    interval   = refresh_settings.refresh_interval
    interval ||= Settings.ems_refresh.refresh_interval

    interval.to_i_with_method
  end

  # Refresh settings for this provider type (the vmwarews section under
  # Settings.ems_refresh).
  def refresh_settings
    Settings.ems_refresh.vmwarews
  end

  # Class instantiated in #initialize to hold the cached object properties
  def cache_klass
    ManageIQ::Providers::Vmware::InfraManager::Inventory::Cache
  end

  # Persister class used by #full_refresh
  def full_persister_klass
    ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Full
  end

  # Persister class used by #targeted_refresh
  def targeted_persister_klass
    ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted
  end

  # Parser class used by both full and targeted refreshes; it is constructed
  # with this collector and a persister
  def parser_klass
    ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser
  end

  # Class instantiated in #initialize to save persisted inventory
  def saver_klass
    ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
  end
end