vorteil/direktiv

View on GitHub
pkg/service/manager.go

Summary

Maintainability
B
5 hrs
Test Coverage
package service

import (
    "context"
    "fmt"
    "io"
    "log/slog"
    "slices"
    "sort"
    "sync"
    "time"

    "github.com/direktiv/direktiv/pkg/core"
    "github.com/direktiv/direktiv/pkg/reconcile"
    "github.com/direktiv/direktiv/pkg/tracing"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "knative.dev/serving/pkg/client/clientset/versioned"
)

// manager implements core.ServiceManager by wrapping a runtimeClient. It manages the services in
// the system declaratively: list holds the desired service configurations, and Run repeatedly
// reconciles them against the services actually running in the runtime, deleting, creating and
// updating runtime services as needed. Run loops until its circuit is done, so it is presumably
// started in its own goroutine by the caller.
type manager struct {
    // cfg supplies the reconcile interval and the knative max scale used when defaulting services.
    cfg *core.Config
    // list is the desired state: every service configuration that needs to be running. Entries are
    // private copies made by SetServices; runCycle (Error) and getList (ID, Conditions) also write
    // fields on them.
    list []*core.ServiceFileData

    // the underlying service runtime used to create/schedule the services.
    runtimeClient runtimeClient

    // lock serializes access to list, the entries it points to, and servicesListHasBeenSet. Run
    // holds it around each reconcile pass and every public method of manager acquires it.
    lock *sync.Mutex

    // servicesListHasBeenSet is set to true the first time SetServices is called; runCycle does
    // nothing until then, so no reconcile runs against the initial empty list.
    servicesListHasBeenSet bool
}

// NewManager creates the Knative-backed core.ServiceManager, using the in-cluster Kubernetes
// configuration to build its clients.
//
// On failure it returns a nil interface value together with the error. Returning the
// (*manager, error) pair directly would instead yield a non-nil interface that wraps a nil
// *manager, which surprises callers that compare the result against nil.
func NewManager(c *core.Config) (core.ServiceManager, error) {
    m, err := newKnativeManager(c)
    if err != nil {
        return nil, err
    }

    return m, nil
}

// newKnativeManager builds a manager whose runtime is a knativeClient. The Knative and Kubernetes
// clientsets are both created from the in-cluster REST configuration (rest.InClusterConfig), so
// this only succeeds when running inside a cluster.
//
// The manager starts with an empty desired-service list. Because servicesListHasBeenSet is false,
// no reconcile happens until SetServices is called.
//
// Errors from each setup step are wrapped with %w so callers can tell which step failed and still
// match the underlying error with errors.Is / errors.As.
func newKnativeManager(c *core.Config) (*manager, error) {
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, fmt.Errorf("load in-cluster kubernetes config: %w", err)
    }

    knativeCli, err := versioned.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("create knative client: %w", err)
    }

    k8sCli, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("create kubernetes client: %w", err)
    }

    client := &knativeClient{
        config:     c,
        knativeCli: knativeCli,
        k8sCli:     k8sCli,
    }

    return &manager{
        cfg:           c,
        list:          make([]*core.ServiceFileData, 0),
        runtimeClient: client,

        lock: &sync.Mutex{},
    }, nil
}

// runCycle performs one reconciliation pass. It diffs the desired services in m.list against the
// services reported by the runtime client and issues the delete, create and update calls computed
// by reconcile.Calculate. Run invokes it with m.lock held.
//
// The pass is skipped entirely until SetServices has been called at least once, so the initially
// empty list is never reconciled (which would delete every running service).
//
// For every created or updated service, the Error field of the corresponding m.list entry is reset
// and then set to the runtime error message if the operation failed. The returned slice holds the
// error from listing the runtime services, or one error per failed runtime call. It is nil when
// every call succeeded.
func (m *manager) runCycle() []error {
    if !m.servicesListHasBeenSet {
        return nil
    }

    // Convert the desired services to reconcile items and index them by ID in one pass.
    src := make([]reconcile.Item, len(m.list))
    searchSrc := make(map[string]*core.ServiceFileData, len(m.list))
    for i, v := range m.list {
        src[i] = v
        searchSrc[v.GetID()] = v
    }

    knList, err := m.runtimeClient.listServices()
    if err != nil {
        return []error{err}
    }

    target := make([]reconcile.Item, len(knList))
    for i, v := range knList {
        target[i] = v
    }

    result := reconcile.Calculate(src, target)

    var errs []error
    for _, id := range result.Deletes {
        slog.DebugContext(context.Background(), fmt.Sprintf("deleting service with ID %s", id))
        if err := m.runtimeClient.deleteService(id); err != nil {
            slog.ErrorContext(context.Background(), fmt.Sprintf("failed to delete service with ID %s", id), "error", err)
            errs = append(errs, fmt.Errorf("delete service id: %s: %w", id, err))
        }
    }

    for _, id := range result.Creates {
        err := m.applyServiceChange(id, searchSrc[id], "create", "creating", func(v *core.ServiceFileData) error {
            return m.runtimeClient.createService(v)
        })
        if err != nil {
            errs = append(errs, err)
        }
    }

    for _, id := range result.Updates {
        err := m.applyServiceChange(id, searchSrc[id], "update", "updating", func(v *core.ServiceFileData) error {
            return m.runtimeClient.updateService(v)
        })
        if err != nil {
            errs = append(errs, err)
        }
    }

    return errs
}

