aws/aws-codedeploy-agent

View on GitHub
lib/instance_agent/plugins/codedeploy/command_poller.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require 'socket'
require 'concurrent'
require 'pathname'
require 'instance_metadata'
require 'instance_agent/agent/base'
require_relative 'deployment_command_tracker'

module InstanceAgent
  module Plugins
    module CodeDeployPlugin
      class CommandPoller < InstanceAgent::Agent::Base

        VERSION = "2013-04-23"

        #Map commands to lifecycle hooks
        DEFAULT_HOOK_MAPPING =
          { "BeforeBlockTraffic"=>["BeforeBlockTraffic"],
            "AfterBlockTraffic"=>["AfterBlockTraffic"],
            "ApplicationStop"=>["ApplicationStop"],
            "BeforeInstall"=>["BeforeInstall"],
            "AfterInstall"=>["AfterInstall"],
            "ApplicationStart"=>["ApplicationStart"],
            "BeforeAllowTraffic"=>["BeforeAllowTraffic"],
            "AfterAllowTraffic"=>["AfterAllowTraffic"],
            "ValidateService"=>["ValidateService"]}

        def initialize
          test_profile = InstanceAgent::Config.config[:codedeploy_test_profile]
          unless ["beta", "gamma"].include?(test_profile.downcase)
            # Remove any user overrides set in the environment.
            # The agent should always pull credentials from the EC2 instance
            # profile or the credentials in the OnPremises config file.
            ENV['AWS_ACCESS_KEY_ID'] = nil
            ENV['AWS_SECRET_ACCESS_KEY'] = nil
            ENV['AWS_CREDENTIAL_FILE'] = nil
          end
          CodeDeployPlugin::OnPremisesConfig.configure
          region = ENV['AWS_REGION'] || InstanceMetadata.region
          @host_identifier = ENV['AWS_HOST_IDENTIFIER'] || InstanceMetadata.host_identifier

          log(:debug, "Configuring deploy control client: Region=#{region.inspect}")
          log(:debug, "Deploy control endpoint override=#{InstanceAgent::Config.config[:deploy_control_endpoint]}")
          log(:debug, "Enable auth policy = #{InstanceAgent::Config.config[:enable_auth_policy]}")

          @deploy_control = InstanceAgent::Plugins::CodeDeployPlugin::CodeDeployControl.new(:region => region, :logger => InstanceAgent::Log, :ssl_ca_directory => ENV['AWS_SSL_CA_DIRECTORY'])
          @deploy_control_client = @deploy_control.get_client

          @plugin = InstanceAgent::Plugins::CodeDeployPlugin::CommandExecutor.new(:hook_mapping => DEFAULT_HOOK_MAPPING)

          @thread_pool = Concurrent::ThreadPoolExecutor.new(
            #TODO: Make these values configurable in agent configuration
            min_threads: 1,
            max_threads: 16,
            max_queue: 0 # unbounded work queue
          )

          log(:debug, "Initializing Host Agent: " +
          "Host Identifier = #{@host_identifier}")
        end

        def validate
          test_profile = InstanceAgent::Config.config[:codedeploy_test_profile]
          unless ["beta", "gamma"].include?(test_profile.downcase)
            log(:debug, "Validating CodeDeploy Plugin Configuration")
            Kernel.abort "Stopping CodeDeploy agent due to SSL validation error." unless @deploy_control.validate_ssl_config
            log(:debug, "CodeDeploy Plugin Configuration is valid")
          end
        end

        # Called during initialization of the child process
        def recover_from_crash?
          begin
            if DeploymentCommandTracker.check_deployment_event_inprogress?() then
              log(:warn, "Deployment tracking file found: #{DeploymentCommandTracker.deployment_dir_path()}. The agent likely restarted while running a customer-supplied script. Failing the lifecycle event.")
              host_command_identifier = DeploymentCommandTracker.most_recent_host_command_identifier()

              log(:info, "Calling PutHostCommandComplete: 'Failed' #{host_command_identifier}")
              @deploy_control_client.put_host_command_complete(
                :command_status => "Failed",
                :diagnostics => {:format => "JSON", :payload => gather_diagnostics_from_failure_after_restart("Failing in-progress lifecycle event after an agent restart.")},
                :host_command_identifier => host_command_identifier)

              DeploymentCommandTracker.clean_ongoing_deployment_dir()
              return true
            end
            # We want to catch-all exceptions so that the child process always can startup succesfully.
          rescue Exception => e
            log(:error, "Exception thrown during restart recovery: #{e}")
            return nil
          end
        end

        def perform
          return unless command = next_command

          #Commands will be executed on a separate thread.
          begin
            @thread_pool.post {
              acknowledge_and_process_command(command)
            }
          rescue Concurrent::RejectedExecutionError
            log(:warn, 'Graceful shutdown initiated, skipping any further polling until agent restarts')
          end
        end

        def graceful_shutdown
          log(:info, "Gracefully shutting down agent child threads now, will wait up to #{ProcessManager::Config.config[:kill_agent_max_wait_time_seconds]} seconds")
          # tell the pool to shutdown in an orderly fashion, allowing in progress work to complete
          @thread_pool.shutdown
          # now wait for all work to complete, wait till the timeout value
          @thread_pool.wait_for_termination ProcessManager::Config.config[:kill_agent_max_wait_time_seconds]
          log(:info, 'All agent child threads have been shut down')
        end

        def acknowledge_and_process_command(command)
          begin
            spec = get_deployment_specification(command)
            return unless acknowledge_command(command, spec)
            process_command(command, spec)
            #Commands that throw an exception will be considered to have failed
          rescue Exception => e
            log(:warn, 'Calling PutHostCommandComplete: "Code Error" ')
            @deploy_control_client.put_host_command_complete(
            :command_status => "Failed",
            :diagnostics => {:format => "JSON", :payload => gather_diagnostics_from_error(e)},
            :host_command_identifier => command.host_command_identifier)
            raise e
          end
        end
        
        def process_command(command, spec)
          log(:debug, "Calling #{@plugin.to_s}.execute_command")
          begin
            deployment_id = InstanceAgent::Plugins::CodeDeployPlugin::DeploymentSpecification.parse(spec).deployment_id
            DeploymentCommandTracker.create_ongoing_deployment_tracking_file(deployment_id, command.host_command_identifier)
            #Successful commands will complete without raising an exception
            @plugin.execute_command(command, spec)
            
            log(:debug, 'Calling PutHostCommandComplete: "Succeeded"')
            @deploy_control_client.put_host_command_complete(
            :command_status => 'Succeeded',
            :diagnostics => {:format => "JSON", :payload => gather_diagnostics()},
            :host_command_identifier => command.host_command_identifier)
            #Commands that throw an exception will be considered to have failed
          rescue ScriptError => e
            log(:debug, 'Calling PutHostCommandComplete: "Code Error" ')
            @deploy_control_client.put_host_command_complete(
            :command_status => "Failed",
            :diagnostics => {:format => "JSON", :payload => gather_diagnostics_from_script_error(e)},
            :host_command_identifier => command.host_command_identifier)
            log(:error, "Error during perform: #{e.class} - #{e.message} - #{e.backtrace.join("\n")}")
            raise e
          rescue Exception => e
            log(:debug, 'Calling PutHostCommandComplete: "Code Error" ')
            @deploy_control_client.put_host_command_complete(
            :command_status => "Failed",
            :diagnostics => {:format => "JSON", :payload => gather_diagnostics_from_error(e)},
            :host_command_identifier => command.host_command_identifier)
            log(:error, "Error during perform: #{e.class} - #{e.message} - #{e.backtrace.join("\n")}")
            raise e
          ensure 
            DeploymentCommandTracker.delete_deployment_command_tracking_file(deployment_id)  
          end
        end
        
        private
        def next_command
          log(:debug, "Calling PollHostCommand:")
          begin
            output = @deploy_control_client.poll_host_command(:host_identifier => @host_identifier)
          rescue Exception => e
            log(:error, "Error polling for host commands: #{e.class} - #{e.message} - #{e.backtrace.join("\n")}")
            raise e
          end
          command = output.host_command
          if command.nil?
            log(:debug, "PollHostCommand: Host Command =  nil")
          else
            log(:debug, "PollHostCommand: "  +
            "Host Identifier = #{command.host_identifier}; "  +
            "Host Command Identifier = #{command.host_command_identifier}; "  +
            "Deployment Execution ID = #{command.deployment_execution_id}; "  +
            "Command Name = #{command.command_name}")
            raise "Host Identifier mismatch: #{@host_identifier} != #{command.host_identifier}" unless @host_identifier.include? command.host_identifier
            raise "Command Name missing" if command.command_name.nil? || command.command_name.empty?
          end
          command
        end

        private
        def get_ack_diagnostics(command, spec)
          is_command_noop = @plugin.is_command_noop?(command.command_name, spec)
          return {:format => "JSON", :payload => {'IsCommandNoop' => is_command_noop}.to_json()}
        end

        private
        def acknowledge_command(command, spec)
          ack_diagnostics = get_ack_diagnostics(command, spec)

          log(:debug, "Calling PutHostCommandAcknowledgement:")
          output =  @deploy_control_client.put_host_command_acknowledgement(
          :diagnostics => ack_diagnostics,
          :host_command_identifier => command.host_command_identifier)
          status = output.command_status
          log(:debug, "Command Status = #{status}")

          if status == "Failed" then
            log(:info, "Received Failed for command #{command.command_name}, checking whether command is a noop...")
            complete_if_noop_command(command)
          end
          true unless status == "Succeeded" || status == "Failed"
        end

        private
        def complete_if_noop_command(command)
          spec = get_deployment_specification(command)

          if @plugin.is_command_noop?(command.command_name, spec) then
            log(:debug, 'Calling PutHostCommandComplete: "Succeeded"')
            @deploy_control_client.put_host_command_complete(
            :command_status => 'Succeeded',
            :diagnostics => {:format => "JSON", :payload => gather_diagnostics("CompletedNoopCommand")},
            :host_command_identifier => command.host_command_identifier)
          end
        end

        private
        def get_deployment_specification(command)
          log(:debug, "Calling GetDeploymentSpecification:")
          output =  @deploy_control_client.get_deployment_specification(
          :deployment_execution_id => command.deployment_execution_id,
          :host_identifier => @host_identifier)
          log(:debug, "GetDeploymentSpecification: " +
          "Deployment System = #{output.deployment_system}")
          raise "Deployment System mismatch: #{@plugin.deployment_system} != #{output.deployment_system}" unless @plugin.deployment_system == output.deployment_system
          raise "Deployment Specification missing" if output.deployment_specification.nil?
          output.deployment_specification.generic_envelope
        end

        private
        def gather_diagnostics_from_script_error(script_error)
          return script_error.to_json
          rescue Exception => e
            return {'error_code' => "Unknown", 'script_name' => script_error.script_name, 'message' => "Attempting minimal diagnostics", 'log' => "Exception #{e.class} occured"}.to_json
        end

        private
        def gather_diagnostics_from_error(error)
          begin
            message = error.message || ""
            raise ScriptError.new(ScriptError::UNKNOWN_ERROR_CODE, "", ScriptLog.new), message
          rescue ScriptError => e
            script_error = e
          end
          gather_diagnostics_from_script_error(script_error)
        end

        private
        def gather_diagnostics_from_failure_after_restart(msg = "")
          begin
            raise ScriptError.new(ScriptError::FAILED_AFTER_RESTART_CODE, "", ScriptLog.new), "Failed: #{msg}"
          rescue ScriptError => e
            script_error = e
          end
          gather_diagnostics_from_script_error(script_error)
        end

        private
        def gather_diagnostics(msg = "")
          begin
            raise ScriptError.new(ScriptError::SUCCEEDED_CODE, "", ScriptLog.new), "Succeeded: #{msg}"
          rescue ScriptError => e
            script_error = e
          end
          gather_diagnostics_from_script_error(script_error)
        end
      end
    end
  end
end