vorteil/direktiv

View on GitHub
pkg/mirror/apply.go

Summary

Maintainability
C
1 day
Test Coverage
package mirror

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "strings"

    "github.com/direktiv/direktiv/pkg/datastore"
    "github.com/direktiv/direktiv/pkg/filestore"
    "github.com/gabriel-vasile/mimetype"
    "github.com/google/uuid"
)

type Applyer interface {
    apply(ctx context.Context, callbacks Callbacks, proc *datastore.MirrorProcess, parser *Parser, notes map[string]string) error
}

type DryrunApplyer struct{}

func (o *DryrunApplyer) apply(_ context.Context, _ Callbacks, _ *datastore.MirrorProcess, _ *Parser, _ map[string]string) error {
    return nil
}

type DirektivApplyer struct {
    NamespaceID uuid.UUID
    log         FormatLogger
    callbacks   Callbacks
    proc        *datastore.MirrorProcess
    parser      *Parser

    rootID uuid.UUID
    notes  map[string]string
}

func (o *DirektivApplyer) apply(ctx context.Context, callbacks Callbacks, proc *datastore.MirrorProcess, parser *Parser, notes map[string]string) error {
    o.log = newPIDFormatLogger(callbacks.ProcessLogger(), proc.ID)
    o.callbacks = callbacks
    o.proc = proc
    o.parser = parser
    o.notes = notes

    o.rootID = uuid.New()

    root, err := callbacks.FileStore().CreateTempRoot(ctx, o.rootID)
    if err != nil {
        return fmt.Errorf("failed to create new filesystem root: %w", err)
    }

    err = o.copyFilesIntoRoot(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy files into new filesystem root: %w", err)
    }

    err = o.copyServicesIntoRoot(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy services into new filesystem root: %w", err)
    }

    err = o.copyWorkflowsIntoRoot(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy workflows into new filesystem root: %w", err)
    }

    err = o.copyEndpointsIntoRoot(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy endpoints into new filesystem root: %w", err)
    }

    err = o.copyConsumersIntoRoot(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy consumers into new filesystem root: %w", err)
    }

    err = o.copyDeprecatedVariables(ctx)
    if err != nil {
        return fmt.Errorf("failed to copy deprecated variables: %w", err)
    }

    // TODO: join the next two operations into a single atomic SQL operation?
    err = callbacks.FileStore().ForNamespace(proc.Namespace).Delete(ctx)
    if err != nil {
        return fmt.Errorf("failed to delete old filesystem root: %w", err)
    }

    err = callbacks.FileStore().ForRootID(root.ID).SetNamespace(ctx, proc.Namespace)
    if err != nil {
        return fmt.Errorf("failed to delete old filesystem root: %w", err)
    }

    err = o.configureWorkflows(ctx)
    if err != nil {
        return fmt.Errorf("failed to configure workflows: %w", err)
    }

    return nil
}

func (o *DirektivApplyer) copyFilesIntoRoot(ctx context.Context) error {
    paths, err := o.parser.ListFiles()
    if err != nil {
        return err
    }

    for _, path := range paths {
        actual := filepath.Join(o.parser.tempDir, path)

        fi, err := os.Stat(actual)
        if err != nil {
            return err
        }

        if fi.IsDir() {
            _, err = o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeDirectory, "", nil)
            if err != nil {
                return err
            }

            o.log.Debugf("Created directory in database: %s", path)

            continue
        }

        data, err := os.ReadFile(actual)
        if err != nil {
            return err
        }

        mt := mimetype.Detect(data)

        _, err = o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeFile, strings.Split(mt.String(), ";")[0], data)
        if err != nil {
            return err
        }

        o.log.Debugf("Created file in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) copyWorkflowsIntoRoot(ctx context.Context) error {
    paths := []string{}
    for k := range o.parser.Workflows {
        paths = append(paths, k)
    }

    sort.Strings(paths)

    for _, path := range paths {
        data := o.parser.Workflows[path]
        _, err := o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeWorkflow, "application/yaml", data)
        if err != nil {
            return err
        }

        o.log.Debugf("Created workflow in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) copyServicesIntoRoot(ctx context.Context) error {
    paths := []string{}
    for k := range o.parser.Services {
        paths = append(paths, k)
    }

    sort.Strings(paths)

    for _, path := range paths {
        data := o.parser.Services[path]
        _, err := o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeService, "application/yaml", data)
        if err != nil {
            return err
        }

        o.log.Debugf("Created service in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) copyEndpointsIntoRoot(ctx context.Context) error {
    paths := []string{}
    for k := range o.parser.Endpoints {
        paths = append(paths, k)
    }

    sort.Strings(paths)

    for _, path := range paths {
        data := o.parser.Endpoints[path]
        _, err := o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeEndpoint, "application/yaml", data)
        if err != nil {
            return err
        }

        o.log.Debugf("Created endpoint in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) copyConsumersIntoRoot(ctx context.Context) error {
    paths := []string{}
    for k := range o.parser.Consumers {
        paths = append(paths, k)
    }

    sort.Strings(paths)

    for _, path := range paths {
        data := o.parser.Consumers[path]
        _, err := o.callbacks.FileStore().ForRootID(o.rootID).CreateFile(ctx, path, filestore.FileTypeConsumer, "application/yaml", data)
        if err != nil {
            return err
        }

        o.log.Debugf("Created consumer in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) configureWorkflows(ctx context.Context) error {
    paths := []string{}
    for k := range o.parser.Workflows {
        paths = append(paths, k)
    }

    sort.Strings(paths)

    for _, path := range paths {
        file, err := o.callbacks.FileStore().ForRootID(o.rootID).GetFile(ctx, path)
        if err != nil {
            return err
        }

        err = o.callbacks.ConfigureWorkflowFunc(ctx, o.NamespaceID, o.proc.Namespace, file)
        if err != nil {
            return err
        }

        o.log.Debugf("Configured workflow in database: %s", path)
    }

    return nil
}

func (o *DirektivApplyer) copyDeprecatedVariables(ctx context.Context) error {
    for k, v := range o.parser.DeprecatedNamespaceVars {
        mt := mimetype.Detect(v)
        mtString := strings.Split(mt.String(), ";")

        _, err := o.callbacks.VarStore().Set(ctx,
            &datastore.RuntimeVariable{
                Namespace: o.proc.Namespace,
                Name:      k,
                MimeType:  mtString[0],
                Data:      v,
            })
        if err != nil {
            return fmt.Errorf("failed to save namespace variable '%s': %w", k, err)
        }
    }

    for path, m := range o.parser.DeprecatedWorkflowVars {
        file, err := o.callbacks.FileStore().ForRootID(o.rootID).GetFile(ctx, path)
        if err != nil {
            return err
        }

        for k, v := range m {
            mt := mimetype.Detect(v)
            mtString := strings.Split(mt.String(), ";")

            _, err := o.callbacks.VarStore().Set(ctx,
                &datastore.RuntimeVariable{
                    Namespace:    o.proc.Namespace,
                    WorkflowPath: file.Path,
                    Name:         k,
                    MimeType:     mtString[0],
                    Data:         v,
                })
            if err != nil {
                return fmt.Errorf("failed to save workflow variable '%s' '%s': %w", path, k, err)
            }
        }
    }

    return nil
}