// applyServiceChange runs one create or update runtime operation (op) for the desired service v
// with the given id, logs it under a namespace-scoped tracing context, and records the outcome on
// v.Error: nil on success, the error message on failure.
//
// verb ("create"/"update") is used in the returned error and the failure log; gerund
// ("creating"/"updating") is used in the debug log. It returns the wrapped error, or nil on
// success.
func (m *manager) applyServiceChange(id string, v *core.ServiceFileData, verb, gerund string, op func(*core.ServiceFileData) error) error {
    if v == nil {
        // NOTE(review): reconcile.Calculate presumably only reports IDs taken from src, so this
        // should not happen; report it rather than panicking inside the reconcile loop.
        return fmt.Errorf("%s service id: %s: not found in desired service list", verb, id)
    }

    ctx := tracing.AddNamespace(context.Background(), v.Namespace)
    ctx = tracing.WithTrack(ctx, tracing.BuildNamespaceTrack(v.Namespace))

    v.Error = nil
    // v is the live entry from m.list, not a copy, so the Error set below is what getList reports.
    slog.DebugContext(ctx, fmt.Sprintf("%s service %s with ID %s, type %s", gerund, v.FilePath, v.ID, v.Typ))
    if err := op(v); err != nil {
        slog.ErrorContext(ctx, fmt.Sprintf("failed to %s service %s with ID %s, type %s", verb, v.FilePath, v.ID, v.Typ), "error", err)
        errStr := err.Error()
        v.Error = &errStr

        return fmt.Errorf("%s service id: %s: %w", verb, id, err)
    }

    return nil
}

// Run drives the reconciliation loop until circuit reports that it is done. Each iteration runs a
// single reconcile pass while holding m.lock, logs every error that pass produced, and then sleeps
// for the configured functions reconcile interval. It always returns nil.
func (m *manager) Run(circuit *core.Circuit) error {
    interval := m.cfg.GetFunctionsReconcileInterval()

    for !circuit.IsDone() {
        cycleErrs := func() []error {
            m.lock.Lock()
            defer m.lock.Unlock()

            return m.runCycle()
        }()

        for _, cycleErr := range cycleErrs {
            slog.Error("run cycle", "err", cycleErr)
        }

        time.Sleep(interval)
    }

    return nil
}

// SetServices replaces the desired service list with list and enables reconciliation (runCycle is
// a no-op until the first call). Each entry is shallow-copied before setServiceDefaults fills in
// missing values, so the caller's structs are never modified.
func (m *manager) SetServices(list []*core.ServiceFileData) {
    m.lock.Lock()
    defer m.lock.Unlock()

    m.servicesListHasBeenSet = true

    desired := slices.Clone(list)
    for idx, svc := range desired {
        copied := *svc
        m.setServiceDefaults(&copied)
        desired[idx] = &copied
    }
    m.list = desired
}

// getList returns copies of the desired services that match every non-empty filter (namespace,
// type, file path, name), sorted by file path. Ties keep their m.list order. All callers in this
// file hold m.lock.
//
// For each matching entry, ID is set from GetID(). Conditions is taken from the runtime service
// with the same ID when its value hash matches the desired one; otherwise it is an empty, non-nil
// slice. Both fields are written to the entries in m.list as well as to the returned copies.
//
// The returned pointers are shallow copies, not the entries held in m.list. Callers such as GeAll
// can therefore read them after releasing m.lock without racing with runCycle, which updates the
// Error field of m.list entries. An error is returned only if listing the runtime services fails.
func (m *manager) getList(filterNamespace string, filterTyp string, filterPath string, filterName string) ([]*core.ServiceFileData, error) {
    matched := make([]*core.ServiceFileData, 0, len(m.list))
    for _, v := range m.list {
        if filterNamespace != "" && v.Namespace != filterNamespace {
            continue
        }
        if filterPath != "" && v.FilePath != filterPath {
            continue
        }
        if filterTyp != "" && v.Typ != filterTyp {
            continue
        }
        if filterName != "" && v.Name != filterName {
            continue
        }

        matched = append(matched, v)
    }

    services, err := m.runtimeClient.listServices()
    if err != nil {
        return nil, err
    }
    searchServices := make(map[string]status, len(services))
    for _, v := range services {
        searchServices[v.GetID()] = v
    }

    // Populate id and conditions fields, then snapshot each entry.
    sList := make([]*core.ServiceFileData, 0, len(matched))
    for _, v := range matched {
        id := v.GetID()
        v.ID = id
        // Hashes can differ while the runtime has not been reconciled to this configuration yet;
        // its conditions then describe a different revision and are not reported.
        if service := searchServices[id]; service != nil && service.GetValueHash() == v.GetValueHash() {
            v.Conditions = service.GetConditions()
        } else {
            v.Conditions = make([]any, 0)
        }

        cp := *v
        sList = append(sList, &cp)
    }

    sort.SliceStable(sList, func(i, j int) bool {
        return sList[i].FilePath < sList[j].FilePath
    })

    return sList, nil
}

