vorteil/direktiv

View on GitHub
pkg/flow/engine.go

Summary

Maintainability
F
4 days
Test Coverage
package flow

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "log/slog"
    "net"
    "net/http"
    "path/filepath"
    "regexp"
    "runtime"
    "strings"
    "sync"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/direktiv/direktiv/pkg/core"
    "github.com/direktiv/direktiv/pkg/database"
    "github.com/direktiv/direktiv/pkg/datastore"
    enginerefactor "github.com/direktiv/direktiv/pkg/engine"
    derrors "github.com/direktiv/direktiv/pkg/flow/errors"
    "github.com/direktiv/direktiv/pkg/flow/nohome"
    "github.com/direktiv/direktiv/pkg/flow/states"
    "github.com/direktiv/direktiv/pkg/instancestore"
    "github.com/direktiv/direktiv/pkg/model"
    "github.com/direktiv/direktiv/pkg/pubsub"
    "github.com/direktiv/direktiv/pkg/tracing"
    "github.com/google/uuid"
    "github.com/senseyeio/duration"
)

// engine runs workflow instances: it creates them, moves them between states,
// and terminates or crashes them. It embeds *server, so the server's fields and
// methods are promoted onto the engine.
type engine struct {
    *server

    // scheduled is a concurrency-safe map; what it stores is not visible in
    // this part of the file.
    scheduled sync.Map
}

// initEngine builds the engine for srv, registers the engine's instance-message
// handler for InstanceMessageEvent on pBus, and launches the instanceKicker
// goroutine before handing the engine back.
func initEngine(srv *server) *engine {
    e := &engine{server: srv}

    e.pBus.Subscribe(&pubsub.InstanceMessageEvent{}, e.instanceMessagesChannelHandler)
    go e.instanceKicker()

    return e
}

// instanceKicker waits one minute and then, every five seconds, starts a
// kickWaitingInstances run in its own goroutine. It never returns.
func (engine *engine) instanceKicker() {
    time.Sleep(1 * time.Minute)

    tick := time.NewTicker(5 * time.Second)
    for range tick.C {
        go engine.kickWaitingInstances()
    }
}

// kickWaitingInstances asks the instance store for homeless instances, using
// "now minus engineSchedulingTimeout" as the cutoff, and passes a message for
// each one to instanceMessagesChannelHandler.
//
// Failures are logged rather than returned because the function is run from a
// background goroutine. The SQL transaction is only used for reading and is
// always rolled back.
func (engine *engine) kickWaitingInstances() {
    ctx := context.Background()

    slog.Debug("Starting to kick waiting (homeless) instances.")
    tx, err := engine.beginSQLTx(ctx)
    if err != nil {
        slog.Error("Failed to begin SQL transaction in kickWaitingInstances.", "error", err)
        return
    }
    defer tx.Rollback()

    instances, err := tx.InstanceStore().GetHomelessInstances(ctx, time.Now().UTC().Add(-engineSchedulingTimeout))
    if err != nil {
        slog.Error("Failed to list homeless instances in kickWaitingInstances. Some instances may remain unprocessed.", "error", err)
        return
    }
    if len(instances) == 0 {
        slog.Debug("No homeless instances found to kick.")
        return
    }
    slog.Info("Processing homeless instances.", "count", len(instances))

    for idx := range instances {
        instance := instances[idx]
        slog.Debug("Kicking instance.", "instance", instance.ID)

        data, err := json.Marshal(&instanceMessageChannelData{
            InstanceID:        instance.ID,
            LastKnownServer:   instance.Server,
            LastKnownUpdateAt: instance.UpdatedAt,
        })
        if err != nil {
            slog.Error("Failed to marshal instance data in kickWaitingInstances.", "instance", instance.ID, "error", err)
            // Without a payload there is nothing meaningful to dispatch;
            // skip this instance instead of sending an empty message.
            continue
        }

        engine.instanceMessagesChannelHandler(string(data))
    }
}

// newInstanceArgs bundles the inputs of engine.NewInstance.
type newInstanceArgs struct {
    // tx is an optional transaction; when nil, NewInstance begins its own.
    tx *database.SQLStore
    // ID is the ID given to the new instance.
    ID uuid.UUID
    // Namespace is the namespace the instance is created in.
    Namespace *datastore.Namespace
    // CalledAs is the workflow path handed to engine.mux to find the workflow.
    CalledAs string
    // Input is the raw instance input; it is stored as-is and also converted
    // into the instance's initial live data.
    Input []byte
    // Invoker names what triggered the instance. Values seen in this file are
    // "api", "cloudevent" and "instance:<id>".
    Invoker string
    // DescentInfo lists the parents of the instance. When it has entries, the
    // first one supplies the root instance ID.
    DescentInfo *enginerefactor.InstanceDescentInfo
    // TelemetryInfo carries tracing data; NewInstance dereferences it, so it
    // must not be nil.
    TelemetryInfo *enginerefactor.InstanceTelemetryInfo
    // SyncHash is passed through unchanged to CreateInstanceData.
    SyncHash *string
}

const (
    // apiCaller is the invoker name that NewInstance compares against
    // (case-insensitively) to reject manual invocation of event-based
    // workflows.
    apiCaller = "api"
)

// unmarshalInstanceInputData converts raw instance input into the value used as
// initial state data.
//
// Input that decodes to a JSON object is returned as that object. Any other
// valid JSON value is wrapped as {"input": <value>}. Input that is not valid
// JSON is base64-encoded (standard encoding) and wrapped the same way.
func unmarshalInstanceInputData(input []byte) interface{} {
    var decoded interface{}

    if err := json.Unmarshal(input, &decoded); err != nil {
        decoded = base64.StdEncoding.EncodeToString(input)
    }

    if obj, isObject := decoded.(map[string]interface{}); isObject {
        return obj
    }

    return map[string]interface{}{
        "input": decoded,
    }
}

