alibaba/kt-connect

View on GitHub
pkg/kt/command/clean/common.go

Summary

Maintainability
C
7 hrs
Test Coverage
package clean

import (
    "fmt"
    "github.com/alibaba/kt-connect/pkg/kt/command/general"
    opt "github.com/alibaba/kt-connect/pkg/kt/command/options"
    "github.com/alibaba/kt-connect/pkg/kt/service/cluster"
    "github.com/alibaba/kt-connect/pkg/kt/service/dns"
    "github.com/alibaba/kt-connect/pkg/kt/service/tun"
    "github.com/alibaba/kt-connect/pkg/kt/util"
    "github.com/rs/zerolog/log"
    "io/ioutil"
    appV1 "k8s.io/api/apps/v1"
    coreV1 "k8s.io/api/core/v1"
    "os"
    "strconv"
    "strings"
)

// ResourceToClean collects the names of the cluster resources that the clean
// command has decided to delete or restore.
type ResourceToClean struct {
	// PodsToDelete holds the names of expired kt pods to remove.
	PodsToDelete []string
	// ServicesToDelete holds the names of expired kt services to remove.
	ServicesToDelete []string
	// ConfigMapsToDelete holds the names of expired kt config maps to remove.
	ConfigMapsToDelete []string
	// DeploymentsToDelete holds the names of expired kt deployments to remove.
	DeploymentsToDelete []string
	// DeploymentsToScale maps a deployment name to the replica count it
	// should be scaled back to.
	DeploymentsToScale map[string]int32
	// ServicesToRecover holds the names of services whose original state
	// should be recovered.
	ServicesToRecover []string
	// ServicesToUnlock holds the names of services whose kt lock annotation
	// should be removed.
	ServicesToUnlock []string
}


// CheckClusterResources inspects the kt resources in the configured namespace
// and returns the set of resources that should be deleted or restored.
//
// It returns an error when either the kt resources or the namespace's service
// list cannot be fetched from the cluster.
func CheckClusterResources() (*ResourceToClean, error) {
	namespace := opt.Get().Global.Namespace
	pods, cfs, apps, svcs, err := cluster.Ins().GetKtResources(namespace)
	if err != nil {
		return nil, err
	}
	log.Debug().Msgf("Find %d kt pods", len(pods))
	resourceToClean := ResourceToClean{
		PodsToDelete:        make([]string, 0),
		ServicesToDelete:    make([]string, 0),
		ConfigMapsToDelete:  make([]string, 0),
		DeploymentsToDelete: make([]string, 0),
		DeploymentsToScale:  make(map[string]int32),
		ServicesToRecover:   make([]string, 0),
		ServicesToUnlock:    make([]string, 0),
	}
	threshold := opt.Get().Clean.ThresholdInMinus
	for _, pod := range pods {
		analysisExpiredPods(pod, threshold, &resourceToClean)
	}
	for _, cf := range cfs {
		analysisExpiredConfigmaps(cf, threshold, &resourceToClean)
	}
	for _, app := range apps {
		analysisExpiredDeployments(app, threshold, &resourceToClean)
	}
	for _, svc := range svcs {
		analysisExpiredServices(svc, threshold, &resourceToClean)
	}
	svcList, err := cluster.Ins().GetAllServiceInNamespace(namespace)
	if err != nil {
		// Without the service list the lock/orphan analysis cannot run, and
		// svcList must not be dereferenced.
		return nil, err
	}
	analysisLockAndOrphanServices(svcList.Items, &resourceToClean)
	return &resourceToClean, nil
}

