horizoncd/horizon

View on GitHub
pkg/cluster/tekton/tekton_run.go

Summary

Maintainability
A
1 hr
Test Coverage
F
55%
// Copyright © 2023 Horizoncd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
 
package tekton
 
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
 
"github.com/horizoncd/horizon/core/common"
"k8s.io/apimachinery/pkg/fields"
 
herrors "github.com/horizoncd/horizon/core/errors"
"github.com/horizoncd/horizon/pkg/cluster/tekton/log"
perror "github.com/horizoncd/horizon/pkg/errors"
"github.com/horizoncd/horizon/pkg/util/wlog"
"github.com/tektoncd/cli/pkg/options"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
 
// GetPipelineRunByID looks up the PipelineRun associated with the given CI
// event ID. It is the exported entry point for getPipelineRunByID and hands
// back that helper's result and error unchanged.
func (t *Tekton) GetPipelineRunByID(ctx context.Context, ciEventID string) (_ *v1beta1.PipelineRun, err error) {
	run, lookupErr := t.getPipelineRunByID(ctx, ciEventID)
	return run, lookupErr
}
 
Method `Tekton.CreatePipelineRun` has 6 return statements (exceeds 4 allowed).
func (t *Tekton) CreatePipelineRun(ctx context.Context, pr *PipelineRun) (eventID string, err error) {
const op = "tekton: create pipelineRun"
defer wlog.Start(ctx, op).StopPrint()
 
bodyBytes, err := json.Marshal(pr)
if err != nil {
return "", perror.Wrap(herrors.ErrParamInvalid, err.Error())
}
 
resp, err := t.sendHTTPRequest(ctx, http.MethodPost, t.server, bytes.NewReader(bodyBytes))
if err != nil {
return "", perror.Wrap(herrors.ErrHTTPRequestFailed, err.Error())
}
defer func() { _ = resp.Body.Close() }()
 
// From Tekton Trigger v0.15.0, the EventListener will always respond with a 202 Accepted response code.
// Please refer: https://github.com/tektoncd/triggers/releases/tag/v0.15.0.
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted {
message := common.Response(ctx, resp)
return "", perror.Wrapf(herrors.ErrHTTPRespNotAsExpected, "statusCode = %d, message = %s", resp.StatusCode, message)
}
respData, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", perror.Wrap(herrors.ErrReadFailed, err.Error())
}
var pipelineRunResp struct {
EventID string `json:"eventID"`
}
err = json.Unmarshal(respData, &pipelineRunResp)
if err != nil {
return "", perror.Wrap(herrors.ErrParamInvalid, err.Error())
}
 
return pipelineRunResp.EventID, nil
}
 
// patchStringValue is one JSON Patch operation whose value is a string.
// StopPipelineRun marshals a slice of these as the body of a JSON patch
// request.
type patchStringValue struct {
	// Op is the patch operation name, e.g. "replace".
	Op string `json:"op"`
	// Path is the location of the field being patched, e.g. "/spec/status".
	Path string `json:"path"`
	// Value is the string written at Path.
	Value string `json:"value"`
}
 
Method `Tekton.StopPipelineRun` has 6 return statements (exceeds 4 allowed).
func (t *Tekton) StopPipelineRun(ctx context.Context, ciEventID string) (err error) {
const op = "tekton: stop pipelineRun"
defer wlog.Start(ctx, op).StopPrint()
 
pr, err := t.GetPipelineRunByID(ctx, ciEventID)
if err != nil {
return err
}
if pr == nil {
return nil
}
 
// This judgment refers to the source code of tekton/cli:
// https://github.com/tektoncd/cli/blob/master/pkg/cmd/pipelinerun/cancel.go#L69
if len(pr.Status.Conditions) > 0 {
if pr.Status.Conditions[0].Status != corev1.ConditionUnknown {
// PipelineRun has already finished execution
return nil
}
}
 
payload := []patchStringValue{{
Op: "replace",
Path: "/spec/status",
Value: v1beta1.PipelineRunSpecStatusCancelled,
}}
 
data, err := json.Marshal(payload)
if err != nil {
return perror.Wrap(herrors.ErrParamInvalid, err.Error())
}
if _, err := t.client.Tekton.TektonV1beta1().PipelineRuns(pr.Namespace).Patch(ctx, pr.Name,
types.JSONPatchType, data, metav1.PatchOptions{}); err != nil {
return herrors.NewErrUpdateFailed(herrors.Pipelinerun, err.Error())
}
return nil
}
 