// marshalInstanceInputData normalises input with unmarshalInstanceInputData and
// returns the result as a JSON string. It panics if that result cannot be
// marshalled.
func marshalInstanceInputData(input []byte) string {
    encoded, err := json.Marshal(unmarshalInstanceInputData(input))
    if err != nil {
        panic(err)
    }

    return string(encoded)
}

// trim removes a single leading "/" from s, if there is one.
func trim(s string) string {
    if strings.HasPrefix(s, "/") {
        return s[len("/"):]
    }

    return s
}

// NewInstance creates and persists a new workflow instance described by args
// and returns the in-memory handle for it.
//
// It looks up the workflow via engine.mux, parses it, rejects event-based
// workflows that are invoked by the API or as a subflow, stores the instance
// record, commits the transaction, and notifies pubsub listeners.
//
// Errors from the workflow lookup, parsing, transaction handling and the store
// are returned. Failures to marshal or parse the engine's own data structures
// panic.
func (engine *engine) NewInstance(ctx context.Context, args *newInstanceArgs) (*instanceMemory, error) {
    ctx = tracing.AddInstanceAttr(ctx, tracing.InstanceAttributes{
        Namespace:    args.Namespace.Name,
        InstanceID:   args.ID.String(),
        Invoker:      args.Invoker,
        Callpath:     args.TelemetryInfo.CallPath,
        WorkflowPath: args.CalledAs,
        Status:       core.LogRunningStatus,
    })
    ctx, cleanup, err2 := tracing.NewSpan(ctx, "creating a new Instance: "+args.ID.String()+", workflow: "+args.CalledAs)
    if err2 != nil {
        // A tracing failure is logged only; instance creation continues.
        slog.Debug("failed in new instance", "error", err2)
    }
    defer cleanup()
    slog.DebugContext(ctx, "Initializing new instance creation.")
    file, data, err := engine.mux(ctx, args.Namespace, args.CalledAs)
    if err != nil {
        return nil, err
    }

    var wf model.Workflow
    err = wf.Load(data)
    if err != nil {
        slog.ErrorContext(ctx, "Failed to parse workflow definition.", "error", err)
        return nil, derrors.NewUncatchableError("direktiv.workflow.invalid", "cannot parse workflow '%s': %v", trim(file.Path), err)
    }

    // Workflows whose start definition lists events may not be started by the
    // API or by another instance.
    if len(wf.GetStartDefinition().GetEvents()) > 0 {
        if strings.ToLower(args.Invoker) == apiCaller {
            return nil, derrors.NewUncatchableError("direktiv.workflow.invoke", "cannot manually invoke event-based workflow")
        }
        if strings.HasPrefix(args.Invoker, "instance") {
            return nil, derrors.NewUncatchableError("direktiv.workflow.invoke", "cannot invoke event-based workflow as a subflow")
        }
    }

    // Without parents the instance is its own root. Otherwise the first
    // descent entry is the root and the last entry's branch is the loop index.
    root := args.ID
    iterator := 0
    if args.DescentInfo != nil && len(args.DescentInfo.Descent) > 0 {
        root = args.DescentInfo.Descent[0].ID
        iterator = args.DescentInfo.Descent[len(args.DescentInfo.Descent)-1].Branch
    }

    // NOTE(review): args.DescentInfo may be nil here (see the check above);
    // this presumably relies on MarshalJSON tolerating a nil receiver — confirm.
    descentInfo, err := args.DescentInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }

    // The caller-supplied telemetry info is mutated: namespace name and trace
    // parent are overwritten before it is serialised.
    args.TelemetryInfo.NamespaceName = args.Namespace.Name
    traceParent, err := tracing.ExtractTraceParent(ctx)
    if err != nil {
        slog.Debug("NewInstance telemetry failed", "error", err)
    }
    args.TelemetryInfo.TraceParent = traceParent
    telemetryInfo, err := args.TelemetryInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }
    liveData := marshalInstanceInputData(args.Input)

    // Runtime and children info start out as zero values.
    ri := &enginerefactor.InstanceRuntimeInfo{}
    riData, err := ri.MarshalJSON()
    if err != nil {
        panic(err)
    }

    ci := &enginerefactor.InstanceChildrenInfo{}
    ciData, err := ci.MarshalJSON()
    if err != nil {
        panic(err)
    }

    tx := args.tx

    // Only a transaction opened here gets a deferred rollback; a
    // caller-supplied one does not.
    if tx == nil {
        tx, err = engine.flow.beginSQLTx(ctx)
        if err != nil {
            return nil, err
        }
        defer tx.Rollback()
    }
    slog.DebugContext(ctx, "Preparing to commit new instance transaction.")

    idata, err := tx.InstanceStore().CreateInstanceData(ctx, &instancestore.CreateInstanceDataArgs{
        ID:             args.ID,
        NamespaceID:    args.Namespace.ID,
        Namespace:      args.Namespace.Name,
        RootInstanceID: root,
        Server:         engine.ID,
        Invoker:        args.Invoker,
        WorkflowPath:   file.Path,
        Definition:     data,
        Input:          args.Input,
        LiveData:       []byte(liveData),
        TelemetryInfo:  telemetryInfo,
        DescentInfo:    descentInfo,
        RuntimeInfo:    riData,
        ChildrenInfo:   ciData,
        SyncHash:       args.SyncHash,
    })
    if err != nil {
        return nil, err
    }

    // The commit is issued on whichever transaction is in use, including one
    // passed in through args.tx.
    err = tx.Commit(ctx)
    if err != nil {
        return nil, err
    }
    slog.DebugContext(ctx, "New instance transaction committed successfully.")

    instance, err := enginerefactor.ParseInstanceData(idata)
    if err != nil {
        panic(err)
    }

    im := new(instanceMemory)
    im.engine = engine
    im.instance = instance
    im.updateArgs = new(instancestore.UpdateInstanceDataArgs)
    im.updateArgs.Server = engine.ID

    err = json.Unmarshal(im.instance.Instance.LiveData, &im.data)
    if err != nil {
        panic(err)
    }

    err = json.Unmarshal(im.instance.Instance.StateMemory, &im.memory)
    if err != nil {
        panic(err)
    }

    im.AddAttribute("loop-index", fmt.Sprintf("%d", iterator))

    engine.pubsub.NotifyInstances(im.Namespace())
    // Logged twice: once with the instance context and once on the
    // namespace track.
    slog.InfoContext(ctx, "Workflow has been triggered")
    slog.InfoContext(tracing.WithTrack(ctx, tracing.BuildNamespaceTrack(im.Namespace().Name)), "Workflow has been triggered")

    return im, nil
}

