ManageIQ/manageiq

View on GitHub
app/models/miq_server/worker_management/dequeue.rb

Summary

Maintainability
A
3 hrs
Test Coverage
F
22%
module MiqServer::WorkerManagement::Dequeue
  extend ActiveSupport::Concern

  def peek(queue_name, priority, limit)
    MiqQueue.peek(
      :conditions => {:queue_name => queue_name, :priority => priority, :role => @active_role_names},
      :select     => "id, lock_version, priority, role",
      :limit      => limit
    )
  end

  def get_worker_dequeue_method(worker_class)
    (@child_worker_settings[worker_class.settings_name][:dequeue_method] || :drb).to_sym
  end

  def reset_queue_messages
    @queue_messages_lock.synchronize(:EX) do
      @queue_messages = {}
    end
  end

  def get_queue_priority_for_worker(w)
    w[:class].respond_to?(:queue_priority) ? w[:class].queue_priority : MiqQueue::MIN_PRIORITY
  end

  def get_queue_message_for_worker(w)
    return nil if w.nil? || w[:queue_name].nil?

    @queue_messages_lock.synchronize(:EX) do
      queue_name = w[:queue_name]
      queue_hash = @queue_messages[queue_name]
      return nil unless queue_hash.kind_of?(Hash)

      messages = queue_hash[:messages]
      return nil unless messages.kind_of?(Array)

      messages.each_index do |index|
        msg = messages[index]
        next if msg.nil?
        next if MiqQueue.lower_priority?(msg[:priority], get_queue_priority_for_worker(w))
        next unless w[:class].required_roles.blank? || msg[:role].blank? || Array.wrap(w[:class].required_roles).include?(msg[:role])
        return messages.delete_at(index)
      end

      return nil
    end
  end

  def get_queue_message(pid)
    @workers_lock.synchronize(:SH) do
      w = @workers[pid]

      msg = get_queue_message_for_worker(w)
      [msg[:id], msg[:lock_version]] if msg
    end unless @workers_lock.nil?
  end

  def prefetch_stale_threshold
    ::Settings.server.prefetch_stale_threshold.to_i_with_method
  end

  def prefetch_below_threshold?(queue_name, wcount)
    @queue_messages_lock.synchronize(:SH) do
      return false unless @queue_messages.key_path?(queue_name, :messages)
      return (@queue_messages[queue_name][:messages].length <= (::Settings.server.prefetch_min_per_worker_dequeue * wcount))
    end
  end

  def prefetch_stale?(queue_name)
    @queue_messages_lock.synchronize(:SH) do
      return true if @queue_messages[queue_name].nil?
      return ((Time.now.utc - @queue_messages[queue_name][:timestamp]) > prefetch_stale_threshold)
    end
  end

  def prefetch_has_lower_priority_than_miq_queue?(queue_name)
    @queue_messages_lock.synchronize(:SH) do
      return true if @queue_messages[queue_name].nil? || @queue_messages[queue_name][:messages].nil?
      msg = @queue_messages[queue_name][:messages].first
      return true if msg.nil?
      return peek(queue_name, MiqQueue.priority(msg[:priority], :higher, 1), 1).any?
    end
  end

  def get_worker_count_and_priority_by_queue_name
    queue_names = {}
    @workers_lock.synchronize(:SH) do
      @workers.each do |_pid, w|
        next if w[:queue_name].nil?
        next if w[:class].nil?
        next unless get_worker_dequeue_method(w[:class]) == :drb
        options = (queue_names[w[:queue_name]] ||= [0, MiqQueue::MAX_PRIORITY])
        options[0] += 1
        options[1]  = MiqQueue.lower_priority(get_queue_priority_for_worker(w), options[1])
      end
    end unless @workers_lock.nil?
    queue_names
  end

  def register_worker(worker_pid, worker_class, queue_name)
    worker_class = worker_class.constantize if worker_class.kind_of?(String)

    @workers_lock.synchronize(:EX) do
      worker_add(worker_pid)
      h = @workers[worker_pid]
      h[:class] ||= worker_class
      h[:queue_name] ||= queue_name
    end unless @workers_lock.nil?
  end

  def populate_queue_messages
    queue_names = get_worker_count_and_priority_by_queue_name
    @queue_messages_lock.synchronize(:EX) do
      queue_names.each do |queue_name, (wcount, priority)|
        if prefetch_below_threshold?(queue_name, wcount) || prefetch_stale?(queue_name) || prefetch_has_lower_priority_than_miq_queue?(queue_name)
          @queue_messages[queue_name] ||= {}
          @queue_messages[queue_name][:timestamp] = Time.now.utc
          @queue_messages[queue_name][:messages]  = peek(queue_name, priority, (::Settings.server.prefetch_max_per_worker_dequeue * wcount)).collect do |q|
            {:id => q.id, :lock_version => q.lock_version, :priority => q.priority, :role => q.role}
          end
          _log.info("Fetched #{@queue_messages[queue_name][:messages].length} miq_queue rows for queue_name=#{queue_name}, wcount=#{wcount.inspect}, priority=#{priority.inspect}") if @queue_messages[queue_name][:messages].length > 0
        end
      end
    end
  end
end