pkg/cd/cd.go
// File `cd.go` has 527 lines of code (exceeds 500 allowed). Consider refactoring.

/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package cd manages clusters through Argo CD applications and reads the
// state of the resulting workloads from Kubernetes.
package cd

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"

	applicationV1alpha1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
	rolloutsV1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
	"github.com/argoproj/gitops-engine/pkg/health"
	kubeutil "github.com/argoproj/gitops-engine/pkg/utils/kube"
	"github.com/horizoncd/horizon/core/common"
	herrors "github.com/horizoncd/horizon/core/errors"
	"github.com/horizoncd/horizon/pkg/argocd"
	"github.com/horizoncd/horizon/pkg/cluster/gitrepo"
	"github.com/horizoncd/horizon/pkg/cluster/kubeclient"
	argocdconf "github.com/horizoncd/horizon/pkg/config/argocd"
	perror "github.com/horizoncd/horizon/pkg/errors"
	"github.com/horizoncd/horizon/pkg/regioninformers"
	"github.com/horizoncd/horizon/pkg/util/kube"
	"github.com/horizoncd/horizon/pkg/util/log"
	"github.com/horizoncd/horizon/pkg/util/wlog"
	"github.com/horizoncd/horizon/pkg/workload"
	"github.com/horizoncd/horizon/pkg/workload/getter"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/kubernetes"
)

// Revision and pod-template-hash keys for Deployments and Rollouts.
// NOTE(review): these look like the annotation/label keys set by the
// Deployment and Argo Rollouts controllers; the helpers that read them are
// outside this declaration block — confirm there.
const (
	_deploymentRevision = "deployment.kubernetes.io/revision"
	// DeploymentPodTemplateHash is compared against
	// ClusterState.PodTemplateHashKey to detect Deployment-managed clusters.
	DeploymentPodTemplateHash = "pod-template-hash"
	_rolloutRevision          = "rollout.argoproj.io/revision"
	RolloutPodTemplateHash    = "rollouts-pod-template-hash"
)

var (
	// GKPod is the GroupKind of a core Pod (empty group, kind "Pod"); it is
	// used to look up the Pod workload ability.
	GKPod = schema.GroupKind{
		Group: "",
		Kind:  "Pod",
	}
)

const (
	// PodLifeCycleSchedule specifies whether pod has been scheduled
	PodLifeCycleSchedule = "PodSchedule"
	// PodLifeCycleInitialize specifies whether all init containers have finished
	PodLifeCycleInitialize = "PodInitialize"
	// PodLifeCycleContainerStartup specifies whether the container has passed its startup probe
	PodLifeCycleContainerStartup = "ContainerStartup"
	// PodLifeCycleContainerOnline specifies whether the container has passed its postStart hook
	PodLifeCycleContainerOnline = "ContainerOnline"
	// PodLifeCycleHealthCheck specifies whether the container has passed its readiness probe
	PodLifeCycleHealthCheck = "HealthCheck"
	// PodLifeCycleContainerPreStop specifies whether the container is executing preStop hook
	PodLifeCycleContainerPreStop = "PreStop"

	// LifeCycleStatus* are status values; presumably one is attached to each
	// pod lifecycle stage above (usage is not in this block — confirm).
	LifeCycleStatusSuccess  = "Success"
	LifeCycleStatusWaiting  = "Waiting"
	LifeCycleStatusRunning  = "Running"
	LifeCycleStatusAbnormal = "Abnormal"

	// PodErrCrashLoopBackOff is the "CrashLoopBackOff" reason string.
	PodErrCrashLoopBackOff = "CrashLoopBackOff"
)