// loadStateLogic selects a state from the instance's workflow model and stores
// the logic built for it in im.logic.
//
// An empty stateID selects the workflow's start state; otherwise the state is
// looked up by ID and an error is returned when no such state exists. Errors
// from loading the model or building the logic are returned unchanged.
func (engine *engine) loadStateLogic(im *instanceMemory, stateID string) error {
    wf, err := im.Model()
    if err != nil {
        return err
    }

    var target model.State

    if stateID != "" {
        found, ok := wf.GetStatesMap()[stateID]
        if !ok {
            return fmt.Errorf("workflow %s cannot resolve state: %s", nohome.GetWorkflow(im.instance.Instance.WorkflowPath), stateID)
        }
        target = found
    } else {
        target = wf.GetStartState()
    }

    im.logic, err = states.StateLogic(im, target)

    return err
}

// Transition moves the instance into nextState and runs that state.
//
// On the instance's first step it also schedules the soft and hard timeouts.
// It then loads the state's logic, clears the state memory, records the new
// flow, controller, attempt count and state begin time, flushes the updates,
// schedules a soft timeout at the state's deadline, and calls runState.
//
// Any failure along the way crashes the instance and returns nil. It panics
// when nextState is empty. Otherwise it returns whatever runState returns.
func (engine *engine) Transition(ctx context.Context, im *instanceMemory, nextState string, attempt int) *states.Transition {
    ctx, cleanup, err := tracing.NewSpan(ctx, "engine transitions: "+nextState)
    if err != nil {
        // A tracing failure is logged only; the transition continues.
        slog.Debug("transition failed to init telemetry", "error", err)
    }
    defer cleanup()
    workflow, err := im.Model()
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    oldController := im.Controller()

    if im.Step() == 0 { //nolint:nestif
        // Defaults: soft timeout after 15 minutes, hard timeout after 20.
        t := time.Now().UTC()
        tSoft := time.Now().UTC().Add(time.Minute * 15)
        tHard := time.Now().UTC().Add(time.Minute * 20)

        if workflow.Timeouts != nil {
            s := workflow.Timeouts.Interrupt

            if s != "" {
                d, err := duration.ParseISO8601(s)
                if err != nil {
                    engine.CrashInstance(ctx, im, err)
                    return nil
                }
                // A custom interrupt timeout also moves the hard timeout to
                // five minutes after it, unless Kill overrides that below.
                tSoft = d.Shift(t)
                tHard = tSoft.Add(time.Minute * 5)
            }

            s = workflow.Timeouts.Kill

            if s != "" {
                d, err := duration.ParseISO8601(s)
                if err != nil {
                    engine.CrashInstance(ctx, im, err)
                    return nil
                }
                tHard = d.Shift(t)
            }
        }

        engine.ScheduleSoftTimeout(ctx, im, oldController, tSoft)
        engine.ScheduleHardTimeout(ctx, im, oldController, tHard)
    }

    // Note that this check runs after the first-step timeout scheduling above.
    if nextState == "" {
        panic("don't call this function with an empty nextState")
    }

    err = engine.loadStateLogic(im, nextState)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    flow := append(im.Flow(), nextState)
    deadline := im.logic.Deadline(ctx)

    // Entering a state starts with empty state memory.
    err = engine.SetMemory(ctx, im, nil)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }
    ctx = tracing.AddInstanceMemoryAttr(ctx,
        tracing.InstanceAttributes{
            Namespace:    im.Namespace().Name,
            InstanceID:   im.GetInstanceID().String(),
            Invoker:      im.instance.Instance.Invoker,
            Callpath:     tracing.CreateCallpath(im.instance),
            WorkflowPath: im.instance.Instance.WorkflowPath,
            Status:       core.LogUnknownStatus,
        }, im.GetState())
    t := time.Now().UTC()

    // Update the in-memory runtime info and queue the serialised copy for the
    // next flush.
    im.instance.RuntimeInfo.Flow = flow
    im.instance.RuntimeInfo.Controller = engine.pubsub.Hostname
    im.instance.RuntimeInfo.Attempts = attempt
    im.instance.RuntimeInfo.StateBeginTime = t
    rtData, err := im.instance.RuntimeInfo.MarshalJSON()
    if err != nil {
        panic(err)
    }
    im.updateArgs.RuntimeInfo = &rtData

    im.instance.Instance.Deadline = &deadline
    im.updateArgs.Deadline = im.instance.Instance.Deadline

    err = im.flushUpdates(ctx)
    if err != nil {
        engine.CrashInstance(ctx, im, err)
        return nil
    }

    // The state's own deadline is scheduled as a soft timeout.
    engine.ScheduleSoftTimeout(ctx, im, oldController, deadline)

    return engine.runState(ctx, im, nil, nil)
}

