vorteil/direktiv

View on GitHub
pkg/flow/engine.go

Summary

Maintainability
F
4 days
Test Coverage
package flow

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "log/slog"
    "net"
    "net/http"
    "path/filepath"
    "regexp"
    "runtime"
    "strings"
    "sync"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    derrors "github.com/direktiv/direktiv/pkg/flow/errors"
    "github.com/direktiv/direktiv/pkg/flow/nohome"
    "github.com/direktiv/direktiv/pkg/flow/states"
    "github.com/direktiv/direktiv/pkg/model"
    "github.com/direktiv/direktiv/pkg/refactor/core"
    "github.com/direktiv/direktiv/pkg/refactor/database"
    "github.com/direktiv/direktiv/pkg/refactor/datastore"
    enginerefactor "github.com/direktiv/direktiv/pkg/refactor/engine"
    "github.com/direktiv/direktiv/pkg/refactor/instancestore"
    "github.com/google/uuid"
    "github.com/senseyeio/duration"
    "go.opentelemetry.io/otel/trace"
)

// engine drives workflow instance execution. It embeds the server to
// reach shared services (database, pubsub, timers, events).
type engine struct {
    *server

    // scheduled tracks per-instance scheduled work.
    // NOTE(review): key/value types are not visible in this file chunk —
    // confirm usage at the call sites.
    scheduled sync.Map
}

// initEngine wires a new engine into the server: it subscribes to the
// engine instance-messages pubsub channel and launches the background
// kicker that periodically reschedules homeless instances.
func initEngine(srv *server) (*engine, error) {
    e := &engine{server: srv}

    e.pBus.Subscribe(e.instanceMessagesChannelHandler, engineInstanceMessagesChannel)

    go e.instanceKicker()

    return e, nil
}

// Close shuts the engine down. There is currently nothing to tear down,
// so it always succeeds; the method exists to satisfy the closer
// contract.
func (engine *engine) Close() error {
    return nil
}

// instanceKicker waits one minute after startup, then fires an
// asynchronous pass over waiting (homeless) instances every five
// seconds. It runs for the lifetime of the process.
func (engine *engine) instanceKicker() {
    time.Sleep(1 * time.Minute)

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        go engine.kickWaitingInstances()
    }
}

// kickWaitingInstances scans the database for "homeless" instances —
// instances whose scheduling window expired without a server picking
// them up — and re-dispatches each one through the instance messages
// handler.
//
// Fix: a failed marshal previously fell through and invoked the handler
// with empty/invalid data; it now skips that instance instead.
func (engine *engine) kickWaitingInstances() {
    ctx := context.Background()

    slog.Debug("Starting to kick waiting (homeless) instances.")
    tx, err := engine.beginSqlTx(ctx)
    if err != nil {
        slog.Error("Failed to begin SQL transaction in kickWaitingInstances.", "error", err)
        return
    }
    defer tx.Rollback()

    instances, err := tx.InstanceStore().GetHomelessInstances(ctx, time.Now().UTC().Add(-engineSchedulingTimeout))
    if err != nil {
        slog.Error("Failed to list homeless instances in kickWaitingInstances. Some instances may remain unprocessed.", "error", err)
        return
    }
    if len(instances) == 0 {
        slog.Debug("No homeless instances found to kick.")
        return
    }
    slog.Info("Processing homeless instances.", "count", len(instances))

    for idx := range instances {
        instance := instances[idx]
        slog.Debug("Kicking instance.", "instance_id", instance.ID)

        data, err := json.Marshal(&instanceMessageChannelData{
            InstanceID:        instance.ID,
            LastKnownServer:   instance.Server,
            LastKnownUpdateAt: instance.UpdatedAt,
        })
        if err != nil {
            slog.Error("Failed to marshal instance data in kickWaitingInstances.", "instance", instance.ID, "error", err)
            // Skip this instance rather than dispatching empty data.
            continue
        }

        engine.instanceMessagesChannelHandler(string(data))
    }
}

// newInstanceArgs bundles everything needed to create a new workflow
// instance via NewInstance.
type newInstanceArgs struct {
    tx            *database.SQLStore // optional; when nil, NewInstance opens and commits its own transaction
    ID            uuid.UUID
    Namespace     *datastore.Namespace
    CalledAs      string // workflow path as invoked; may be relative when called as a subflow
    Input         []byte // raw instance input (JSON or arbitrary bytes)
    Invoker       string // e.g. "api", "cloudevent", or "instance:<parent-id>"
    DescentInfo   *enginerefactor.InstanceDescentInfo // ancestry chain; nil for root instances
    TelemetryInfo *enginerefactor.InstanceTelemetryInfo
}

const (
    // apiCaller is the invoker value used for instances started via the API.
    apiCaller = "api"
)

