ManageIQ/manageiq-providers-azure

View on GitHub
app/models/manageiq/providers/azure/inventory/collector.rb

Summary

Maintainability
A
2 hrs
Test Coverage
B
89%
class ManageIQ::Providers::Azure::Inventory::Collector < ManageIQ::Providers::Inventory::Collector
  # TODO: cleanup later when old refresh is deleted
  include ManageIQ::Providers::Azure::RefreshHelperMethods
  include Vmdb::Logging

  attr_reader :subscription_id, :stacks_resources_cache

  def initialize(manager, _target)
    super

    @ems = manager # used in helper methods

    # TODO(lsmola) this takes about 4s, see if we can optimize it
    @config          = manager.connect

    @subscription_id = @config.subscription_id
    @thread_limit    = (options.parallel_thread_limit || 0)
    @record_limit    = (options.targeted_api_collection_threshold || 500).to_i

    @enabled_deployments_caching = options.enabled_deployments_caching.nil? ? true : options.enabled_deployments_caching

    # Caches for optimizing fetching resources and templates of stacks
    @stacks_not_changed_cache = {}
    @stacks_resources_cache = {}
    @stacks_resources_api_cache = {}
    @instances_power_state_cache = {}
    @indexed_instance_account_keys_cache = {}

    @resource_to_stack = {}
    @template_uris     = {} # templates need to be download
    @template_refs     = {} # templates need to be retrieved from VMDB
    @template_directs  = {} # templates contents already got by API

    @nis      = network_interface_service(@config)
    @ips      = ip_address_service(@config)
    @vmm      = virtual_machine_service(@config)
    @asm      = availability_set_service(@config)
    @tds      = template_deployment_service(@config)
    @rgs      = resource_group_service(@config)
    @sas      = storage_account_service(@config)
    @sds      = storage_disk_service(@config)
    @marias   = mariadb_server_service(@config)
    @mariadbs = mariadb_database_service(@config)
    @mysqls   = mysql_server_service(@config)
    @mysqldbs = mysql_database_service(@config)
    @pgs      = postgresql_server_service(@config)
    @pgdbs    = postgresql_db_service(@config)
    @sqls     = sql_server_service(@config)
    @sqldbs   = sql_db_service(@config)
    @mis      = managed_image_service(@config)
    @vmis     = virtual_machine_image_service(@config, :location => @ems.provider_region)
    @vns      = virtual_network_service(@config)
    @nsg      = network_security_group_service(@config)
    @lbs      = load_balancer_service(@config)
    @rts      = route_table_service(@config)
  end

  ##############################################################
  # Shared helpers for full and targeted CloudManager collectors
  ##############################################################
  def managed_disks
    @managed_disks ||= collect_inventory(:managed_disks) { @sds.list_all }
  end

  def storage_accounts
    # We want to always limit storage accounts, to avoid loading all account keys in full refresh. Right now we want to
    # load just used storage accounts.
    return if instances.blank?

    used_storage_accounts = instances.map do |instance|
      disks = instance.properties.storage_profile.data_disks + [instance.properties.storage_profile.os_disk]
      disks.map do |disk|
        next if instance.managed_disk?
        disk_location = disk.try(:vhd).try(:uri)
        if disk_location
          uri = Addressable::URI.parse(disk_location)
          uri.host.split('.').first
        end
      end
    end.flatten.compact.to_set

    @storage_accounts ||= collect_inventory(:storage_accounts) { @sas.list_all }.select do |x|
      used_storage_accounts.include?(x.name)
    end
  end

  def stack_resources(deployment)
    cached_stack_resources = stacks_resources_api_cache[deployment.id]
    return cached_stack_resources if cached_stack_resources

    raw_stack_resources(deployment)
  end

  def power_status(instance)
    cached_power_state = instances_power_state_cache[instance.id]
    return cached_power_state if cached_power_state

    raw_power_status(instance)
  end

  def network_ports
    @network_ports ||= collect_inventory(:network_ports) { filter_my_region(@nis.list_all) }
  end

  def network_routers
    @network_routers ||= collect_inventory(:network_routers) { filter_my_region(@rts.list_all) }
  end

  def floating_ips
    @floating_ips ||= collect_inventory(:floating_ips) { filter_my_region(@ips.list_all) }
  end

  def instance_network_ports(instance)
    @indexed_network_ports ||= network_ports.index_by(&:id)

    instance.properties.network_profile.network_interfaces.map { |x| @indexed_network_ports[x.id] }.compact
  end

  def instance_floating_ip(public_ip_obj)
    @indexed_floating_ips ||= floating_ips.index_by(&:id)

    @indexed_floating_ips[public_ip_obj.id]
  end

  def instance_managed_disk(disk_location)
    @indexed_managed_disks ||= managed_disks.index_by { |x| x.id.downcase }

    @indexed_managed_disks[disk_location.downcase]
  end

  def instance_account_keys(storage_acct)
    instance_account_keys_advanced_caching unless @instance_account_keys_advanced_caching_done
    @instance_account_keys_advanced_caching_done = true

    indexed_instance_account_keys_cache[[storage_acct.name, storage_acct.resource_group]]
  end

  def instance_storage_accounts(storage_name)
    @indexes_instance_storage_accounts ||= storage_accounts.index_by { |x| x.name.downcase }

    @indexes_instance_storage_accounts[storage_name.downcase]
  end

  def stacks
    @stacks_cache ||= collect_inventory(:deployments) { stacks_in_parallel(@tds, 'list') }

    stacks_advanced_caching(@stacks_cache) unless @stacks_advanced_caching_done
    @stacks_advanced_caching_done = true

    @stacks_cache
  end

  def stack_templates
    stacks.each do |deployment|
      # Do not fetch templates for stacks we already have in DB and that haven't changed
      next if stacks_not_changed_cache[deployment.id]

      stack_template_hash(deployment)
    end

    # download all template uris
    _log.info("Retrieving templates...")
    @template_uris.each { |uri, template| template[:content] = download_template(uri) }
    _log.info("Retrieving templates...Complete - Count [#{@template_uris.count}]")
    _log.debug("Memory usage: #{'%.02f' % collector_memory_usage} MiB")

    (@template_uris.values + @template_directs.values).select do |raw|
      raw[:content]
    end
  end

  def stack_template_hash(deployment)
    direct_stack_template(deployment) || uri_stack_template(deployment)
  end

  def direct_stack_template(deployment)
    content = @tds.get_template(deployment.name, deployment.resource_group)
    init_template_hash(deployment, content.to_s).tap do |template_hash|
      @template_directs[deployment.id] = template_hash
    end
  rescue ::Azure::Armrest::ConflictException, ::Azure::Armrest::ForbiddenException
    # Templates were not saved for deployments created before 03/20/2016
    nil
  end

  def uri_stack_template(deployment)
    uri = deployment.properties.try(:template_link).try(:uri)
    return unless uri
    @template_uris[uri] ||
      init_template_hash(deployment).tap do |template_hash|
        @template_uris[uri] = template_hash
      end
  end

  def init_template_hash(deployment, content = nil)
    # If content is nil it is to be fetched
    ver = deployment.properties.try(:template_link).try(:content_version)
    {
      :description => "contentVersion: #{ver}",
      :name        => deployment.name,
      :uid         => deployment.id,
      :content     => content
    }
  end

  def download_template(uri)
    options = {
      :method      => 'get',
      :url         => uri,
      :proxy       => @config.proxy,
      :ssl_version => @config.ssl_version,
      :ssl_verify  => @config.ssl_verify
    }

    body = RestClient::Request.execute(options).body
    JSON.parse(body).to_s # normalize to remove white spaces
  rescue StandardError => e
    _log.error("Failed to download Azure template #{uri}. Reason: #{e.inspect}")
    nil
  end

  def resource_groups
    @resource_groups ||= collect_inventory(:resource_groups) { @rgs.list(:all => true) }
  end

  def flavors
    @flavors ||= collect_inventory(:series) do
      begin
        @vmm.series(@ems.provider_region)
      rescue ::Azure::Armrest::BadGatewayException, ::Azure::Armrest::GatewayTimeoutException,
             ::Azure::Armrest::BadRequestException => err
        _log.error("Error Class=#{err.class.name}, Message=#{err.message}")
        []
      end
    end
  end

  def flavors_by_name
    @flavors_by_name ||= flavors.index_by(&:name)
  end

  def availability_zones
    collect_inventory(:availability_zones) { [::Azure::Armrest::BaseModel.new(:name => @ems.name, :id => 'default')] }
  end

  def instances
    @instances_cache ||= collect_inventory(:instances) { filter_my_region(@vmm.list_all) }

    instances_power_state_advanced_caching(@instances_cache) unless @instances_advanced_caching_done
    @instances_advanced_caching_done = true

    @instances_cache
  end

  # The underlying method that gathers these images is a bit brittle.
  # Consequently, if it raises an error we just log it and move on so
  # that it doesn't affect the rest of inventory collection.
  #
  def images
    collect_inventory(:private_images) { @sas.list_all_private_images(:location => @ems.provider_region) }
  rescue ::Azure::Armrest::ApiException => err
    _log.warn("Unable to collect Azure private images for: [#{@ems.name}] - [#{@ems.id}]: #{err.message}")
    []
  end

  def managed_images
    collect_inventory(:managed_images) { filter_my_region(@mis.list_all) }
  end

  # Collect marketplace image information if configured to do so. Normally
  # users will specify images in their configuration file. If the option
  # to collect marketplace images is selected, but there are no images
  # specified in the configuration file, it will attempt to collect all
  # marketplace images, which is an expensive operation.
  #
  def market_images
    urns = options.market_image_urns

    if urns
      urns.collect do |urn|
        publisher, offer, sku, version = urn.split(':')

        ::Azure::Armrest::VirtualMachineImage.new(
          :location  => manager.provider_region,
          :publisher => publisher,
          :offer     => offer,
          :sku       => sku,
          :version   => version,
          :id        => urn
        )
      end
    else
      filter_my_region(@vmis.list_all)
    end
  end

  def cloud_networks
    filter_my_region(@vns.list_all)
  end

  def security_groups
    filter_my_region(@nsg.list_all)
  end

  def sql_servers
    @sql_servers ||= @sqls.list_all.select do |server|
      # SqlServer instances have a "user friendly" location name
      # e.g. "US East 2" rather than the more common "useast2" that the
      # `gather_data_for_this_region` method is expecting
      server.location == provider_region_description
    end
  end

  def sql_databases
    @sql_databases ||= sql_servers.flat_map do |sql_server|
      @sqldbs.list_all(sql_server.name, sql_server.resource_group).map { |db| [sql_server, db] }
    end
  end

  def mariadb_servers
    @mariadb_servers ||= filter_my_region(@marias.list_all)
  end

  def mariadb_databases
    @mariadb_databases ||= mariadb_servers.flat_map do |server|
      @mariadbs.list_all(server.name, server.resource_group).map { |db| [server, db] }
    end
  end

  def mysql_servers
    @mysql_servers ||= filter_my_region(@mysqls.list_all)
  end

  def mysql_databases
    @mysql_databases ||= mysql_servers.flat_map do |server|
      @mysqldbs.list_all(server.name, server.resource_group).map { |db| [server, db] }
    end
  end

  def postgresql_servers
    @postgresql_servers ||= filter_my_region(@pgs.list_all)
  end

  def postgresql_databases
    @postgresql_databases ||= postgresql_servers.flat_map do |server|
      @pgdbs.list_all(server.name, server.resource_group).map { |db| [server, db] }
    end
  end

  def load_balancers
    @load_balancers ||= filter_my_region(@lbs.list_all)
  end

  protected

  attr_reader :record_limit, :enabled_deployments_caching
  attr_writer :stacks_resources_cache
  attr_accessor :stacks_not_changed_cache, :stacks_resources_api_cache, :instances_power_state_cache,
                :indexed_instance_account_keys_cache

  # Do not use threads in test environment in order to avoid breaking specs.
  #
  # @return [Integer] Number of threads we will use for API collections
  def thread_limit
    Rails.env.test? ? 0 : @thread_limit
  end

  def stacks_resources_advanced_caching(stacks)
    return if stacks.blank?

    # Fetch resources for stack, but only the stacks that changed
    results = collect_inventory_targeted("stacks_resources") do
      Parallel.map(stacks, :in_threads => thread_limit) do |stack|
        [stack.id, raw_stack_resources(stack)]
      end
    end

    stacks_resources_api_cache.merge!(results.to_h)
  end

  def instances_power_state_advanced_caching(instances)
    return if instances.blank?

    if instances_power_state_cache.blank?
      results = collect_inventory_targeted("instance_power_states") do
        Parallel.map(instances, :in_threads => thread_limit) do |instance|
          [instance.id, raw_power_status(instance)]
        end
      end

      self.instances_power_state_cache = results.to_h
    end
  end

  def stacks_advanced_caching(stacks, refs = nil)
    if enabled_deployments_caching
      db_stacks_timestamps              = {}
      db_stacks_primary_keys            = {}
      db_stacks_primary_keys_to_ems_ref = {}

      query = manager.orchestration_stacks
      query = query.where(:ems_ref => refs) if refs

      query.find_each do |stack|
        db_stacks_timestamps[stack.ems_ref]         = stack.finish_time
        db_stacks_primary_keys[stack.ems_ref]       = stack.id
        db_stacks_primary_keys_to_ems_ref[stack.id] = stack.ems_ref
      end

      stacks.each do |deployment|
        next if (api_timestamp = deployment.properties.timestamp).blank?
        next if (db_timestamp = db_stacks_timestamps[deployment.id]).nil?

        api_timestamp = Time.parse(api_timestamp).utc
        db_timestamp = db_timestamp.utc
        # If there isn't a new version of stack, we take times are equal if the difference is below 1s
        next if (db_timestamp < api_timestamp) && ((db_timestamp - api_timestamp).abs > 1.0)

        stacks_not_changed_cache[deployment.id] = db_stacks_primary_keys[deployment.id]
      end

      # Cache resources from the DB
      not_changed_stacks_ids = db_stacks_primary_keys.values
      not_changed_stacks_ids.each_slice(1000) do |batch|
        manager.orchestration_stacks_resources.where(:stack_id => batch).each do |resource|
          ems_ref = db_stacks_primary_keys_to_ems_ref[resource.stack_id]
          next unless ems_ref

          (stacks_resources_cache[ems_ref] ||= []) << parse_db_resource(resource)
        end
      end

      # Cache resources from the API
      stacks_resources_advanced_caching(stacks.reject { |x| stacks_not_changed_cache[x.id] })
    end
  end

  def instance_account_keys_advanced_caching
    return if storage_accounts.blank?

    acc_keys = Parallel.map(storage_accounts, :in_threads => thread_limit) do |storage_acct|
      [
        [storage_acct.name, storage_acct.resource_group],
        collect_inventory(:account_keys) { @sas.list_account_keys(storage_acct.name, storage_acct.resource_group) }
      ]
    end

    indexed_instance_account_keys_cache.merge!(acc_keys.to_h)
  end

  def safe_targeted_request
    yield
  rescue ::Azure::Armrest::Exception => err
    _log.debug("Record not found Error Class=#{err.class.name}, Message=#{err.message}")
    nil
  end

  private

  def raw_stack_resources(deployment)
    group = deployment.resource_group
    name  = deployment.name

    resources = collect_inventory(:stack_resources) { @tds.list_deployment_operations(name, group) }
    # resources with provsioning_operation 'Create' are the ones created by this stack
    resources.select! do |resource|
      resource.properties.provisioning_operation =~ /^create$/i
    end

    resources
  rescue ::Azure::Armrest::Exception => err
    _log.debug("Records not found Error Class=#{err.class.name}, Message=#{err.message}")
    []
  end

  def raw_power_status(instance)
    view   = @vmm.get_instance_view(instance.name, instance.resource_group)
    status = view.statuses.find { |s| s.code =~ %r{^PowerState/} }
    status&.display_status
  rescue ::Azure::Armrest::NotFoundException
    'off' # Possible race condition caused by retirement deletion.
  end

  def parse_db_resource(resource)
    {
      :ems_ref                => resource.ems_ref,
      :name                   => resource.name,
      :logical_resource       => resource.logical_resource,
      :physical_resource      => resource.physical_resource,
      :resource_category      => resource.resource_category,
      :resource_status        => resource.resource_status,
      :resource_status_reason => resource.resource_status_reason,
      :last_updated           => resource.last_updated
    }
  end

  def stacks_in_parallel(arm_service, method_name)
    region = @ems.provider_region

    Parallel.map(resource_groups, :in_threads => thread_limit) do |resource_group|
      arm_service.send(method_name, resource_group.name).select do |resource|
        location = resource.respond_to?(:location) ? resource.location : resource_group.location
        location.casecmp(region).zero?
      end
    end.flatten
  end
end