markthethomas/raft-badger

View on GitHub
badger_store.go

Summary

Maintainability
A
0 mins
Test Coverage
package raftbadgerdb

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "errors"
    "fmt"
    "log"
    "math"
    "strconv"

    "github.com/dgraph-io/badger"
    "github.com/hashicorp/raft"
)

var (
    // Key prefixes used to namespace entries inside the Badger keyspace:
    // raft log entries are stored under dbLogsPrefix and the stable-store
    // key/value pairs under dbConfPrefix.
    dbLogsPrefix = []byte("logs")
    dbConfPrefix = []byte("conf")

    // ErrKeyNotFound is an error indicating a given key does not exist
    ErrKeyNotFound = errors.New("not found")
)

// BadgerStore provides access to Badger for Raft to store and retrieve
// log entries. It also provides key/value storage, and can be used as
// a LogStore and StableStore. See https://godoc.org/github.com/hashicorp/raft#StableStore
// and https://godoc.org/github.com/hashicorp/raft#LogStore
type BadgerStore struct {
    // db is the open Badger database handle every operation goes through.
    db   *badger.DB
    // path is the Options.Path value the store was created with.
    path string
}

// Options contains all the configuration used to open BadgerDB
type Options struct {
    // BadgerOptions contains any Badger-specific options. Its Dir and
    // ValueDir settings are derived from Path by New.
    BadgerOptions *badger.Options
    // Path is the base directory of the store; New places the Badger
    // files in Path + "/badger".
    Path string
}

// NewBadgerStore takes a file path and returns a connected Raft backend.
//
// The store is opened with a private copy of badger.DefaultOptions, so
// the package-level defaults are never written to when the directory
// settings are filled in.
func NewBadgerStore(path string) (*BadgerStore, error) {
    // Copy the defaults: handing out &badger.DefaultOptions would let the
    // Dir/ValueDir assignment leak into every later user of the defaults.
    badgerOpts := badger.DefaultOptions
    return New(Options{Path: path, BadgerOptions: &badgerOpts})
}

// New uses the supplied options to open a badger db and prepare it for use as a raft backend.
//
// Both the Badger key directory and value directory are set to
// options.Path + "/badger", overriding whatever options.BadgerOptions
// carries. The caller's BadgerOptions struct is copied, not modified;
// when it is nil, badger.DefaultOptions is used as the starting point.
//
// A failure to open the database is logged and returned to the caller
// instead of terminating the process.
func New(options Options) (*BadgerStore, error) {
    badgerOpts := badger.DefaultOptions
    if options.BadgerOptions != nil {
        badgerOpts = *options.BadgerOptions
    }

    dir := options.Path + "/badger"
    badgerOpts.Dir = dir
    badgerOpts.ValueDir = dir

    db, err := badger.Open(badgerOpts)
    if err != nil {
        // Keep the failure visible in the process log, as before, but let
        // the caller decide how to react.
        log.Printf("raftbadgerdb: opening badger store at %q: %v", dir, err)
        return nil, err
    }

    return &BadgerStore{db: db, path: options.Path}, nil
}

// Close shuts down the underlying Badger database and reports any error
// Badger returns while doing so.
func (b *BadgerStore) Close() error {
    err := b.db.Close()
    return err
}

// bytesToUint64 decodes the first eight bytes of b as a big-endian
// uint64. It panics when b holds fewer than eight bytes.
func bytesToUint64(b []byte) uint64 {
    _ = b[7] // same length requirement as decoding all eight bytes at once
    hi := binary.BigEndian.Uint32(b[:4])
    lo := binary.BigEndian.Uint32(b[4:8])
    return uint64(hi)<<32 | uint64(lo)
}

// uint64ToBytes encodes u as an eight-byte big-endian slice.
func uint64ToBytes(u uint64) []byte {
    var encoded [8]byte
    binary.BigEndian.PutUint64(encoded[:], u)
    return encoded[:]
}

// FirstIndex returns the first known index from the Raft log, or 0 when
// the log holds no entries.
//
// Log keys are the prefix followed by the index in unpadded decimal, so
// Badger's byte-wise key order is not numeric order ("logs10" sorts before
// "logs9"). The lowest index is therefore found by visiting every log key
// and comparing the parsed indexes, rather than by trusting the first key
// the iterator yields. Only keys are read; values are not prefetched.
func (b *BadgerStore) FirstIndex() (uint64, error) {
    var (
        first uint64
        found bool
    )
    err := b.db.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        opts.PrefetchValues = false // the index lives in the key
        it := txn.NewIterator(opts)
        defer it.Close()

        for it.Seek(dbLogsPrefix); it.ValidForPrefix(dbLogsPrefix); it.Next() {
            k := string(it.Item().Key()[len(dbLogsPrefix):])
            idx, err := strconv.ParseUint(k, 10, 64)
            if err != nil {
                return err
            }
            if !found || idx < first {
                first, found = idx, true
            }
        }
        return nil
    })
    if err != nil {
        return 0, err
    }
    return first, nil
}

