cloudfoundry/cf-k8s-controllers

View on GitHub
api/repositories/build_repository.go

Summary

Maintainability
A
0 mins
Test Coverage
B
80%
package repositories

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "sort"
    "strings"
    "time"

    "code.cloudfoundry.org/korifi/api/authorization"
    apierrors "code.cloudfoundry.org/korifi/api/errors"
    korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"

    "github.com/google/uuid"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    k8sclient "k8s.io/client-go/kubernetes"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// Build state values, CFBuild status condition types, and resource / log
// source identifiers used throughout this file.
const (
    // Values assigned to BuildRecord.State by cfBuildToBuildRecord.
    BuildStateStaging = "STAGING"
    BuildStateStaged  = "STAGED"
    BuildStateFailed  = "FAILED"

    // Status condition types looked up on a CFBuild to derive its state.
    StagingConditionType   = "Staging"
    SucceededConditionType = "Succeeded"

    // BuildResourceType is the resource type passed to namespace lookups and
    // API error constructors; stagingLogSourceType is the "source_type" tag
    // value attached to log records returned by GetBuildLogs.
    BuildResourceType    = "Build"
    stagingLogSourceType = "STG"
)

// BuildRecord is the repository-level representation of a CFBuild, as
// produced by cfBuildToBuildRecord.
type BuildRecord struct {
    // GUID is the name of the CFBuild resource.
    GUID            string
    // State is one of BuildStateStaging, BuildStateStaged or BuildStateFailed.
    State           string
    // CreatedAt is the CFBuild's creation timestamp.
    CreatedAt       time.Time
    // UpdatedAt is whatever getLastUpdatedTime reports for the CFBuild.
    UpdatedAt       *time.Time
    // StagingErrorMsg carries the message of the "Succeeded" condition when
    // the build failed; it is empty otherwise.
    StagingErrorMsg string
    StagingMemoryMB int
    StagingDiskMB   int
    Lifecycle       Lifecycle
    // PackageGUID and AppGUID are the names of the referenced package and app.
    PackageGUID     string
    // DropletGUID is set to the build's own name once staging succeeded.
    DropletGUID     string
    AppGUID         string
    Labels          map[string]string
    Annotations     map[string]string
    // NOTE(review): ImageRef is not populated by cfBuildToBuildRecord in this
    // file — confirm whether another code path fills it in.
    ImageRef        string
}

// LogRecord is a single line of log output together with its metadata.
type LogRecord struct {
    // Message is the log line text as returned by parseRFC3339NanoTime.
    Message   string
    // Timestamp is the time value returned by parseRFC3339NanoTime.
    // NOTE(review): presumably Unix nanoseconds — confirm against that helper.
    Timestamp int64
    // Header is not set by GetBuildLogs in this file.
    Header    string
    // Tags holds key/value metadata; GetBuildLogs sets "source_type".
    Tags      map[string]string
}

// BuildRepo reads and creates CFBuild resources, and fetches their staging
// logs, using Kubernetes clients built from the caller's authorization info.
type BuildRepo struct {
    // namespaceRetriever resolves the namespace that holds a given build GUID.
    namespaceRetriever NamespaceRetriever
    // userClientFactory builds Kubernetes clients from authorization.Info.
    userClientFactory  authorization.UserK8sClientFactory
}

// NewBuildRepo returns a BuildRepo that uses the given namespace retriever
// and user client factory.
func NewBuildRepo(
    namespaceRetriever NamespaceRetriever,
    userClientFactory authorization.UserK8sClientFactory,
) *BuildRepo {
    repo := new(BuildRepo)
    repo.namespaceRetriever = namespaceRetriever
    repo.userClientFactory = userClientFactory

    return repo
}

