ManageIQ/manageiq

View on GitHub
app/models/miq_server.rb

Summary

Maintainability
A
2 hrs
Test Coverage
C
71%
require 'resolv'

class MiqServer < ApplicationRecord
  include AtStartup
  include ServerSmartProxy
  include ConfigurationManagement
  include EnvironmentManagement
  include LogManagement
  include QueueManagement
  include RoleManagement
  include StatusManagement
  include UuidMixin
  acts_as_miq_taggable
  include MiqPolicyMixin
  include RelationshipMixin

  alias_attribute :description, :name

  belongs_to              :vm, :inverse_of => :miq_server
  belongs_to              :zone
  has_many                :messages,  :as => :handler, :class_name => 'MiqQueue'
  has_many                :miq_events, :as => :target
  has_many                :miq_workers, :dependent => :destroy

  before_destroy          :validate_is_deleteable
  after_destroy           :destroy_linked_events_queue

  default_value_for(:name, "EVM")
  default_value_for(:zone) { Zone.default_zone }

  scope :active_miq_servers, -> { where(:status => STATUSES_ACTIVE) }
  scope :recently_active,    -> { where(:last_heartbeat => 10.minutes.ago.utc..) }
  scope :with_zone_id, ->(zone_id) { where(:zone_id => zone_id) }
  virtual_delegate :description, :to => :zone, :prefix => true, :allow_nil => true, :type => :string

  validate :validate_zone_not_maintenance?
  validate :zone_unchanged_in_pods, :on => :update

  GUID_FILE = Rails.root.join("GUID").freeze

  STATUS_STARTING       = 'starting'.freeze
  STATUS_STARTED        = 'started'.freeze
  STATUS_RESTARTING     = 'restarting'.freeze
  STATUS_STOPPED        = 'stopped'.freeze
  STATUS_QUIESCE        = 'quiesce'.freeze
  STATUS_NOT_RESPONDING = 'not responding'.freeze
  STATUS_KILLED         = 'killed'.freeze

  STATUSES_STOPPED = [STATUS_STOPPED, STATUS_KILLED]
  STATUSES_ACTIVE  = [STATUS_STARTING, STATUS_STARTED]
  STATUSES_ALIVE   = STATUSES_ACTIVE + [STATUS_RESTARTING, STATUS_QUIESCE]

  RESTART_EXIT_STATUS = 123

  def validate_zone_not_maintenance?
    errors.add(:zone, N_('cannot be maintenance zone')) if zone&.maintenance?
  end

  def hostname
    h = super
    h if h.to_s.hostname?
  end

  def starting_server_record
    self.started_on = self.last_heartbeat = Time.now.utc
    self.stopped_on = ""
    self.status     = "starting"
    self.pid        = Process.pid
    self.build      = Vmdb::Appliance.BUILD
    self.version    = Vmdb::Appliance.VERSION
    self.is_master  = false
    self.sql_spid   = ActiveRecord::Base.connection.spid
    save
  end

  def destroy_linked_events_queue
    MiqQueue.put(
      :class_name  => "MiqServer",
      :method_name => 'destroy_linked_events',
      :args        => [id],
      :zone        => my_zone
    )
  end

  def self.destroy_linked_events(server_id)
    EventStream.where(:target_id => server_id, :target_type => "MiqServer").destroy_all
  end

  def self.kill_all_workers
    svr = my_server(true)
    svr&.worker_manager&.kill_all_workers
  end

  def self.pidfile
    @pidfile ||= "#{Rails.root.join("tmp/pids/evm.pid")}"
  end

  def self.running?
    p = PidFile.new(pidfile)
    p.running? ? p.pid : false
  end

  def self.seed
    unless exists?(:guid => my_guid)
      _log.info("Creating Default MiqServer with guid=[#{my_guid}], zone=[#{Zone.default_zone.name}]")
      create!(:guid => my_guid, :zone => Zone.default_zone)
      my_server_clear_cache
      ::Settings.reload! # Reload the Settings now that we have a server record
      _log.info("Creating Default MiqServer... Complete")
    end
    my_server
  end

  def validate_is_deleteable
    unless is_deleteable?
      _log.error(@errors.full_messages)
      throw :abort
    end
  end

  def server_monitor
    @server_monitor ||= ServerMonitor.new(self)
  end

  def worker_manager
    @worker_manager ||= WorkerManagement.build(self)
  end

  delegate :start_workers, :stop_worker, :enough_resource_to_start_worker?, :to => :worker_manager

  def heartbeat
    # Heartbeat the server
    t = Time.now.utc
    _log.info("Heartbeat [#{t}]...")
    reload
    self.last_heartbeat = t
    self.status = "started" if status == "not responding"
    save
    _log.info("Heartbeat [#{t}]...Complete")
  end

  def log_active_servers
    MiqRegion.my_region.active_miq_servers.sort_by { |s| [s.my_zone, s.name] }.each do |s|
      local  = s.is_local? ? 'Y' : 'N'
      master = s.is_master? ? 'Y' : 'N'
      $log.info("MiqServer: local=#{local}, master=#{master}, status=#{'%08s' % s.status}, id=#{'%05d' % s.id}, pid=#{'%05d' % s.pid}, guid=#{s.guid}, name=#{s.name}, zone=#{s.my_zone}, hostname=#{s.hostname}, ipaddress=#{s.ipaddress}, version=#{s.version}, build=#{s.build}, active roles=#{s.active_role_names.join(':')}")
    end
  end

  def stop_poll
    ::Settings.server.stop_poll.to_i_with_method
  end

  def heartbeat_frequency
    ::Settings.server.heartbeat_frequency.to_i_with_method
  end

  def server_dequeue_frequency
    ::Settings.server.server_dequeue_frequency.to_i_with_method
  end

  def server_monitor_frequency
    ::Settings.server.server_monitor_frequency.to_i_with_method
  end

  def server_log_frequency
    ::Settings.server.server_log_frequency.to_i_with_method
  end

  def server_role_monitor_frequency
    ::Settings.server.server_role_monitor_frequency.to_i_with_method
  end

  def worker_dequeue_frequency
    ::Settings.server.worker_dequeue_frequency.to_i_with_method
  end

  def worker_messaging_frequency
    ::Settings.server.worker_messaging_frequency.to_i_with_method
  end

  def worker_monitor_frequency
    ::Settings.server.worker_monitor_frequency.to_i_with_method
  end

  def memory_threshold
    ::Settings.server.memory_threshold.to_i_with_method
  end

  def threshold_exceeded?(name, now = Time.now.utc)
    @thresholds ||= Hash.new(1.day.ago.utc)
    exceeded = now > (@thresholds[name] + send(name))
    @thresholds[name] = now if exceeded
    exceeded
  end

  def monitor
    now = Time.now.utc
    Benchmark.realtime_block(:heartbeat)               { heartbeat }         if threshold_exceeded?(:heartbeat_frequency, now)
    Benchmark.realtime_block(:server_dequeue)          { process_miq_queue } if threshold_exceeded?(:server_dequeue_frequency, now)

    Benchmark.realtime_block(:server_monitor) do
      server_monitor.monitor_servers
      monitor_server_roles if is_master?
      messaging_health_check
    end if threshold_exceeded?(:server_monitor_frequency, now)

    Benchmark.realtime_block(:log_active_servers)      { log_active_servers }                     if threshold_exceeded?(:server_log_frequency, now)
    Benchmark.realtime_block(:role_monitor)            { monitor_active_roles }                   if threshold_exceeded?(:server_role_monitor_frequency, now)
    Benchmark.realtime_block(:worker_monitor)          { worker_manager.monitor_workers }         if threshold_exceeded?(:worker_monitor_frequency, now)
    Benchmark.realtime_block(:worker_dequeue)          { worker_manager.populate_queue_messages } if threshold_exceeded?(:worker_dequeue_frequency, now)
    monitor_myself
  rescue SystemExit, SignalException
    # TODO: We're rescuing Exception below. WHY? :bomb:
    # A SystemExit would be caught below, so we need to explicitly rescue/raise.
    raise
  rescue Exception => err
    _log.error(err.message)
    _log.log_backtrace(err)

    begin
      _log.info("Reconnecting to database after error...")
      # Remove the connection and establish a new one since reconnect! doesn't always play nice with SSL postgresql connections
      spec_name = ActiveRecord::Base.connection_specification_name
      ActiveRecord::Base.establish_connection(ActiveRecord::Base.remove_connection(spec_name))
    rescue Exception => err
      _log.error("#{err.message}, during reconnect!")
    else
      _log.info("Reconnecting to database after error...Successful")
    end
  end

  def monitor_myself
    if memory_usage.to_i > memory_threshold
      msg = "server(pid: #{pid}, name: #{name}) memory usage [#{memory_usage.to_i}] exceeded limit: [#{memory_threshold}].  Exiting server process."
      _log.warn(msg)

      notification_options = {
        :name             => name,
        :memory_usage     => memory_usage.to_i,
        :memory_threshold => memory_threshold,
        :pid              => pid
      }
      Notification.create(:type => "evm_server_memory_exceeded", :options => notification_options)
      shutdown_and_exit(1)
    end
  end

  def stop(sync = false)
    return if stopped?

    shutdown_and_exit_queue
    wait_for_stopped if sync
  end

  def wait_for_stopped
    loop do
      reload
      break if stopped?

      sleep stop_poll
    end
  end

  def self.stop(sync = false)
    svr = my_server(true) rescue nil
    svr.stop(sync) unless svr.nil?
    PidFile.new(pidfile).remove
  end

  def kill
    # Kill all the workers of this server
    worker_manager.kill_all_workers

    # Then kill this server
    _log.info("initiated for #{format_full_log_msg}")
    update(:stopped_on => Time.now.utc, :status => "killed", :is_master => false)
    (pid == Process.pid) ? shutdown_and_exit : Process.kill(9, pid)
  end

  def self.kill
    svr = my_server(true)
    svr.kill unless svr.nil?
    PidFile.new(pidfile).remove
  end

  def shutdown
    _log.info("initiated for #{format_full_log_msg}")
    quiesce

    MiqEvent.raise_evm_event(self, "evm_server_stop") rescue nil
  end

  def shutdown_and_exit(exit_status = 0)
    shutdown
    exit exit_status
  end

  def quiesce
    update_attribute(:status, 'quiesce')
    deactivate_all_roles
    worker_manager.quiesce_all_workers
    update(:stopped_on => Time.now.utc, :status => "stopped", :is_master => false)
  end

  # Restart the local server
  def restart
    raise _("Server restart is only supported on Linux") unless MiqEnvironment::Command.is_linux?

    _log.info("Server restart initiating...")
    update_attribute(:status, "restarting")

    shutdown_and_exit(RESTART_EXIT_STATUS)
  end

  def format_full_log_msg
    "MiqServer [#{name}] with ID: [#{id}], PID: [#{pid}], GUID: [#{guid}]"
  end

  def format_short_log_msg
    "MiqServer [#{name}] with ID: [#{id}]"
  end

  def friendly_name
    _("EVM Server (%{id})") % {:id => pid}
  end

  def who_am_i
    @who_am_i ||= "#{name} #{my_zone} #{self.class.name} #{id}"
  end

  def database_application_name
    "MIQ|#{Process.pid}|#{compressed_id}|-|#{zone.compressed_id}|Server|#{zone.name}".truncate(64)
  end

  def set_database_application_name
    ArApplicationName.name = database_application_name
  end

  def is_local?
    guid == MiqServer.my_guid
  end

  def is_remote?
    !is_local?
  end

  def is_recently_active?
    last_heartbeat && (last_heartbeat >= 10.minutes.ago.utc)
  end

  def is_deleteable?
    return true if MiqEnvironment::Command.is_podified?

    if is_local?
      message = N_("Cannot delete currently used %{log_message}") % {:log_message => format_short_log_msg}
      @errors ||= ActiveModel::Errors.new(self)
      @errors.add(:base, message)
      return false
    end
    return true if stopped?

    if is_recently_active?
      message = N_("Cannot delete recently active %{log_message}") % {:log_message => format_short_log_msg}
      @errors ||= ActiveModel::Errors.new(self)
      @errors.add(:base, message)
      return false
    end

    true
  end

  def started?
    status == "started"
  end

  def stopped?
    STATUSES_STOPPED.include?(status)
  end

  def active?
    STATUSES_ACTIVE.include?(status)
  end

  def alive?
    STATUSES_ALIVE.include?(status)
  end

  def logon_status
    return :ready if started?

    started_on < (Time.now.utc - ::Settings.server.startup_timeout) ? :timed_out_starting : status.to_sym
  end

  def logon_status_details
    result = {:status => logon_status}
    return result if result[:status] == :ready

    wcnt = MiqWorker.find_starting.length
    workers = wcnt == 1 ? "worker" : "workers"
    message = "Waiting for #{wcnt} #{workers} to start"
    result.merge(:message => message)
  end

  def ui_hostname
    if MiqEnvironment::Command.is_podified?
      ENV.fetch("APPLICATION_DOMAIN", nil)
    else
      hostname || ipaddress
    end
  end

  def ui_ipaddress
    if MiqEnvironment::Command.is_podified?
      nil
    else
      ipaddress
    end
  end

  def ui_address(contact_with = :hostname)
    if MiqEnvironment::Command.is_podified?
      ENV.fetch("APPLICATION_DOMAIN", nil)
    else
      contact_with == :hostname ? ui_hostname : ui_ipaddress
    end
  end

  def ui_url(contact_with = :hostname)
    url_override = settings_for_resource.ui.url
    return url_override if url_override

    host = ui_address(contact_with)
    return if host.nil?

    URI::HTTPS.build(:host => host).to_s
  end

  def ws_hostname
    if MiqEnvironment::Command.is_podified?
      ENV.fetch("APPLICATION_DOMAIN", nil)
    else
      hostname || ipaddress
    end
  end

  def ws_ipaddress
    if MiqEnvironment::Command.is_podified?
      nil
    else
      ipaddress
    end
  end

  def ws_address
    if MiqEnvironment::Command.is_podified?
      ENV.fetch("APPLICATION_DOMAIN", nil)
    else
      ::Settings.webservices.contactwith == 'hostname' ? ws_hostname : ws_ipaddress
    end
  end

  def ws_url
    url_override = settings_for_resource.webservices.url
    return url_override if url_override

    host = ws_address
    return if host.nil?

    URI::HTTPS.build(:host => host).to_s
  end

  #
  # Zone and Role methods
  #
  @@my_guid_mutex = Mutex.new
  def self.my_guid
    @@my_guid_mutex.synchronize { @@my_guid ||= load_or_generate_guid }
  end

  # Under normal circumstances there really shouldn't be any reason to use
  # this method. It should only be used for tests and when we need to monitor
  # multiple servers.
  def self.my_guid=(guid)
    @@my_guid_mutex.synchronize { @@my_guid = guid }
  end

  def self.load_or_generate_guid
    guid = File.read(GUID_FILE).strip if File.exist?(GUID_FILE)
    return guid if guid.present?

    SecureRandom.uuid.tap do |guid|
      _log.info("Generated MiqServer GUID #{guid}")
      File.open(GUID_FILE, "wb") do |file|
        file.sync = true
        file.write(guid)
      end
    end
  end

  cache_with_timeout(:my_server) { find_by(:guid => my_guid) }

  def self.my_zone(force_reload = false)
    my_server(force_reload).my_zone
  end

  def self.my_roles(force_reload = false)
    my_server(force_reload).my_roles
  end

  def self.my_role(force_reload = false)
    my_server(force_reload).my_role
  end

  def self.my_active_roles(force_reload = false)
    my_server(force_reload).active_role_names
  end

  def self.my_active_role(force_reload = false)
    my_server(force_reload).active_role
  end

  def self.licensed_roles(force_reload = false)
    my_server(force_reload).licensed_roles
  end

  def my_zone
    zone.name
  end

  def has_zone?(zone_name)
    my_zone == zone_name
  end

  def find_other_started_servers_in_region
    self.class.active_miq_servers.in_my_region.where.not(:id => id).to_a
  end

  def find_other_servers_in_region
    self.class.active_miq_servers.where.not(:id => id).to_a
  end

  def find_other_started_servers_in_zone
    self.class.active_miq_servers.where(:zone_id => zone_id).where.not(:id => id).to_a
  end

  def find_other_servers_in_zone
    self.class.where(:zone_id => zone_id).where.not(:id => id).to_a
  end

  def mark_as_not_responding(seconds = ::Settings.server.heartbeat_timeout.to_i_with_method)
    msg = "#{format_full_log_msg} has not responded in #{seconds} seconds."
    _log.info(msg)
    update(:status => "not responding")
    deactivate_all_roles

    # TODO: need to add event for this
    MiqEvent.raise_evm_event_queue_in_region(self, "evm_server_not_responding", :event_details => msg)

    # Mark all messages currently being worked on by the not responding server's workers as error
    _log.info("Cleaning all active messages being processed by #{format_full_log_msg}")
    miq_workers.each(&:clean_active_messages)
  end

  def display_name
    "#{name} [#{id}]"
  end

  def server_timezone
    ::Settings.server.timezone || "UTC"
  end

  def tenant_identity
    User.super_admin
  end

  def miq_region
    MiqRegion.find_by(:region => region_id)
  end

  def self.display_name(number = 1)
    n_('Server', 'Servers', number)
  end

  def self.zone_is_modifiable?
    return false if MiqEnvironment::Command.is_podified?

    Zone.visible.in_my_region.count > 1
  end

  def self.audit_details
    {
      :vms                     => Vm.active.count,
      :hosts                   => Host.active.count,
      :aggregate_physical_cpus => Host.active.sum(:aggregate_physical_cpus),
      :providers               => ExtManagementSystem.group(:type).count,
      :deployment              => MiqEnvironment::Command.is_podified? ? "containers" : "appliance",
      :arch                    => MiqEnvironment.arch.to_s,
      :services                => {:active => Service.active.count, :inactive => Service.inactive.count},
      :service_catalog_items   => {:active => ServiceTemplate.active.count, :archived => ServiceTemplate.archived.count},
      :region_count            => MiqRegion.count,
    }
  end

  def self.unmanaged_resources
    {
      :vms                     => Vm.not_active.count,
      :hosts                   => Host.archived.count,
      :aggregate_physical_cpus => Host.archived.in_my_region.sum(:aggregate_physical_cpus),
    }
  end

  def self.report_audit_details
    totals = audit_details.slice(:vms, :hosts)
    $audit_log.info("Under Management: #{totals.to_json}")
  end

  private

  def zone_unchanged_in_pods
    return unless MiqEnvironment::Command.is_podified?

    errors.add(:zone, N_('cannot be changed when running in containers')) if zone_id_changed?
  end

  def messaging_health_check
    broker = MiqQueue.messaging_client("health_check")
    return if broker.nil?

    broker.publish_message(:service => "manageiq.liveness-check", :message => "test message", :payload => {})
    broker.subscribe_messages(:service => "manageiq.liveness-check") { break }
  rescue => err
    _log.error("Messaging health check failed: #{err}")
    shutdown_and_exit(1)
  end
end # class MiqServer