// LastIndex returns the last known index from the Raft log, or 0 when the
// log holds no entries.
//
// Log keys are the prefix followed by the index in unpadded decimal, so
// Badger's byte-wise key order is not numeric order ("logs9" sorts after
// "logs10"). The highest index is therefore found by visiting every log
// key and comparing the parsed indexes. This also avoids building a seek
// key by appending to the shared dbLogsPrefix slice. Only keys are read;
// values are not prefetched.
func (b *BadgerStore) LastIndex() (uint64, error) {
    var last uint64
    err := b.db.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        opts.PrefetchValues = false // the index lives in the key
        it := txn.NewIterator(opts)
        defer it.Close()

        for it.Seek(dbLogsPrefix); it.ValidForPrefix(dbLogsPrefix); it.Next() {
            k := string(it.Item().Key()[len(dbLogsPrefix):])
            idx, err := strconv.ParseUint(k, 10, 64)
            if err != nil {
                return err
            }
            if idx > last {
                last = idx
            }
        }
        return nil
    })
    if err != nil {
        return 0, err
    }
    return last, nil
}

// GetLog is used to retrieve a log from Badger at a given index.
//
// The entry stored under the log prefix plus idx is gob-decoded and copied
// into *log. It returns raft.ErrLogNotFound when no entry exists at idx,
// and passes any other Badger or decoding error through unchanged. On
// error *log is left untouched.
func (b *BadgerStore) GetLog(idx uint64, log *raft.Log) error {
    if log == nil {
        return errors.New("raftbadgerdb: GetLog called with a nil log")
    }
    return b.db.View(func(txn *badger.Txn) error {
        item, err := txn.Get([]byte(fmt.Sprintf("%s%d", dbLogsPrefix, idx)))
        if err == badger.ErrKeyNotFound {
            return raft.ErrLogNotFound
        }
        if err != nil {
            // Only a missing key means "no such log"; surface real failures.
            return err
        }
        v, err := item.Value()
        if err != nil {
            return err
        }
        // gob leaves out zero-valued fields, so decode into a fresh value
        // to keep stale fields of a reused *log from surviving.
        var entry raft.Log
        if err := gob.NewDecoder(bytes.NewReader(v)).Decode(&entry); err != nil {
            return err
        }
        *log = entry
        return nil
    })
}

// StoreLog persists one raft log entry by delegating to StoreLogs with a
// single-element slice.
func (b *BadgerStore) StoreLog(log *raft.Log) error {
    single := []*raft.Log{log}
    return b.StoreLogs(single)
}

// StoreLogs is used to store a set of raft logs
//
// Each entry is gob-encoded and written under the log prefix followed by
// its Index in decimal. All entries go into one transaction where
// possible; when Badger reports that the transaction has grown too big,
// the transaction is committed and the write is retried in a fresh one,
// so no entry is skipped. The first encoding, write or commit error
// aborts the call and is returned.
func (b *BadgerStore) StoreLogs(logs []*raft.Log) error {
    txn := b.db.NewTransaction(true)
    // The closure reads txn when it runs, so whichever transaction is
    // current at return time is the one that gets discarded.
    defer func() { txn.Discard() }()

    for _, entry := range logs {
        key := []byte(fmt.Sprintf("%s%d", dbLogsPrefix, entry.Index))

        var out bytes.Buffer
        if err := gob.NewEncoder(&out).Encode(entry); err != nil {
            return err
        }

        err := txn.Set(key, out.Bytes())
        if err == badger.ErrTxnTooBig {
            // Flush what has been buffered so far and retry this entry.
            if err = txn.Commit(nil); err != nil {
                return err
            }
            txn = b.db.NewTransaction(true)
            err = txn.Set(key, out.Bytes())
        }
        if err != nil {
            return err
        }
    }
    return txn.Commit(nil)
}

// iteratorRange is a pair of uint64 bounds describing one batch, as
// produced by generateRanges.
type iteratorRange struct{ from, to uint64 }

