ManageIQ/manageiq

View on GitHub
app/models/mixins/process_tasks_mixin.rb

Summary

Maintainability
A
25 mins
Test Coverage
B
87%
module ProcessTasksMixin
  extend ActiveSupport::Concern
  include RetirementMixin

  module ClassMethods
    # Processes tasks received from the UI and queues them
    def process_tasks(options)
      raise _("No ids given to process_tasks") if options[:ids].blank?
      if options[:task] == 'retire_now'
        options[:ids].each do |id|
          $log.info("Creating retire request for id [#{id}] with user [#{User.current_user.userid}]")
          name.constantize.make_retire_request(id, User.current_user)
        end
      elsif options[:task] == "refresh_ems" && respond_to?("refresh_ems")
        refresh_ems(options[:ids])
        msg = "'#{options[:task]}' initiated for #{options[:ids].length} #{ui_lookup(:table => base_class.name).pluralize}"
        task_audit_event(:success, options, :message => msg)
      else
        assert_known_task(options)
        options[:userid] ||= User.current_user.try(:userid) || "system"
        invoke_tasks_queue(options)
      end
    end

    def invoke_tasks_queue(options)
      q_hash = {
        :class_name  => name,
        :method_name => "invoke_tasks",
        :args        => [options]
      }
      user = User.current_user
      q_hash.merge!(:user_id => user.id, :group_id => user.current_group.id, :tenant_id => user.current_tenant.id) if user
      MiqQueue.submit_job(q_hash)
    end

    # Performs tasks received from the UI via the queue
    def invoke_tasks(options)
      local, remote = partition_ids_by_remote_region(options[:ids])

      invoke_tasks_local(options.merge(:ids => local)) if local.present?
      invoke_tasks_remote(options.merge(:ids => remote)) if remote.present?
    end

    def invoke_tasks_local(options)
      options[:invoke_by] = task_invoked_by(options)
      args = task_arguments(options)

      instances, tasks = validate_tasks(options)

      instances.zip(tasks) do |instance, task|
        if task && task.status == "Error"
          task_audit_event(:failure, options, :target_id => instance.id, :message => task.message)
          task.state_finished
          next
        end

        invoke_task_local(task, instance, options, args)

        msg = "#{instance.name}: '#{options[:task]}' initiated"
        msg = "[Name: #{instance.name},Id: #{instance.id}, Ems_ref: #{instance.ems_ref}] Record destroyed" if options[:task] == 'destroy'

        task_audit_event(:success, options, :target_id => instance.id, :message => msg)
        task.update_status("Queued", "Ok", "Task has been queued") if task
      end
    end

    def invoke_tasks_remote(options)
      ApplicationRecord.group_ids_by_region(options[:ids]).each do |region, ids|
        remote_options = options.merge(:ids => ids)

        begin
          remote_connection = InterRegionApiMethodRelay.api_client_connection_for_region(region, remote_options[:userid])
          invoke_api_tasks(remote_connection, remote_options)
        rescue NotImplementedError => err
          $log.error("#{name} is not currently able to invoke tasks for remote regions")
          $log.log_backtrace(err)
          next
        rescue => err
          # Handle specific error case, until we can figure out how it occurs
          if err.class == ArgumentError && err.message == "cannot interpret as DNS name: nil"
            $log.error("An error occurred while invoking remote tasks...")
            $log.log_backtrace(err)
            next
          end

          $log.error("An error occurred while invoking remote tasks...Requeueing for 1 minute from now.")
          $log.log_backtrace(err)

          q_hash = {
            :class_name  => name,
            :method_name => 'invoke_tasks_remote',
            :args        => [remote_options],
            :deliver_on  => Time.now.utc + 1.minute
          }
          user = User.current_user
          q_hash.merge!(:user_id => user.id, :group_id => user.current_group.id, :tenant_id => user.current_tenant.id) if user
          MiqQueue.submit_job(q_hash)
          next
        end
      end
    end

    # Override as needed to handle differences between API actions and method names
    def action_for_task(task)
      task
    end

    def invoke_api_tasks(api_client, remote_options)
      collection_name = Api::CollectionConfig.new.name_for_klass(self)
      unless collection_name
        _log.error("No API endpoint found for class #{name}")
        raise NotImplementedError
      end

      collection   = api_client.send(collection_name)
      action       = action_for_task(remote_options[:task])
      resource_ids = remote_options[:ids]

      if resource_ids.present?
        resource_ids.each do |id|
          send_action(action, collection_name, collection, remote_options, id)
        end
      else
        send_action(action, collection_name, collection, remote_options)
      end
    end

    def send_action(action, collection_name, collection, remote_options, id = nil)
      require 'manageiq-api-client'
      post_args = remote_options[:args] || {}
      begin
        if id.present?
          msg_desination = "remote object: #{id} for collection #{collection_name}, with args #{post_args}"
          destination = collection.find(id)
        else
          msg_desination = "remote collection #{collection_name}, with args #{post_args}"
          destination = collection
        end
        _log.info("Invoking task #{action} on #{msg_desination}")
        destination.send(action, post_args)
        task_audit_event(:success, remote_options, :message => "'#{action}' successfully initiated on #{msg_desination}")
      rescue StandardError => err
        task_audit_event(:failure, remote_options, :message => "'#{action}' failed to be initiated on #{msg_desination}")
        _log.error(err.message)
        raise err unless err.kind_of?(NoMethodError) || err.kind_of?(ManageIQ::API::Client::ResourceNotFound)
      end
    end
    private :send_action

    # default: invoked by task, can be overridden
    def task_invoked_by(_options)
      :task
    end
    private :task_invoked_by

    # default: only handles retirement, can be overridden
    def task_arguments(options)
      options[:task] == 'retire_now' ? [options[:userid]] : []
    end
    private :task_arguments

    # default implementation, can be overridden
    def invoke_task_local(task, instance, options, args)
      cb = {
        :class_name  => task.class.to_s,
        :instance_id => task.id,
        :method_name => :queue_callback,
        :args        => ["Finished"]
      } if task

      q_hash = {
        :class_name   => name,
        :instance_id  => instance.id,
        :method_name  => options[:task],
        :args         => args,
        :miq_task_id  => task&.id,
        :miq_callback => cb
      }
      user = User.current_user
      q_hash.merge!(:user_id => user.id, :group_id => user.current_group.id, :tenant_id => user.current_tenant.id) if user
      MiqQueue.submit_job(q_hash)
    end

    private

    # Helper method for invoke_tasks, to determine the instances and the tasks associated
    def validate_tasks(options)
      tasks = []

      instances = base_class.where(:id => options[:ids]).order(Arel.sql("lower(name)")).to_a
      return instances, tasks unless options[:invoke_by] == :task # jobs will be used instead of tasks for feedback

      instances.each do |instance|
        # create a task for each instance
        task = MiqTask.create(:name => "#{instance.name}: '#{options[:task]}'", :userid => options[:userid])
        tasks.push(task)

        validate_task(task, instance, options)
      end
      return instances, tasks
    end

    # default: validate retirement and maintenance zone, can be overridden
    def validate_task(task, instance, options)
      if instance.try(:ext_management_system)&.zone == Zone.maintenance_zone
        task.error("#{instance.ext_management_system.name} is paused")
        return false
      end

      return true unless options[:task] == "retire_now" && instance.retired?
      task.error("#{instance.name} is already retired")
      false
    end

    def task_audit_event(event, task_options, audit_options)
      options =
        {
          :event        => task_options[:task],
          :target_class => base_class.name,
          :userid       => task_options[:userid],
        }.merge(audit_options)
      AuditEvent.send(event, options)
    end

    def assert_known_task(options)
      unless instance_methods.collect(&:to_s).include?(options[:task])
        raise _("Unknown task, %{task}") % {:task => options[:task]}
      end
    end
  end
end