ManageIQ/floe

View on GitHub
lib/floe/container_runner/docker.rb

Summary

Maintainability
A
3 hrs
Test Coverage
D
66%
# frozen_string_literal: true

module Floe
  class ContainerRunner
    class Docker < Floe::Runner
      include Floe::ContainerRunner::DockerMixin

      DOCKER_COMMAND = "docker"

      def initialize(options = {})
        require "awesome_spawn"
        require "io/wait"
        require "tempfile"

        super

        @network     = options.fetch("network", "bridge")
        @pull_policy = options["pull-policy"]
      end

      def run_async!(resource, env, secrets, context)
        raise ArgumentError, "Invalid resource" unless resource&.start_with?("docker://")

        image          = resource.sub("docker://", "")
        execution_id   = context.execution["Id"]
        runner_context = {}

        if secrets && !secrets.empty?
          runner_context["secrets_ref"] = create_secret(secrets)
        end

        begin
          runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"], context.logger)
          runner_context
        rescue AwesomeSpawn::CommandResultError => err
          cleanup(runner_context)
          {"Error" => "States.TaskFailed", "Cause" => err.to_s}
        end
      end

      def cleanup(runner_context)
        container_id, secrets_file = runner_context.values_at("container_ref", "secrets_ref")

        delete_container(container_id) if container_id
        delete_secret(secrets_file)    if secrets_file
      end

      def wait(timeout: nil, events: %i[create update delete], &block)
        until_timestamp = Time.now.utc + timeout if timeout

        r, w = IO.pipe

        pid = AwesomeSpawn.run_detached(
          self.class::DOCKER_COMMAND, :err => :out, :out => w, :params => wait_params(until_timestamp)
        )

        w.close

        loop do
          readable_timeout = until_timestamp - Time.now.utc if until_timestamp

          # Wait for our end of the pipe to be readable and if it didn't timeout
          # get the events from stdout
          next if r.wait_readable(readable_timeout).nil?

          # Get all events while the pipe is readable
          notices = []
          while r.ready?
            notice = r.gets

            # If the process has exited `r.gets` returns `nil` and the pipe is
            # always `ready?`
            break if notice.nil?

            event, runner_context = parse_notice(notice)
            next if event.nil? || !events.include?(event)

            notices << [event, runner_context]
          end

          # If we're given a block yield the events otherwise return them
          if block
            notices.each(&block)
          else
            # Terminate the `docker events` process before returning the events
            sigterm(pid)

            return notices
          end

          # Check that the `docker events` process is still alive
          Process.kill(0, pid)
        rescue Errno::ESRCH
          # Break out of the loop if the `docker events` process has exited
          break
        end
      ensure
        r.close
      end

      def status!(runner_context)
        return if runner_context.key?("Error")

        runner_context["container_state"] = inspect_container(runner_context["container_ref"])&.dig("State")
      end

      def running?(runner_context)
        !!runner_context.dig("container_state", "Running")
      end

      def success?(runner_context)
        runner_context.dig("container_state", "ExitCode") == 0
      end

      def output(runner_context)
        return runner_context.slice("Error", "Cause") if runner_context.key?("Error")

        output = docker!("logs", runner_context["container_ref"], :combined_output => true).output
        runner_context["output"] = output
      end

      private

      attr_reader :network

      def run_container(image, env, execution_id, secrets_file, logger)
        params = run_container_params(image, env, execution_id, secrets_file)

        logger.debug("Running #{AwesomeSpawn.build_command_line(self.class::DOCKER_COMMAND, params)}")

        result = docker!(*params)
        result.output.chomp
      end

      def run_container_params(image, env, execution_id, secrets_file)
        params  = ["run"]
        params << :detach
        params += env.map { |k, v| [:e, "#{k}=#{v}"] }
        params << [:e, "_CREDENTIALS=/run/secrets"] if secrets_file
        params << [:pull, @pull_policy] if @pull_policy
        params << [:net, "host"] if @network == "host"
        params << [:label, "execution_id=#{execution_id}"]
        params << [:v, "#{secrets_file}:/run/secrets:z"] if secrets_file
        params << [:name, container_name(image)]
        params << image
      end

      def wait_params(until_timestamp)
        params = ["events", [:format, "{{json .}}"], [:filter, "type=container"], [:since, Time.now.utc.to_i]]
        params << [:until, until_timestamp.to_i] if until_timestamp
        params
      end

      def parse_notice(notice)
        notice = JSON.parse(notice)

        status  = notice["status"]
        event   = docker_event_status_to_event(status)
        running = event != :delete

        name, exit_code, execution_id = notice.dig("Actor", "Attributes")&.values_at("name", "exitCode", "execution_id")

        runner_context = {"container_ref" => name, "container_state" => {"Running" => running, "ExitCode" => exit_code.to_i}}

        [event, {"execution_id" => execution_id, "runner_context" => runner_context}]
      rescue JSON::ParserError
        []
      end

      def docker_event_status_to_event(status)
        case status
        when "create"
          :create
        when "start"
          :update
        when "die", "destroy"
          :delete
        else
          :unkonwn
        end
      end

      def inspect_container(container_id)
        JSON.parse(docker!("inspect", container_id).output).first
      rescue AwesomeSpawn::CommandResultError => err
        raise Floe::ExecutionError.new("Failed to get status for container #{container_id}: #{err}")
      end

      def delete_container(container_id)
        docker!("rm", container_id)
      rescue
        nil
      end

      def delete_secret(secrets_file)
        return unless File.exist?(secrets_file)

        File.unlink(secrets_file)
      rescue
        nil
      end

      def create_secret(secrets)
        secrets_file = Tempfile.new
        secrets_file.write(secrets.to_json)
        secrets_file.close
        secrets_file.path
      end

      def sigterm(pid)
        Process.kill("TERM", pid)
      rescue Errno::ESRCH
        nil
      end

      def global_docker_options
        []
      end

      def docker!(*args, **kwargs)
        params = global_docker_options + args
        AwesomeSpawn.run!(self.class::DOCKER_COMMAND, :params => params, **kwargs)
      end
    end
  end
end