// CrashInstance reports err as the reason the instance crashed, marks the
// instance as failed and terminates it.
//
// The report is labelled "catchable" or "uncatchable" with the error's code
// when err is one of those error types (an uncatchable error needs a non-empty
// code). Anything else is labelled "unknown", and the file and line of the
// caller of CrashInstance stand in for the code.
func (engine *engine) CrashInstance(ctx context.Context, im *instanceMemory, err error) {
    var (
        catchable   = new(derrors.CatchableError)
        uncatchable = new(derrors.UncatchableError)
    )

    switch {
    case errors.As(err, &catchable):
        engine.reportInstanceCrashed(ctx, im, "catchable", catchable.Code, err)
    case errors.As(err, &uncatchable) && uncatchable.Code != "":
        engine.reportInstanceCrashed(ctx, im, "uncatchable", uncatchable.Code, err)
    default:
        // Caller(1) is whoever called CrashInstance, so this must stay in
        // this function's own frame.
        _, file, line, _ := runtime.Caller(1)
        engine.reportInstanceCrashed(ctx, im, "unknown", fmt.Sprintf("thrown by %s:%d", file, line), err)
    }

    engine.SetInstanceFailed(ctx, im, err)
    engine.TerminateInstance(ctx, im)
}

// setEndAt stamps the instance with the current UTC time as its end time, both
// on the in-memory instance and on the pending update arguments. Both fields
// point at the same time value.
func (engine *engine) setEndAt(im *instanceMemory) {
    now := time.Now().UTC()
    endedAt := &now

    im.instance.Instance.EndedAt = endedAt
    im.updateArgs.EndedAt = endedAt
}

// noCancelCtx is a context that exposes the values of the context it wraps but
// none of its cancellation: it has no deadline, no Done channel and no error.
type noCancelCtx struct {
    //nolint:containedctx
    ctx context.Context
}

// Deadline always reports that no deadline is set.
func (noCancelCtx) Deadline() (deadline time.Time, ok bool) {
    return time.Time{}, false
}

// Done returns a nil channel, so the context never signals cancellation.
func (noCancelCtx) Done() <-chan struct{} {
    return nil
}

// Err always returns nil.
func (noCancelCtx) Err() error {
    return nil
}

// Value looks key up in the wrapped context.
func (c noCancelCtx) Value(key interface{}) interface{} {
    parent := c.ctx

    return parent.Value(key)
}

// NoCancelContext returns a context that carries the values of ctx but is
// never canceled and has no deadline.
func NoCancelContext(ctx context.Context) context.Context {
    var detached context.Context = noCancelCtx{ctx: ctx}

    return detached
}

// TerminateInstance ends the instance: it records the end time, frees the
// instance's artefacts and memory, and wakes the instance's caller.
//
// For an instance that is already crashed the cleanup runs on a context that
// cannot be canceled. If freeing memory fails on a non-crashed instance, the
// instance is crashed instead (which terminates it again). On a crashed
// instance the critical memory is force-freed and the error is only logged.
func (engine *engine) TerminateInstance(ctx context.Context, im *instanceMemory) {
    if engine.GetIsInstanceCrashed(im) {
        ctx = NoCancelContext(ctx)
    }

    engine.setEndAt(im)
    engine.freeArtefacts(im) //nolint:contextcheck

    if freeErr := engine.freeMemory(ctx, im); freeErr != nil {
        if crashed := engine.GetIsInstanceCrashed(im); !crashed {
            engine.CrashInstance(ctx, im, freeErr)
            return
        }

        engine.forceFreeCriticalMemory(ctx, im)
        slog.Debug("Failed to free memory during a crash", "error", freeErr)
    }

    engine.WakeInstanceCaller(ctx, im)
}

