
View on GitHub


1 day
Test Coverage
package backupcontroller

import (

    k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    controllerruntime "sigs.k8s.io/controller-runtime"

// BackupExecutor creates a batch.job object on the cluster. It merges all the
// information provided by defaults and the CRDs to ensure the backup has all information to run.
type BackupExecutor struct {
    backup *k8upv1.Backup

// NewBackupExecutor returns a new BackupExecutor.
func NewBackupExecutor(config job.Config) *BackupExecutor {
    return &BackupExecutor{Generic: executor.Generic{Config: config}, backup: config.Obj.(*k8upv1.Backup)}

// GetConcurrencyLimit returns the concurrent jobs limit
func (b *BackupExecutor) GetConcurrencyLimit() int {
    return cfg.Config.GlobalConcurrentBackupJobsLimit

// Execute triggers the actual batch.job creation on the cluster.
// It will also register a callback function on the observer so the PreBackupPods can be removed after the backup has finished.
func (b *BackupExecutor) Execute(ctx context.Context) error {
    err := b.createServiceAccountAndBinding(ctx)
    if err != nil {
        return err

    return b.startBackup(ctx)

type backupItem struct {
    volume      corev1.Volume
    node        string
    tolerations []corev1.Toleration
    targetPod   string

// listAndFilterPVCs lists all PVCs in the given namespace and filters them for K8up specific usage.
// Specifically, non-RWX PVCs will be skipped, as well PVCs that have the given annotation.
func (b *BackupExecutor) listAndFilterPVCs(ctx context.Context, annotation string) ([]backupItem, error) {
    log := controllerruntime.LoggerFrom(ctx)

    pods := &corev1.PodList{}
    pvcPodMap := make(map[string]corev1.Pod)
    labelselector, _ := labels.Parse("!" + job.K8uplabel)
    if err := b.Config.Client.List(ctx, pods, client.InNamespace(b.backup.Namespace), client.MatchingLabelsSelector{Selector: labelselector}); err != nil {
        return nil, fmt.Errorf("list pods: %w", err)
    for _, pod := range pods.Items {
        for _, volume := range pod.Spec.Volumes {
            if volume.PersistentVolumeClaim != nil {
                pvcPodMap[volume.PersistentVolumeClaim.ClaimName] = pod
                log.V(1).Info("pvc pod map", "claimName", volume.PersistentVolumeClaim.ClaimName, "pod", pod.GetName())

    backupItems := make([]backupItem, 0)
    claimlist := &corev1.PersistentVolumeClaimList{}

    log.Info("Listing all PVCs", "annotation", annotation)
    if err := b.fetchPVCs(ctx, claimlist); err != nil {
        return backupItems, err

    for _, pvc := range claimlist.Items {
        if pvc.Status.Phase != corev1.ClaimBound {
            log.Info("PVC is not bound", "pvc", pvc.GetName())

        backupAnnotation, hasBackupAnnotation := pvc.GetAnnotations()[annotation]

        isRWO := containsAccessMode(pvc.Spec.AccessModes, corev1.ReadWriteOnce)
        if !containsAccessMode(pvc.Spec.AccessModes, corev1.ReadWriteMany) && !isRWO && !hasBackupAnnotation {
            log.Info("PVC is neither RWX nor RWO and has no backup annotation", "pvc", pvc.GetName())

        if !hasBackupAnnotation {
            if cfg.Config.SkipWithoutAnnotation {
                log.Info("PVC doesn't have annotation and BACKUP_SKIP_WITHOUT_ANNOTATION is true, skipping PVC", "pvc", pvc.GetName())
            } else {
                log.Info("PVC doesn't have annotation, adding to list", "pvc", pvc.GetName())
        } else if shouldBackup, _ := strconv.ParseBool(backupAnnotation); !shouldBackup {
            log.Info("PVC skipped due to annotation", "pvc", pvc.GetName(), "annotation", backupAnnotation)
        } else {
            log.Info("Adding to list", "pvc", pvc.Name)

        bi := backupItem{
            volume: corev1.Volume{
                Name: pvc.Name,
                VolumeSource: corev1.VolumeSource{
                    PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
                        ClaimName: pvc.Name,

        if pod, ok := pvcPodMap[pvc.GetName()]; ok {
            bi.node = pod.Spec.NodeName
            bi.tolerations = pod.Spec.Tolerations
            bi.targetPod = pod.GetName()

            log.V(1).Info("PVC mounted at pod", "pvc", pvc.GetName(), "targetPod", bi.targetPod, "node", bi.node, "tolerations", bi.tolerations)
        } else if isRWO {
            pv := &corev1.PersistentVolume{}
            if err := b.Config.Client.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName}, pv); err != nil {
                log.Error(err, "unable to get PV, skipping pvc", "pvc", pvc.GetName(), "pv", pvc.Spec.VolumeName)

            bi.node = findNode(pv, pvc)
            if bi.node == "" {
                log.Info("RWO PVC not bound and no PV node affinity set, skipping", "pvc", pvc.GetName(), "affinity", pv.Spec.NodeAffinity)
            log.V(1).Info("node found in PV or PVC", "pvc", pvc.GetName(), "node", bi.node)
        } else {
            log.Info("RWX PVC with no specific node", "pvc", pvc.GetName())

        backupItems = append(backupItems, bi)

    return backupItems, nil

// findNode tries to find a PVs NodeAffinity for a specific hostname. If found will return that.
// If not it will try to return the value of the k8up.io/hostname annotation on the PVC. If this is not set, will return
// empty string.
func findNode(pv *corev1.PersistentVolume, pvc corev1.PersistentVolumeClaim) string {
    hostnameAnnotation := pvc.Annotations[k8upv1.AnnotationK8upHostname]
    if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
        return hostnameAnnotation
    for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
        for _, matchExpr := range term.MatchExpressions {
            if matchExpr.Key == corev1.LabelHostname && matchExpr.Operator == corev1.NodeSelectorOpIn {
                return matchExpr.Values[0]
    return hostnameAnnotation

func (b *BackupExecutor) startBackup(ctx context.Context) error {
    ready, err := b.StartPreBackup(ctx)
    if err != nil {
        return err
    if !ready || b.backup.Status.IsWaitingForPreBackup() {
        return nil

    backupItems, err := b.listAndFilterPVCs(ctx, cfg.Config.BackupAnnotation)
    if err != nil {
        b.Generic.SetConditionFalseWithMessage(ctx, k8upv1.ConditionReady, k8upv1.ReasonRetrievalFailed, err.Error())
        return err

    type jobItem struct {
        job           *batchv1.Job
        targetPods    []string
        volumes       []corev1.Volume
        skipPreBackup bool
    backupJobs := map[string]jobItem{}
    for index, item := range backupItems {
        if _, ok := backupJobs[item.node]; !ok {
            backupJobs[item.node] = jobItem{
                job:           b.createJob(strconv.Itoa(index), item.node, item.tolerations),
                targetPods:    make([]string, 0),
                volumes:       make([]corev1.Volume, 0),
                skipPreBackup: true,

        j := backupJobs[item.node]
        if item.targetPod != "" {
            j.targetPods = append(j.targetPods, item.targetPod)
        j.volumes = append(j.volumes, item.volume)
        backupJobs[item.node] = j

    if err != nil {
        return err

    log := controllerruntime.LoggerFrom(ctx)
    podLister := kubernetes.NewPodLister(ctx, b.Client, cfg.Config.BackupCommandAnnotation, "", "", b.backup.Namespace, nil, false, log)
    backupPods, err := podLister.ListPods()
    if err != nil {
        log.Error(err, "could not list pods", "namespace", b.backup.Namespace)
        return fmt.Errorf("could not list pods: %w", err)

    if len(backupPods) > 0 {
        backupJobs["prebackup"] = jobItem{
            job:           b.createJob("prebackup", "", nil),
            targetPods:    make([]string, 0),
            volumes:       make([]corev1.Volume, 0),
            skipPreBackup: false,

    index := 0
    for _, batchJob := range backupJobs {
        _, err = controllerruntime.CreateOrUpdate(ctx, b.Generic.Config.Client, batchJob.job, func() error {
            mutateErr := job.MutateBatchJob(batchJob.job, b.backup, b.Generic.Config)
            if mutateErr != nil {
                return mutateErr

            vars, setupErr := b.setupEnvVars()
            if setupErr != nil {
                return setupErr
            batchJob.job.Spec.Template.Spec.Containers[0].Env = vars
            if len(batchJob.targetPods) > 0 {
                batchJob.job.Spec.Template.Spec.Containers[0].Env = append(batchJob.job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{
                    Name:  "TARGET_PODS",
                    Value: strings.Join(batchJob.targetPods, ","),
            if batchJob.skipPreBackup {
                batchJob.job.Spec.Template.Spec.Containers[0].Env = append(batchJob.job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{
                    Name:  "SKIP_PREBACKUP",
                    Value: "true",
            // each job sleeps for index seconds to avoid concurrent restic repository creation. Not the prettiest way but it works and a repository
            // is created only once usually.
            if index > 0 {
                batchJob.job.Spec.Template.Spec.Containers[0].Env = append(batchJob.job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{
                    Name:  "SLEEP_DURATION",
                    Value: (5 * time.Second).String(),
            batchJob.job.Spec.Template.Spec.ServiceAccountName = cfg.Config.ServiceAccount
            batchJob.job.Spec.Template.Spec.Volumes = append(batchJob.volumes, utils.AttachTLSVolumes(b.backup.Spec.Volumes)...)
            batchJob.job.Spec.Template.Spec.Containers[0].VolumeMounts = append(b.newVolumeMounts(batchJob.volumes), b.attachTLSVolumeMounts()...)

            batchJob.job.Spec.Template.Spec.Containers[0].Args = b.setupArgs()

            return nil
        if err != nil {
            return fmt.Errorf("unable to createOrUpdate(%q): %w", batchJob.job.Name, err)

    return nil

func (b *BackupExecutor) createJob(name, node string, tolerations []corev1.Toleration) *batchv1.Job {
    batchJob := &batchv1.Job{}
    batchJob.Name = b.jobName(name)
    batchJob.Namespace = b.backup.Namespace
    batchJob.Spec.Template.Spec.Volumes = make([]corev1.Volume, 0)
    if node != "" {
        batchJob.Spec.Template.Spec.NodeSelector = map[string]string{
            corev1.LabelHostname: node,
    batchJob.Spec.Template.Spec.Tolerations = tolerations
    return batchJob

func (b *BackupExecutor) cleanupOldBackups(ctx context.Context) {
    b.Generic.CleanupOldResources(ctx, &k8upv1.BackupList{}, b.backup.Namespace, b.backup)

func (b *BackupExecutor) jobName(name string) string {
    return k8upv1.BackupType.String() + "-" + b.backup.Name + "-" + name

func (b *BackupExecutor) attachTLSVolumeMounts() []corev1.VolumeMount {
    var tlsVolumeMounts []corev1.VolumeMount
    if b.backup.Spec.Backend != nil && !utils.ZeroLen(b.backup.Spec.Backend.VolumeMounts) {
        tlsVolumeMounts = append(tlsVolumeMounts, *b.backup.Spec.Backend.VolumeMounts...)

    return utils.AttachTLSVolumeMounts(cfg.Config.PodVarDir, &tlsVolumeMounts)