// CD groups the cluster operations that are carried out through Argo CD
// applications and the Kubernetes resources they own.
//
//go:generate mockgen -source=$GOFILE -destination=../../mock/pkg/cd/cd_mock.go -package=mock_cd
type CD interface {
	// CreateCluster creates the Argo CD application for a cluster unless it
	// already exists.
	CreateCluster(ctx context.Context, params *CreateClusterParams) error
	// DeployCluster deploys the requested revision of the cluster's application.
	DeployCluster(ctx context.Context, params *DeployClusterParams) error
	// DeleteCluster deletes the cluster's application and waits until it is gone.
	DeleteCluster(ctx context.Context, params *DeleteClusterParams) error
	// GetClusterState reports the health status of the cluster.
	GetClusterState(ctx context.Context, params *GetClusterStateV2Params) (*ClusterStateV2, error)
	// GetResourceTree lists the application's resource nodes, with pod
	// details attached to Pod nodes when they can be fetched.
	GetResourceTree(ctx context.Context, params *GetResourceTreeParams) ([]ResourceNode, error)
	// GetStep reports the rollout step of the first workload that provides one.
	GetStep(ctx context.Context, params *GetStepParams) (*Step, error)
	// GetPodEvents lists the Kubernetes events of one pod of the cluster.
	GetPodEvents(ctx context.Context, params *GetPodEventsParams) ([]Event, error)
}

// cd is the CD implementation in this file.
type cd struct {
	// kubeClientFactory resolves a Kubernetes client from a region's server
	// address and certificate.
	kubeClientFactory kubeclient.Factory
	// informerFactories provides the per-region dynamic informer factory used
	// to read pod details.
	informerFactories *regioninformers.RegionInformers
	// factory resolves the Argo CD client for a region and environment.
	factory argocd.Factory
	// clusterGitRepo is queried for the latest config commit of a cluster.
	clusterGitRepo gitrepo.ClusterGitRepo
	// targetRevision is passed to Argo CD when assembling a new application.
	targetRevision string
}

