pkg/flow/status.go
package flow
import (
"context"
"errors"
"fmt"
"github.com/direktiv/direktiv/pkg/core"
derrors "github.com/direktiv/direktiv/pkg/flow/errors"
"github.com/direktiv/direktiv/pkg/instancestore"
"github.com/direktiv/direktiv/pkg/tracing"
"golang.org/x/exp/slog"
)
func (engine *engine) GetIsInstanceFailed(im *instanceMemory) bool {
if engine.GetIsInstanceCrashed(im) {
return true
}
if im.instance.Instance.Status == instancestore.InstanceStatusFailed || im.instance.Instance.Status == instancestore.InstanceStatusCancelled {
return true
}
if im.updateArgs.Status != nil && (*im.updateArgs.Status) == instancestore.InstanceStatusFailed {
return true
}
return false
}
func (engine *engine) GetIsInstanceCrashed(im *instanceMemory) bool {
if im.instance.Instance.Status == instancestore.InstanceStatusCrashed {
return true
}
if im.updateArgs.Status != nil && (*im.updateArgs.Status) == instancestore.InstanceStatusCrashed {
return true
}
return false
}
func (engine *engine) SetInstanceFailed(ctx context.Context, im *instanceMemory, err error) {
var status instancestore.InstanceStatus
var code, message string
status = instancestore.InstanceStatusFailed
code = ErrCodeInternal
ctx = tracing.AddInstanceMemoryAttr(ctx, tracing.InstanceAttributes{
Namespace: im.Namespace().Name,
InstanceID: im.GetInstanceID().String(),
Invoker: im.instance.Instance.Invoker,
Callpath: tracing.CreateCallpath(im.instance),
WorkflowPath: im.instance.Instance.WorkflowPath,
Status: core.LogUnknownStatus,
}, im.GetState())
ctx = tracing.WithTrack(ctx, tracing.BuildInstanceTrack(im.instance))
uerr := new(derrors.UncatchableError)
cerr := new(derrors.CatchableError)
ierr := new(derrors.InternalError)
slog.ErrorContext(ctx, "Workflow canceled due to failed instance")
if errors.As(err, &uerr) {
code = uerr.Code
message = uerr.Message
} else if errors.As(err, &cerr) {
code = cerr.Code
message = cerr.Message
} else if errors.As(err, &ierr) {
slog.ErrorContext(ctx, "Workflow instance encountered an internal error.", "error", fmt.Errorf("internal error: %w", ierr))
status = instancestore.InstanceStatusCrashed
message = "an internal error occurred"
} else {
slog.ErrorContext(ctx, "Workflow instance failed due to an unhandled error.", "error", fmt.Errorf("unhandled error: %w", err))
status = instancestore.InstanceStatusCrashed
code = ErrCodeInternal
message = err.Error()
}
if code == "direktiv.cancels.parent" || code == "direktiv.cancels.api" {
status = instancestore.InstanceStatusCancelled
}
im.instance.Instance.Status = status
im.instance.Instance.ErrorCode = code
im.instance.Instance.ErrorMessage = []byte(message)
im.updateArgs.Status = &im.instance.Instance.Status
im.updateArgs.ErrorCode = &im.instance.Instance.ErrorCode
im.updateArgs.ErrorMessage = &im.instance.Instance.ErrorMessage
}
func (engine *engine) InstanceRaise(ctx context.Context, im *instanceMemory, cerr *derrors.CatchableError) error {
if im.ErrorCode() == "" {
im.instance.Instance.Status = instancestore.InstanceStatusFailed
im.instance.Instance.ErrorCode = cerr.Code
im.instance.Instance.ErrorMessage = []byte(cerr.Message)
im.updateArgs.Status = &im.instance.Instance.Status
im.updateArgs.ErrorCode = &im.instance.Instance.ErrorCode
im.updateArgs.ErrorMessage = &im.instance.Instance.ErrorMessage
} else {
return derrors.NewCatchableError(ErrCodeMultipleErrors, "the workflow instance tried to throw multiple errors")
}
return nil
}