// GetBuild fetches the CFBuild named buildGUID and converts it to a
// BuildRecord.
//
// The namespace is resolved through the namespace retriever; an error from
// that lookup is returned unchanged. A failure to build the user client is
// wrapped, and a failure to get the resource is converted with
// apierrors.FromK8sError and then wrapped.
func (b *BuildRepo) GetBuild(ctx context.Context, authInfo authorization.Info, buildGUID string) (BuildRecord, error) {
    namespace, nsErr := b.namespaceRetriever.NamespaceFor(ctx, buildGUID, BuildResourceType)
    if nsErr != nil {
        return BuildRecord{}, nsErr
    }

    k8sClient, clientErr := b.userClientFactory.BuildClient(authInfo)
    if clientErr != nil {
        return BuildRecord{}, fmt.Errorf("get-build failed to build user client: %w", clientErr)
    }

    var cfBuild korifiv1alpha1.CFBuild
    key := client.ObjectKey{Namespace: namespace, Name: buildGUID}

    getErr := k8sClient.Get(ctx, key, &cfBuild)
    if getErr != nil {
        return BuildRecord{}, fmt.Errorf("failed to get build: %w", apierrors.FromK8sError(getErr, BuildResourceType))
    }

    return b.cfBuildToBuildRecord(cfBuild), nil
}

// GetLatestBuildByAppGUID lists the CFBuilds in namespace spaceGUID that
// carry the app GUID label for appGUID and returns the first one in the
// order produced by orderBuilds.
//
// List failures are converted with apierrors.FromK8sError; when no build
// matches, a not-found error for the Build resource type is returned.
func (b *BuildRepo) GetLatestBuildByAppGUID(ctx context.Context, authInfo authorization.Info, spaceGUID string, appGUID string) (BuildRecord, error) {
    k8sClient, err := b.userClientFactory.BuildClient(authInfo)
    if err != nil { // Untested
        return BuildRecord{}, err
    }

    appLabels := map[string]string{korifiv1alpha1.CFAppGUIDLabelKey: appGUID}
    appSelector, err := labels.ValidatedSelectorFromSet(appLabels)
    if err != nil { // Untested
        return BuildRecord{}, err
    }

    var appBuilds korifiv1alpha1.CFBuildList
    opts := client.ListOptions{Namespace: spaceGUID, LabelSelector: appSelector}
    if err = k8sClient.List(ctx, &appBuilds, &opts); err != nil {
        return BuildRecord{}, apierrors.FromK8sError(err, BuildResourceType)
    }

    if len(appBuilds.Items) == 0 {
        notFound := fmt.Errorf("builds for app %q in space %q not found", appGUID, spaceGUID)
        return BuildRecord{}, apierrors.NewNotFoundError(notFound, BuildResourceType)
    }

    ordered := orderBuilds(appBuilds.Items)
    latest := ordered[0]

    return b.cfBuildToBuildRecord(latest), nil
}

// orderBuilds sorts builds in place so that the most recently created build
// comes first, and returns the same slice.
//
// The comparator is a strict "created after" test: the sort package requires
// less(i, j) and less(j, i) not both be true, which a negated Before violates
// for equal timestamps. A stable sort keeps builds with identical creation
// timestamps in their incoming order, so the result is deterministic.
func orderBuilds(builds []korifiv1alpha1.CFBuild) []korifiv1alpha1.CFBuild {
    sort.SliceStable(builds, func(i, j int) bool {
        // i sorts before j only when j was created strictly earlier than i.
        return builds[j].CreationTimestamp.Before(&builds[i].CreationTimestamp)
    })
    return builds
}

