vorteil/direktiv

View on GitHub
pkg/flow/memory.go

Summary

Maintainability
A
3 hrs
Test Coverage
package flow

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"
    "log/slog"
    "strings"
    "time"

    "github.com/direktiv/direktiv/pkg/datastore"
    enginerefactor "github.com/direktiv/direktiv/pkg/engine"
    derrors "github.com/direktiv/direktiv/pkg/flow/errors"
    "github.com/direktiv/direktiv/pkg/instancestore"
    "github.com/direktiv/direktiv/pkg/model"
    "github.com/google/uuid"
)

type instanceMemory struct {
    engine *engine

    data   interface{}
    memory interface{}
    logic  stateLogic

    // stores the events to be fired on schedule
    eventQueue []string

    tags map[string]string

    instance   *enginerefactor.Instance
    updateArgs *instancestore.UpdateInstanceDataArgs
}

func (im *instanceMemory) Namespace() *datastore.Namespace {
    return &datastore.Namespace{
        ID:   im.instance.Instance.NamespaceID,
        Name: im.instance.TelemetryInfo.NamespaceName,
    }
}

func (im *instanceMemory) flushUpdates(ctx context.Context) error {
    data, err := json.Marshal(im.updateArgs)
    if err != nil {
        panic(err)
    }

    if string(data) == `{}` {
        return nil
    }

    im.updateArgs.Server = im.engine.ID

    // NOTE: no need to make this serializable because only a single operation is performed. If we
    //         expand the number of queries here in the future we should make it serializable. Be
    //         warned however that making this serializable opens us up to serialization failures, and
    //        therefore we will need to test heavily and potentially implement retries.
    tx, err := im.engine.flow.beginSQLTx(ctx) /*&sql.TxOptions{
        Isolation: sql.LevelSerializable,
    }*/if err != nil {
        return err
    }
    defer tx.Rollback()

    err = tx.InstanceStore().ForInstanceID(im.instance.Instance.ID).UpdateInstanceData(ctx, im.updateArgs)
    if err != nil {
        if strings.Contains(err.Error(), "got 0") {
            return errors.New("node no longer believes it should modify this instance")
        }

        return err
    }

    err = tx.Commit(ctx)
    if err != nil {
        return err
    }

    im.updateArgs = new(instancestore.UpdateInstanceDataArgs)
    im.updateArgs.Server = im.engine.ID

    im.engine.pubsub.NotifyInstance(im.instance.Instance.ID)
    im.engine.pubsub.NotifyInstances(im.Namespace())

    return nil
}

func (im *instanceMemory) ID() uuid.UUID {
    return im.instance.Instance.ID
}

func (im *instanceMemory) Controller() string {
    return im.instance.RuntimeInfo.Controller
}

func (im *instanceMemory) Model() (*model.Workflow, error) {
    data := im.instance.Instance.Definition

    workflow := new(model.Workflow)

    err := workflow.Load(data)
    if err != nil {
        return nil, err
    }

    return workflow, nil
}

func (im *instanceMemory) Step() int {
    return len(im.instance.RuntimeInfo.Flow)
}

func (im *instanceMemory) Status() string {
    return im.instance.Instance.Status.String()
}

func (im *instanceMemory) Flow() []string {
    return im.instance.RuntimeInfo.Flow
}

func (im *instanceMemory) MarshalData() string {
    data, err := json.Marshal(im.data)
    if err != nil {
        panic(err)
    }

    return string(data)
}

func (im *instanceMemory) MarshalOutput() string {
    if im.Status() == "complete" {
        return im.MarshalData()
    }

    return ""
}

func (im *instanceMemory) setMemory(x interface{}) {
    im.memory = x

    data := im.MarshalMemory()

    im.instance.Instance.StateMemory = []byte(data)
    im.updateArgs.StateMemory = &im.instance.Instance.StateMemory
}

func (im *instanceMemory) GetMemory() interface{} {
    return im.memory
}

func (im *instanceMemory) MarshalMemory() string {
    data, err := json.Marshal(im.memory)
    if err != nil {
        panic(err)
    }

    return string(data)
}

func (im *instanceMemory) UnmarshalMemory(x interface{}) error {
    if im.memory == nil {
        return nil
    }

    data, err := json.Marshal(im.memory)
    if err != nil {
        return err
    }

    err = json.Unmarshal(data, x)
    if err != nil {
        return err
    }

    return nil
}

func (im *instanceMemory) ErrorCode() string {
    return im.instance.Instance.ErrorCode
}

func (im *instanceMemory) ErrorMessage() string {
    return string(im.instance.Instance.ErrorMessage)
}

