ManageIQ/manageiq

View on GitHub
app/models/miq_queue_worker_base/runner.rb

Summary

Maintainability
A
1 hr
Test Coverage
F
40%
require 'miq-system'

class MiqQueueWorkerBase::Runner < MiqWorker::Runner
  def after_sync_config
    sync_dequeue_method
  end

  def sync_dequeue_method
    previous_dequeue_method = @dequeue_method
    @dequeue_method = (worker_settings[:dequeue_method] || :sql).to_sym

    dequeue_method_changed(previous_dequeue_method) if @dequeue_method != previous_dequeue_method
  end

  def dequeue_method_via_drb?
    @dequeue_method == :drb && drb_dequeue_available?
  end

  def dequeue_method_via_miq_messaging?
    @dequeue_method == :miq_messaging && miq_messaging_dequeue_available?
  end

  def dequeue_method_changed(previous_dequeue_method)
    @listener_thread&.kill if previous_dequeue_method == :miq_messaging
  end

  def get_message_via_drb
    loop do
      begin
        msg_id, lock_version = worker_monitor_drb.get_queue_message(@worker.pid)
      rescue DRb::DRbError => err
        do_exit("Error communicating with WorkerMonitor because <#{err.message}>", 1)
      end

      return nil if msg_id.nil?

      msg = MiqQueue.find_by(:id => msg_id)
      if msg.nil?
        _log.debug("#{log_prefix} Message id: [#{msg_id}] stale (msg gone), retrying...")
        next
      end

      if msg.lock_version != lock_version
        _log.debug("#{log_prefix} #{MiqQueue.format_short_log_msg(msg)} stale (lock_version mismatch), retrying...")
        next
      end

      # TODO: Possible race condition where task_id is checked in a separate call from dequeueing
      next if msg.task_id && MiqQueue.exists?(:state => MiqQueue::STATE_DEQUEUE, :zone => [nil, MiqServer.my_zone], :task_id => msg.task_id)

      begin
        msg.update!(:state => MiqQueue::STATE_DEQUEUE, :handler => @worker)
        _log.info("#{MiqQueue.format_full_log_msg(msg)}, Dequeued in: [#{Time.now - msg.created_on}] seconds")
        return msg
      rescue ActiveRecord::StaleObjectError
        _log.debug("#{log_prefix} #{MiqQueue.format_short_log_msg(msg)} stale, retrying...")
        next
      rescue => err
        msg.update_column(:state, MiqQueue::STATUS_ERROR)
        raise _("%{log} \"%{error}\" attempting to get next message") % {:log => log_prefix, :error => err}
      end
    end
  end

  def get_message_via_sql
    loop do
      msg = MiqQueue.get(
        :queue_name => @worker.queue_name,
        :role       => @worker.required_roles.presence,
        :priority   => @worker.class.queue_priority
      )
      return msg unless msg == :stale
    end
  end

  def get_message_via_miq_messaging
    @message_queue ||= Queue.new
    @message_queue.pop unless @message_queue.empty?
  end

  def get_message
    if dequeue_method_via_miq_messaging?
      get_message_via_miq_messaging
    elsif dequeue_method_via_drb? && @worker_monitor_drb
      get_message_via_drb
    else
      get_message_via_sql
    end
  end

  def deliver_queue_message(msg, &block)
    reset_poll_escalate if poll_method == :sleep_poll_escalate

    begin
      $_miq_worker_current_msg = msg
      Thread.current[:tracking_label] = msg.tracking_label || msg.task_id
      heartbeat_message_timeout(msg)
      status, message, result = msg.deliver(&block)

      if status == MiqQueue::STATUS_TIMEOUT
        begin
          _log.info("#{log_prefix} Reconnecting to DB after timeout error during queue deliver")

          # Remove the connection and establish a new one since reconnect! doesn't always play nice with SSL postgresql connections
          spec_name = ActiveRecord::Base.connection_specification_name
          ActiveRecord::Base.establish_connection(ActiveRecord::Base.remove_connection(spec_name))
          @worker.update_spid!
        rescue => err
          do_exit("Exiting worker due to timeout error that could not be recovered from...error: #{err.class.name}: #{err.message}", 1)
        end
      end

      msg.delivered(status, message, result) unless status == MiqQueue::STATUS_RETRY
      do_exit("Exiting worker due to timeout error", 1) if status == MiqQueue::STATUS_TIMEOUT
    ensure
      $_miq_worker_current_msg = nil # to avoid log messages inadvertantly prefixed by previous task_id
      Thread.current[:tracking_label] = nil
    end
  end

  def deliver_message(msg)
    case msg
    when MiqQueue
      deliver_queue_message(msg)
    when ManageIQ::Messaging::ReceivedMessage
      process_miq_messaging_message(msg)
    when String
      process_message(msg)
    else
      _log.error("#{log_prefix} Message <#{msg.inspect}> is of unknown type <#{msg.class}>")
      raise _("%{log} Message <%{message}> is of unknown type <%{type}>") %
            {:log => log_prefix, :message => msg.inspect, :type => msg.class}
    end
  end

  def do_work
    register_worker_with_worker_monitor if dequeue_method_via_drb?
    ensure_miq_listener_thread!         if dequeue_method_via_miq_messaging?

    # Keep collecting messages from the queue until the queue is empty,
    #   so we don't sleep in between messages
    loop do
      heartbeat
      msg = get_message
      break if msg.nil?

      deliver_message(msg)
    end
  end

  private

  def register_worker_with_worker_monitor
    worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
  rescue DRb::DRbError => err
    do_exit("Failed to register worker with worker monitor: #{err.class.name}: #{err.message}", 1)
  end

  def drb_dequeue_available?
    @drb_dequeue_available ||=
      begin
        server.drb_uri.present? && worker_monitor_drb.respond_to?(:register_worker)
      rescue DRb::DRbError
        false
      end
  end

  def miq_messaging_dequeue_available?
    MiqQueue.messaging_type != "miq_queue" && MiqQueue.messaging_client(self.class.name).present?
  end

  def process_miq_messaging_message(_msg)
    raise NotImplementedError, 'Must be implemented in subclass'
  end

  def ensure_miq_listener_thread!
    @miq_listener_thread = nil if @miq_listener_thread && !@miq_listener_thread.alive?

    miq_listener_thread
  end

  def miq_listener_thread
    @miq_listener_thread ||= Thread.new { miq_messaging_listener_thread }
  end

  def miq_messaging_listener_thread
    loop do
      send(:"miq_messaging_subscribe_#{@worker.class.miq_messaging_subscribe_mode}") do |msg|
        @message_queue << msg
      end
    rescue => err
      _log.warn("miq_messaging_listener_thread error [#{err}]")
    end
  end

  def miq_messaging_subscribe_topic(&block)
    messaging_client = MiqQueue.messaging_client(self.class.name)
    messaging_client.subscribe_topic(:service => "manageiq.#{@worker.queue_name}", :persist_ref => @worker.guid, &block)
  end

  def miq_messaging_subscribe_queue(&block)
    messaging_client = MiqQueue.messaging_client(self.class.name)
    messaging_client.subscribe_messages(:service => "manageiq.#{@worker.queue_name}") do |messages|
      messages.each(&block)
    end
  end

  # Only for file based heartbeating
  def heartbeat_message_timeout(message)
    if message.msg_timeout
      timeout = worker_settings[:poll] + message.msg_timeout
      systemd_worker? ? worker.sd_notify_watchdog_usec(timeout) : heartbeat_to_file(timeout)
    end
  end
end