server/pkg/tasks_manager/task.go

Summary

Maintainability
A
2 hrs
Test Coverage
C
74%
package tasks_manager

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/hashicorp/vault/sdk/logical"
    uuid "github.com/satori/go.uuid"
)

type (
    // taskStatus is the status of a task; its string form is what gets
    // written to Task.Status.
    taskStatus string
    // taskState is the lifecycle bucket of a task. It selects the storage key
    // prefix under which the task is stored.
    taskState  string
)

const (
    // Task statuses. SUCCEEDED, FAILED and CANCELED are the statuses a task
    // may have in the completed state (see taskStateStatusesCompleted).
    taskStatusQueued    taskStatus = "QUEUED"
    taskStatusRunning   taskStatus = "RUNNING"
    taskStatusSucceeded taskStatus = "SUCCEEDED"
    taskStatusFailed    taskStatus = "FAILED"
    taskStatusCanceled  taskStatus = "CANCELED"

    // Task states. Each state maps to exactly one storage key prefix below.
    taskStateQueued    taskState = "QUEUED"
    taskStateRunning   taskState = "RUNNING"
    taskStateCompleted taskState = "COMPLETED"

    // Storage key prefixes. A full key is the prefix followed by the task UUID.
    storageKeyPrefixQueuedTask    = "queued_task/"
    storageKeyPrefixRunningTask   = "running_task/"
    storageKeyPrefixCompletedTask = "completed_task/"
    storageKeyPrefixTaskLog       = "task_log/"
)

// taskStateStatusesCompleted lists the statuses that belong to the completed
// state; isCompletedTaskStatus checks membership in this slice.
var taskStateStatusesCompleted = []taskStatus{taskStatusSucceeded, taskStatusFailed, taskStatusCanceled}

// Task is the record of a single task as it is serialized to JSON and kept in
// storage.
type Task struct {
    // UUID identifies the task and is the suffix of its storage keys.
    UUID     string    `structs:"uuid" json:"uuid"`
    // Status holds the string form of a taskStatus value.
    Status   string    `structs:"status" json:"status"`
    // Reason is filled in from the completion options when the task is
    // switched to the completed state.
    Reason   string    `structs:"reason" json:"reason"`
    // Created is the time the task was created.
    Created  time.Time `structs:"created" json:"created"`
    // Modified is set at creation and refreshed on every state switch.
    Modified time.Time `structs:"modified" json:"modified"`
}

// newTask returns a task with a freshly generated UUID and the QUEUED status.
// Created and Modified are both set to the same current timestamp.
func newTask() *Task {
    id := uuid.NewV4().String()
    now := time.Now()

    return &Task{
        UUID:     id,
        Status:   string(taskStatusQueued),
        Created:  now,
        Modified: now,
    }
}

// addNewTaskToStorage creates a new queued task, writes it to storage under
// its queued-task key and returns the UUID of the created task.
//
// An error is returned when the task cannot be encoded or when the storage
// write fails; the returned UUID is empty in that case.
func addNewTaskToStorage(ctx context.Context, storage logical.Storage) (string, error) {
    task := newTask()
    key := taskStorageKey(taskStateQueued, task.UUID)

    entry, err := logical.StorageEntryJSON(key, task)
    if err != nil {
        return "", fmt.Errorf("unable to prepare storage entry JSON: %w", err)
    }

    err = storage.Put(ctx, entry)
    if err != nil {
        return "", fmt.Errorf("unable to put %q into storage: %w", key, err)
    }

    return task.UUID, nil
}

