# cloudfoundry/dea_ng

# View on GitHub
# lib/container/container.rb

# Summary

# Maintainability
# B
# 4 hrs
# Test Coverage
require 'em/warden/client'
require 'dea/utils/egress_rules_mapper'
require "dea/loggregator"

# Wraps the Warden client connections used to create, configure, run scripts
# in, and destroy a single container identified by +handle+.
#
# Clients are obtained from the +client_provider+ given to #initialize, which
# must respond to +get(name)+ and +close_all+. Every request sent through
# #call is timed and counted via Dea::Loggregator counters.
#
# NOTE(review): +logger+ is called throughout but is not defined in this
# class; presumably it is provided elsewhere in the project - confirm.
class Container
  # Raised when a script or linked job finishes with a non-zero exit status.
  # The response that triggered the error is available through +result+.
  class WardenError < StandardError
    attr_reader :result

    # @param message [String] human-readable error description
    # @param response [Object, nil] response object exposed via #result
    def initialize(message, response=nil)
      super(message)
      @result = response
    end
  end

  # Translates the textual bind-mount mode ('ro'/'rw') into the corresponding
  # Warden protocol constant. Frozen so it cannot be mutated at runtime.
  BIND_MOUNT_MODE_MAP = {
    'ro' => ::Warden::Protocol::CreateRequest::BindMount::Mode::RO,
    'rw' => ::Warden::Protocol::CreateRequest::BindMount::Mode::RW,
  }.freeze

  attr_accessor :handle
  attr_reader :path, :host_ip, :container_ip, :network_ports

  # @param client_provider [#get, #close_all] source of named Warden clients
  def initialize(client_provider)
    @client_provider = client_provider
    @path = nil
    @network_ports = {}
  end

  # Fetches the container info through the :info client and stores the
  # container path, host IP and container IP on this instance.
  #
  # @return the info response
  # @raise [ArgumentError] when no handle is set
  # @raise [RuntimeError] when the response carries no container path
  def update_path_and_ip
    raise ArgumentError, 'container handle must not be nil' unless handle

    response = call(:info, ::Warden::Protocol::InfoRequest.new(handle: handle))

    raise RuntimeError, 'container path is not available' unless response.container_path
    @path = response.container_path
    @host_ip = response.host_ip
    @container_ip = response.container_ip

    response
  end

  # Sends a NetIn request for this container through the :app client.
  #
  # @return the NetIn response
  def get_new_warden_net_in
    call(:app, ::Warden::Protocol::NetInRequest.new(handle: handle))
  end

  # Like #call, but retries every time the client reports a connection error.
  # The number of retries is not bounded: the method only returns once a call
  # succeeds or raises something other than a connection error.
  #
  # @param name [Symbol] client name handed to the client provider
  # @param request the Warden request to send
  # @return the response of the first successful call
  def call_with_retry(name, request)
    count = 0
    response = nil

    begin
      response = call(name, request)
    rescue ::EM::Warden::Client::ConnectionError => error
      count += 1
      logger.warn("Request failed: #{request.inspect}, retrying ##{count}.")
      logger.error(error)
      retry
    end

    if count > 0
      logger.debug("Request succeeded after #{count} retries: #{request.inspect}")
    end
    response
  end

  # Runs +script+ in the container and waits for its response.
  #
  # @param name [Symbol] client name handed to the client provider
  # @param script [String] script to run
  # @param privileged [Boolean] forwarded to the run request
  # @param discard_output [Boolean] forwarded to the run request
  # @param log_tag [String, nil] forwarded to the run request
  # @return the run response when the exit status is zero
  # @raise [WardenError] when the exit status is greater than zero; the
  #   failure is logged as a warning first
  def run_script(name, script, privileged=false, discard_output=false, log_tag=nil)
    request = ::Warden::Protocol::RunRequest.new(handle: handle,
                                                 script: script,
                                                 privileged: privileged,
                                                 discard_output: discard_output,
                                                 log_tag: log_tag)

    response = call(name, request)
    return response unless response.exit_status > 0

    data = {
        script: script,
        exit_status: response.exit_status,
        stdout: response.stdout,
        stderr: response.stderr,
    }
    logger.warn('%s exited with status %d with data %s' % [script.inspect, response.exit_status, data.inspect])
    raise exit_status_error(response)
  end

  # Sends a spawn request for +script+ with its output discarded.
  #
  # @param script [String] script to spawn
  # @param resource_limits [Object, nil] sent as +rlimits+ when given
  #   (see #resource_limits)
  # @return the spawn response
  def spawn(script, resource_limits = nil)
    spawn_params = {
        handle: handle,
        script: script,
        discard_output: true
    }
    spawn_params[:rlimits] = resource_limits if resource_limits

    call(:app, ::Warden::Protocol::SpawnRequest.new(spawn_params))
  end

  # Builds a Warden ResourceLimits message.
  #
  # @param file_descriptor_limit value for the +nofile+ limit
  # @param process_limit value for the +nproc+ limit
  def resource_limits(file_descriptor_limit, process_limit)
    ::Warden::Protocol::ResourceLimits.new(nofile: file_descriptor_limit, nproc: process_limit)
  end

  # Destroys the container and clears the handle. Warden client errors are
  # logged as warnings instead of being raised, so the handle is cleared even
  # when the destroy request fails.
  def destroy!
    with_em do
      begin
        call_with_retry(:app, ::Warden::Protocol::DestroyRequest.new(handle: handle))
      rescue ::EM::Warden::Client::Error => error
        logger.warn("Error destroying container: #{error.message}")
      end

      @handle = nil
    end
  end

  # Creates a new container and applies its limits, networking and egress
  # rules.
  #
  # Required keys (must not be nil): :bind_mounts, :limit_cpu, :byte, :inode,
  # :limit_memory, :setup_inbound_network, :rootfs. A false
  # :setup_inbound_network is accepted and skips the inbound network setup.
  # :limit_bandwidth is optional. :egress_rules is passed on as-is.
  #
  # NOTE(review): :egress_rules is not validated here, so nil can reach
  # EgressRulesMapper - confirm that is tolerated.
  #
  # @param params [Hash] container settings keyed by symbol
  # @raise [ArgumentError] when a required key is missing or nil
  def create_container(params)
    [:bind_mounts, :limit_cpu, :byte, :inode, :limit_memory, :setup_inbound_network, :rootfs].each do |param|
      raise ArgumentError, "expecting #{param} parameter to create container" if params[param].nil?
    end

    with_em do
      new_container_with_bind_mounts_and_rootfs(params[:bind_mounts], params[:rootfs])
      limit_cpu(params[:limit_cpu])
      limit_disk(byte: params[:byte], inode: params[:inode])
      limit_memory(params[:limit_memory])
      limit_bandwidth(params[:limit_bandwidth]) if params[:limit_bandwidth]
      setup_inbound_network if params[:setup_inbound_network]
      setup_egress_rules(params[:egress_rules])
    end
  end

  # Maps +rules+ to Warden requests with EgressRulesMapper and sends each one
  # through the :app client.
  #
  # @param rules egress rules handed to EgressRulesMapper
  def setup_egress_rules(rules)
    logger.debug("setting up egress rules: #{rules}")
    ::EgressRulesMapper.new(rules, handle).map_to_warden_rules.each do |request|
      call(:app, request)
    end
  end

  # Sends a create request with the given bind mounts and rootfs, then stores
  # the handle from the response.
  #
  # Each bind mount is a Hash with string keys: 'src_path', optional
  # 'dst_path' (defaults to the source path) and optional 'mode' (defaults to
  # 'ro'). A mode that is not a key of BIND_MOUNT_MODE_MAP is sent as nil.
  #
  # @param bind_mounts [Array<Hash>] bind mount descriptions
  # @param rootfs root filesystem value for the create request
  def new_container_with_bind_mounts_and_rootfs(bind_mounts, rootfs)
    with_em do
      bind_mount_requests =
          bind_mounts.map do |bm|
            src_path = bm['src_path']
            dst_path = bm['dst_path'] || src_path
            mode_key = bm['mode'] || 'ro'

            bind_mount_params = {
                src_path: src_path,
                dst_path: dst_path,
                mode: BIND_MOUNT_MODE_MAP[mode_key]
            }

            ::Warden::Protocol::CreateRequest::BindMount.new(bind_mount_params)
          end

      response = call(:app, ::Warden::Protocol::CreateRequest.new(bind_mounts: bind_mount_requests, rootfs: rootfs))

      @handle = response.handle
    end
  end

  # Closes every connection held by the client provider.
  def close_all_connections
    @client_provider.close_all
  end

  # Requests an inbound port mapping and records the host and container ports
  # in +network_ports+ under the 'host_port' and 'container_port' keys.
  def setup_inbound_network
    response = get_new_warden_net_in
    network_ports['host_port'] = response.host_port
    network_ports['container_port'] = response.container_port
  end

  # Fetches the container info through the :app_info client.
  def info
    call(:app_info, ::Warden::Protocol::InfoRequest.new(handle: handle))
  end

  # Sends a list request through the :list client.
  def list
    call(:list, ::Warden::Protocol::ListRequest.new)
  end

  # Links to the job +job_id+ through the :link client, retrying on
  # connection errors (see #call_with_retry).
  #
  # @return the link response
  def link(job_id)
    call_with_retry(:link, ::Warden::Protocol::LinkRequest.new(handle: handle, job_id: job_id))
  end

  # Same as #link, but treats a non-zero exit status as a failure.
  #
  # @return the link response when the exit status is zero
  # @raise [WardenError] when the exit status is greater than zero
  def link_or_raise(job_id)
    response = link(job_id)
    raise exit_status_error(response) if response.exit_status > 0

    response
  end

  # Sends +request+ through the client named +name+ and returns its response.
  # Every invocation emits the elapsed time and a request count; a failing
  # invocation also emits an error count before the error is re-raised.
  #
  # @param name [Symbol] client name handed to the client provider
  # @param request the Warden request to send
  def call(name, request)
    # A monotonic clock keeps the measured duration correct even when the
    # wall clock is adjusted while the request is in flight.
    started_at_in_ms = monotonic_time_in_ms
    client(name).call(request)
  rescue
    emit_warden_failure
    raise
  ensure
    emit_warden_response_time(monotonic_time_in_ms - started_at_in_ms)
  end

  # Streams +request+ through the :app client, handing the block to the
  # client. Unlike #call, no counters are emitted here.
  def stream(request, &blk)
    client(:app).stream(request, &blk)
  end

  # Looks up the client registered under +name+ in the client provider.
  def client(name)
    @client_provider.get(name)
  end

  # Sends a CPU limit request with +shares+ as the limit in shares.
  def limit_cpu(shares)
    call(:app, ::Warden::Protocol::LimitCpuRequest.new(handle: handle, limit_in_shares: shares))
  end

  # Sends a disk limit request. Only the non-nil entries among :byte and
  # :inode are included in the request.
  #
  # @param params [Hash] may contain :byte and/or :inode
  def limit_disk(params)
    request_params = { handle: handle }
    request_params[:byte] = params[:byte] unless params[:byte].nil?
    request_params[:inode] = params[:inode] unless params[:inode].nil?

    call(:app, ::Warden::Protocol::LimitDiskRequest.new(request_params))
  end

  # Sends a memory limit request with +bytes+ as the limit in bytes.
  def limit_memory(bytes)
    call(:app, ::Warden::Protocol::LimitMemoryRequest.new(handle: handle, limit_in_bytes: bytes))
  end

  # Sends a bandwidth limit request.
  #
  # @param params [Hash] must provide :rate and :burst
  def limit_bandwidth(params)
    request_params = { handle: handle, rate: params[:rate], burst: params[:burst] }
    call(:app, ::Warden::Protocol::LimitBandwidthRequest.new(request_params))
  end

  private

  # Runs the block directly when the EventMachine reactor is already running.
  # Otherwise starts the reactor, runs the block inside a new Fiber and stops
  # the reactor once the block finishes, whether it succeeded or raised.
  def with_em(&blk)
    if EM.reactor_running?
      blk.call
    else
      EM.run do
        Fiber.new do
          begin
            blk.call
          ensure
            EM.stop
          end
        end.resume
      end
    end
  end

  # Builds the error raised for a response whose exit status is non-zero.
  def exit_status_error(response)
    WardenError.new("Script exited with status #{response.exit_status}", response)
  end

  # Current monotonic clock reading in whole milliseconds.
  def monotonic_time_in_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end

  # Adds the elapsed time to the response-time counter and bumps the request
  # counter by one.
  def emit_warden_response_time(response_time_in_ms)
    Dea::Loggregator.emit_counter("total_warden_response_time_in_ms", response_time_in_ms)
    Dea::Loggregator.emit_counter("warden_request_count", 1)
  end

  # Bumps the Warden error counter by one.
  def emit_warden_failure
    Dea::Loggregator.emit_counter("warden_error_response_count", 1)
  end
end