ManageIQ/manageiq-providers-vmware

View on GitHub
workers/event_catcher/event_catcher.rb

Summary

Maintainability
A
0 mins
Test Coverage
require_relative "event_parser"

class EventCatcher
  def initialize(ems, endpoint, authentication, settings, messaging, logger, page_size = 20)
    @ems            = ems
    @endpoint       = endpoint
    @authentication = authentication
    @logger         = logger
    @messaging      = messaging
    @page_size      = page_size
    @settings       = settings
  end

  def run!
    vim                     = connect
    event_history_collector = create_event_history_collector(vim, page_size)
    property_filter         = create_property_filter(vim, event_history_collector)

    notify_started

    log_prefix = "MIQ(ManageIQ::Providers::Vmware::InfraManager::EventCatcher)".freeze
    logger.info("#{log_prefix} Collecting events...")

    wait_for_updates(vim) do |property_change|
      logger.info("#{log_prefix} #{property_change.name}")
      next unless property_change.name.match?(/latestPage.*/)

      events = Array(property_change.val).map do |event|
        EventParser.parse_event(event).merge(:ems_id => ems["id"])
      end

      logger.info("#{log_prefix} events: [#{events.to_json}]")

      publish_events(events)
    end
  rescue Interrupt
    # Catch SIGINT
  ensure
    notify_stopping
    property_filter&.DestroyPropertyFilter
    event_history_collector&.DestroyCollector
    vim&.close
  end

  def stop!
  end

  private

  attr_reader :ems, :endpoint, :authentication, :logger, :messaging, :page_size, :settings

  def connect
    vim_opts = {
      :ns       => 'urn:vim25',
      :ssl      => true,
      :host     => endpoint["hostname"],
      :port     => endpoint["port"] || 443,
      :insecure => endpoint["verify_ssl"] == OpenSSL::SSL::VERIFY_NONE,
      :path     => '/sdk',
      :rev      => '7.0',
    }

    RbVmomi::VIM.new(vim_opts).tap do |vim|
      vim.rev = vim.serviceContent.about.apiVersion
      vim.serviceContent.sessionManager.Login(
        :userName => authentication["userid"],
        :password => authentication["password"]
      )
    end
  end

  def create_event_history_collector(vim, page_size)
    filter = RbVmomi::VIM.EventFilterSpec()

    event_manager = vim.serviceContent.eventManager
    event_manager.CreateCollectorForEvents(:filter => filter).tap do |c|
      c.SetCollectorPageSize(:maxCount => page_size)
    end
  end

  def create_property_filter(vim, event_history_collector)
    vim.propertyCollector.CreateFilter(
      :spec           => RbVmomi::VIM.PropertyFilterSpec(
        :objectSet => [
          RbVmomi::VIM.ObjectSpec(
            :obj => event_history_collector
          )
        ],
        :propSet   => [
          RbVmomi::VIM.PropertySpec(
            :type    => event_history_collector.class.wsdl_name,
            :all     => false,
            :pathSet => ["latestPage"]
          )
        ]
      ),
      :partialUpdates => true
    )
  end

  def wait_for_updates(vim, &block)
    version = nil
    options = RbVmomi::VIM.WaitOptions(:maxWaitSeconds => 60)

    loop do
      update_set = vim.propertyCollector.WaitForUpdatesEx(:version => version, :options => options)
      heartbeat
      next if update_set.nil?

      version = update_set.version

      Array(update_set.filterSet).each do |property_filter_update|
        Array(property_filter_update.objectSet).each do |object_update|
          next unless object_update.kind == "modify"

          Array(object_update.changeSet).each(&block)
        end
      end
    end
  end

  def publish_events(events)
    events.each do |event|
      messaging_client.publish_topic(
        :service => "manageiq.ems",
        :sender  => ems["id"],
        :event   => event[:event_type],
        :payload => event
      )
    end
  end

  def messaging_client
    @messaging_client ||= ManageIQ::Messaging::Client.open(
      messaging.merge(:client_ref => "vmware-event-catcher-#{ems["id"]}")
    )
  end

  def notify_started
    if ENV.fetch("NOTIFY_SOCKET", nil)
      SdNotify.ready
    elsif ENV.fetch("WORKER_HEARTBEAT_FILE", nil)
      heartbeat_to_file
    end
  end

  def heartbeat
    if ENV.fetch("NOTIFY_SOCKET", nil)
      SdNotify.watchdog
    elsif ENV.fetch("WORKER_HEARTBEAT_FILE", nil)
      heartbeat_to_file
    end
  end

  def notify_stopping
    SdNotify.stopping if ENV.fetch("NOTIFY_SOCKET", nil)
  end

  def heartbeat_to_file
    heartbeat_file = ENV.fetch("WORKER_HEARTBEAT_FILE")

    File.write(heartbeat_file, heartbeat_timeout)
  end

  def heartbeat_timeout
    Time.now.to_i + (settings.dig("worker_settings", "heartbeat_timeout") || 120)
  end
end