app/models/ems_event.rb
class EmsEvent < EventStream
include Automate
CLONE_TASK_COMPLETE = "CloneVM_Task_Complete"
SOURCE_DEST_TASKS = [
'CloneVM_Task',
'MarkAsTemplate',
'MigrateVM_Task',
'RelocateVM_Task',
'Rename_Task',
]
CLASS_GROUP_LEVELS = %i[critical warning].freeze
def self.description
_("Management Events")
end
def self.class_group_levels
CLASS_GROUP_LEVELS
end
def self.group_names_and_levels
event_groups.each_with_object(default_group_names_and_levels) do |(group_name, group_details), hash|
hash[:group_names][group_name] = group_details[:name]
group_details.each_key do |level|
next if level == :name
hash[:group_levels] ||= {}
hash[:group_levels][level] ||= level.to_s.capitalize
end
end
end
def self.event_groups
@event_groups ||= begin
core_event_groups = ::Settings.event_handling.event_groups.to_hash
Settings.ems.each_with_object(core_event_groups) do |(_provider_type, provider_settings), event_groups|
provider_event_groups = provider_settings.fetch_path(:event_handling, :event_groups)
next unless provider_event_groups
DeepMerge.deep_merge!(
provider_event_groups.to_hash, event_groups,
:preserve_unmergeables => false,
:overwrite_arrays => false
)
end
end
end
private_class_method def self.partition_group_and_level_by_event_type
return @literal_group_and_level_by_event_type, @regex_group_and_level_by_event_type if @literal_group_and_level_by_event_type
@literal_group_and_level_by_event_type = {}
@regex_group_and_level_by_event_type = {}
event_groups.each do |group_name, group_contents|
group_contents.each do |group_level, event_types|
next if group_level == :name
event_types.each do |event_type|
if event_type.starts_with?("/")
@regex_group_and_level_by_event_type[Regexp.new(event_type[1..-2])] ||= [group_name, group_level]
else
@literal_group_and_level_by_event_type[event_type] ||= [group_name, group_level]
end
end
end
end
return @literal_group_and_level_by_event_type, @regex_group_and_level_by_event_type
end
def self.clear_event_groups_cache
@event_groups = @literal_group_and_level_by_event_type = @regex_group_and_level_by_event_type = nil
end
def self.group_and_level(event_type)
by_literal, by_regex = partition_group_and_level_by_event_type
by_literal[event_type] ||
by_regex.detect { |regex, _| regex.match?(event_type) }&.last ||
[DEFAULT_GROUP_NAME, DEFAULT_GROUP_LEVEL]
end
def self.group_name(group)
return if group.nil?
event_groups.dig(group.to_sym, :name) || DEFAULT_GROUP_NAME_STR
end
def handle_event
EmsEventHelper.new(self).handle
rescue => err
_log.log_backtrace(err)
end
def self.task_final_events
::Settings.event_handling.task_final_events.to_hash
end
def self.bottleneck_event_groups
::Settings.event_handling.bottleneck_event_groups.to_hash
end
def self.add_queue(meth, ems_id, event)
if MiqEventHandler.worker_settings[:dequeue_method] != "drb" && MiqQueue.messaging_client('event_handler').present?
MiqQueue.messaging_client('event_handler').publish_topic(
:service => "manageiq.#{MiqEventHandler.default_queue_name}",
:sender => ems_id,
:event => event[:event_type],
:payload => event
)
else
MiqQueue.submit_job(
:service => "event",
:target_id => ems_id,
:class_name => "EmsEvent",
:method_name => meth,
:args => [event]
)
end
end
def self.add(ems_id, event_hash)
event_type = event_hash[:event_type]
raise MiqException::Error, _("event_type must be set in event") if event_type.nil?
event_hash[:ems_id] = ems_id
process_vm_in_event!(event_hash)
process_vm_in_event!(event_hash, :prefix => "dest_")
process_host_in_event!(event_hash)
process_host_in_event!(event_hash, :prefix => "dest_")
process_availability_zone_in_event!(event_hash)
process_cluster_in_event!(event_hash)
process_container_entities_in_event!(event_hash)
process_physical_storage_in_event!(event_hash)
# Write the event
new_event = create_event(event_hash)
return if new_event.nil? # If the event is a duplicate skip further processing
# Create a 'completed task' event if this is the last in a series of events
create_completed_event(event_hash) if task_final_events.key?(event_type.to_sym)
syndicate_event(ems_id, event_hash) if syndicate_events?
new_event
end
def self.process_object_in_event!(klass, event, options = {})
prefix = options[:prefix]
key_prefix = options[:key_prefix] || klass.name.underscore
id_key = options[:id_key] || "#{prefix}#{key_prefix}_id".to_sym
ems_ref_key = options[:ems_ref_key] || "#{prefix}#{key_prefix}_ems_ref".to_sym
name_key = options[:name_key] || "#{prefix}#{key_prefix}_name".to_sym
if event[id_key].nil?
ems_ref = event[ems_ref_key]
object = klass.base_class.find_by(:ems_ref => ems_ref, :ems_id => event[:ems_id]) unless ems_ref.nil?
unless object.nil?
event[id_key] = object.id
event[name_key] ||= object.name
end
end
end
def self.process_vm_in_event!(event, options = {})
prefix = options[:prefix]
options[:id_key] = "#{prefix}vm_or_template_id".to_sym
uid_ems = event.delete(:vm_uid_ems)
process_object_in_event!(Vm, event, options)
if options[:id_key] == :vm_or_template_id && event[:vm_or_template_id].nil?
vm = VmOrTemplate.find_by(:uid_ems => uid_ems) unless uid_ems.nil?
unless vm.nil?
event[:vm_or_template_id] = vm.id
event[:vm_name] ||= vm.name
end
end
end
def self.process_host_in_event!(event, options = {})
uid_ems = event.delete(:host_uid_ems)
process_object_in_event!(Host, event, options)
if event[:host_id].nil? && uid_ems.present?
# Attempt to find a host in the current EMS first, then fallback to archived hosts
host = Host.where(:uid_ems => uid_ems, :ems_id => [event[:ems_id], nil]).order("ems_id NULLS LAST").first
unless host.nil?
event[:host_id] = host.id
event[:host_name] ||= host.name
end
end
end
def self.process_physical_storage_in_event!(event, options = {})
process_object_in_event!(PhysicalStorage, event, options)
end
def self.process_container_entities_in_event!(event, _options = {})
[ContainerNode, ContainerGroup, ContainerReplicator].each do |entity|
process_object_in_event!(entity, event)
end
end
def self.process_availability_zone_in_event!(event, options = {})
process_object_in_event!(AvailabilityZone, event, options)
if event[:availability_zone_id].nil? && event[:vm_or_template_id]
vm = VmOrTemplate.find(event[:vm_or_template_id])
if vm.respond_to?(:availability_zone)
availability_zone = vm.availability_zone
unless availability_zone.nil?
event[:availability_zone_id] = availability_zone.id
end
end
end
# there's no "availability_zone_name" column in ems_event
# availability_zone_name may be added by process_vm_in_event
# prevent EmsEvent from trying to set the event attribute for availability_zone_name
event.delete(:availability_zone_name)
end
def self.process_cluster_in_event!(event, options = {})
process_object_in_event!(EmsCluster, event, options)
end
def self.first_chained_event(ems_id, chain_id)
return nil if chain_id.nil?
EmsEvent.where(:ems_id => ems_id, :chain_id => chain_id).order(:id).first
end
def parse_event_metadata
data = full_data || {}
[
event_type == "datawarehouse_alert" ? message : nil,
data[:severity],
data[:url],
data[:resolved],
]
end
def first_chained_event
@first_chained_event ||= EmsEvent.first_chained_event(ems_id, chain_id) || self
end
def get_target(target_type)
target_type = target_type.to_s
if target_type =~ /^first_chained_(.+)$/
target_type = $1
event = first_chained_event
else
event = self
end
target_type = "src_vm_or_template" if target_type == "src_vm"
target_type = "dest_vm_or_template" if target_type == "dest_vm"
target_type = "target" if event.event_type == "datawarehouse_alert"
event.send(target_type)
end
def tenant_identity
(vm_or_template || ext_management_system).tenant_identity
end
def manager_refresh_targets
if ext_management_system.allow_targeted_refresh?
require "inventory_refresh"
ext_management_system.class::EventTargetParser.new(self).parse
else
ext_management_system
end
end
def self.display_name(number = 1)
n_('Management Event', 'Management Events', number)
end
private
def self.event_allowed_ems_ref_keys
%w[vm_ems_ref dest_vm_ems_ref]
end
private_class_method :event_allowed_ems_ref_keys
def self.create_event(event)
event.delete_if { |k,| k.to_s.ends_with?("_ems_ref") && !event_allowed_ems_ref_keys.include?(k.to_s) }
new_event = EmsEvent.create(event) unless EmsEvent.exists?(
:event_type => event[:event_type],
:timestamp => event[:timestamp],
:chain_id => event[:chain_id],
:ems_id => event[:ems_id],
:ems_ref => event[:ems_ref]
)
new_event.handle_event if new_event
new_event
end
private_class_method :create_event
def self.create_completed_event(event, orig_task = nil)
if orig_task.nil?
orig_task = first_chained_event(event[:ems_id], event[:chain_id])
return if orig_task.nil?
end
if task_final_events[event[:event_type].to_sym].include?(orig_task.event_type)
event = MiqHashStruct.new(event)
# Determine which event has the details for the source and dest
if SOURCE_DEST_TASKS.include?(orig_task.event_type)
source_event = orig_task
dest_event = event
else
source_event = event
dest_event = nil
end
# Build the 'completed task' event
new_event_type = "#{orig_task.event_type}_Complete"
new_event = {
:event_type => new_event_type,
:chain_id => event.chain_id,
:is_task => true,
:source => 'EVM',
:ems_id => event.ems_id,
:message => "#{orig_task.event_type} Completed",
:timestamp => event.timestamp,
:host_name => source_event.host_name,
:host_id => source_event.host_id,
:vm_name => source_event.vm_name,
:vm_location => source_event.vm_location,
:vm_ems_ref => source_event.vm_ems_ref,
:vm_or_template_id => source_event.vm_or_template_id
}
new_event[:username] = event.username if event.username.present?
# Fill in the dest information if we have it
unless dest_event.nil?
# Determine from which field to get the dest information
dest_key = dest_event.dest_vm_name.nil? ? '' : 'dest_'
new_event.merge!(
:dest_host_name => dest_event.host_name,
:dest_host_id => dest_event.host_id,
:dest_vm_name => dest_event.send(:"#{dest_key}vm_name"),
:dest_vm_location => dest_event.send(:"#{dest_key}vm_location"),
:dest_vm_ems_ref => dest_event.send(:"#{dest_key}vm_ems_ref"),
:dest_vm_or_template_id => dest_event.send(:"#{dest_key}vm_or_template_id")
)
end
create_event(new_event)
end
end
private_class_method :create_completed_event
private_class_method def self.syndicate_event(ems_id, event)
ems = ExtManagementSystem.find(ems_id)
event = event.dup
event[:ems_uid] = ems&.uid_ems
event[:ems_type] = ems&.class&.ems_type
MiqQueue.messaging_client('event_handler')&.publish_topic(
:service => "manageiq.ems-events",
:sender => ems_id,
:event => event[:event_type],
:payload => event
)
rescue => err
_log.warn("Failed to publish event [#{ems_id}] [#{event[:event_type]}]: #{err}")
_log.log_backtrace(err)
end
private_class_method def self.syndicate_events?
Settings.event_streams.syndicate_events && MiqQueue.messaging_type != "miq_queue"
end
def get_refresh_target(target_type)
m = "#{target_type}_refresh_target"
try(m)
end
def vm_refresh_target
(vm_or_template && vm_or_template.ext_management_system ? vm_or_template : host_refresh_target)
end
alias_method :src_vm_refresh_target, :vm_refresh_target
def src_vm_or_dest_host_refresh_target
vm_or_template ? vm_refresh_target : dest_host_refresh_target
end
def host_refresh_target
(host && host.ext_management_system ? host : ems_refresh_target)
end
alias_method :src_host_refresh_target, :host_refresh_target
def dest_vm_refresh_target
(dest_vm_or_template && dest_vm_or_template.ext_management_system ? dest_vm_or_template : dest_host_refresh_target)
end
def dest_host_refresh_target
(dest_host && dest_host.ext_management_system ? dest_host : ems_refresh_target)
end
def ems_cluster_refresh_target
ext_management_system
end
alias_method :src_ems_cluster_refresh_target, :ems_cluster_refresh_target
def ems_refresh_target
ext_management_system
end
end