pkg/flow/states/action-helpers.go
package states
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"time"
derrors "github.com/direktiv/direktiv/pkg/flow/errors"
log "github.com/direktiv/direktiv/pkg/flow/internallogger"
"github.com/direktiv/direktiv/pkg/model"
"github.com/direktiv/direktiv/pkg/utils"
"github.com/senseyeio/duration"
)
type actionRetryInfo struct {
Children []*ChildInfo
Idx int
Iterator int
}
type actionResultPayload struct {
ActionID string
ErrorCode string
ErrorMessage string
Output []byte
}
func isRetryable(code string, patterns []string) bool {
for _, pattern := range patterns {
// NOTE: this error should be checked in model validation
if pattern == "*" {
pattern = ".*"
}
matched, _ := regexp.MatchString(pattern, code)
if matched {
return true
}
}
return false
}
func retryDelay(attempt int, delay string, multiplier float64) time.Duration {
d := time.Second * 5
if x, err := duration.ParseISO8601(delay); err == nil {
t0 := time.Now().UTC()
t1 := x.Shift(t0)
d = t1.Sub(t0)
}
if multiplier != 0 {
//nolint:intrange
for i := 0; i < attempt; i++ {
d = time.Duration(float64(d) * multiplier)
}
}
return d
}
func preprocessRetry(retry *model.RetryDefinition, attempt int, err error) (time.Duration, error) {
var d time.Duration
if retry == nil {
return d, err
}
cerr := new(derrors.CatchableError)
if !errors.As(err, &cerr) {
return d, err
}
if !isRetryable(cerr.Code, retry.Codes) {
return d, err
}
if attempt >= retry.MaxAttempts {
return d, derrors.NewCatchableError("direktiv.retries.exceeded", "maximum retries exceeded")
}
d = retryDelay(attempt, retry.Delay, retry.Multiplier)
return d, nil
}
func scheduleRetry(ctx context.Context, instance Instance, children []*ChildInfo, idx int, d time.Duration) error {
var err error
children[idx].Attempts++
children[idx].ID = ""
err = instance.SetMemory(ctx, children)
if err != nil {
return err
}
retry := &actionRetryInfo{
Idx: idx,
Children: children,
Iterator: idx,
}
err = instance.Sleep(ctx, d, retry)
if err != nil {
return err
}
return nil
}
type generateActionInputArgs struct {
Instance Instance
Source interface{}
Action *model.ActionDefinition
Files []model.FunctionFileDefinition
Iterator int
}
func generateActionInput(ctx context.Context, args *generateActionInputArgs) ([]byte, []model.FunctionFileDefinition, error) {
var err error
var input interface{}
input, err = jqObject(args.Source, "jq(.)") //nolint:contextcheck
if err != nil {
return nil, nil, err
}
m, ok := input.(map[string]interface{})
if !ok {
err = derrors.NewInternalError(errors.New("invalid state data"))
return nil, nil, err
}
m, err = addSecrets(ctx, args.Instance, m, args.Action.Secrets...)
if err != nil {
return nil, nil, err
}
if args.Action.Input == nil {
input, err = jqOne(m, "jq(.)") //nolint:contextcheck
if err != nil {
return nil, nil, err
}
} else {
input, err = jqOne(m, args.Action.Input) //nolint:contextcheck
if err != nil {
return nil, nil, err
}
}
var inputData []byte
inputData, err = json.Marshal(input)
if err != nil {
err = derrors.NewInternalError(err)
return nil, nil, err
}
files := make([]model.FunctionFileDefinition, 0)
for idx := range args.Files {
file := args.Files[idx]
s, err := jqString(m, file.As) //nolint:contextcheck
if err != nil {
return nil, nil, wrap(err, fmt.Sprintf("error evaluating jq in 'as' for function file %d: %%w", idx))
}
file.As = s
s, err = jqString(m, file.Key) //nolint:contextcheck
if err != nil {
return nil, nil, wrap(err, fmt.Sprintf("error evaluating jq in 'key' for function file %d: %%w", idx))
}
file.Key = s
if file.Key == "" {
return nil, nil, derrors.NewCatchableError(ErrCodeInvalidVariableKey, "invalid 'key' for function file %d: got zero-length string", idx)
}
if file.Scope != utils.VarScopeFileSystem && !utils.VarNameRegex.MatchString(file.Key) {
return nil, nil, derrors.NewCatchableError(ErrCodeInvalidVariableKey, "invalid 'key' for function file %d: must start with a letter and only contain letters, numbers and '_'", idx)
}
s, err = jqString(m, file.Scope) //nolint:contextcheck
if err != nil {
return nil, nil, wrap(err, fmt.Sprintf("error evaluating jq in 'scope' for function file %d: %%w", idx))
}
file.Scope = s
switch file.Scope {
case "":
case utils.VarScopeNamespace:
case utils.VarScopeWorkflow:
case utils.VarScopeInstance:
case utils.VarScopeThread:
case utils.VarScopeFileSystem:
default:
return nil, nil, derrors.NewCatchableError(ErrCodeInvalidVariableScope, "invalid 'scope' for function file %d: %s", idx, file.Scope)
}
s, err = jqString(m, file.Type) //nolint:contextcheck
if err != nil {
return nil, nil, wrap(err, fmt.Sprintf("error evaluating jq in 'type' for function file %d: %%w", idx))
}
file.Type = s
if file.Permissions != "" {
_, err := strconv.ParseUint(file.Permissions, 8, 32)
if err != nil {
return nil, nil, derrors.NewCatchableError(ErrCodeInvalidVariablePermissions, "invalid 'permissions' for function file %d: %s", idx, err.Error())
}
}
files = append(files, file)
}
return inputData, files, nil
}
func addSecrets(ctx context.Context, instance Instance, m map[string]interface{}, secrets ...string) (map[string]interface{}, error) {
if len(secrets) > 0 {
s := make(map[string]string)
for _, name := range secrets {
dd, err := instance.RetrieveSecret(ctx, name)
if err != nil {
return nil, err
}
s[name] = dd
}
m["secrets"] = s
}
return m, nil
}
type invokeActionArgs struct {
instance Instance
async bool
fn model.FunctionDefinition
input []byte
attempt int
timeout int
files []model.FunctionFileDefinition
iterator int
}
func invokeAction(ctx context.Context, args invokeActionArgs) (*ChildInfo, error) {
child, err := args.instance.CreateChild(ctx, CreateChildArgs{
Definition: args.fn,
Input: args.input,
Timeout: args.timeout,
Async: args.async,
Files: args.files,
Iterator: args.iterator,
})
if err != nil {
return nil, err
}
defer child.Run(ctx)
ci := child.Info()
if args.async {
args.instance.Log(ctx, log.Debug, "Running child '%s' in fire-and-forget mode (async).", ci.ID)
//nolint:nilnil
return nil, nil
}
return &ChildInfo{
ID: ci.ID,
Type: ci.Type,
Attempts: args.attempt,
ServiceName: ci.ServiceName,
}, nil
}
func ISO8601StringtoSecs(timeout string) (int, error) {
// default 15 mins timeout
wfto := 15 * 60
if len(timeout) > 0 {
to, err := duration.ParseISO8601(timeout)
if err != nil {
return wfto, err
}
dur := time.Until(to.Shift(time.Now().UTC()))
wfto = int(dur.Seconds())
}
return wfto, nil
}