// core/autoscaling.go
// Copyright (c) 2016-2022 Cristian Măgherușan-Stanciu
// Licensed under the Open Software License version 3.0
package autospotting
import (
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/codedeploy"
"github.com/aws/aws-sdk-go/service/ec2"
)
// autoScalingGroup wraps an AWS AutoScaling group together with the
// AutoSpotting-specific state needed to manage the replacement of its
// on-demand instances with spot instances.
type autoScalingGroup struct {
*autoscaling.Group
// name of the AutoScaling group
name string
// region this group belongs to, giving access to the AWS service clients
region *region
// lazily-loaded launch configuration, populated by loadLaunchConfiguration
launchConfiguration *launchConfiguration
// lazily-loaded launch template, populated by loadLaunchTemplate
launchTemplate *launchTemplate
autospotting *AutoSpotting
// instances currently tracked for this group, rebuilt by scanInstances
instances instances
// per-group configuration, loaded from defaults and overridden by ASG tags
config AutoScalingConfig
}
// loadLaunchConfiguration fetches and caches the launch configuration
// associated with the group. It returns an error when the group has no
// launch configuration, when the API call fails, or when the API returns
// an empty result.
func (a *autoScalingGroup) loadLaunchConfiguration() (*launchConfiguration, error) {
	// return the cached value if we already loaded it
	if a.launchConfiguration != nil {
		return a.launchConfiguration, nil
	}

	lcName := a.LaunchConfigurationName
	if lcName == nil {
		return nil, errors.New("missing launch configuration")
	}

	svc := a.region.services.autoScaling
	resp, err := svc.DescribeLaunchConfigurations(
		&autoscaling.DescribeLaunchConfigurationsInput{
			LaunchConfigurationNames: []*string{lcName},
		})
	if err != nil {
		log.Println(err.Error())
		return nil, err
	}

	// guard against an empty result, which would otherwise panic on [0]
	if len(resp.LaunchConfigurations) == 0 {
		return nil, errors.New("missing launch configuration")
	}

	a.launchConfiguration = &launchConfiguration{
		LaunchConfiguration: resp.LaunchConfigurations[0],
	}
	return a.launchConfiguration, nil
}
// loadLaunchTemplate retrieves the launch template version used by the
// group together with the AMI it references, caching the result on the
// group so later calls are free.
func (a *autoScalingGroup) loadLaunchTemplate() (*launchTemplate, error) {
	// return the cached copy if we already have one
	if a.launchTemplate != nil {
		return a.launchTemplate, nil
	}

	ref := a.LaunchTemplate
	if ref == nil {
		return nil, errors.New("missing launch template")
	}
	if ref.LaunchTemplateId == nil || ref.Version == nil {
		return nil, errors.New("missing launch template")
	}

	ec2Svc := a.region.services.ec2

	versionsOut, err := ec2Svc.DescribeLaunchTemplateVersions(
		&ec2.DescribeLaunchTemplateVersionsInput{
			LaunchTemplateId: ref.LaunchTemplateId,
			Versions:         []*string{ref.Version},
		})
	if err != nil {
		log.Println(err.Error())
		return nil, err
	}
	if len(versionsOut.LaunchTemplateVersions) == 0 {
		return nil, errors.New("missing launch template")
	}
	version := versionsOut.LaunchTemplateVersions[0]

	imagesOut, err := ec2Svc.DescribeImages(
		&ec2.DescribeImagesInput{
			ImageIds: []*string{version.LaunchTemplateData.ImageId},
		})
	if err != nil {
		log.Println(err.Error())
		return nil, err
	}
	if len(imagesOut.Images) == 0 {
		return nil, errors.New("missing launch template image")
	}

	a.launchTemplate = &launchTemplate{
		LaunchTemplateVersion: version,
		Image:                 imagesOut.Images[0],
	}
	return a.launchTemplate, nil
}
// needReplaceOnDemandInstances reports whether the group still runs more
// on-demand instances than the configured minimum, returning that decision
// together with the total number of running instances.
func (a *autoScalingGroup) needReplaceOnDemandInstances() (bool, int64) {
	onDemand, total := a.alreadyRunningInstanceCount(false, nil)
	debug.Printf("onDemandRunning=%v totalRunning=%v a.minOnDemand=%v",
		onDemand, total, a.config.MinOnDemand)

	switch {
	case total == 0:
		// nothing running yet — treat as replaceable so the run proceeds
		log.Printf("The group %s is currently empty or in the process of launching new instances",
			a.name)
		return true, total
	case onDemand > a.config.MinOnDemand:
		log.Println("Currently more than enough OnDemand instances running")
		return true, total
	case onDemand == a.config.MinOnDemand:
		log.Println("Currently OnDemand running equals to the required number, skipping run")
		return false, total
	}
	log.Println("Currently fewer OnDemand instances than required !")
	return false, total
}
// cronEventAction decides what to do for the current cron event: either a
// skipRun carrying the reason, or a replaceAndTerminateInstance action
// targeting an unprotected on-demand instance from the group.
func (a *autoScalingGroup) cronEventAction() runer {
	a.scanInstances()
	a.loadDefaultConfig()
	a.loadConfigFromTags()

	allowed := cronRunAction(time.Now(), a.config.CronSchedule, a.config.CronTimezone, a.config.CronScheduleState)
	debug.Println(a.region.name, a.name, "Should take replacement actions:", allowed)
	if !allowed {
		log.Println(a.region.name, a.name,
			"Skipping run, outside the enabled cron run schedule")
		return skipRun{reason: "outside-cron-schedule"}
	}

	candidate := a.getAnyUnprotectedOnDemandInstance()

	if need, total := a.needReplaceOnDemandInstances(); !need {
		log.Printf("Not allowed to replace any more of the running OD instances in %s, currently running %d on-demand instances", a.name, total)
		return skipRun{reason: "not-allowed-to-replace-more-instances"}
	}

	if candidate == nil {
		log.Println(a.region.name, a.name,
			"No running unprotected on-demand instances were found, nothing to do here...")
		return skipRun{reason: "no-instances-to-replace"}
	}

	recap := fmt.Sprintf("%s Triggered replacement for on-demand instance %s", a.name, *candidate.Instance.InstanceId)
	a.region.conf.FinalRecap[a.region.name] = append(a.region.conf.FinalRecap[a.region.name], recap)

	return replaceAndTerminateInstance{
		target{
			autospotting:     a.autospotting,
			onDemandInstance: candidate,
		},
	}
}
// scanInstances rebuilds the group's instance collection from the region's
// already-scanned instance data, re-scanning any instance that is missing,
// annotating each with pricing information and skipping instances that are
// already terminating.
func (a *autoScalingGroup) scanInstances() instances {
	log.Println("Adding instances to", a.name)
	a.instances = makeInstances()

	for _, member := range a.Instances {
		id := *member.InstanceId
		data := a.region.instances.get(id)
		if data == nil {
			debug.Println("Missing instance data for ", id, "scanning it again")
			a.region.scanInstance(member.InstanceId)
			data = a.region.instances.get(id)
			if data == nil {
				debug.Println("Failed to scan instance", id)
				continue
			}
		}

		data.asg, data.region = a, a.region
		if member.ProtectedFromScaleIn != nil {
			data.protected = data.protected || *member.ProtectedFromScaleIn
		}

		// price is the current spot price in the instance's AZ for spot
		// instances, or the on-demand price plus any configured premium
		if data.isSpot() {
			data.price = data.typeInfo.pricing.spot[*data.Placement.AvailabilityZone]
		} else {
			data.price = data.typeInfo.pricing.onDemand + data.typeInfo.pricing.premium
		}

		// Avoid adding instance in Terminating (Wait|Proceed) Lifecycle State
		if strings.HasPrefix(*member.LifecycleState, "Terminating") {
			continue
		}
		a.instances.add(data)
	}
	return a.instances
}
// getInstance returns the first running instance in the group matching the
// requested lifecycle (on-demand when onDemand is true, spot otherwise),
// optionally restricted to a single availability zone and optionally
// excluding instances protected from scale-in or termination.
// Returns nil when no instance matches.
func (a *autoScalingGroup) getInstance(
	availabilityZone *string,
	onDemand bool,
	considerInstanceProtection bool,
) *instance {
	for i := range a.instances.instances() {
		// only consider running instances
		if *i.State.Name != ec2.InstanceStateNameRunning {
			continue
		}

		// the InstanceLifecycle attribute is non-nil only for spot instances,
		// where it contains the value "spot"; skip instances whose lifecycle
		// differs from the one we're looking for (spot==onDemand is exactly
		// the "mismatched lifecycle" condition)
		if i.isSpot() == onDemand {
			debug.Println(a.name, "skipping instance", *i.InstanceId,
				"having different lifecycle than what we're looking for")
			continue
		}

		protT, err := i.isProtectedFromTermination()
		if err != nil {
			debug.Println(a.name, "failed to determine termination protection for", *i.InstanceId)
		}
		if considerInstanceProtection && (i.isProtectedFromScaleIn() || protT) {
			debug.Println(a.name, "skipping protected instance", *i.InstanceId)
			continue
		}

		if availabilityZone != nil && *availabilityZone != *i.Placement.AvailabilityZone {
			debug.Println(a.name, "skipping instance", *i.InstanceId,
				"placed in a different AZ than what we're looking for")
			continue
		}
		return i
	}
	return nil
}
// getAnyUnprotectedOnDemandInstance returns the first running on-demand
// instance in the group that is protected from neither scale-in nor
// termination, in any availability zone, or nil when there is none.
func (a *autoScalingGroup) getAnyUnprotectedOnDemandInstance() *instance {
return a.getInstance(nil, true, true)
}
// hasMemberInstance reports whether the given instance is currently a
// member of this AutoScaling group.
func (a *autoScalingGroup) hasMemberInstance(inst *instance) bool {
	for _, m := range a.Instances {
		if *m.InstanceId == *inst.InstanceId {
			return true
		}
	}
	return false
}
// waitForInstanceStatus polls the AutoScaling API until the given instance
// reaches the desired lifecycle state, attempting up to maxRetry+1 times
// with a decreasing backoff (10s, shrinking 2s per attempt, floored at 1s).
// Returns nil once the state is reached, or a descriptive error on timeout.
func (a *autoScalingGroup) waitForInstanceStatus(instanceID *string, status string, maxRetry int) error {
	for retry := 0; retry <= maxRetry; retry++ {
		result, err := a.region.services.autoScaling.DescribeAutoScalingInstances(
			&autoscaling.DescribeAutoScalingInstancesInput{
				InstanceIds: []*string{instanceID},
			})
		if err != nil {
			// previously this path skipped the sleep and retried in a tight
			// loop; now it falls through to the backoff below
			log.Println(err.Error())
		} else if asInstances := result.AutoScalingInstances; len(asInstances) > 0 {
			current := *asInstances[0].LifecycleState
			if current == status {
				return nil
			}
			log.Printf("Waiting for instance %s to be in status %s [%s]",
				*instanceID, status, current)
		} else {
			// the instance is not (yet) registered with any group
			log.Printf("Waiting for instance %s to be in AutoScalingGroup with status %s",
				*instanceID, status)
		}

		sleepTime := 10 - 2*retry
		if sleepTime <= 0 {
			sleepTime = 1
		}
		time.Sleep(time.Duration(sleepTime) * time.Second)
	}
	log.Printf("Failed waiting instance %s in status %s",
		*instanceID, status)
	// previously returned errors.New(""), an empty and useless message
	return fmt.Errorf("timed out waiting for instance %s to reach status %s",
		*instanceID, status)
}
// findUnattachedInstanceLaunchedForThisASG scans all instances in the
// region for one carrying a "launched-for-asg" tag naming this group that
// is not yet attached to it. Returns nil when none is found.
func (a *autoScalingGroup) findUnattachedInstanceLaunchedForThisASG() *instance {
	for inst := range a.region.instances.instances() {
		if a.hasMemberInstance(inst) {
			continue
		}
		for _, tag := range inst.Tags {
			if *tag.Key == "launched-for-asg" && *tag.Value == a.name {
				return inst
			}
		}
	}
	return nil
}
// getAllowedInstanceTypes returns the list of instance types allowed for
// this group, taken from the global command-line configuration and
// overridden by the AllowedInstanceTypes ASG tag when present. The special
// value "current" restricts the list to the base instance's own type.
func (a *autoScalingGroup) getAllowedInstanceTypes(baseInstance *instance) []string {
	// By default take the command line parameter, normalizing spaces to commas
	allowed := strings.ReplaceAll(a.region.conf.AllowedInstanceTypes, " ", ",")

	// The ASG tag, when set to a non-empty value, has priority and overrides
	// the global configuration.
	if tagValue := a.getTagValue(AllowedInstanceTypesTag); tagValue != nil {
		if fromTag := strings.ReplaceAll(*tagValue, " ", ","); fromTag != "" {
			allowed = fromTag
		}
	}

	// "current" means: only the type the base instance already runs
	if allowed == "current" {
		return []string{baseInstance.typeInfo.instanceType}
	}

	// Simple trick to avoid returning list with empty elements
	return strings.FieldsFunc(allowed, func(c rune) bool {
		return c == ','
	})
}
// getDisallowedInstanceTypes returns the list of instance types excluded
// for this group, taken from the global command-line configuration and
// overridden by the DisallowedInstanceTypes ASG tag when present.
func (a *autoScalingGroup) getDisallowedInstanceTypes(baseInstance *instance) []string {
	// By default take the command line parameter, normalizing spaces to commas
	disallowed := strings.ReplaceAll(a.region.conf.DisallowedInstanceTypes, " ", ",")

	// The ASG tag, when set to a non-empty value, has priority and overrides
	// the global configuration.
	if tagValue := a.getTagValue(DisallowedInstanceTypesTag); tagValue != nil {
		if fromTag := strings.ReplaceAll(*tagValue, " ", ","); fromTag != "" {
			disallowed = fromTag
		}
	}

	// Simple trick to avoid returning list with empty elements
	return strings.FieldsFunc(disallowed, func(c rune) bool {
		return c == ','
	})
}
// setAutoScalingMaxSize updates the group's maximum size to the given value.
func (a *autoScalingGroup) setAutoScalingMaxSize(maxSize int64) error {
	input := autoscaling.UpdateAutoScalingGroupInput{
		AutoScalingGroupName: aws.String(a.name),
		MaxSize:              aws.Int64(maxSize),
	}
	if _, err := a.region.services.autoScaling.UpdateAutoScalingGroup(&input); err != nil {
		// Print the error, cast err to awserr.Error to get the Code and
		// Message from an error.
		log.Println(err.Error())
		return err
	}
	return nil
}
// attachSpotInstance attaches the given spot instance to the group,
// optionally waiting for it to start first. It then waits for the instance
// to become InService and, when the group has a CodeDeploy-managed launch
// lifecycle hook, triggers a deployment onto the new instance.
func (a *autoScalingGroup) attachSpotInstance(spotInstanceID string, wait bool) error {
	if wait {
		log.Printf("Waiting for instance %s to start", spotInstanceID)
		err := a.region.services.ec2.WaitUntilInstanceRunning(
			&ec2.DescribeInstancesInput{
				InstanceIds: []*string{aws.String(spotInstanceID)},
			})
		if err != nil {
			// best-effort: attachment is still attempted if the wait fails
			log.Printf("Issue while waiting for instance %s to start: %v",
				spotInstanceID, err.Error())
		}
	}

	log.Printf("Attaching instance %s to ASG %v", spotInstanceID, a.name)
	resp, err := a.region.services.autoScaling.AttachInstances(
		&autoscaling.AttachInstancesInput{
			AutoScalingGroupName: aws.String(a.name),
			InstanceIds: []*string{
				&spotInstanceID,
			},
		},
	)
	// an "already part of AutoScalingGroup" error is benign and ignored
	if err != nil && !strings.Contains(err.Error(), "is already part of AutoScalingGroup") {
		log.Println(err.Error())
		// Pretty-print the response data.
		log.Println(resp)
		return err
	}

	log.Printf("Waiting for instance %s to become in service", spotInstanceID)
	if err := a.waitForInstanceStatus(&spotInstanceID, "InService", 5); err != nil {
		log.Printf("Spot instance %s couldn't be attached to the group %s: %v",
			spotInstanceID, a.name, err.Error())
		return err
	}

	if hasLH, hook := a.hasCodeDeployLifecycleHook(); hasLH {
		// previously this error was silently dropped; log it so failed
		// deployments are visible, without failing the attachment itself
		if err := a.triggerCodeDeployDeployment(hook, spotInstanceID); err != nil {
			log.Printf("Failed to trigger CodeDeploy deployment for instance %s: %v",
				spotInstanceID, err)
		}
	}
	return nil
}
// Terminates an instance from the group using the
// TerminateInstanceInAutoScalingGroup api call. When wait is set it first
// waits for the instance to be running and InService. Any lifecycle hooks
// on the group are completed with an ABANDON result so they don't block
// termination. When decreaseCapacity is set the group's desired capacity
// is decremented along with the termination.
func (a *autoScalingGroup) terminateInstanceInAutoScalingGroup(
	instanceID *string, wait bool, decreaseCapacity bool) error {
	if wait {
		err := a.region.services.ec2.WaitUntilInstanceRunning(
			&ec2.DescribeInstancesInput{
				InstanceIds: []*string{instanceID},
			})
		if err != nil {
			log.Printf("Issue while waiting for instance %v to start: %v",
				*instanceID, err.Error())
		}
		if err = a.waitForInstanceStatus(instanceID, "InService", 5); err != nil {
			log.Printf("Instance %s is still not InService, trying to terminate it anyway.",
				*instanceID)
		}
	}

	log.Println(a.region.name,
		a.name,
		"Terminating instance:",
		*instanceID)

	asSvc := a.region.services.autoScaling

	resDLH, err := asSvc.DescribeLifecycleHooks(
		&autoscaling.DescribeLifecycleHooksInput{
			AutoScalingGroupName: a.AutoScalingGroupName,
		})
	if err != nil {
		log.Println(err.Error())
		return err
	}

	// abandon pending lifecycle actions so they don't block termination;
	// failures here were previously swallowed silently, now they're logged
	for _, hook := range resDLH.LifecycleHooks {
		if _, err := asSvc.CompleteLifecycleAction(
			&autoscaling.CompleteLifecycleActionInput{
				AutoScalingGroupName:  a.AutoScalingGroupName,
				InstanceId:            instanceID,
				LifecycleHookName:     hook.LifecycleHookName,
				LifecycleActionResult: aws.String("ABANDON"),
			}); err != nil {
			log.Printf("Couldn't complete lifecycle action %s for instance %s: %v",
				*hook.LifecycleHookName, *instanceID, err.Error())
		}
	}

	resTIIASG, err := asSvc.TerminateInstanceInAutoScalingGroup(
		&autoscaling.TerminateInstanceInAutoScalingGroupInput{
			InstanceId:                     instanceID,
			ShouldDecrementDesiredCapacity: aws.Bool(decreaseCapacity),
		})
	if err != nil {
		log.Println(err.Error())
		return err
	}
	if resTIIASG != nil && resTIIASG.Activity != nil && resTIIASG.Activity.Description != nil {
		log.Println(*resTIIASG.Activity.Description)
	}
	return nil
}
// Counts the number of already running instances on-demand or spot, in any or a specific AZ.
// Returns the matching count and the total number of running instances.
func (a *autoScalingGroup) alreadyRunningInstanceCount(
	spot bool, availabilityZone *string) (int64, int64) {
	var total, count int64

	instanceCategory := Spot
	if !spot {
		instanceCategory = OnDemand
	}

	log.Println(a.name, "Counting already running", instanceCategory, "instances")
	for inst := range a.instances.instances() {
		// use the SDK constant for the state name, consistent with getInstance
		// (previously a raw "running" literal)
		if *inst.Instance.State.Name != ec2.InstanceStateNameRunning {
			continue
		}
		// Count total running instances
		total++
		if availabilityZone != nil && *inst.Placement.AvailabilityZone != *availabilityZone {
			continue
		}
		// count only instances of the requested lifecycle
		if inst.isSpot() == spot {
			count++
		}
	}
	log.Println(a.name, "Found", count, instanceCategory, "instances running on a total of", total)
	return count, total
}
// suspendProcesses suspends the Terminate and AZRebalance scaling
// processes on the group, then sleeps so the suspension can take effect
// before any subsequent actions.
func (a *autoScalingGroup) suspendProcesses() {
	toSuspend := []*string{aws.String("Terminate"), aws.String("AZRebalance")}
	log.Printf("Suspending processes on ASG %s", a.name)
	if _, err := a.region.services.autoScaling.SuspendProcesses(
		&autoscaling.ScalingProcessQuery{
			AutoScalingGroupName: a.AutoScalingGroupName,
			ScalingProcesses:     toSuspend,
		}); err != nil {
		log.Printf("couldn't suspend processes on ASG %s ", a.name)
	}
	time.Sleep(30 * time.Second * a.region.conf.SleepMultiplier)
}
// resumeProcesses resumes the Terminate and AZRebalance scaling processes
// on the group, undoing a previous suspendProcesses call.
func (a *autoScalingGroup) resumeProcesses() {
	toResume := []*string{aws.String("Terminate"), aws.String("AZRebalance")}
	log.Printf("Resuming processes on ASG %s", a.name)
	if _, err := a.region.services.autoScaling.ResumeProcesses(
		&autoscaling.ScalingProcessQuery{
			AutoScalingGroupName: a.AutoScalingGroupName,
			ScalingProcesses:     toResume,
		}); err != nil {
		log.Printf("couldn't resume processes on ASG %s ", a.name)
	}
}
// hasLifecycleHook reports whether the group has a lifecycle hook with the
// given transition type, returning the first matching hook when found.
func (a *autoScalingGroup) hasLifecycleHook(hookType string) (bool, *autoscaling.LifecycleHook) {
	result, err := a.region.services.autoScaling.
		DescribeLifecycleHooks(
			&autoscaling.DescribeLifecycleHooksInput{
				AutoScalingGroupName: a.AutoScalingGroupName,
			})
	if err != nil {
		log.Println(err.Error())
		return false, nil
	}

	for _, hook := range result.LifecycleHooks {
		if *hook.LifecycleTransition != hookType {
			continue
		}
		log.Println("Found Hook", *hook.LifecycleHookName)
		return true, hook
	}
	return false, nil
}
// hasCodeDeployLifecycleHook reports whether the group's launch lifecycle
// hook is one of the automatically managed CodeDeploy deployment hooks,
// returning that hook when it is.
func (a *autoScalingGroup) hasCodeDeployLifecycleHook() (bool, *autoscaling.LifecycleHook) {
	found, hook := a.hasLifecycleHook("autoscaling:EC2_INSTANCE_LAUNCHING")
	if !found {
		return false, nil
	}
	if !strings.HasPrefix(*hook.LifecycleHookName, "CodeDeploy-managed-automatic-launch-deployment-hook") {
		return false, nil
	}
	return true, hook
}
// triggerCodeDeployDeployment finds the CodeDeploy application and
// deployment group that match the given lifecycle hook and starts a
// deployment. The spotInstanceID parameter is kept for interface
// compatibility; the deployment targets outdated instances group-wide.
func (a *autoScalingGroup) triggerCodeDeployDeployment(hook *autoscaling.LifecycleHook, spotInstanceID string) error {
	app, group, err := a.findDeployment(*hook.LifecycleHookName)
	if err != nil {
		log.Println(err.Error())
		return err
	}
	return a.triggerDeployment(app, group)
}
// findDeployment walks all CodeDeploy applications and their deployment
// groups, looking for the one whose ASG configuration references this
// group together with the given lifecycle hook. Returns the application
// and deployment group names, or an error when no match exists.
func (a *autoScalingGroup) findDeployment(hookName string) (*string, *string, error) {
	apps, err := a.region.services.codedeploy.ListApplications(&codedeploy.ListApplicationsInput{})
	if err != nil {
		log.Println(err.Error())
		return nil, nil, err
	}
	for _, app := range apps.Applications {
		log.Println("Processing CodeDeploy application:", *app)
		groups, err := a.region.services.codedeploy.ListDeploymentGroups(&codedeploy.ListDeploymentGroupsInput{
			ApplicationName: app,
		})
		if err != nil {
			log.Println(err.Error())
			return nil, nil, err
		}
		for _, group := range groups.DeploymentGroups {
			log.Printf("Processing CodeDeploy deployment group %s for application %s", *group, *app)
			gd, err := a.region.services.codedeploy.GetDeploymentGroup(&codedeploy.GetDeploymentGroupInput{
				ApplicationName:     app,
				DeploymentGroupName: group,
			})
			if err != nil {
				log.Println(err.Error())
				return nil, nil, err
			}
			// len() on a nil slice is 0, so the former explicit nil check
			// was redundant
			if len(gd.DeploymentGroupInfo.AutoScalingGroups) == 0 {
				log.Printf("Deployment group %s for application %s has no ASG configuration, skipping...", *group, *app)
				continue
			}
			asg := *gd.DeploymentGroupInfo.AutoScalingGroups[0]
			log.Printf("Deployment group %s for application %s has the ASG %s and hook %s", *group, *app, *asg.Name, *asg.Hook)
			if *asg.Hook == hookName && *asg.Name == a.name {
				log.Printf("Found matching deployment group %s for application %s for the ASG %s and hook %s",
					*group, *app, *asg.Name, *asg.Hook)
				return app, group, nil
			}
		}
	}
	// errors.New is preferred over fmt.Errorf when there's nothing to format
	return nil, nil, errors.New("unable to find deployment group information")
}
// triggerDeployment starts a CodeDeploy deployment for the given
// application and deployment group that only updates outdated instances,
// i.e. freshly attached ones that haven't received the latest revision.
func (a *autoScalingGroup) triggerDeployment(appName, deploymentGroupName *string) error {
	_, err := a.region.services.codedeploy.CreateDeployment(
		&codedeploy.CreateDeploymentInput{
			ApplicationName:     appName,
			DeploymentGroupName: deploymentGroupName,
			// fixed typo in the user-visible description: "AutoSpoting"
			Description:                 aws.String("AutoSpotting triggered deployment after new Spot instance launch"),
			UpdateOutdatedInstancesOnly: aws.Bool(true),
		})
	if err != nil {
		log.Println(err.Error())
		return err
	}
	return nil
}