// generateRanges splits the span from min to max into consecutive ranges
// that each cover at most batchSize indexes beyond their start.
//
// Adjacent ranges share their boundary (one range's to equals the next
// range's from) and the last range always ends exactly at max, so the
// result covers the span without gaps and never reaches past max, whether
// a caller treats to as inclusive or exclusive.
//
// A single range {min, max} is returned when the span already fits in one
// batch, when batchSize is not positive, or when max is not greater than
// min.
func (b *BadgerStore) generateRanges(min, max uint64, batchSize int64) []iteratorRange {
    if batchSize <= 0 || max <= min || max-min <= uint64(batchSize) {
        return []iteratorRange{{from: min, to: max}}
    }
    step := uint64(batchSize)

    // Pre-size for the expected number of ranges, but bound the up-front
    // allocation so a very wide span does not reserve memory all at once.
    hint := (max-min)/step + 1
    if hint > math.MaxInt16 {
        hint = math.MaxInt16
    }
    segments := make([]iteratorRange, 0, int(hint))

    from := min
    // max-from > step guarantees from+step stays below max (no overflow).
    for max-from > step {
        segments = append(segments, iteratorRange{from: from, to: from + step})
        from += step
    }
    return append(segments, iteratorRange{from: from, to: max})
}

// DeleteRange is used to delete logs within a given range inclusively.
//
// Log keys are the prefix followed by the index in unpadded decimal, so
// Badger's byte-wise key order is not numeric order and a seek to the key
// for min does not lead to the rest of the range. Instead every log key is
// visited once (keys only), the ones whose parsed index lies in
// [min, max] are collected, and those keys are then deleted. When Badger
// reports that the delete transaction has grown too big, it is committed
// and the deletion continues in a fresh transaction.
func (b *BadgerStore) DeleteRange(min, max uint64) error {
    if min > max {
        return nil
    }

    var doomed [][]byte
    err := b.db.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        opts.PrefetchValues = false // the index lives in the key
        it := txn.NewIterator(opts)
        defer it.Close()

        for it.Seek(dbLogsPrefix); it.ValidForPrefix(dbLogsPrefix); it.Next() {
            key := it.Item().Key()
            idx, err := strconv.ParseUint(string(key[len(dbLogsPrefix):]), 10, 64)
            if err != nil {
                return err
            }
            if idx < min || idx > max {
                continue
            }
            // Copy: the key slice is reused once the iterator advances.
            doomed = append(doomed, append([]byte(nil), key...))
        }
        return nil
    })
    if err != nil {
        return err
    }

    txn := b.db.NewTransaction(true)
    // The closure reads txn when it runs, so whichever transaction is
    // current at return time is the one that gets discarded.
    defer func() { txn.Discard() }()

    for _, key := range doomed {
        err := txn.Delete(key)
        if err == badger.ErrTxnTooBig {
            // Flush what has been buffered so far and retry this key.
            if err = txn.Commit(nil); err != nil {
                return err
            }
            txn = b.db.NewTransaction(true)
            err = txn.Delete(key)
        }
        if err != nil {
            return err
        }
    }
    return txn.Commit(nil)
}

// Set stores v under k in the key/value area kept apart from the raft log.
//
// The Badger key is the conf prefix followed by k formatted with %d, which
// renders a byte slice as a bracketed list of decimal byte values; Get
// builds its lookup key the same way.
func (b *BadgerStore) Set(k, v []byte) error {
    write := func(txn *badger.Txn) error {
        storeKey := fmt.Sprintf("%s%d", dbConfPrefix, k)
        return txn.Set([]byte(storeKey), v)
    }
    return b.db.Update(write)
}

// Get is used to retrieve a value from the k/v store by key
//
// The lookup key is built exactly as Set builds it. The returned slice is
// a copy that stays valid after the read transaction ends. Get returns
// ErrKeyNotFound when the key does not exist and passes any other Badger
// error through unchanged.
func (b *BadgerStore) Get(k []byte) ([]byte, error) {
    var val []byte
    err := b.db.View(func(txn *badger.Txn) error {
        key := []byte(fmt.Sprintf("%s%d", dbConfPrefix, k))
        item, err := txn.Get(key)
        if err == badger.ErrKeyNotFound {
            return ErrKeyNotFound
        }
        if err != nil {
            // Only a missing key maps to ErrKeyNotFound; surface real failures.
            return err
        }
        val, err = item.ValueCopy(nil)
        return err
    })
    if err != nil {
        return nil, err
    }
    return val, nil
}

// SetUint64 stores val under key as its eight-byte big-endian encoding,
// using Set for the actual write.
func (b *BadgerStore) SetUint64(key []byte, val uint64) error {
    encoded := uint64ToBytes(val)
    return b.Set(key, encoded)
}

// GetUint64 is like Get, but handles uint64 values
//
// The stored value is decoded as a big-endian uint64 from its first eight
// bytes. Errors from Get are returned unchanged; a stored value shorter
// than eight bytes yields an error instead of a panic.
func (b *BadgerStore) GetUint64(key []byte) (uint64, error) {
    val, err := b.Get(key)
    if err != nil {
        return 0, err
    }
    if len(val) < 8 {
        return 0, fmt.Errorf("value for key %q is %d bytes, need at least 8 to decode a uint64", key, len(val))
    }
    return bytesToUint64(val), nil
}