// nolint:unparam
// getOne looks up the service whose ID equals serviceID among the services of namespace (as
// reported by getList) and returns a copy of it. It returns core.ErrNotFound when no such service
// exists, or the getList error if listing fails. All callers in this file hold m.lock.
func (m *manager) getOne(namespace string, serviceID string) (*core.ServiceFileData, error) {
    services, err := m.getList(namespace, "", "", "")
    if err != nil {
        return nil, err
    }

    idx := slices.IndexFunc(services, func(svc *core.ServiceFileData) bool {
        return svc.ID == serviceID
    })
    if idx < 0 {
        return nil, core.ErrNotFound
    }

    found := *services[idx]

    return &found, nil
}

// GeAll returns every desired service in namespace, sorted by file path, with ID and Conditions
// populated (see getList). The misspelled name is kept because it implements the
// core.ServiceManager interface.
func (m *manager) GeAll(namespace string) ([]*core.ServiceFileData, error) {
    // An empty filter value matches every type, path and name.
    const noFilter = ""

    m.lock.Lock()
    defer m.lock.Unlock()

    return m.getList(namespace, noFilter, noFilter, noFilter)
}

// GetPods returns the runtime pods of service serviceID. The service must be known to this manager
// in namespace, otherwise the lookup error (e.g. core.ErrNotFound) is returned. On any error the
// returned value is a nil interface.
func (m *manager) GetPods(namespace string, serviceID string) (any, error) {
    m.lock.Lock()
    defer m.lock.Unlock()

    if _, err := m.getOne(namespace, serviceID); err != nil {
        return nil, err
    }

    pods, err := m.runtimeClient.listServicePods(serviceID)
    if err != nil {
        // Return a literal nil rather than pods so an `any` wrapping a typed nil never escapes.
        return nil, err
    }

    return pods, nil
}

// StreamLogs opens the log stream of pod podID belonging to service serviceID, as provided by the
// runtime client. The service must be known to this manager in namespace, otherwise the lookup
// error (e.g. core.ErrNotFound) is returned. The caller is responsible for closing the stream.
func (m *manager) StreamLogs(namespace string, serviceID string, podID string) (io.ReadCloser, error) {
    m.lock.Lock()
    defer m.lock.Unlock()

    if _, lookupErr := m.getOne(namespace, serviceID); lookupErr != nil {
        return nil, lookupErr
    }

    return m.runtimeClient.streamServiceLogs(serviceID, podID)
}

// Rebuild asks the runtime client to rebuild service serviceID. The service must be known to this
// manager in namespace, otherwise the lookup error (e.g. core.ErrNotFound) is returned and the
// runtime is not contacted.
func (m *manager) Rebuild(namespace string, serviceID string) error {
    m.lock.Lock()
    defer m.lock.Unlock()

    if _, lookupErr := m.getOne(namespace, serviceID); lookupErr != nil {
        return lookupErr
    }

    return m.runtimeClient.rebuildService(serviceID)
}

// setServiceDefaults normalizes sv in place:
//   - an empty Size becomes "medium";
//   - Scale is clamped to cfg.KnativeMaxScale;
//   - an empty Envs becomes an empty, non-nil slice.
//
// The first two adjustments are logged as warnings tagged with the service's namespace.
func (m *manager) setServiceDefaults(sv *core.ServiceFileData) {
    logCtx := tracing.AddNamespace(context.Background(), sv.Namespace)
    maxScale := m.cfg.KnativeMaxScale

    // An unspecified size falls back to medium.
    if sv.Size == "" {
        slog.WarnContext(logCtx, "empty service size, defaulting to medium", "service_file", sv.FilePath)
        sv.Size = "medium"
    }

    // Never request more replicas than the configured knative maximum.
    if sv.Scale > maxScale {
        slog.WarnContext(logCtx, "service_scale is bigger than allowed max_scale, defaulting to max_scale",
            "service_scale", sv.Scale,
            "max_scale", maxScale,
            "service_file", sv.FilePath)
        sv.Scale = maxScale
    }

    // Replace a nil/empty env list with a fresh empty slice.
    if len(sv.Envs) == 0 {
        sv.Envs = make([]core.EnvironmentVariable, 0)
    }
}