# ManageIQ/manageiq-providers-kubernetes
# File: app/models/manageiq/providers/kubernetes/container_manager/scanning/job.rb
# Code Climate summary: Maintainability A (0 mins), Test Coverage B (87%).
autoload(:ImageInspectorClient, 'image-inspector-client')
autoload(:Kubeclient, 'kubeclient')
autoload(:KubeException, 'kubeclient')
 
# Job that analyzes a single container image on a Kubernetes provider.
#
# It creates an image-inspector pod for the image and waits for the pod to
# become ready. It then fetches the inspector metadata (and OpenSCAP results
# when available), hands them to the image's scan/sync metadata flow, and
# finally deletes the pod and records a ScanResult on the image.
class ManageIQ::Providers::Kubernetes::ContainerManager::Scanning::Job < Job
  PROVIDER_CLASS = ManageIQ::Providers::Kubernetes::ContainerManager
  INSPECTOR_IMAGE_TAG = '2.1'.freeze
  INSPECTOR_PORT = 8080
  DOCKER_SOCKET = '/var/run/docker.sock'
  SCAN_CATEGORIES = %w(system software)
  POD_POLL_INTERVAL = 10 # seconds between pod readiness checks
  IMAGES_GUEST_OS = 'Linux'
  INSPECTOR_HEALTH_PATH = '/healthz'
  ERRCODE_NOTFOUND = 404
  IMAGE_INSPECTOR_SA = 'inspector-admin'
  INSPECTOR_ADMIN_SECRET_PATH = '/var/run/secrets/kubernetes.io/inspector-admin-secret-'
  PROXY_ENV_VARIABLES = %w(no_proxy http_proxy https_proxy)

  # State machine for the scan.
  #
  # Each key is a signal name. Its value maps the state the job must be in to
  # the state it moves to. The interrupting signals (abort/cancel/finish) use
  # a '*' source, which presumably means "from any state" as interpreted by
  # the base Job class (TODO confirm).
  #
  # Happy path:
  #   initializing -> waiting_to_start -> pod_create -> waiting_to_scan
  #   -> scanning -> synchronizing -> pod_delete -> finished
  def load_transitions
    self.state ||= 'initializing'
    {
      :initializing => {'initializing' => 'waiting_to_start'},
      :start        => {'waiting_to_start' => 'pod_create'},
      :pod_wait     => {'pod_create'      => 'waiting_to_scan',
                        'waiting_to_scan' => 'waiting_to_scan'},
      :analyze      => {'waiting_to_scan' => 'scanning'},
      :data         => {'scanning'      => 'synchronizing',
                        'synchronizing' => 'synchronizing'},
      :cleanup      => {'synchronizing' => 'pod_delete'},
      :abort_job    => {'*' => 'aborting'},
      :cancel_job   => {'*' => 'canceling'},
      :cancel       => {'*' => 'canceling'},
      :finish       => {'*' => 'finishing'},
      :done         => {'pod_delete' => 'finished',
                        'aborting'   => 'finished',
                        'canceling'  => 'finished',
                        'finishing'  => 'finished'}
    }
  end

  # Handler for the :initializing signal. Marks the job as pending dispatch.
  def initializing
    # exactly like job.dispatch_start except for storage bits
    _log.info "Dispatch Status is 'pending'"
    update(:dispatch_status => "pending")
  end

  # Handler for the :start signal.
  #
  # Records the target image and pod details in +options+, looks up the
  # inspector-admin pull secrets and creates the image-inspector pod. Queues
  # :pod_wait on success and :abort_job on any failure.
  def start
    image = target_entity
    return queue_signal(:abort_job, "no image found", "error") unless image
    return queue_signal(:abort_job, "cannot analyze non docker images", "error") unless image.docker_id

    namespace = ::Settings.ems.ems_kubernetes.miq_namespace

    update!(:options => options.merge(
      :docker_image_id => image.docker_id,
      :image_full_name => image.full_name,
      :image_name      => image.name,
      # Pod name built from the first five characters of the job guid.
      :pod_name        => "manageiq-img-scan-#{guid[0..4]}",
      :pod_port        => INSPECTOR_PORT,
      :pod_namespace   => namespace
    ))

    _log.info("Getting inspector-admin secret for pod [#{pod_full_name}]")
    begin
      inspector_admin_secret_names = inspector_admin_secrets
    rescue SocketError, KubeException => e
      msg = "getting inspector-admin secret failed"
      _log.error("#{msg}: [#{e}]")
      return queue_signal(:abort_job, msg, "error")
    end

    pod = pod_definition(inspector_admin_secret_names)

    _log.info("Creating pod [#{pod_full_name}] to analyze docker image [#{options[:docker_image_id]}] [#{pod.to_json}]")
    begin
      kubernetes_client.create_pod(pod)
    rescue SocketError, KubeException => e
      msg = "pod creation for [#{pod_full_name}] failed"
      _log.error("#{msg}: [#{e}]")
      return queue_signal(:abort_job, msg, "error")
    end

    queue_signal(:pod_wait)
  end

  # Re-queues :pod_wait to run again POD_POLL_INTERVAL seconds from now.
  def poll_pod_wait
    queue_signal(:pod_wait, :deliver_on => POD_POLL_INTERVAL.seconds.from_now.utc)
  end

  # Handler for the :pod_wait signal.
  #
  # Checks whether the first container of the inspector pod reports ready.
  # Queues :analyze when it does. Otherwise it polls again, including when
  # the pod has no container statuses yet. Kubernetes access errors abort
  # the job.
  def pod_wait
    _log.info("waiting for pod #{pod_full_name} to be available")

    begin
      statuses = kubernetes_client.get_pod(options[:pod_name], options[:pod_namespace])[:status].try(:containerStatuses)
      # An empty list is as "not yet reported" as a missing one; indexing
      # into it would raise NoMethodError, which the rescue below does not
      # cover.
      if statuses.blank?
        _log.info("No containerStatuses for pod #{options[:pod_name]}")
        return poll_pod_wait
      end
      ready = statuses.first[:ready]
    rescue SocketError, KubeException => e
      msg = "unknown access error to pod #{pod_full_name}: [#{e.message}]"
      _log.info(msg)
      return queue_signal(:abort_job, msg, "error")
    end

    if ready
      _log.info("pod #{pod_full_name} is ready and accessible")
      queue_signal(:analyze)
    else
      # continue: pod is still not up and running
      _log.info("pod #{pod_full_name} is not ready")
      poll_pod_wait
    end
  end

  # Confirms the inspector scanned the image this job was created for.
  #
  # Matches options[:docker_image_id] against the reported image Id, then
  # against each RepoDigest, both in full and by its part after '@'.
  #
  # @param metadata image-inspector metadata (responds to Id and RepoDigests)
  # @return [nil, String] nil when an id matches, otherwise an error message
  #   listing the (truncated) ids that were detected
  def verify_scanned_image_id(metadata)
    expected = options[:docker_image_id]
    actual = metadata.Id
    return nil if actual == expected

    # to_s: the inspector may not report an Id at all.
    msg = format("cannot analyze image %s with id %s: detected ids were %s",
                 options[:image_full_name], expected[0..11], actual.to_s[0..11])
    expected_digest = expected.split('@').last

    Array(metadata.RepoDigests).each do |repo_digest|
      return nil if repo_digest == expected

      sha_digest = repo_digest.split('@').last
      return nil if sha_digest == expected_digest

      msg << ", #{sha_digest[0..11]}"
    end

    msg
  end

  # Handler for the :analyze signal.
  #
  # Fetches and validates the inspector metadata, then records the OpenSCAP
  # error or collects compliance data. Finally it starts the image's metadata
  # scan for SCAN_CATEGORIES. Any inspector or validation error aborts the
  # job.
  def analyze
    image = target_entity
    return queue_signal(:abort_job, "no image found", "error") unless image

    _log.info("scanning image #{options[:docker_image_id]}")

    scan_args = {
      :pod_namespace => options[:pod_namespace],
      :pod_name      => options[:pod_name],
      :pod_port      => options[:pod_port],
      :guest_os      => IMAGES_GUEST_OS
    }

    begin
      inspector_metadata = image_inspector_client.fetch_metadata
    rescue ImageInspectorClient::InspectorClientException => e
      msg = "analyzing image-inspector metadata for #{options[:docker_image_id]} failed with error: #{e}"
      _log.error(msg)
      return queue_signal(:abort_job, msg, 'error')
    end

    if inspector_metadata.ImageAcquireError.present?
      msg = "image acquiring error: #{inspector_metadata.ImageAcquireError}"
      _log.error(msg)
      return queue_signal(:abort_job, msg, 'error')
    end

    verify_error = verify_scanned_image_id(inspector_metadata)
    if verify_error
      _log.error(verify_error)
      return queue_signal(:abort_job, verify_error, 'error')
    end

    if inspector_metadata.OpenSCAP.Status == 'Error'
      # Remembered here and turned into a 'warn' ScanResult by
      # set_image_scan_status during cleanup.
      update!(:options => options.merge(:scan_status         => inspector_metadata.OpenSCAP.Status,
                                        :scan_result_message => inspector_metadata.OpenSCAP.ErrorMessage))
    else
      collect_compliance_data(image)
    end

    image.scan_metadata(SCAN_CATEGORIES,
                        "taskid" => jobid,
                        "host"   => MiqServer.my_server,
                        "args"   => [YAML.dump(scan_args)])
  end

  # Stores the inspector's OpenSCAP ARF report on the image's OpenscapResult.
  #
  # This is best effort. It returns nil without doing anything when
  # OpenscapResult.openscap_available? is false. Inspector client errors are
  # logged and swallowed so the scan itself can continue.
  def collect_compliance_data(image)
    unless OpenscapResult.openscap_available?
      _log.warn("OpenSCAP Binary missing, skipping scan")
      return nil
    end
    _log.info "collecting compliance data for #{options[:docker_image_id]}"
    openscap_result = image.openscap_result || OpenscapResult.new(:container_image => image)
    openscap_result.attach_raw_result(image_inspector_client.fetch_oscap_arf)
    openscap_result.save
  rescue ImageInspectorClient::InspectorClientException => e
    _log.error("collecting compliance data for #{options[:docker_image_id]} with error: #{e}")
  end

  # Asks the image to sync the scanned metadata for SCAN_CATEGORIES.
  def synchronize
    image = target_entity
    return queue_signal(:abort_job, "no image found", "error") unless image

    image.sync_metadata(SCAN_CATEGORIES,
                        "taskid" => jobid,
                        "host"   => MiqServer.my_server)
  end

  # Handler for the :data signal.
  #
  # The XML payload's <summary> root names the finished operation.
  # "scanmetadata" triggers synchronization; "syncmetadata" triggers cleanup.
  #
  # @raise [RuntimeError] on any other operation name
  def data(payload)
    payload_document = MiqXml.load(payload)
    return unless payload_document.root.name.downcase == "summary"

    case operation = payload_document.root.first.name.downcase
    when "scanmetadata" then synchronize
    when "syncmetadata" then queue_signal(:cleanup)
    else raise "Unknown operation #{operation}"
    end
  end

  # Deletes the inspector pod if it exists and was created by this job.
  #
  # A missing pod is ignored. Other lookup errors are re-raised, while
  # delete errors are only logged.
  def delete_pod
    return if options[:pod_name].blank?
    client = kubernetes_client

    begin
      pod = client.get_pod(options[:pod_name], options[:pod_namespace])
    rescue KubeException => e
      if e.error_code == ERRCODE_NOTFOUND
        _log.info("pod #{pod_full_name} not found, skipping delete")
        return
      end
      # TODO: handle the cleanup at a later time
      raise
    end

    # try: a same-named pod created elsewhere may carry no annotations.
    pod_jobid = pod.metadata.annotations.try(:[], 'manageiq.org/jobid')

    # If the job id is not matching the pod was not created by this
    # job and ManageIQ instance.
    if pod_jobid != jobid
      _log.info("skipping delete for pod #{pod_full_name} with " \
                "job id #{pod_jobid}")
      return
    end

    _log.info("deleting pod #{pod_full_name}")
    begin
      client.delete_pod(options[:pod_name], options[:pod_namespace])
    rescue SocketError, KubeException => e
      _log.info("pod #{pod_full_name} deletion failed: #{e}")
      # TODO: handle the cleanup at a later time
    end
  end

  # Handler for :cleanup and, via the aliases below, for :abort_job,
  # :cancel_job and :finish.
  #
  # Drops pending queued signals, raises the image scan event and deletes the
  # pod. On a normal completion it also records the ScanResult. It always
  # queues :done:
  # - interrupting states pass the supplied (or default) message/status;
  # - otherwise the image's last scan result message/status are passed.
  def cleanup(*args)
    unqueue_all_signals
    image = target_entity
    raise_image_scan_event(image) if image

    delete_pod

    set_image_scan_status unless %w(aborting canceling finishing).include?(state)
  ensure
    message, status =
      case state
      when 'aborting', 'canceling', 'finishing'
        message_status(*args)
      else
        # The image may be gone, or delete_pod may have raised before any
        # scan result was recorded; never fail inside ensure, or :done would
        # not be queued.
        last_scan_message_status || message_status(*args)
      end
    queue_signal(:done, message, status)
  end

  # Raises the "scan" job event on the image. The suffix is 'abort' when the
  # job is aborting/canceling, otherwise 'complete'.
  def raise_image_scan_event(image)
    suffix = %w(aborting canceling).include?(state) ? 'abort' : 'complete'
    MiqEvent.raise_evm_job_event(image, :type => "scan", :suffix => suffix)
  end

  # Creates an "OpenSCAP" ScanResult and sets it as the image's
  # last_scan_result.
  #
  # The result is 'warn' with the OpenSCAP error message when analyze stored
  # an 'Error' scan status; otherwise it is 'ok' with a success message.
  def set_image_scan_status
    return unless target_entity

    openscap_failed = options[:scan_status] == 'Error'
    target_entity.update(
      :last_scan_result => ScanResult.create(
        :scan_type           => "OpenSCAP",
        :scan_status         => openscap_failed ? 'warn' : 'ok',
        :scan_result_message => openscap_failed ? options[:scan_result_message] : 'image analysis completed successfully'.freeze
      )
    )
  end

  # Handler for the :done signal. Marks the job dispatch as finished and hands
  # the message/status arguments to process_finished.
  def done(*args)
    # exactly like job.dispatch_finish except for storage bits
    _log.info "Dispatch Status is 'finished'"
    update(:dispatch_status => "finished")
    process_finished(*args)
  end

  alias_method :abort_job, :cleanup

  # Handler for the :cancel signal.
  #
  # The first call moves the job into 'canceling'; a call while already
  # canceling signals :cancel_job, which runs cleanup.
  def cancel(*_args)
    _log.info "Job Canceling"
    if state != "canceling" # ensure change of states
      signal :cancel
    else
      signal :cancel_job
    end
  end
  alias_method :cancel_job, :cleanup
  alias_method :finish, :cleanup

  # MiqQueue callback registered by queue_signal. A timed-out queue message
  # aborts the job unless it is already aborting.
  def queue_callback(state, msg, _)
    if state == "timeout" && self.state != "aborting"
      queue_signal(:abort_job, "Job Timeout: #{msg}", "error")
    end
  end

  # Scanning job timeout from settings, multiplied by +timeout_adjustment+.
  # NOTE(review): to_f_with_method presumably turns a setting like
  # "20.minutes" into seconds — confirm.
  def self.current_job_timeout(timeout_adjustment = 1)
    ::Settings.container_scanning.scanning_job_timeout.to_f_with_method * timeout_adjustment
  end

  private

  # Normalizes the (message, status) signal arguments, defaulting to
  # "Job interrupted" / "ok" when either is missing or empty.
  def message_status(*args)
    message, status = args
    message = message.to_s.empty? ? "Job interrupted" : message
    status = status.to_s.empty? ? "ok" : status
    return message, status
  end

  # [message, status] from the image's last scan result, or nil when the
  # image or its scan result is unavailable.
  def last_scan_message_status
    scan_result = target_entity.try(:last_scan_result)
    return unless scan_result

    [scan_result.scan_result_message, scan_result.scan_status]
  end

  def ext_management_system
    @ext_management_system ||= ExtManagementSystem.find(options[:ems_id])
  end

  # A new Kubernetes client for the provider (not memoized).
  def kubernetes_client
    ext_management_system.connect(:service => PROVIDER_CLASS.ems_type)
  end

  # Image-inspector API client that talks to the pod through the Kubernetes
  # pod proxy, reusing the kube client's auth and HTTP proxy settings.
  def image_inspector_client
    kubeclient = kubernetes_client

    ImageInspectorClient::Client.new(
      pod_proxy_url(kubeclient, ''),
      'v1',
      :ssl_options    => {
        :verify_ssl => ext_management_system.verify_ssl_mode,
        :cert_store => ext_management_system.ssl_cert_store
      },
      :auth_options   => kubeclient.auth_options,
      :http_proxy_uri => kubeclient.http_proxy_uri
    )
  end

  # MiqQueue attributes identifying this job's queued signals.
  def queue_options
    {
      :class_name  => "Job",
      :instance_id => id,
      :method_name => "signal",
      :priority    => MiqQueue::HIGH_PRIORITY,
      :role        => "smartstate",
      :task_id     => guid,
      :zone        => zone
    }
  end

  def unqueue_all_signals
    MiqQueue.unqueue(queue_options)
  end

  # Queues signal(*args) for this job unless an identical message is already
  # queued, optionally delayed until +deliver_on+. Registers queue_callback so
  # timeouts abort the job.
  def queue_signal(*args, deliver_on: nil)
    MiqQueue.put_unless_exists(**queue_options, :args => args, :deliver_on => deliver_on) do |_msg, find_options|
      find_options.merge(
        :miq_callback => {
          :class_name  => self.class.to_s,
          :instance_id => id,
          :method_name => :queue_callback
        }
      )
    end
  end

  # URI of the Kubernetes API proxy endpoint for the inspector pod, with
  # +path+ appended.
  def pod_proxy_url(client, path = "")
    # TODO: change proxy_url in kubeclient to return URI
    pod_proxy = client.proxy_url(:pod,
                                 options[:pod_name],
                                 options[:pod_port],
                                 options[:pod_namespace])
    URI.parse(pod_proxy + path)
  end

  def pod_full_name
    "#{options[:pod_namespace]}/#{options[:pod_name]}"
  end

  # Names of the image pull secrets attached to the inspector-admin service
  # account.
  #
  # Returns an empty list when the service account does not exist, so callers
  # can iterate the result unconditionally. Other Kubernetes errors
  # propagate.
  def inspector_admin_secrets
    kubeclient = kubernetes_client
    begin
      inspector_sa = kubeclient.get_service_account(IMAGE_INSPECTOR_SA, options[:pod_namespace])
      inspector_sa.try(:imagePullSecrets).to_a.filter_map { |sec| sec.try(:name) }.uniq
    rescue KubeException => e
      raise unless e.error_code == ERRCODE_NOTFOUND
      _log.warn("Service Account #{IMAGE_INSPECTOR_SA} does not exist.")
      []
    end
  end

  # Provider-level :image_inspector_options, or {} when none are set.
  def ems_image_inspector_options
    @ems_image_inspector_options ||= ext_management_system.options.try(:fetch_path, :image_inspector_options) || {}
  end

  # Builds the privileged image-inspector pod.
  #
  # The pod mounts the host's docker socket and serves scan results on
  # options[:pod_port]. Each pull secret is mounted and passed as
  # --dockercfg, and the CVE URL is passed as --cve-url when configured.
  #
  # @param inspector_admin_secret_names [Array<String>, nil] nil is treated
  #   as no secrets
  # @return [Kubeclient::Resource]
  def pod_definition(inspector_admin_secret_names)
    pod_def = {
      :apiVersion => "v1",
      :kind       => "Pod",
      :metadata   => {
        :name        => options[:pod_name],
        :namespace   => options[:pod_namespace],
        :labels      => {
          'name'         => options[:pod_name],
          'manageiq.org' => "true"
        },
        :annotations => {
          # fall back to "unknown" when options[:miq_server_host] is nil
          'manageiq.org/hostname' => options[:miq_server_host] || "unknown",
          'manageiq.org/guid'     => options[:miq_server_guid],
          'manageiq.org/image'    => options[:image_full_name],
          # delete_pod only removes pods whose jobid matches this job
          'manageiq.org/jobid'    => jobid,
        }
      },
      :spec       => {
        # A hack to smuggle at least partial info which image was scanned into the
        # OpenSCAP report - determines the "target name" in the report.
        # Must be lowercase and valid DNS RFC-1123 label up to 63 chars or kubernetes
        # won't run the pod.
        :hostname      => options[:image_name].match(/(?:.*\/)*(.*)$/).captures[0]
                                              .downcase.tr("^a-z0-9", "").truncate(63, :omission => ""),
        :restartPolicy => 'Never',
        :containers    => [
          {
            :name            => "image-inspector",
            :image           => inspector_image,
            :imagePullPolicy => "Always",
            :command         => [
              "/usr/bin/image-inspector",
              "--chroot",
              "--image=#{options[:image_full_name]}",
              "--scan-type=openscap",
              "--serve=0.0.0.0:#{options[:pod_port]}"
            ],
            :ports           => [{:containerPort => options[:pod_port]}],
            :securityContext => {:privileged => true},
            :volumeMounts    => [
              {
                :mountPath => DOCKER_SOCKET,
                :name      => "docker-socket"
              }
            ],
            :env             => inspector_proxy_env_variables,
            :readinessProbe  => {
              "initialDelaySeconds" => 15,
              "periodSeconds"       => 5,
              "httpGet"             => {
                "path" => INSPECTOR_HEALTH_PATH,
                "port" => options[:pod_port]
              }
            }
          }
        ],
        :volumes       => [
          {
            :name     => "docker-socket",
            :hostPath => {:path => DOCKER_SOCKET}
          }
        ]
      }
    }

    Array(inspector_admin_secret_names).each do |secret_name|
      add_secret_to_pod_def(pod_def, secret_name)
    end
    add_cve_url(pod_def)
    Kubeclient::Resource.new(pod_def)
  end

  # Mounts +secret_name+ read-only into the inspector container and points
  # image-inspector at its .dockercfg.
  def add_secret_to_pod_def(pod_def, secret_name)
    container = pod_def[:spec][:containers][0]
    mount_path = INSPECTOR_ADMIN_SECRET_PATH + secret_name
    volume_name = "inspector-admin-secret-" + secret_name

    container[:command].append("--dockercfg=" + mount_path + "/.dockercfg")
    container[:volumeMounts].append(
      :name      => volume_name,
      :mountPath => mount_path,
      :readOnly  => true
    )
    pod_def[:spec][:volumes].append(
      :name   => volume_name,
      :secret => {:secretName => secret_name}
    )
  end

  # "registry/repository:tag" of the inspector image.
  #
  # Provider options take precedence over the global settings; the tag
  # defaults to INSPECTOR_IMAGE_TAG.
  def inspector_image
    registry = ems_image_inspector_options.fetch_path(:registry) || ::Settings.ems.ems_kubernetes.image_inspector_registry
    repo = ems_image_inspector_options.fetch_path(:repository) || ::Settings.ems.ems_kubernetes.image_inspector_repository
    tag = ems_image_inspector_options.fetch_path(:image_tag) || INSPECTOR_IMAGE_TAG
    "#{registry}/#{repo}:#{tag}"
  end

  # Container env entries (upper-cased names) for each proxy variable present
  # in the provider's image inspector options.
  def inspector_proxy_env_variables
    PROXY_ENV_VARIABLES.each_with_object([]) do |var_name, env|
      next unless ems_image_inspector_options.key?(var_name.to_sym)
      env << {:name  => var_name.upcase,
              :value => ems_image_inspector_options[var_name.to_sym]}
    end
  end

  # Appends --cve-url to the inspector command when a CVE URL is configured,
  # with provider options taking precedence over global settings.
  def add_cve_url(pod_def)
    cve_url = ems_image_inspector_options.fetch_path(:cve_url) || ::Settings.ems.ems_kubernetes.image_inspector_cve_url
    pod_def[:spec][:containers][0][:command].append("--cve-url=#{cve_url}") unless cve_url.blank?
  end
end