func unmarshalInstanceInputData(input []byte) interface{} {
    var inputData, stateData interface{}

    err := json.Unmarshal(input, &inputData)
    if err != nil {
        inputData = base64.StdEncoding.EncodeToString(input)
    }

    if _, ok := inputData.(map[string]interface{}); ok {
        stateData = inputData
    } else {
        stateData = map[string]interface{}{
            "input": inputData,
        }
    }

    return stateData
}

// marshalInstanceInputData renders the normalized instance input object
// (see unmarshalInstanceInputData) back to a JSON string. The normalized
// object is always marshalable, so a failure indicates a programming
// error and panics.
func marshalInstanceInputData(input []byte) string {
    normalized := unmarshalInstanceInputData(input)

    b, err := json.Marshal(normalized)
    if err != nil {
        panic(err)
    }

    return string(b)
}

// trim removes a single leading "/" from s, if present.
func trim(s string) string {
    if strings.HasPrefix(s, "/") {
        return s[1:]
    }

    return s
}

// NewInstance creates and commits a new workflow instance record and
// returns its populated instance memory. It resolves the workflow file,
// validates the definition, rejects manual/subflow invocation of
// event-based workflows, and persists all serialized instance metadata.
// The caller is responsible for actually starting execution.
//
// Fix: the "Preparing to commit" / "committed successfully" log lines
// previously logged the whole *datastore.Namespace struct; they now log
// the namespace name, consistent with the other log lines here.
func (engine *engine) NewInstance(ctx context.Context, args *newInstanceArgs) (*instanceMemory, error) {
    slog.Debug("Initializing new instance creation.", "namespace", args.Namespace.Name, "workflow", args.CalledAs, "invoker", args.Invoker)
    file, data, err := engine.mux(ctx, args.Namespace, args.CalledAs)
    loggingCtx := args.Namespace.WithTags(ctx)
    loggingCtx = enginerefactor.AddTag(loggingCtx, "calledAs", args.CalledAs)
    loggingCtx = enginerefactor.AddTag(loggingCtx, "invoker", args.Invoker)
    loggingCtx = enginerefactor.WithTrack(loggingCtx, enginerefactor.BuildNamespaceTrack(args.Namespace.Name))

    if err != nil {
        slog.Debug("Failed to retrieve workflow for new instance.", enginerefactor.GetSlogAttributesWithError(loggingCtx, err)...)
        if derrors.IsNotFound(err) {
            return nil, derrors.NewUncatchableError("direktiv.workflow.notfound", "workflow not found: %v", err.Error())
        }
        return nil, err
    }

    var wf model.Workflow
    err = wf.Load(data)
    if err != nil {
        slog.Error("Failed to parse workflow definition.", enginerefactor.GetSlogAttributesWithError(loggingCtx, err)...)
        return nil, derrors.NewUncatchableError("direktiv.workflow.invalid", "cannot parse workflow '%s': %v", trim(file.Path), err)
    }

    // Event-based workflows may only be started by the event system —
    // never manually via the API nor as a subflow.
    if len(wf.GetStartDefinition().GetEvents()) > 0 {
        if strings.ToLower(args.Invoker) == apiCaller {
            return nil, derrors.NewUncatchableError("direktiv.workflow.invoke", "cannot manually invoke event-based workflow")
        }
        if strings.HasPrefix(args.Invoker, "instance") {
            return nil, derrors.NewUncatchableError("direktiv.workflow.invoke", "cannot invoke event-based workflow as a subflow")
        }
    }

    // Root instances are their own root; subflows inherit the root of
    // their descent chain and the branch index of their direct parent.
    root := args.ID
    iterator := 0
    if args.DescentInfo != nil && len(args.DescentInfo.Descent) > 0 {
        root = args.DescentInfo.Descent[0].ID
        iterator = args.DescentInfo.Descent[len(args.DescentInfo.Descent)-1].Branch
    }

    descentInfo, err := args.DescentInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }

    args.TelemetryInfo.NamespaceName = args.Namespace.Name
    telemetryInfo, err := args.TelemetryInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }

    settings := &enginerefactor.InstanceSettings{}
    settingsData, err := settings.MarshalJSON()
    if err != nil {
        panic(err)
    }

    liveData := marshalInstanceInputData(args.Input)

    ri := &enginerefactor.InstanceRuntimeInfo{}
    riData, err := ri.MarshalJSON()
    if err != nil {
        panic(err)
    }

    ci := &enginerefactor.InstanceChildrenInfo{}
    ciData, err := ci.MarshalJSON()
    if err != nil {
        panic(err)
    }

    // Use the caller's transaction when provided; otherwise open our own
    // (rolled back automatically unless committed below).
    tx := args.tx

    if tx == nil {
        tx, err = engine.flow.beginSqlTx(ctx)
        if err != nil {
            return nil, err
        }
        defer tx.Rollback()
    }
    slog.Debug("Preparing to commit new instance transaction.", "instance", args.ID.String(), "namespace", args.Namespace.Name)

    idata, err := tx.InstanceStore().CreateInstanceData(ctx, &instancestore.CreateInstanceDataArgs{
        ID:             args.ID,
        NamespaceID:    args.Namespace.ID,
        Namespace:      args.Namespace.Name,
        RootInstanceID: root,
        Server:         engine.ID,
        Invoker:        args.Invoker,
        WorkflowPath:   file.Path,
        Definition:     data,
        Input:          args.Input,
        LiveData:       []byte(liveData),
        TelemetryInfo:  telemetryInfo,
        Settings:       settingsData,
        DescentInfo:    descentInfo,
        RuntimeInfo:    riData,
        ChildrenInfo:   ciData,
    })
    if err != nil {
        return nil, err
    }

    err = tx.Commit(ctx)
    if err != nil {
        return nil, err
    }
    slog.Info("New instance transaction committed successfully.", "instance", args.ID.String(), "namespace", args.Namespace.Name)

    instance, err := enginerefactor.ParseInstanceData(idata)
    if err != nil {
        panic(err)
    }

    im := new(instanceMemory)
    im.engine = engine
    im.instance = instance
    im.updateArgs = new(instancestore.UpdateInstanceDataArgs)
    im.updateArgs.Server = engine.ID

    err = json.Unmarshal(im.instance.Instance.LiveData, &im.data)
    if err != nil {
        panic(err)
    }

    err = json.Unmarshal(im.instance.Instance.StateMemory, &im.memory)
    if err != nil {
        panic(err)
    }

    _, err = traceFullAddWorkflowInstance(ctx, im) // TODO.
    if err != nil {
        return nil, fmt.Errorf("failed to traceFullAddWorkflowInstance: %w", err)
    }
    im.AddAttribute("loop-index", fmt.Sprintf("%d", iterator))

    engine.pubsub.NotifyInstances(im.Namespace())
    namespaceTrackCtx := enginerefactor.WithTrack(loggingCtx, enginerefactor.BuildNamespaceTrack(im.instance.Instance.Namespace))
    slog.Info("Workflow has been triggered", enginerefactor.GetSlogAttributesWithStatus(namespaceTrackCtx, core.LogRunningStatus)...)

    return im, nil
}

