app/models/miq_server/worker_management/kubernetes.rb
class MiqServer::WorkerManagement::Kubernetes < MiqServer::WorkerManagement
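# Class-level caches of the cluster's current pods and worker deployments,
# shared across instances and kept up to date by the monitor threads below.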
class_attribute :current_pods
self.current_pods = Concurrent::Hash.new
class_attribute :current_deployments
self.current_deployments = Concurrent::Hash.new
attr_accessor :deployments_monitor_thread, :pods_monitor_thread
def sync_from_system
# All miq_server instances have to reside on the same Kubernetes cluster, so
# we only have to sync the list of pods and deployments once
ensure_kube_monitors_started if my_server_is_primary?
# Before syncing the workers, check for any orphaned worker rows that don't
# have a current pod and delete them
cleanup_orphaned_worker_rows
# Update worker deployments with any changed settings such as CPU/memory limits
sync_deployment_settings
end
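
# Pair any starting non-rails workers with their pods and mark them as
# started once the pod reports that it is running.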
def sync_starting_workers
starting = MiqWorker.find_all_starting
# Get a list of pods that aren't currently assigned to MiqWorker records
pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact
# Non-rails workers cannot set their own miq_worker record to started once they
# have finished initializing. Check for any starting non-rails workers whose
# pod is running and mark the miq_worker as started.
starting.reject(&:rails_worker?).each do |worker|
# If the current worker doesn't have a system_uid assigned then find the first
# pod available for our worker type and link them up.
if worker.system_uid.nil?
system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
if system_uid
# We have found a pod for the current worker record so remove the pod from
# the list of pods without workers and set the pod name as the system_uid
# for the current worker record.
pods_without_workers.delete(system_uid)
worker.update!(:system_uid => system_uid)
else
# If we haven't found a pod for this worker record then we need to check
# whether it has been starting for too long and should be marked as
# not responding.
stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker)
# Without a valid system_uid we cannot run any further logic for this
# worker, so skip to the next one.
next
end
end
worker_pod = current_pods[worker.system_uid]
next if worker_pod.nil?
worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
end
starting.reload
end
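
# Mark any stopping non-rails workers whose pod is gone as stopped.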
def sync_stopping_workers
stopping = MiqWorker.find_all_stopping
stopping.reject(&:rails_worker?).each do |worker|
next if current_pods.key?(worker.system_uid)
worker.update!(:status => MiqWorker::STATUS_STOPPED)
end
stopping.reload
end
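
# Scheduling and resource limits are handled by the Kubernetes cluster, so
# there is never a local resource reason to block starting a worker.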
def enough_resource_to_start_worker?(_worker_class)
true
end
def cleanup_orphaned_worker_rows
return if current_pods.empty?
# Any worker rows which have a system_uid that is not in the list of
# current pod names, and are not starting (a starting worker may not have
# had its system_uid set yet), should be deleted.
orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys)
.where.not(:status => MiqWorker::STATUSES_STARTING)
return if orphaned_rows.empty?
_log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}")
orphaned_rows.destroy_all
end
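
# In addition to the base class cleanup, remove any deployments whose pods
# are stuck in a restart loop.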
def cleanup_failed_workers
super
delete_failed_deployments
end
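
# Returns the deployment names (the pods' name label) for pods that have
# terminated and restarted more than restart_count times.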
def failed_deployments(restart_count = 5)
# TODO: This logic might flag deployments that are hitting memory/cpu limits or otherwise not really 'failed'
current_pods.values.select { |h| h[:last_state_terminated] && h.fetch(:container_restarts, 0) > restart_count }.pluck(:label_name).uniq
end
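
# Patch any worker deployment whose resource constraints in the cluster no
# longer match the configured settings, checking each deployment only once
# even when multiple workers share it.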
def sync_deployment_settings
checked_deployments = Set.new
miq_workers.each do |worker|
next if checked_deployments.include?(worker.worker_deployment_name)
if deployment_resource_constraints_changed?(worker)
_log.info("Constraints changed, patching deployment: [#{worker.worker_deployment_name}]")
begin
worker.patch_deployment
rescue => err
_log.warn("Failure patching deployment: [#{worker.worker_deployment_name}] for worker: id: [#{worker.id}], system_uid: [#{worker.system_uid}]. Error: [#{err}]... skipping")
next
end
end
checked_deployments << worker.worker_deployment_name
end
end
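
# Compare the first container's resources in the cached deployment spec
# against the constraints the worker is configured to use.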
def deployment_resource_constraints_changed?(worker)
return false unless ::Settings.server.worker_monitor.enforce_resource_constraints
container = current_deployments.fetch_path(worker.worker_deployment_name, :spec, :template, :spec, :containers).try(:first)
current_constraints = container.try(:fetch, :resources, nil) || {}
desired_constraints = worker.resource_constraints
constraints_changed?(current_constraints, desired_constraints)
end
def constraints_changed?(current, desired)
if current.present? && desired.present?
!cpu_value_eql?(current.fetch_path(:requests, :cpu), desired.fetch_path(:requests, :cpu)) ||
!cpu_value_eql?(current.fetch_path(:limits, :cpu), desired.fetch_path(:limits, :cpu)) ||
!mem_value_eql?(current.fetch_path(:requests, :memory), desired.fetch_path(:requests, :memory)) ||
!mem_value_eql?(current.fetch_path(:limits, :memory), desired.fetch_path(:limits, :memory))
else
# current, no desired => changed
# no current, desired => changed
# no current, no desired => unchanged
current.blank? ^ desired.blank?
end
end
private
# In podified there is only one "primary" miq_server, whose zone is
# "default"; the other miq_server instances exist simply to allow for
# additional zones
def my_server_is_primary?
my_server.zone&.name == "default"
end
def cpu_value_eql?(current, desired)
# Convert to millicores if not already converted: "1" -> 1000; "1000m" -> 1000
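# e.g. cpu_value_eql?("1", "1000m") => true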
current = current.to_s[-1] == "m" ? current.to_f : current.to_f * 1000
desired = desired.to_s[-1] == "m" ? desired.to_f : desired.to_f * 1000
current == desired
end
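
# iec_60027_2_to_i parses binary-suffixed memory strings into bytes so that
# differing notations compare correctly, e.g. mem_value_eql?("1Gi", "1024Mi") => true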
def mem_value_eql?(current, desired)
current.try(:iec_60027_2_to_i) == desired.try(:iec_60027_2_to_i)
end
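
# Start a thread running monitor_pods / monitor_deployments and block until
# it has either seeded its cache or died, so callers never proceed with an
# unpopulated cache.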
def start_kube_monitor(resource = :pods)
require 'http'
require 'concurrent/atomic/event'
monitor_started = Concurrent::Event.new
thread = Thread.new do
_log.info("Starting new #{resource} monitor thread of #{Thread.list.length} total")
begin
send(:"monitor_#{resource}", monitor_started)
rescue HTTP::ConnectionError => e
_log.error("Exiting #{resource} monitor thread due to [#{e.class.name}]: #{e}")
rescue => e
_log.error("Exiting #{resource} monitor thread after uncaught error")
_log.log_backtrace(e)
ensure
monitor_started.set
end
end
monitor_started.wait
_log.info("Starting new #{resource} monitor thread...Complete")
thread
end
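
# Start the deployments and pods monitor threads if they haven't been
# started yet, and restart them if they have died.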
def ensure_kube_monitors_started
[:deployments, :pods].each do |resource|
getter = "#{resource}_monitor_thread"
thread = send(getter)
if thread.nil? || !thread.alive?
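# A nil status means the thread terminated with an exception; join the
# dead thread to reap it before starting a replacement.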
if !thread.nil? && thread.status.nil?
dead_thread = thread
send(:"#{getter}=", nil)
_log.info("Waiting for the #{getter} Monitor Thread to exit...")
dead_thread.join
end
send(:"#{getter}=", start_kube_monitor(resource))
end
end
end
def delete_failed_deployments
return unless my_server_is_primary?
failed_deployments.each do |failed|
orchestrator.delete_deployment(failed)
end
end
def orchestrator
@orchestrator ||= ContainerOrchestrator.new
end
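
# Seed current_deployments with a full listing, then stream watch events;
# if the watch fails, clear the cache and start over.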
def monitor_deployments(monitor_started)
loop do
current_deployments.clear
resource_version = collect_initial(:deployments)
monitor_started.set
watch_for_events(:deployments, resource_version)
end
end
def monitor_pods(monitor_started)
loop do
current_pods.clear
resource_version = collect_initial(:pods)
monitor_started.set
# watch_for_events doesn't return unless an error caused us to break out of it, so we'll start over again
watch_for_events(:pods, resource_version)
end
end
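
# List all of the given resources, cache each one, and return the
# collection's resourceVersion so the subsequent watch can start from the
# point where the listing left off.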
def collect_initial(resource = :pods)
objects = orchestrator.send(:"get_#{resource}")
objects.each { |p| send(:"save_#{resource.to_s.singularize}", p) }
objects.resourceVersion
end
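
# Apply added/modified/deleted events to the cache; on an error event, log
# it and return so the caller can re-list and start a new watch.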
def watch_for_events(resource, resource_version)
orchestrator.send(:"watch_#{resource}", resource_version).each do |event|
case event.type.downcase
when "added", "modified"
send(:"save_#{resource.to_s.singularize}", event.object)
when "deleted"
send(:"delete_#{resource.to_s.singularize}", event.object)
when "error"
if (status = event.object)
# OCP 3 appears to return 'ERROR' watch events with the object containing the 410 code and "Gone" reason like below:
# #<Kubeclient::Resource type="ERROR", object={:kind=>"Status", :apiVersion=>"v1", :metadata=>{}, :status=>"Failure", :message=>"too old resource version: 199900 (27177196)", :reason=>"Gone", :code=>410}>
log_resource_error_event(status.code, status.message, status.reason)
end
break
end
end
end
def log_resource_error_event(code, message, reason)
_log.warn("Restarting watch_for_events due to error: [#{code} #{reason}], [#{message}]")
end
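
# Updates are merged into the existing Concurrent::Hash rather than
# replacing it, so a thread already holding a reference sees the new values.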
def save_deployment(deployment)
name = deployment.metadata.name
new_hash = Concurrent::Hash.new
new_hash[:spec] = deployment.spec.to_h
current_deployments[name] ||= new_hash
current_deployments[name].merge!(new_hash)
end
def delete_deployment(deployment)
current_deployments.delete(deployment.metadata.name)
end
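
# Cache the subset of pod state the monitor cares about: restart counts,
# termination history, and whether every container is ready and running.
# An entry might look like (illustrative example):
#   {:label_name => "1-generic", :last_state_terminated => false,
#    :container_restarts => 0, :running => true}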
def save_pod(pod)
return unless pod.status.containerStatuses
ch = Concurrent::Hash.new
ch[:label_name] = pod.metadata.labels.name
ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
ch[:running] = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }
name = pod.metadata.name
current_pods[name] ||= ch
current_pods[name].merge!(ch)
end
def delete_pod(pod)
current_pods.delete(pod.metadata.name)
end
def get_pod(pod_name)
orchestrator.get_pods.find { |pod| pod.metadata[:name] == pod_name }
end
end