ManageIQ/manageiq-providers-openstack

View on GitHub
lib/manageiq/providers/openstack/legacy/events/openstack_ceilometer_event_monitor.rb

Summary

Maintainability
A
2 hrs
Test Coverage
C
77%
require 'manageiq/providers/openstack/legacy/openstack_event_monitor'
require 'manageiq/providers/openstack/legacy/events/openstack_event'
require 'manageiq/providers/openstack/legacy/events/openstack_ceilometer_event_converter'

class OpenstackCeilometerEventMonitor < OpenstackEventMonitor
  include TimezoneMixin

  def self.available?(options = {})
    return connect_service_from_settings(options[:ems]) if event_services.keys.include? event_service_settings
    begin
      @panko = true
      options[:ems].connect(:service => "Event")
      return true
    rescue MiqException::ServiceNotAvailable => ex
      @panko = false
      $log.debug("Skipping Openstack Panko events. Availability check failed with #{ex}. Trying Ceilometer.") if $log
      options[:ems].connect(:service => "Metering")
    end
  end

  def self.plugin_priority
    1
  end

  def initialize(options = {})
    @options = options
    @ems = options[:ems]
    @config = options.fetch(:ceilometer, {})
  end

  def start
    @since          = nil
    @monitor_events = true
  end

  def stop
    @monitor_events = false
  end

  def provider_connection
    return @provider_connection ||= self.class.connect_service_from_settings(@ems) if self.class.event_services.keys.include? self.class.event_service_settings
    begin
      @panko = true
      @provider_connection ||= @ems.connect(:service => "Event")
    rescue MiqException::ServiceNotAvailable => ex
      @panko = false
      $log.debug("Panko is not available, trying access events using Ceilometer (#{ex.inspect})") if $log
      @provider_connection = @ems.connect(:service => "Metering")
    end
  end

  def event_backread_seconds
    event_backread = Settings.fetch_path(:ems, :ems_openstack, :event_handling, :event_backread_seconds) || 0
    event_backread.seconds
  end

  def each_batch
    while @monitor_events
      $log.info("Querying OpenStack for events newer than #{latest_event_timestamp}...") if $log
      events = list_events(query_options).sort_by(&:generated)

      # Count back a few seconds to catch events that may have arrived in panko
      # out of order. OSP recommends return time in UTC. Skip time conversion when disabled.
      if event_backread_seconds < 1
        @since = events.last.generated unless events.empty?
      else
        with_a_timezone('UTC') do
          last_seen = events.last.generated unless events.empty?
          @since = (Time.zone.parse(last_seen) - event_backread_seconds).iso8601 if last_seen
        end
      end

      amqp_events = filter_unwanted_events(events).map do |event|
        converted_event = OpenstackCeilometerEventConverter.new(event)
        $log.debug("Processing a new OpenStack event: #{event.inspect}") if $log
        openstack_event(nil, converted_event.metadata, converted_event.payload)
      end

      yield amqp_events
    end
  end

  def each
    each_batch do |events|
      events.each { |e| yield e }
    end
  end

  def self.connect_service_from_settings(ems)
    $log.debug "#{_log.prefix} Using events provided by \"#{event_service_settings}\" service, which was set in settings.yml."
    @panko = (event_service_settings == "panko")
    ems.connect(:service => event_services[event_service_settings])
  end

  def self.event_service_settings
    Settings[:workers][:worker_base][:event_catcher][:event_catcher_openstack_service]
  rescue StandardError => err
    $log.warn "#{_log.prefix} Settings key :event_catcher_openstack_service is missing, #{err}."
    nil
  end

  def self.event_services
    {"panko" => "Event", "ceilometer" => "Metering"}
  end

  private

  def filter_unwanted_events(events)
    $log.debug("Received a new OpenStack events batch: (before filtering)") if $log && events.any?
    $log.debug(events.inspect) if $log && events.any?
    @event_type_regex ||= Regexp.new(@config[:event_types_regex].to_s)
    events.select { |event| @event_type_regex.match(event.event_type) }
  end

  def query_options
    options = [{
      'field' => 'start_timestamp',
      'op'    => 'ge',
      'value' => latest_event_timestamp || ''
    }]
    if @panko && tenant_sensitive?
      # all_tenants is not supported by ceilometer
      # and will cause no results to be returned,
      # so only include it if we're querying panko.
      options << {
        'field' => 'all_tenants',
        'value' => 'True'
      }
    end
    options
  end

  def list_events(query_options)
    provider_connection.list_events(query_options).body.map do |event_hash|
      begin
        Fog::OpenStack::Event::Event.new(event_hash)
      rescue NameError
        Fog::OpenStack::Metering::Event.new(event_hash)
      end
    end
  end

  def skip_history?
    Settings.fetch_path(:ems, :ems_openstack, :event_handling, :event_skip_history) || false
  end

  def latest_event_timestamp
    return @since if @since.present?

    @since = @ems.ems_events.maximum(:timestamp).try(:iso8601) || (skip_history? ? @ems.created_on.iso8601 : nil)
  end

  def tenant_sensitive?
    # keystone v2 doesn't accept all_tenants flag even in Panko
    @ems.respond_to?(:parent_manager) ? @ems.parent_manager.api_version != 'v2' : @ems.api_version != 'v2'
  end
end