// GetBuildLogs collects the log output of the pods belonging to buildGUID in
// namespace spaceGUID and returns one LogRecord per log line, each tagged
// with the staging source type.
//
// Errors from building the client or reading the logs are returned as-is
// with a nil slice. When there is no log output at all, an empty (non-nil)
// slice is returned.
func (b *BuildRepo) GetBuildLogs(ctx context.Context, authInfo authorization.Info, spaceGUID string, buildGUID string) ([]LogRecord, error) {
    userClient, err := b.userClientFactory.BuildK8sClient(authInfo)
    if err != nil { // Untested
        return nil, err
    }

    logWriter := new(strings.Builder)
    if err := NewBuildLogsClient(userClient).GetImageLogs(ctx, logWriter, buildGUID, spaceGUID); err != nil {
        return nil, err
    }

    rawLogs := strings.Trim(logWriter.String(), "\n")
    if rawLogs == "" {
        // strings.Split("", "\n") yields a single empty element, which would
        // otherwise surface as one bogus record with an empty message.
        return []LogRecord{}, nil
    }

    buildLogs := strings.Split(rawLogs, "\n")
    toReturn := make([]LogRecord, 0, len(buildLogs))

    for _, line := range buildLogs {
        // The parse error is deliberately ignored so that lines without a
        // parsable timestamp are still reported.
        // NOTE(review): presumably the helper returns the raw line and a zero
        // timestamp on failure — confirm against parseRFC3339NanoTime.
        logLine, logTime, _ := parseRFC3339NanoTime(line)

        toReturn = append(toReturn, LogRecord{
            Message:   logLine,
            Timestamp: logTime,
            Tags: map[string]string{
                "source_type": stagingLogSourceType,
            },
        })
    }

    return toReturn, nil
}

// cfBuildToBuildRecord converts a CFBuild into a BuildRecord.
//
// The record starts in BuildStateStaging. When the "Staging" condition is
// False, the "Succeeded" condition decides the final state: True yields
// BuildStateStaged with DropletGUID set to the build name, and False yields
// BuildStateFailed with the condition's message as StagingErrorMsg.
//
// For the "docker" lifecycle type the lifecycle data is reset to its zero
// value; buildpacks present in the spec are still copied over afterwards.
func (b *BuildRepo) cfBuildToBuildRecord(cfBuild korifiv1alpha1.CFBuild) BuildRecord {
    toReturn := BuildRecord{
        GUID:            cfBuild.Name,
        State:           BuildStateStaging,
        CreatedAt:       cfBuild.CreationTimestamp.Time,
        UpdatedAt:       getLastUpdatedTime(&cfBuild),
        StagingErrorMsg: "",
        StagingMemoryMB: cfBuild.Spec.StagingMemoryMB,
        StagingDiskMB:   cfBuild.Spec.StagingDiskMB,
        Lifecycle: Lifecycle{
            Type: string(cfBuild.Spec.Lifecycle.Type),
            Data: LifecycleData{
                // Default to an empty, non-nil list of buildpacks.
                Buildpacks: []string{},
                Stack:      cfBuild.Spec.Lifecycle.Data.Stack,
            },
        },
        PackageGUID: cfBuild.Spec.PackageRef.Name,
        DropletGUID: "",
        AppGUID:     cfBuild.Spec.AppRef.Name,
        Labels:      cfBuild.Labels,
        Annotations: cfBuild.Annotations,
    }

    if cfBuild.Spec.Lifecycle.Type == "docker" {
        toReturn.Lifecycle.Data = LifecycleData{}
    }

    if cfBuild.Spec.Lifecycle.Data.Buildpacks != nil {
        toReturn.Lifecycle.Data.Buildpacks = cfBuild.Spec.Lifecycle.Data.Buildpacks
    }

    stagingStatus := getConditionValue(&cfBuild.Status.Conditions, StagingConditionType)
    succeededStatus := getConditionValue(&cfBuild.Status.Conditions, SucceededConditionType)
    // TODO: Consider moving this logic to CRDs repo in case Status Conditions change later?
    if stagingStatus == metav1.ConditionFalse {
        switch succeededStatus {
        case metav1.ConditionTrue:
            toReturn.State = BuildStateStaged
            toReturn.DropletGUID = cfBuild.Name
        case metav1.ConditionFalse:
            toReturn.State = BuildStateFailed
            // FindStatusCondition returns nil when the condition is absent;
            // guard the dereference rather than rely on getConditionValue
            // never reporting False for a missing condition.
            if succeeded := meta.FindStatusCondition(cfBuild.Status.Conditions, SucceededConditionType); succeeded != nil {
                toReturn.StagingErrorMsg = succeeded.Message
            }
        }
    }

    return toReturn
}