func (im *instanceMemory) StateBeginTime() time.Time {
    return im.instance.RuntimeInfo.StateBeginTime
}

func (im *instanceMemory) replaceData(x map[string]interface{}) {
    im.data = x
    data := im.MarshalData()
    im.instance.Instance.LiveData = []byte(data)
    im.updateArgs.LiveData = &im.instance.Instance.LiveData
}

func (im *instanceMemory) StoreData(key string, val interface{}) error {
    m, ok := im.data.(map[string]interface{})
    if !ok {
        return derrors.NewInternalError(errors.New("unable to store data because state data isn't a valid JSON object"))
    }

    m[key] = val

    im.replaceData(m)

    return nil
}

func (im *instanceMemory) GetState() string {
    if im.logic != nil {
        return im.logic.GetID()
    }

    return ""
}

var errEngineSync = errors.New("instance appears to be under control of another node")

func (engine *engine) getInstanceMemory(ctx context.Context, id uuid.UUID) (*instanceMemory, error) {
    tx, err := engine.flow.beginSQLTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable,
    })
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    idata, err := tx.InstanceStore().ForInstanceID(id).GetMost(ctx)
    if err != nil {
        return nil, err
    }

    if idata.Server != engine.ID {
        if time.Now().Add(-1 * engineOwnershipTimeout).Before(idata.UpdatedAt) {
            return nil, errEngineSync
        }

        // TODO: alan DIR-1313
        // we need to ensure there's an auto-reattempter somewhere in the code
    }

    if idata.EndedAt != nil && !idata.EndedAt.IsZero() {
        return nil, derrors.NewInternalError(fmt.Errorf("aborting workflow logic: database records instance terminated"))
    }

    err = tx.InstanceStore().ForInstanceID(id).UpdateInstanceData(ctx, &instancestore.UpdateInstanceDataArgs{
        BypassOwnershipCheck: true,
        Server:               engine.ID,
    })
    if err != nil {
        return nil, err
    }

    err = tx.Commit(ctx)
    if err != nil {
        return nil, err
    }

    instance, err := enginerefactor.ParseInstanceData(idata)
    if err != nil {
        return nil, err
    }

    im := new(instanceMemory)
    im.engine = engine
    im.instance = instance
    im.updateArgs = new(instancestore.UpdateInstanceDataArgs)
    im.updateArgs.Server = engine.ID

    err = json.Unmarshal(im.instance.Instance.LiveData, &im.data)
    if err != nil {
        engine.CrashInstance(ctx, im, derrors.NewUncatchableError("", err.Error()))

        return nil, err
    }

    err = json.Unmarshal(im.instance.Instance.StateMemory, &im.memory)
    if err != nil {
        engine.CrashInstance(ctx, im, derrors.NewUncatchableError("", err.Error()))

        return nil, err
    }

    flow := im.instance.RuntimeInfo.Flow
    stateID := ""

    if len(flow) > 0 {
        stateID = flow[len(flow)-1]
    }

    err = engine.loadStateLogic(im, stateID)
    if err != nil {
        engine.CrashInstance(ctx, im, err)

        return nil, err
    }

    return im, nil
}

func (engine *engine) InstanceCaller(im *instanceMemory) *enginerefactor.ParentInfo {
    di := im.instance.DescentInfo
    if len(di.Descent) == 0 {
        return nil
    }

    return &di.Descent[len(di.Descent)-1]
}

func (engine *engine) StoreMetadata(ctx context.Context, im *instanceMemory, data string) {
    im.instance.Instance.Metadata = []byte(data)
    im.updateArgs.Metadata = &im.instance.Instance.Metadata
}

func (engine *engine) freeArtefacts(im *instanceMemory) {
    engine.timers.deleteTimersForInstance(im.ID().String())

    err := engine.events.deleteInstanceEventListeners(context.Background(), im)
    if err != nil {
        slog.Error("Failed to delete instance event listeners.", "error", err, "instance", im.instance, "namespace", im.Namespace())
    }
}

func (engine *engine) freeMemory(ctx context.Context, im *instanceMemory) error {
    im.eventQueue = make([]string, 0)

    err := im.flushUpdates(ctx)
    if err != nil {
        return err
    }

    return nil
}

func (engine *engine) forceFreeCriticalMemory(ctx context.Context, im *instanceMemory) {
    err := im.flushUpdates(ctx)
    if err != nil {
        slog.ErrorContext(ctx, "Failed to force flush updates for instance memory during critical memory release.", "instance", im.ID().String(), "namespace", im.Namespace(), "error", err)
    }
}