pkg/argocd/argocd.go
// Copyright © 2023 Horizoncd.//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License. package argocd import ( "bufio" "bytes" "context" "crypto/tls" "encoding/json" stderrors "errors" "fmt" "io" "io/ioutil" "net/http" "strings" "time" "github.com/horizoncd/horizon/core/common" herrors "github.com/horizoncd/horizon/core/errors" perror "github.com/horizoncd/horizon/pkg/errors" "github.com/horizoncd/horizon/pkg/util/errors" "github.com/horizoncd/horizon/pkg/util/log" "github.com/horizoncd/horizon/pkg/util/wlog" "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" "github.com/hashicorp/go-retryablehttp" corev1 "k8s.io/api/core/v1") var ( ErrResourceNotFound = stderrors.New("resource not found") ErrResponseNotOK = stderrors.New("response for argoCD is not 200 OK") ErrUnexpected = stderrors.New("unexpected error")) type ( // ArgoCD interact with ArgoCD Server ArgoCD interface { // AssembleArgoApplication assemble application by params AssembleArgoApplication(name, namespace, gitRepoURL, server string, valueFiles []string, targetRevision string) *Application // CreateApplication create an application in argoCD CreateApplication(ctx context.Context, manifest []byte) error // DeployApplication deploy an application in argoCD DeployApplication(ctx context.Context, application string, revision string) error // DeleteApplication delete an application in argoCD // You need to delete Argo Application first, then delete gitlab repo, // otherwise Argo Application can never be deleted. // ref:https://argoproj.github.io/argo-cd/faq/#ive-deletedcorrupted-my-repo-and-cant-delete-my-app DeleteApplication(ctx context.Context, application string) error // WaitApplication Wait for the app sync to complete WaitApplication(ctx context.Context, application string, uid string, status int) error // GetApplication get an application in argoCD GetApplication(ctx context.Context, application string) (*v1alpha1.Application, error) // RefreshApplication ... RefreshApplication(ctx context.Context, application string) (app *v1alpha1.Application, err error) // GetApplicationTree get resource-tree of an application in argoCD GetApplicationTree(ctx context.Context, application string) (*v1alpha1.ApplicationTree, error) // GetApplicationResource get a resource under an application in argoCD GetApplicationResource(ctx context.Context, application string, param ResourceParams, resource interface{}) error // ListResourceEvents get resource's events of an application in argoCD ListResourceEvents(ctx context.Context, application string, param EventParam) (*corev1.EventList, error) // ResumeRollout ... ResumeRollout(ctx context.Context, application string) error // GetContainerLog get standard output of container of an application in argoCD GetContainerLog(ctx context.Context, application string, param ContainerLogParams) (<-chan ContainerLog, <-chan error, error) } // EventParam the params for ListResourceEvents EventParam struct { ResourceNamespace string `json:"resourceNamespace"` ResourceUID string `json:"resourceUID"` ResourceName string `json:"resourceName"` } // ResourceParams the params for GetApplicationResource ResourceParams struct { // Group name in k8s, for example, Deployment resource is in 'apps' group Group string `json:"group,omitempty"` // Version in k8s, for example, Deployment resource has a 'v1' version Version string `json:"version,omitempty"` // the Kind of resource in k8s, for example, the kind of Deployment resource is 'Deployment' Kind string `json:"kind,omitempty"` // the namespace of a resource in k8s Namespace string `json:"namespace,omitempty"` // the resource name ResourceName string `json:"resourceName,omitempty"` } ErrorResponse struct { StreamError struct { GrpcCode int `json:"grpc_code"` HTTPCode int `json:"http_code"` Message string `json:"message"` HTTPStatus string `json:"http_status"` } `json:"error"` } // ContainerLogParams the params for GetContainerLog ContainerLogParams struct { Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"` PodName string `json:"podName,omitempty" yaml:"podName,omitempty"` ContainerName string `json:"containerName,omitempty" yaml:"containerName,omitempty"` TailLines int `json:"tailLines,omitempty" yaml:"tailLines,omitempty"` } ContainerLog struct { Result struct { Content string `json:"content,omitempty" yaml:"content,omitempty"` Timestamp string `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` } `json:"result"` }) type ( // argo holding the info for ArgoCD Server helper struct { // URL for argoCD server URL string `json:"url"` // Token the token to be used for argoCD server Token string `json:"token"` // Namespace where argoCD deployed Namespace string `yaml:"namespace"` } Hook struct{} Strategy struct { Hook Hook `json:"hook"` } DeployApplicationRequest struct { Revision string `json:"revision"` Prune bool `json:"prune"` DryRun bool `json:"dryRun"` Strategy Strategy `json:"strategy"` }) func NewArgoCD(URL, token, namespace string) ArgoCD { return &helper{URL: URL, Token: token, Namespace: namespace}} var _ ArgoCD = (*helper)(nil) const ( // http retry count _retry = 3 // http timeout _timeout = 10 * time.Second // retry backoff duration _backoff = 1 * time.Second) var ( _client = &retryablehttp.Client{ HTTPClient: &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, }, Timeout: _timeout, }, RetryMax: _retry, CheckRetry: retryablehttp.DefaultRetryPolicy, ErrorHandler: retryablehttp.PassthroughErrorHandler, Backoff: func(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { return _backoff }, }) func (h *helper) AssembleArgoApplication(name, namespace, gitRepoURL, server string, valueFiles []string, targetRevision string) *Application { const finalizer = "resources-finalizer.argocd.argoproj.io" const apiVersion = "argoproj.io/v1alpha1" const kind = "Application" const project = "default" return &Application{ APIVersion: apiVersion, Kind: kind, Metadata: ApplicationMetadata{ Finalizers: []string{finalizer}, Name: name, Namespace: h.Namespace, }, Spec: ApplicationSpec{ Source: ApplicationSource{ RepoURL: gitRepoURL, Path: ".", TargetRevision: targetRevision, Helm: &ApplicationSourceHelm{ ValueFiles: valueFiles, }, }, Destination: ApplicationDestination{ Server: server, Namespace: namespace, }, Project: project, SyncPolicy: &SyncPolicy{ SyncOptions: SyncOptions{"CreateNamespace=true"}, }, }, }} func (h *helper) CreateApplication(ctx context.Context, manifest []byte) (err error) { const op = "argo: create application" defer wlog.Start(ctx, op).StopPrint() url := h.URL + "/api/v1/applications?validate=false&upsert=false" resp, err := h.sendHTTPRequest(ctx, http.MethodPost, url, bytes.NewReader(manifest)) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { return perror.Wrap(herrors.ErrHTTPRespNotAsExpected, common.Response(ctx, resp)) } return nil} func (h *helper) DeployApplication(ctx context.Context, application string, revision string) (err error) { const op = "argo: deploy application" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v/sync", h.URL, application) req := DeployApplicationRequest{ Revision: revision, Prune: true, DryRun: false, Strategy: Strategy{Hook: Hook{}}, } reqBody, err := json.Marshal(req) if err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } resp, err := h.sendHTTPRequest(ctx, http.MethodPost, url, bytes.NewReader(reqBody)) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { return perror.Wrap(herrors.ErrHTTPRespNotAsExpected, common.Response(ctx, resp)) } return nil} func (h *helper) DeleteApplication(ctx context.Context, application string) (err error) { const op = "argo: delete application" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v?cascade=true", h.URL, application) resp, err := h.sendHTTPRequest(ctx, http.MethodDelete, url, nil) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound { message := common.Response(ctx, resp) return perror.Wrapf(herrors.ErrHTTPRespNotAsExpected, "status = %s, statusCode = %d, message = %s", resp.Status, resp.StatusCode, message) } return nil} Method `helper.WaitApplication` has 9 return statements (exceeds 4 allowed).
Method `helper.WaitApplication` has a Cognitive Complexity of 23 (exceeds 20 allowed). Consider refactoring.func (h *helper) WaitApplication(ctx context.Context, cluster string, uid string, status int) (err error) { const op = "argo: wait application" defer wlog.Start(ctx, op).StopPrint() waitError := fmt.Errorf("continue to wait") waitFunc := func(i int) error { ctx, cancel := context.WithTimeout(ctx, time.Second*2) defer cancel() log.Infof(ctx, "wait for cluster<%v> to be status of %v, count=%v", cluster, status, i+1) applicationCR, err := h.RefreshApplication(ctx, cluster) if err != nil && stderrors.Is(err, context.DeadlineExceeded) { return waitError } if err == nil { if uid != "" && uid != string(applicationCR.UID) { return perror.Wrap(herrors.ErrNameConflict, "the cluster has been recreated with the same name") } if status == http.StatusOK && applicationCR.Status.Sync.Status == v1alpha1.SyncStatusCodeSynced { return nil } } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { if status == http.StatusNotFound { return nil } } else { return perror.Wrap(herrors.ErrHTTPRespNotAsExpected, err.Error()) } return waitError } for i := 0; i < 700; i++ { err := waitFunc(i) if err == nil { return nil } if err != waitError { return err } time.Sleep(time.Second) } return perror.Wrap(herrors.ErrDeadlineExceeded, "time out")} Similar blocks of code found in 2 locations. Consider refactoring.func (h *helper) GetApplication(ctx context.Context, application string) (applicationCRD *v1alpha1.Application, err error) { const op = "argo: get application" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v", h.URL, application) return h.getOrRefreshApplication(ctx, url)} Similar blocks of code found in 2 locations. Consider refactoring.func (h *helper) RefreshApplication(ctx context.Context, application string) (applicationCRD *v1alpha1.Application, err error) { const op = "argo: refresh application " defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v?refresh=normal", h.URL, application) return h.getOrRefreshApplication(ctx, url)} Method `helper.getOrRefreshApplication` has 6 return statements (exceeds 4 allowed).func (h *helper) getOrRefreshApplication(ctx context.Context, url string) (applicationCRD *v1alpha1.Application, err error) { resp, err := h.sendHTTPRequest(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { if resp.StatusCode == http.StatusNotFound { return nil, herrors.NewErrNotFound(herrors.ApplicationInArgo, fmt.Sprintf("application not found for url %s", url)) } return nil, perror.Wrap(herrors.ErrHTTPRespNotAsExpected, resp.Status) } data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, perror.Wrap(herrors.ErrReadFailed, err.Error()) } if err := json.Unmarshal(data, &applicationCRD); err != nil { return nil, perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return applicationCRD, nil} Method `helper.GetApplicationTree` has 6 return statements (exceeds 4 allowed).func (h *helper) GetApplicationTree(ctx context.Context, application string) ( tree *v1alpha1.ApplicationTree, err error) { const op = "argo: get application tree" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v/resource-tree", h.URL, application) resp, err := h.sendHTTPRequest(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode == http.StatusNotFound { return nil, herrors.NewErrNotFound(herrors.ApplicationInArgo, fmt.Sprintf("application %s not found", application)) } if resp.StatusCode != http.StatusOK { return nil, perror.Wrap(herrors.ErrHTTPRespNotAsExpected, common.Response(ctx, resp)) } data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, perror.Wrap(herrors.ErrReadFailed, err.Error()) } if err = json.Unmarshal(data, &tree); err != nil { return nil, perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return tree, nil} Method `helper.GetApplicationResource` has 8 return statements (exceeds 4 allowed).func (h *helper) GetApplicationResource(ctx context.Context, application string, gvk ResourceParams, resource interface{}) (err error) { const op = "argo: get application resource" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v/resource?namespace=%v&resourceName=%v&group=%v&version=%v&kind=%v", h.URL, application, gvk.Namespace, gvk.ResourceName, gvk.Group, gvk.Version, gvk.Kind) resp, err := h.sendHTTPRequest(ctx, http.MethodGet, url, nil) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { message := common.Response(ctx, resp) if strings.Contains(message, "not found") { return herrors.NewErrNotFound(herrors.ApplicationResourceInArgo, message) } return perror.Wrap(herrors.ErrHTTPRespNotAsExpected, message) } data, err := ioutil.ReadAll(resp.Body) if err != nil { return perror.Wrap(herrors.ErrReadFailed, err.Error()) } type manifest struct { Manifest string `json:"manifest"` } var m manifest if err = json.Unmarshal(data, &m); err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } if m.Manifest == "" || m.Manifest == "{}" { return herrors.NewErrNotFound(herrors.ApplicationManifestInArgo, "manifest is empty") } if err = json.Unmarshal([]byte(m.Manifest), &resource); err != nil { return perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return nil} Method `helper.ListResourceEvents` has 5 return statements (exceeds 4 allowed).func (h *helper) ListResourceEvents(ctx context.Context, application string, param EventParam) ( eventList *corev1.EventList, err error) { const op = "argo: list resource events" defer wlog.Start(ctx, op).StopPrint() url := fmt.Sprintf("%v/api/v1/applications/%v/events?resourceUID=%v&resourceNamespace=%v&resourceName=%v", h.URL, application, param.ResourceUID, param.ResourceNamespace, param.ResourceName) resp, err := h.sendHTTPRequest(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { return nil, perror.Wrap(herrors.ErrHTTPRespNotAsExpected, common.Response(ctx, resp)) } data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, perror.Wrap(herrors.ErrReadFailed, err.Error()) } if err := json.Unmarshal(data, &eventList); err != nil { return nil, perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return eventList, nil} func (h *helper) ResumeRollout(ctx context.Context, application string) (err error) { const op = "argo: resume rollout" defer wlog.Start(ctx, op).StopPrint() app, err := h.GetApplication(ctx, application) if err != nil { return errors.E(op, err) } rolloutVersion := "v1alpha1" rolloutGroup := "argoproj.io" namespace := app.Spec.Destination.Namespace format := "%v/api/v1/applications/%v/resource/actions?namespace=%v&resourceName=%v&version=%s&kind=Rollout&group=%s" url := fmt.Sprintf(format, h.URL, application, namespace, application, rolloutVersion, rolloutGroup) requestBodyStr := `"resume"` resp, err := h.sendHTTPRequest(ctx, http.MethodPost, url, bytes.NewReader([]byte(requestBodyStr))) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { return perror.Wrap(herrors.ErrHTTPRespNotAsExpected, common.Response(ctx, resp)) } return nil} Method `helper.GetContainerLog` has 7 return statements (exceeds 4 allowed).func (h *helper) GetContainerLog(ctx context.Context, application string, param ContainerLogParams) (lc <-chan ContainerLog, ec <-chan error, err error) { const op = "argo: get container log" defer wlog.Start(ctx, op).StopPrint() format := "%v/api/v1/applications/%v/pods/%v/logs?container=%v&follow=false&namespace=%v&tailLines=%v" url := fmt.Sprintf(format, h.URL, application, param.PodName, param.ContainerName, param.Namespace, param.TailLines) resp, err := h.sendHTTPRequest(ctx, http.MethodGet, url, nil) // nolint:bodyclose if err != nil { return nil, nil, err } if resp.StatusCode != http.StatusOK { data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, nil, perror.Wrap(herrors.ErrReadFailed, err.Error()) } _ = resp.Body.Close() var errorResponse *ErrorResponse err = json.Unmarshal(data, &errorResponse) if err != nil { return nil, nil, perror.Wrap(herrors.ErrParamInvalid, err.Error()) } return nil, nil, perror.Wrap(herrors.ErrHTTPRespNotAsExpected, fmt.Sprintf("status code = %d, message = %s", resp.StatusCode, errorResponse.StreamError.Message)) } logC := make(chan ContainerLog) errC := make(chan error) go func() { defer close(logC) defer close(errC) defer func() { _ = resp.Body.Close() }() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { var containerLog ContainerLog if err := json.Unmarshal(scanner.Bytes(), &containerLog); err != nil { errC <- perror.Wrap(herrors.ErrParamInvalid, err.Error()) return } logC <- containerLog } if err := scanner.Err(); err != nil { errC <- perror.Wrap(herrors.ErrReadFailed, err.Error()) return } }() return logC, errC, nil} func (h *helper) sendHTTPRequest(ctx context.Context, method string, url string, body io.Reader) (*http.Response, error) { log.Infof(ctx, "method: %v, url: %v", method, url) req, err := http.NewRequestWithContext(ctx, method, url, body) if err != nil { return nil, perror.Wrap(herrors.ErrParamInvalid, err.Error()) } req.Header.Add("Authorization", fmt.Sprintf("Bearer %v", h.Token)) req.Header.Add("Content-Type", "application/json") r, err := retryablehttp.FromRequest(req) if err != nil { return nil, perror.Wrap(herrors.ErrParamInvalid, "") } return _client.Do(r)}