docker/swarmkit

View on GitHub
agent/storage.go

Summary

Maintainability
A
2 hrs
Test Coverage
package agent

import (
    "github.com/gogo/protobuf/proto"
    "github.com/moby/swarmkit/v2/api"
    bolt "go.etcd.io/bbolt"
)

// Layout:
//
//    bucket(v1.tasks.<id>) ->
//        data (task protobuf)
//        status (task status protobuf)
//        assigned (key present)
var (
    bucketKeyStorageVersion = []byte("v1")
    bucketKeyTasks          = []byte("tasks")
    bucketKeyAssigned       = []byte("assigned")
    bucketKeyData           = []byte("data")
    bucketKeyStatus         = []byte("status")
)

// InitDB prepares a database for writing task data.
//
// Proper buckets will be created if they don't already exist.
func InitDB(db *bolt.DB) error {
    return db.Update(func(tx *bolt.Tx) error {
        _, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks)
        return err
    })
}

// GetTask retrieves the task with id from the datastore.
func GetTask(tx *bolt.Tx, id string) (*api.Task, error) {
    var t api.Task

    if err := withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
        p := bkt.Get(bucketKeyData)
        if p == nil {
            return errTaskUnknown
        }

        return proto.Unmarshal(p, &t)
    }); err != nil {
        return nil, err
    }

    return &t, nil
}

// WalkTasks walks all tasks in the datastore.
func WalkTasks(tx *bolt.Tx, fn func(task *api.Task) error) error {
    bkt := getTasksBucket(tx)
    if bkt == nil {
        return nil
    }

    return bkt.ForEach(func(k, v []byte) error {
        tbkt := bkt.Bucket(k)

        p := tbkt.Get(bucketKeyData)
        var t api.Task
        if err := proto.Unmarshal(p, &t); err != nil {
            return err
        }

        return fn(&t)
    })
}

// TaskAssigned returns true if the task is assigned to the node.
func TaskAssigned(tx *bolt.Tx, id string) bool {
    bkt := getTaskBucket(tx, id)
    if bkt == nil {
        return false
    }

    return len(bkt.Get(bucketKeyAssigned)) > 0
}

// GetTaskStatus returns the current status for the task.
func GetTaskStatus(tx *bolt.Tx, id string) (*api.TaskStatus, error) {
    var ts api.TaskStatus
    if err := withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
        p := bkt.Get(bucketKeyStatus)
        if p == nil {
            return errTaskUnknown
        }

        return proto.Unmarshal(p, &ts)
    }); err != nil {
        return nil, err
    }

    return &ts, nil
}

// WalkTaskStatus calls fn for the status of each task.
func WalkTaskStatus(tx *bolt.Tx, fn func(id string, status *api.TaskStatus) error) error {
    bkt := getTasksBucket(tx)
    if bkt == nil {
        return nil
    }

    return bkt.ForEach(func(k, v []byte) error {
        tbkt := bkt.Bucket(k)

        p := tbkt.Get(bucketKeyStatus)
        var ts api.TaskStatus
        if err := proto.Unmarshal(p, &ts); err != nil {
            return err
        }

        return fn(string(k), &ts)
    })
}

// PutTask places the task into the database.
func PutTask(tx *bolt.Tx, task *api.Task) error {
    return withCreateTaskBucketIfNotExists(tx, task.ID, func(bkt *bolt.Bucket) error {
        taskCopy := *task
        taskCopy.Status = api.TaskStatus{} // blank out the status.

        p, err := proto.Marshal(&taskCopy)
        if err != nil {
            return err
        }
        return bkt.Put(bucketKeyData, p)
    })
}

// PutTaskStatus updates the status for the task with id.
func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
    // this used to be withCreateTaskBucketIfNotExists, but that could lead
    // to weird race conditions, and was not necessary.
    return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
        p, err := proto.Marshal(status)
        if err != nil {
            return err
        }
        return bkt.Put(bucketKeyStatus, p)
    })
}

// DeleteTask completely removes the task from the database.
func DeleteTask(tx *bolt.Tx, id string) error {
    bkt := getTasksBucket(tx)
    if bkt == nil {
        return nil
    }

    return bkt.DeleteBucket([]byte(id))
}

// SetTaskAssignment sets the current assignment state.
func SetTaskAssignment(tx *bolt.Tx, id string, assigned bool) error {
    return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
        if assigned {
            return bkt.Put(bucketKeyAssigned, []byte{0xFF})
        }
        return bkt.Delete(bucketKeyAssigned)
    })
}

func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
    bkt, err := tx.CreateBucketIfNotExists(keys[0])
    if err != nil {
        return nil, err
    }

    for _, key := range keys[1:] {
        bkt, err = bkt.CreateBucketIfNotExists(key)
        if err != nil {
            return nil, err
        }
    }

    return bkt, nil
}

func withCreateTaskBucketIfNotExists(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
    bkt, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(id))
    if err != nil {
        return err
    }

    return fn(bkt)
}

func withTaskBucket(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
    bkt := getTaskBucket(tx, id)
    if bkt == nil {
        return errTaskUnknown
    }

    return fn(bkt)
}

func getTaskBucket(tx *bolt.Tx, id string) *bolt.Bucket {
    return getBucket(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(id))
}

func getTasksBucket(tx *bolt.Tx) *bolt.Bucket {
    return getBucket(tx, bucketKeyStorageVersion, bucketKeyTasks)
}

func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
    bkt := tx.Bucket(keys[0])

    for _, key := range keys[1:] {
        if bkt == nil {
            break
        }
        bkt = bkt.Bucket(key)
    }

    return bkt
}