vorteil/direktiv

View on GitHub
pkg/service/helper.go

Summary

Maintainability
B
5 hrs
Test Coverage
package service

import (
    "fmt"
    "strconv"
    "strings"

    "github.com/direktiv/direktiv/pkg/core"
    "github.com/mattn/go-shellwords"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    servingv1 "knative.dev/serving/pkg/apis/serving/v1"
)

const (
    direktivCmdExecValue  = "/usr/share/direktiv/direktiv-cmd"
    direktivProxyHTTPS    = "HTTPS_PROXY"
    direktivProxyHTTP     = "HTTP_PROXY"
    direktivProxyNO       = "NO_PROXY"
    direktivOpentelemetry = "DIREKTIV_OTLP"
    direktivFlowEndpoint  = "DIREKTIV_FLOW_ENDPOINT"
    direktivDebug         = "DIREKTIV_DEBUG"
)

func buildService(c *core.Config, sv *core.ServiceFileData, registrySecrets []corev1.LocalObjectReference) (*servingv1.Service, error) {
    containers, err := buildContainers(c, sv)
    if err != nil {
        return nil, err
    }

    nonRoot := false

    initContainers := []corev1.Container{}
    if sv.Cmd == direktivCmdExecValue {
        initContainers = append(initContainers, corev1.Container{
            Name:  "init",
            Image: c.KnativeSidecar,
            Env: []corev1.EnvVar{
                {
                    Name:  "DIREKTIV_APP",
                    Value: "init",
                },
            },
            VolumeMounts: []corev1.VolumeMount{
                {
                    Name:      "bindir",
                    MountPath: "/usr/share/direktiv/",
                },
            },
        })
    }

    svc := &servingv1.Service{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "serving.knative.dev/v1",
            Kind:       "Service",
        },
        ObjectMeta: buildServiceMeta(c, sv),
        Spec: servingv1.ServiceSpec{
            ConfigurationSpec: servingv1.ConfigurationSpec{
                Template: servingv1.RevisionTemplateSpec{
                    ObjectMeta: buildPodMeta(c, sv),
                    Spec: servingv1.RevisionSpec{
                        PodSpec: corev1.PodSpec{
                            SecurityContext: &corev1.PodSecurityContext{
                                RunAsNonRoot: &nonRoot,
                                SeccompProfile: &corev1.SeccompProfile{
                                    // should we change it to runtime?
                                    Type: corev1.SeccompProfileTypeUnconfined,
                                },
                            },
                            ServiceAccountName: c.KnativeServiceAccount,
                            Containers:         containers,
                            InitContainers:     initContainers,
                            Volumes:            buildVolumes(c, sv),
                            Affinity:           &corev1.Affinity{
                                // NodeAffinity: n,
                            },
                        },
                    },
                },
            },
        },
    }

    // Set Registry Secrets
    svc.Spec.ConfigurationSpec.Template.Spec.ImagePullSecrets = registrySecrets
    svc.Spec.ConfigurationSpec.Template.Spec.PodSpec.ImagePullSecrets = registrySecrets

    return svc, nil
}

func buildServiceMeta(c *core.Config, sv *core.ServiceFileData) metav1.ObjectMeta {
    meta := metav1.ObjectMeta{
        Name:        sv.GetID(),
        Namespace:   c.KnativeNamespace,
        Labels:      make(map[string]string),
        Annotations: make(map[string]string),
    }

    meta.Annotations["direktiv.io/inputHash"] = sv.GetValueHash()
    meta.Labels["networking.knative.dev/visibility"] = "cluster-local"
    meta.Annotations["networking.knative.dev/ingress.class"] = c.KnativeIngressClass

    return meta
}

func buildPodMeta(c *core.Config, sv *core.ServiceFileData) metav1.ObjectMeta {
    metaSpec := metav1.ObjectMeta{
        Namespace:   c.KnativeNamespace,
        Labels:      make(map[string]string),
        Annotations: make(map[string]string),
    }
    metaSpec.Labels["direktiv-app"] = "direktiv"

    metaSpec.Annotations["autoscaling.knative.dev/minScale"] = strconv.Itoa(sv.Scale)
    metaSpec.Annotations["autoscaling.knative.dev/maxScale"] = strconv.Itoa(c.KnativeMaxScale)

    if len(c.KnativeNetShape) > 0 {
        metaSpec.Annotations["kubernetes.io/ingress-bandwidth"] = c.KnativeNetShape
        metaSpec.Annotations["kubernetes.io/egress-bandwidth"] = c.KnativeNetShape
    }

    return metaSpec
}

func buildVolumes(_ *core.Config, sv *core.ServiceFileData) []corev1.Volume {
    volumes := []corev1.Volume{
        {
            Name: "workdir",
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        },
    }

    // add extra folder if bin required
    if sv.Cmd == direktivCmdExecValue {
        volumes = append(volumes, corev1.Volume{
            Name: "bindir",
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        })
    }

    return volumes
}

