lib/floe/container_runner/docker.rb
# frozen_string_literal: true
module Floe
class ContainerRunner
class Docker < Floe::Runner
include Floe::ContainerRunner::DockerMixin
DOCKER_COMMAND = "docker"
def initialize(options = {})
require "awesome_spawn"
require "io/wait"
require "tempfile"
super
@network = options.fetch("network", "bridge")
@pull_policy = options["pull-policy"]
end
def run_async!(resource, env, secrets, context)
raise ArgumentError, "Invalid resource" unless resource&.start_with?("docker://")
image = resource.sub("docker://", "")
execution_id = context.execution["Id"]
runner_context = {}
if secrets && !secrets.empty?
runner_context["secrets_ref"] = create_secret(secrets)
end
begin
runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"], context.logger)
runner_context
rescue AwesomeSpawn::CommandResultError => err
cleanup(runner_context)
{"Error" => "States.TaskFailed", "Cause" => err.to_s}
end
end
def cleanup(runner_context)
container_id, secrets_file = runner_context.values_at("container_ref", "secrets_ref")
delete_container(container_id) if container_id
delete_secret(secrets_file) if secrets_file
end
def wait(timeout: nil, events: %i[create update delete], &block)
until_timestamp = Time.now.utc + timeout if timeout
r, w = IO.pipe
pid = AwesomeSpawn.run_detached(
self.class::DOCKER_COMMAND, :err => :out, :out => w, :params => wait_params(until_timestamp)
)
w.close
loop do
readable_timeout = until_timestamp - Time.now.utc if until_timestamp
# Wait for our end of the pipe to be readable and if it didn't timeout
# get the events from stdout
next if r.wait_readable(readable_timeout).nil?
# Get all events while the pipe is readable
notices = []
while r.ready?
notice = r.gets
# If the process has exited `r.gets` returns `nil` and the pipe is
# always `ready?`
break if notice.nil?
event, runner_context = parse_notice(notice)
next if event.nil? || !events.include?(event)
notices << [event, runner_context]
end
# If we're given a block yield the events otherwise return them
if block
notices.each(&block)
else
# Terminate the `docker events` process before returning the events
sigterm(pid)
return notices
end
# Check that the `docker events` process is still alive
Process.kill(0, pid)
rescue Errno::ESRCH
# Break out of the loop if the `docker events` process has exited
break
end
ensure
r.close
end
def status!(runner_context)
return if runner_context.key?("Error")
runner_context["container_state"] = inspect_container(runner_context["container_ref"])&.dig("State")
end
def running?(runner_context)
!!runner_context.dig("container_state", "Running")
end
def success?(runner_context)
runner_context.dig("container_state", "ExitCode") == 0
end
def output(runner_context)
return runner_context.slice("Error", "Cause") if runner_context.key?("Error")
output = docker!("logs", runner_context["container_ref"], :combined_output => true).output
runner_context["output"] = output
end
private
attr_reader :network
def run_container(image, env, execution_id, secrets_file, logger)
params = run_container_params(image, env, execution_id, secrets_file)
logger.debug("Running #{AwesomeSpawn.build_command_line(self.class::DOCKER_COMMAND, params)}")
result = docker!(*params)
result.output.chomp
end
def run_container_params(image, env, execution_id, secrets_file)
params = ["run"]
params << :detach
params += env.map { |k, v| [:e, "#{k}=#{v}"] }
params << [:e, "_CREDENTIALS=/run/secrets"] if secrets_file
params << [:pull, @pull_policy] if @pull_policy
params << [:net, "host"] if @network == "host"
params << [:label, "execution_id=#{execution_id}"]
params << [:v, "#{secrets_file}:/run/secrets:z"] if secrets_file
params << [:name, container_name(image)]
params << image
end
def wait_params(until_timestamp)
params = ["events", [:format, "{{json .}}"], [:filter, "type=container"], [:since, Time.now.utc.to_i]]
params << [:until, until_timestamp.to_i] if until_timestamp
params
end
def parse_notice(notice)
notice = JSON.parse(notice)
status = notice["status"]
event = docker_event_status_to_event(status)
running = event != :delete
name, exit_code, execution_id = notice.dig("Actor", "Attributes")&.values_at("name", "exitCode", "execution_id")
runner_context = {"container_ref" => name, "container_state" => {"Running" => running, "ExitCode" => exit_code.to_i}}
[event, {"execution_id" => execution_id, "runner_context" => runner_context}]
rescue JSON::ParserError
[]
end
def docker_event_status_to_event(status)
case status
when "create"
:create
when "start"
:update
when "die", "destroy"
:delete
else
:unkonwn
end
end
def inspect_container(container_id)
JSON.parse(docker!("inspect", container_id).output).first
rescue AwesomeSpawn::CommandResultError => err
raise Floe::ExecutionError.new("Failed to get status for container #{container_id}: #{err}")
end
def delete_container(container_id)
docker!("rm", container_id)
rescue
nil
end
def delete_secret(secrets_file)
return unless File.exist?(secrets_file)
File.unlink(secrets_file)
rescue
nil
end
def create_secret(secrets)
secrets_file = Tempfile.new
secrets_file.write(secrets.to_json)
secrets_file.close
secrets_file.path
end
def sigterm(pid)
Process.kill("TERM", pid)
rescue Errno::ESRCH
nil
end
def global_docker_options
[]
end
def docker!(*args, **kwargs)
params = global_docker_options + args
AwesomeSpawn.run!(self.class::DOCKER_COMMAND, :params => params, **kwargs)
end
end
end
end