// CreateBuild turns message into a CFBuild, creates it with a client built
// from authInfo, and returns the resulting BuildRecord.
//
// A failure to build the client is wrapped; a failure to create the resource
// is converted with apierrors.FromK8sError.
func (b *BuildRepo) CreateBuild(ctx context.Context, authInfo authorization.Info, message CreateBuildMessage) (BuildRecord, error) {
    newBuild := message.toCFBuild()

    k8sClient, clientErr := b.userClientFactory.BuildClient(authInfo)
    if clientErr != nil {
        return BuildRecord{}, fmt.Errorf("failed to build user k8s client: %w", clientErr)
    }

    createErr := k8sClient.Create(ctx, &newBuild)
    if createErr != nil {
        return BuildRecord{}, apierrors.FromK8sError(createErr, BuildResourceType)
    }

    return b.cfBuildToBuildRecord(newBuild), nil
}

// CreateBuildMessage carries the inputs needed to create a CFBuild; see
// toCFBuild for how each field is mapped onto the resource.
type CreateBuildMessage struct {
    // AppGUID and PackageGUID become the names of the app and package refs.
    AppGUID         string
    PackageGUID     string
    // SpaceGUID is used as the namespace of the created CFBuild.
    SpaceGUID       string
    StagingMemoryMB int
    StagingDiskMB   int
    Lifecycle       Lifecycle
    // Labels and Annotations are copied onto the resource metadata.
    Labels          map[string]string
    Annotations     map[string]string
}

// toCFBuild builds a CFBuild from the message. The resource gets a freshly
// generated UUID as its name and SpaceGUID as its namespace.
func (m CreateBuildMessage) toCFBuild() korifiv1alpha1.CFBuild {
    meta := metav1.ObjectMeta{
        Name:        uuid.NewString(),
        Namespace:   m.SpaceGUID,
        Labels:      m.Labels,
        Annotations: m.Annotations,
    }

    lifecycle := korifiv1alpha1.Lifecycle{
        Type: korifiv1alpha1.LifecycleType(m.Lifecycle.Type),
        Data: korifiv1alpha1.LifecycleData{
            Buildpacks: m.Lifecycle.Data.Buildpacks,
            Stack:      m.Lifecycle.Data.Stack,
        },
    }

    spec := korifiv1alpha1.CFBuildSpec{
        PackageRef:      corev1.LocalObjectReference{Name: m.PackageGUID},
        AppRef:          corev1.LocalObjectReference{Name: m.AppGUID},
        StagingMemoryMB: m.StagingMemoryMB,
        StagingDiskMB:   m.StagingDiskMB,
        Lifecycle:       lifecycle,
    }

    return korifiv1alpha1.CFBuild{ObjectMeta: meta, Spec: spec}
}

// This code was copied from https://github.com/pivotal/kpack/blob/37764d9940b2f45b1c7acb6c2cd33cf4f489c03b/pkg/logs/build_logs.go
// with minor modifications.

// readyContainer identifies one container of one pod whose logs can be read.
// It is used as a map key by BuildLogsClient to track processed containers.
type readyContainer struct {
    podName       string
    containerName string
    namespace     string
}

// BuildLogsClient reads container logs of build pods through a Kubernetes
// clientset.
type BuildLogsClient struct {
    k8sClient k8sclient.Interface
    // processed records the containers whose logs were already streamed so
    // each is handled at most once; only the keys matter, values are nil.
    processed map[readyContainer]interface{}
}

// NewBuildLogsClient returns a BuildLogsClient backed by k8sClient, with an
// empty set of already-processed containers.
func NewBuildLogsClient(k8sClient k8sclient.Interface) *BuildLogsClient {
    logsClient := &BuildLogsClient{k8sClient: k8sClient}
    logsClient.processed = map[readyContainer]interface{}{}

    return logsClient
}

