// pkg/flow/temporary.go
package flow
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"strconv"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/direktiv/direktiv/pkg/core"
"github.com/direktiv/direktiv/pkg/datastore"
enginerefactor "github.com/direktiv/direktiv/pkg/engine"
"github.com/direktiv/direktiv/pkg/filestore"
derrors "github.com/direktiv/direktiv/pkg/flow/errors"
log "github.com/direktiv/direktiv/pkg/flow/internallogger"
"github.com/direktiv/direktiv/pkg/flow/states"
"github.com/direktiv/direktiv/pkg/model"
"github.com/direktiv/direktiv/pkg/service"
"github.com/direktiv/direktiv/pkg/tracing"
"github.com/direktiv/direktiv/pkg/utils"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)
// TEMPORARY EVERYTHING
// BroadcastCloudevent hands the event to the engine's event subsystem, scoped
// to the namespace of this instance. The dd argument is passed through
// unchanged.
func (im *instanceMemory) BroadcastCloudevent(ctx context.Context, event *cloudevents.Event, dd int64) error {
	err := im.engine.events.BroadcastCloudevent(ctx, im.Namespace(), event, dd)

	return err
}
// GetVariables resolves every selector in vars and returns one states.Variable
// per resolved selector, in selector order.
//
// An empty scope is treated as the namespace scope. Instance, workflow and
// namespace scoped selectors are read from the runtime variable store;
// filesystem scoped selectors are read from the namespace file store. A
// variable or file that does not exist yields an entry with empty data rather
// than an error. Selectors with any other scope are skipped.
//
// Runtime variable store failures are wrapped as internal errors, file store
// failures are returned as-is, and model.ErrVarNotFile is returned when a
// filesystem selector points at a directory. The transaction is never
// committed here; it is rolled back on return.
//
//nolint:gocognit
func (im *instanceMemory) GetVariables(ctx context.Context, vars []states.VariableSelector) ([]states.Variable, error) {
	tx, err := im.engine.flow.beginSQLTx(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	resolved := make([]states.Variable, 0)

	for _, sel := range vars {
		scope := sel.Scope
		if scope == "" {
			scope = utils.VarScopeNamespace
		}

		switch scope {
		case utils.VarScopeInstance, utils.VarScopeWorkflow, utils.VarScopeNamespace:
			var (
				item    *datastore.RuntimeVariable
				findErr error
			)

			switch scope {
			case utils.VarScopeInstance:
				item, findErr = tx.DataStore().RuntimeVariables().GetForInstance(ctx, im.instance.Instance.ID, sel.Key)
			case utils.VarScopeWorkflow:
				item, findErr = tx.DataStore().RuntimeVariables().GetForWorkflow(ctx, im.instance.Instance.Namespace, im.instance.Instance.WorkflowPath, sel.Key)
			default:
				// Only the namespace scope can reach this branch.
				item, findErr = tx.DataStore().RuntimeVariables().GetForNamespace(ctx, im.instance.Instance.Namespace, sel.Key)
			}

			payload := []byte{}

			switch {
			case errors.Is(findErr, datastore.ErrNotFound):
				// A missing variable is reported with empty data.
			case findErr != nil:
				return nil, derrors.NewInternalError(findErr)
			default:
				data, loadErr := tx.DataStore().RuntimeVariables().LoadData(ctx, item.ID)
				if loadErr != nil {
					return nil, derrors.NewInternalError(loadErr)
				}
				payload = data
			}

			resolved = append(resolved, states.Variable{
				Scope: scope,
				Key:   sel.Key,
				Data:  payload,
			})

		case utils.VarScopeFileSystem:
			file, findErr := tx.FileStore().ForNamespace(im.instance.Instance.Namespace).GetFile(ctx, sel.Key)

			payload := make([]byte, 0)

			switch {
			case errors.Is(findErr, filestore.ErrNotFound):
				// A missing file is reported with empty data.
			case findErr != nil:
				return nil, findErr
			case file.Typ == filestore.FileTypeDirectory:
				return nil, model.ErrVarNotFile
			default:
				// TODO: alan, maybe need to enhance the GetData function to also return us some information like mime type, checksum, and size
				data, readErr := tx.FileStore().ForFile(file).GetData(ctx)
				if readErr != nil {
					return nil, readErr
				}
				payload = data
			}

			resolved = append(resolved, states.Variable{
				Scope: scope,
				Key:   sel.Key,
				Data:  payload,
			})
		}
	}

	return resolved, nil
}
// ListenForEvents replaces the event listeners of this instance: the existing
// listeners are deleted first, then new ones are registered for events. The
// all flag is passed through to the event subsystem unchanged. The first
// error encountered is returned.
func (im *instanceMemory) ListenForEvents(ctx context.Context, events []*model.ConsumeEventDefinition, all bool) error {
	if err := im.engine.events.deleteInstanceEventListeners(ctx, im); err != nil {
		return err
	}

	if err := im.engine.events.listenForEvents(ctx, im, events, all); err != nil {
		return err
	}

	return nil
}
// Log formats a with x (fmt.Sprintf semantics) and emits it through slog with
// the instance attributes and track attached to ctx.
//
// Info, Debug and Error map to the matching slog level. Panic is emitted at
// error level with a "Panic: " prefix. Any other level emits nothing.
func (im *instanceMemory) Log(ctx context.Context, level log.Level, a string, x ...interface{}) {
	attrs := tracing.InstanceAttributes{
		Namespace:    im.Namespace().Name,
		InstanceID:   im.GetInstanceID().String(),
		Invoker:      im.instance.Instance.Invoker,
		Callpath:     tracing.CreateCallpath(im.instance),
		WorkflowPath: im.instance.Instance.WorkflowPath,
		Status:       core.LogUnknownStatus,
	}
	ctx = tracing.AddInstanceMemoryAttr(ctx, attrs, im.GetState())
	ctx = tracing.WithTrack(ctx, tracing.BuildInstanceTrack(im.instance))

	if level == log.Info {
		slog.InfoContext(ctx, fmt.Sprintf(a, x...))
		return
	}
	if level == log.Debug {
		slog.DebugContext(ctx, fmt.Sprintf(a, x...))
		return
	}
	if level == log.Error {
		slog.ErrorContext(ctx, fmt.Sprintf(a, x...))
		return
	}
	if level == log.Panic {
		slog.ErrorContext(ctx, fmt.Sprintf("Panic: "+a, x...))
	}
}
// AddAttribute stores value under tag in the instance's tag map, creating the
// map on first use. An existing value for tag is overwritten.
func (im *instanceMemory) AddAttribute(tag, value string) {
	if im.tags != nil {
		im.tags[tag] = value
		return
	}

	im.tags = map[string]string{tag: value}
}
// Iterator returns the integer stored under the "loop-index" tag. The boolean
// is false, and the integer 0, when the tag is absent or is not a valid
// integer.
func (im *instanceMemory) Iterator() (int, bool) {
	// Reading from a nil map is safe and simply reports the key as absent.
	raw, found := im.tags["loop-index"]
	if !found {
		return 0, false
	}

	idx, err := strconv.Atoi(raw)
	if err != nil {
		return 0, false
	}

	return idx, true
}
// Raise delegates to the engine's InstanceRaise for this instance, passing
// err through unchanged, and returns its result.
func (im *instanceMemory) Raise(ctx context.Context, err *derrors.CatchableError) error {
	eng := im.engine

	return eng.InstanceRaise(ctx, im, err)
}
// RetrieveSecret looks up the named secret in the namespace of this instance
// and returns its data as a string. Errors from opening the transaction or
// from the secrets store are returned unchanged. The transaction is never
// committed here; it is rolled back on return.
func (im *instanceMemory) RetrieveSecret(ctx context.Context, secret string) (string, error) {
	tx, txErr := im.engine.flow.beginSQLTx(ctx)
	if txErr != nil {
		return "", txErr
	}
	defer tx.Rollback()

	entry, getErr := tx.DataStore().Secrets().Get(ctx, im.instance.Instance.Namespace, secret)
	if getErr != nil {
		return "", getErr
	}

	value := string(entry.Data)

	return value, nil
}
// SetVariables writes or deletes the given variables inside a single
// transaction that is committed only if every variable is processed
// successfully.
//
// The scope of each variable must be instance, workflow or namespace; any
// other scope aborts with an internal error. A variable is deleted rather
// than stored when its data is empty, or when its MIME type is not one of
// "text/plain; charset=utf-8", "text/plain" or "application/octet-stream" and
// its data is one of the values {}, [], 0, "" or null. Deleting a variable
// that does not exist is not an error.
//
//nolint:gocognit
func (im *instanceMemory) SetVariables(ctx context.Context, vars []states.VariableSetter) error {
	tx, err := im.engine.flow.beginSQLTx(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	for idx := range vars {
		v := vars[idx]

		var item *datastore.RuntimeVariable

		switch v.Scope {
		case utils.VarScopeInstance:
			item, err = tx.DataStore().RuntimeVariables().GetForInstance(ctx, im.instance.Instance.ID, v.Key)
		case utils.VarScopeWorkflow:
			item, err = tx.DataStore().RuntimeVariables().GetForWorkflow(ctx, im.instance.Instance.Namespace, im.instance.Instance.WorkflowPath, v.Key)
		case utils.VarScopeNamespace:
			item, err = tx.DataStore().RuntimeVariables().GetForNamespace(ctx, im.instance.Instance.Namespace, v.Key)
		default:
			return derrors.NewInternalError(errors.New("invalid scope"))
		}
		if err != nil && !errors.Is(err, datastore.ErrNotFound) {
			return err
		}

		d := string(v.Data)
		rawMIME := v.MIMEType == "text/plain; charset=utf-8" || v.MIMEType == "text/plain" || v.MIMEType == "application/octet-stream"
		emptyValue := d == "{}" || d == "[]" || d == "0" || d == `""` || d == "null"

		if len(d) == 0 || (!rawMIME && emptyValue) {
			// The lookup may have ended with ErrNotFound, in which case there
			// is no item to delete; dereferencing it would panic.
			if item != nil {
				err = tx.DataStore().RuntimeVariables().Delete(ctx, item.ID)
				if err != nil && !errors.Is(err, datastore.ErrNotFound) {
					return err
				}
			}

			continue
		}

		newVar := &datastore.RuntimeVariable{
			Name:      v.Key,
			MimeType:  v.MIMEType,
			Data:      v.Data,
			Namespace: im.instance.Instance.Namespace,
		}

		// Namespace scoped variables carry neither an instance ID nor a
		// workflow path.
		switch v.Scope {
		case utils.VarScopeInstance:
			newVar.InstanceID = im.instance.Instance.ID
		case utils.VarScopeWorkflow:
			newVar.WorkflowPath = im.instance.Instance.WorkflowPath
		}

		if _, err = tx.DataStore().RuntimeVariables().Set(ctx, newVar); err != nil {
			return err
		}
	}

	return tx.Commit(ctx)
}
// Sleep schedules a retry of the current state (as reported by im.logic) after
// d, carrying x as the payload. It is a thin wrapper around ScheduleRetry.
func (im *instanceMemory) Sleep(ctx context.Context, d time.Duration, x interface{}) error {
	stateID := im.logic.GetID()

	return im.ScheduleRetry(ctx, d, stateID, x)
}
// GetInstanceData returns the value held in the instance memory's data field.
func (im *instanceMemory) GetInstanceData() interface{} {
	var data interface{} = im.data

	return data
}
// GetModel returns the workflow model of this instance by delegating to
// im.Model.
func (im *instanceMemory) GetModel() (*model.Workflow, error) {
	wf, err := im.Model()

	return wf, err
}
// GetInstanceID returns the ID of the instance backing this memory.
func (im *instanceMemory) GetInstanceID() uuid.UUID {
	id := im.instance.Instance.ID

	return id
}
// GetTraceID returns the string form of the trace ID of the span carried by
// ctx.
func (im *instanceMemory) GetTraceID(ctx context.Context) string {
	spanCtx := trace.SpanFromContext(ctx).SpanContext()
	traceID := spanCtx.TraceID()

	return traceID.String()
}
// PrimeDelayedEvent appends the ID of event to the instance's event queue.
func (im *instanceMemory) PrimeDelayedEvent(event cloudevents.Event) {
	eventID := event.ID()
	im.eventQueue = append(im.eventQueue, eventID)
}
// SetMemory delegates to the engine's SetMemory for this instance with x and
// returns its result.
func (im *instanceMemory) SetMemory(ctx context.Context, x interface{}) error {
	eng := im.engine

	return eng.SetMemory(ctx, im, x)
}
// Deadline returns the current UTC time plus states.DefaultShortDeadline. The
// ctx argument is not used.
func (im *instanceMemory) Deadline(ctx context.Context) time.Time {
	now := time.Now().UTC()

	return now.Add(states.DefaultShortDeadline)
}
// LivingChildren always returns a nil slice; this implementation does not
// report any children.
func (im *instanceMemory) LivingChildren(ctx context.Context) []*states.ChildInfo {
	var children []*states.ChildInfo

	return children
}
// ScheduleRetry JSON-encodes x and asks the engine to schedule a retry for
// this instance at the current UTC time plus d. The ctx and stateID arguments
// are not used by this implementation. Marshalling and scheduling errors are
// returned unchanged.
func (im *instanceMemory) ScheduleRetry(ctx context.Context, d time.Duration, stateID string, x interface{}) error {
	payload, marshalErr := json.Marshal(x)
	if marshalErr != nil {
		return marshalErr
	}

	fireAt := time.Now().UTC().Add(d)

	if err := im.engine.scheduleRetry(im.ID().String(), fireAt, payload); err != nil { //nolint:contextcheck
		return err
	}

	return nil
}
// CreateChild prepares a child for the function described by args and returns
// a handle for it. Nothing is started until the handle's Run method is
// called.
//
// For subflow functions a new subflow instance is created through the engine
// and wrapped in a subflowHandle. For system, namespaced and reusable
// container functions an action request is built with newIsolateRequest and
// wrapped in a knativeHandle. Any other function type, or a subflow
// definition of an unexpected concrete type, results in an internal error.
func (im *instanceMemory) CreateChild(ctx context.Context, args states.CreateChildArgs) (states.Child, error) {
	var ci states.ChildInfo

	fnType := args.Definition.GetType()

	if fnType == model.SubflowFunctionType {
		// Use a checked assertion so a mismatching definition is reported as
		// an error instead of panicking.
		def, ok := args.Definition.(*model.SubflowFunctionDefinition)
		if !ok {
			return nil, derrors.NewInternalError(fmt.Errorf("unexpected definition type for subflow function: %T", args.Definition))
		}

		pi := &enginerefactor.ParentInfo{
			ID:     im.ID(),
			State:  im.logic.GetID(),
			Step:   im.Step(),
			Branch: args.Iterator,
		}

		sfim, err := im.engine.subflowInvoke(ctx, pi, im.instance, def.Workflow, args.Input)
		if err != nil {
			return nil, err
		}

		ci.ID = sfim.ID().String()
		ci.Type = "subflow"
		// ci.Attempts: this is ignored here. Must be handled elsewhere.

		return &subflowHandle{
			im:     sfim,
			info:   ci,
			engine: im.engine,
		}, nil
	}

	switch fnType { //nolint:exhaustive
	case model.SystemKnativeFunctionType,
		model.NamespacedKnativeFunctionType,
		model.ReusableContainerFunctionType:
		// Supported; handled below.
	default:
		return nil, derrors.NewInternalError(fmt.Errorf("unsupported function type: %v", fnType))
	}

	uid := uuid.New()

	ar, arReq, err := im.engine.newIsolateRequest(im, im.logic.GetID(), args.Timeout, args.Definition, args.Input, uid, args.Async, args.Files, args.Iterator)
	if err != nil {
		return nil, err
	}

	ci.ID = ar.ActionID
	ci.ServiceName = ar.Container.Service
	ci.Type = "isolate"

	return &knativeHandle{
		im:     im,
		info:   ci,
		engine: im.engine,
		ar:     ar,
		arReq:  arReq,
	}, nil
}
// subflowHandle is the child handle that CreateChild returns for subflow
// functions.
type subflowHandle struct {
// im is the instance memory of the subflow instance created by CreateChild.
im *instanceMemory
// info is the value reported by Info.
info states.ChildInfo
// engine is the engine whose start method Run invokes.
engine *engine
}
// Run starts the subflow instance on the engine in a new goroutine and
// returns immediately. The ctx argument is not used.
func (child *subflowHandle) Run(ctx context.Context) {
	eng, mem := child.engine, child.im

	go eng.start(mem) //nolint:contextcheck
}
// Info returns a copy of the child information recorded when the handle was
// created.
func (child *subflowHandle) Info() states.ChildInfo {
	info := child.info

	return info
}
// newIsolateRequest builds the pair of request values needed to execute fn as
// an action on behalf of im: the internal functionRequest (container and
// service details) and the ActionRequest (input, files, deadline and action
// context).
//
// timeout is given in seconds; 0 selects the default of five minutes. The
// deadline of the ActionRequest is derived from the effective timeout, so the
// default also applies to the deadline. uid becomes the action ID. An error is
// returned when fn has an unsupported type or when two entries in files
// resolve to the same name (As, falling back to Key).
func (engine *engine) newIsolateRequest(im *instanceMemory, stateID string, timeout int,
	fn model.FunctionDefinition, inputData []byte,
	uid uuid.UUID, async bool, files []model.FunctionFileDefinition, iterator int,
) (*functionRequest, *enginerefactor.ActionRequest, error) {
	ar := new(functionRequest)
	ar.ActionID = uid.String()
	ar.Timeout = timeout
	if ar.Timeout == 0 {
		ar.Timeout = 5 * 60 // 5 mins default, knative's default
	}

	arReq := enginerefactor.ActionRequest{
		Async:     async,
		UserInput: inputData,
		// Use the effective timeout: with the raw value a timeout of 0 would
		// produce a deadline that has already passed.
		Deadline: time.Now().UTC().Add(time.Duration(ar.Timeout) * time.Second),
	}

	// The callpath is the instance's own ID when it has no ancestors,
	// otherwise the "/"-prefixed IDs of the entries in its descent info.
	callpath := ""
	if len(im.instance.DescentInfo.Descent) == 0 {
		callpath = im.GetInstanceID().String()
	}
	for _, v := range im.instance.DescentInfo.Descent {
		callpath += "/" + v.ID.String()
	}

	arCtx := enginerefactor.ActionContext{
		TraceParent: im.instance.TelemetryInfo.TraceParent,
		State:       stateID,
		Branch:      iterator,
		Namespace:   im.Namespace().Name,
		Workflow:    im.instance.Instance.WorkflowPath,
		Instance:    im.ID().String(),
		Callpath:    callpath,
		Action:      uid.String(),
	}
	arReq.ActionContext = arCtx

	ar.Container.Type = fn.GetType()

	switch ar.Container.Type { //nolint:exhaustive
	case model.ReusableContainerFunctionType:
		con := fn.(*model.ReusableFunctionDefinition) //nolint:forcetypeassert
		ar.Container.Image = con.Image
		ar.Container.Cmd = con.Cmd
		ar.Container.Size = con.Size
		ar.Container.Scale = 0
		ar.Container.ID = con.ID
		ar.Container.Service = service.GetServiceURL(arCtx.Namespace, core.ServiceTypeWorkflow, arCtx.Workflow, con.ID)
	case model.NamespacedKnativeFunctionType:
		con := fn.(*model.NamespacedFunctionDefinition) //nolint:forcetypeassert
		ar.Container.ID = con.ID
		ar.Container.Service = service.GetServiceURL(arCtx.Namespace, core.ServiceTypeNamespace, con.Path, "")
	case model.SystemKnativeFunctionType:
		con := fn.(*model.SystemFunctionDefinition) //nolint:forcetypeassert
		ar.Container.ID = con.ID
		ar.Container.Service = service.GetServiceURL(core.SystemNamespace, core.ServiceTypeSystem, con.Path, "")
	default:
		return nil, nil, fmt.Errorf("unexpected function type: %v", fn)
	}

	// Convert the file definitions, rejecting duplicate target names.
	seen := make(map[string]struct{}, len(files))
	converted := make([]enginerefactor.FunctionFileDefinition, len(files))

	for i := range files {
		f := &files[i]

		name := f.As
		if name == "" {
			name = f.Key
		}
		if _, exists := seen[name]; exists {
			return nil, nil, fmt.Errorf("multiple files with same name: %s", name)
		}
		seen[name] = struct{}{}

		converted[i] = enginerefactor.FunctionFileDefinition{
			Key:         f.Key,
			As:          f.As,
			Scope:       f.Scope,
			Type:        f.Type,
			Permissions: f.Permissions,
			// Content: TODO: evaluate if we should inject the content here?
		}
	}
	arReq.Files = converted

	return ar, &arReq, nil
}
// knativeHandle is the child handle that CreateChild returns for function
// types that are executed through an action request.
type knativeHandle struct {
// im is the instance memory of the instance that created the child.
im *instanceMemory
// info is the value reported by Info.
info states.ChildInfo
// engine is the engine whose doActionRequest method Run invokes.
engine *engine
// ar and arReq are the request values produced by newIsolateRequest.
ar *functionRequest
arReq *enginerefactor.ActionRequest
}
// Run dispatches the prepared action request on the engine in a new goroutine
// and returns immediately.
func (child *knativeHandle) Run(ctx context.Context) {
	eng := child.engine

	// NOTE(review): the original author flagged that using a goroutine here
	// may be unsafe (it caused panics); the cause is not documented.
	go eng.doActionRequest(ctx, child.ar, child.arReq)
}
// Info returns a copy of the child information recorded when the handle was
// created.
func (child *knativeHandle) Info() states.ChildInfo {
	info := child.info

	return info
}
// doActionRequest attaches instance attributes and the instance track to ctx,
// warns when the action timeout is longer than the configured functions
// timeout, and then starts the HTTP request for the action in a new goroutine.
// It panics when the container type is not one of the supported function
// types.
func (engine *engine) doActionRequest(ctx context.Context, ar *functionRequest, arReq *enginerefactor.ActionRequest) {
	attrs := tracing.InstanceAttributes{
		Namespace:    arReq.Namespace,
		InstanceID:   arReq.Instance,
		Callpath:     arReq.Callpath,
		WorkflowPath: arReq.Workflow,
		Status:       core.LogUnknownStatus,
	}
	ctx = tracing.AddInstanceAttr(ctx, attrs)
	ctx = tracing.WithTrack(ctx, tracing.BuildInstanceTrackViaCallpath(arReq.Callpath))

	// Log warning if timeout exceeds max allowed timeout.
	actionTimeout := time.Duration(ar.Timeout) * time.Second
	if actionTimeout > engine.server.config.GetFunctionsTimeout() {
		msg := fmt.Sprintf("Warning: Action timeout '%v' is longer than max allowed duariton '%v'", actionTimeout, engine.server.config.GetFunctionsTimeout())
		slog.WarnContext(ctx, msg)
	}

	switch ar.Container.Type { //nolint:exhaustive
	case model.DefaultFunctionType,
		model.NamespacedKnativeFunctionType,
		model.SystemKnativeFunctionType,
		model.ReusableContainerFunctionType:
		// NOTE(review): the original author flagged this goroutine as a
		// cause of panics; the cause is not documented.
		go engine.doKnativeHTTPRequest(ctx, ar, arReq)
	default:
		panic(fmt.Errorf("unexpected type: %+v", ar.Container.Type))
	}
}
// doKnativeHTTPRequest sends the encoded action request as an HTTP POST to
// ar.Container.Service and turns the response into an "action" message for
// the owning instance.
//
// Transport errors are retried up to 300 times with a one second pause
// between attempts, or until the request deadline (arReq.Deadline) has
// passed. Once a response is received, its action ID header and JSON body are
// read and the result is enqueued for the instance identified by
// arReq.Instance. Every failure is reported through engine.reportError; the
// function itself returns nothing.
func (engine *engine) doKnativeHTTPRequest(ctx context.Context,
ar *functionRequest, arReq *enginerefactor.ActionRequest,
) {
ctx, spanEnd, err := tracing.NewSpan(ctx, "executing knative request to action")
if err != nil {
slog.Debug("failed in doKnativeHTTPRequest", "error", err)
}
// NOTE(review): spanEnd is deferred even when NewSpan returned an error —
// confirm NewSpan always returns a non-nil function.
defer spanEnd()
slog.DebugContext(ctx, "starting function request")
tr := engine.createTransport()
addr := ar.Container.Service
slog.Debug("function request for image", "name", ar.Container.Image, "addr", addr, "image_id", ar.Container.ID)
// The request context is derived from context.Background, not from ctx, so
// only the action deadline bounds the HTTP request.
rctx, cancel := context.WithDeadline(context.Background(), arReq.Deadline)
defer cancel()
slog.DebugContext(ctx, fmt.Sprintf("deadline for request is %s", time.Until(arReq.Deadline)), "deadline", time.Until(arReq.Deadline))
reader, err := enginerefactor.EncodeActionRequest(*arReq)
if err != nil {
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
req, err := http.NewRequestWithContext(rctx, http.MethodPost, addr, reader)
if err != nil {
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
req.Header.Add(DirektivActionIDHeader, ar.ActionID)
client := &http.Client{
Transport: tr,
}
var resp *http.Response
// potentially dns error for a brand new service
// we just loop and see if we can recreate the service
// up to 300 attempts with a one second sleep after each failed attempt
// NOTE(review): the same req (and its body reader) is reused for every
// attempt — confirm the body can be re-sent after a failed attempt.
//nolint:intrange
for i := 0; i < 300; i++ { // 5 minutes max retry
slog.Debug("attempting function request", "retry", i, "address", addr)
resp, err = client.Do(req)
if err != nil {
// Stop retrying once the request context is done (deadline or cancel).
if ctxErr := rctx.Err(); ctxErr != nil {
slog.Debug("request canceled or deadline exceeded", "error", ctxErr)
engine.reportError(ctx, &arReq.ActionContext, fmt.Errorf("request timed out or was canceled: %w", ctxErr))
return
}
// Every tenth failed attempt is logged with ctx; the others without it.
if i%10 == 0 {
slog.DebugContext(ctx, fmt.Sprintf("retrying function request for container '%s' (attempt %d)", ar.Container.ID, i), "image", ar.Container.Image, "image_id", ar.Container.ID, "error", err)
} else {
slog.Debug("retrying function request", "image", ar.Container.Image, "image_id", ar.Container.ID, "error", err)
}
time.Sleep(time.Second)
} else {
// This branch always leaves the loop (return or break), so the deferred
// close is registered at most once.
defer resp.Body.Close()
slog.Debug("function request successful", "image", ar.Container.Image, "image_id", ar.Container.ID)
aid := resp.Header.Get(DirektivActionIDHeader)
if len(aid) == 0 {
slog.Debug("action ID missing from response", "this", this())
engine.reportError(ctx, &arReq.ActionContext, fmt.Errorf("missing action ID in response"))
return
}
var respBody enginerefactor.ActionResponse
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&respBody); err != nil {
slog.Debug("failed to decode response body", "error", err)
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
payload := &actionResultPayload{
ActionID: aid,
ErrorCode: respBody.ErrCode,
ErrorMessage: respBody.ErrMsg,
Output: respBody.Output,
}
// This err is a new variable scoped to the else branch; the outer err keeps
// the nil result of client.Do.
uid, err := uuid.Parse(arReq.Instance)
if err != nil {
slog.Debug("failed to parse instance UUID", "error", err)
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
err = engine.enqueueInstanceMessage(ctx, uid, "action", payload)
if err != nil {
slog.Debug("failed to enqueue instance message", "error", err)
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
break
}
}
// Reached with a non-nil err only when every attempt failed without the
// request context being done.
if err != nil {
err := fmt.Errorf("failed creating function with image %s name %s with error: %w", ar.Container.Image, ar.Container.ID, err)
engine.reportError(ctx, &arReq.ActionContext, err)
return
}
// The status code is checked only after the response has been decoded and
// enqueued; a non-200 status is reported but does not undo the enqueue.
if resp.StatusCode != http.StatusOK {
engine.reportError(ctx, &arReq.ActionContext, fmt.Errorf("action error status: %d", resp.StatusCode))
}
slog.DebugContext(ctx, "function request done")
}
// reportError logs err at error level as "action failed", with the namespace,
// instance attributes and instance track of the action context ar attached to
// the logging context.
func (engine *engine) reportError(ctx context.Context, ar *enginerefactor.ActionContext, err error) {
	ctx = tracing.AddNamespace(ctx, ar.Namespace)
	// Keep the returned context: discarding it drops the instance attributes
	// from the log record.
	ctx = tracing.AddInstanceAttr(ctx, tracing.InstanceAttributes{
		Namespace:    ar.Namespace,
		InstanceID:   ar.Instance,
		Callpath:     ar.Callpath,
		WorkflowPath: ar.Workflow,
		Status:       core.LogUnknownStatus,
	})
	ctx = tracing.WithTrack(ctx, tracing.BuildInstanceTrackViaCallpath(ar.Callpath))

	slog.ErrorContext(
		ctx,
		"action failed",
		"error",
		err,
	)
}