ManageIQ/manageiq
app/models/miq_server/worker_management/kubernetes.rb

Maintainability: A (1 hr) · Test Coverage: B (81%)

class MiqServer::WorkerManagement::Kubernetes < MiqServer::WorkerManagement
  class_attribute :current_pods
  self.current_pods = Concurrent::Hash.new

  class_attribute :current_deployments
  self.current_deployments = Concurrent::Hash.new

  attr_accessor :deployments_monitor_thread, :pods_monitor_thread

  def sync_from_system
    # All miq_server instances have to reside on the same Kubernetes cluster, so
    # we only have to sync the list of pods and deployments once
    ensure_kube_monitors_started if my_server_is_primary?

    # Before syncing the workers, check for any orphaned worker rows that don't
    # have a current pod and delete them
    cleanup_orphaned_worker_rows

    # Update worker deployments with updated settings such as cpu/memory limits
    sync_deployment_settings
  end

  def sync_starting_workers
    starting = MiqWorker.find_all_starting

    # Get a list of pods that aren't currently assigned to MiqWorker records
    pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact

    # Non-rails workers cannot set their own miq_worker record to started once they
    # have finished initializing.  Check for any starting non-rails workers whose
    # pod is running and mark the miq_worker as started.
    starting.reject(&:rails_worker?).each do |worker|
      # If the current worker doesn't have a system_uid assigned then find the first
      # pod available for our worker type and link them up.
      if worker.system_uid.nil?
        system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
        if system_uid
          # We have found a pod for the current worker record so remove the pod from
          # the list of pods without workers and set the pod name as the system_uid
          # for the current worker record.
          pods_without_workers.delete(system_uid)
          worker.update!(:system_uid => system_uid)
        else
          # If we haven't found a pod for this worker record then we need to check
          # whether it has been starting for too long and should be marked as
          # not responding.
          stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker)
          # Without a valid system_uid we cannot run any further logic in this
          # loop.
          next
        end
      end

      worker_pod = current_pods[worker.system_uid]
      next if worker_pod.nil?

      worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
    end

    starting.reload
  end

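  # Non-rails workers cannot mark their own miq_worker record as stopped once
  # their pod has been deleted.  Check for any stopping non-rails workers whose
  # pod is gone and mark the miq_worker as stopped.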
  def sync_stopping_workers
    stopping = MiqWorker.find_all_stopping

    stopping.reject(&:rails_worker?).each do |worker|
      next if current_pods.key?(worker.system_uid)

      worker.update!(:status => MiqWorker::STATUS_STOPPED)
    end

    stopping.reload
  end

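  # In podified, pod scheduling and resource limits are enforced by Kubernetes
  # rather than by the worker monitor, so this check always passes.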
  def enough_resource_to_start_worker?(_worker_class)
    true
  end

  def cleanup_orphaned_worker_rows
    unless current_pods.empty?
      # Any worker rows whose system_uid is not in the list of current pod
      # names and which are not starting (i.e., may not have had a system_uid
      # set yet) should be deleted.
      orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys)
                                 .where.not(:status => MiqWorker::STATUSES_STARTING)
      unless orphaned_rows.empty?
        _log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}")
        orphaned_rows.destroy_all
      end
    end
  end

  def cleanup_failed_workers
    super

    delete_failed_deployments
  end

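  # A deployment is considered failed if one of its pods has a container whose
  # last state is terminated and whose total container restarts exceed
  # restart_count.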
  def failed_deployments(restart_count = 5)
    # TODO: This logic might flag deployments that are hitting memory/cpu limits or otherwise not really 'failed'
    current_pods.values.select { |h| h[:last_state_terminated] && h.fetch(:container_restarts, 0) > restart_count }.pluck(:label_name).uniq
  end

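  # Compare each worker deployment's current resource constraints against the
  # desired constraints from settings and patch the deployment if they differ.
  # Each deployment is only checked once per pass.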
  def sync_deployment_settings
    checked_deployments = Set.new
    miq_workers.each do |worker|
      next if checked_deployments.include?(worker.worker_deployment_name)

      if deployment_resource_constraints_changed?(worker)
        _log.info("Constraints changed, patching deployment: [#{worker.worker_deployment_name}]")

        begin
          worker.patch_deployment
        rescue => err
          _log.warn("Failure patching deployment: [#{worker.worker_deployment_name}] for worker: id: [#{worker.id}], system_uid: [#{worker.system_uid}]. Error: [#{err}]... skipping")
          next
        end
      end
      checked_deployments << worker.worker_deployment_name
    end
  end

  def deployment_resource_constraints_changed?(worker)
    return false unless ::Settings.server.worker_monitor.enforce_resource_constraints

    container = current_deployments.fetch_path(worker.worker_deployment_name, :spec, :template, :spec, :containers).try(:first)
    current_constraints = container.try(:fetch, :resources, nil) || {}
    desired_constraints = worker.resource_constraints
    constraints_changed?(current_constraints, desired_constraints)
  end

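  # CPU values are compared in millicores and memory values as integer byte
  # counts so that equivalent representations (e.g. "1" vs "1000m") aren't
  # reported as changes.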
  def constraints_changed?(current, desired)
    if current.present? && desired.present?
      !cpu_value_eql?(current.fetch_path(:requests, :cpu), desired.fetch_path(:requests, :cpu)) ||
        !cpu_value_eql?(current.fetch_path(:limits, :cpu), desired.fetch_path(:limits, :cpu)) ||
        !mem_value_eql?(current.fetch_path(:requests, :memory), desired.fetch_path(:requests, :memory)) ||
        !mem_value_eql?(current.fetch_path(:limits, :memory), desired.fetch_path(:limits, :memory))
    else
      # current, no desired    => changed
      # no current, desired    => changed
      # no current, no desired => unchanged
      current.blank? ^ desired.blank?
    end
  end

  private

  # In podified there is only one "primary" miq_server, whose zone is "default";
  # the other miq_server instances simply exist to allow for additional zones.
  def my_server_is_primary?
    my_server.zone&.name == "default"
  end

  def cpu_value_eql?(current, desired)
    # Convert to millicores if not already converted: "1" -> 1000; "1000m" -> 1000
    current = current.to_s[-1] == "m" ? current.to_f : current.to_f * 1000
    desired = desired.to_s[-1] == "m" ? desired.to_f : desired.to_f * 1000
    current == desired
  end

  def mem_value_eql?(current, desired)
    current.try(:iec_60027_2_to_i) == desired.try(:iec_60027_2_to_i)
  end

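  # Spawn a thread that watches the given resource (:pods or :deployments) and
  # block until the monitor has completed its initial collection (or exited due
  # to an error).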
  def start_kube_monitor(resource = :pods)
    require 'http'
    require 'concurrent/atomic/event'

    monitor_started = Concurrent::Event.new

    thread = Thread.new do
      _log.info("Starting new #{resource} monitor thread of #{Thread.list.length} total")
      begin
        send(:"monitor_#{resource}", monitor_started)
      rescue HTTP::ConnectionError => e
        _log.error("Exiting #{resource} monitor thread due to [#{e.class.name}]: #{e}")
      rescue => e
        _log.error("Exiting #{resource} monitor thread after uncaught error")
        _log.log_backtrace(e)
      ensure
        monitor_started.set
      end
    end

    monitor_started.wait

    _log.info("Starting new #{resource} monitor thread...Complete")

    thread
  end

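  # (Re)start the deployments and pods monitor threads if they aren't running,
  # joining any thread that terminated abnormally before replacing it.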
  def ensure_kube_monitors_started
    [:deployments, :pods].each do |resource|
      getter = "#{resource}_monitor_thread"
      thread = send(getter)
      if thread.nil? || !thread.alive?
        if !thread.nil? && thread.status.nil?
          dead_thread = thread
          send(:"#{getter}=", nil)
          _log.info("Waiting for the #{getter} Monitor Thread to exit...")
          dead_thread.join
        end

        send(:"#{getter}=", start_kube_monitor(resource))
      end
    end
  end

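  # Only the primary server deletes failed deployments, since all servers share
  # the same Kubernetes cluster.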
  def delete_failed_deployments
    return unless my_server_is_primary?

    failed_deployments.each do |failed|
      orchestrator.delete_deployment(failed)
    end
  end

  def orchestrator
    @orchestrator ||= ContainerOrchestrator.new
  end

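  # Rebuild the deployment cache from scratch and then watch for changes.
  # watch_for_events only returns on an error, at which point the loop re-lists
  # and starts a new watch.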
  def monitor_deployments(monitor_started)
    loop do
      current_deployments.clear
      resource_version = collect_initial(:deployments)

      monitor_started.set

      watch_for_events(:deployments, resource_version)
    end
  end

  def monitor_pods(monitor_started)
    loop do
      current_pods.clear
      resource_version = collect_initial(:pods)

      monitor_started.set

      # watch_for_events doesn't return unless an error caused us to break out of it, so we'll start over again
      watch_for_events(:pods, resource_version)
    end
  end

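  # List all of the given resource, populate the cache, and return the
  # collection's resourceVersion so the subsequent watch can pick up where the
  # listing left off.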
  def collect_initial(resource = :pods)
    objects = orchestrator.send(:"get_#{resource}")
    objects.each { |p| send(:"save_#{resource.to_s.singularize}", p) }
    objects.resourceVersion
  end

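  # Stream watch events for the given resource starting at resource_version,
  # updating the cache on added/modified/deleted events.  An error event (e.g.
  # a 410 "Gone" when the resource version is too old) breaks out so the caller
  # can re-list and restart the watch.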
  def watch_for_events(resource, resource_version)
    orchestrator.send(:"watch_#{resource}", resource_version).each do |event|
      case event.type.downcase
      when "added", "modified"
        send(:"save_#{resource.to_s.singularize}", event.object)
      when "deleted"
        send(:"delete_#{resource.to_s.singularize}", event.object)
      when "error"
        if (status = event.object)
          # ocp 3 appears to return 'ERROR' watch events with the object containing the 410 code and "Gone" reason like below:
          # #<Kubeclient::Resource type="ERROR", object={:kind=>"Status", :apiVersion=>"v1", :metadata=>{}, :status=>"Failure", :message=>"too old resource version: 199900 (27177196)", :reason=>"Gone", :code=>410}>
          log_resource_error_event(status.code, status.message, status.reason)
        end

        break
      end
    end
  end

  def log_resource_error_event(code, message, reason)
    _log.warn("Restarting watch_for_events due to error: [#{code} #{reason}], [#{message}]")
  end

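  # Cache only the deployment spec, keyed by deployment name.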
  def save_deployment(deployment)
    name = deployment.metadata.name
    new_hash = Concurrent::Hash.new
    new_hash[:spec] = deployment.spec.to_h
    current_deployments[name] ||= new_hash
    current_deployments[name].merge!(new_hash)
  end

  def delete_deployment(deployment)
    current_deployments.delete(deployment.metadata.name)
  end

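  # Cache the subset of pod state the worker monitor cares about: the worker
  # label, whether any container has terminated, the total container restart
  # count, and whether the pod is fully running.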
  def save_pod(pod)
    return unless pod.status.containerStatuses

    ch = Concurrent::Hash.new
    ch[:label_name]            = pod.metadata.labels.name
    ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
    ch[:container_restarts]    = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
    ch[:running]               = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }

    name = pod.metadata.name
    current_pods[name] ||= ch
    current_pods[name].merge!(ch)
  end

  def delete_pod(pod)
    current_pods.delete(pod.metadata.name)
  end

  def get_pod(pod_name)
    orchestrator.get_pods.find { |pod| pod.metadata.name == pod_name }
  end
end