// BuildWorkloadLabelKey is the pod label GetImageLogs matches against the
// build GUID to select the pods whose logs are read.
const BuildWorkloadLabelKey = "korifi.cloudfoundry.org/build-workload-name"

// GetImageLogs writes to writer the logs of every pod in namespace whose
// BuildWorkloadLabelKey label equals buildGUID. Logs are read once, without
// following.
func (c *BuildLogsClient) GetImageLogs(ctx context.Context, writer io.Writer, buildGUID, namespace string) error {
    const follow = false

    selector := fmt.Sprintf("%s=%s", BuildWorkloadLabelKey, buildGUID)
    listOptions := metav1.ListOptions{
        LabelSelector: selector,
        Watch:         false,
    }

    return c.getPodLogs(ctx, writer, namespace, listOptions, follow)
}

// getPodLogs finds the readable containers of the pods matched by
// listOptions in namespace and streams each container's logs to writer, in
// the order getContainers returned them. It stops at the first error.
func (c *BuildLogsClient) getPodLogs(ctx context.Context, writer io.Writer, namespace string, listOptions metav1.ListOptions, follow bool) error {
    containers, listErr := c.getContainers(ctx, namespace, listOptions)
    if listErr != nil {
        return listErr
    }

    for i := range containers {
        if streamErr := c.streamLogsForContainer(ctx, writer, containers[i], follow); streamErr != nil {
            return streamErr
        }
    }

    return nil
}

// getContainers lists the pods matched by listOptions in namespace and
// returns every container that is not in the Waiting state. For each pod the
// init containers come first, followed by the regular containers.
func (c *BuildLogsClient) getContainers(ctx context.Context, namespace string, listOptions metav1.ListOptions) ([]readyContainer, error) {
    found := []readyContainer{}

    podList, err := c.k8sClient.CoreV1().Pods(namespace).List(ctx, listOptions)
    if err != nil {
        return nil, err
    }

    // collect appends every non-waiting container from statuses to found.
    collect := func(pod *corev1.Pod, statuses []corev1.ContainerStatus) {
        for i := range statuses {
            if statuses[i].State.Waiting != nil {
                continue
            }
            found = append(found, readyContainer{
                podName:       pod.Name,
                containerName: statuses[i].Name,
                namespace:     pod.Namespace,
            })
        }
    }

    for i := range podList.Items {
        pod := &podList.Items[i]
        collect(pod, pod.Status.InitContainerStatuses)
        collect(pod, pod.Status.ContainerStatuses)
    }

    return found, nil
}

// streamLogsForContainer copies the timestamped logs of readyContainer to
// writer, line by line. A container that was already processed by this
// client is skipped.
//
// It returns nil when the log stream reaches EOF or when ctx is done (the
// cancellation check runs before each read), and otherwise returns the first
// stream, read or write error encountered.
func (c *BuildLogsClient) streamLogsForContainer(ctx context.Context, writer io.Writer, readyContainer readyContainer, follow bool) error {
    if _, alreadyProcessed := c.processed[readyContainer]; alreadyProcessed {
        return nil
    }
    // Marked before streaming, so a container is attempted at most once even
    // if opening its log stream fails.
    c.processed[readyContainer] = nil

    logReadCloser, err := c.k8sClient.CoreV1().Pods(readyContainer.namespace).GetLogs(readyContainer.podName, &corev1.PodLogOptions{
        Container:  readyContainer.containerName,
        Follow:     follow,
        Timestamps: true,
    }).Stream(ctx)
    if err != nil {
        return err
    }
    defer logReadCloser.Close()

    r := bufio.NewReader(logReadCloser)
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }

        line, readErr := r.ReadBytes('\n')
        // ReadBytes returns the data read before an error together with the
        // error, so a final line lacking a trailing newline arrives alongside
        // io.EOF. Write it out instead of dropping it.
        if len(line) > 0 {
            if _, writeErr := writer.Write(line); writeErr != nil {
                return writeErr
            }
        }

        if readErr == io.EOF {
            return nil
        }
        if readErr != nil {
            return readErr
        }
    }
}