app/models/manageiq/providers/amazon/agent_coordinator.rb
require 'yaml'
require 'open3'
require 'tempfile'
require 'linux_admin'
require 'awesome_spawn'
class ManageIQ::Providers::Amazon::AgentCoordinator
include Vmdb::Logging
include ScanningMixin
attr_accessor :ems, :deploying
SSA_LABEL = "smartstate".freeze
WORK_DIR = "/opt/ssa_container".freeze
def initialize(ems)
require 'amazon_ssa_support'
@ems = ems
# List of active agent ids
@alive_agent_ids = []
# List of all agent ids, include those in power off state.
@agent_ids = []
@deploying = false
end
def ec2
@ec2 ||= ems.connect(:service => 'EC2')
end
def sqs
@sqs ||= ems.connect(:service => 'SQS')
end
def s3
@s3 ||= ems.connect(:service => 'S3')
end
def iam
@iam ||= ems.connect(:service => 'IAM')
end
def alive_agent_ids(interval = 180)
@alive_agent_ids = agent_ids.select { |id| agent_alive?(id, interval) }
end
def request_queue_empty?
messages_in_queue(request_queue).zero?
end
def reply_queue_empty?
messages_in_queue(reply_queue).zero?
end
def deploying?
@deploying
end
def startup_agent
agent_ids.empty? ? deploy_agent : activate_agents
rescue => err
_log.error("No agent is set up to process requests: #{err.message}")
cleanup_requests(err.message)
end
def cleanup_requests(message)
_log.info("Cleaning up outstanding requests due to Agent deployment error")
if request_queue_empty?
_log.debug("No requests visible for provider #{ems.name}")
else
@ssaq = ssa_queue
_log.debug("Getting requests for #{ems.name}")
@ssaq.request_loop do |request|
begin
_log.debug("Request for #{ems.name}: #{request}")
clean_request(request, message)
rescue => err
_log.error("Error #{err} cleaning requests for #{ems.name}. Continuing.")
next
end
end
end
end
def clean_request(request, message)
@ssaq.delete_request(request)
ost = OpenStruct.new
ost.jobid = request[:job_id]
job = Job.find_by(:id =>ost.jobid)
raise "Unable to clean request for job with id #{ost.jobid}" if job.nil?
target_id = job.target_id
vm = VmOrTemplate.find(target_id)
ost.taskid = job.guid
unless vm.kind_of?(ManageIQ::Providers::Amazon::CloudManager::Vm) ||
vm.kind_of?(ManageIQ::Providers::Amazon::CloudManager::Template)
if vm.nil?
error = "Vm for Job #{ost.jobid} not found"
else
error = "Vm #{vm.name} of class #{vm.class.name} is not an Amazon instance or image" unless vm.nil?
end
update_job_message(ost, error)
job.signal(:abort, error, "error")
return
end
_log.debug("Cleaning request for #{vm.ems_ref} because #{message}")
update_job_message(ost, message)
job.signal(:abort, message, "error")
end
def cleanup_agents
_log.info("Clean up agents ...")
# Use the uniqe keypair name to filter out created instances
vms = ec2.instances(
:filters => [
{
:name => "key-name",
:values => [default_keypair_name],
},
{
:name => "instance-state-name",
# skip the state of 'terminated'
:values => ["pending", "running", "shutting-down", "stopping", "stopped"],
},
]
)
vms.each do |vm|
next if agent_ids.include?(vm.id)
vm.terminate
vm.wait_until_terminated
_log.info("Instance: #{vm.id} is deleted!")
end
end
def ssa_queue
AmazonSsaSupport::SsaQueue.new(
:ssa_bucket => ssa_bucket,
:reply_queue => reply_queue,
:request_queue => request_queue,
:region => ems.provider_region,
:sqs => sqs,
:s3 => s3
)
end
private
def scp_file(ip, username, auth_key, local_file, remote_file)
require 'net/scp'
Net::SCP.upload!(ip, username, local_file, remote_file, :ssh => {:key_data => auth_key})
rescue => err
_log.error(err.message)
raise("Failed to copy #{local_file} to #{ip}:#{remote_file}")
end
def agent_ids
# reset to empty
@agent_ids = []
bucket = s3.bucket(ssa_bucket)
return @agent_ids unless bucket.exists?
bucket.objects(:prefix => heartbeat_prefix).each do |obj|
id = obj.key.split('/')[2]
@agent_ids << id if ec2.instance(id).exists?
end
@agent_ids
end
# check timestamp of heartbeat of agent_id, return true if the last beat time in
# in the time interval
def agent_alive?(agent_id, interval = 180)
bucket = s3.bucket(ssa_bucket)
return false unless bucket.exists?
obj_id = "#{heartbeat_prefix}#{agent_id}"
obj = bucket.object(obj_id)
return false unless obj.exists?
last_heartbeat = obj.last_modified
_log.debug("#{obj.key}: Last heartbeat time stamp: #{last_heartbeat}")
Time.now.utc - last_heartbeat < interval && ec2.instance(agent_id).state.name == "running"
rescue => err
_log.error("#{agent_id}: #{err.message}")
false
end
def activate_agents
agent_ids.each do |id|
agent = ec2.instance(id)
if agent.state.name == "stopped"
agent.start
agent.wait_until_running
_log.info("Agent #{id} is activated to serve requests.")
return id
elsif agent.state.name == "running"
_log.info("Agent #{id} is running already.")
return id
else
_log.warn("Agent #{id} is in abnormal state: #{agent.state.name}.")
next
end
end
_log.warn("Failed to activate agents: #{agent_ids}. Will deploy a new agent!")
deploy_agent
end
def deploy_agent
_log.info("Deploying agent ...")
@deploying = true
kp = find_or_create_keypair
subnet = get_subnet_from_vpc_zone
# Use the first qualified subnet to deploy agent.
vpc_id = subnet.vpc_id
zone_name = subnet.availability_zone
subnet_id = subnet.subnet_id
_log.info("Smartstate agent will be deployed in vpc: [#{vpc_id}], zone: [#{zone_name}] subnet: [#{subnet_id}]")
security_group_id = find_or_create_security_group(vpc_id)
find_or_create_profile
# Based on Amazon doc, add a retry logic in creating instance to solve time issue on IAM role.
#
# Important
#
# After you create an IAM role, it may take several seconds for the permissions to propagate.
# If your first attempt to launch an instance with a role fails, wait a few seconds before trying again.
# For more information, see Troubleshooting Working with Roles in the IAM User Guide.
#
# (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#launch-instance-with-role-console)
max_retries = 5
begin
instance = ec2.create_instances(
:iam_instance_profile => {:name => label},
:image_id => get_agent_image_id,
:instance_type => 't2.micro',
:key_name => kp.name,
:max_count => 1,
:min_count => 1,
:placement => {:availability_zone => zone_name},
:tag_specifications => [{:resource_type => "instance", :tags => [{:key => "Name", :value => label}]}],
:network_interfaces => [{
:associate_public_ip_address => true,
:delete_on_termination => true,
:device_index => 0,
:subnet_id => subnet_id,
:groups => [security_group_id]
}],
).first
rescue Aws::EC2::Errors::InvalidParameterValue => e
if max_retries.positive?
sleep 5
max_retries -= 1
_log.warn("Will retry #{max_retries} times due to error: #{e.message}")
retry
else
raise "Failed to create instance. Reason: #{e.message}"
end
end
ec2.client.wait_until(:instance_status_ok, :instance_ids => [instance.id])
_log.info("Start to load smartstate application, this may take a while ...")
setup_agent(instance)
_log.info("Docker #{docker_image} is loaded. Start to heartbeat.")
instance.id
rescue => err
_log.error(err.message)
instance&.terminate
instance&.wait_until_terminated
raise
end
def setup_agent(instance)
# Somehow instance.public_dns_name is empty, need to reinitialize to get it back
ip = ec2.instance(instance.id).public_dns_name || raise("Failed to get agent's public ip!")
key_name = instance.key_name
auth_key = get_keypair(key_name).try(:auth_key)
raise("Key [#{key_name}] is missing. Cannot SSH to the agent:#{instance.id}") if auth_key.nil?
ssh = LinuxAdmin::SSH.new(ip, agent_ami_login_user, auth_key)
# prepare work directory
perform_commands(ssh, ["sudo mkdir -p #{WORK_DIR}", "sudo chmod go+w #{WORK_DIR}"])
# scp the default setting yaml file
config = Tempfile.new('config.yml')
begin
config.write(create_config_yaml)
config.close
scp_file(ip, agent_ami_login_user, auth_key, config.path, "#{WORK_DIR}/config.yml")
ensure
config.unlink
end
# docker register
if docker_login_required?
raise("Need credentials to login") unless docker_auth
docker_username = docker_auth.userid
docker_password = docker_auth.password
login_params = [
docker_registry,
{
:u => docker_username,
:p => docker_password
}
]
login_cmd = AwesomeSpawn.build_command_line("sudo docker login", login_params)
perform_commands(ssh, [login_cmd])
end
# run docker image
image = docker_registry.present? ? "#{docker_registry}/#{docker_image}" : docker_image
run_params = [
:d,
{:restart => "always"},
['-v', '/dev:/host_dev'],
['-v', "#{WORK_DIR}/config.yml:#{WORK_DIR}/config.yml"],
:privileged,
image
]
run_cmd = AwesomeSpawn.build_command_line("sudo docker run", run_params)
perform_commands(ssh, [run_cmd])
end
def perform_commands(ssh, commands)
_log.debug("SSH commands: #{commands}")
result = ssh.perform_commands(commands)
unless result[:exit_status].zero?
_log.error("Failed to run command: #{result[:last_command]}")
raise("SSH failed to run command: #{result[:last_command]}")
end
end
def docker_auth
@ems.authentications.find_by(:authtype => "smartstate_docker")
end
def get_subnet_from_vpc_zone
vpcs = validated_vpcs
raise "Smartstate analysis needs a VPC whose enableDnsSupport/enableDnsHostnames are true and valid gateway/route setting!" if vpcs.empty?
ec2.client.describe_availability_zones.flat_map(&:availability_zones).each do |availability_zone|
vpcs.each do |vpc|
subnet = get_subnets(availability_zone.zone_name, vpc.vpc_id).try(:first)
return subnet if subnet
end
end
raise("No subnet is qualified to deploy smartstate agent!")
end
# To run SSA, VPC needs to have gateway attached, enableDnsSupport and enableDnsHostnames are enabled
def validated_vpcs
ec2.vpcs.select do |vpc|
enabled_dns_support?(vpc) && enabled_dns_hostnames?(vpc) && enabled_internet_gateways?(vpc)
end
end
def enabled_dns_hostnames?(vpc)
vpc.describe_attribute(:attribute => 'enableDnsHostnames', :vpc_id => vpc.vpc_id).enable_dns_hostnames.value
end
def enabled_dns_support?(vpc)
vpc.describe_attribute(:attribute => 'enableDnsSupport', :vpc_id => vpc.vpc_id).enable_dns_support.value
end
def enabled_internet_gateways?(vpc)
igw_ids = vpc.internet_gateways.map(&:internet_gateway_id)
unless igw_ids.empty? # Gateway attached
_log.debug("Found a gateway [#{igw_ids.first}] on VPC [#{vpc.id}]")
# Make sure gateway has vaild route
route_tables = vpc.route_tables.select do |route_table|
route_table.routes.any? { |route| route&.gateway_id == igw_ids.first }
end
unless route_tables.empty?
_log.debug("Found route tables #{route_tables} on the gateway [#{igw_ids.first}]")
subnets = route_tables.map { |rt| rt.associations.first.subnet_id }
# Now the gateway is proved to have associated route and subnet
return true if subnets.any?
end
_log.error("No route is configured on gateway [#{igw_ids.first}]")
return false
end
_log.error("No gateway is configured on VPC [#{vpc.id}]")
false
end
# Get Key Pair for SSH. Create a new one if not exists.
def find_or_create_keypair(keypair_name = default_keypair_name)
get_keypair(keypair_name) || begin
_log.info("KeyPair #{keypair_name} will be created!")
# Delete from Aws if existing
ec2.key_pair(keypair_name).try(:delete)
ManageIQ::Providers::CloudManager::AuthKeyPair.create_key_pair(@ems.id, :name => keypair_name)
end
end
def get_keypair(keypair_name = label)
@ems.authentications.find_by(:name => keypair_name)
end
def find_or_create_profile(profile_name = label, role_name = label)
ssa_profile = iam.instance_profile(profile_name)
ssa_profile = iam.create_instance_profile(:instance_profile_name => profile_name) unless ssa_profile.exists?
ssa_profile.wait_until_exists
find_or_create_role(role_name)
ssa_profile.add_role(:role_name => role_name) if ssa_profile.roles.empty?
ssa_profile
end
def find_or_create_role(role_name = label)
return iam.role(role_name) if role_exists?(role_name)
# Policy Generator:
policy_doc = {
:Version => "2012-10-17",
:Statement => [
{
:Effect => "Allow",
:Principal => { :Service => "ec2.amazonaws.com" },
:Action => "sts:AssumeRole"
}
]
}
role = iam.create_role(
:role_name => role_name,
:assume_role_policy_document => policy_doc.to_json
)
# grant all priviledges
%w(AmazonS3FullAccess AmazonEC2FullAccess AmazonSQSFullAccess).each do |policy|
role.attach_policy(:policy_arn => "arn:aws:iam::aws:policy/#{policy}")
end
role
end
def role_exists?(role_name)
!!iam.role(role_name).role_id
rescue ::Aws::IAM::Errors::NoSuchEntity
false
end
def find_or_create_security_group(vpc_id = nil, group_name = label)
security_group = ec2.client.describe_security_groups(
:filters => [{
:name => "group-name",
:values => [group_name]
}]
).flat_map(&:security_groups).first
return security_group.group_id unless security_group.nil?
# create security group if not exist
security_group = ec2.create_security_group(
:group_name => group_name,
:description => 'Security group for smartstate Agent',
:vpc_id => vpc_id
)
security_group.authorize_ingress(
:ip_permissions => [{
:ip_protocol => 'tcp',
:from_port => 22,
:to_port => 22,
:ip_ranges => [{
:cidr_ip => '0.0.0.0/0'
}]
}]
)
security_group.authorize_ingress(
:ip_permissions => [{
:ip_protocol => 'tcp',
:from_port => 80,
:to_port => 80,
:ip_ranges => [{
:cidr_ip => '0.0.0.0/0'
}]
}]
)
security_group.authorize_ingress(
:ip_permissions => [{
:ip_protocol => 'tcp',
:from_port => 443,
:to_port => 443,
:ip_ranges => [{
:cidr_ip => '0.0.0.0/0'
}]
}]
)
security_group.group_id
end
def get_subnets(az, vpc_id)
ec2.client.describe_subnets(
:filters => [
{
:name => "availability-zone",
:values => [az]
},
{
:name => "vpc-id",
:values => [vpc_id]
}
]
).flat_map(&:subnets)
end
# possible RHEL image name: values: [ "RHEL-7.3_HVM_GA*" ]
def get_agent_image_id(image_name = agent_ami_name)
image = ec2.client.describe_images(
:filters => [{
:name => "name",
:values => [image_name]
}]
).flat_map(&:images).first
raise("Unable to find AMI Image #{image_name} to launch Smartstate agent") if image.nil?
_log.info("AMI Image: #{image_name} [#{image.image_id}] is used to launch smartstate agent.")
image.image_id
end
def create_pem_file(pair_name = default_keypair_name)
keypair = find_or_create_keypair(pair_name)
pem_file_name = "#{pair_name}.pem"
File.write(pem_file_name, keypair.auth_key)
File.chmod(0o400, pem_file_name)
pem_file_name
end
def create_config_yaml
defaults = agent_coordinator_settings.to_hash.except(:agent_ami_name, :docker_image, :agent_label, :agent_ami_login_user, :docker_login_required, :response_thread_sleep_seconds)
defaults[:reply_queue] = reply_queue
defaults[:request_queue] = request_queue
defaults[:ssa_bucket] = ssa_bucket
defaults[:log_prefix] = log_prefix
defaults.to_yaml
end
def messages_in_queue(q_name)
q = sqs.get_queue_by_name(:queue_name => q_name)
q.attributes["ApproximateNumberOfMessages"].to_i + q.attributes["ApproximateNumberOfMessagesNotVisible"].to_i
rescue
0
end
def agent_coordinator_settings
@agent_coordinator_settings ||= Settings.ems.ems_amazon.agent_coordinator
end
def region
@ems.provider_region
end
def agent_log_level
ll = agent_coordinator_settings.try(:log_level) || AmazonSsaSupport::DEFAULT_LOG_LEVEL
ll.upcase
end
def heartbeat_prefix
AmazonSsaSupport::DEFAULT_HEARTBEAT_PREFIX
end
def heartbeat_interval
agent_coordinator_settings.try(:heartbeat_interval) || AmazonSsaSupport::DEFAULT_HEARTBEAT_INTERVAL
end
def ssa_bucket
@ssa_bucket ||= "#{AmazonSsaSupport::DEFAULT_BUCKET_PREFIX}-#{@ems.guid}".freeze
end
def request_queue
@request_queue ||= "#{AmazonSsaSupport::DEFAULT_REQUEST_QUEUE}-#{@ems.guid}".freeze
end
def reply_queue
@reply_queue ||= "#{AmazonSsaSupport::DEFAULT_REPLY_QUEUE}-#{@ems.guid}".freeze
end
def default_keypair_name
"#{label}-#{@ems.guid}".freeze
end
def reply_prefix
AmazonSsaSupport::DEFAULT_REPLY_PREFIX
end
def log_prefix
AmazonSsaSupport::DEFAULT_LOG_PREFIX
end
def agent_ami_name
agent_coordinator_settings.try(:agent_ami_name) || raise("Please specify AMI image name for smartstate agent")
end
def agent_ami_login_user
agent_coordinator_settings.try(:agent_ami_login_user) || raise("Please specify AMI image's login user name for smartstate agent")
end
def docker_image
agent_coordinator_settings.try(:docker_image) || raise("Please specify docker image name for smartstate agent")
end
def docker_registry
agent_coordinator_settings.try(:docker_registry)
end
def docker_login_required?
agent_coordinator_settings.try(:docker_login_required)
end
# This label is used to name all objects (profile/role/instance, etc) we created in AWS.
# Make it configurable for upstream/downstream name conventions
def label
@label ||= agent_coordinator_settings.try(:agent_label) || SSA_LABEL
end
end