func buildContainers(c *core.Config, sv *core.ServiceFileData) ([]corev1.Container, error) {
    // set resource limits.
    rl, err := buildResourceLimits(c, sv)
    if err != nil {
        return nil, err
    }

    allowPrivilegeEscalation := true
    secContext := &corev1.SecurityContext{
        AllowPrivilegeEscalation: &allowPrivilegeEscalation,
        Capabilities: &corev1.Capabilities{
            Drop: []corev1.Capability{
                // corev1.Capability("ALL"),
            },
        },
    }

    // user container
    uc := corev1.Container{
        Name:      containerUser,
        Image:     sv.Image,
        Env:       buildEnvVars(false, c, sv),
        Resources: *rl,
        VolumeMounts: []corev1.VolumeMount{
            {
                Name:      "workdir",
                MountPath: "/mnt/shared",
            },
        },
        SecurityContext: secContext,
    }

    // add volume for binary or add command
    if sv.Cmd == direktivCmdExecValue {
        uc.VolumeMounts = append(uc.VolumeMounts, corev1.VolumeMount{
            Name:      "bindir",
            MountPath: "/usr/share/direktiv/",
        })
    }

    if len(sv.Cmd) > 0 {
        args, err := shellwords.Parse(sv.Cmd)
        if err != nil {
            return []corev1.Container{}, err
        }
        uc.Command = args
    }

    vMounts := []corev1.VolumeMount{
        {
            Name:      "workdir",
            MountPath: "/mnt/shared",
        },
    }

    // direktiv sidecar
    sidecarEnvs := buildEnvVars(true, c, sv)
    sidecarEnvs = append(sidecarEnvs, corev1.EnvVar{Name: "API_KEY", Value: c.ApiKey})
    sc := corev1.Container{
        Name:         containerSidecar,
        Image:        c.KnativeSidecar,
        Env:          sidecarEnvs,
        VolumeMounts: vMounts,
        Ports: []corev1.ContainerPort{
            {
                ContainerPort: containerSidecarPort,
            },
        },
        SecurityContext: secContext,
    }

    return []corev1.Container{uc, sc}, nil
}

func buildResourceLimits(cf *core.Config, sv *core.ServiceFileData) (*corev1.ResourceRequirements, error) {
    var (
        m int
        c string
        d int
    )

    switch sv.Size {
    case "small":
        m = cf.KnativeSizeMemorySmall
        c = cf.KnativeSizeCPUSmall
        d = cf.KnativeSizeDiskSmall
    case "medium":
        m = cf.KnativeSizeMemoryMedium
        c = cf.KnativeSizeCPUMedium
        d = cf.KnativeSizeDiskMedium
    case "large":
        m = cf.KnativeSizeMemoryLarge
        c = cf.KnativeSizeCPULarge
        d = cf.KnativeSizeDiskLarge
    default:
        return nil, fmt.Errorf("service size: '%s' is invalid, expected value: ['small', 'medium', 'large']", sv.Size)
    }

    ephemeralHigh, err := resource.ParseQuantity(fmt.Sprintf("%dM", d))
    if err != nil {
        return nil, err
    }

    rl := corev1.ResourceList{
        "ephemeral-storage": ephemeralHigh,
    }

    if m != 0 {
        qmem, err := resource.ParseQuantity(fmt.Sprintf("%dM", m))
        if err != nil {
            return nil, err
        }
        rl["memory"] = qmem
    }

    if c != "" {
        qcpu, err := resource.ParseQuantity(c)
        if err != nil {
            return nil, err
        }
        rl["cpu"] = qcpu
    }

    baseCPU, _ := resource.ParseQuantity("0.1")
    baseMem, _ := resource.ParseQuantity("64M")
    baseDisk, _ := resource.ParseQuantity("64M")

    return &corev1.ResourceRequirements{
        Requests: corev1.ResourceList{
            "cpu":               baseCPU,
            "memory":            baseMem,
            "ephemeral-storage": baseDisk,
        },
        Limits: rl,
    }, nil
}

func buildEnvVars(forSidecar bool, c *core.Config, sv *core.ServiceFileData) []corev1.EnvVar {
    proxyEnvs := []corev1.EnvVar{}

    if len(c.KnativeProxyHTTP) > 0 {
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  direktivProxyHTTP,
            Value: c.KnativeProxyHTTP,
        })
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  strings.ToLower(direktivProxyHTTP),
            Value: c.KnativeProxyHTTP,
        })
    }

    if len(c.KnativeProxyHTTPS) > 0 {
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  direktivProxyHTTPS,
            Value: c.KnativeProxyHTTPS,
        })
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  strings.ToLower(direktivProxyHTTPS),
            Value: c.KnativeProxyHTTPS,
        })
    }

    if len(c.KnativeProxyNo) > 0 {
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  direktivProxyNO,
            Value: c.KnativeProxyNo,
        })
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  strings.ToLower(direktivProxyNO),
            Value: c.KnativeProxyNo,
        })
    }

    // add debug if there is an env
    if c.LogDebug {
        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  direktivDebug,
            Value: "true",
        })
    }

    proxyEnvs = append(proxyEnvs, corev1.EnvVar{
        Name:  direktivOpentelemetry,
        Value: c.OpenTelemetry,
    })

    if forSidecar {
        namespace := c.DirektivNamespace

        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  direktivFlowEndpoint,
            Value: fmt.Sprintf("direktiv-flow.%s", namespace),
        })

        proxyEnvs = append(proxyEnvs, corev1.EnvVar{
            Name:  "DIREKTIV_APP",
            Value: "sidecar",
        })
    } else {
        for _, v := range sv.Envs {
            proxyEnvs = append(proxyEnvs, corev1.EnvVar{
                Name:  v.Name,
                Value: v.Value,
            })
        }
    }

    return proxyEnvs
}