// loadStateLogic resolves the state identified by stateID within the
// instance's workflow model and installs its state logic on im. An
// empty stateID selects the workflow's start state; an unknown stateID
// is an error.
func (engine *engine) loadStateLogic(im *instanceMemory, stateID string) error {
    workflow, err := im.Model()
    if err != nil {
        return err
    }

    var state model.State
    if stateID == "" {
        state = workflow.GetStartState()
    } else {
        var found bool
        if state, found = workflow.GetStatesMap()[stateID]; !found {
            return fmt.Errorf("workflow %s cannot resolve state: %s", nohome.GetWorkflow(im.instance.Instance.WorkflowPath), stateID)
        }
    }

    logic, err := states.StateLogic(im, state)
    if err != nil {
        return err
    }
    im.logic = logic

    return nil
}

// Transition advances the instance into nextState. On the very first
// step it also schedules the workflow-level soft/hard timeouts (from
// workflow.Timeouts ISO8601 durations, defaulting to 15m soft / 20m
// hard). It persists the updated runtime info and the new state's
// deadline, then runs the state. Any error crashes the instance and
// returns nil.
func (engine *engine) Transition(ctx context.Context, im *instanceMemory, nextState string, attempt int) *states.Transition {
    workflow, err := im.Model()
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    oldController := im.Controller()

    // First step only: derive instance-wide timeouts from the workflow
    // definition.
    if im.Step() == 0 {
        t := time.Now().UTC()
        tSoft := time.Now().UTC().Add(time.Minute * 15)
        tHard := time.Now().UTC().Add(time.Minute * 20)

        if workflow.Timeouts != nil {
            s := workflow.Timeouts.Interrupt

            if s != "" {
                d, err := duration.ParseISO8601(s)
                if err != nil {
                    engine.CrashInstance(ctx, im, err)
                    return nil
                }
                tSoft = d.Shift(t)
                // Hard timeout trails the configured soft timeout by 5m
                // unless an explicit kill timeout overrides it below.
                tHard = tSoft.Add(time.Minute * 5)
            }

            s = workflow.Timeouts.Kill

            if s != "" {
                d, err := duration.ParseISO8601(s)
                if err != nil {
                    engine.CrashInstance(ctx, im, err)
                    return nil
                }
                tHard = d.Shift(t)
            }
        }

        engine.ScheduleSoftTimeout(ctx, im, oldController, tSoft)
        engine.ScheduleHardTimeout(ctx, im, oldController, tHard)
    }

    if nextState == "" {
        panic("don't call this function with an empty nextState")
    }

    err = engine.loadStateLogic(im, nextState)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    flow := append(im.Flow(), nextState)
    deadline := im.logic.Deadline(ctx)

    // Clear state memory before entering the new state.
    err = engine.SetMemory(ctx, im, nil)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    ctx, cleanup, err := traceStateGenericBegin(ctx, im)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }
    defer cleanup()

    t := time.Now().UTC()

    // Persist updated runtime info (flow history, controlling hostname,
    // attempt counter, state begin time) plus the state's deadline.
    im.instance.RuntimeInfo.Flow = flow
    im.instance.RuntimeInfo.Controller = engine.pubsub.Hostname
    im.instance.RuntimeInfo.Attempts = attempt
    im.instance.RuntimeInfo.StateBeginTime = t
    rtData, err := im.instance.RuntimeInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }
    im.updateArgs.RuntimeInfo = &rtData

    im.instance.Instance.Deadline = &deadline
    im.updateArgs.Deadline = im.instance.Instance.Deadline

    err = im.flushUpdates(ctx)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    engine.ScheduleSoftTimeout(ctx, im, oldController, deadline)

    return engine.runState(ctx, im, nil, nil)
}

