cloudfoundry/cf-k8s-controllers

View on GitHub
api/actions/process_stats.go

Summary

Maintainability
A
1 hr
Test Coverage
package actions

import (
    "context"
    "fmt"
    "strconv"
    "time"

    "code.cloudfoundry.org/korifi/api/actions/shared"
    "code.cloudfoundry.org/korifi/api/authorization"
    "code.cloudfoundry.org/korifi/api/repositories"
    korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
    "code.cloudfoundry.org/korifi/tools"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
    // ApplicationContainerName is the name of the pod container that
    // extractProcessContainer looks up when reading the instance index.
    ApplicationContainerName = "application"
    // EnvCFInstanceIndex is the container environment variable from which
    // the instance index of a pod is parsed.
    EnvCFInstanceIndex       = "CF_INSTANCE_INDEX"
    // LabelGUID is the pod label matched against the process GUID when
    // selecting pods for metrics.
    LabelGUID                = "korifi.cloudfoundry.org/guid"
    // LabelVersion is the pod label matched against the app revision when
    // selecting pods for metrics.
    LabelVersion             = "korifi.cloudfoundry.org/version"
    // Instance states reported in PodStatsRecord.State.
    stateStarting            = "STARTING"
    stateRunning             = "RUNNING"
    stateDown                = "DOWN"
    stateCrashed             = "CRASHED"
)

//counterfeiter:generate -o fake -fake-name MetricsRepository . MetricsRepository

type (
    // MetricsRepository is the source of pod metrics used by ProcessStats.
    MetricsRepository interface {
        // GetMetrics returns the metrics of the pods in namespace that match
        // podSelector, on behalf of the caller described by authInfo.
        GetMetrics(ctx context.Context, authInfo authorization.Info, namespace string, podSelector client.MatchingLabels) ([]repositories.PodMetrics, error)
    }

    // Usage holds the resource usage of a single instance. Every field is a
    // pointer and stays nil when FetchStats has no value for it.
    Usage struct {
        // Time is the metrics timestamp, formatted as RFC 3339 in UTC.
        Time *string
        // CPU is the summed container CPU usage expressed in cores
        // (nanocores divided by 1e9).
        CPU  *float64
        // Mem is the summed "memory" usage of the pod's containers
        // (presumably bytes, matching MemQuota — confirm against the metrics source).
        Mem  *int64
        // Disk is the summed "storage" usage of the pod's containers
        // (presumably bytes, matching DiskQuota — confirm against the metrics source).
        Disk *int64
    }

    // PodStatsRecord describes the state and resource usage of one process
    // instance.
    PodStatsRecord struct {
        // Type is the process type copied from the process record.
        Type      string
        // Index is the zero-based instance index.
        Index     int
        // State is one of the state* constants.
        // NOTE(review): the consumer of the `default` tag is not visible in this file.
        State     string `default:"DOWN"`
        Usage     Usage
        // MemQuota is the process memory limit in bytes; nil when no usage
        // was recorded for the instance.
        MemQuota  *int64
        // DiskQuota is the process disk limit in bytes; nil when no usage
        // was recorded for the instance.
        DiskQuota *int64
    }

    // ProcessStats computes per-instance stats for a process from the
    // process, app and metrics repositories.
    ProcessStats struct {
        processRepo shared.CFProcessRepository
        appRepo     shared.CFAppRepository
        metricsRepo MetricsRepository
    }
)

// NewProcessStats builds a ProcessStats action backed by the given process,
// app and metrics repositories.
func NewProcessStats(processRepo shared.CFProcessRepository, appRepo shared.CFAppRepository, metricsRepo MetricsRepository) *ProcessStats {
    stats := new(ProcessStats)
    stats.processRepo = processRepo
    stats.appRepo = appRepo
    stats.metricsRepo = metricsRepo
    return stats
}

// FetchStats returns the per-instance stats of the process identified by
// processGUID.
//
// When the owning app is stopped, a single DOWN record with index 0 is
// returned. Otherwise one record per desired instance is returned, each
// initialised as DOWN and then updated from the metrics of the matching pod.
// Pods that are DOWN, or whose index is outside the desired instance range,
// are ignored. Usage and quotas are only populated for pods that report at
// least one container metric.
//
// Errors from the process, app and metrics repositories are returned as-is,
// as is any failure to determine a pod's instance index.
func (a *ProcessStats) FetchStats(ctx context.Context, authInfo authorization.Info, processGUID string) ([]PodStatsRecord, error) {
    processRecord, err := a.processRepo.GetProcess(ctx, authInfo, processGUID)
    if err != nil {
        return nil, err
    }

    appRecord, err := a.appRepo.GetApp(ctx, authInfo, processRecord.AppGUID)
    if err != nil {
        return nil, err
    }

    if appRecord.State == repositories.StoppedState {
        return []PodStatsRecord{
            {
                Type:  processRecord.Type,
                Index: 0,
                State: stateDown,
            },
        }, nil
    }

    metrics, err := a.metricsRepo.GetMetrics(ctx, authInfo, appRecord.SpaceGUID, client.MatchingLabels{
        korifiv1alpha1.CFAppGUIDLabelKey: appRecord.GUID,
        LabelVersion:                     appRecord.Revision,
        LabelGUID:                        processGUID,
    })
    if err != nil {
        return nil, err
    }

    // Initialize records slice with the pod instances we expect to exist
    records := make([]PodStatsRecord, processRecord.DesiredInstances)
    for i := range records {
        records[i] = PodStatsRecord{
            Type:  processRecord.Type,
            Index: i,
            State: stateDown,
        }
    }

    for _, m := range metrics {
        index, err := extractIndex(m.Pod)
        if err != nil {
            return nil, err
        }

        podState := getPodState(m.Pod)
        if podState == stateDown {
            continue
        }

        // Pods beyond the desired instance count have no record to update.
        if index >= len(records) {
            continue
        }

        record := &records[index]
        record.State = podState

        metricsMap := aggregateContainerMetrics(m.Metrics.Containers)
        if len(metricsMap) == 0 {
            continue
        }

        if cpuQuantity, ok := metricsMap["cpu"]; ok {
            // Read the usage in nanocores for greatest accuracy, then
            // convert to cores (1.0 means one full core).
            cores := float64(cpuQuantity.ScaledValue(resource.Nano)) / 1e9
            record.Usage.CPU = &cores
        }

        if memQuantity, ok := metricsMap["memory"]; ok {
            value := memQuantity.Value()
            record.Usage.Mem = &value
        }

        if storageQuantity, ok := metricsMap["storage"]; ok {
            value := storageQuantity.Value()
            record.Usage.Disk = &value
        }

        // Named timestamp rather than time so the time package is not shadowed.
        timestamp := m.Metrics.Timestamp.UTC().Format(time.RFC3339)
        record.Usage.Time = &timestamp

        record.MemQuota = tools.PtrTo(megabytesToBytes(processRecord.MemoryMB))
        record.DiskQuota = tools.PtrTo(megabytesToBytes(processRecord.DiskQuotaMB))
    }
    return records, nil
}

// extractIndex reads the instance index of pod from the EnvCFInstanceIndex
// environment variable of its application container. It fails when the
// container or the variable is missing, or when the value is not a
// non-negative integer.
func extractIndex(pod corev1.Pod) (int, error) {
    appContainer, err := extractProcessContainer(pod.Spec.Containers)
    if err != nil {
        return 0, err
    }

    rawIndex, err := extractEnvVarFromContainer(*appContainer, EnvCFInstanceIndex)
    if err != nil {
        return 0, err
    }

    parsed, convErr := strconv.Atoi(rawIndex)
    switch {
    case convErr != nil:
        return 0, fmt.Errorf("%s is not a valid index: %w", EnvCFInstanceIndex, convErr)
    case parsed < 0:
        return 0, fmt.Errorf("%s is not a valid index: instance indexes can't be negative", EnvCFInstanceIndex)
    }

    return parsed, nil
}

// extractProcessContainer returns a pointer to the first element of
// containers named ApplicationContainerName, or an error if there is none.
func extractProcessContainer(containers []corev1.Container) (*corev1.Container, error) {
    for idx := range containers {
        candidate := &containers[idx]
        if candidate.Name != ApplicationContainerName {
            continue
        }
        return candidate, nil
    }
    return nil, fmt.Errorf("container %q not found", ApplicationContainerName)
}

// extractEnvVarFromContainer returns the Value of the first environment
// variable of container whose name equals envVar, or an error when the
// variable is not declared.
func extractEnvVarFromContainer(container corev1.Container, envVar string) (string, error) {
    for i := range container.Env {
        if entry := container.Env[i]; entry.Name == envVar {
            return entry.Value, nil
        }
    }
    return "", fmt.Errorf("%s not set", envVar)
}

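// Pod state mapping, adapted from Kubernetes in Action 2nd Edition - Ch 6.
// The rules are evaluated in this order; the first match wins:
// RUNNING => pod.conditions.Ready is true
// DOWN => pod.conditions.PodScheduled is missing or not true
// CRASHED => any(pod.ContainerStatuses.State isA Waiting with reason CrashLoopBackOff)
// STARTING => default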

// getPodState maps the status of pod to one of the state* constants. The
// checks run in order: a Ready pod is RUNNING, an unscheduled pod is DOWN, a
// pod with a crash-looping container is CRASHED, and anything else is
// STARTING.
func getPodState(pod corev1.Pod) string {
    switch {
    case podConditionStatus(pod, corev1.PodReady):
        // Ready means all containers are ready.
        return stateRunning
    case !podConditionStatus(pod, corev1.PodScheduled):
        return stateDown
    case podHasCrashedContainer(pod):
        return stateCrashed
    default:
        return stateStarting
    }
}

// podHasCrashedContainer reports whether any container status of pod is in
// the Waiting state with reason "CrashLoopBackOff".
func podHasCrashedContainer(pod corev1.Pod) bool {
    for _, status := range pod.Status.ContainerStatuses {
        waiting := status.State.Waiting
        if waiting == nil {
            continue
        }
        if waiting.Reason == "CrashLoopBackOff" {
            return true
        }
    }

    return false
}

// podConditionStatus reports whether the first condition of type
// conditionType on pod has status True. It returns false when the pod has no
// such condition.
func podConditionStatus(pod corev1.Pod, conditionType corev1.PodConditionType) bool {
    conditions := pod.Status.Conditions
    for i := range conditions {
        if conditions[i].Type != conditionType {
            continue
        }
        return conditions[i].Status == corev1.ConditionTrue
    }

    return false
}

// aggregateContainerMetrics sums the usage of all containers per resource
// name and returns the totals keyed by that name (e.g. "cpu", "memory").
// The result is an empty map when no container reports any usage.
//
// The input quantities are never modified: the first quantity seen for a
// resource is deep-copied before it is used as the accumulator.
func aggregateContainerMetrics(containers []metricsv1beta1.ContainerMetrics) map[string]resource.Quantity {
    metrics := map[string]resource.Quantity{}

    for _, container := range containers {
        for name, quantity := range container.Usage {
            key := string(name)

            total, seen := metrics[key]
            if !seen {
                // A plain struct copy may share the Quantity's internal
                // decimal pointer with the caller's data, so a later Add on
                // the accumulator could alter the input. DeepCopy detaches it.
                metrics[key] = quantity.DeepCopy()
                continue
            }

            total.Add(quantity)
            metrics[key] = total
        }
    }

    return metrics
}

// megabytesToBytes converts a size in mebibytes (1024 * 1024 bytes each) to
// bytes.
func megabytesToBytes(mb int64) int64 {
    const bytesPerMegabyte int64 = 1024 * 1024
    return bytesPerMegabyte * mb
}