cloudfoundry/cf-k8s-controllers

job-task-runner/controllers/taskworkload_controller.go

/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
    "context"
    "fmt"
    "time"

    korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
    "code.cloudfoundry.org/korifi/tools"
    "code.cloudfoundry.org/korifi/tools/k8s"

    "github.com/go-logr/logr"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    k8serrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
    workloadContainerName = "workload"
    ServiceAccountName    = "korifi-task"
)

//counterfeiter:generate -o fake -fake-name TaskStatusGetter . TaskStatusGetter

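// TaskStatusGetter derives TaskWorkload status conditions from the batch Job
// that backs the task.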
type TaskStatusGetter interface {
    GetStatusConditions(ctx context.Context, job *batchv1.Job) ([]metav1.Condition, error)
}

// TaskWorkloadReconciler reconciles a TaskWorkload object
type TaskWorkloadReconciler struct {
    k8sClient                                  client.Client
    logger                                     logr.Logger
    scheme                                     *runtime.Scheme
    statusGetter                               TaskStatusGetter
    jobTTL                                     time.Duration
    jobTaskRunnerTemporarySetPodSeccompProfile bool
}

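// NewTaskWorkloadReconciler constructs a TaskWorkloadReconciler and wraps it
// in the generic k8s.PatchingReconciler, which (as its name suggests) patches
// changes made to the TaskWorkload back to the cluster after reconciliation.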
func NewTaskWorkloadReconciler(
    logger logr.Logger,
    k8sClient client.Client,
    scheme *runtime.Scheme,
    statusGetter TaskStatusGetter,
    jobTTL time.Duration,
    jobTaskRunnerTemporarySetPodSeccompProfile bool,
) *k8s.PatchingReconciler[korifiv1alpha1.TaskWorkload, *korifiv1alpha1.TaskWorkload] {
    taskReconciler := TaskWorkloadReconciler{
        k8sClient:    k8sClient,
        logger:       logger,
        scheme:       scheme,
        statusGetter: statusGetter,
        jobTTL:       jobTTL,
        jobTaskRunnerTemporarySetPodSeccompProfile: jobTaskRunnerTemporarySetPodSeccompProfile,
    }

    return k8s.NewPatchingReconciler[korifiv1alpha1.TaskWorkload, *korifiv1alpha1.TaskWorkload](logger, k8sClient, &taskReconciler)
}
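
// A minimal wiring sketch (illustrative only; mgr, statusGetter and the
// option values below are assumptions, not taken from this file):
//
//     reconciler := NewTaskWorkloadReconciler(
//         ctrl.Log.WithName("task-workload"),
//         mgr.GetClient(),
//         mgr.GetScheme(),
//         statusGetter,
//         30*time.Minute,
//         false,
//     )
//
// The returned PatchingReconciler would then be registered with the manager.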

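// SetupWithManager configures the controller to reconcile TaskWorkloads and
// to watch the Jobs they own. It returns the builder without calling
// Complete; finishing the registration is left to the caller (presumably the
// PatchingReconciler returned by NewTaskWorkloadReconciler).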
func (r *TaskWorkloadReconciler) SetupWithManager(mgr ctrl.Manager) *builder.Builder {
    return ctrl.NewControllerManagedBy(mgr).
        For(&korifiv1alpha1.TaskWorkload{}).
        Owns(&batchv1.Job{})
}

//+kubebuilder:rbac:groups=korifi.cloudfoundry.org,resources=taskworkloads,verbs=get;list;watch;patch
//+kubebuilder:rbac:groups=korifi.cloudfoundry.org,resources=taskworkloads/status,verbs=get;patch
//+kubebuilder:rbac:groups=korifi.cloudfoundry.org,resources=taskworkloads/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=create;get;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

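// ReconcileResource ensures that a Job exists for the TaskWorkload and
// reflects the Job's status in the TaskWorkload's status conditions.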
func (r *TaskWorkloadReconciler) ReconcileResource(ctx context.Context, taskWorkload *korifiv1alpha1.TaskWorkload) (ctrl.Result, error) {
    log := logr.FromContextOrDiscard(ctx)

    taskWorkload.Status.ObservedGeneration = taskWorkload.Generation
    log.V(1).Info("set observed generation", "generation", taskWorkload.Status.ObservedGeneration)

    job, err := r.getOrCreateJob(ctx, log, taskWorkload)
    if err != nil {
        return ctrl.Result{}, err
    }

    if job == nil {
        return ctrl.Result{}, nil
    }

    if err = r.updateTaskWorkloadStatus(ctx, taskWorkload, job); err != nil {
        log.Info("failed to update task workload status", "reason", err)
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

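// getOrCreateJob fetches the Job backing the TaskWorkload, creating it if it
// does not exist yet. It returns a nil Job (and no error) when the task has
// already been initialized but its Job is gone, e.g. because the Job's TTL
// expired, so that the Job is not recreated.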
func (r TaskWorkloadReconciler) getOrCreateJob(ctx context.Context, logger logr.Logger, taskWorkload *korifiv1alpha1.TaskWorkload) (*batchv1.Job, error) {
    job := &batchv1.Job{}

    err := r.k8sClient.Get(ctx, client.ObjectKeyFromObject(taskWorkload), job)
    if err == nil {
        return job, nil
    }

    if k8serrors.IsNotFound(err) {
        if meta.IsStatusConditionTrue(taskWorkload.Status.Conditions, korifiv1alpha1.TaskInitializedConditionType) {
            return nil, nil
        }

        return r.createJob(ctx, logger, taskWorkload)
    }

    logger.Info("getting job failed", "reason", err)
    return nil, err
}

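// createJob translates the TaskWorkload into a Job, sets the TaskWorkload as
// the Job's controller owner and creates the Job in the cluster.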
func (r TaskWorkloadReconciler) createJob(ctx context.Context, logger logr.Logger, taskWorkload *korifiv1alpha1.TaskWorkload) (*batchv1.Job, error) {
    job := WorkloadToJob(taskWorkload, int32(r.jobTTL.Seconds()), r.jobTaskRunnerTemporarySetPodSeccompProfile)
    err := controllerutil.SetControllerReference(taskWorkload, job, r.scheme)
    if err != nil {
        return nil, err
    }

    err = r.k8sClient.Create(ctx, job)
    if err != nil {
        if k8serrors.IsAlreadyExists(err) {
            logger.V(1).Info("job for TaskWorkload already exists")
        } else {
            logger.Info("failed to create job for task workload", "reason", err)
        }
        return nil, err
    }

    return job, nil
}

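// WorkloadToJob builds a single-completion, no-retry Job for the TaskWorkload,
// running the workload container with a restrictive security context
// (non-root, no privilege escalation, all capabilities dropped, RuntimeDefault
// seccomp profile).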
func WorkloadToJob(
    taskWorkload *korifiv1alpha1.TaskWorkload,
    jobTTL int32,
    jobTaskRunnerTemporarySetPodSeccompProfile bool,
) *batchv1.Job {
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Name:      taskWorkload.Name,
            Namespace: taskWorkload.Namespace,
        },
        Spec: batchv1.JobSpec{
            BackoffLimit:            tools.PtrTo(int32(0)),
            Parallelism:             tools.PtrTo(int32(1)),
            Completions:             tools.PtrTo(int32(1)),
            TTLSecondsAfterFinished: tools.PtrTo(jobTTL),
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    SecurityContext: &corev1.PodSecurityContext{
                        RunAsNonRoot: tools.PtrTo(true),
                    },
                    AutomountServiceAccountToken: tools.PtrTo(false),
                    ImagePullSecrets:             taskWorkload.Spec.ImagePullSecrets,
                    Containers: []corev1.Container{{
                        Name:      workloadContainerName,
                        Image:     taskWorkload.Spec.Image,
                        Command:   taskWorkload.Spec.Command,
                        Resources: taskWorkload.Spec.Resources,
                        Env:       taskWorkload.Spec.Env,
                        SecurityContext: &corev1.SecurityContext{
                            Capabilities: &corev1.Capabilities{
                                Drop: []corev1.Capability{"ALL"},
                            },
                            AllowPrivilegeEscalation: tools.PtrTo(false),
                            SeccompProfile: &corev1.SeccompProfile{
                                Type: corev1.SeccompProfileTypeRuntimeDefault,
                            },
                        },
                    }},
                    ServiceAccountName: ServiceAccountName,
                },
            },
        },
    }

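    // The pod-level seccomp profile is only applied behind the temporary
    // flag; the container-level RuntimeDefault profile above is always set.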
    if jobTaskRunnerTemporarySetPodSeccompProfile {
        job.Spec.Template.Spec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
            Type: corev1.SeccompProfileTypeRuntimeDefault,
        }
    }
    return job
}

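// updateTaskWorkloadStatus copies the conditions computed from the Job onto
// the TaskWorkload's status, stamping each with the TaskWorkload's current
// generation.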
func (r *TaskWorkloadReconciler) updateTaskWorkloadStatus(ctx context.Context, taskWorkload *korifiv1alpha1.TaskWorkload, job *batchv1.Job) error {
    conditions, err := r.statusGetter.GetStatusConditions(ctx, job)
    if err != nil {
        return fmt.Errorf("failed to get status conditions for job %s:%s: %w", job.Namespace, job.Name, err)
    }

    for _, condition := range conditions {
        condition.ObservedGeneration = taskWorkload.Generation
        meta.SetStatusCondition(&taskWorkload.Status.Conditions, condition)
    }

    return nil
}