// switchTaskToRunningInStorage moves the queued task identified by uuid into
// the running state.
//
// The task is read from its queued key, its Status and Modified fields are
// updated, it is written under the running key and only then is the queued
// entry deleted. An error is returned when the queued task is missing or when
// any storage operation fails.
func switchTaskToRunningInStorage(ctx context.Context, storage logical.Storage, uuid string) error {
    // load the task from its queued entry
    task, err := getTaskFromStorage(ctx, storage, taskStateQueued, uuid)
    if err != nil {
        return err
    }
    if task == nil {
        return fmt.Errorf("queued task %q must be in storage", uuid)
    }

    // store the task as running
    task.Status = string(taskStatusRunning)
    task.Modified = time.Now()

    runningKey := taskStorageKey(taskStateRunning, uuid)
    entry, err := logical.StorageEntryJSON(runningKey, task)
    if err != nil {
        return fmt.Errorf("unable to prepare storage entry JSON: %w", err)
    }
    if err = storage.Put(ctx, entry); err != nil {
        return fmt.Errorf("unable to put %q into storage: %w", runningKey, err)
    }

    // drop the queued entry now that the running one is in place
    queuedKey := taskStorageKey(taskStateQueued, uuid)
    if err = storage.Delete(ctx, queuedKey); err != nil {
        return fmt.Errorf("unable to delete %q from storage: %w", queuedKey, err)
    }

    return nil
}

// switchTaskToCompletedInStorageOptions carries the optional data recorded
// when a task is switched to the completed state.
type switchTaskToCompletedInStorageOptions struct {
    // reason is copied into the Reason field of the completed task.
    reason string
    // log, when non-empty, is stored as the task log.
    log    []byte
}

// switchTaskToCompletedInStorage moves the task identified by uuid from the
// running or queued state into the completed state with the given final
// status.
//
// status must be one of the completed statuses (see
// taskStateStatusesCompleted); otherwise an error is returned before storage
// is accessed. opts.reason is stored in the task's Reason field and a
// non-empty opts.log is stored under the task log key.
//
// The completed entry and the log are written before any previous entry is
// deleted. If the task is found under both the running and the queued key,
// the running entry is used as the base and both previous entries are
// deleted, so no stale copy of the task is left behind.
//
// An error is returned when the task is found in neither state or when any
// storage operation fails.
func switchTaskToCompletedInStorage(ctx context.Context, storage logical.Storage, status taskStatus, uuid string, opts switchTaskToCompletedInStorageOptions) error {
    // validate new status
    if !isCompletedTaskStatus(status) {
        return fmt.Errorf("runtime error: task in completed state cannot be with status %q", status)
    }

    // Look the task up in every non-completed state. The first hit (running
    // is checked before queued) becomes the base for the completed task; all
    // states in which the task was found are remembered for cleanup.
    var (
        prevTask   *Task
        prevStates []taskState
    )
    for _, s := range []taskState{taskStateRunning, taskStateQueued} {
        t, err := getTaskFromStorage(ctx, storage, s, uuid)
        if err != nil {
            return err
        }

        if t == nil {
            continue
        }

        if prevTask == nil {
            prevTask = t
        }
        prevStates = append(prevStates, s)
    }

    if prevTask == nil {
        return fmt.Errorf("queued or running task %q not found in storage", uuid)
    }

    // add completed task and optional log to storage
    prevTask.Status = string(status)
    prevTask.Modified = time.Now()
    prevTask.Reason = opts.reason

    storageKey := taskStorageKey(taskStatusState(status), uuid)
    entry, err := logical.StorageEntryJSON(storageKey, prevTask)
    if err != nil {
        return fmt.Errorf("unable to prepare storage entry JSON: %w", err)
    }

    if err := storage.Put(ctx, entry); err != nil {
        return fmt.Errorf("unable to put %q into storage: %w", storageKey, err)
    }

    if len(opts.log) != 0 {
        logStorageKey := taskLogStorageKey(uuid)
        if err := storage.Put(ctx, &logical.StorageEntry{
            Key:   logStorageKey,
            Value: opts.log,
        }); err != nil {
            return fmt.Errorf("unable to put %q into the storage: %w", logStorageKey, err)
        }
    }

    // Delete every previous entry of the task. The keys are built from the
    // uuid the entries were looked up with, so exactly the entries that were
    // read above are removed.
    for _, s := range prevStates {
        prevStorageKey := taskStorageKey(s, uuid)
        if err := storage.Delete(ctx, prevStorageKey); err != nil {
            return fmt.Errorf("unable to delete %q from storage: %w", prevStorageKey, err)
        }
    }

    return nil
}