// runState executes the state logic currently loaded in im.logic and returns
// the transition to follow, or nil when there is nothing further to do in this
// call (the instance yielded, finished, or crashed).
//
// wakedata is handed to the state logic. A non-nil err means something already
// failed before the state could run; in that case the logic is skipped and
// error handling starts immediately.
//
// On the first run of a state (no state memory and no wakedata) the state's
// optional log and metadata queries are evaluated against the instance data.
// When anything fails, the instance's children are canceled. A catchable error
// whose code matches one of the state's error definitions is turned into a
// transition to that definition's target; every other error crashes the
// instance.
//
//nolint:gocognit
func (engine *engine) runState(ctx context.Context, im *instanceMemory, wakedata []byte, err error) *states.Transition {
    ctx = tracing.AddNamespace(ctx, im.Namespace().Name)
    ctx = tracing.WithTrack(ctx, tracing.BuildInstanceTrack(im.instance))
    ctx, cleanup, spanErr := tracing.NewSpan(ctx, "preparing instance for state execution")
    if spanErr != nil {
        // A tracing failure is logged only; it must not be confused with the
        // err parameter, which describes the state itself.
        slog.Debug("failed to init telemery in runstate", "error", spanErr)
    }
    defer cleanup()

    slog.DebugContext(ctx, "Starting state execution.")

    // No variable may be declared at function scope between here and the
    // failure label: the goto statements below jump across this region.
    var transition *states.Transition

    if err != nil {
        slog.ErrorContext(ctx, "Error before state execution.", "error", err)

        goto failure
    }

    if lq := im.logic.GetLog(); im.GetMemory() == nil && len(wakedata) == 0 && lq != nil {
        var object interface{}
        object, err = jqOne(im.data, lq) //nolint:contextcheck
        if err != nil {
            slog.ErrorContext(ctx, "Failed to process jq query on state data.", "error", fmt.Errorf("query failed %v, err: %w", lq, err))

            goto failure
        }

        var data []byte
        data, err = json.MarshalIndent(object, "", "  ")
        if err != nil {
            err = derrors.NewInternalError(fmt.Errorf("failed to marshal state data: %w", err))
            slog.ErrorContext(ctx, "Failed to marshal jq query result for logging.", "error", err)

            goto failure
        }

        engine.UserLog(ctx, im, string(data))
    }

    if md := im.logic.GetMetadata(); im.GetMemory() == nil && len(wakedata) == 0 && md != nil {
        var object interface{}
        object, err = jqOne(im.data, md) //nolint:contextcheck
        if err != nil {
            slog.ErrorContext(ctx, "Failed to execute jq query for metadata.", "error", err)

            goto failure
        }

        var data []byte
        data, err = json.MarshalIndent(object, "", "  ")
        if err != nil {
            err = derrors.NewInternalError(fmt.Errorf("failed to marshal state data: %w", err))
            slog.ErrorContext(ctx, "Failed to marshal metadata.", "error", err)

            goto failure
        }

        engine.StoreMetadata(ctx, im, string(data))
    }
    ctx = tracing.AddInstanceMemoryAttr(ctx, tracing.InstanceAttributes{
        Namespace:    im.Namespace().Name,
        InstanceID:   im.GetInstanceID().String(),
        Invoker:      im.instance.Instance.Invoker,
        Callpath:     tracing.CreateCallpath(im.instance),
        WorkflowPath: im.instance.Instance.WorkflowPath,
        Status:       core.LogUnknownStatus,
    }, im.GetState())
    slog.InfoContext(ctx, "Running state logic.")

    transition, err = im.logic.Run(ctx, wakedata)
    if err != nil {
        slog.ErrorContext(ctx, "State logic execution failed.", "error", err)

        goto failure
    }
    slog.DebugContext(ctx, "Applying state transformation based on logic run.")

    err = engine.transformState(ctx, im, transition)
    if err != nil {
        slog.ErrorContext(ctx, "State transformation failed.", "error", err)

        goto failure
    }

    slog.DebugContext(ctx, "State logic executed. Processing post-execution actions.")

next:
    slog.DebugContext(ctx, "Processing post-execution actions.")

    return engine.transitionState(ctx, im, transition)

failure:
    slog.ErrorContext(ctx, "State execution failed.", "error", err)
    // traceStateError(ctx, err)

    err1 := engine.CancelInstanceChildren(ctx, im)
    if err1 != nil {
        slog.ErrorContext(ctx, "Canceling Instance's children failed.", "error", err1)
    }
    cerr := new(derrors.CatchableError)

    if errors.As(err, &cerr) {
        // Expose the error to the workflow under the "error" key.
        err2 := im.StoreData("error", cerr)
        if err2 != nil {
            slog.ErrorContext(ctx, "Failed to store error data.", "error", err2)
        }

        for _, catch := range im.logic.ErrorDefinitions() {
            // "*" is accepted as a shorthand for "match every error code".
            errRegex := catch.Error
            if errRegex == "*" {
                errRegex = ".*"
            }

            // An invalid pattern is logged and treated as "no match".
            matched, regErr := regexp.MatchString(errRegex, cerr.Code)
            if regErr != nil {
                slog.ErrorContext(ctx, "Regex compilation failed for error catch definition.", "error", regErr)
            }

            if matched {
                slog.InfoContext(ctx, "Catchable error matched; executing defined transition.")
                slog.ErrorContext(ctx, "State failed with an error", "error", fmt.Errorf("state failed with an error '%s': %s", cerr.Code, cerr.Message))

                transition = &states.Transition{
                    Transform: "",
                    NextState: catch.Transition,
                }

                goto next
            }
        }
    }
    slog.ErrorContext(ctx, "Unrecoverable error encountered; initiating instance crash.", "error", err)
    engine.CrashInstance(ctx, im, err)

    return nil
}

// transformState applies the transition's jq transform to the instance data
// and replaces the data with the result.
//
// Nothing happens when transition is nil, when it has no transform, or when the
// transform is the string "" or ".". A failing transform is returned as a
// catchable error.
func (engine *engine) transformState(ctx context.Context, im *instanceMemory, transition *states.Transition) error {
    if transition == nil {
        return nil
    }

    switch expr := transition.Transform.(type) {
    case nil:
        return nil
    case string:
        if expr == "" || expr == "." {
            return nil
        }
    }

    attrs := tracing.InstanceAttributes{
        Namespace:    im.instance.Instance.Namespace,
        InstanceID:   im.GetInstanceID().String(),
        Invoker:      im.instance.Instance.Invoker,
        Callpath:     tracing.CreateCallpath(im.instance),
        WorkflowPath: im.instance.Instance.WorkflowPath,
        Status:       core.LogUnknownStatus,
    }
    logCtx := tracing.WithTrack(
        tracing.AddInstanceMemoryAttr(ctx, attrs, im.GetState()),
        tracing.BuildInstanceTrack(im.instance),
    )
    slog.DebugContext(logCtx, "Transforming state data.")

    transformed, err := jqObject(im.data, transition.Transform) //nolint:contextcheck
    if err != nil {
        slog.ErrorContext(logCtx, "Failed to apply jq to transform.", "error", err)

        return derrors.WrapCatchableError("unable to apply transform: %v", err)
    }

    im.replaceData(transformed)
    slog.DebugContext(logCtx, "Successfully transformed state data.")

    return nil
}