func (t *Tekton) GetPipelineRunLogByID(ctx context.Context, ciEventID string) (
_ <-chan log.Log, _ <-chan error, err error) {
pr, err := t.getPipelineRunByID(ctx, ciEventID)
 
if err != nil {
return nil, nil, perror.WithMessage(err, "failed to get pipeline run with labels")
}
 
return t.GetPipelineRunLog(ctx, pr)
}
 
func (t *Tekton) GetPipelineRunLog(ctx context.Context, pr *v1beta1.PipelineRun) (<-chan log.Log, <-chan error, error) {
logOps := &options.LogOptions{
Params: log.NewTektonParams(t.client.Dynamic, t.client.Kube, t.client.Tekton, t.namespace),
PipelineRunName: pr.Name,
}
 
lr, err := log.NewReader(log.LogTypePipeline, logOps)
if err != nil {
return nil, nil, err
}
return lr.Read()
}
 
func (t *Tekton) DeletePipelineRun(ctx context.Context, pr *v1beta1.PipelineRun) error {
if pr == nil {
return nil
}
err := t.client.Tekton.TektonV1beta1().PipelineRuns(t.namespace).
Delete(ctx, pr.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return herrors.NewErrNotFound(herrors.Pipelinerun, err.Error())
}
return herrors.NewErrDeleteFailed(herrors.Pipelinerun, err.Error())
}
 
return nil
}
 
// getPipelineRunByID lists the PipelineRuns in t.namespace whose
// common.TektonTriggersEventIDKey label equals ciEventID and returns the
// single match.
//
// Errors:
//   - ErrParamInvalid when ciEventID cannot be turned into a valid selector
//     (for example because it contains an unescaped ',' or '=').
//   - ErrTektonInternal when the list call fails or more than one run matches.
//   - a not-found error when no run matches.
//
// A nil PipelineRun is never returned together with a nil error.
func (t *Tekton) getPipelineRunByID(ctx context.Context, ciEventID string) (_ *v1beta1.PipelineRun,
	err error) {
	// ParseSelector is used instead of ParseSelectorOrDie so that a malformed
	// event ID surfaces as an error rather than a panic.
	selector, err := fields.ParseSelector(fmt.Sprintf("%v=%v", common.TektonTriggersEventIDKey, ciEventID))
	if err != nil {
		return nil, perror.Wrap(herrors.ErrParamInvalid, err.Error())
	}

	prs, err := t.client.Tekton.TektonV1beta1().PipelineRuns(t.namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	if err != nil {
		return nil, perror.Wrap(herrors.ErrTektonInternal, err.Error())
	}

	if prs == nil || len(prs.Items) == 0 {
		return nil, herrors.NewErrNotFound(herrors.PipelinerunInTekton,
			fmt.Sprintf("failed to list pipeline with selector %s", selector.String()))
	}
	// The event ID is expected to identify exactly one run.
	if len(prs.Items) > 1 {
		return nil, perror.Wrap(herrors.ErrTektonInternal,
			fmt.Sprintf("unexpected: selector %s match multi pipeline runs", selector.String()))
	}
	return &(prs.Items[0]), nil
}
 
// sendHTTPRequest builds a request for method and url with the given body,
// bound to ctx, marks it as JSON and executes it with a fresh http.Client.
//
// The caller owns the returned response and must close its body. Errors from
// building or executing the request are returned unwrapped.
func (t *Tekton) sendHTTPRequest(ctx context.Context, method string,
	url string, body io.Reader) (*http.Response, error) {
	request, err := http.NewRequestWithContext(ctx, method, url, body)
	if err != nil {
		return nil, err
	}

	headers := request.Header
	headers.Set("Content-Type", "application/json; charset=utf-8")
	// NOTE(review): the request ID is always sent as an empty string here;
	// this looks like a placeholder — confirm whether a real ID should be used.
	headers.Set("X-Request-Id", "")

	return (&http.Client{}).Do(request)
}