
View on GitHub


6 hrs
Test Coverage
package model

import (


// WorkflowIDRegex - Regex used to validate ID.
const WorkflowIDRegex = "^[a-z][a-z0-9._-]{1,34}[a-z0-9]$"

// Workflow global object defining the fields for a workflow.
type Workflow struct {
    DirektivAPI string `json:"direktiv_api,omitempty" yaml:"direktiv_api"`
    // ID          string               `yaml:"id" json:"id"`
    // Name        string               `yaml:"name,omitempty" json:"name,omitempty"`
    URL         string `json:"url"                   yaml:"url"`
    Description string `json:"description,omitempty" yaml:"description,omitempty"`
    // Version     string               `yaml:"version,omitempty" json:"version,omitempty"`
    // Exclusive   bool                 `yaml:"singular,omitempty" json:"singular,omitempty"`
    Functions []FunctionDefinition `json:"functions,omitempty" yaml:"functions,omitempty"`
    States    []State              `json:"states,omitempty"    yaml:"states,omitempty"`
    Timeouts  *TimeoutDefinition   `json:"timeouts,omitempty"  yaml:"timeouts,omitempty"`
    Start     StartDefinition      `json:"start,omitempty"     yaml:"start,omitempty"`

func (o *Workflow) unmarshal(m map[string]interface{}) error {
    // split start out from the rest, and umarshal it
    if err := o.unmStart(m); err != nil {
        return err

    // split states out from the rest
    iStates, ok := m["states"]
    if !ok {
        return errors.New("states required")

    delete(m, "states")

    // split states out from the rest
    iFunctions, functionsOk := m["functions"]

    delete(m, "functions")

    if err := strictMapUnmarshal(m, &o); err != nil {
        return fmt.Errorf("failed to decode workflow: %w", err)

    // cast all states
    sList, ok := iStates.([]interface{})
    if !ok {
        return errors.New("invalid type for states")

    o.States = make([]State, len(sList))

    for i := range sList {
        // insert state in workflow.states[i]
        if err := o.unmState(sList[i], i); err != nil {
            return err

    // cast all functions exist
    if functionsOk {
        fList, ok := iFunctions.([]interface{})
        if !ok {
            return errors.New("invalid type for functions")

        o.Functions = make([]FunctionDefinition, len(fList))

        for i := range fList {
            // insert function in workflow.function[i]
            if err := o.unmFunction(fList[i], i); err != nil {
                return err

    return o.validate()

// unmStart - unmarshal "start" object to Workflow.
func (o *Workflow) unmStart(m map[string]interface{}) (err error) {
    // split start out from the rest
    y, startFound := m["start"]
    if startFound {
        // Start

        delete(m, "start")
        startMap, startType, err := processInterfaceMap(y)
        if err != nil {
            return fmt.Errorf("bad start: %w", err)

        start, err := getStartFromType(startType)
        if err != nil {
            return fmt.Errorf("start: %w", err)

        if err := strictMapUnmarshal(startMap, &start); err != nil {
            return fmt.Errorf("failed to decode start: %w", err)

        err = start.Validate()
        if err != nil {
            err = fmt.Errorf("start invalid: %w", err)
            return err

        o.Start = start

    return err

// unmState - unmarshal "state" object to Workflow States.
//    the state interface is casted to a supported State 'type'
//    and then inserted into workflow[sIndex].
func (o *Workflow) unmState(state interface{}, sIndex int) error {
    stateMap, stateType, err := processInterfaceMap(state)
    if err != nil {
        return fmt.Errorf("state[%d]: %w", sIndex, err)

    s, err := getStateFromType(stateType)
    if err != nil {
        return fmt.Errorf("state[%d]: %w", sIndex, err)

    if err := strictMapUnmarshal(stateMap, &s); err != nil {
        return fmt.Errorf("failed to decode state[%d]: %w", sIndex, err)

    o.States[sIndex] = s

    err = s.Validate()
    if err != nil {
        err = fmt.Errorf("state[%d]: %w", sIndex, err)

    return err

func (o *Workflow) unmFunction(state interface{}, fIndex int) error {
    fMap, fType, err := processInterfaceMap(state)
    if err != nil {
        return fmt.Errorf("function[%d]: %w", fIndex, err)

    f, err := getFunctionDefFromType(fType)
    if err != nil {
        return fmt.Errorf("function[%d]: %w", fIndex, err)

    if err := strictMapUnmarshal(fMap, &f); err != nil {
        return fmt.Errorf("failed to decode function[%d]: %w", fIndex, err)

    o.Functions[fIndex] = f

    err = f.Validate()
    if err != nil {
        err = fmt.Errorf("function[%d]: %w", fIndex, err)

    return err

func (o *Workflow) validate() error {
    if len(o.States) == 0 {
        return errors.New("workflow has no defined states")

    states, err := o.getStatesMap()
    if err != nil {
        return err

    functions, err := o.getFunctionMap()
    if err != nil {
        return err

    if o.Start != nil && o.Start.GetState() != "" {
        // Check if state exists
        if _, ok := states[o.Start.GetState()]; !ok {
            return fmt.Errorf("start targets state that does not exist")

    // functions
    for i, function := range o.GetFunctions() {
        if sErr := function.Validate(); sErr != nil {
            return fmt.Errorf("workflow function[%v] is invalid: %w", i, sErr)

    // states
    for i, state := range o.GetStates() {
        // Validate All State Transitions reference a exisiting state
        for tKey, transition := range state.getTransitions() {
            if _, ok := states[transition]; !ok {
                return fmt.Errorf("workflow state[%v] '%v' transition '%s' does not exist", i, tKey, transition)

        // Check if function actions are defined
        fActions := make([]string, 0)
        switch state.GetType() {
        case StateTypeAction:
            fActions = append(fActions, state.(*ActionState).Action.Function)
        case StateTypeParallel:
            for _, act := range state.(*ParallelState).Actions {
                fActions = append(fActions, act.Function)
        case StateTypeForEach:
            fActions = append(fActions, state.(*ForEachState).Action.Function)

        for j := range fActions {
            if _, fExists := functions[fActions[j]]; fActions[j] != "" && !fExists {
                return fmt.Errorf("workflow state[%v] actions function '%s' does not exist", i, fActions[j])

    // timeout
    return o.Timeouts.Validate()

// GetStates returns all the states of a workflow.
func (o *Workflow) GetStates() []State {
    if o.States == nil {
        return make([]State, 0)

    return o.States

// GetStatesMap : Get workflow states as a map.
func (o *Workflow) GetStatesMap() map[string]State {
    statesMap := make(map[string]State)
    for _, state := range o.GetStates() {
        statesMap[state.GetID()] = state

    return statesMap

// getStatesMap : Get workflow states as a map, and returns error if the same state is defined more than once.
func (o *Workflow) getStatesMap() (map[string]State, error) {
    statesMap := make(map[string]State)

    for _, state := range o.GetStates() {
        sID := state.GetID()
        if _, ok := statesMap[sID]; ok {
            return statesMap, fmt.Errorf("state id '%s' is used in more than one state", sID)
        statesMap[state.GetID()] = state

    return statesMap, nil

// getFunctionMap : Get functions as a map, and returns error if the same function id is defined more than once.
func (o *Workflow) getFunctionMap() (map[string]FunctionDefinition, error) {
    funcMap := make(map[string]FunctionDefinition)

    for _, wfFunc := range o.GetFunctions() {
        fID := wfFunc.GetID()
        if _, ok := funcMap[fID]; ok {
            return funcMap, fmt.Errorf("function id '%s' is used in more than one function", fID)
        funcMap[fID] = wfFunc

    return funcMap, nil

// GetFunctions: Get all function definitions.
func (o *Workflow) GetFunctions() []FunctionDefinition {
    if o.Functions == nil {
        return make([]FunctionDefinition, 0)

    return o.Functions

// GetFunction: Returns the function definition.
func (o *Workflow) GetFunction(id string) (FunctionDefinition, error) {
    for i, fn := range o.Functions {
        if fn.GetID() == id {
            return o.Functions[i], nil

    return nil, fmt.Errorf("function '%s' not defined", id)

// UnmarshalYAML unmarshals the workflow YAMl.
func (o *Workflow) UnmarshalYAML(unmarshal func(interface{}) error) error {
    m := make(map[string]interface{})
    err := unmarshal(&m)
    if err != nil {
        return err

    err = o.unmarshal(m)
    if err != nil {
        return err

    return nil

// Load unmarshals the data and validates it.
func (o *Workflow) Load(data []byte) error {
    err := yaml.Unmarshal(data, o)
    if err != nil {
        return err

    err = o.validate()
    if err != nil {
        return err

    return nil

// GetStartState returns the start state of a workflow.
func (o *Workflow) GetStartState() State {
    if o.Start == nil || o.Start.GetState() == "" {
        return o.States[0]

    for _, state := range o.States {
        if state.GetID() == o.Start.GetState() {
            return state

    panic(errors.New("cannot resolve start state"))

// VariableReference - Workflow variable referenced in getter or setter.
type VariableReference struct {
    Scope     string   `json:"scope"`
    Key       string   `json:"key"`
    Operation []string `json:"operation"`

// GetSecretReferences - Get all secrets referenced in actions.
func (o *Workflow) GetSecretReferences() []string {
    refs := make([]string, 0)
    refsMap := make(map[string]bool)

    // Get All secret references
    for _, state := range o.GetStates() {
        sType := state.GetType()

        // handle action secret references
        if sType == StateTypeAction {
            actionState := state.(*ActionState)
            for j := range actionState.Action.Secrets {
                refsMap[actionState.Action.Secrets[j]] = true

    // Convert Map to array
    for secretName := range refsMap {
        refs = append(refs, secretName)

    return refs