// CrashInstance reports a fatal instance error, marks the instance as
// failed, and terminates it. The report classifies the error as
// catchable, uncatchable, or unknown (the last annotated with the
// caller's source location).
func (engine *engine) CrashInstance(ctx context.Context, im *instanceMemory, err error) {
    catchable := new(derrors.CatchableError)
    uncatchable := new(derrors.UncatchableError)

    switch {
    case errors.As(err, &catchable):
        engine.reportInstanceCrashed(ctx, im, "catchable", catchable.Code, err)
    case errors.As(err, &uncatchable) && uncatchable.Code != "":
        engine.reportInstanceCrashed(ctx, im, "uncatchable", uncatchable.Code, err)
    default:
        _, file, line, _ := runtime.Caller(1)
        engine.reportInstanceCrashed(ctx, im, "unknown", fmt.Sprintf("thrown by %s:%d", file, line), err)
    }

    engine.SetInstanceFailed(ctx, im, err)
    engine.TerminateInstance(ctx, im)
}

// setEndAt stamps the instance's end time (UTC now) on both the
// in-memory instance record and the pending update arguments.
func (engine *engine) setEndAt(im *instanceMemory) {
    now := time.Now().UTC()
    im.instance.Instance.EndedAt = &now
    im.updateArgs.EndedAt = &now
}

// noCancelCtx wraps a parent context but suppresses its cancellation,
// deadline, and error, while still exposing the parent's values.
type noCancelCtx struct {
    //nolint:containedctx
    ctx context.Context
}