// TidyClusterResources applies every action recorded in r to the cluster:
// it deletes pods, config maps, deployments and services, scales deployments
// back to their recorded replica count, recovers services and removes the kt
// lock annotation from locked services.
//
// Failures are logged as warnings and do not stop the remaining actions.
func TidyClusterResources(r *ResourceToClean) {
	namespace := opt.Get().Global.Namespace

	log.Info().Msgf("Deleting %d unavailing kt pods", len(r.PodsToDelete))
	for _, name := range r.PodsToDelete {
		err := cluster.Ins().RemovePod(name, namespace)
		if err != nil {
			log.Warn().Err(err).Msgf("Failed to delete pods %s", name)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msgf("Deleting %d unavailing config maps", len(r.ConfigMapsToDelete))
	for _, name := range r.ConfigMapsToDelete {
		err := cluster.Ins().RemoveConfigMap(name, namespace)
		if err != nil {
			log.Warn().Err(err).Msgf("Failed to delete config map %s", name)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msgf("Deleting %d unavailing deployments", len(r.DeploymentsToDelete))
	for _, name := range r.DeploymentsToDelete {
		err := cluster.Ins().RemoveDeployment(name, namespace)
		if err != nil {
			log.Warn().Err(err).Msgf("Failed to delete deployment %s", name)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msgf("Recovering %d scaled deployments", len(r.DeploymentsToScale))
	for name, replica := range r.DeploymentsToScale {
		// Hand ScaleTo its own copy so the pointer never aliases the loop variable.
		target := replica
		err := cluster.Ins().ScaleTo(name, namespace, &target)
		if err != nil {
			log.Warn().Err(err).Msgf("Failed to scale deployment %s to %d", name, replica)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msgf("Deleting %d unavailing services", len(r.ServicesToDelete))
	for _, name := range r.ServicesToDelete {
		err := cluster.Ins().RemoveService(name, namespace)
		if err != nil {
			log.Warn().Err(err).Msgf("Failed to delete service %s", name)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msgf("Recovering %d meshed services", len(r.ServicesToRecover))
	for _, name := range r.ServicesToRecover {
		general.RecoverOriginalService(name, namespace)
		log.Info().Msgf(" * %s", name)
	}
	log.Info().Msgf("Recovering %d locked services", len(r.ServicesToUnlock))
	for _, name := range r.ServicesToUnlock {
		svc, err := cluster.Ins().GetService(name, namespace)
		if err != nil {
			// Report the failure instead of silently skipping the service.
			log.Warn().Err(err).Msgf("Failed to fetch service %s for unlocking", name)
			continue
		}
		delete(svc.Annotations, util.KtLock)
		if _, err = cluster.Ins().UpdateService(svc); err != nil {
			log.Warn().Err(err).Msgf("Failed to unlock service %s", name)
		} else {
			log.Info().Msgf(" * %s", name)
		}
	}
	log.Info().Msg("Done")
}

// PrintClusterResourcesToClean logs, at info level, every resource recorded
// in r together with a per-category count. It does not modify the cluster.
func PrintClusterResourcesToClean(r *ResourceToClean) {
	// listNames writes one bullet line per resource name.
	listNames := func(names []string) {
		for i := range names {
			log.Info().Msgf(" * %s", names[i])
		}
	}

	log.Info().Msgf("Find %d unavailing pods to delete:", len(r.PodsToDelete))
	listNames(r.PodsToDelete)

	log.Info().Msgf("Find %d unavailing config maps to delete:", len(r.ConfigMapsToDelete))
	listNames(r.ConfigMapsToDelete)

	log.Info().Msgf("Find %d unavailing deployments to delete:", len(r.DeploymentsToDelete))
	listNames(r.DeploymentsToDelete)

	log.Info().Msgf("Find %d exchanged deployments to recover:", len(r.DeploymentsToScale))
	for app, count := range r.DeploymentsToScale {
		log.Info().Msgf(" * %s -> %d", app, count)
	}

	log.Info().Msgf("Find %d unavailing service to delete:", len(r.ServicesToDelete))
	listNames(r.ServicesToDelete)

	log.Info().Msgf("Find %d meshed service to recover:", len(r.ServicesToRecover))
	listNames(r.ServicesToRecover)

	log.Info().Msgf("Find %d locked services to recover:", len(r.ServicesToUnlock))
	listNames(r.ServicesToUnlock)
}

// TidyLocalResources cleans up local leftovers: stale pid files, rsa keys and
// background logs. The hosts file, DNS configuration and route table are
// restored only when util.GetDaemonRunning reports a negative value for the
// connect component (presumably meaning no connect daemon is running) and the
// process runs as the admin user.
func TidyLocalResources() {
	log.Debug().Msg("Cleaning up unused pid files")
	cleanPidFiles()

	log.Debug().Msg("Cleaning up unused local rsa keys")
	util.CleanRsaKeys()

	log.Debug().Msg("Cleaning up background logs")
	util.CleanBackgroundLogs()

	if util.GetDaemonRunning(util.ComponentConnect) >= 0 {
		return
	}
	if !util.IsRunAsAdmin() {
		log.Info().Msgf("Not %s user, DNS cleanup skipped", util.GetAdminUserName())
		return
	}

	log.Debug().Msg("Cleaning up hosts file")
	dns.DropHosts()

	log.Debug().Msg("Cleaning DNS configuration")
	dns.RestoreNameServer()

	log.Info().Msgf("Cleaning route table")
	err := tun.Ins().RestoreRoute()
	if err != nil {
		log.Warn().Err(err).Msgf("Unable to clean up route table")
	}
}

// cleanPidFiles scans util.KtPidDir for "*.pid" files and removes those whose
// pid, as parsed from the file name, does not belong to an existing process.
// When the directory cannot be listed the failure is logged at debug level
// and nothing is removed.
func cleanPidFiles() {
	files, err := ioutil.ReadDir(util.KtPidDir)
	if err != nil {
		log.Debug().Err(err).Msgf("Unable to list pid files in %s", util.KtPidDir)
		return
	}
	for _, f := range files {
		if !strings.HasSuffix(f.Name(), ".pid") {
			continue
		}
		component, pid := parseComponentAndPid(f.Name())
		if util.IsProcessExist(pid) {
			log.Debug().Msgf("Find kt %s instance with pid %d", component, pid)
			continue
		}
		log.Info().Msgf("Removing remnant pid file %s", f.Name())
		if err := os.Remove(fmt.Sprintf("%s/%s", util.KtPidDir, f.Name())); err != nil {
			log.Error().Err(err).Msgf("Delete pid file %s failed", f.Name())
		}
	}
}

// parseComponentAndPid splits a pid file name of the form
// "<component>-<pid>.<ext>" into its component name and numeric pid.
//
// The pid is the text between the last "-" and the first "." that follows it,
// so component names may themselves contain dots or dashes. It returns
// ("", -1) when the name has no dash after its first character, no dot after
// that dash, or a pid part that is not a valid integer.
func parseComponentAndPid(pidFileName string) (string, int) {
	startPos := strings.LastIndex(pidFileName, "-")
	if startPos <= 0 {
		return "", -1
	}
	// Search for the dot only after the dash, so that a dot inside the
	// component name is not mistaken for the extension separator.
	offset := strings.Index(pidFileName[startPos:], ".")
	if offset < 0 {
		return "", -1
	}
	pid, err := strconv.Atoi(pidFileName[startPos+1 : startPos+offset])
	if err != nil {
		return "", -1
	}
	return pidFileName[:startPos], pid
}

// analysisExpiredPods records pod for deletion when its last heart beat is
// older than cleanThresholdInMinus, unless the pod already carries a deletion
// timestamp. For an expired pod, the pod's kt config annotation is also
// analysed for follow-up recovery actions.
func analysisExpiredPods(pod coreV1.Pod, cleanThresholdInMinus int64, resourceToClean *ResourceToClean) {
	heartBeat := util.ParseTimestamp(pod.Annotations[util.KtLastHeartBeat])
	if heartBeat < 0 {
		log.Debug().Msgf("Pod %s does no have heart beat annotation", pod.Name)
		return
	}
	if !isExpired(heartBeat, cleanThresholdInMinus) {
		return
	}

	log.Debug().Msgf(" * pod %s expired, lastHeartBeat: %d ", pod.Name, heartBeat)
	// A pod that is already terminating does not need another delete request.
	if pod.DeletionTimestamp == nil {
		resourceToClean.PodsToDelete = append(resourceToClean.PodsToDelete, pod.Name)
	}
	role := pod.Labels[util.KtRole]
	config := util.String2Map(pod.Annotations[util.KtConfig])
	analysisConfigAnnotation(role, config, resourceToClean)
}

// analysisExpiredConfigmaps records cf for deletion when its last heart beat
// is older than cleanThresholdInMinus. Config maps whose heart beat parses to
// a negative value are only reported at debug level.
func analysisExpiredConfigmaps(cf coreV1.ConfigMap, cleanThresholdInMinus int64, resourceToClean *ResourceToClean) {
	heartBeat := util.ParseTimestamp(cf.Annotations[util.KtLastHeartBeat])
	if heartBeat < 0 {
		log.Debug().Msgf("Configmap %s does no have heart beat annotation", cf.Name)
		return
	}
	if isExpired(heartBeat, cleanThresholdInMinus) {
		resourceToClean.ConfigMapsToDelete = append(resourceToClean.ConfigMapsToDelete, cf.Name)
	}
}

// analysisExpiredDeployments records app for deletion when its last heart
// beat is older than cleanThresholdInMinus, and analyses its kt config
// annotation for follow-up recovery actions.
func analysisExpiredDeployments(app appV1.Deployment, cleanThresholdInMinus int64, resourceToClean *ResourceToClean) {
	heartBeat := util.ParseTimestamp(app.Annotations[util.KtLastHeartBeat])
	if heartBeat < 0 {
		log.Debug().Msgf("Deployment %s does no have heart beat annotation", app.Name)
		return
	}
	if !isExpired(heartBeat, cleanThresholdInMinus) {
		return
	}

	resourceToClean.DeploymentsToDelete = append(resourceToClean.DeploymentsToDelete, app.Name)
	role := app.Labels[util.KtRole]
	config := util.String2Map(app.Annotations[util.KtConfig])
	analysisConfigAnnotation(role, config, resourceToClean)
}

// analysisExpiredServices records svc for deletion when its last heart beat
// is older than cleanThresholdInMinus. Services whose heart beat parses to a
// negative value are only reported at debug level.
func analysisExpiredServices(svc coreV1.Service, cleanThresholdInMinus int64, resourceToClean *ResourceToClean) {
	heartBeat := util.ParseTimestamp(svc.Annotations[util.KtLastHeartBeat])
	if heartBeat < 0 {
		log.Debug().Msgf("Service %s does no have heart beat annotation", svc.Name)
		return
	}
	if isExpired(heartBeat, cleanThresholdInMinus) {
		resourceToClean.ServicesToDelete = append(resourceToClean.ServicesToDelete, svc.Name)
	}
}

// analysisLockAndOrphanServices walks svcs and records two kinds of leftovers:
// services whose kt lock annotation is older than general.LockTimeout (to be
// unlocked), and services carrying a kt selector annotation whose router or
// shadow pod can no longer be found (to be recovered). Services without any
// annotations are skipped.
func analysisLockAndOrphanServices(svcs []coreV1.Service, resourceToClean *ResourceToClean) {
	for i := range svcs {
		svc := &svcs[i]
		if svc.Annotations == nil {
			continue
		}

		lock, locked := svc.Annotations[util.KtLock]
		if locked && util.GetTime()-util.ParseTimestamp(lock) > general.LockTimeout {
			resourceToClean.ServicesToUnlock = append(resourceToClean.ServicesToUnlock, svc.Name)
		}

		if svc.Annotations[util.KtSelector] == "" {
			continue
		}

		var podAlive bool
		if svc.Spec.Selector[util.KtRole] == util.RoleRouter {
			// it's a meshed service, check whether its router pod is still there
			podAlive = isRouterPodExist(svc.Name, svc.Namespace)
		} else {
			// it's an exchanged service, check whether its shadow pod is still there
			podAlive = isShadowPodExist(svc.Spec.Selector, svc.Name, svc.Namespace, util.KtExchangeContainer)
		}
		if !podAlive {
			resourceToClean.ServicesToRecover = append(resourceToClean.ServicesToRecover, svc.Name)
		}
	}
}

// analysisConfigAnnotation derives recovery actions from the kt config of an
// expired resource with the given role.
//
// For an exchange shadow, the deployment named by config["app"] is recorded
// to be scaled back to config["replicas"], provided that value parses as a
// positive 32-bit integer. For a router or exchange shadow, the service named
// by config["service"], when present, is recorded for recovery.
func analysisConfigAnnotation(role string, config map[string]string, resourceToClean *ResourceToClean) {
	log.Debug().Msgf("   role %s, config: %v", role, config)
	// scale exchange
	if role == util.RoleExchangeShadow {
		replica, err := strconv.ParseInt(config["replicas"], 10, 32)
		app := config["app"]
		// The error must be checked: on overflow ParseInt returns the clamped
		// maximum together with a range error, which would otherwise be
		// recorded as a huge replica count. A missing or malformed value
		// simply records no scale target.
		if err == nil && replica > 0 && app != "" {
			resourceToClean.DeploymentsToScale[app] = int32(replica)
		}
	}
	// auto mesh and selector exchange
	if role == util.RoleRouter || role == util.RoleExchangeShadow {
		if service, exists := config["service"]; exists {
			resourceToClean.ServicesToRecover = append(resourceToClean.ServicesToRecover, service)
		}
	}
}

// isShadowPodExist reports whether any pod matching selector in namespace has
// a name starting with "<svcName>-<suffix>-". A failure to list the pods is
// reported as false.
func isShadowPodExist(selector map[string]string, svcName, namespace, suffix string) bool {
	podList, err := cluster.Ins().GetPodsByLabel(selector, namespace)
	if err != nil {
		return false
	}
	namePrefix := fmt.Sprintf("%s-%s-", svcName, suffix)
	for i := range podList.Items {
		if strings.HasPrefix(podList.Items[i].Name, namePrefix) {
			return true
		}
	}
	return false
}

func isRouterPodExist(svcName, namespace string) bool {
    routerPodName := svcName + util.RouterPodSuffix
    _, err := cluster.Ins().GetPod(routerPodName, namespace)
    return err == nil
}

// isExpired reports whether more than cleanThresholdInMinus*60 time units
// have passed between lastHeartBeat and util.GetTime().
// NOTE(review): presumably the threshold is in minutes and both timestamps
// are in seconds - confirm against util.GetTime.
func isExpired(lastHeartBeat, cleanThresholdInMinus int64) bool {
	elapsed := util.GetTime() - lastHeartBeat
	limit := cleanThresholdInMinus * 60
	return elapsed > limit
}