// transitionState flushes the instance's pending updates and decides what
// happens after a state has run.
//
//   - A flush failure crashes the instance; nil is returned.
//   - A nil transition yields the instance; nil is returned.
//   - A transition with a NextState is returned for the caller to follow.
//   - Otherwise the workflow is finished: output and final status are recorded,
//     the instance is terminated, pubsub listeners are notified, and nil is
//     returned.
//
// The final status is complete unless the instance has an error code. The two
// cancel codes map to cancelled and every other code maps to failed.
func (engine *engine) transitionState(ctx context.Context, im *instanceMemory, transition *states.Transition) *states.Transition {
    if flushErr := im.flushUpdates(ctx); flushErr != nil {
        slog.Error("Failed to flush updates for instance.", "instance", im.ID(), "namespace", im.Namespace(), "error", flushErr)
        engine.CrashInstance(ctx, im, flushErr)

        return nil
    }

    if transition == nil {
        engine.InstanceYield(ctx, im)

        return nil
    }

    attrs := tracing.InstanceAttributes{
        Namespace:    im.Namespace().Name,
        InstanceID:   im.GetInstanceID().String(),
        Invoker:      im.instance.Instance.Invoker,
        Callpath:     tracing.CreateCallpath(im.instance),
        WorkflowPath: im.instance.Instance.WorkflowPath,
        Status:       core.LogUnknownStatus,
    }
    trackCtx := tracing.WithTrack(
        tracing.AddInstanceMemoryAttr(ctx, attrs, im.GetState()),
        tracing.BuildInstanceTrack(im.instance),
    )

    if transition.NextState != "" {
        slog.DebugContext(trackCtx, "Transitioning to next state.")

        return transition
    }

    // No next state: the workflow ends here.
    finalStatus := instancestore.InstanceStatusComplete
    if code := im.ErrorCode(); code != "" {
        switch code {
        case "direktiv.cancels.parent", "direktiv.cancels.api":
            finalStatus = instancestore.InstanceStatusCancelled
        default:
            finalStatus = instancestore.InstanceStatusFailed
        }
        slog.ErrorContext(trackCtx, "Workflow failed with an error.", "error", fmt.Errorf("'%s': %s", code, im.instance.Instance.ErrorMessage))
    }

    slog.DebugContext(trackCtx, "Instance terminated", "instance", im.ID().String(), "namespace", im.Namespace().Name)

    im.instance.Instance.Output = []byte(im.MarshalData())
    im.updateArgs.Output = &im.instance.Instance.Output
    im.instance.Instance.Status = finalStatus
    im.updateArgs.Status = &im.instance.Instance.Status

    slog.InfoContext(trackCtx, "Workflow completed.")
    slog.InfoContext(tracing.WithTrack(ctx, tracing.BuildNamespaceTrack(im.Namespace().Name)), "Workflow completed.")

    // Deferred so the notifications fire after TerminateInstance has run;
    // their arguments are evaluated here, before termination.
    defer engine.pubsub.NotifyInstance(im.instance.Instance.ID)
    defer engine.pubsub.NotifyInstances(im.Namespace())

    engine.TerminateInstance(ctx, im)

    return nil
}

// subflowInvoke creates a new instance of the workflow called name as a child
// of instance.
//
// pi describes the parent and is appended to the parent's descent list to form
// the child's descent. A relative name is resolved against the directory of the
// parent's workflow path. The child is invoked as "instance:<parent id>" and
// gets the "loop-index" attribute set to pi.Branch.
//
// It returns the child's instance memory, or the error from NewInstance.
func (engine *engine) subflowInvoke(ctx context.Context, pi *enginerefactor.ParentInfo, instance *enginerefactor.Instance, name string, input []byte) (*instanceMemory, error) {
    // The full slice expression caps the capacity at the current length, so
    // append always allocates a fresh backing array. Appending directly to
    // the parent's slice could write into spare capacity shared with
    // sibling subflows and overwrite their entry.
    parentDescent := instance.DescentInfo.Descent
    di := &enginerefactor.InstanceDescentInfo{
        Descent: append(parentDescent[:len(parentDescent):len(parentDescent)], *pi),
    }

    slog.InfoContext(ctx, "Invoking a subflow")

    args := &newInstanceArgs{
        ID: uuid.New(),
        Namespace: &datastore.Namespace{
            ID:   instance.Instance.NamespaceID,
            Name: instance.TelemetryInfo.NamespaceName,
        },
        CalledAs:    name,
        Input:       input,
        Invoker:     fmt.Sprintf("instance:%v", pi.ID),
        DescentInfo: di,
        TelemetryInfo: &enginerefactor.InstanceTelemetryInfo{
            TraceParent:   instance.TelemetryInfo.TraceParent,
            NamespaceName: instance.TelemetryInfo.NamespaceName,
        },
    }

    // Relative workflow names are relative to the parent workflow's directory.
    if !filepath.IsAbs(args.CalledAs) {
        dir, _ := filepath.Split(instance.Instance.WorkflowPath)
        if dir == "" {
            dir = "/"
        }
        args.CalledAs = filepath.Join(dir, args.CalledAs)
    }

    im, err := engine.NewInstance(ctx, args)
    if err != nil {
        return nil, err
    }

    im.AddAttribute("loop-index", fmt.Sprintf("%d", pi.Branch))

    return im, nil
}