// Deadline reports that the context carries no deadline.
func (c noCancelCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

// Done returns nil so the context is never considered canceled.
func (c noCancelCtx) Done() <-chan struct{} {
    return nil
}

// Err always reports no error.
func (c noCancelCtx) Err() error {
    return nil
}

// Value defers lookups to the wrapped parent context.
func (c noCancelCtx) Value(key interface{}) interface{} {
    return c.ctx.Value(key)
}

// WithoutCancel returns a context that is never canceled.
func NoCancelContext(ctx context.Context) context.Context {
    return noCancelCtx{ctx: ctx}
}

// TerminateInstance finalizes an instance: stamps its end time, releases
// its artefacts and memory, records completion metrics, and wakes any
// parent instance waiting on it.
//
// If the instance already crashed, the context is replaced with one that
// cannot be canceled so cleanup can still complete.
func (engine *engine) TerminateInstance(ctx context.Context, im *instanceMemory) {
    if engine.GetIsInstanceCrashed(im) {
        ctx = NoCancelContext(ctx)
    }

    engine.setEndAt(im)

    engine.freeArtefacts(im)
    err := engine.freeMemory(ctx, im)
    if err != nil {
        // A memory-free failure on a healthy instance is fatal; on an
        // already-crashed instance avoid recursing into CrashInstance
        // and only force-release the critical parts.
        if !engine.GetIsInstanceCrashed(im) {
            engine.CrashInstance(ctx, im, err)
            return
        }

        engine.forceFreeCriticalMemory(ctx, im)
        slog.Debug("Failed to free memory during a crash", "error", err)
    }

    // im.logic is nil when the instance never entered a state.
    if im.logic != nil {
        engine.metricsCompleteState(im, "", im.ErrorCode(), false)
    }

    engine.metricsCompleteInstance(im)

    engine.WakeInstanceCaller(ctx, im)
}

// runState executes the instance's current state logic and decides what
// happens next. wakedata carries a resume payload (action result, event,
// retry) and err carries an error raised before/while waking; both are
// empty/nil on a fresh state entry.
//
// On failure it cancels the instance's children and, for catchable
// errors, tries to match an error-catch definition and transition to its
// handler; anything unmatched crashes the instance.
func (engine *engine) runState(ctx context.Context, im *instanceMemory, wakedata []byte, err error) *states.Transition {
    loggingCtx := im.Namespace().WithTags(ctx)
    instanceTrackCtx := enginerefactor.WithTrack(loggingCtx, enginerefactor.BuildInstanceTrack(im.instance))
    slog.Debug("Starting state execution.", enginerefactor.GetSlogAttributesWithStatus(ctx, core.LogRunningStatus)...)

    engine.logRunState(ctx, im, wakedata, err)
    slog.Debug("State logic executed. Processing post-execution actions.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)
    slog.Debug("Processing post-execution actions.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

    var code string
    var transition *states.Transition

    ctx, cleanup, e2 := traceStateGenericLogicThread(ctx, im)
    if e2 != nil {
        err = e2
        goto failure
    }
    defer cleanup()

    if err != nil {
        goto failure
    }

    // Fresh entry only (no saved memory, no wake payload): evaluate the
    // state's "log" jq query against the state data and emit the result
    // as a user-visible log line.
    if lq := im.logic.GetLog(); im.GetMemory() == nil && len(wakedata) == 0 && lq != nil {
        var object interface{}
        object, err = jqOne(im.data, lq)
        if err != nil {
            slog.Error("Failed to process jq query on state data.", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, fmt.Errorf("query failed %v, err: %w", lq, err))...)

            goto failure
        }

        var data []byte
        data, err = json.MarshalIndent(object, "", "  ")
        if err != nil {
            err = derrors.NewInternalError(fmt.Errorf("failed to marshal state data: %w", err))
            slog.Error("Failed to marshal jq query result for logging.", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, fmt.Errorf("failed to marshal state data: %w", err))...)

            goto failure
        }

        engine.UserLog(ctx, im, string(data))
    }

    // Fresh entry only: evaluate the state's "metadata" jq query and
    // store the result as instance metadata.
    if md := im.logic.GetMetadata(); im.GetMemory() == nil && len(wakedata) == 0 && md != nil {
        var object interface{}
        object, err = jqOne(im.data, md)
        if err != nil {
            goto failure
        }

        var data []byte
        data, err = json.MarshalIndent(object, "", "  ")
        if err != nil {
            err = derrors.NewInternalError(fmt.Errorf("failed to marshal state data: %w", err))
            goto failure
        }

        engine.StoreMetadata(ctx, im, string(data))
    }
    slog.Debug("Executing state logic.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

    transition, err = im.logic.Run(ctx, wakedata)
    if err != nil {
        goto failure
    }
    slog.Debug("Applying state transformation based on logic run.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

    err = engine.transformState(ctx, im, transition)
    if err != nil {
        goto failure
    }

next:
    slog.Debug("Initiating state logic run.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)
    return engine.transitionState(ctx, im, transition, code)

failure:

    traceStateError(ctx, err)

    var breaker int

    // NOTE(review): breaker is never incremented (the increment below is
    // commented out), so this guard can never trip — the catchable-error
    // loop protection appears inert. Confirm intent before relying on it.
    if breaker > 10 {
        err = derrors.NewInternalError(errors.New("somehow ended up in a catchable error loop"))
    }

    err1 := engine.CancelInstanceChildren(ctx, im)
    if err1 != nil {
        slog.Error("Canceling Instance's chrildren failed", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, err1)...)
    }
    cerr := new(derrors.CatchableError)

    if errors.As(err, &cerr) {
        _ = im.StoreData("error", cerr)

        // Try each catch definition in order; "*" matches any code.
        for _, catch := range im.logic.ErrorDefinitions() {
            errRegex := catch.Error
            if errRegex == "*" {
                errRegex = ".*"
            }

            matched, regErr := regexp.MatchString(errRegex, cerr.Code)
            if regErr != nil {
                slog.Error("Regex compilation failed for error catch definition.", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, regErr)...)
            }

            if matched {
                slog.Info("Catchable error matched; executing defined transition.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogErrStatus)...)
                slog.Error("State failed with an error", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, fmt.Errorf("State failed with an error '%s': %s", cerr.Code, cerr.Message))...)

                transition = &states.Transition{
                    Transform: "",
                    NextState: catch.Transition,
                }

                // breaker++

                code = cerr.Code

                goto next
            }
        }
    }
    slog.Error("Unrecoverable error encountered; initiating instance crash.", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, err)...)
    engine.CrashInstance(ctx, im, err)
    return nil
}

// transformState applies the transition's jq transform to the instance's
// state data. Nil transitions, empty transforms, and the identity
// transform "." are no-ops. Transform failures are catchable errors.
func (engine *engine) transformState(ctx context.Context, im *instanceMemory, transition *states.Transition) error {
    if transition == nil || transition.Transform == nil {
        return nil
    }

    if s, ok := transition.Transform.(string); ok && (s == "" || s == ".") {
        return nil
    }

    loggingCtx := im.Namespace().WithTags(ctx)
    instanceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildInstanceTrack(im.instance))

    slog.Debug("Transforming state data.",
        enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

    transformed, err := jqObject(im.data, transition.Transform)
    if err != nil {
        return derrors.WrapCatchableError("unable to apply transform: %v", err)
    }

    im.replaceData(transformed)

    slog.Debug("Successfully transformed state data.",
        enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

    return nil
}

// transitionState flushes pending instance updates and then either
// yields (nil transition), continues to the next state (non-empty
// NextState), or — when the workflow has reached its end — finalizes the
// instance with a status derived from its error code and terminates it.
func (engine *engine) transitionState(ctx context.Context, im *instanceMemory, transition *states.Transition, errCode string) *states.Transition {
    e := im.flushUpdates(ctx)
    if e != nil {
        slog.Error("Failed to flush updates for instance.", "instance", im.ID(), "namespace", im.Namespace(), "error", e)
        engine.CrashInstance(ctx, im, e)
        return nil
    }

    // A nil transition means the state is waiting on external input
    // (action result, event, timer) — park the instance.
    if transition == nil {
        engine.InstanceYield(ctx, im)
        return nil
    }
    loggingCtx := im.Namespace().WithTags(ctx)
    instanceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildInstanceTrack(im.instance))

    if transition.NextState != "" {
        engine.metricsCompleteState(im, transition.NextState, errCode, false)
        slog.Debug("Transitioning to next state.",
            enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)

        return transition
    }

    // No next state: the workflow is done. Map the error code (if any)
    // to a final status; cancellations get their own status.
    status := instancestore.InstanceStatusComplete
    if im.ErrorCode() != "" {
        status = instancestore.InstanceStatusFailed
        if im.ErrorCode() == "direktiv.cancels.parent" || im.ErrorCode() == "direktiv.cancels.api" {
            status = instancestore.InstanceStatusCancelled
        }
        slog.Debug("Workflow failed with an error.", enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, fmt.Errorf("'%s': %s", im.ErrorCode(), im.instance.Instance.ErrorMessage))...)
    }

    slog.Debug("Instance terminated", "instance", im.ID().String(), "namespace", im.Namespace())

    output := im.MarshalData()
    im.instance.Instance.Output = []byte(output)
    im.updateArgs.Output = &im.instance.Instance.Output
    im.instance.Instance.Status = status
    im.updateArgs.Status = &im.instance.Instance.Status

    slog.Info("Workflow completed.", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogCompletedStatus)...)

    defer engine.pubsub.NotifyInstance(im.instance.Instance.ID)
    defer engine.pubsub.NotifyInstances(im.Namespace())

    engine.TerminateInstance(ctx, im)

    return nil
}

