dotcloud/docker

View on GitHub
libnetwork/internal/kvstore/boltdb/boltdb.go

Summary

Maintainability
A
0 mins
Test Coverage
package boltdb

import (
    "bytes"
    "encoding/binary"
    "errors"
    "os"
    "path/filepath"
    "sync"
    "sync/atomic"
    "time"

    store "github.com/docker/docker/libnetwork/internal/kvstore"
    bolt "go.etcd.io/bbolt"
)

// ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing
var ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")

const filePerm = 0o644

// BoltDB type implements the Store interface
type BoltDB struct {
    mu         sync.Mutex
    client     *bolt.DB
    boltBucket []byte
    dbIndex    atomic.Uint64
    path       string
    timeout    time.Duration
}

const (
    libkvmetadatalen = 8
    transientTimeout = time.Duration(10) * time.Second
)

// New opens a new BoltDB connection to the specified path and bucket
func New(endpoint string, options *store.Config) (store.Store, error) {
    if (options == nil) || (len(options.Bucket) == 0) {
        return nil, ErrBoltBucketOptionMissing
    }

    dir, _ := filepath.Split(endpoint)
    if err := os.MkdirAll(dir, 0o750); err != nil {
        return nil, err
    }

    db, err := bolt.Open(endpoint, filePerm, &bolt.Options{
        Timeout: options.ConnectionTimeout,
    })
    if err != nil {
        return nil, err
    }

    timeout := transientTimeout
    if options.ConnectionTimeout != 0 {
        timeout = options.ConnectionTimeout
    }

    b := &BoltDB{
        client:     db,
        path:       endpoint,
        boltBucket: []byte(options.Bucket),
        timeout:    timeout,
    }

    return b, nil
}

// Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte) error {
    b.mu.Lock()
    defer b.mu.Unlock()

    return b.client.Update(func(tx *bolt.Tx) error {
        bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
        if err != nil {
            return err
        }

        dbIndex := b.dbIndex.Add(1)
        dbval := make([]byte, libkvmetadatalen)
        binary.LittleEndian.PutUint64(dbval, dbIndex)
        dbval = append(dbval, value...)

        return bucket.Put([]byte(key), dbval)
    })
}

// Exists checks if the key exists inside the store
func (b *BoltDB) Exists(key string) (bool, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    var exists bool
    err := b.client.View(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(b.boltBucket)
        if bucket == nil {
            return store.ErrKeyNotFound
        }

        exists = len(bucket.Get([]byte(key))) > 0
        return nil
    })
    if err != nil {
        return false, err
    }
    if !exists {
        return false, store.ErrKeyNotFound
    }
    return true, nil
}

// List returns the range of keys starting with the passed in prefix
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    var kv []*store.KVPair
    err := b.client.View(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(b.boltBucket)
        if bucket == nil {
            return store.ErrKeyNotFound
        }

        cursor := bucket.Cursor()
        prefix := []byte(keyPrefix)

        for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
            dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
            v = v[libkvmetadatalen:]
            val := make([]byte, len(v))
            copy(val, v)

            kv = append(kv, &store.KVPair{
                Key:       string(key),
                Value:     val,
                LastIndex: dbIndex,
            })
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    if len(kv) == 0 {
        return nil, store.ErrKeyNotFound
    }
    return kv, nil
}

// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
    b.mu.Lock()
    defer b.mu.Unlock()

    if previous == nil {
        return store.ErrPreviousNotSpecified
    }

    return b.client.Update(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(b.boltBucket)
        if bucket == nil {
            return store.ErrKeyNotFound
        }

        val := bucket.Get([]byte(key))
        if val == nil {
            return store.ErrKeyNotFound
        }
        dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
        if dbIndex != previous.LastIndex {
            return store.ErrKeyModified
        }
        return bucket.Delete([]byte(key))
    })
}

// Delete deletes a value at "key". Unlike AtomicDelete it doesn't check
// whether the deleted key is at a specific version before deleting.
func (b *BoltDB) Delete(key string) error {
    b.mu.Lock()
    defer b.mu.Unlock()

    return b.client.Update(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(b.boltBucket)
        if bucket == nil || bucket.Get([]byte(key)) == nil {
            return store.ErrKeyNotFound
        }
        return bucket.Delete([]byte(key))
    })
}

// AtomicPut puts a value at "key" if the key has not been
// modified since the last Put, throws an error if this is the case
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    var dbIndex uint64
    dbval := make([]byte, libkvmetadatalen)
    err := b.client.Update(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(b.boltBucket)
        if bucket == nil {
            if previous != nil {
                return store.ErrKeyNotFound
            }
            var err error
            bucket, err = tx.CreateBucket(b.boltBucket)
            if err != nil {
                return err
            }
        }
        // AtomicPut is equivalent to Put if previous is nil and the Ky
        // doesn't exist in the DB.
        val := bucket.Get([]byte(key))
        if previous == nil && len(val) != 0 {
            return store.ErrKeyExists
        }
        if previous != nil {
            if len(val) == 0 {
                return store.ErrKeyNotFound
            }
            dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
            if dbIndex != previous.LastIndex {
                return store.ErrKeyModified
            }
        }
        dbIndex = b.dbIndex.Add(1)
        binary.LittleEndian.PutUint64(dbval, dbIndex)
        dbval = append(dbval, value...)
        return bucket.Put([]byte(key), dbval)
    })
    if err != nil {
        return nil, err
    }
    return &store.KVPair{Key: key, Value: value, LastIndex: dbIndex}, nil
}

// Close the db connection to the BoltDB
func (b *BoltDB) Close() {
    b.mu.Lock()
    defer b.mu.Unlock()

    b.client.Close()
}