ManageIQ/manageiq

View on GitHub
app/models/miq_queue.rb

Summary

Maintainability
C
7 hrs
Test Coverage
B
88%
require 'timeout'
require 'digest'

# Message Queue entry to run a method on any server
#   zone
#     This states the subset of miq_servers in this region that can perform this job.
#     put: Defaults to the zone of the current caller ("MyZone")
#          Pass in nil to have this performed in any zone.
#     get: Fetches jobs both for the caller's zone and for any zone.
#   role
#     This states the role necessary for a miq_server to perform this job.
#     put: Defaults to nil (no role required).
#          Typically this is passed in to require a role.
#     get: Fetches jobs both for the caller's roles and for no role required.
#   queue_name
#     This states the worker queue that will perform this job.
#     put: Default to "generic" to be performed by the generic worker.
#     get: Defaults to "generic" but is typically overridden by the caller (a worker)
#
class MiqQueue < ApplicationRecord
  belongs_to :handler, :polymorphic => true
  belongs_to :miq_task

  attr_accessor :last_exception

  MAX_PRIORITY    = 0
  HIGH_PRIORITY   = 20
  MEDIUM_PRIORITY = 50
  NORMAL_PRIORITY = 100
  LOW_PRIORITY    = 150
  MIN_PRIORITY    = 200

  PRIORITY_WHICH  = [:max, :high, :medium, :normal, :low, :min]
  PRIORITY_DIR    = [:higher, :lower]

  def self.messaging_type
    ENV.fetch("MESSAGING_TYPE", nil) || Settings.messaging.type
  end

  def self.messaging_client(client_ref)
    @messaging_client ||= {}
    return if messaging_type == "miq_queue"

    @messaging_client[client_ref] ||= begin
      require "manageiq-messaging"
      ManageIQ::Messaging.logger = _log

      # caching the client works, even if the connection becomes unavailable
      # internally the client will track the state of the connection and re-open it,
      # once it's available again - at least thats true for a stomp connection
      options = messaging_client_options&.merge(:client_ref => client_ref)
      return if options.nil?

      ManageIQ::Messaging::Client.open(options)
    rescue => err
      _log.warn("Failed to open messaging client: #{err}")
      nil
    end
  end

  def self.messaging_client_options
    opts = messaging_options_from_env || messaging_options_from_file
    return if opts.nil?

    opts.transform_values! { |v| v.kind_of?(String) ? ManageIQ::Password.try_decrypt(v) : v }
  end

  def self.columns_for_requeue
    @requeue_columns ||= MiqQueue.column_names.map(&:to_sym) - [:id]
  end

  def self.priority(which, dir = nil, by = 0)
    unless which.kind_of?(Integer) || PRIORITY_WHICH.include?(which)
      raise ArgumentError,
            _("which must be an Integer or one of %{priority}") % {:priority => PRIORITY_WHICH.join(", ")}
    end
    unless dir.nil? || PRIORITY_DIR.include?(dir)
      raise ArgumentError, _("dir must be one of %{directory}") % {:directory => PRIORITY_DIR.join(", ")}
    end

    which = const_get("#{which.to_s.upcase}_PRIORITY") unless which.kind_of?(Integer)
    priority = which.send(dir == :higher ? "-" : "+", by)
    priority = MIN_PRIORITY if priority > MIN_PRIORITY
    priority = MAX_PRIORITY if priority < MAX_PRIORITY
    priority
  end

  def self.higher_priority(*priorities)
    priorities.min
  end

  def self.lower_priority(*priorities)
    priorities.max
  end

  def self.higher_priority?(p1, p2)
    p1 < p2
  end

  def self.lower_priority?(p1, p2)
    p1 > p2
  end

  TIMEOUT = 10.minutes

  serialize :args, Array
  serialize :miq_callback, Hash

  validate :validate_zone_name

  STATE_READY   = 'ready'.freeze
  STATE_DEQUEUE = 'dequeue'.freeze
  STATE_WARN    = 'warn'.freeze
  STATE_ERROR   = 'error'.freeze
  STATE_TIMEOUT = 'timeout'.freeze
  STATE_EXPIRED = "expired".freeze
  validates_inclusion_of :state,  :in => [STATE_READY, STATE_DEQUEUE, STATE_WARN, STATE_ERROR, STATE_TIMEOUT, STATE_EXPIRED]
  FINISHED_STATES = [STATE_WARN, STATE_ERROR, STATE_TIMEOUT, STATE_EXPIRED].freeze

  STATUS_OK      = 'ok'.freeze
  STATUS_RETRY   = 'retry'.freeze
  STATUS_WARN    = STATE_WARN
  STATUS_ERROR   = STATE_ERROR
  STATUS_TIMEOUT = STATE_TIMEOUT
  DEFAULT_QUEUE  = "generic"

  def data
    msg_data && Marshal.load(msg_data)
  end

  def data=(value)
    self.msg_data = Marshal.dump(value)
  end

  def self.put(options)
    options = options.merge(
      :zone         => Zone.determine_queue_zone(options),
      :state        => STATE_READY,
      :handler_type => nil,
      :handler_id   => nil
    )

    if Zone.maintenance?(options[:zone])
      _log.debug("MiqQueue#put skipped: #{options.inspect}")
      return
    end

    create_with_options = all.values[:create_with] || {}
    options[:priority]     ||= create_with_options[:priority] || NORMAL_PRIORITY
    options[:queue_name]   ||= create_with_options[:queue_name] || "generic"
    options[:msg_timeout]  ||= create_with_options[:msg_timeout] || TIMEOUT
    options[:task_id]        = (defined?($_miq_worker_current_msg) && $_miq_worker_current_msg.try(:task_id)) unless options.key?(:task_id)
    options[:tracking_label] = Thread.current[:tracking_label] || options[:task_id] unless options.key?(:tracking_label)
    options[:role]           = options[:role].to_s unless options[:role].nil?

    options[:args] = [options[:args]] if options[:args] && !options[:args].kind_of?(Array)

    if !Rails.env.production? && options[:args] &&
       (arg = options[:args].detect { |a| a.kind_of?(ActiveRecord::Base) && !a.new_record? })
      raise ArgumentError, "MiqQueue.put(:class_name => #{options[:class_name]}, :method => #{options[:method_name]}) does not support args with #{arg.class.name} objects"
    end

    msg = MiqQueue.create!(options)
    _log.info(MiqQueue.format_full_log_msg(msg))
    msg
  end

  # Execute a job on all servers.
  #
  # Raises an ArgumentError if zone or role keys are specified, and those keys
  # will be nil'd out so `MiqQueue.get` "ignores" those fields.
  #
  def self.broadcast(options)
    # Currently not filterable by these keys (:zone, :role)
    #
    # If this feature is ever needed, ensure you are not just passing the
    # values from :zone and :role, but ALSO filtering the server list down by
    # those same values to ensure orphan jobs are not being created.
    raise ArgumentError, "invalid key :zone" if options.key?(:zone)
    raise ArgumentError, "invalid key :role" if options.key?(:role)

    MiqServer.active_miq_servers.select(:id, :guid).each do |server|
      put(options.merge(:server_guid => server.guid, :zone => nil, :role => nil))
    end
  end

  # Trigger a background job
  #
  # target_worker:
  #
  # @options options [String] :class_name
  # @options options [String] :instance_id
  # @options options [String] :method_name
  # @options options [String] :args
  # @options options [String] :target_id (deprecated)
  # @options options [String] :data (deprecated)
  #
  # execution parameters:
  #
  # @options options [String] :expires_on
  # @options options [String] :ttl
  # @options options [String] :task_id (deprecated)
  #
  # routing:
  #
  # @options options [String] :service name of the service. Similar to previous role or queue name derives
  #                                    queue_name, role, and zone.
  # @options options [ExtManagementSystem|Nil|Array<Class,id>] :affinity resource for affinity. Typically an ems
  # @options options [String] :miq_zone this overrides the auto derived zone.
  #
  def self.submit_job(options)
    service = options.delete(:service) || "generic"
    resource = options.delete(:affinity)
    case service
    when "automate"
      # options[:queue_name] = "generic"
      options[:role] = service
    when "ems_inventory"
      options[:queue_name] = resource.queue_name_for_ems_refresh
      options[:role]       = service
      options[:zone]       = resource.my_zone
    when "ems_operations"
      options[:role] = service
      options[:zone] = resource.try(:my_zone) || MiqServer.my_zone
      options[:queue_name] = resource.try(:queue_name_for_ems_operations) || "generic"
    when "event"
      options[:queue_name] = "ems"
      options[:role] = service
    when "generic"
      raise ArgumentError, "generic job should have no resource" if resource
      # TODO: can we transition to zone = nil
    when "notifier"
      options[:role] = service
    when "reporting"
      options[:queue_name] = queue_name_for_priority_service(service, options[:priority])
      options[:role] = service
    when "smartproxy"
      options[:queue_name] = "smartproxy"
      options[:role] = "smartproxy"
    when "smartstate"
      options[:role] = service
      options[:zone] = resource.try(:my_zone) || MiqServer.my_zone
    end

    # Note, options[:zone] is set in 'put' via 'determine_queue_zone' and handles setting
    # a nil (any) zone for regional roles.  Therefore, regional roles don't need to set zone here.
    put(options)
  end

  # For services with a dedicated queue_name and worker, such as reporting, high priority work is
  # run in the 'generic' queue with HIGH_PRIORITY so priority workers or even generic workers can
  # pick it up immediately. Anything but high priority work or higher is run in the service's
  # queue, such as 'reporting', and the dedicated worker will process this message following
  # priority processing.
  #
  # Why? For example, with 'reporting', reports are often started by a user who is actively waiting
  # for the result so we need a way to expedite their request in workers who deal with high
  # priority work.  At the same time, if a user isn't actively waiting for a report, it should be
  # handled by reporting workers and not sit behind other items in the generic queue as it's far
  # easier to scale these workers up and down as needed.
  #
  # TODO: Review if other services in submit_job, such as event/smart proxy should follow this pattern.
  def self.queue_name_for_priority_service(service, priority)
    (priority.nil? || MiqQueue.lower_priority?(priority, HIGH_PRIORITY)) ? service.to_s : "generic"
  end

  def self.where_queue_name(is_array)
    is_array ? "AND queue_name in (?)" : "AND queue_name = ?"
  end

  MIQ_QUEUE_GET = <<-EOL
    state = 'ready'
    AND (zone IS NULL OR zone = ?)
    AND (task_id IS NULL OR task_id NOT IN (
      SELECT DISTINCT task_id
      FROM #{table_name}
      WHERE state = 'dequeue'
        AND (zone IS NULL OR zone = ?)
        AND task_id IS NOT NULL
    ))
    AND (role IS NULL OR role IN (?))
    AND (server_guid IS NULL OR server_guid = ?)
    AND (deliver_on IS NULL OR deliver_on <= ?)
    AND (priority <= ?)
  EOL

  def self.get(options = {})
    sql_for_get = MIQ_QUEUE_GET + where_queue_name(options[:queue_name].kind_of?(Array))
    cond = [
      sql_for_get,
      options[:zone] || MiqServer.my_server.zone.name,
      options[:zone] || MiqServer.my_server.zone.name,
      options[:role] || MiqServer.my_server.active_role_names,
      MiqServer.my_guid,
      Time.now.utc,
      options[:priority] || MIN_PRIORITY,
      options[:queue_name] || "generic",
    ]

    prefetch_max_per_worker = Settings.server.prefetch_max_per_worker
    msgs = MiqQueue.where(cond).order("priority, id").limit(prefetch_max_per_worker)

    result = nil
    msgs.each do |msg|
      begin
        _log.info("#{MiqQueue.format_short_log_msg(msg)} previously timed out, retrying...") if msg.state == STATE_TIMEOUT
        handler = MiqWorker.my_worker || MiqServer.my_server
        msg.update!(:state => STATE_DEQUEUE, :handler => handler)
        _log.info("#{MiqQueue.format_full_log_msg(msg)}, Dequeued in: [#{Time.now.utc - msg.created_on}] seconds")
        return msg
      rescue ActiveRecord::StaleObjectError
        result = :stale
      rescue => err
        raise _("%{log_message} \"%{error}\" attempting to get next message") % {:log_message => _log.prefix, :error => err}
      end
    end
    _log.debug("All #{prefetch_max_per_worker} messages stale, returning...") if result == :stale
    result
  end

  # This are the queue calls related to worker management which
  # might not be needed once we use kubernetes for worker/pod management
  def self.put_deprecated(*args)
    put(*args)
  end

  def unget(options = {})
    update!(options.merge(:state => STATE_READY, :handler => nil))
    @delivered_on = nil
    _log.info("#{MiqQueue.format_full_log_msg(self)}, Requeued")
  end

  # TODO (juliancheal) This is a hack. Brakeman was giving us an SQL injection
  # warning when we concatonated the queue_name string onto the query.
  # Creating two seperate queries like this, resolves the Brakeman issue, but
  # isn't ideal. This will need to be rewritten using Arel queries at some point.

  MIQ_QUEUE_PEEK = <<-EOL
    state = 'ready'
    AND (zone IS NULL OR zone = ?)
    AND (role IS NULL OR role IN (?))
    AND (server_guid IS NULL OR server_guid = ?)
    AND (deliver_on IS NULL OR deliver_on <= ?)
    AND (priority <= ?)
    AND queue_name = ?
  EOL

  MIQ_QUEUE_PEEK_ARRAY = <<-EOL
    state = 'ready'
    AND (zone IS NULL OR zone = ?)
    AND (role IS NULL OR role IN (?))
    AND (server_guid IS NULL OR server_guid = ?)
    AND (deliver_on IS NULL OR deliver_on <= ?)
    AND (priority <= ?)
    AND queue_name in (?)
  EOL

  def self.peek(options = {})
    conditions, select, limit = options.values_at(:conditions, :select, :limit)

    sql_for_peek = conditions[:queue_name].kind_of?(Array) ? MIQ_QUEUE_PEEK_ARRAY : MIQ_QUEUE_PEEK

    cond = [
      sql_for_peek,
      conditions[:zone] || MiqServer.my_server.zone.name,
      conditions[:role] || MiqServer.my_server.active_role_names,
      MiqServer.my_guid,
      Time.now.utc,
      conditions[:priority] || MIN_PRIORITY,
      conditions[:queue_name] || "generic",
    ]

    result = MiqQueue.where(cond).order(:priority, :id).limit(limit || 1)
    result = result.select(select) unless select.nil?
    result.to_a
  end

  # Find the MiqQueue item with the specified find options, and yields that
  #   record to a block.  The block should return the options for updating
  #   the record.  If the record was not found, the block's options will be
  #   used to put a new item on the queue.
  #
  def self.put_or_update(find_options)
    find_options = default_get_options(find_options)

    # Since args are a serializable field, remove them and manually dump them
    #   for proper comparison.
    where_scope =
      if find_options.key?(:args)
        MiqQueue.where(find_options.except(:args)).where(['args = ?', find_options[:args].try(:to_yaml)])
      else
        MiqQueue.where(find_options)
      end

    msg = nil
    loop do
      msg = where_scope.order("priority, id").first

      save_options = block_given? ? yield(msg, find_options) : nil

      # Add a new queue item based on the returned save options, or the find
      #   options if no save options were given.
      if msg.nil?
        put_options = save_options || find_options
        put_options = put_options.except(:state) if put_options.key?(:state)
        msg = MiqQueue.put(put_options)
        break
      end

      begin
        # Update the queue item based on the returned save options.
        unless save_options.nil?
          if save_options.key?(:msg_timeout) && (msg.msg_timeout > save_options[:msg_timeout])
            _log.warn("#{MiqQueue.format_short_log_msg(msg)} ignoring request to decrease timeout from <#{msg.msg_timeout}> to <#{save_options[:msg_timeout]}>")
            save_options = save_options.except(:msg_timeout)
          end

          msg.update!(save_options)
          _log.info("#{MiqQueue.format_short_log_msg(msg)} updated with following: #{save_options.except(:data, :msg_data).inspect}")
          _log.info("#{MiqQueue.format_full_log_msg(msg)}, Requeued")
        end
        break
      rescue ActiveRecord::StaleObjectError
        _log.debug("#{MiqQueue.format_short_log_msg(msg)} stale, retrying...")
      rescue => err
        raise RuntimeError,
              _("%{log_message} \"%{error}\" attempting merge next message") % {:log_message => _log.prefix,
                                                                                :error       => err},
              err.backtrace
      end
    end
    msg
  end

  # Find the MiqQueue item with the specified find options, and if not found
  #   puts a new item on the queue.  If the item was found, it will not be
  #   changed, and will be yielded to an optional block, generally for logging
  #   purposes.
  def self.put_unless_exists(find_options)
    put_or_update(find_options) do |msg, item_hash|
      ret = yield(msg, item_hash) if block_given?
      # create the record if the original message did not exist, don't change otherwise
      ret if msg.nil?
    end
  end

  def self.unqueue(options)
    find_by(optional_values(default_get_options(options))).try(:destroy)
  end

  def deliver(requester = nil, &block)
    result = nil
    delivered_on
    _log.info("#{MiqQueue.format_short_log_msg(self)}, Delivering...")

    begin
      raise _("class_name cannot be nil") if class_name.nil?

      obj = class_name.constantize

      if instance_id
        begin
          if (class_name == requester.class.name) && requester.respond_to?(:id) && (instance_id == requester.id)
            obj = requester
          else
            obj = obj.find(instance_id)
          end
        rescue ActiveRecord::RecordNotFound => err
          _log.warn("#{MiqQueue.format_short_log_msg(self)} will not be delivered because #{err.message}")
          return STATUS_WARN, nil, nil
        rescue => err
          _log.error("#{MiqQueue.format_short_log_msg(self)} will not be delivered because #{err.message}")
          return STATUS_ERROR, err.message, nil
        end
      end

      data = self.data
      args.push(data) if data
      args.unshift(target_id) if obj.kind_of?(Class) && target_id

      begin
        status = STATUS_OK
        message = "Message delivered successfully"
        result = User.with_user_group(user_id, group_id) { dispatch_method(obj, args, &block) }
      rescue MiqException::MiqQueueRetryLater => err
        unget(err.options)
        message = "Message not processed.  Retrying #{err.options[:deliver_on] ? "at #{err.options[:deliver_on]}" : 'immediately'}"
        _log.error("#{MiqQueue.format_short_log_msg(self)}, #{message}")
        status = STATUS_RETRY
      rescue Timeout::Error
        message = "timed out after #{Time.now - delivered_on} seconds.  Timeout threshold [#{msg_timeout}]"
        _log.error("#{MiqQueue.format_short_log_msg(self)}, #{message}")
        status = STATUS_TIMEOUT
      end
    rescue StandardError, SyntaxError => error
      _log.error("#{MiqQueue.format_short_log_msg(self)}, Error: [#{error}]")
      _log.log_backtrace(error) unless error.kind_of?(MiqException::Error)
      status = STATUS_ERROR
      self.last_exception = error
      message = error.message
    end

    return status, message, result
  end

  # @return status
  def deliver_and_process(requester = nil, &block)
    status, message, result = deliver(requester, &block)
    delivered(status, message, result) unless status == STATUS_RETRY

    status
  end

  def dispatch_method(obj, args)
    Timeout.timeout(msg_timeout) do
      args = activate_miq_task(args)
      block_given? ? yield : obj.send(method_name, *args)
    end
  end

  DELIVER_IN_ERROR_MSG = 'Deliver in error'.freeze
  def delivered_in_error(msg = nil)
    delivered('error', msg || DELIVER_IN_ERROR_MSG, nil)
  end

  def delivered(state, msg, result)
    self.state = state
    _log.info("#{MiqQueue.format_short_log_msg(self)}, State: [#{state}], Delivered in [#{Time.now - delivered_on}] seconds")
    m_callback(msg, result) if miq_callback.present?
  rescue => err
    _log.error("#{MiqQueue.format_short_log_msg(self)}, #{err.message}")
  ensure
    destroy_potentially_stale_record
  end

  def delivered_on
    @delivered_on ||= Time.now
  end

  def m_callback(msg, result)
    if miq_callback[:class_name] && miq_callback[:method_name]
      begin
        klass = miq_callback[:class_name].constantize
        if miq_callback[:instance_id]
          obj = klass.find(miq_callback[:instance_id])
        else
          obj = klass
          _log.debug("#{MiqQueue.format_short_log_msg(self)}, Could not find callback in Class: [#{miq_callback[:class_name]}]") unless obj
        end
        if obj.respond_to?(miq_callback[:method_name])
          miq_callback[:args] ||= []

          log_args = result.inspect
          log_args = "#{log_args[0, 500]}..." if log_args.length > 500  # Trim long results
          log_args = miq_callback[:args] + [state, msg, log_args]
          _log.info("#{MiqQueue.format_short_log_msg(self)}, Invoking Callback with args: #{log_args.inspect}") unless obj.nil?

          cb_args = miq_callback[:args] + [state, msg, result]
          cb_args << self if cb_args.length == (obj.method(miq_callback[:method_name]).arity - 1)
          obj.send(miq_callback[:method_name], *cb_args)
        else
          _log.warn("#{MiqQueue.format_short_log_msg(self)}, Instance: [#{obj}], does not respond to Method: [#{miq_callback[:method_name]}], skipping")
        end
      rescue => err
        _log.error("#{MiqQueue.format_short_log_msg(self)}: #{err}")
        _log.log_backtrace(err)
      end
    else
      _log.warn("#{MiqQueue.format_short_log_msg(self)}, Callback is not well-defined, skipping")
    end
  end

  def requeue(options = {})
    options.reverse_merge!(attributes.symbolize_keys)
    MiqQueue.put(options.slice(*MiqQueue.columns_for_requeue))
  end

  def check_for_timeout(log_prefix = "MIQ(MiqQueue.check_for_timeout)", grace = 10.seconds, timeout = msg_timeout.seconds)
    if state == 'dequeue' && Time.now.utc > (updated_on + timeout.seconds + grace.seconds).utc
      msg = " processed by #{handler.format_full_log_msg}" unless handler.nil?
      $log.warn("#{log_prefix} Timed Out Active #{MiqQueue.format_short_log_msg(self)}#{msg} after #{Time.now.utc - updated_on} seconds")
      destroy rescue nil
    end
  end

  def self.candidates_for_timeout
    where(:state => STATE_DEQUEUE).where("(select date_part('epoch', updated_on) + msg_timeout) < ?", Time.now.to_i)
  end

  def self.check_for_timeout
    candidates_for_timeout.each(&:check_for_timeout)
  end

  def finished?
    FINISHED_STATES.include?(state)
  end

  def unfinished?
    !finished?
  end

  def self.format_full_log_msg(msg)
    handler = msg.handler_type.nil? ? "" : "#{msg.handler_type} #{msg.handler_id}"
    data = msg.data.nil? ? "" : "#{msg.data.length} bytes"
    args = ManageIQ::Password.sanitize_string(msg.args.inspect)

    "Message id: [#{msg.id}], " \
    "Zone: [#{msg.zone}], " \
    "Role: [#{msg.role}], " \
    "Server: [#{msg.server_guid}], " \
    "MiqTask id: [#{msg.miq_task_id}], " \
    "Handler id: [#{handler}], " \
    "Ident: [#{msg.queue_name}], " \
    "Target id: [#{msg.target_id}], " \
    "Instance id: [#{msg.instance_id}], " \
    "Task id: [#{msg.task_id}], " \
    "Command: [#{msg.class_name}.#{msg.method_name}], " \
    "Timeout: [#{msg.msg_timeout}], " \
    "Priority: [#{msg.priority}], " \
    "State: [#{msg.state}], " \
    "Deliver On: [#{msg.deliver_on}], " \
    "Data: [#{data}], " \
    "Args: #{args}"
  end

  def self.format_short_log_msg(msg)
    "Message id: [#{msg.id}]"
  end

  def get_worker
    handler if handler.kind_of?(MiqWorker)
  end

  def self.get_worker(task_id)
    find_by(:task_id => task_id).try(:get_worker)
  end

  def self.display_name(number = 1)
    n_('Queue', 'Queues', number)
  end

  private

  # NOTE: this will intentionally lookup missing zones every time
  def validate_zone_name
    if zone && !self.class.valid_zone_names[zone]
      found = self.class.valid_zone_names[zone] = Zone.in_my_region.exists?(:name => zone)
      errors.add(:zone, N_("Unknown Zone")) unless found
    end
  end

  cache_with_timeout(:valid_zone_names, 1.minute) { {} }

  def activate_miq_task(args)
    MiqTask.update_status(miq_task_id, MiqTask::STATE_ACTIVE, MiqTask::STATUS_OK, "Task starting") if miq_task_id
    params = args.first
    params[:miq_task_id] = miq_task_id if params.kind_of?(Hash)
    args
  end

  # default values for get operations
  def self.default_get_options(options)
    result = options.reverse_merge(
      :queue_name => DEFAULT_QUEUE,
      :state      => STATE_READY,
      :zone       => Zone.determine_queue_zone(options)
    )

    if result[:class_name].kind_of?(Class)
      ActiveSupport::Deprecation.warn("Rails 5.1 dropped support for Class query values, use a String for class_name.", caller[1..-1])
      result[:class_name] = result[:class_name].name
    end
    result
  end

  private_class_method :default_get_options

  # when searching miq_queue, we often want to see if a key is nil, or a particular value
  # given a set of keys, modify the params to have those values
  # example:
  #   optional_values({:a => 'x', :b => 'y'}, [:a])
  #     # => {:a => [nil, 'x'], :b => 'y'}
  #   sql => "where (a is nil or a = 'x') and b = 'y'"
  #
  def self.optional_values(options, keys = [:zone])
    options = options.dup
    Array(keys).each do |key|
      options[key] = [nil, options[key]].uniq if options.key?(key)
    end
    options
  end

  private_class_method :optional_values

  private_class_method def self.messaging_options_from_env
    return unless ENV["MESSAGING_HOSTNAME"] && ENV["MESSAGING_PORT"] && ENV["MESSAGING_USERNAME"] && ENV["MESSAGING_PASSWORD"]

    options = {
      :host           => ENV["MESSAGING_HOSTNAME"],
      :port           => ENV["MESSAGING_PORT"].to_i,
      :username       => ENV["MESSAGING_USERNAME"],
      :password       => ENV["MESSAGING_PASSWORD"],
      :protocol       => ENV.fetch("MESSAGING_PROTOCOL", "Kafka"),
      :encoding       => ENV.fetch("MESSAGING_ENCODING", "json"),
      :sasl_mechanism => ENV.fetch("MESSAGING_SASL_MECHANISM", "PLAIN")
    }

    if ENV["MESSAGING_SSL_CA"].present?
      options[:ssl] = true
      options[:ca_file] = ENV["MESSAGING_SSL_CA"]
    end

    options
  end

  MESSAGING_CONFIG_FILE = Rails.root.join("config/messaging.yml")
  private_class_method def self.messaging_options_from_file
    return unless MESSAGING_CONFIG_FILE.file?

    YAML.load_file(MESSAGING_CONFIG_FILE)[Rails.env].symbolize_keys
  end

  def destroy_potentially_stale_record
    destroy
  rescue ActiveRecord::StaleObjectError
    begin
      reload.destroy
    rescue ActiveRecord::RecordNotFound
      # ignore
    end
  end
end # Class MiqQueue