// subflowInvoke spawns a child instance of the named workflow as a
// subflow of the given parent. Relative workflow names are resolved
// against the caller's directory. The child inherits the parent's
// descent chain plus one entry for pi and the current trace/span IDs.
func (engine *engine) subflowInvoke(ctx context.Context, pi *enginerefactor.ParentInfo, instance *enginerefactor.Instance, name string, input []byte) (*instanceMemory, error) {
    descent := &enginerefactor.InstanceDescentInfo{
        Descent: append(instance.DescentInfo.Descent, *pi),
    }

    spanCtx := trace.SpanFromContext(ctx).SpanContext()

    // Resolve a relative workflow name against the caller's directory.
    calledAs := name
    if !filepath.IsAbs(calledAs) {
        dir, _ := filepath.Split(instance.Instance.WorkflowPath)
        if dir == "" {
            dir = "/"
        }
        calledAs = filepath.Join(dir, calledAs)
    }

    args := &newInstanceArgs{
        ID: uuid.New(),
        Namespace: &datastore.Namespace{
            ID:   instance.Instance.NamespaceID,
            Name: instance.TelemetryInfo.NamespaceName,
        },
        CalledAs:    calledAs,
        Input:       input,
        Invoker:     fmt.Sprintf("instance:%v", pi.ID),
        DescentInfo: descent,
        TelemetryInfo: &enginerefactor.InstanceTelemetryInfo{
            TraceID:       spanCtx.TraceID().String(),
            SpanID:        spanCtx.SpanID().String(),
            NamespaceName: instance.TelemetryInfo.NamespaceName,
        },
    }

    im, err := engine.NewInstance(ctx, args)
    if err != nil {
        return nil, err
    }

    im.AddAttribute("loop-index", fmt.Sprintf("%d", pi.Branch))
    traceSubflowInvoke(ctx, args.CalledAs, im.ID().String())

    return im, nil
}

// retryMessage is the payload scheduled for a delayed state retry; Data
// carries the opaque wake payload delivered back to the instance.
type retryMessage struct {
    InstanceID string
    Data       []byte
}

