bosh_aws_cpi/lib/cloud/aws/cloud.rb
# Copyright (c) 2009-2012 VMware, Inc.
require 'cloud/aws/stemcell_finder'
module Bosh::AwsCloud
class Cloud < Bosh::Cloud
include Helpers
# default maximum number of times to retry an AWS API call
DEFAULT_MAX_RETRIES = 2
METADATA_TIMEOUT = 5 # in seconds
DEVICE_POLL_TIMEOUT = 60 # in seconds
attr_reader :ec2
attr_reader :registry
attr_reader :options
attr_accessor :logger
##
# Initialize BOSH AWS CPI. The contents of sub-hashes are defined in the {file:README.md}
# @param [Hash] options CPI options
# @option options [Hash] aws AWS specific options
# @option options [Hash] agent agent options
# @option options [Hash] registry agent options
def initialize(options)
@options = options.dup.freeze
validate_options
@logger = Bosh::Clouds::Config.logger
initialize_aws
initialize_registry
@metadata_lock = Mutex.new
end
##
# Reads current instance id from EC2 metadata. We are assuming
# instance id cannot change while current process is running
# and thus memoizing it.
def current_vm_id
@metadata_lock.synchronize do
return @current_vm_id if @current_vm_id
client = HTTPClient.new
client.connect_timeout = METADATA_TIMEOUT
# Using 169.254.169.254 is an EC2 convention for getting
# instance metadata
uri = "http://169.254.169.254/latest/meta-data/instance-id/"
response = client.get(uri)
unless response.status == 200
cloud_error("Instance metadata endpoint returned " \
"HTTP #{response.status}")
end
@current_vm_id = response.body
end
rescue HTTPClient::TimeoutError
cloud_error("Timed out reading instance metadata, " \
"please make sure CPI is running on EC2 instance")
end
##
# Create an EC2 instance and wait until it's in running state
# @param [String] agent_id agent id associated with new VM
# @param [String] stemcell_id AMI id of the stemcell used to
# create the new instance
# @param [Hash] resource_pool resource pool specification
# @param [Hash] network_spec network specification, if it contains
# security groups they must already exist
# @param [optional, Array] disk_locality list of disks that
# might be attached to this instance in the future, can be
# used as a placement hint (i.e. instance will only be created
# if resource pool availability zone is the same as disk
# availability zone)
# @param [optional, Hash] environment data to be merged into
# agent settings
# @return [String] EC2 instance id of the new virtual machine
def create_vm(agent_id, stemcell_id, resource_pool, network_spec, disk_locality = nil, environment = nil)
with_thread_name("create_vm(#{agent_id}, ...)") do
# do this early to fail fast
stemcell = StemcellFinder.find_by_region_and_id(region, stemcell_id)
begin
instance_manager = InstanceManager.new(region, registry, az_selector)
instance = instance_manager.
create(agent_id, stemcell.image_id, resource_pool, network_spec, (disk_locality || []), environment, options)
logger.info("Creating new instance '#{instance.id}'")
NetworkConfigurator.new(network_spec).configure(region, instance)
registry_settings = initial_agent_settings(
agent_id,
network_spec,
environment,
stemcell.root_device_name,
)
registry.update_settings(instance.id, registry_settings)
instance.id
rescue => e # is this rescuing too much?
logger.error(%Q[Failed to create instance: #{e.message}\n#{e.backtrace.join("\n")}])
instance_manager.terminate(instance.id, fast_path_delete?) if instance
raise e
end
end
end
def default_ec2_endpoint
['ec2', aws_region, 'amazonaws.com'].compact.join('.')
end
def default_elb_endpoint
['elasticloadbalancing', aws_region, 'amazonaws.com'].compact.join('.')
end
##
# Delete EC2 instance ("terminate" in AWS language) and wait until
# it reports as terminated
# @param [String] instance_id EC2 instance id
def delete_vm(instance_id)
with_thread_name("delete_vm(#{instance_id})") do
logger.info("Deleting instance '#{instance_id}'")
InstanceManager.new(region, registry).terminate(instance_id, fast_path_delete?)
end
end
##
# Reboot EC2 instance
# @param [String] instance_id EC2 instance id
def reboot_vm(instance_id)
with_thread_name("reboot_vm(#{instance_id})") do
InstanceManager.new(region, registry).reboot(instance_id)
end
end
##
# Has EC2 instance
# @param [String] instance_id EC2 instance id
def has_vm?(instance_id)
with_thread_name("has_vm?(#{instance_id})") do
InstanceManager.new(region, registry).has_instance?(instance_id)
end
end
##
# Creates a new EBS volume
# @param [Integer] size disk size in MiB
# @param [optional, String] instance_id EC2 instance id
# of the VM that this disk will be attached to
# @return [String] created EBS volume id
def create_disk(size, instance_id = nil)
with_thread_name("create_disk(#{size}, #{instance_id})") do
validate_disk_size(size)
# if the disk is created for an instance, use the same availability zone as they must match
volume = @ec2.volumes.create(:size => (size / 1024.0).ceil,
:availability_zone => @az_selector.select_availability_zone(instance_id))
logger.info("Creating volume '#{volume.id}'")
ResourceWait.for_volume(volume: volume, state: :available)
volume.id
end
end
def validate_disk_size(size)
raise ArgumentError, "disk size needs to be an integer" unless size.kind_of?(Integer)
cloud_error("AWS CPI minimum disk size is 1 GiB") if size < 1024
cloud_error("AWS CPI maximum disk size is 1 TiB") if size > 1024 * 1000
end
##
# Delete EBS volume
# @param [String] disk_id EBS volume id
# @raise [Bosh::Clouds::CloudError] if disk is not in available state
def delete_disk(disk_id)
with_thread_name("delete_disk(#{disk_id})") do
volume = @ec2.volumes[disk_id]
logger.info("Deleting volume `#{volume.id}'")
tries = 10
sleep_cb = ResourceWait.sleep_callback("Waiting for volume `#{volume.id}' to be deleted", tries)
ensure_cb = Proc.new do |retries|
cloud_error("Timed out waiting to delete volume `#{volume.id}'") if retries == tries
end
error = AWS::EC2::Errors::Client::VolumeInUse
Bosh::Common.retryable(tries: tries, sleep: sleep_cb, on: error, ensure: ensure_cb) do
volume.delete
true # return true to only retry on Exceptions
end
if fast_path_delete?
begin
TagManager.tag(volume, "Name", "to be deleted")
logger.info("Volume `#{disk_id}' has been marked for deletion")
rescue AWS::EC2::Errors::InvalidVolume::NotFound
# Once in a blue moon AWS if actually fast enough that the volume is already gone
# when we get here, and if it is, our work here is done!
end
return
end
ResourceWait.for_volume(volume: volume, state: :deleted)
logger.info("Volume `#{disk_id}' has been deleted")
end
end
# Attach an EBS volume to an EC2 instance
# @param [String] instance_id EC2 instance id of the virtual machine to attach the disk to
# @param [String] disk_id EBS volume id of the disk to attach
def attach_disk(instance_id, disk_id)
with_thread_name("attach_disk(#{instance_id}, #{disk_id})") do
instance = @ec2.instances[instance_id]
volume = @ec2.volumes[disk_id]
device_name = attach_ebs_volume(instance, volume)
update_agent_settings(instance) do |settings|
settings["disks"] ||= {}
settings["disks"]["persistent"] ||= {}
settings["disks"]["persistent"][disk_id] = device_name
end
logger.info("Attached `#{disk_id}' to `#{instance_id}'")
end
end
# Detach an EBS volume from an EC2 instance
# @param [String] instance_id EC2 instance id of the virtual machine to detach the disk from
# @param [String] disk_id EBS volume id of the disk to detach
def detach_disk(instance_id, disk_id)
with_thread_name("detach_disk(#{instance_id}, #{disk_id})") do
instance = @ec2.instances[instance_id]
volume = @ec2.volumes[disk_id]
update_agent_settings(instance) do |settings|
settings["disks"] ||= {}
settings["disks"]["persistent"] ||= {}
settings["disks"]["persistent"].delete(disk_id)
end
detach_ebs_volume(instance, volume)
logger.info("Detached `#{disk_id}' from `#{instance_id}'")
end
end
def get_disks(vm_id)
disks = []
@ec2.instances[vm_id].block_devices.each do |block_device|
if block_device[:ebs]
disks << block_device[:ebs][:volume_id]
end
end
disks
end
# Take snapshot of disk
# @param [String] disk_id disk id of the disk to take the snapshot of
# @return [String] snapshot id
def snapshot_disk(disk_id, metadata)
with_thread_name("snapshot_disk(#{disk_id})") do
volume = @ec2.volumes[disk_id]
devices = []
volume.attachments.each {|attachment| devices << attachment.device}
name = [:deployment, :job, :index].collect { |key| metadata[key] }
name << devices.first.split('/').last unless devices.empty?
snapshot = volume.create_snapshot(name.join('/'))
logger.info("snapshot '#{snapshot.id}' of volume '#{disk_id}' created")
[:agent_id, :instance_id, :director_name, :director_uuid].each do |key|
TagManager.tag(snapshot, key, metadata[key])
end
TagManager.tag(snapshot, :device, devices.first) unless devices.empty?
TagManager.tag(snapshot, 'Name', name.join('/'))
ResourceWait.for_snapshot(snapshot: snapshot, state: :completed)
snapshot.id
end
end
# Delete a disk snapshot
# @param [String] snapshot_id snapshot id to delete
def delete_snapshot(snapshot_id)
with_thread_name("delete_snapshot(#{snapshot_id})") do
snapshot = @ec2.snapshots[snapshot_id]
if snapshot.status == :in_use
raise Bosh::Clouds::CloudError, "snapshot '#{snapshot.id}' can not be deleted as it is in use"
end
snapshot.delete
logger.info("snapshot '#{snapshot_id}' deleted")
end
end
# Configure network for an EC2 instance
# @param [String] instance_id EC2 instance id
# @param [Hash] network_spec network properties
# @raise [Bosh::Clouds:NotSupported] if there's a network change that requires the recreation of the VM
def configure_networks(instance_id, network_spec)
with_thread_name("configure_networks(#{instance_id}, ...)") do
logger.info("Configuring '#{instance_id}' to use new network settings: #{network_spec.pretty_inspect}")
instance = @ec2.instances[instance_id]
network_configurator = NetworkConfigurator.new(network_spec)
compare_security_groups(instance, network_spec)
compare_private_ip_addresses(instance, network_configurator.private_ip)
network_configurator.configure(@ec2, instance)
update_agent_settings(instance) do |settings|
settings["networks"] = network_spec
end
end
end
# If the security groups change, we need to recreate the VM
# as you can't change the security group of a running instance,
# we need to send the InstanceUpdater a request to do it for us
def compare_security_groups(instance, network_spec)
actual_group_names = instance.security_groups.collect { |sg| sg.name }
specified_group_names = extract_security_group_names(network_spec)
if specified_group_names.empty?
new_group_names = Array(aws_properties["default_security_groups"])
else
new_group_names = specified_group_names
end
unless actual_group_names.sort == new_group_names.sort
raise Bosh::Clouds::NotSupported,
"security groups change requires VM recreation: %s to %s" %
[actual_group_names.join(", "), new_group_names.join(", ")]
end
end
##
# Compares actual instance private IP addresses with the IP address specified at the network spec
#
# @param [AWS::EC2::Instance] instance EC2 instance
# @param [String] specified_ip_address IP address specified at the network spec (if Manual Network)
# @return [void]
# @raise [Bosh::Clouds:NotSupported] If the IP address change, we need to recreate the VM as you can't
# change the IP address of a running server, so we need to send the InstanceUpdater a request to do it for us
def compare_private_ip_addresses(instance, specified_ip_address)
actual_ip_address = instance.private_ip_address
unless specified_ip_address.nil? || actual_ip_address == specified_ip_address
raise Bosh::Clouds::NotSupported,
"IP address change requires VM recreation: %s to %s" %
[actual_ip_address, specified_ip_address]
end
end
##
# Creates a new EC2 AMI using stemcell image.
# This method can only be run on an EC2 instance, as image creation
# involves creating and mounting new EBS volume as local block device.
# @param [String] image_path local filesystem path to a stemcell image
# @param [Hash] cloud_properties AWS-specific stemcell properties
# @option cloud_properties [String] kernel_id
# AKI, auto-selected based on the region, unless specified
# @option cloud_properties [String] root_device_name
# block device path (e.g. /dev/sda1), provided by the stemcell manifest, unless specified
# @option cloud_properties [String] architecture
# instruction set architecture (e.g. x86_64), provided by the stemcell manifest,
# unless specified
# @option cloud_properties [String] disk (2048)
# root disk size
# @return [String] EC2 AMI name of the stemcell
def create_stemcell(image_path, stemcell_properties)
with_thread_name("create_stemcell(#{image_path}...)") do
creator = StemcellCreator.new(region, stemcell_properties)
return creator.fake.id if creator.fake?
begin
# These variables are used in 'ensure' clause
instance = nil
volume = nil
# 1. Create and mount new EBS volume (2GB default)
disk_size = stemcell_properties["disk"] || 2048
volume_id = create_disk(disk_size, current_vm_id)
volume = @ec2.volumes[volume_id]
instance = @ec2.instances[current_vm_id]
sd_name = attach_ebs_volume(instance, volume)
ebs_volume = find_ebs_device(sd_name)
logger.info("Creating stemcell with: '#{volume.id}' and '#{stemcell_properties.inspect}'")
creator.create(volume, ebs_volume, image_path).id
rescue => e
logger.error(e)
raise e
ensure
if instance && volume
detach_ebs_volume(instance, volume, true)
delete_disk(volume.id)
end
end
end
end
# Delete a stemcell and the accompanying snapshots
# @param [String] stemcell_id EC2 AMI name of the stemcell to be deleted
def delete_stemcell(stemcell_id)
with_thread_name("delete_stemcell(#{stemcell_id})") do
stemcell = StemcellFinder.find_by_region_and_id(region, stemcell_id)
stemcell.delete
end
end
# Add tags to an instance. In addition to the suplied tags,
# it adds a 'Name' tag as it is shown in the AWS console.
# @param [String] vm vm id that was once returned by {#create_vm}
# @param [Hash] metadata metadata key/value pairs
# @return [void]
def set_vm_metadata(vm, metadata)
instance = @ec2.instances[vm]
metadata.each_pair do |key, value|
TagManager.tag(instance, key, value)
end
job = metadata[:job]
index = metadata[:index]
if job && index
name = "#{job}/#{index}"
elsif metadata[:compiling]
name = "compiling/#{metadata[:compiling]}"
end
TagManager.tag(instance, "Name", name) if name
rescue AWS::EC2::Errors::TagLimitExceeded => e
logger.error("could not tag #{instance.id}: #{e.message}")
end
def find_ebs_device(sd_name)
xvd_name = sd_name.gsub(/^\/dev\/sd/, "/dev/xvd")
DEVICE_POLL_TIMEOUT.times do
if File.blockdev?(sd_name)
return sd_name
elsif File.blockdev?(xvd_name)
return xvd_name
end
sleep(1)
end
cloud_error("Cannot find EBS volume on current instance")
end
private
attr_reader :az_selector
attr_reader :region
def agent_properties
@agent_properties ||= options.fetch('agent', {})
end
def aws_properties
@aws_properties ||= options.fetch('aws')
end
def aws_region
@aws_region ||= aws_properties.fetch('region', nil)
end
def fast_path_delete?
aws_properties.fetch('fast_path_delete', false)
end
def initialize_aws
aws_logger = logger
aws_params = {
access_key_id: aws_properties['access_key_id'],
secret_access_key: aws_properties['secret_access_key'],
ec2_endpoint: aws_properties['ec2_endpoint'] || default_ec2_endpoint,
elb_endpoint: aws_properties['elb_endpoint'] || default_elb_endpoint,
max_retries: aws_properties['max_retries'] || DEFAULT_MAX_RETRIES ,
logger: aws_logger
}
aws_params[:proxy_uri] = aws_properties['proxy_uri'] if aws_properties['proxy_uri']
# AWS Ruby SDK is threadsafe but Ruby autoload isn't,
# so we need to trigger eager autoload while constructing CPI
AWS.eager_autoload!
AWS.config(aws_params)
@ec2 = AWS::EC2.new
@region = @ec2.regions[aws_region]
@az_selector = AvailabilityZoneSelector.new(@region, aws_properties['default_availability_zone'])
end
def initialize_registry
registry_properties = options.fetch('registry')
registry_endpoint = registry_properties.fetch('endpoint')
registry_user = registry_properties.fetch('user')
registry_password = registry_properties.fetch('password')
# Registry updates are not really atomic in relation to
# EC2 API calls, so they might get out of sync. Cloudcheck
# is supposed to fix that.
@registry = Bosh::Registry::Client.new(registry_endpoint,
registry_user,
registry_password)
end
def update_agent_settings(instance)
unless block_given?
raise ArgumentError, "block is not provided"
end
settings = registry.read_settings(instance.id)
yield settings
registry.update_settings(instance.id, settings)
end
def attach_ebs_volume(instance, volume)
device_name = select_device_name(instance)
cloud_error('Instance has too many disks attached') unless device_name
# Work around AWS eventual (in)consistency:
# even tough we don't call attach_disk until the disk is ready,
# AWS might still lie and say that the disk isn't ready yet, so
# we try again just to be really sure it is telling the truth
attachment = nil
Bosh::Common.retryable(tries: 15, on: AWS::EC2::Errors::IncorrectState) do
attachment = volume.attach_to(instance, device_name)
end
logger.info("Attaching '#{volume.id}' to '#{instance.id}' as '#{device_name}'")
ResourceWait.for_attachment(attachment: attachment, state: :attached)
device_name = attachment.device
logger.info("Attached '#{volume.id}' to '#{instance.id}' as '#{device_name}'")
device_name
end
def select_device_name(instance)
device_names = Set.new(instance.block_device_mappings.to_hash.keys)
('f'..'p').each do |char| # f..p is what console suggests
# Some kernels will remap sdX to xvdX, so agent needs
# to lookup both (sd, then xvd)
device_name = "/dev/sd#{char}"
return device_name unless device_names.include?(device_name)
logger.warn("'#{device_name}' on '#{instance.id}' is taken")
end
nil
end
def detach_ebs_volume(instance, volume, force=false)
mappings = instance.block_device_mappings.to_hash
device_map = mappings.inject({}) do |hash, (device_name, attachment)|
hash[attachment.volume.id] = device_name
hash
end
if device_map[volume.id].nil?
raise Bosh::Clouds::DiskNotAttached.new(true),
"Disk `#{volume.id}' is not attached to instance `#{instance.id}'"
end
attachment = volume.detach_from(instance, device_map[volume.id], force: force)
logger.info("Detaching `#{volume.id}' from `#{instance.id}'")
ResourceWait.for_attachment(attachment: attachment, state: :detached)
end
##
# Checks if options passed to CPI are valid and can actually
# be used to create all required data structures etc.
#
def validate_options
required_keys = {
"aws" => ["access_key_id", "secret_access_key", "region", "default_key_name"],
"registry" => ["endpoint", "user", "password"],
}
missing_keys = []
required_keys.each_pair do |key, values|
values.each do |value|
if (!options.has_key?(key) || !options[key].has_key?(value))
missing_keys << "#{key}:#{value}"
end
end
end
raise ArgumentError, "missing configuration parameters > #{missing_keys.join(', ')}" unless missing_keys.empty?
end
# Generates initial agent settings. These settings will be read by agent
# from AWS registry (also a BOSH component) on a target instance. Disk
# conventions for amazon are:
# system disk: /dev/sda
# ephemeral disk: /dev/sdb
# EBS volumes can be configured to map to other device names later (sdf
# through sdp, also some kernels will remap sd* to xvd*).
#
# @param [String] agent_id Agent id (will be picked up by agent to
# assume its identity
# @param [Hash] network_spec Agent network spec
# @param [Hash] environment
# @param [String] root_device_name root device, e.g. /dev/sda1
# @return [Hash]
def initial_agent_settings(agent_id, network_spec, environment, root_device_name)
settings = {
"vm" => {
"name" => "vm-#{SecureRandom.uuid}"
},
"agent_id" => agent_id,
"networks" => network_spec,
"disks" => {
"system" => root_device_name,
"ephemeral" => "/dev/sdb",
"persistent" => {}
}
}
settings["env"] = environment if environment
settings.merge(agent_properties)
end
end
end