cloudfoundry/dea_ng

View on GitHub
lib/dea/staging/staging_task.rb

Summary

Maintainability: F (3 days)
Test Coverage
require 'digest/sha1'
require 'fileutils'
require 'shellwords'
require 'tempfile'
require 'tmpdir'
require 'yaml'

require 'container/container'

require 'dea/utils/download'
require 'dea/utils/upload'
require 'dea/promise'
require 'dea/task'
require 'dea/env'
require 'dea/staging/admin_buildpack_downloader'
require 'dea/staging/staging_task_workspace'
require 'dea/staging/staging_message'
require 'dea/loggregator'
require 'dea/utils/uri_cleaner'

module Dea
  # Stages an application inside a warden container.
  #
  # Lifecycle (see #start):
  # 1. Create the container and download the app package. The buildpack cache
  #    is also downloaded when the staging message carries a download URI.
  # 2. Unpack both into the container.
  # 3. Run the buildpack plugin.
  # 4. Pack and copy out the droplet, then save it in the droplet registry.
  # 5. Upload the droplet and the repacked buildpack cache.
  #
  # The container and the on-disk workspace are torn down at the end.
  class StagingTask < Task
    # Base class for staging failures; every message is prefixed with
    # "Error staging: ".
    class StagingError < StandardError
      def initialize(msg)
        super("Error staging: #{msg}")
      end
    end

    # Handed to the after-stop callback when the task is stopped via #stop.
    class StagingTaskStoppedError < StagingError
      def initialize
        super('task stopped')
      end
    end

    # Raised during setup when the config has no rootfs for the requested stack.
    class StagingStackNotFoundError < StagingError; end

    attr_reader :bootstrap, :dir_server, :staging_message, :task_id, :droplet_sha1

    # @param bootstrap [Object] supplies #config, #snapshot and #droplet_registry
    # @param dir_server [Object] builds URLs for files in the task workspace
    # @param staging_message [Object] the staging request (presumably a
    #   StagingMessage); must respond to #task_id
    # @param buildpacks_in_use [Object] handed through to the BuildpackManager
    # @param custom_logger [Object, nil] optional logger passed to Task
    def initialize(bootstrap, dir_server, staging_message, buildpacks_in_use, custom_logger=nil)
      super(bootstrap.config, custom_logger)
      @bootstrap = bootstrap
      @dir_server = dir_server
      @staging_message = staging_message
      @task_id = staging_message.task_id
      @buildpacks_in_use = buildpacks_in_use

      logger.user_data[:task_id] = task_id
    end

    # Runs setup and staging, then uploads the droplet and buildpack cache on
    # success. The after-complete callback receives the first error
    # encountered, or nil. The container is destroyed and the workspace
    # directory removed no matter how staging ended.
    def start
      staging_promise = Promise.new do |p|
        resolve_staging_setup
        resolve_staging
        p.deliver
      end

      Promise.resolve(staging_promise) do |error, _|
        begin
          if error
            logger.info('staging.task.failed', error: error, backtrace: error.backtrace)
          else
            logger.info('staging.task.completed')

            begin
              resolve_staging_upload
            rescue => e
              logger.info('staging.task.upload-failed', error: e, backtrace: e.backtrace)

              error = e
            end
          end

          begin
            trigger_after_complete(error)
          rescue => e
            logger.warn('staging.task.after_complete-failed', error: e, backtrace: e.backtrace)
          end
        ensure
          # Nested ensure: a failure to destroy the container must not leak the
          # warden connections or the on-disk workspace.
          begin
            promise_destroy.resolve
          ensure
            @container.close_all_connections if @container
            FileUtils.rm_rf(workspace.workspace_dir)
          end
        end
      end
    end

    # Lazily built workspace describing the host and container paths used by
    # this task.
    def workspace
      @workspace ||= StagingTaskWorkspace.new(
        config['base_dir'],
        staging_message.properties
      )
    end

    # Lazily built BuildpackManager rooted at the workspace's admin buildpacks dir.
    def buildpack_manager
      @buildpack_manager ||= BuildpackManager.new(
        workspace.admin_buildpacks_dir,
        staging_message,
        @buildpacks_in_use
      )
    end

    # @return [String, nil] contents of the staging log copied out of the
    #   container, or nil if it has not been copied out.
    def task_log
      log_path = workspace.staging_log_path
      File.read(log_path) if File.exist?(log_path)
    end

    # URL from the directory server for the staging log inside the container.
    def streaming_log_url
      @dir_server.staging_task_file_url_for(task_id, workspace.warden_staging_log)
    end

    # Parses the staging_info YAML copied out of the container.
    #
    # @return [Hash] the parsed data, or {} when the file is missing or empty.
    #   YAML.load_file yields false/nil for an empty document, which would
    #   otherwise break every accessor below.
    # NOTE(review): this file is produced inside the container (presumably by
    # the staging plugin); consider YAML.safe_load if the Psych in use allows.
    def task_info
      info_path = workspace.staging_info_path
      return {} unless File.exist?(info_path)

      YAML.load_file(info_path) || {}
    end

    # Value of 'detected_buildpack' in staging_info, or nil.
    def detected_buildpack
      task_info['detected_buildpack']
    end

    # Value of 'start_command' in staging_info, or nil.
    def detected_start_command
      task_info['start_command']
    end

    # Value of 'effective_procfile' in staging_info, or nil.
    def procfile
      task_info['effective_procfile']
    end

    # Value of 'buildpack_path' in staging_info, or nil.
    def buildpack_path
      task_info['buildpack_path']
    end

    # Value of 'staging_error' in staging_info, or nil.
    def error_info
      task_info['staging_error']
    end

    # Buildpack key from the staging message. Falls back to the key the
    # BuildpackManager derives from the buildpack path in staging_info.
    def buildpack_key
      staging_message.buildpack_key || buildpack_manager.buildpack_key(buildpack_path)
    end

    # Memory limit for the staging container in MB: the larger of the
    # configured minimum and the amount requested in the staging message.
    def memory_limit_mb
      [config.minimum_staging_memory_mb.to_i, staging_message.mem_limit.to_i].max
    end

    def memory_limit_in_bytes
      memory_limit_mb * 1024 * 1024
    end
    alias :used_memory_in_bytes :memory_limit_in_bytes

    # Disk limit for the staging container in MB: the larger of the configured
    # minimum and the amount requested in the staging message.
    def disk_limit_mb
      [config.minimum_staging_disk_mb.to_i, staging_message.disk_limit.to_i].max
    end

    def disk_limit_in_bytes
      disk_limit_mb * 1024 * 1024
    end

    def disk_inode_limit
      config.staging_disk_inode_limit
    end

    # Stops the task. The steps are:
    # 1. Unregister the after-complete callback.
    # 2. Stop the container if it has a handle.
    # 3. Fire the after-stop callback with a StagingTaskStoppedError.
    #
    # @yieldparam error [Exception, nil] failure raised while stopping, if any
    def stop(&callback)
      stopping_promise = Promise.new do |p|
        logger.info('staging.task.stopped')

        @after_complete_callback = nil # Unregister after complete callback
        promise_stop.resolve if container.handle
        p.deliver
      end

      Promise.resolve(stopping_promise) do |error, _|
        trigger_after_stop(StagingTaskStoppedError.new)
        callback.call(error) unless callback.nil?
      end
    end

    # Registers a block called with the setup error (or nil) once setup ends.
    def after_setup_callback(&blk)
      @after_setup_callback = blk
    end

    def trigger_after_setup(error)
      @after_setup_callback.call(error) if @after_setup_callback
    end
    private :trigger_after_setup

    # Registers a block called with the staging/upload error (or nil) at the end of #start.
    def after_complete_callback(&blk)
      @after_complete_callback = blk
    end

    def trigger_after_complete(error)
      @after_complete_callback.call(error) if @after_complete_callback
    end
    private :trigger_after_complete

    # Registers a block called with a StagingTaskStoppedError after #stop.
    def after_stop_callback(&blk)
      @after_stop_callback = blk
    end

    def trigger_after_stop(error)
      @after_stop_callback.call(error) if @after_stop_callback
    end
    private :trigger_after_stop

    # Creates the staged logs directory and an empty staging log in the container.
    def promise_prepare_staging_log
      Promise.new do |p|
        script = "mkdir -p #{workspace.warden_staged_dir}/logs && touch #{workspace.warden_staging_log}"

        logger.info('staging.task.preparing-log', script: script)

        container.run_script(:app, script)

        p.deliver
      end
    end

    # Creates a non-empty /app owned by vcap inside the container.
    def promise_app_dir
      Promise.new do |p|
        # Some buildpacks seem to make assumption that /app is a non-empty directory
        # See: https://github.com/heroku/heroku-buildpack-python/blob/master/bin/compile#L46
        script = 'mkdir -p /app && touch /app/support_heroku_buildpacks && chown -R vcap:vcap /app'

        logger.info('staging.task.making-app-dir', script: script)

        container.run_script(:app, script, true)

        p.deliver
      end
    end

    # Spawns the staging command in the container and waits for it. If it runs
    # longer than #staging_timeout seconds the container is stopped and the
    # promise fails.
    def promise_stage
      Promise.new do |p|
        script = staging_command
        logger.debug('staging.task.execute-staging')

        spawn_response = container.spawn(script)
        @warden_job_id = spawn_response.job_id
        # Persist the job id so it is captured in the snapshot (see #snapshot_attributes).
        bootstrap.snapshot.save

        staging_timer = EM::Timer.new(staging_timeout) do
          logger.error('staging.task.execute-staging.timed-out', timeout: staging_timeout, handle: container.handle)

          Fiber.new do
            begin
              promise_stop(true).resolve
            ensure
              p.fail('Staging in container timed out')
            end
          end.resume
        end

        begin
          container.link_or_raise(@warden_job_id)
          p.deliver
        rescue Container::WardenError => staging_error
          logger.error('staging.task.execute-staging.failed', error: staging_error)
          p.fail(staging_error)
        ensure
          staging_timer.cancel
        end
      end
    end

    # Copies the staging log out of the container into the host workspace.
    def promise_task_log
      Promise.new do |p|
        logger.info('staging.task-log.copying-out', source: workspace.warden_staging_log, destination: URICleaner.clean(workspace.staging_log_path))

        copy_out_request(workspace.warden_staging_log, File.dirname(workspace.staging_log_path))
        p.deliver
      end
    end

    # Copies the staging_info file out of the container into the host workspace.
    def promise_staging_info
      Promise.new do |p|
        logger.info('staging.task-info.copying-out', source: workspace.warden_staging_info, destination: URICleaner.clean(workspace.staging_info_path))

        copy_out_request(workspace.warden_staging_info, File.dirname(workspace.staging_info_path))
        p.deliver
      end
    end

    # Unzips the downloaded app package into the container's unstaged dir,
    # logging its size to the staging log and loggregator.
    def promise_unpack_app
      Promise.new do |p|
        logger.info('staging.task.unpacking-app', destination: URICleaner.clean(workspace.warden_unstaged_dir))

        loggregator_emit_result container.run_script(:app, <<-BASH)
          set -o pipefail
          package_size=`du -h #{workspace.downloaded_app_package_path} | cut -f1`
          echo "-----> Downloaded app package ($package_size)" | tee -a #{workspace.warden_staging_log}
          unzip -q #{workspace.downloaded_app_package_path} -d #{workspace.warden_unstaged_dir}
        BASH

        p.deliver
      end
    end

    # Tars the staged directory into the droplet file inside the container.
    def promise_pack_app
      Promise.new do |p|
        logger.info('staging.task.packing-droplet')

        container.run_script(:app, <<-BASH)
          cd #{workspace.warden_staged_dir} &&
          COPYFILE_DISABLE=true tar -czf #{workspace.warden_staged_droplet} .
        BASH
        p.deliver
      end
    end

    # Downloads the app package to the workspace. A failed download fails the promise.
    def promise_app_download
      Promise.new do |p|
        logger.info('staging.app-download.starting', uri: URICleaner.clean(staging_message.download_uri))

        # Create the tempfile inside the workspace (as the buildpack-cache
        # download does) so the File.rename below never crosses filesystems,
        # which would raise Errno::EXDEV.
        download_destination = Tempfile.new('app-package-download.tgz', workspace.tmpdir)

        Download.new(staging_message.download_uri, download_destination, nil, logger).download! do |error|
          if error
            logger.debug('staging.app-download.failed', duration: p.elapsed_time, error: error, backtrace: error.backtrace)

            p.fail(error)
          else
            File.rename(download_destination.path, workspace.downloaded_app_package_path)
            File.chmod(0744, workspace.downloaded_app_package_path)

            logger.debug('staging.app-download.completed',
                         duration: p.elapsed_time, destination: URICleaner.clean(workspace.downloaded_app_package_path))

            p.deliver
          end
        end
      end
    end

    # Appends the droplet size to the staging log and emits it to loggregator.
    def promise_log_upload_started
      Promise.new do |p|
        loggregator_emit_result container.run_script(:app, <<-BASH)
          set -o pipefail
          droplet_size=`du -h #{workspace.warden_staged_droplet} | cut -f1`
          echo "-----> Uploading droplet ($droplet_size)" | tee -a #{workspace.warden_staging_log}
        BASH
        p.deliver
      end
    end

    # Uploads the staged droplet to the staging message's upload URI.
    def promise_app_upload
      Promise.new do |p|
        logger.info('staging.droplet-upload.starting',
                    source: workspace.staged_droplet_path, destination: URICleaner.clean(staging_message.upload_uri))

        Upload.new(workspace.staged_droplet_path, staging_message.upload_uri, logger).upload! do |error|
          if error
            logger.info('staging.task.droplet-upload-failed',
                        duration: p.elapsed_time, destination: URICleaner.clean(staging_message.upload_uri), error: error, backtrace: error.backtrace)

            p.fail(error)
          else
            logger.info('staging.task.droplet-upload-completed', duration: p.elapsed_time, destination: URICleaner.clean(staging_message.upload_uri))

            p.deliver
          end
        end
      end
    end

    # Uploads the packed buildpack cache to the staging message's cache upload URI.
    def promise_buildpack_cache_upload
      Promise.new do |p|
        logger.info('staging.buildpack-cache-upload.starting',
                    source: workspace.staged_buildpack_cache_path, destination: URICleaner.clean(staging_message.buildpack_cache_upload_uri))

        Upload.new(workspace.staged_buildpack_cache_path, staging_message.buildpack_cache_upload_uri, logger).upload! do |error|
          if error
            logger.info('staging.task.buildpack-cache-upload-failed',
                        duration: p.elapsed_time, destination: URICleaner.clean(staging_message.buildpack_cache_upload_uri), error: error, backtrace: error.backtrace)

            p.fail(error)
          else
            logger.info('staging.task.buildpack-cache-upload-completed',
                        duration: p.elapsed_time, destination: URICleaner.clean(staging_message.buildpack_cache_upload_uri))

            p.deliver
          end
        end
      end
    end

    # Downloads the buildpack cache. This is best-effort: a failed download is
    # logged and the promise still delivers.
    def promise_buildpack_cache_download
      Promise.new do |p|
        logger.info('staging.buildpack-cache-download.starting', uri: URICleaner.clean(staging_message.buildpack_cache_download_uri))

        download_destination = Tempfile.new('buildpack-cache', workspace.tmpdir)

        Download.new(staging_message.buildpack_cache_download_uri, download_destination, nil, logger).download! do |error|
          if error
            logger.debug('staging.buildpack-cache-download.failed',
                         duration: p.elapsed_time, uri: URICleaner.clean(staging_message.buildpack_cache_download_uri), error: error, backtrace: error.backtrace)

          else
            File.rename(download_destination.path, workspace.downloaded_buildpack_cache_path)
            File.chmod(0744, workspace.downloaded_buildpack_cache_path)

            logger.debug('staging.buildpack-cache-download.completed',
                         duration: p.elapsed_time, destination: URICleaner.clean(workspace.downloaded_buildpack_cache_path))
          end

          p.deliver
        end
      end
    end

    # Copies the packed droplet out of the container into the staged droplet dir.
    def promise_copy_out
      Promise.new do |p|
        logger.info('staging.droplet.copying-out', source: workspace.warden_staged_droplet, destination: URICleaner.clean(workspace.staged_droplet_dir))

        copy_out_request(workspace.warden_staged_droplet, workspace.staged_droplet_dir)

        p.deliver
      end
    end

    # Records the droplet's SHA1 in #droplet_sha1 and stores a local copy in
    # the droplet registry under that digest.
    def promise_save_droplet
      Promise.new do |p|
        @droplet_sha1 = Digest::SHA1.file(workspace.staged_droplet_path).hexdigest
        bootstrap.droplet_registry[@droplet_sha1].local_copy(workspace.staged_droplet_path) do |error|
          if error
            logger.error('staging.droplet.copy-failed', error: error, backtrace: error.backtrace)

            # Propagate the cause so callers see why saving failed.
            p.fail(error)
          else
            p.deliver
          end
        end
      end
    end

    # Packs, copies out and uploads the buildpack cache. The first error fails the promise.
    def promise_save_buildpack_cache
      Promise.new do |p|
        resolve_and_log(promise_pack_buildpack_cache, 'staging.buildpack-cache.save') do |error, _|
          unless error
            begin
              promise_copy_out_buildpack_cache.resolve
              promise_buildpack_cache_upload.resolve
            rescue => e
              error = e
            end
          end

          if error
            p.fail(error)
          else
            p.deliver
          end
        end
      end
    end

    # Tars the container's cache directory (creating it if missing).
    def promise_pack_buildpack_cache
      Promise.new do |p|
        container.run_script(:app, <<-BASH)
          mkdir -p #{workspace.warden_cache} &&
          cd #{workspace.warden_cache} &&
          COPYFILE_DISABLE=true tar -czf #{workspace.warden_staged_buildpack_cache} .
        BASH
        p.deliver
      end
    end

    # Extracts the downloaded buildpack cache into the container, if one was downloaded.
    def promise_unpack_buildpack_cache
      Promise.new do |p|
        if File.exist?(workspace.downloaded_buildpack_cache_path)
          logger.info('staging.buildpack-cache.unpack', destination: URICleaner.clean(workspace.warden_cache))

          loggregator_emit_result container.run_script(:app, <<-BASH)
          set -o pipefail
          package_size=`du -h #{workspace.downloaded_buildpack_cache_path} | cut -f1`
          echo "-----> Downloaded app buildpack cache ($package_size)" | tee -a #{workspace.warden_staging_log}
          mkdir -p #{workspace.warden_cache}
          tar xfz #{workspace.downloaded_buildpack_cache_path} -C #{workspace.warden_cache}
          BASH
        end

        p.deliver
      end
    end

    # Copies the packed buildpack cache out of the container into the staged droplet dir.
    def promise_copy_out_buildpack_cache
      Promise.new do |p|
        logger.info('staging.buildpack-cache.copying-out',
                    source: workspace.warden_staged_buildpack_cache, destination: URICleaner.clean(workspace.staged_droplet_dir))

        copy_out_request(workspace.warden_staged_buildpack_cache, workspace.staged_droplet_dir)

        p.deliver
      end
    end

    # Host path of +path+ inside the container's rootfs, or nil when the
    # container has no path yet.
    def path_in_container(path)
      File.join(container.path, 'tmp', 'rootfs', path.to_s) if container.path
    end

    def staging_config
      config['staging']
    end

    # Maximum staging duration in seconds (defaults to 900).
    def staging_timeout
      (staging_config['max_staging_duration'] || '900').to_f
    end

    # Bind mounts for the staging container. The workspace, buildpack and
    # admin buildpack directories are mounted at the same path, plus any
    # configured 'bind_mounts'. An absent 'bind_mounts' is treated as none.
    def bind_mounts
      workspace_mounts = [workspace.workspace_dir, workspace.buildpack_dir, workspace.admin_buildpacks_dir].map do |path|
        {'src_path' => path, 'dst_path' => path}
      end

      workspace_mounts + (config['bind_mounts'] || [])
    end

    # State persisted in the bootstrap snapshot for this task.
    def snapshot_attributes
      logger.debug('snapshot_attributes', properties: staging_message.properties)
      {
        'staging_message' => staging_message.to_hash,
        'warden_container_path' => container.path,
        'warden_job_id' => @warden_job_id,
        'syslog_drain_urls' => syslog_drain_urls,
      }
    end

    def warden_handle
      container.handle
    end

    private

    # Shell command that runs the buildpack plugin with the staging
    # environment, teeing its output into the staging log.
    def staging_command
      env = Env.new(staging_message, self)

      [
        'set -o pipefail;',
        env.exported_environment_variables,
        config['dea_ruby'],
        run_plugin_path,
        workspace.plugin_config_path,
        "| tee -a #{workspace.warden_staging_log}"
      ].join(' ')
    end

    # Non-nil 'syslog_drain_url' values of the message's bound services.
    def syslog_drain_urls
      services = staging_message.properties['services'] || []
      services.map { |svc_hash| svc_hash['syslog_drain_url'] }.compact
    end

    # Bandwidth limit hash for the container, or nil when none is configured.
    def bandwidth_limit
      limit = config.staging_bandwidth_limit
      return nil unless limit

      { rate: limit['rate'], burst: limit['burst'] }
    end

    # Prepares the workspace, creates the container, downloads inputs and
    # readies the container. The after-setup callback is always fired, with
    # the error on failure (which is then re-raised) or nil on success.
    #
    # @raise [StagingStackNotFoundError] if no rootfs exists for the message's stack
    def resolve_staging_setup
      workspace.prepare(buildpack_manager)
      with_network = false

      stack = staging_message.stack
      rootfs = config.rootfs_path(stack)
      raise StagingStackNotFoundError, "Stack #{stack} does not exist" if rootfs.nil?

      container.create_container(
        bind_mounts: bind_mounts,
        limit_cpu: staging_config['cpu_limit_shares'],
        byte: disk_limit_in_bytes,
        inode: disk_inode_limit,
        limit_memory: memory_limit_in_bytes,
        setup_inbound_network: with_network,
        egress_rules: staging_message.egress_rules,
        rootfs: rootfs,
        limit_bandwidth: bandwidth_limit,
      )
      promises = [promise_app_download]
      promises << promise_buildpack_cache_download if staging_message.buildpack_cache_download_uri
      Promise.run_in_parallel(*promises)

      promise_update = Promise.new do |p|
        container.update_path_and_ip
        p.deliver
      end
      Promise.run_in_parallel(
        promise_prepare_staging_log,
        promise_app_dir,
        promise_update
      )

    rescue => e
      trigger_after_setup(e)
      raise
    else
      trigger_after_setup(nil)
    end

    # Runs the staging steps in order. staging_info and the staging log are
    # copied out even when a step fails.
    def resolve_staging
      Promise.run_serially(
        promise_unpack_app,
        promise_unpack_buildpack_cache,
        promise_stage,
        promise_pack_app,
        promise_copy_out,
        promise_save_droplet,
        promise_log_upload_started
      )
    ensure
      promise_staging_info.resolve
      promise_task_log.resolve
    end

    # Uploads the droplet, then saves/uploads the buildpack cache.
    def resolve_staging_upload
      promise_app_upload.resolve
      promise_save_buildpack_cache.resolve
    end

    def run_plugin_path
      File.join(workspace.buildpack_dir, 'bin/run')
    end

    # Forwards a run_script result's stdout/stderr to loggregator for this
    # app. Returns +result+ unchanged.
    def loggregator_emit_result(result)
      if result
        Dea::Loggregator.staging_emit(staging_message.app_id, result.stdout)
        Dea::Loggregator.staging_emit_error(staging_message.app_id, result.stderr)
      end
      result
    end
  end
end