// retryWakeupFunction names the one-shot timer callback for retry wakeups.
const retryWakeupFunction = "retryWakeup"

// scheduleRetry arranges for the instance identified by id to be woken
// at time t with the given retry payload. Wakeups due within five
// seconds are handled by an in-process sleep; later ones go through the
// one-shot timer service.
func (engine *engine) scheduleRetry(id string, t time.Time, data []byte) error {
    payload, err := json.Marshal(&retryMessage{
        InstanceID: id,
        Data:       data,
    })
    if err != nil {
        panic(err) // TODO ?
    }

    delay := time.Until(t)
    if delay < 5*time.Second {
        go func() {
            time.Sleep(delay)
            /* #nosec */
            engine.retryWakeup(payload)
        }()

        return nil
    }

    if err = engine.timers.addOneShot(id, retryWakeupFunction, t, payload); err != nil {
        return derrors.NewInternalError(err)
    }

    return nil
}

// retryWakeup is the timer callback for scheduled retries: it decodes
// the retryMessage and enqueues a "wake" message for the instance. All
// failures are logged and swallowed, since there is no caller to report
// to.
func (engine *engine) retryWakeup(data []byte) {
    msg := new(retryMessage)
    if err := json.Unmarshal(data, msg); err != nil {
        slog.Error("failed to unmarshal retryMessage", "error", err)
        return
    }

    uid, err := uuid.Parse(msg.InstanceID)
    if err != nil {
        slog.Error("failed to parse instance ID in retryMessage", "error", err)
        return
    }

    if err = engine.enqueueInstanceMessage(context.Background(), uid, "wake", msg); err != nil {
        slog.Error("failed to enqueue instance message for retryWakeup", "error", err)
    }
}

// actionResultPayload carries the outcome of a single action execution.
type actionResultPayload struct {
    ActionID     string
    ErrorCode    string
    ErrorMessage string
    Output       []byte
}

// actionResultMessage routes an action result back to the instance and
// state (at a specific step) that requested it.
type actionResultMessage struct {
    InstanceID string
    State      string
    Step       int
    Payload    actionResultPayload
}

// createTransport builds the HTTP transport used for engine-initiated
// requests: environment proxy support, HTTP/2, bounded dial and TLS
// handshake timeouts, and a pooled-connection idle timeout.
//
// Fix: the deprecated net.Dialer.DualStack field was dropped — RFC 6555
// "fast fallback" has been enabled by default since Go 1.12, so setting
// it was a no-op.
func (engine *engine) createTransport() *http.Transport {
    tr := &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   10 * time.Second,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        ForceAttemptHTTP2:     true,
        MaxIdleConns:          100,
        IdleConnTimeout:       90 * time.Second,
        TLSHandshakeTimeout:   10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
    }

    return tr
}

// wakeEventsWaiter delivers received cloud events to an instance that is
// blocked waiting on them, by enqueueing an "event" instance message.
func (engine *engine) wakeEventsWaiter(instance uuid.UUID, events []*cloudevents.Event) {
    if err := engine.enqueueInstanceMessage(context.Background(), instance, "event", events); err != nil {
        slog.Error("failed to enqueue instance message for wakeEventsWaiter", "error", err)
    }
}

// EventsInvoke starts a new instance of the workflow identified by
// workflowID in response to one or more cloud events. The events are
// keyed by type into a JSON object that becomes the instance input.
// Lookup or creation failures are logged and abort the invocation
// silently (there is no caller to report to).
func (engine *engine) EventsInvoke(workflowID uuid.UUID, events ...*cloudevents.Event) {
    ctx := context.Background()

    tx, err := engine.flow.beginSqlTx(ctx)
    if err != nil {
        slog.Error("Failed to begin SQL transaction in EventsInvoke.", "error", err)
        return
    }
    defer tx.Rollback()

    file, err := tx.FileStore().GetFileByID(ctx, workflowID)
    if err != nil {
        slog.Error("Failed to fetch file from database.", "workflowID", workflowID, "error", err)
        return
    }

    root, err := tx.FileStore().GetRoot(ctx, file.RootID)
    if err != nil {
        slog.Error("Failed to fetch Root from database.", "workflowID", workflowID, "error", err)
        return
    }

    ns, err := tx.DataStore().Namespaces().GetByName(ctx, root.Namespace)
    if err != nil {
        slog.Error("Failed to fetch namespace from database.", "namespace", root.Namespace, "error", err)
        return
    }

    // Release the read transaction early; NewInstance opens its own.
    tx.Rollback()

    // Build the instance input: one entry per event, keyed by event type.
    var input []byte
    m := make(map[string]interface{})
    for _, event := range events {
        if event == nil {
            continue
        }

        m[event.Type()] = event
    }

    input, err = json.Marshal(m)
    if err != nil {
        slog.Error("Failed to marshal event data in EventsInvoke.", "error", err)
        return
    }

    span := trace.SpanFromContext(ctx)

    args := &newInstanceArgs{
        ID:        uuid.New(),
        Namespace: ns,
        CalledAs:  file.Path,
        Input:     input,
        Invoker:   "cloudevent",
        TelemetryInfo: &enginerefactor.InstanceTelemetryInfo{
            TraceID:       span.SpanContext().TraceID().String(),
            SpanID:        span.SpanContext().SpanID().String(),
            NamespaceName: ns.Name,
        },
    }

    im, err := engine.NewInstance(ctx, args)
    if err != nil {
        slog.Error("new instance", "error", err)
        return
    }
    slog.Debug("Successfully invoked new workflow instance.", "instanceID", im.ID().String(), "workflowPath", file.Path)

    go engine.start(im)
}

