iwanbk/bcache

View on GitHub
bcache.go

Summary

Maintainability
A
1 hr
Test Coverage
package bcache

import (
    "errors"
    "net"
    "strconv"
    "time"

    "github.com/weaveworks/mesh"
    "golang.org/x/sync/singleflight"
)

const (
    // weaveworks/mesh channel name
    channel = "bcache"
)

var (
    // ErrNilFiller returned when GetWithFiller called with nil
    // filler func
    ErrNilFiller = errors.New("nil filler")
)

// Bcache represents bcache struct
type Bcache struct {
    peer          *peer
    router        *mesh.Router
    logger        Logger
    flight        singleflight.Group
    deletionDelay time.Duration
}

// New creates new bcache from the given config
func New(cfg Config) (*Bcache, error) {
    const (
        connLimit = 64 // mesh router connection limit
    )

    if err := cfg.setDefault(); err != nil {
        return nil, err
    }

    var (
        peerName = mesh.PeerName(cfg.PeerID)
        nickName = cfg.ListenAddr
        logger   = cfg.Logger
    )

    // parse host port
    host, portStr, err := net.SplitHostPort(cfg.ListenAddr)
    if err != nil {
        return nil, err
    }
    port, err := strconv.Atoi(portStr)
    if err != nil {
        return nil, err
    }

    // creates mesh router
    router, err := mesh.NewRouter(mesh.Config{
        Host:               host,
        Port:               port,
        ProtocolMinVersion: mesh.ProtocolMinVersion,
        ConnLimit:          connLimit,
        PeerDiscovery:      true,
        TrustedSubnets:     []*net.IPNet{},
    }, peerName, nickName, mesh.NullOverlay{}, logger)
    if err != nil {
        return nil, err
    }

    // bcache peer
    peer, err := newPeer(peerName, cfg.MaxKeys, logger)
    if err != nil {
        return nil, err
    }

    // creates gossip channel
    gossip, err := router.NewGossip(channel, peer)
    if err != nil {
        return nil, err
    }

    peer.register(gossip)

    // start mesh router
    logger.Printf("mesh router starting at %s", cfg.ListenAddr)
    router.Start()

    // creates new connection to the provided peers
    router.ConnectionMaker.InitiateConnections(cfg.Peers, true)

    return &Bcache{
        peer:          peer,
        router:        router,
        logger:        logger,
        deletionDelay: time.Duration(cfg.DeletionDelay) * time.Second,
    }, nil
}

// Set sets value for the given key with the given ttl in second.
// if ttl <= 0, the key will expired instantly
func (b *Bcache) Set(key, val string, ttl int) {
    if ttl <= 0 {
        b.Delete(key)
        return
    }
    b.set(key, val, ttl)
}

func (b *Bcache) set(key, val string, ttl int) int64 {
    expired := time.Now().Add(time.Duration(ttl) * time.Second).UnixNano()
    b.peer.Set(key, val, expired)
    return expired
}

// Get gets value for the given key.
//
// It returns the value and true if the key exists
func (b *Bcache) Get(key string) (string, bool) {
    return b.peer.Get(key)
}

// Delete the given key.
//
func (b *Bcache) Delete(key string) {
    deleteTs := time.Now().Add(b.deletionDelay).UnixNano()
    b.peer.Delete(key, deleteTs)
}

// Filler defines func to be called when the given key is not exists
type Filler func(key string) (val string, err error)

// GetWithFiller gets value for the given key and fill the cache
// if the given key is not exists.
//
// `filler` will be used to fill(Set) the cache
// when the given key is not exist.
// Even there are many goroutines which call `GetWithFiller`, the given `Filler` func
// will only called once for each of the key.
//
//
// It useful to avoid cache stampede to  the underlying database
func (b *Bcache) GetWithFiller(key string, filler Filler, ttl int) (string, error) {
    if filler == nil {
        return "", ErrNilFiller
    }

    // get value from cache
    val, ok := b.Get(key)
    if ok {
        return val, nil
    }

    // construct singleflight filler
    flightFn := func() (interface{}, error) {
        val, err := filler(key)
        if err != nil {
            b.logger.Errorf("filler failed: %v", err)
            return nil, err
        }

        expired := b.set(key, val, ttl)

        return value{
            value:   val,
            expired: expired,
        }, nil
    }

    // call the filler
    valueIf, err, _ := b.flight.Do(key, func() (interface{}, error) {
        return flightFn()
    })
    if err != nil {
        return "", err
    }

    // return the value
    value := valueIf.(value)
    return value.value, nil
}

// Close closes the cache, free all the resource
func (b *Bcache) Close() error {
    b.logger.Printf("mesh router stopping")
    return b.router.Stop()
}