waku-org/go-waku

View on GitHub
waku/v2/peerstore/waku_peer_store.go

Summary

Maintainability
A
0 mins
Test Coverage
C
77%
package peerstore

import (
    "errors"
    "sync"

    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/peerstore"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "golang.org/x/exp/maps"
)

// Origin is used to determine how the peer is identified,
// either it is statically added or discovered via one of the discovery protocols
type Origin int64

const (
    Unknown Origin = iota
    Discv5
    Static
    PeerExchange
    DNSDiscovery
    Rendezvous
    PeerManager
)

const peerOrigin = "origin"
const peerENR = "enr"
const peerDirection = "direction"
const peerPubSubTopics = "pubSubTopics"
const peerScore = "score"

// ConnectionFailures contains connection failure information towards all peers
type ConnectionFailures struct {
    sync.RWMutex
    failures map[peer.ID]int
}

// WakuPeerstoreImpl is a implementation of WakuPeerStore
type WakuPeerstoreImpl struct {
    peerStore    peerstore.Peerstore
    connFailures ConnectionFailures
}

// WakuPeerstore is an interface for implementing WakuPeerStore
type WakuPeerstore interface {
    SetOrigin(p peer.ID, origin Origin) error
    Origin(p peer.ID) (Origin, error)
    PeersByOrigin(origin Origin) peer.IDSlice
    SetENR(p peer.ID, enr *enode.Node) error
    ENR(p peer.ID) (*enode.Node, error)
    AddConnFailure(pID peer.ID)
    ResetConnFailures(pID peer.ID)
    ConnFailures(pID peer.ID) int

    SetDirection(p peer.ID, direction network.Direction) error
    Direction(p peer.ID) (network.Direction, error)

    AddPubSubTopic(p peer.ID, topic string) error
    RemovePubSubTopic(p peer.ID, topic string) error
    PubSubTopics(p peer.ID) (protocol.TopicSet, error)
    SetPubSubTopics(p peer.ID, topics []string) error
    PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
    PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice

    SetScore(peer.ID, float64) error
    Score(peer.ID) (float64, error)
}

// NewWakuPeerstore creates a new WakuPeerStore object
func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore {
    return &WakuPeerstoreImpl{
        peerStore: p,
        connFailures: ConnectionFailures{
            failures: make(map[peer.ID]int),
        },
    }
}

// SetOrigin sets origin for a specific peer.
func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error {
    return ps.peerStore.Put(p, peerOrigin, origin)
}

// Origin fetches the origin for a specific peer.
func (ps *WakuPeerstoreImpl) Origin(p peer.ID) (Origin, error) {
    result, err := ps.peerStore.Get(p, peerOrigin)
    if err != nil {
        return Unknown, err
    }

    return result.(Origin), nil
}

// SetScore sets score for a specific peer.
func (ps *WakuPeerstoreImpl) SetScore(p peer.ID, score float64) error {
    return ps.peerStore.Put(p, peerScore, score)
}

// Score fetches the peerScore for a specific peer.
func (ps *WakuPeerstoreImpl) Score(p peer.ID) (float64, error) {
    result, err := ps.peerStore.Get(p, peerScore)
    if err != nil {
        return -1, err
    }

    return result.(float64), nil
}

// PeersByOrigin returns the list of peers for a specific origin
func (ps *WakuPeerstoreImpl) PeersByOrigin(expectedOrigin Origin) peer.IDSlice {
    var result peer.IDSlice
    for _, p := range ps.Peers() {
        actualOrigin, err := ps.Origin(p)
        if err == nil && actualOrigin == expectedOrigin {
            result = append(result, p)
        }
    }
    return result
}

// SetENR sets the ENR record a peer
func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error {
    return ps.peerStore.Put(p, peerENR, enr)
}

// ENR fetches the ENR record for a peer
func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) {
    result, err := ps.peerStore.Get(p, peerENR)
    if err != nil {
        return nil, err
    }
    return result.(*enode.Node), nil
}

// AddConnFailure increments connectionFailures for a peer
func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) {
    ps.connFailures.Lock()
    defer ps.connFailures.Unlock()
    ps.connFailures.failures[pID]++
}

// ResetConnFailures resets connectionFailures for a peer to 0
func (ps *WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) {
    ps.connFailures.Lock()
    defer ps.connFailures.Unlock()
    ps.connFailures.failures[pID] = 0
}

// ConnFailures fetches connectionFailures for a peer
func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int {
    ps.connFailures.RLock()
    defer ps.connFailures.RUnlock()
    return ps.connFailures.failures[pID]
}

// SetDirection sets connection direction for a specific peer.
func (ps *WakuPeerstoreImpl) SetDirection(p peer.ID, direction network.Direction) error {
    return ps.peerStore.Put(p, peerDirection, direction)
}

// Direction fetches the connection direction (Inbound or outBound) for a specific peer
func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) {
    result, err := ps.peerStore.Get(p, peerDirection)
    if err != nil {
        return network.DirUnknown, err
    }

    return result.(network.Direction), nil
}

// AddPubSubTopic adds a new pubSubTopic for a peer
func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error {
    existingTopics, err := ps.PubSubTopics(p)
    if err != nil {
        return err
    }

    if _, found := existingTopics[topic]; found {
        return nil
    }
    existingTopics[topic] = struct{}{}
    return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics))
}

// RemovePubSubTopic removes a pubSubTopic from the peer
func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error {
    existingTopics, err := ps.PubSubTopics(p)
    if err != nil {
        return err
    }

    if len(existingTopics) == 0 {
        return nil
    }

    delete(existingTopics, topic)

    err = ps.SetPubSubTopics(p, maps.Keys(existingTopics))
    if err != nil {
        return err
    }
    return nil
}

// SetPubSubTopics sets pubSubTopics for a peer, it also overrides existing ones that were set previously..
func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error {
    return ps.peerStore.Put(p, peerPubSubTopics, topics)
}

// PubSubTopics fetches list of pubSubTopics for a peer
func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) {
    result, err := ps.peerStore.Get(p, peerPubSubTopics)
    if err != nil {
        if errors.Is(err, peerstore.ErrNotFound) {
            return protocol.NewTopicSet(), nil
        } else {
            return nil, err
        }
    }
    return protocol.NewTopicSet((result.([]string))...), nil
}

// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice {
    if specificPeers == nil {
        specificPeers = ps.Peers()
    }
    var result peer.IDSlice
    for _, p := range specificPeers {
        peerMatch := true
        peerTopics, err := ps.PubSubTopics(p)
        if err == nil {
            for _, t := range pubSubTopics {
                if _, ok := peerTopics[t]; !ok {
                    peerMatch = false
                    break
                }
            }
            if peerMatch {
                result = append(result, p)
            }
        } //Note: skipping a peer in case of an error as there would be others available.
    }
    return result
}

// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice {
    if specificPeers == nil {
        specificPeers = ps.Peers()
    }
    var result peer.IDSlice
    for _, p := range specificPeers {
        topics, err := ps.PubSubTopics(p)
        if err == nil {
            for topic := range topics {
                if topic == pubSubTopic {
                    result = append(result, p)
                }
            }
        } //Note: skipping a peer in case of an error as there would be others available.
    }
    return result
}