// SetMemory replaces the instance's state memory with the JSON encoding
// of x, updating both the in-memory instance record and the pending
// update arguments. x must be JSON-marshalable; anything else is a
// programming error and panics.
func (engine *engine) SetMemory(ctx context.Context, im *instanceMemory, x interface{}) error {
    im.setMemory(x)

    encoded, err := json.Marshal(x)
    if err != nil {
        panic(err)
    }

    im.instance.Instance.StateMemory = encoded
    im.updateArgs.StateMemory = &encoded

    return nil
}

// reportInstanceCrashed logs a fatal instance failure on both the
// instance track and the namespace track so it is visible in either
// log stream.
//
// Fix: the message previously labeled the failure type as "code" and the
// code as "error"; the labels now match the values actually formatted.
func (engine *engine) reportInstanceCrashed(ctx context.Context, im *instanceMemory, typ, code string, err error) {
    loggingCtx := im.Namespace().WithTags(ctx)

    instanceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildInstanceTrack(im.instance))
    namespaceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildNamespaceTrack(im.Namespace().Name))
    msg := fmt.Sprintf("Workflow failed with type = %v, code = %v", typ, code)
    slog.Error(msg, enginerefactor.GetSlogAttributesWithError(instanceTrackCtx, err)...)
    slog.Error(msg, enginerefactor.GetSlogAttributesWithError(namespaceTrackCtx, err)...)
}

// UserLog emits a user-facing log line for the instance and, when the
// workflow's LogToEvents setting is non-empty, additionally broadcasts
// the message as a "direktiv.instanceLog" cloud event tagged with that
// value.
func (engine *engine) UserLog(ctx context.Context, im *instanceMemory, msg string) {
    loggingCtx := im.Namespace().WithTags(ctx)
    instanceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildInstanceTrack(im.instance))

    slog.Info(msg, enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogUnknownStatus)...)

    attr := im.instance.Settings.LogToEvents
    if attr == "" {
        return
    }

    event := cloudevents.NewEvent()
    event.SetID(uuid.New().String())
    event.SetSource(im.instance.Instance.WorkflowPath)
    event.SetType("direktiv.instanceLog")
    event.SetExtension("logger", attr)
    event.SetDataContentType("application/json")
    if err := event.SetData("application/json", msg); err != nil {
        slog.Error("failed to create cloudevent", "error", err.Error())
    }

    if err := engine.events.BroadcastCloudevent(ctx, im.Namespace(), &event, 0); err != nil {
        slog.Error("failed to broadcast cloudevent", "error", err)
    }
}

// logRunState records that state logic is about to run. The Info-level
// line is only emitted on a fresh execution (no saved memory, no wake
// payload, no inbound error) to avoid duplicate entries on resumes.
func (engine *engine) logRunState(ctx context.Context, im *instanceMemory, wakedata []byte, err error) {
    slog.Debug("Running state logic", "instance", im.ID().String(), "step", im.Step(), "logic", im.logic.GetID())

    if im.GetMemory() != nil || len(wakedata) != 0 || err != nil {
        return
    }

    loggingCtx := im.Namespace().WithTags(ctx)
    instanceTrackCtx := enginerefactor.WithTrack(im.WithTags(loggingCtx), enginerefactor.BuildInstanceTrack(im.instance))

    slog.Info("Running state logic", enginerefactor.GetSlogAttributesWithStatus(instanceTrackCtx, core.LogRunningStatus)...)
}

// GetInodePath returns the canonical absolute path to an inode: a
// leading "/" is guaranteed, a trailing "/" is removed, and the result
// is cleaned (".." and duplicate separators resolved).
//
// Fix: the previous early return for relative inputs skipped
// filepath.Clean entirely, so "a/../b" came back as "/a/../b" while
// "/a/../b" was cleaned to "/b". All inputs are now cleaned.
func GetInodePath(path string) string {
    path = strings.TrimSuffix(path, "/")
    if !strings.HasPrefix(path, "/") {
        path = "/" + path
    }
    return filepath.Clean(path)
}