ManageIQ/manageiq

View on GitHub
app/models/miq_worker/runner.rb

Summary

Maintainability
A
55 mins
Test Coverage
F
57%
require 'miq-process'

class MiqWorker::Runner
  class TemporaryFailure < RuntimeError
  end

  include Vmdb::Logging
  attr_accessor :last_hb, :worker, :worker_settings
  attr_reader   :active_roles, :server

  delegate :systemd_worker?, :to => :worker

  INTERRUPT_SIGNALS = %w[SIGINT SIGTERM].freeze

  SAFE_SLEEP_SECONDS = 60

  def self.start_worker(*args)
    new(*args).start
  end

  def poll_method
    return @poll_method unless @poll_method.nil?

    self.poll_method = worker_settings[:poll_method]&.to_sym
  end

  def poll_method=(val)
    val = "sleep_poll_#{val}"
    raise ArgumentError, _("poll method '%{value}' not defined") % {:value => val} unless respond_to?(val)

    @poll_method = val.to_sym
  end

  def self.corresponding_model
    module_parent
  end

  def initialize(cfg = {})
    @cfg = cfg
    $log ||= Rails.logger

    @server = MiqServer.my_server(true)
    @worker_should_exit = false

    worker_initialization
    after_initialize

    @worker.release_db_connection if @worker.respond_to?(:release_db_connection)
  end

  def worker_initialization
    starting_worker_record
    set_process_title
    # Sync the config and roles early since heartbeats and logging require the configuration
    sync_config

    set_connection_pool_size
  end

  # More process specific stuff :-(
  def set_database_application_name
    ArApplicationName.name = @worker.database_application_name
  end

  def set_connection_pool_size
    cur_size = ActiveRecord::Base.connection_pool.instance_variable_get(:@size)
    new_size = worker_settings[:connection_pool_size] || cur_size
    return if cur_size == new_size

    ActiveRecord::Base.connection_pool.instance_variable_set(:@size, new_size)
    _log.info("#{log_prefix} Changed connection_pool size from #{cur_size} to #{new_size}")
  end

  ###############################
  # Worker Monitor Methods
  ###############################

  def worker_monitor_drb
    @worker_monitor_drb ||= begin
      raise _("%{log} No MiqServer found to establishing DRb Connection to") % {:log => log_prefix} if server.nil?

      drb_uri = server.reload.drb_uri
      if drb_uri.blank?
        raise _("%{log} Blank DRb_URI for MiqServer with ID=[%{number}], NAME=[%{name}], PID=[%{pid_number}], GUID=[%{guid_number}]") %
          {:log         => log_prefix,
           :number      => server.id,
           :name        => server.name,
           :pid_number  => server.pid,
           :guid_number => server.guid}
      end
      _log.info("#{log_prefix} Initializing DRb Connection to MiqServer with ID=[#{server.id}], NAME=[#{server.name}], PID=[#{server.pid}], GUID=[#{server.guid}] DRb URI=[#{drb_uri}]")
      require 'drb'
      DRbObject.new(nil, drb_uri)
    end
  end

  def start
    self.class.module_parent.rails_worker? ? start_rails_worker : start_non_rails_worker
  end

  def start_rails_worker
    prepare
    run
  end

  def start_non_rails_worker
    # Create a temp file and immediately unlink it to create a
    # secure hidden file to be used for the child process' stdin.
    #
    # Typically this is done using a pipe but because we are exec'ing
    # the child worker not forking it we aren't able to have the child
    # worker read from one side of the pipe while the parent is writing
    # to the other side.  This means that the amount of data that can be
    # written is limited to the buffer size of the pipe and if you write
    # more than that it will hang.
    stdin_tmp = Tempfile.new
    File.unlink(stdin_tmp.path)

    stdin_tmp.write(worker_options.to_json)
    stdin_tmp.rewind

    $stdin.reopen(stdin_tmp)

    # Using exec here rather than fork+exec so that we can continue to use the
    # standard systemd service Type=notify and not have to use Type=forking which
    # can limit other systemd options available to the service.
    Bundler.unbundled_exec(worker_env, worker_cmdline)
  end

  def recover_from_temporary_failure
    @backoff ||= 30
    @backoff *= 2 if @backoff < 4.hours
    safe_sleep(@backoff)
  end

  def prepare
    set_database_application_name
    ObjectSpace.garbage_collect
    started_worker_record
    do_before_work_loop
    self
  end

  def run
    do_work_loop
  end

  def self.log_prefix
    @log_prefix ||= "MIQ(#{name})"
  end

  def log_prefix
    self.class.log_prefix
  end

  #
  # @worker object handling methods
  #

  def find_worker_record
    @worker = self.class.corresponding_model.find_by(:guid => @cfg[:guid])
    do_exit("Unable to find instance for worker GUID [#{@cfg[:guid]}].", 1) if @worker.nil?
    MiqWorker.my_guid = @cfg[:guid]
  end

  def starting_worker_record
    find_worker_record
    @worker.status         = "starting"
    @worker.started_on     = Time.now.utc
    @worker.last_heartbeat = Time.now.utc
    @worker.update_spid
    @worker.save
  end

  def started_worker_record
    reload_worker_record
    @worker.sd_notify_started if systemd_worker?
    @worker.status         = "started"
    @worker.last_heartbeat = Time.now.utc
    @worker.update_spid
    @worker.save
    $log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]")
  end

  def reload_worker_record
    worker_id   = @worker.id
    worker_guid = @worker.guid
    begin
      @worker.reload
    rescue ActiveRecord::RecordNotFound
      do_exit("Unable to find instance for worker ID [#{worker_id}] GUID [#{worker_guid}].", 1)
    end
  end

  #
  # Worker exit methods
  #

  def self.safe_log(worker, message = nil, exit_code = 0)
    meth = (exit_code == 0) ? :info : :error

    prefix = "#{log_prefix} "      rescue ""
    pid    = "PID [#{Process.pid}] "    rescue ""
    guid   = worker.nil? ? '' : "GUID [#{worker.guid}] "  rescue ""
    id     = worker.nil? ? '' : "ID [#{worker.id}] "      rescue ""
    logmsg = "#{prefix}#{id}#{pid}#{guid}#{message}"

    begin
      $log.send(meth, logmsg)
    rescue
      puts "#{meth.to_s.upcase}: #{logmsg}" rescue nil
    end
  end

  def safe_log(message = nil, exit_code = 0)
    self.class.safe_log(@worker, message, exit_code)
  end

  def update_worker_record_at_exit(exit_code)
    return if @worker.nil?

    @worker.reload
    @worker.status     = exit_code == 0 ? MiqWorker::STATUS_STOPPED : MiqWorker::STATUS_ABORTED
    @worker.stopped_on = Time.now.utc
    @worker.save

    @worker.sd_notify_stopping if systemd_worker?
    @worker.status_update
    @worker.log_status
  end

  def do_exit(message = nil, exit_code = 0)
    return if @exiting # Prevent running the do_exit logic more than one time

    @exiting = true

    begin
      before_exit(message, exit_code)
    rescue Exception => e
      safe_log("Error in before_exit: #{e.message}", :error)
    end

    begin
      update_worker_record_at_exit(exit_code)
    rescue Exception => e
      safe_log("Error in update_worker_record_at_exit: #{e.message}", :error)
    end

    begin
      MiqWorker.release_db_connection
    rescue Exception => e
      safe_log("Error in releasing database connection: #{e.message}", :error)
    end

    safe_log("#{message} Worker exiting.", exit_code)
  ensure
    exit exit_code
  end

  def sync_config
    # Sync roles
    @active_roles = MiqServer.my_active_roles(true)
    after_sync_active_roles

    # Sync settings
    Vmdb::Settings.reload!
    @my_zone ||= MiqServer.my_zone
    sync_worker_settings
    sync_blacklisted_events
    after_sync_config

    _log.info("ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:")
    $log.log_hashes(@worker_settings)
    $log.info("---")
    $log.log_hashes(@cfg)

    @worker.release_db_connection if @worker.respond_to?(:release_db_connection)
  end

  def sync_worker_settings
    @worker_settings = self.class.corresponding_model.worker_settings(:config => ::Settings.to_hash)
    @poll = @worker_settings[:poll]
    poll_method
  end

  #
  # Work methods
  #

  def do_work
    raise NotImplementedError, _("must be implemented in a subclass")
  end

  def do_work_loop
    warn_about_heartbeat_skipping if skip_heartbeat?
    loop do
      begin
        heartbeat
        do_work
      rescue TemporaryFailure => error
        msg = "#{log_prefix} Temporary failure (message: '#{error}') caught" \
            " during #do_work. Sleeping for a while before resuming."
        _log.warn(msg)
        recover_from_temporary_failure
      rescue SystemExit
        do_exit("SystemExit signal received.")
      rescue Exception => err
        do_exit("An error has occurred during work processing: #{err}\n#{err.backtrace.join("\n")}", 1)
      else
        @backoff = nil
      end

      do_exit("Request to exit received:") if @worker_should_exit

      do_gc
      self.class.log_ruby_object_usage(worker_settings[:top_ruby_object_classes_to_log].to_i)
      send(poll_method)
    end
  end

  def heartbeat
    now = Time.now.utc
    # Heartbeats can be expensive, so do them only when needed
    return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

    systemd_worker? ? @worker.sd_notify_watchdog : heartbeat_to_file

    if config_out_of_date?
      _log.info("#{log_prefix} Synchronizing configuration...")
      sync_config
      _log.info("#{log_prefix} Synchronizing configuration complete...")
    end

    @last_hb = now
    do_heartbeat_work
  rescue SystemExit, SignalException
    raise
  rescue Exception => err
    do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
  end

  def heartbeat_to_file(timeout = nil)
    # Disable heartbeat check.  Useful if a worker is running in isolation
    # without the oversight of MiqServer::WorkerManagement
    return if skip_heartbeat?

    timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
    File.write(@worker.heartbeat_file, (Time.now.to_i + timeout))
  end

  def config_out_of_date?
    @my_last_config_change ||= Time.now.utc

    last_config_change = server_last_change(:last_config_change)
    if last_config_change && last_config_change > @my_last_config_change
      _log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
      @my_last_config_change = last_config_change
      return true
    end

    false
  end

  def key_store
    @key_store ||= MiqMemcached.client(:namespace => "server_monitor")
  end

  def server_last_change(key)
    key_store.get(key)
  end

  def do_gc
    t = Time.now.utc
    interval = worker_settings[:gc_interval] || 15.minutes
    interval = 1.minute if interval < 1.minute
    if @last_gc.nil? || @last_gc + interval < t
      gc_time = Benchmark.realtime { ObjectSpace.garbage_collect }
      gc_meth = gc_time >= 5 ? :warn : :debug
      $log.send(gc_meth, "#{log_prefix} Garbage collection took #{gc_time} seconds")
      @last_gc = t
    end
  end

  #
  # For derived classes to override, if they need to
  #
  def do_heartbeat_work
  end

  def do_before_work_loop
  end

  def after_initialize
  end

  def after_sync_config
  end

  def after_sync_active_roles
  end

  def before_exit(_message, _exit_code)
  end

  def sync_blacklisted_events
  end

  #
  # Polling methods
  #

  def sleep_poll_normal
    sleep(@poll)
  end

  def sleep_poll_escalate
    @poll_escalate = @poll_escalate.nil? ? @poll : @poll_escalate * 2
    @poll_escalate = worker_settings[:poll_escalate_max] if @poll_escalate > worker_settings[:poll_escalate_max]
    sleep(@poll_escalate)
  end

  def reset_poll_escalate
    @poll_escalate = nil
  end

  def safe_sleep(seconds)
    (seconds / SAFE_SLEEP_SECONDS).times do
      sleep SAFE_SLEEP_SECONDS
      heartbeat
    end
    sleep(seconds % SAFE_SLEEP_SECONDS)
  end

  def self.ruby_object_usage
    types = Hash.new { |h, k| h[k] = 0 }
    ObjectSpace.each_object(Object) do |obj|
      next unless defined?(obj.class)

      types[obj.class.name] += 1
    end
    types
  end

  LOG_RUBY_OBJECT_USAGE_INTERVAL = 60
  def self.log_ruby_object_usage(top = 20)
    return unless top > 0

    t = Time.now.utc
    @last_ruby_object_usage ||= t

    if (@last_ruby_object_usage + LOG_RUBY_OBJECT_USAGE_INTERVAL) < t
      types = ruby_object_usage
      _log.info("Ruby Object Usage: #{types.sort_by { |_k, v| -v }.take(top).inspect}")
      @last_ruby_object_usage = t
    end
  end

  # Traps both SIGTERM and SIGINT here, and does the same thing, but in a
  # container based deployment, SIGTERM is probably the one that will be
  # received from the container management system (aka OpenShift).  The SIGINT
  # trap is mostly a developer convenience.
  def setup_sigterm_trap
    INTERRUPT_SIGNALS.each do |signal|
      Kernel.trap(signal) { @worker_should_exit = true }
    end
  end

  protected

  def process_message(message, *args)
    meth = "message_#{message}"
    if respond_to?(meth)
      send(meth, *args)
    else
      _log.warn("#{log_prefix} Message [#{message}] is not recognized, ignoring")
    end
  end

  def process_title
    type   = @worker.abbreviated_class_name
    title  = "#{MiqWorker::PROCESS_TITLE_PREFIX} #{type} id: #{@worker.id}"
    title << ", queue: #{@worker.queue_name}" if @worker.queue_name
    title << ", uri: #{@worker.uri}" if @worker.uri
    title
  end

  def set_process_title
    Process.setproctitle(process_title)
  end

  private

  def worker_options
    settings = {
      :worker_settings => worker_settings
    }

    worker.class.worker_settings_paths.to_a.each do |settings_path|
      settings.store_path(settings_path, Settings.to_hash.dig(*settings_path))
    end

    {
      :messaging => MiqQueue.messaging_client_options,
      :settings  => worker.class.normalize_settings!(settings, :recurse => true)
    }
  end

  def worker_env
    {
      "APP_ROOT"              => Rails.root.to_s,
      "GUID"                  => @worker.guid,
      "WORKER_HEARTBEAT_FILE" => @worker.heartbeat_file
    }
  end

  def worker_cmdline
    # Attempt to find the plugin where the class lives then default to
    # the application root
    engine = Vmdb::Plugins.plugin_for_class(self.class) || Rails

    worker_type = self.class.module_parent.name.split("::").last.underscore
    engine.root.join("workers/#{worker_type}/worker").to_s
  end

  def skip_heartbeat?
    ENV["DISABLE_MIQ_WORKER_HEARTBEAT"]
  end

  def warn_about_heartbeat_skipping
    puts "**************************************************"
    puts "WARNING:  SKIPPING HEARTBEATING WITH THIS WORKER!"
    puts "**************************************************"
    puts ""
    puts "Remove the `DISABLE_MIQ_WORKER_HEARTBEAT` ENV variable"
    puts "to reenable heartbeating normally."
  end
end