// Function `NewCD` has 5 arguments (exceeds 4
allowed). Consider refactoring.func NewCD(informerFactories *regioninformers.RegionInformers, clusterGitRepo gitrepo.ClusterGitRepo, argoCDMapper argocdconf.Mapper, regionArgoCDMapper argocdconf.RegionMapper, targetRevision string) CD { return &cd{ kubeClientFactory: kubeclient.Fty, informerFactories: informerFactories, factory: argocd.NewFactory(argoCDMapper, regionArgoCDMapper), clusterGitRepo: clusterGitRepo, targetRevision: targetRevision, }} Method `cd.CreateCluster` has 6 return statements (exceeds 4 allowed).func (c *cd) CreateCluster(ctx context.Context, params *CreateClusterParams) (err error) { const op = "cd: create cluster" defer wlog.Start(ctx, op).StopPrint() argo, err := c.factory.GetArgoCD(params.RegionEntity.Name, params.Environment) if err != nil { return err } // if argo application exists, return, else create it _, err = argo.GetApplication(ctx, params.Cluster) if err == nil { return nil } if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); !ok { return err } var argoApplication = argo.AssembleArgoApplication(params.Cluster, params.Namespace, params.GitRepoURL, params.RegionEntity.Server, params.ValueFiles, c.targetRevision) manifest, err := json.Marshal(argoApplication) if err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } if err := argo.CreateApplication(ctx, manifest); err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return nil} func (c *cd) DeployCluster(ctx context.Context, params *DeployClusterParams) (err error) { const op = "cd: deploy cluster" defer wlog.Start(ctx, op).StopPrint() argo, err := c.factory.GetArgoCD(params.Region, params.Environment) if err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return argo.DeployApplication(ctx, params.Cluster, params.Revision)} Method `cd.DeleteCluster` has 5 return statements (exceeds 4 allowed).func (c *cd) DeleteCluster(ctx context.Context, params *DeleteClusterParams) (err error) { const op = "cd: delete cluster" defer 
wlog.Start(ctx, op).StopPrint() argo, err := c.factory.GetArgoCD(params.Region, params.Environment) if err != nil { return err } // 1. get application first applicationCR, err := argo.GetApplication(ctx, params.Cluster) if err != nil { if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { return nil } return } // 2. delete application if err := argo.DeleteApplication(ctx, params.Cluster); err != nil { return err } // 3. wait for application to delete completely return argo.WaitApplication(ctx, params.Cluster, string(applicationCR.UID), http.StatusNotFound)} Method `cd.GetResourceTree` has 6 return statements (exceeds 4 allowed).func (c *cd) GetResourceTree(ctx context.Context, params *GetResourceTreeParams) ([]ResourceNode, error) { const op = "cd: get resource tree" defer wlog.Start(ctx, op).StopPrint() argo, err := c.factory.GetArgoCD(params.RegionEntity.Name, params.Environment) if err != nil { return nil, err } // get resourceTreeInArgo resourceTreeInArgo, err := argo.GetApplicationTree(ctx, params.Cluster) if err != nil { return nil, err } resourceTree := make([]ResourceNode, 0, len(resourceTreeInArgo.Nodes)) pd, err := workload.GetAbility(GKPod) if err != nil { return nil, err } gt := getter.New(pd) for _, node := range resourceTreeInArgo.Nodes { n := ResourceNode{ResourceNode: node} if n.Kind == "Pod" { var podDetail corev1.Pod err = c.informerFactories.GetDynamicFactory(params.RegionEntity.ID, func(factory dynamicinformer.DynamicSharedInformerFactory) error { log.Debugf(ctx, "get pod detail: %v", node.Name) pods, err := gt.ListPods(&node, factory) if err != nil { log.Debugf(ctx, "failed to get pod detail: %v", err) return err } log.Debugf(ctx, "get pod detail success: %v", node.Name) podDetail = pods[0] return nil }) if err != nil { log.Errorf(ctx, "failed to get pod detail: %v", err) continue } t := Compact(podDetail) n.PodDetail = &t } resourceTree = append(resourceTree, n) } return resourceTree, nil} Method `cd.GetStep` has 51 lines of code 
(exceeds 50 allowed). Consider refactoring.
Method `cd.GetStep` has 10 return statements (exceeds 4 allowed).func (c *cd) GetStep(ctx context.Context, params *GetStepParams) (*Step, error) { const op = "cd: get step" defer wlog.Start(ctx, op).StopPrint() _, kubeClient, err := c.kubeClientFactory.GetByK8SServer(params.RegionEntity.Server, params.RegionEntity.Certificate) if err != nil { return nil, err } argo, err := c.factory.GetArgoCD(params.RegionEntity.Name, params.Environment) if err != nil { return nil, err } // get resourceTreeInArgo resourceTreeInArgo, err := argo.GetApplicationTree(ctx, params.Cluster) if err != nil { return nil, err } ifContinue := true step := (*workload.Step)(nil) c.traverseResourceTree(resourceTreeInArgo, func(node *ResourceTreeNode) bool { if !ifContinue { return ifContinue } workload.LoopAbilities(func(workload workload.Workload) bool { if !workload.MatchGK(schema.GroupKind{Group: node.Group, Kind: node.Kind}) { return true } gt := getter.New(workload) step, err = gt.GetSteps(node.ResourceNode, kubeClient) if err != nil { return true } ifContinue = false return false }) return ifContinue }) // step if step == nil { return &Step{ Index: 0, Total: 0, Replicas: []int{}, ManualPaused: false, AutoPromote: false, }, nil } return &Step{ Index: step.Index, Total: step.Total, Replicas: step.Replicas, ManualPaused: step.ManualPaused, AutoPromote: step.AutoPromote, Extra: step.Extra, }, nil} // GetClusterState fetches status of clusterMethod `cd.GetClusterState` has 70 lines of code (exceeds 50 allowed). Consider refactoring.
Method `cd.GetClusterState` has 15 return statements (exceeds 4 allowed).
Method `cd.GetClusterState` has a Cognitive Complexity of 24 (exceeds 20 allowed). Consider refactoring.func (c *cd) GetClusterState(ctx context.Context, params *GetClusterStateV2Params) (*ClusterStateV2, error) { const op = "cd: get cluster status" defer wlog.Start(ctx, op).StopPrint() argo, err := c.factory.GetArgoCD(params.RegionEntity.Name, params.Environment) if err != nil { return nil, err } // get application status argoApp, err := argo.GetApplication(ctx, params.Cluster) if err != nil { return nil, err } if argoApp.Status.Health.Status == "" { return nil, perror.Wrapf( herrors.NewErrNotFound(herrors.ClusterStateInArgo, "cluster not found in argo"), "failed to get cluster status from argo: app name = %v", params.Cluster) } status := &ClusterStateV2{ Status: string(argoApp.Status.Health.Status), } if status.Status != string(health.HealthStatusHealthy) { return status, nil } if argoApp.Status.Sync.Status != applicationV1alpha1.SyncStatusCodeSynced { status.Status = string(health.HealthStatusProgressing) return status, nil } lastConfigCommit, err := c.clusterGitRepo.GetConfigCommit(ctx, params.Application, params.Cluster) if err != nil { return nil, err } if lastConfigCommit.Master != argoApp.Status.Sync.Revision { status.Status = string(health.HealthStatusProgressing) log.Warningf(ctx, "current revision(%s) is not consistent with gitops repo commit(%s)", argoApp.Status.Sync.Revision, lastConfigCommit.Master) return status, nil } _, kubeClient, err := c.kubeClientFactory.GetByK8SServer(params.RegionEntity.Server, params.RegionEntity.Certificate) if err != nil { return nil, err } // get resourceTreeInArgo resourceTreeInArgo, err := argo.GetApplicationTree(ctx, params.Cluster) if err != nil { return nil, err } if argoApp.Status.Health.Status == health.HealthStatusHealthy { isHealthy := true c.traverseResourceTree(resourceTreeInArgo, func(node *ResourceTreeNode) bool { if !isHealthy { return false } workload.LoopAbilities(func(workload workload.Workload) bool { if 
!workload.MatchGK(schema.GroupKind{Group: node.Group, Kind: node.Kind}) { return true } gt := getter.New(workload) nodeHealthy, err := gt.IsHealthy(node.ResourceNode, kubeClient) if err != nil { return true } log.Debugf(ctx, "[cd get status v2] node(%v) kind(%v) isHealthy(%v)", node.Name, node.Kind, nodeHealthy) isHealthy = isHealthy && nodeHealthy return isHealthy }) // break if isHealthy is false return isHealthy }) if !isHealthy { status.Status = string(health.HealthStatusProgressing) } } return status, nil} // Deprecated: using GetClusterState insteadMethod `cd.GetClusterStateV1` has a Cognitive Complexity of 70 (exceeds 20 allowed). Consider refactoring.
// Method `cd.GetClusterStateV1` has 141 lines of code (exceeds 50 allowed). Consider refactoring.
// Method `cd.GetClusterStateV1` has 15 return statements (exceeds 4 allowed).

// GetClusterStateV1 assembles a ClusterState for params.Cluster from the
// Argo CD application, its Rollout (when one exists), and the ReplicaSets,
// Deployments, pods and events found in the application's destination
// namespace under the cluster label.
//
// Outline:
//  1. The application's health status seeds clusterState.Status; Unknown is
//     reported as Degraded and Missing as Progressing. An empty status is a
//     not-found error.
//  2. The Rollout named after the cluster is fetched. If it does not exist
//     and the resource tree holds no Deployment, a single "default" version
//     is built from the tree's Pod nodes and returned immediately. If it does
//     not exist but a Deployment is present, processing continues with a nil
//     rollout. Any other error is returned with added context.
//  3. One ClusterVersion is registered per ReplicaSet, keyed by its pod
//     template hash; the ReplicaSet selected via CompareRevision provides the
//     revision and hash of the cluster state.
//  4. For Deployment-managed clusters the most recently created Deployment
//     may override the status (Degraded / Progressing).
//  5. Pods and events are filled in by paddingPodAndEventInfo.
//
// Deprecated: use GetClusterState instead.
func (c *cd) GetClusterStateV1(ctx context.Context,
	params *GetClusterStateParams) (clusterState *ClusterState, err error) {
	const op = "cd: get cluster status"
	defer wlog.Start(ctx, op).StopPrint()

	argo, err := c.factory.GetArgoCD(params.RegionEntity.Name, params.Environment)
	if err != nil {
		return nil, err
	}

	_, kubeClient, err := c.kubeClientFactory.GetByK8SServer(params.RegionEntity.Server,
		params.RegionEntity.Certificate)
	if err != nil {
		return nil, err
	}

	clusterState = &ClusterState{Versions: map[string]*ClusterVersion{}}

	// get application status
	argoApp, err := argo.GetApplication(ctx, params.Cluster)
	if err != nil {
		return nil, err
	}
	namespace := argoApp.Spec.Destination.Namespace
	// namespace = argoApp.Spec.Destination.Namespace
	clusterState.Status = argoApp.Status.Health.Status
	if clusterState.Status == "" {
		return nil, herrors.NewErrNotFound(herrors.ClusterStateInArgo, "clusterState.State == \"\"")
	}

	// Normalize the statuses this API does not expose as such.
	if clusterState.Status == health.HealthStatusUnknown {
		clusterState.Status = health.HealthStatusDegraded
	} else if clusterState.Status == health.HealthStatusMissing {
		clusterState.Status = health.HealthStatusProgressing
	}

	// TODO: rollout coupling
	var rollout *rolloutsV1alpha1.Rollout
	// Selector "<cluster label key>=<cluster>" shared by the pod and
	// ReplicaSet queries below.
	labelSelector := fields.ParseSelectorOrDie(fmt.Sprintf("%v=%v",
		common.ClusterClusterLabelKey, params.Cluster))
	if err := argo.GetApplicationResource(ctx, params.Cluster, argocd.ResourceParams{
		Group:        "argoproj.io",
		Version:      "v1alpha1",
		Kind:         "Rollout",
		Namespace:    argoApp.Spec.Destination.Namespace,
		ResourceName: params.Cluster,
	}, &rollout); err != nil {
		if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok {
			// get pods by resourceTree
			var (
				clusterPodMap = map[string]*ClusterPod{}
				podMap        = map[string]corev1.Pod{}
			)
			resourceTree, err := argo.GetApplicationTree(ctx, params.Cluster)
			if err != nil {
				return nil, err
			}

			// application with deployment may be serverless
			if !resourceTreeContains(resourceTree, kubeutil.DeploymentKind) {
				allPods, err := kube.GetPods(ctx, kubeClient.Basic, namespace, labelSelector.String())
				if err != nil {
					return nil, err
				}
				for _, pod := range allPods {
					podMap[pod.Name] = pod
				}
				// Every Pod node of the tree must also be among the labelled
				// pods; otherwise the state is reported as not found.
				for _, node := range resourceTree.Nodes {
					if node.Kind == kubeutil.PodKind {
						// Avoid deeply nested control flow statements.
						if _, ok := podMap[node.Name]; !ok {
							return nil, herrors.NewErrNotFound(herrors.PodsInK8S,
								fmt.Sprintf("pod %s does not exist", node.Name))
						}
						clusterPodMap[node.Name] = podMapping(podMap[node.Name])
					}
				}
				// Without Rollout or Deployment there is a single synthetic
				// version named "default" and a one-step rollout.
				clusterState.PodTemplateHash = "default"
				clusterState.PodTemplateHashKey = "default"
				clusterState.Replicas = len(clusterPodMap)
				clusterState.Versions["default"] = &ClusterVersion{
					Replicas: len(clusterPodMap),
					Pods:     clusterPodMap,
				}
				clusterState.Step = &Step{
					Index:    0,
					Total:    1,
					Replicas: []int{1},
				}
				return clusterState, nil
			}
			// A Deployment exists: fall through with rollout == nil.
		} else {
			return nil, perror.WithMessagef(err, "failed to get rollout for cluster %s", params.Cluster)
		}
	}

	clusterState.Step = getStep(rollout)
	if rollout != nil {
		// Desired replicas default to 1 when the Rollout does not set them.
		desiredReplicas := 1
		if rollout.Spec.Replicas != nil {
			desiredReplicas = int(*rollout.Spec.Replicas)
		}
		clusterState.DesiredReplicas = &desiredReplicas
		clusterState.ManualPaused = rollout.Spec.Paused
	}

	var latestReplicaSet *appsv1.ReplicaSet
	rss, err := kube.GetReplicaSets(ctx, kubeClient.Basic, namespace, labelSelector.String())
	if err != nil {
		return nil, err
	} else if len(rss) == 0 {
		return nil, herrors.NewErrNotFound(herrors.ReplicasSetInK8S, "ReplicaSet instance not found")
	}

	for i := range rss {
		rs := &rss[i]
		// NOTE(review): CompareRevision is defined elsewhere; presumably it
		// reports whether rs carries the newer revision — confirm.
		if latestReplicaSet == nil || CompareRevision(ctx, rs, latestReplicaSet) {
			latestReplicaSet = rs
		}
		_, hash := getPodTemplateHash(rs)
		clusterState.Versions[hash] = &ClusterVersion{
			Pods:     map[string]*ClusterPod{},
			Revision: getRevision(rs),
		}
	}

	// set revision, podTemplateHash
	clusterState.PodTemplateHashKey, clusterState.PodTemplateHash = getPodTemplateHash(latestReplicaSet)
	clusterState.Revision = getRevision(latestReplicaSet)
	if clusterState.PodTemplateHash == "" {
		return nil, herrors.NewErrNotFound(herrors.ClusterStateInArgo, "clusterState.PodTemplateHash == ''")
	}

	// Deployment-managed cluster: refine the status from the Deployment itself.
	if clusterState.PodTemplateHashKey == DeploymentPodTemplateHash {
		labelSelector := fields.ParseSelectorOrDie(
			fmt.Sprintf("%v=%v", common.ClusterClusterLabelKey, params.Cluster))
		deploymentList, err := kube.GetDeploymentList(ctx, kubeClient.Basic, namespace, labelSelector.String())
		if err != nil {
			return nil, err
		}

		// Pick the Deployment with the latest creation timestamp.
		var latestDeployment *appsv1.Deployment
		for i := range deploymentList {
			if latestDeployment == nil ||
				deploymentList[i].CreationTimestamp.After(latestDeployment.CreationTimestamp.Time) {
				latestDeployment = &deploymentList[i]
			}
		}

		// Borrowed at kubernetes/kubectl/rollout_status.go
		if latestDeployment != nil {
			if latestDeployment.Generation <= latestDeployment.Status.ObservedGeneration {
				cond := getDeploymentCondition(latestDeployment.Status, appsv1.DeploymentProgressing)
				if cond != nil && cond.Reason == "ProgressDeadlineExceeded" {
					// By default, if a Deployment fails to complete a rollover within ten minutes,
					// Then the Deployment's health status is HealthStatusDegraded.
					clusterState.Status = health.HealthStatusDegraded
				}
			} else {
				// If the Deployment has an update that has not been processed by the Deployment Controller,
				// Then the Deployment's health status is HealthStatusProgressing.
				clusterState.Status = health.HealthStatusProgressing
			}
		}
	}

	if err := c.paddingPodAndEventInfo(ctx, params.Cluster, namespace,
		kubeClient.Basic, clusterState); err != nil {
		return nil, err
	}

	return clusterState, nil
}

// Method `cd.GetPodEvents` has 5 return statements (exceeds 4 allowed).

// GetPodEvents returns the Kubernetes events of the pod params.Pod in
// params.Namespace.
//
// The pod must appear, with pod details, in the cluster's resource tree as
// returned by GetResourceTree; otherwise a not-found error is returned. Each
// event's timestamp is its EventTime, or its FirstTimestamp when EventTime is
// zero. The returned slice is nil when the pod has no events.
func (c *cd) GetPodEvents(ctx context.Context,
	params *GetPodEventsParams) (events []Event, err error) {
	const op = "cd: get cluster pod events"
	defer wlog.Start(ctx, op).StopPrint()

	_, kubeClient, err := c.kubeClientFactory.GetByK8SServer(params.RegionEntity.Server,
		params.RegionEntity.Certificate)
	if err != nil {
		return nil, err
	}

	resourceTree, err := c.GetResourceTree(ctx, &GetResourceTreeParams{
		Environment:  params.Environment,
		Cluster:      params.Cluster,
		RegionEntity: params.RegionEntity,
	})
	if err != nil {
		return nil, err
	}

	for i := range resourceTree {
		pod := resourceTree[i].PodDetail
		// Only the first node matching both namespace and name is used.
		if pod != nil && pod.Metadata.Namespace == params.Namespace && pod.Metadata.Name == params.Pod {
			k8sEvents, err := kube.GetPodEvents(ctx, kubeClient.Basic, params.Namespace, params.Pod)
			if err != nil {
				return nil, err
			}

			for _, event := range k8sEvents {
				// Fall back to FirstTimestamp when EventTime is not set.
				eventTimeStamp := metav1.Time{Time: event.EventTime.Time}
				if eventTimeStamp.IsZero() {
					eventTimeStamp = event.FirstTimestamp
				}
				events = append(events, Event{
					Type:           event.Type,
					Reason:         event.Reason,
					Message:        event.Message,
					Count:          event.Count,
					EventTimestamp: eventTimeStamp,
				})
			}
			return events, nil
		}
	}

	return nil, herrors.NewErrNotFound(herrors.PodsInK8S, "pod does not exist")
}

// paddingPodAndEventInfo loads the pods carrying the cluster label and the
// events of the namespace concurrently, then feeds every pod together with
// its events to parsePod, which fills clusterState.
//
// clusterState.Replicas is incremented once per pod that parsePod accepts;
// a pod that parsePod rejects is logged and skipped. The first non-nil error
// of the two queries (pods before events) aborts the call.
//
// Deprecated
func (c *cd) paddingPodAndEventInfo(ctx context.Context, cluster, namespace string,
	kubeClient kubernetes.Interface, clusterState *ClusterState) error {
	labelSelector := fields.ParseSelectorOrDie(fmt.Sprintf("%v=%v",
		common.ClusterClusterLabelKey, cluster))

	var pods []corev1.Pod
	var events map[string][]*corev1.Event
	var err1, err2 error

	// Run both queries in parallel; each goroutine writes only its own
	// result variables, which are read after wg.Wait().
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		pods, err1 = kube.GetPods(ctx, kubeClient, namespace, labelSelector.String())
	}()
	go func() {
		defer wg.Done()
		events, err2 = kube.GetEvents(ctx, kubeClient, namespace)
	}()
	wg.Wait()

	for _, e := range []error{err1, err2} {
		if e != nil {
			return e
		}
	}

	for i := range pods {
		pod := &pods[i]
		// Events are looked up under the key "<name>-<uid>-<namespace>".
		podEvents := events[fmt.Sprintf("%v-%v-%v", pod.Name, pod.UID, pod.Namespace)]
		if err := parsePod(ctx, clusterState, pod, podEvents); err != nil {
			log.Info(ctx, err)
			continue
		} else {
			clusterState.Replicas++
		}
	}
	return nil
}