// getTaskFromStorage reads the task identified by uuid from the storage key
// that belongs to the given state.
//
// It returns (nil, nil) when storage has no entry under that key, and an error
// when the read fails or the entry cannot be decoded.
func getTaskFromStorage(ctx context.Context, storage logical.Storage, state taskState, uuid string) (*Task, error) {
    key := taskStorageKey(state, uuid)

    entry, err := storage.Get(ctx, key)
    switch {
    case err != nil:
        return nil, fmt.Errorf("unable to get %q from storage: %w", key, err)
    case entry == nil:
        return nil, nil
    }

    return storageEntryToTask(entry)
}

// getTaskLogFromStorage reads the raw log stored for the task identified by
// uuid.
//
// It returns (nil, nil) when no log entry exists, and an error when the
// storage read fails.
func getTaskLogFromStorage(ctx context.Context, storage logical.Storage, uuid string) ([]byte, error) {
    key := taskLogStorageKey(uuid)

    entry, err := storage.Get(ctx, key)
    switch {
    case err != nil:
        return nil, fmt.Errorf("unable to get %q from storage: %w", key, err)
    case entry == nil:
        return nil, nil
    }

    return entry.Value, nil
}

// taskStorageKey builds the storage key of a task: the key prefix of the given
// state followed by the task UUID. It panics on an unknown state, because
// taskStorageKeyPrefix does.
func taskStorageKey(state taskState, uuid string) string {
    prefix := taskStorageKeyPrefix(state)

    return prefix + uuid
}

// taskStorageKeyPrefix maps a task state to the storage key prefix under which
// tasks in that state are kept. It panics when the state is not one of the
// known states.
func taskStorageKeyPrefix(state taskState) string {
    var prefix string

    switch state {
    case taskStateQueued:
        prefix = storageKeyPrefixQueuedTask
    case taskStateRunning:
        prefix = storageKeyPrefixRunningTask
    case taskStateCompleted:
        prefix = storageKeyPrefixCompletedTask
    default:
        panic(fmt.Sprintf("unexpected task state %q", state))
    }

    return prefix
}

// taskStatusState returns the task state that a status belongs to: QUEUED and
// RUNNING map to the states of the same name, and every completed status maps
// to the completed state. It panics on any other status.
func taskStatusState(status taskStatus) taskState {
    switch status {
    case taskStatusQueued:
        return taskStateQueued
    case taskStatusRunning:
        return taskStateRunning
    }

    if isCompletedTaskStatus(status) {
        return taskStateCompleted
    }

    panic(fmt.Sprintf("unexpected task status %q", status))
}

// isCompletedTaskStatus reports whether status is one of the statuses listed
// in taskStateStatusesCompleted.
func isCompletedTaskStatus(status taskStatus) bool {
    for i := range taskStateStatusesCompleted {
        if taskStateStatusesCompleted[i] == status {
            return true
        }
    }

    return false
}

// taskLogStorageKey builds the storage key of a task's log: the task log
// prefix followed by the task UUID.
func taskLogStorageKey(uuid string) string {
    key := storageKeyPrefixTaskLog + uuid

    return key
}

// storageEntryToTask decodes the JSON value of a storage entry into a Task.
//
// The decode error is returned unwrapped. Because the target is a pointer, a
// JSON null value decodes to a nil task with a nil error.
func storageEntryToTask(entry *logical.StorageEntry) (*Task, error) {
    var decoded *Task

    err := json.Unmarshal(entry.Value, &decoded)
    if err != nil {
        return nil, err
    }

    return decoded, nil
}