// retryMessage is the JSON payload built by scheduleRetry and decoded again by
// retryWakeup.
type retryMessage struct {
    // InstanceID is the instance's UUID in string form; retryWakeup parses it
    // with uuid.Parse.
    InstanceID string
    // Data is the opaque payload that was passed to scheduleRetry.
    Data []byte
}

// retryWakeupFunction is the function name scheduleRetry passes to
// timers.addOneShot when it registers a retry timer.
const retryWakeupFunction = "retryWakeup"

// scheduleRetry arranges for the instance with the given id to be woken up at
// time t with data as payload.
//
// When t is less than five seconds away (or already past), a goroutine sleeps
// until t and then calls retryWakeup directly. Otherwise a one-shot timer is
// registered under retryWakeupFunction.
//
// It returns an internal error when the payload cannot be marshalled or the
// timer cannot be registered.
func (engine *engine) scheduleRetry(id string, t time.Time, data []byte) error {
    payload, err := json.Marshal(&retryMessage{
        InstanceID: id,
        Data:       data,
    })
    if err != nil {
        // The function already reports errors to its caller, so there is no
        // reason to take the process down for this one.
        return derrors.NewInternalError(err)
    }

    if d := time.Until(t); d < time.Second*5 {
        go func() {
            // A zero or negative d makes Sleep return immediately.
            time.Sleep(d)
            /* #nosec */
            engine.retryWakeup(payload)
        }()

        return nil
    }

    err = engine.timers.addOneShot(id, retryWakeupFunction, t, payload)
    if err != nil {
        return derrors.NewInternalError(err)
    }

    return nil
}

// retryWakeup decodes a retryMessage from data and enqueues it as a "wake"
// message for the instance it names. Every failure is logged and otherwise
// ignored.
func (engine *engine) retryWakeup(data []byte) {
    var msg retryMessage

    if err := json.Unmarshal(data, &msg); err != nil {
        slog.Error("failed to unmarshal retryMessage", "error", err)
        return
    }

    instanceID, err := uuid.Parse(msg.InstanceID)
    if err != nil {
        slog.Error("failed to parse instance ID in retryMessage", "error", err)
        return
    }

    if err := engine.enqueueInstanceMessage(context.Background(), instanceID, "wake", &msg); err != nil {
        slog.Error("failed to enqueue instance message for retryWakeup", "error", err)
    }
}

// actionResultPayload is the Payload part of an actionResultMessage.
// NOTE(review): judging by the field names it describes the outcome of one
// action (ID, error code and message, raw output); its producers and consumers
// are not in this part of the file — confirm.
type actionResultPayload struct {
    ActionID     string
    ErrorCode    string
    ErrorMessage string
    Output       []byte
}

// actionResultMessage combines an instance ID, the embedded ActionContext and
// an actionResultPayload.
// NOTE(review): presumably the message delivered to an instance when an action
// finishes; its use is not visible in this part of the file — confirm.
type actionResultMessage struct {
    InstanceID string
    enginerefactor.ActionContext
    Payload actionResultPayload
}

// createTransport returns a new HTTP transport that honours the proxy
// environment variables, attempts HTTP/2, dials with a 10s timeout and 30s
// keep-alive, keeps up to 100 idle connections for 90s, and allows 10s for the
// TLS handshake and 1s for a "100 Continue" response.
//
// Each call builds a fresh transport with its own connection pool.
func (engine *engine) createTransport() *http.Transport {
    // Dialer.DualStack is deprecated and no longer set here: dual-stack
    // "Happy Eyeballs" fallback is already the dialer's default.
    dialer := &net.Dialer{
        Timeout:   10 * time.Second,
        KeepAlive: 30 * time.Second,
    }

    return &http.Transport{
        Proxy:                 http.ProxyFromEnvironment,
        DialContext:           dialer.DialContext,
        ForceAttemptHTTP2:     true,
        MaxIdleConns:          100,
        IdleConnTimeout:       90 * time.Second,
        TLSHandshakeTimeout:   10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
    }
}

// WakeEventsWaiter enqueues events as an "event" message for the given
// instance. A failure to enqueue is logged and not reported to the caller.
func (engine *engine) WakeEventsWaiter(instance uuid.UUID, events []*cloudevents.Event) {
    if err := engine.enqueueInstanceMessage(context.Background(), instance, "event", events); err != nil {
        slog.Error("failed to enqueue instance message for wakeEventsWaiter", "error", err)
    }
}

