alibaba/kt-connect

View on GitHub
pkg/kt/command/recover.go

Summary

Maintainability
A
3 hrs
Test Coverage
package command

import (
    "encoding/json"
    "fmt"
    "github.com/alibaba/kt-connect/pkg/kt/command/general"
    opt "github.com/alibaba/kt-connect/pkg/kt/command/options"
    "github.com/alibaba/kt-connect/pkg/kt/command/recover"
    "github.com/alibaba/kt-connect/pkg/kt/service/cluster"
    "github.com/alibaba/kt-connect/pkg/kt/util"
    "github.com/rs/zerolog/log"
    "github.com/spf13/cobra"
    appV1 "k8s.io/api/apps/v1"
    coreV1 "k8s.io/api/core/v1"
    "strings"
)

// NewRecoverCommand return new recover command
func NewRecoverCommand() *cobra.Command {
    cmd := &cobra.Command{
        Use:  "recover",
        Short: "Restore traffic of specified kubernetes service changed by exchange or mesh",
        PreRunE: func(cmd *cobra.Command, args []string) error {
            if len(args) == 0 {
                return fmt.Errorf("name of service to recover is required")
            } else if len(args) > 1 {
                return fmt.Errorf("too many service names are spcified (%s), should be one", strings.Join(args, ",") )
            }
            opt.Get().Global.UseLocalTime = true
            return general.Prepare()
        },
        RunE: func(cmd *cobra.Command, args []string) error {
            return Recover(args[0])
        },
        Example: "ktctl recover [command options]",
    }

    cmd.SetUsageTemplate(general.UsageTemplate(true))
    opt.SetOptions(cmd, cmd.Flags(), opt.Get().Recover, opt.RecoverFlags())
    return cmd
}

// Recover delete unavailing shadow pods
func Recover(serviceName string) error {
    svc, err := cluster.Ins().GetService(serviceName, opt.Get().Global.Namespace)
    if err != nil {
        log.Error().Err(err).Msgf("Failed to fetch service '%s'", serviceName)
    }

    apps, err := cluster.Ins().GetDeploymentsByLabel(svc.Spec.Selector, svc.Namespace)
    if err != nil {
        return err
    }
    pods, err := cluster.Ins().GetPodsByLabel(svc.Spec.Selector, svc.Namespace)
    if err != nil {
        return err
    }
    targetDeployment, targetPod, targetRole := fetchTargetRole(apps, pods)
    log.Debug().Msgf("Target role is: %s", targetRole)

    if svc.Annotations == nil {
        // put an empty map to avoid npe
        svc.Annotations = map[string]string{}
        if targetRole == "" {
            if svc.Spec.Selector[util.KtRole] != "" {
                log.Error().Msgf("Service %s is selecting kt pods, but cannot be recovered automatically", serviceName)
            } else {
                log.Info().Msgf("Service %s is clean and tidy, nothing would be done", serviceName)
            }
            return nil
        }
    }

    needUnlock := checkAndMarkUnlock(serviceName, svc)

    if originSelector, exists := svc.Annotations[util.KtSelector]; exists {
        var selector map[string]string
        if err = json.Unmarshal([]byte(originSelector), &selector); err != nil {
            return fmt.Errorf("service %s has %s annotation, but selecting nothing", serviceName, util.KtSelector)
        }
        log.Debug().Msgf("Recovering selector to %v", selector)
        svc.Spec.Selector = selector
        delete(svc.Annotations, util.KtSelector)
        if targetRole == util.RoleRouter {
            log.Info().Msgf("Service %s is meshed, recovering", serviceName)
            return recover.HandleMeshedByAutoService(svc, targetDeployment, targetPod)
        } else if targetRole == util.RoleExchangeShadow {
            log.Info().Msgf("Service %s is exchanged, recovering", serviceName)
            return recover.HandleExchangedBySelectorService(svc, targetDeployment, targetPod)
        } else {
            log.Info().Msgf("Service %s is selecting non-kt pods, recovering", serviceName)
            return recover.HandleServiceSelectorAndRemotePods(svc, targetDeployment, targetPod)
        }
    } else {
        if targetRole == util.RoleMeshShadow {
            log.Info().Msgf("Service %s is meshed, recovering", serviceName)
            return recover.HandleMeshedByManualService(svc, targetDeployment, targetPod)
        } else if targetRole == util.RoleExchangeShadow {
            log.Info().Msgf("Service %s is exchanged, recovering", serviceName)
            return recover.HandleExchangedByScaleService(svc, targetDeployment, targetPod)
        } else if needUnlock {
            return recover.UnlockServiceOnly(svc)
        }
    }
    log.Info().Msgf("Service %s neither exchanged nor meshed by kt, nothing would be done", serviceName)
    return nil
}

func fetchTargetRole(apps *appV1.DeploymentList, pods *coreV1.PodList) (*appV1.Deployment, *coreV1.Pod, string) {
    if len(apps.Items) > 0 {
        for _, app := range apps.Items {
            if app.Labels != nil {
                if role, ok2 := app.Labels[util.KtRole]; ok2 {
                    return &app, nil, role
                }
            }
        }
    } else if len(pods.Items) > 0 {
        for _, pod := range pods.Items {
            if pod.Labels != nil && pod.DeletionTimestamp == nil {
                if role, ok2 := pod.Labels[util.KtRole]; ok2 {
                    return nil, &pod, role
                }
            }
        }
    }
    return nil, nil, ""
}

func checkAndMarkUnlock(serviceName string, svc *coreV1.Service) bool {
    if _, exists := svc.Annotations[util.KtLock]; exists {
        log.Info().Msgf("Unlocking service %s", serviceName)
        delete(svc.Annotations, util.KtLock)
        return true
    }
    return false
}