iwanbk/bcache

View on GitHub
peer.go

Summary

Maintainability
A
0 mins
Test Coverage
package bcache

import (
    "github.com/weaveworks/mesh"
)

// peer couples a local cache with the gossip machinery: it implements the
// mesh gossip callbacks (Gossip, OnGossip, OnGossipBroadcast,
// OnGossipUnicast) and funnels local mutations through a single goroutine
// (see loop).
type peer struct {
    // cc is the local cache created by newCache in newPeer.
    cc       *cache
    // name identifies this peer; it is used as the source of outgoing
    // messages and compared against the sender in OnGossipBroadcast.
    name     mesh.PeerName
    // send is nil until register is called; broadcast is a no-op while nil.
    send     mesh.Gossip
    // actionCh carries functions that loop executes one at a time.
    actionCh chan func()
    // quitCh makes loop return once a receive on it succeeds.
    quitCh   chan struct{}
    // logger receives the debug output of the gossip callbacks.
    logger   Logger
}

// newPeer builds a peer named name backed by a cache created with
// newCache(maxKeys), and starts the peer's action loop in its own goroutine.
//
// The returned peer has no gossip sender yet; register must be called
// before any local change can be broadcast. An error from newCache is
// returned unchanged, with a nil peer.
func newPeer(name mesh.PeerName, maxKeys int, logger Logger) (*peer, error) {
    store, err := newCache(maxKeys)
    if err != nil {
        return nil, err
    }

    // send is deliberately left at its zero value (nil) until register.
    created := &peer{
        cc:       store,
        name:     name,
        logger:   logger,
        actionCh: make(chan func()),
        quitCh:   make(chan struct{}),
    }

    go created.loop()

    return created, nil
}

// register stores the mesh.Gossip handle (the result of a
// mesh.Router.NewGossip) that broadcast will use.
//
// The assignment is queued on actionCh so it runs on the loop goroutine,
// the same goroutine that runs the queued Set/Delete actions reading p.send.
func (p *peer) register(send mesh.Gossip) {
    assign := func() { p.send = send }
    p.actionCh <- assign
}

// Gossip returns whatever the cache's Messages method reports.
//
// It implements mesh.Gossiper.Gossip.
func (p *peer) Gossip() mesh.GossipData {
    state := p.cc.Messages()
    return state
}

// OnGossip decodes buf into a message, merges it into the cache via
// mergeNew and returns "everything new I've just learnt", or nil if nothing
// in the received data was new.
//
// A decoding failure is returned as err with a nil delta.
//
// It implements mesh.Gossiper.OnGossip
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
    received, err := newMessageFromBuf(buf)
    if err != nil {
        return nil, err
    }

    delta = p.cc.mergeNew(received)

    // learnt is only used for logging; it stays nil when delta is nil.
    var learnt *message
    if delta != nil {
        learnt = delta.(*message)
    }
    p.logger.Debugf("[%d]OnGossip %v => delta %v", p.name, received, learnt)

    return delta, nil
}

// OnGossipBroadcast decodes update into a message, merges it into the cache
// via mergeDelta and returns the result of that merge (typically a delta)
// for further propagation.
//
// Updates whose src equals this peer's own name are ignored: both results
// are nil. A decoding failure is returned as err with nil received data.
//
// It implements mesh.Gossiper.OnGossipBroadcast
func (p *peer) OnGossipBroadcast(src mesh.PeerName, update []byte) (received mesh.GossipData, err error) {
    // message from ourself, is it possible?
    if src == p.name {
        return nil, nil
    }

    incoming, err := newMessageFromBuf(update)
    if err != nil {
        return nil, err
    }

    received = p.cc.mergeDelta(incoming)

    // merged is only used for logging; it stays nil when received is nil.
    var merged *message
    if received != nil {
        merged = received.(*message)
    }
    p.logger.Debugf("[%d]OnGossipBroadcast %v => delta %v", p.name, incoming, merged)

    return received, nil
}

// OnGossipUnicast decodes update into a message and hands it to the cache's
// mergeComplete. The src argument is not used.
//
// It returns the decoding error, if any, and nil otherwise.
func (p *peer) OnGossipUnicast(src mesh.PeerName, update []byte) error {
    decoded, decodeErr := newMessageFromBuf(update)
    if decodeErr != nil {
        return decodeErr
    }

    p.cc.mergeComplete(decoded)

    return nil
}

// Set stores key/val with the given expiry timestamp in the local cache and
// broadcasts a one-entry message describing the change.
//
// The work is queued on actionCh and executed by the loop goroutine; Set
// blocks until that action has finished.
func (p *peer) Set(key, val string, expiredTimestamp int64) {
    done := make(chan struct{})

    apply := func() {
        // Signal the waiting caller however the action ends.
        defer close(done)

        // Update the local cache first.
        p.cc.Set(key, val, expiredTimestamp, 0)

        // Then build the message carrying the same entry and send it out.
        update := newMessage(p.name, 1)
        update.add(key, val, expiredTimestamp, 0)
        p.broadcast(update)
    }

    p.actionCh <- apply

    // Wait for the loop goroutine to finish the action.
    <-done
}

// Delete removes key from the local cache and, if the cache reports that
// the key existed, broadcasts a one-entry message carrying the deletion
// timestamp.
//
// The work is queued on actionCh and executed by the loop goroutine; Delete
// blocks until that action has finished. It returns whether the cache
// reported the key as existing.
func (p *peer) Delete(key string, deleteTimestamp int64) bool {
    var (
        c     = make(chan struct{})
        exist bool
    )

    p.actionCh <- func() {
        defer close(c)

        // delete from our cache
        val, expired, ok := p.cc.Delete(key, deleteTimestamp)

        // Assign to the captured variable. Declaring `exist` with `:=` here
        // would shadow it, leaving the outer one false so that Delete
        // always returned false.
        exist = ok
        if !ok {
            return
        }

        // construct & send the message
        m := newMessage(p.name, 1)
        m.add(key, val, expired, deleteTimestamp)

        p.broadcast(m)
    }

    // Wait for it to be finished. close(c) happens before this receive
    // completes, so reading exist afterwards is safe.
    <-c
    return exist
}

// Get looks key up in the local cache and returns the cache's answer
// unchanged: the value and a boolean as reported by the cache's Get.
//
// Unlike Set and Delete it calls the cache directly instead of going
// through actionCh.
func (p *peer) Get(key string) (string, bool) {
    val, found := p.cc.Get(key)
    return val, found
}

// loop runs the functions received on actionCh one at a time, on the
// goroutine started by newPeer. It returns once a receive on quitCh
// succeeds.
func (p *peer) loop() {
    for {
        select {
        case <-p.quitCh:
            return
        case action := <-p.actionCh:
            action()
        }
    }
}

// broadcast hands msg to the registered gossip sender's GossipBroadcast.
// It does nothing while no sender has been registered (p.send is nil).
func (p *peer) broadcast(msg *message) {
    if gossip := p.send; gossip != nil {
        gossip.GossipBroadcast(msg)
    }
}