// EventsInvoke starts a new instance of the workflow file identified by
// workflowID in response to cloud events.
//
// The file, its root and the namespace are read in a transaction that is rolled
// back before the instance is created. The instance input is a JSON object that
// maps each event's type to the event; nil events are skipped, and a later
// event of the same type replaces an earlier one. The instance is created with
// invoker "cloudevent" and started in a new goroutine.
//
// tctx is used only for tracing. All database work and the instance creation
// run on a fresh background context. Failures are logged and not returned.
func (engine *engine) EventsInvoke(tctx context.Context, workflowID uuid.UUID, events ...*cloudevents.Event) {
    ctx := context.Background()

    tx, err := engine.flow.beginSQLTx(ctx)
    if err != nil {
        slog.Error("Failed to begin SQL transaction in EventsInvoke.", "error", err)
        return
    }
    defer tx.Rollback()

    file, err := tx.FileStore().GetFileByID(ctx, workflowID)
    if err != nil {
        slog.Error("Failed to fetch file from database.", "workflowID", workflowID, "error", err)
        return
    }

    root, err := tx.FileStore().GetRoot(ctx, file.RootID)
    if err != nil {
        slog.Error("Failed to fetch Root from database.", "workflowID", workflowID, "error", err)
        return
    }

    ns, err := tx.DataStore().Namespaces().GetByName(ctx, root.Namespace)
    if err != nil {
        slog.Error("Failed to fetch namespace from database.", "namespace", root.Namespace, "error", err)
        return
    }

    // The reads are done; release the transaction before NewInstance opens
    // its own. The deferred Rollback above covers the early returns.
    tx.Rollback()

    m := make(map[string]interface{})
    for _, event := range events {
        if event == nil {
            continue
        }

        m[event.Type()] = event
    }

    input, err := json.Marshal(m)
    if err != nil {
        slog.Error("Failed to marshal event data in EventsInvoke.", "error", err)
        return
    }

    // Tracing problems are logged only; the invocation continues.
    tctx, end, spanErr := tracing.NewSpan(tctx, "engine invoked by event")
    if spanErr != nil {
        slog.Debug("Failed to tracing.NewSpan.", "error", spanErr)
    }
    defer end()
    traceParent, traceErr := tracing.ExtractTraceParent(tctx)
    if traceErr != nil {
        slog.Debug("Failed to extract traceParent in EventsInvoke.", "error", traceErr)
    }
    // TODO: tracing

    args := &newInstanceArgs{
        ID:        uuid.New(),
        Namespace: ns,
        CalledAs:  file.Path,
        Input:     input,
        Invoker:   "cloudevent",
        TelemetryInfo: &enginerefactor.InstanceTelemetryInfo{
            TraceParent:   traceParent,
            NamespaceName: ns.Name,
        },
    }

    im, err := engine.NewInstance(ctx, args)
    if err != nil {
        slog.Error("new instance", "error", err)
        return
    }
    slog.Debug("Successfully invoked new workflow instance.", "instanceID", im.ID().String(), "workflowPath", file.Path)

    go engine.start(im)
}

// SetMemory replaces the instance's state memory with x. The value is set on
// the instance memory, and its JSON form is stored on the in-memory instance
// and queued in the pending update arguments.
//
// It always returns nil and panics when x cannot be marshalled to JSON.
func (engine *engine) SetMemory(ctx context.Context, im *instanceMemory, x interface{}) error {
    im.setMemory(x)

    encoded, marshalErr := json.Marshal(x)
    if marshalErr != nil {
        panic(marshalErr)
    }

    im.updateArgs.StateMemory = &encoded
    im.instance.Instance.StateMemory = encoded

    return nil
}

// reportInstanceCrashed logs a crashed workflow as an error on both the
// instance track and the namespace track.
//
// typ is the error category ("catchable", "uncatchable" or "unknown" as passed
// by CrashInstance), code is the error code, and err is the underlying error.
// err must not be nil.
func (engine *engine) reportInstanceCrashed(ctx context.Context, im *instanceMemory, typ, code string, err error) {
    loggingCtx := tracing.AddInstanceMemoryAttr(ctx, tracing.InstanceAttributes{
        Namespace:    im.Namespace().Name,
        InstanceID:   im.GetInstanceID().String(),
        Invoker:      im.instance.Instance.Invoker,
        Callpath:     tracing.CreateCallpath(im.instance),
        WorkflowPath: im.instance.Instance.WorkflowPath,
        Status:       core.LogUnknownStatus,
    }, im.GetState())
    instanceTrackCtx := tracing.WithTrack(loggingCtx, tracing.BuildInstanceTrack(im.instance))

    namespaceTrackCtx := tracing.WithTrack(loggingCtx, tracing.BuildNamespaceTrack(im.Namespace().Name))
    // The arguments follow the order of the placeholders: code first, then type.
    msg := fmt.Sprintf("Workflow failed with code = %v, type = %v, error = %v", code, typ, err.Error())
    slog.ErrorContext(instanceTrackCtx, msg, "error", err)
    slog.ErrorContext(namespaceTrackCtx, msg, "error", err)
}

// UserLog writes msg at info level on the instance's log track, with the
// instance's attributes and current state attached to the logging context.
func (engine *engine) UserLog(ctx context.Context, im *instanceMemory, msg string) {
    attrs := tracing.InstanceAttributes{
        Namespace:    im.Namespace().Name,
        InstanceID:   im.GetInstanceID().String(),
        Invoker:      im.instance.Instance.Invoker,
        Callpath:     tracing.CreateCallpath(im.instance),
        WorkflowPath: im.instance.Instance.WorkflowPath,
        Status:       core.LogUnknownStatus,
    }
    withAttrs := tracing.AddInstanceMemoryAttr(ctx, attrs, im.GetState())

    slog.InfoContext(tracing.WithTrack(withAttrs, tracing.BuildInstanceTrack(im.instance)), msg)
}

// GetInodePath returns the normalised, absolute form of an inode path.
//
// One trailing "/" is removed, a leading "/" is added when missing, and the
// result is cleaned with filepath.Clean, so duplicate separators and "." or
// ".." elements are resolved. Paths given with and without a leading slash
// normalise to the same result. The empty string and "/" both yield "/".
func GetInodePath(path string) string {
    path = strings.TrimSuffix(path, "/")
    if !strings.HasPrefix(path, "/") {
        path = "/" + path
    }

    return filepath.Clean(path)
}