# app/models/miq_worker.rb
require 'io/wait'
# ActiveRecord model for a single worker belonging to an MiqServer.
#
# Class methods operate on the set of worker rows of one type for a server:
# - querying by status;
# - starting, stopping and synchronizing worker counts;
# - reading worker settings.
#
# Instance methods manage one worker: starting its runner, stopping or killing it,
# and status/heartbeat bookkeeping. Concrete worker types are subclasses.
class MiqWorker < ApplicationRecord
  include ContainerCommon
  include SystemdCommon
  include UuidMixin

  after_initialize :set_system_uid
  before_destroy :error_out_tasks_with_active_queue_message, :log_destroy_of_worker_messages

  belongs_to :miq_server
  has_many :messages, :as => :handler, :class_name => 'MiqQueue'
  has_many :active_messages, -> { where(["state = ?", "dequeue"]) }, :as => :handler, :class_name => 'MiqQueue'
  has_many :ready_messages, -> { where(["state = ?", "ready"]) }, :as => :handler, :class_name => 'MiqQueue'
  has_many :processed_messages, -> { where.not(state: "ready") }, :as => :handler, :class_name => 'MiqQueue', :dependent => :destroy

  virtual_column :friendly_name, :type => :string
  virtual_column :uri_or_queue_name, :type => :string

  scope :with_miq_server_id, ->(server_id) { where(:miq_server_id => server_id) }
  scope :with_status, ->(status) { where(:status => status) }

  # GUID looked up by the cached .my_worker finder below.
  class_attribute :my_guid

  STATUS_CREATING = 'creating'.freeze
  STATUS_STARTING = 'starting'.freeze
  STATUS_STARTED = 'started'.freeze
  STATUS_READY = 'ready'.freeze
  STATUS_WORKING = 'working'.freeze
  STATUS_STOPPING = 'stopping'.freeze
  STATUS_STOPPED = 'stopped'.freeze
  STATUS_KILLED = 'killed'.freeze
  STATUS_ABORTED = 'aborted'.freeze

  # Status groupings used by the finders and predicates below. They are frozen so
  # that no caller can mutate these shared constants.
  STATUSES_STARTING = [STATUS_CREATING, STATUS_STARTING].freeze
  STATUSES_CURRENT = [STATUS_STARTED, STATUS_READY, STATUS_WORKING].freeze
  STATUSES_STOPPED = [STATUS_STOPPED, STATUS_KILLED, STATUS_ABORTED].freeze
  STATUSES_CURRENT_OR_STARTING = (STATUSES_CURRENT + STATUSES_STARTING).freeze
  STATUSES_ALIVE = (STATUSES_CURRENT_OR_STARTING + [STATUS_STOPPING]).freeze

  # Keys of the process-info hash that #status_update persists on the row.
  # :priority is stored as :os_priority.
  PROCESS_INFO_FIELDS = %i[priority memory_usage percent_memory percent_cpu memory_size cpu_time proportional_set_size unique_set_size].freeze

  # NOTE(review): not referenced in this file; presumably used for process titles elsewhere.
  PROCESS_TITLE_PREFIX = "MIQ:".freeze

  # Stops every worker in the current server scope (delegates to .stop_workers).
  def self.atShutdown
    stop_workers
  end

  class << self
    # Overrides for .workers and .rails_worker?. The value may be a Proc, which is
    # called on each read.
    attr_writer :workers, :rails_worker
  end

  # Bundler groups exported to spawned workers through the BUNDLER_GROUPS environment variable.
  def self.bundler_groups
    %w[manageiq_default ui_dependencies]
  end

  # Must be defined by each concrete worker subclass.
  # NOTE(review): presumably the relative order used when choosing workers to kill.
  # @raise [NotImplementedError] when the subclass does not override it
  def self.kill_priority
    raise NotImplementedError, ".kill_priority must be implemented in a subclass"
  end

  # Desired number of workers of this type on the current server.
  #
  # Resolution order:
  # 1. 0 when the server lacks a required role.
  # 2. Otherwise any override set through .workers= (a value, or a Proc that is called).
  # 3. Otherwise the configured count (.workers_configured_count).
  def self.workers
    return 0 unless has_required_role?
    return @workers.call if @workers.kind_of?(Proc)
    return @workers unless @workers.nil?

    workers_configured_count
  end

  # Rails-worker flag for this type. Defaults to true unless overridden through
  # .rails_worker= (a value, or a Proc that is called).
  def self.rails_worker?
    return @rails_worker.call if @rails_worker.kind_of?(Proc)
    return @rails_worker unless @rails_worker.nil?

    true
  end
  delegate :rails_worker?, :to => :class

  # A type is scalable when it has no maximum worker count or allows more than one worker.
  def self.scalable?
    maximum_workers_count.nil? || maximum_workers_count > 1
  end

  def scalable?
    self.class.scalable?
  end

  # The :count from worker settings, capped at maximum_workers_count when that is an Integer.
  def self.workers_configured_count
    count = worker_settings[:count]
    max = maximum_workers_count
    max.kind_of?(Integer) && max < count ? max : count
  end

  # Leaf subclasses, plus any descendant that reports acts_as_sti_leaf_class?.
  def self.concrete_subclasses
    leaf_subclasses | descendants.select { |d| d.try(:acts_as_sti_leaf_class?) }
  end

  class_attribute :default_queue_name, :required_roles, :maximum_workers_count, :include_stopping_workers_on_synchronize, :worker_settings_paths
  self.include_stopping_workers_on_synchronize = false
  self.required_roles = []
  self.worker_settings_paths = []

  # Relation of workers belonging to the current server.
  # If the active scope already filters on miq_server_id, that scope is returned
  # unchanged, so callers can target a different server.
  def self.server_scope
    return current_scope if current_scope && current_scope.where_values_hash.include?('miq_server_id')

    where(:miq_server_id => MiqServer.my_server&.id)
  end

  CONDITION_CURRENT = {:status => STATUSES_CURRENT}.freeze

  # Workers in the server scope whose status is started, ready or working.
  def self.find_current
    server_scope.where(CONDITION_CURRENT)
  end

  # Current workers across the region, not limited to one server.
  def self.find_current_in_my_region
    in_my_region.where(CONDITION_CURRENT)
  end

  # Workers in the server scope that are creating or starting.
  def self.find_starting
    server_scope.where(:status => STATUSES_STARTING)
  end

  # Workers in the server scope that are stopping.
  def self.find_stopping
    server_scope.where(:status => STATUS_STOPPING)
  end

  # Starting workers, restricted to the class names known to MiqWorkerType.
  def self.find_all_starting
    find_starting.where(:type => MiqWorkerType.worker_class_names)
  end

  # Stopping workers, restricted to the class names known to MiqWorkerType.
  def self.find_all_stopping
    find_stopping.where(:type => MiqWorkerType.worker_class_names)
  end

  # Workers in the server scope that are current or starting.
  def self.find_current_or_starting
    server_scope.where(:status => STATUSES_CURRENT_OR_STARTING)
  end

  # Workers in the server scope that are current, starting or stopping.
  def self.find_alive
    server_scope.where(:status => STATUSES_ALIVE)
  end

  # Whether the current server holds at least one of this type's required roles.
  #
  # required_roles may be a Proc (called to obtain the roles), a single role String,
  # or an Array of role names. When no roles are required, the answer is always true.
  #
  # @raise [RuntimeError] when the resolved roles are not blank, a String or an Array
  def self.has_required_role?
    roles = required_roles.kind_of?(Proc) ? required_roles.call : required_roles
    return true if roles.blank?

    roles = Array(roles) if roles.kind_of?(String)
    # Interpolate the offending class name so the error is actually diagnosable.
    raise _("Unexpected type: <%{type}>") % {:type => roles.class.name} unless roles.kind_of?(Array)

    roles.any? { |role| MiqServer.my_server.has_active_role?(role) }
  end

  # Asks the current server whether it has resources to start one more worker of this type.
  def self.enough_resource_to_start_worker?
    MiqServer.my_server.enough_resource_to_start_worker?(self)
  end

  # Reconcile the number of live workers of this type with .workers.
  #
  # - Below the desired count: starts the missing workers, but only if the server
  #   reports enough resources.
  # - Above the desired count: stops the surplus workers.
  # - Workers in 'stopping' state are counted only when
  #   include_stopping_workers_on_synchronize is set.
  #
  # @return [Hash] {:adds => [pids of started workers], :deletes => [pids of stopped workers]}
  def self.sync_workers
    w = include_stopping_workers_on_synchronize ? find_alive : find_current_or_starting
    current = w.length
    desired = workers
    result = {:adds => [], :deletes => []}

    if current != desired
      _log.info("Workers are being synchronized: Current #: [#{current}], Desired #: [#{desired}]")

      if desired > current && enough_resource_to_start_worker?
        (desired - current).times { result[:adds] << start_worker.pid }
      elsif desired < current
        w = w.to_a
        (current - desired).times do
          ww = w.pop
          result[:deletes] << ww.pid
          ww.stop
        end
      end
    end

    result
  end

  # Settings key for this class, e.g. MiqGenericWorker => :generic_worker.
  # MiqWorker itself maps to :worker_base. Provider-namespaced classes get their
  # parent's short token appended.
  def self.settings_name
    @settings_name ||=
      if self == MiqWorker
        :worker_base
      elsif module_parent.try(:short_token)
        # :generic_worker_infra, :generic_worker_vmware
        :"#{normalized_type}_#{module_parent.short_token.underscore}"
      else
        # :generic_worker
        normalized_type.to_sym
      end
  end

  # Settings names (.settings_name) of every class from MiqWorker down to this class,
  # ordered from MiqWorker first. Used as the key path into the :workers settings section.
  def self.path_to_my_worker_settings
    @path_to_my_worker_settings ||=
      ancestors.grep(Class).select { |c| c <= MiqWorker }.reverse.collect(&:settings_name)
  end

  # Worker settings for this type, read from options[:config] or the server's settings.
  # Returns {} when miq_server is nil. options[:raw] is forwarded as the raw flag.
  def self.fetch_worker_settings_from_server(miq_server, options = {})
    return {} unless miq_server

    # TODO: commit bb15370a2131e5a8f02f63de334959685b68d620 added the conditional here
    # to prefer the passed in options before using the server settings for bug 998991,
    # bug 1004455, bug 1004459. We'll keep this logic for now.
    server_config = options[:config] || miq_server.settings
    fetch_worker_settings_from_options_hash(server_config, options[:raw])
  end

  # Build the effective settings by walking .path_to_my_worker_settings through
  # options_hash[:workers].
  #
  # Merge order:
  # 1. Each level's :defaults, in path order.
  # 2. The final (own) section on top.
  #
  # Entries equal to Vmdb::Settings::RESET_VALUE are dropped. Values are normalized
  # unless raw is exactly true.
  #
  # NOTE(review): delete_if removes RESET_VALUE entries from the passed-in hash itself.
  #
  # @raise [RuntimeError] when a section along the path is missing
  def self.fetch_worker_settings_from_options_hash(options_hash, raw = false)
    return {} unless options_hash.key?(:workers)

    settings = {}
    section = options_hash[:workers]
    unless section.nil?
      path_to_my_worker_settings.each do |c|
        section = section[c]
        raise _("Missing config section %{section_name}") % {:section_name => c} if section.nil?

        defaults = section[:defaults]
        unless defaults.nil?
          defaults.delete_if { |_k, v| v == Vmdb::Settings::RESET_VALUE }
          settings.merge!(defaults)
        end
      end

      section.delete_if { |_k, v| v == Vmdb::Settings::RESET_VALUE }
      settings.merge!(section)
      normalize_settings!(settings) unless raw == true
    end
    settings
  end

  # Normalize worker settings in place. String values are converted as follows:
  # - values such as "1.seconds" or "10.megabytes" become Integers (to_i_with_method);
  # - plain numeric strings become Integers;
  # - values encrypted with ManageIQ::Password are decrypted.
  # Nested Hashes are processed only when recurse is true.
  def self.normalize_settings!(settings, recurse: false)
    settings.each_key do |k|
      v = settings[k]
      if v.kind_of?(Hash) && recurse
        normalize_settings!(v, :recurse => true)
      elsif v.kind_of?(String)
        if v.number_with_method?
          settings[k] = v.to_i_with_method
        elsif v.match?(/\A\d+(\.\d+)?\z/) # int/float saved as a string (to_i truncates a fraction)
          settings[k] = v.to_i
        elsif ManageIQ::Password.encrypted?(v)
          settings[k] = ManageIQ::Password.decrypt(v)
        end
      end
    end
  end

  # Worker settings for this worker's own server.
  def worker_settings(options = {})
    self.class.fetch_worker_settings_from_server(miq_server, options)
  end

  # Heartbeat file path for this worker's guid (memoized).
  def heartbeat_file
    @heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(guid)
  end

  # Worker settings for the current server (MiqServer.my_server).
  def self.worker_settings(options = {})
    fetch_worker_settings_from_server(MiqServer.my_server, options)
  end

  # No-op hook that subclasses may override.
  def self.reload_worker_settings
    # By default worker settings are reloaded periodically by the worker runner
  end

  # Start .workers workers of this type; does nothing without a required role.
  def self.start_workers
    return unless has_required_role?

    workers.times { start_worker }
  end

  # Call #stop on every worker in the server scope.
  def self.stop_workers
    server_scope.each(&:stop)
  end

  # Call #restart on every current worker.
  def self.restart_workers
    find_current.each(&:restart)
  end

  # Refresh process statistics for every current worker.
  def self.status_update
    find_current.each(&:status_update)
  end

  # Log the status line of every current worker at the given level.
  def self.log_status(level = :info)
    find_current.each { |w| w.log_status(level) }
  end

  # Build (without saving) a worker row in the server scope.
  #
  # The row starts with status 'creating' and a fresh last_heartbeat. The optional
  # first argument is an attribute Hash; default_queue_name is applied unless
  # :queue_name is given or the default is nil.
  #
  # NOTE: the given Hash is modified in place.
  def self.init_worker_object(*params)
    params = params.first
    params = {} unless params.kind_of?(Hash)
    params[:queue_name] = default_queue_name unless params.key?(:queue_name) || default_queue_name.nil?
    params[:status] = STATUS_CREATING
    params[:last_heartbeat] = Time.now.utc
    server_scope.new(params)
  end

  # Build a worker row and save it, raising if the save fails.
  def self.create_worker_record(*params)
    init_worker_object(*params).tap(&:save!)
  end

  # Create and start a worker, returning the worker record.
  # Containerized workers are only built, not saved, before #start; otherwise the
  # row is saved first.
  def self.start_worker(*params)
    w = containerized_worker? ? init_worker_object(*params) : create_worker_record(*params)
    w.start
    w
  end

  # Cached lookup of the worker row whose guid equals my_guid, within the server scope.
  cache_with_timeout(:my_worker) { server_scope.find_by(:guid => my_guid) }

  # Refresh process statistics for current workers of every type.
  def self.status_update_all
    MiqWorker.status_update
  end

  # Log the status of current workers of every type.
  def self.log_status_all(level = :info)
    MiqWorker.log_status(level)
  end

  # True when running podified, per MiqEnvironment::Command.is_podified?.
  def self.containerized_worker?
    MiqEnvironment::Command.is_podified?
  end

  def containerized_worker?
    self.class.containerized_worker?
  end

  # True when systemd is supported, per MiqEnvironment::Command.supports_systemd?.
  def self.systemd_worker?
    MiqEnvironment::Command.supports_systemd?
  end

  def systemd_worker?
    self.class.systemd_worker?
  end

  # Launch the worker process. Systemd takes precedence, then containers; otherwise
  # the worker is spawned locally. Returns the launcher's result, which #start stores as pid.
  def start_runner
    if systemd_worker?
      start_systemd_worker
    elsif containerized_worker?
      start_runner_via_container
    else
      start_runner_via_spawn
    end
  end

  def start_runner_via_container
    create_container_objects
  end

  # Shell command line that runs run_single_worker.rb for this worker class, passing
  # the guid and the optional ems_id. The command is prefixed with `nice` when the
  # APPLIANCE environment variable is set.
  # @raise [ArgumentError] when guid is nil or false
  def self.build_command_line(guid, ems_id = nil)
    raise ArgumentError, "No guid provided" unless guid

    require 'awesome_spawn'

    cmd = "#{Gem.ruby} #{runner_script}"
    cmd = "nice -n #{nice_increment} #{cmd}" if ENV["APPLIANCE"]

    options = {:guid => guid, :heartbeat => nil}
    options[:ems_id] = ems_id if ems_id
    "#{AwesomeSpawn::CommandLineBuilder.new.build(cmd, options)} #{name}"
  end

  # Path of lib/workers/bin/run_single_worker.rb.
  # @raise [RuntimeError] when the script does not exist
  def self.runner_script
    script = ManageIQ.root.join("lib/workers/bin/run_single_worker.rb")
    raise "script not found: #{script}" unless File.exist?(script)

    script
  end

  # Command line for this worker, built from #worker_options.
  def command_line
    self.class.build_command_line(*worker_options.values_at(:guid, :ems_id))
  end

  # Environment variables passed to a spawned worker process.
  def environment_variables
    {
      "BUNDLER_GROUPS" => self.class.bundler_groups.join(",")
    }
  end

  # Spawn the worker as a detached child process and return its pid.
  # stdout and stderr are appended to log/evm.log.
  def start_runner_via_spawn
    pid = Kernel.spawn(
      environment_variables,
      command_line,
      [:out, :err] => [Rails.root.join("log/evm.log"), "a"]
    )
    Process.detach(pid)
    pid
  end

  # Start the worker process and record its pid.
  # Raises an evm_worker_start event, logs the start, and returns self.
  def start
    self.pid = start_runner
    # Containerized and systemd workers are not saved here.
    save unless containerized_worker? || systemd_worker?

    msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
    MiqEvent.raise_evm_event_queue(miq_server || MiqServer.my_server, "evm_worker_start", :event_details => msg, :type => self.class.name)
    _log.info(msg)
    self
  end

  # Ask the server's worker manager to stop this worker through the queue.
  def stop
    miq_server.worker_manager.stop_worker_queue(self)
  end

  # Let the worker monitor start a new worker
  alias_method :restart, :stop

  # Kill the worker process synchronously, then destroy this row.
  def kill
    kill_process
    destroy
  end

  # kill needs to be done by the worker's orchestrator pod / server process
  # TODO: Note, stop is async through the queue, while kill is sync. Should kill be async too?
  # Also, this looks a lot like MiqServer#stop_worker_queue except stop_worker is called on the server row whereas
  # we're calling kill on the worker row.
  def kill_async
    MiqQueue.put_unless_exists(
      :class_name  => self.class.name,
      :instance_id => id,
      :method_name => 'kill',
      :queue_name  => 'miq_server',
      :server_guid => miq_server.guid,
      :zone        => miq_server.my_zone
    )
  end

  # Terminate the worker's process.
  #
  # - Containerized workers: the container objects are deleted.
  # - Otherwise, when a pid is set: SIGKILL (9) is sent, then #is_alive? is polled
  #   every 10 ms until it is false.
  #
  # ESRCH and other StandardErrors are logged, not raised.
  # NOTE(review): the polling loop has no upper bound.
  def kill_process
    if containerized_worker?
      delete_container_objects
      return
    end

    return if pid.nil?

    begin
      _log.info("Killing worker: ID [#{id}], PID [#{pid}], GUID [#{guid}], status [#{status}]")
      Process.kill(9, pid)
      loop do
        break unless is_alive?

        sleep(0.01)
      end
    rescue Errno::ESRCH
      _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed")
    rescue => err
      _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed, but with the following error: #{err}")
    end
  end

  # Status is started, ready or working.
  def is_current?
    STATUSES_CURRENT.include?(status)
  end

  # Status is alive AND MiqProcess still reports the pid as a worker.
  def is_alive?
    STATUSES_ALIVE.include?(status) && actually_running?
  end

  # Status is stopped, killed or aborted.
  def is_stopped?
    STATUSES_STOPPED.include?(status)
  end

  def current_or_starting?
    STATUSES_CURRENT_OR_STARTING.include?(status)
  end

  def started?
    STATUS_STARTED == status
  end

  # Whether MiqProcess reports pid as a worker process.
  def actually_running?
    MiqProcess.is_worker?(pid)
  end

  # Not in a stopped status, or the process is still running anyway.
  def enabled_or_running?
    !is_stopped? || actually_running?
  end

  # True when the worker is 'stopping' and its last heartbeat, plus the current message
  # timeout, is older than stopping_timeout (from settings or Workers::MiqDefaults).
  def stopping_for_too_long?
    # Note, a 'stopping' worker heartbeats in DRb but NOT to
    # the database, so we can see how long it's been
    # 'stopping' by checking the last_heartbeat.
    stopping_timeout = self.class.worker_settings[:stopping_timeout] || Workers::MiqDefaults.stopping_timeout
    status == MiqWorker::STATUS_STOPPING && (last_heartbeat + current_timeout.to_i) < stopping_timeout.seconds.ago
  end

  # Run the timeout check on every dequeued message handled by this worker.
  def validate_active_messages
    active_messages.each { |msg| msg.check_for_timeout(_log.prefix) }
  end

  # Mark every dequeued message handled by this worker as delivered in error.
  def clean_active_messages
    active_messages.each do |m|
      _log.warn("Message id: [#{m.id}] Setting state to 'error'")
      m.delivered_in_error('Clean Active Messages')
    end
  end

  # after_initialize: for systemd workers, system_uid is set to the systemd unit name.
  private def set_system_uid
    self.system_uid = unit_name if systemd_worker?
  end

  # before_destroy: every task attached to a non-ready message of this worker is
  # finished with an error status.
  private def error_out_tasks_with_active_queue_message
    message = "Task Handler: [#{friendly_name}] ID [#{id}] has been deleted!"
    processed_messages.includes(:miq_task).where.not(:miq_task_id => nil).each do |m|
      # Note, this is done synchronously from destroy because workers have 1 message they're currently "handling"
      # and each message can only have 1 task, so this should be very fast even for many workers.
      m.miq_task.update_status(MiqTask::STATE_FINISHED, MiqTask::STATUS_ERROR, message)
    end
  end

  # before_destroy:
  # - ready messages are detached from this worker (handler cleared);
  # - processed messages, which :dependent => :destroy removes, are logged.
  # Failures are deliberately swallowed (best effort).
  def log_destroy_of_worker_messages
    ready_messages.each do |m|
      _log.warn("Nullifying: #{MiqQueue.format_full_log_msg(m)}") rescue nil
      m.update(:handler_id => nil, :handler_type => nil) rescue nil
    end

    processed_messages.each do |m|
      _log.warn("Destroying: #{MiqQueue.format_full_log_msg(m)}") rescue nil
    end
  end

  # Refresh the process statistics columns from MiqProcess.processInfo(pid).
  # - Podified: no-op.
  # - Process gone (ESRCH): the worker is marked aborted.
  # - Other errors: logged and ignored.
  def status_update
    return if MiqEnvironment::Command.is_podified?

    begin
      pinfo = MiqProcess.processInfo(pid)
    rescue Errno::ESRCH
      update(:status => STATUS_ABORTED)
      _log.warn("No such process [#{friendly_name}] with PID=[#{pid}], aborting worker.")
    rescue => err
      _log.warn("Unexpected error: #{err.message}, while requesting process info for [#{friendly_name}] with PID=[#{pid}]")
    else
      # Ensure the hash only contains the values we want to store in the table
      pinfo.slice!(*PROCESS_INFO_FIELDS)
      pinfo[:os_priority] = pinfo.delete(:priority)
      update!(pinfo)
    end
  end

  # Log a one-line summary of identity, heartbeat and process statistics at the given level.
  def log_status(level = :info)
    _log.public_send(level, "[#{friendly_name}] Worker ID [#{id}], PID [#{pid}], GUID [#{guid}], Last Heartbeat [#{last_heartbeat}], Process Info: Memory Usage [#{memory_usage}], Memory Size [#{memory_size}], Proportional Set Size: [#{proportional_set_size}], Unique Set Size: [#{unique_set_size}], Memory % [#{percent_memory}], CPU Time [#{cpu_time}], CPU % [#{percent_cpu}], Priority [#{os_priority}]")
  end

  # msg_timeout of the first dequeued message, or nil when there is none.
  def current_timeout
    msg = active_messages.first
    msg.try(:msg_timeout)
  end

  def uri_or_queue_name
    uri || queue_name
  end

  # Human-readable type name, e.g. "generic_worker" => "Generic Worker".
  def friendly_name
    normalized_type.titleize
  end

  delegate :normalized_type, :to => :class

  # Class name without a leading "ManageIQ::Providers::".
  def self.abbreviated_class_name
    name.sub(/^ManageIQ::Providers::/, "")
  end

  def abbreviated_class_name
    self.class.abbreviated_class_name
  end

  # Abbreviated class name with the first "Miq" and the first "Worker" removed.
  def self.minimal_class_name
    abbreviated_class_name
      .sub("Miq", "")
      .sub("Worker", "")
  end

  def minimal_class_name
    self.class.minimal_class_name
  end

  # Pipe-delimited identifier built from the pid, compressed ids, class name and zone,
  # truncated to 64 characters.
  def database_application_name
    zone = MiqServer.my_server.zone
    "MIQ|#{Process.pid}|#{miq_server.compressed_id}|#{compressed_id}|#{zone.compressed_id}|#{minimal_class_name}|#{zone.name}".truncate(64)
  end

  def format_full_log_msg
    "Worker [#{self.class}] with ID: [#{id}], PID: [#{pid}], GUID: [#{guid}]"
  end

  def format_short_log_msg
    "Worker ID: [#{id}]"
  end

  # Return this thread's database connection to the pool, if connected.
  def self.release_db_connection
    ActiveRecord::Base.connection_pool.release_connection if ActiveRecord::Base.connected?
  end

  # Persist last_heartbeat = now (UTC) via update_attribute.
  def update_heartbeat
    update_attribute(:last_heartbeat, Time.now.utc)
  end

  # [:workers] followed by .path_to_my_worker_settings (memoized; overridable via the writer below).
  def self.config_settings_path
    @config_settings_path ||= [:workers] + path_to_my_worker_settings
  end

  class << self
    attr_writer :config_settings_path
  end

  # Assign sql_spid without saving (defaults to the current connection's spid).
  def update_spid(spid = ActiveRecord::Base.connection.spid)
    self.sql_spid = spid
  end

  # Assign sql_spid and save, but only when it differs from the stored value.
  def update_spid!(spid = ActiveRecord::Base.connection.spid)
    if sql_spid != spid
      self.sql_spid = spid
      save
    end
  end

  # Options used to build the command line; subclasses may add e.g. :ems_id.
  def worker_options
    {:guid => guid}
  end

  # Underscored type name, memoized.
  # - Top-level classes drop a leading "Miq" (MiqGenericWorker => "generic_worker").
  # - Namespaced classes use their demodulized name.
  def self.normalized_type
    @normalized_type ||= if module_parent == Object
                           name.sub(/^Miq/, '').underscore
                         else
                           name.demodulize.underscore
                         end
  end

  # Niceness passed to `nice -n`: the Integer :nice_delta setting as a String, else "10".
  def self.nice_increment
    delta = worker_settings[:nice_delta]
    delta.kind_of?(Integer) ? delta.to_s : "10"
  end

  # Translated singular/plural display label.
  def self.display_name(number = 1)
    n_('Worker', 'Workers', number)
  end

  private_class_method :nice_increment
end