waku/v2/peermanager/peer_manager.go
package peermanager
import (
"context"
"errors"
"sync"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type TopicHealth int
const (
UnHealthy = iota
MinimallyHealthy = 1
SufficientlyHealthy = 2
)
func (t TopicHealth) String() string {
switch t {
case UnHealthy:
return "UnHealthy"
case MinimallyHealthy:
return "MinimallyHealthy"
case SufficientlyHealthy:
return "SufficientlyHealthy"
default:
return ""
}
}
type TopicHealthStatus struct {
Topic string
Health TopicHealth
}
// NodeTopicDetails stores pubSubTopic related data like topicHandle for the node.
type NodeTopicDetails struct {
topic *pubsub.Topic
healthStatus TopicHealth
}
// WakuProtoInfo holds protocol-specific info.
// To be used at a later stage to set various config, such as criteria for peer management, specific to each Waku protocol.
// This should keep the peer manager agnostic to the protocol.
type WakuProtoInfo struct {
waku2ENRBitField uint8
}
// PeerManager applies various controls and manages connections towards peers.
type PeerManager struct {
peerConnector *PeerConnectionStrategy
metadata *metadata.WakuMetadata
relay *relay.WakuRelay
maxPeers int
maxRelayPeers int
logger *zap.Logger
InPeersTarget int
OutPeersTarget int
host host.Host
serviceSlots *ServiceSlots
ctx context.Context
sub event.Subscription
topicMutex sync.RWMutex
subRelayTopics map[string]*NodeTopicDetails
discoveryService *discv5.DiscoveryV5
wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
TopicHealthNotifCh chan<- TopicHealthStatus
rttCache *FastestPeerSelector
RelayEnabled bool
evtDialError event.Emitter
}
// PeerSelection provides various options based on which Peer is selected from a list of peers.
type PeerSelection int
const (
Automatic PeerSelection = iota
LowestRTT
)
// maxFailedAttempts is the number of connection failures after which a disconnected peer becomes eligible for pruning.
const maxFailedAttempts = 5
// prunePeerStoreInterval is how often the peer store size is checked against maxPeers and pruned.
const prunePeerStoreInterval = 10 * time.Minute
// peerConnectivityLoopSecs is the period, in seconds, of the relay connectivity loop.
const peerConnectivityLoopSecs = 15
// maxConnsToPeerRatio is used to derive maxPeers from maxConnections when no higher limit is configured.
const maxConnsToPeerRatio = 3
// badPeersCleanupInterval is how often peers with recurring dial failures are removed in non-relay mode.
const badPeersCleanupInterval = 1 * time.Minute
// maxDialFailures is the number of dial failures after which a peer is considered bad and removed in non-relay mode.
const maxDialFailures = 2
// relayAndServicePeers splits maxConnections into ~80% relay peers and ~20% service peers.
func relayAndServicePeers(maxConnections int) (int, int) {
return maxConnections - maxConnections/5, maxConnections / 5
}
// inAndOutRelayPeers splits relayPeers into roughly 66% inbound and 33% outbound relay peers, enforcing a minimum number of outbound connections.
func inAndOutRelayPeers(relayPeers int) (int, int) {
outRelayPeers := relayPeers / 3
// Ensure a minimum number of outbound relay connections.
const minOutRelayConns = 10
if outRelayPeers < minOutRelayConns {
outRelayPeers = minOutRelayConns
}
return relayPeers - outRelayPeers, outRelayPeers
}
// checkAndUpdateTopicHealth computes the health of the given topic, updates it, and notifies listeners if it changed.
// It returns the number of healthy mesh peers for the topic.
func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
if topic == nil {
return 0
}
healthyPeerCount := 0
for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) {
if pm.host.Network().Connectedness(p) == network.Connected {
score, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
if err == nil {
if score < relay.PeerPublishThreshold {
pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", score))
} else {
healthyPeerCount++
}
} else {
if errors.Is(err, peerstore.ErrNotFound) {
// For now considering peer as healthy if we can't fetch score.
healthyPeerCount++
pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p))
} else {
pm.logger.Warn("failed to fetch peer score ", zap.Error(err), zap.Stringer("peer", p))
}
}
}
}
//Update topic's health
oldHealth := topic.healthStatus
if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
topic.healthStatus = UnHealthy
} else if healthyPeerCount < waku_proto.GossipSubDMin {
topic.healthStatus = MinimallyHealthy
} else {
topic.healthStatus = SufficientlyHealthy
}
if oldHealth != topic.healthStatus {
//Check old health, and if there is a change notify of the same.
pm.logger.Debug("topic health has changed", zap.String("pubsubtopic", topic.topic.String()), zap.Stringer("health", topic.healthStatus))
pm.TopicHealthNotifCh <- TopicHealthStatus{topic.topic.String(), topic.healthStatus}
}
return healthyPeerCount
}
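// To summarise the thresholds used above: a topic with no healthy mesh peers is UnHealthy,
// one with at least one but fewer than GossipSubDMin healthy mesh peers is MinimallyHealthy,
// and one with GossipSubDMin or more is SufficientlyHealthy.
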
// TopicHealth can be used to fetch health of a specific pubsubTopic.
// Returns error if topic is not found.
func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
pm.topicMutex.RLock()
defer pm.topicMutex.RUnlock()
topicDetails, ok := pm.subRelayTopics[pubsubTopic]
if !ok {
return UnHealthy, errors.New("topic not found")
}
return topicDetails.healthStatus, nil
}
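// Illustrative usage (assumes pm is an initialised *PeerManager already subscribed to the default topic):
//
//	health, err := pm.TopicHealth(relay.DefaultWakuTopic)
//	if err == nil && health == UnHealthy {
//		// e.g. hold off publishing until the mesh recovers
//	}
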
// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relay *relay.WakuRelay, relayEnabled bool, logger *zap.Logger) *PeerManager {
var inPeersTarget, outPeersTarget, maxRelayPeers int
if relayEnabled {
maxRelayPeers, _ = relayAndServicePeers(maxConnections)
inPeersTarget, outPeersTarget = inAndOutRelayPeers(maxRelayPeers)
if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
}
} else {
maxRelayPeers = 0
inPeersTarget = 0
//TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic
outPeersTarget = 10
}
pm := &PeerManager{
logger: logger.Named("peer-manager"),
metadata: metadata,
relay: relay,
maxRelayPeers: maxRelayPeers,
InPeersTarget: inPeersTarget,
OutPeersTarget: outPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{},
rttCache: NewFastestPeerSelector(logger),
RelayEnabled: relayEnabled,
}
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeers", maxRelayPeers),
zap.Int("outPeersTarget", outPeersTarget),
zap.Int("inPeersTarget", pm.InPeersTarget),
zap.Int("maxPeers", maxPeers))
return pm
}
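// Typical wiring, as a sketch only; h (host.Host), wakuRelay (*relay.WakuRelay),
// metadataSvc (*metadata.WakuMetadata), pc (*PeerConnectionStrategy) and d (*discv5.DiscoveryV5)
// are assumed to be constructed elsewhere:
//
//	pm := NewPeerManager(50, 150, metadataSvc, wakuRelay, true, logger)
//	pm.SetHost(h)
//	pm.SetPeerConnector(pc)
//	pm.SetDiscv5(d)
//	pm.Start(ctx)
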
// SetDiscv5 sets the discoveryv5 service to be used for peer discovery.
func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) {
pm.discoveryService = discv5
}
// SetHost sets the host to be used in order to access the peerStore.
func (pm *PeerManager) SetHost(host host.Host) {
pm.host = host
pm.rttCache.SetHost(host)
}
// SetPeerConnector sets the peer connector to be used for establishing relay connections.
func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
pm.peerConnector = pc
}
// Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) {
pm.ctx = ctx
if pm.RelayEnabled {
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
if pm.sub != nil {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx)
}
go pm.peerStoreLoop(ctx)
if pm.host != nil {
var err error
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
if err != nil {
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
}
}
}
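// removeBadPeers removes peers that have accumulated more than maxDialFailures
// recurring dial failures from the peer store. It is a no-op when relay is enabled.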
func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}
}
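// peerStoreLoop periodically prunes the peer store (every prunePeerStoreInterval)
// and removes bad peers (every badPeersCleanupInterval) until ctx is cancelled.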
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
}
}
}
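// prunePeerStore trims the peer store back below maxPeers in stages: first it drops
// disconnected peers with too many connection failures, then not-connected peers
// without any pubsub topic, and finally peers from topics holding more than the
// computed per-topic quota.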
func (pm *PeerManager) prunePeerStore() {
peers := pm.host.Peerstore().Peers()
numPeers := len(peers)
if numPeers < pm.maxPeers {
pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers))
return
}
peerCntBeforePruning := numPeers
pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning))
for _, peerID := range peers {
connFailures := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID)
if connFailures > maxFailedAttempts {
// safety check so that we don't end up disconnecting connected peers.
if pm.host.Network().Connectedness(peerID) == network.Connected {
pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID)
continue
}
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
return
}
}
notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected)
peersByTopic := make(map[string]peer.IDSlice)
var prunedPeers peer.IDSlice
//Prune not-connected peers without any pubsub topic; group the rest by topic for further pruning.
for _, peerID := range notConnectedPeers {
topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID)
//Prune peers without pubsubtopics.
if err != nil || len(topics) == 0 {
if err != nil {
pm.logger.Error("pruning:failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID))
}
prunedPeers = append(prunedPeers, peerID)
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
} else {
prunedPeers = append(prunedPeers, peerID)
for topic := range topics {
peersByTopic[topic] = append(peersByTopic[topic], peerID)
}
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned notconnected peers", zap.Stringers("prunedPeers", prunedPeers))
// calculate the avg peers per shard
total, maxPeerCnt := 0, 0
for _, peersInTopic := range peersByTopic {
peerLen := len(peersInTopic)
total += peerLen
if peerLen > maxPeerCnt {
maxPeerCnt = peerLen
}
}
if maxPeerCnt == 0 {
// nothing was grouped by topic, so there is nothing left to prune here
return
}
avgPerTopic := min(1, total/maxPeerCnt)
// prune peers from shard with higher than avg count
for topic, peers := range peersByTopic {
count := max(len(peers)-avgPerTopic, 0)
var prunedPeers peer.IDSlice
for i, pID := range peers {
if i > count {
break
}
prunedPeers = append(prunedPeers, pID)
pm.host.Peerstore().RemovePeer(pID)
numPeers--
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic))
}
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
}
// connectivityLoop periodically maintains relay connectivity: it tops up connections per pubsub topic and prunes excess inbound relay connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
defer utils.LogOnPanic()
pm.connectToPeers()
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.connectToPeers()
}
}
}
// GroupPeersByDirection groups the given peers (or, if none are specified, all currently connected peers) by inbound/outbound direction.
func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
if len(specificPeers) == 0 {
specificPeers = pm.host.Network().Peers()
}
for _, p := range specificPeers {
direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p)
if err == nil {
if direction == network.DirInbound {
inPeers = append(inPeers, p)
} else if direction == network.DirOutbound {
outPeers = append(outPeers, p)
}
} else {
pm.logger.Error("failed to retrieve peer direction",
zap.Stringer("peerID", p), zap.Error(err))
}
}
return inPeers, outPeers, nil
}
// getRelayPeers returns the inbound and outbound peers supporting the WakuRelay protocol within specificPeers.
// If specificPeers is empty, it checks all currently connected peers.
func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
if err != nil {
return
}
pm.logger.Debug("number of peers connected", zap.Int("inPeers", inPeers.Len()),
zap.Int("outPeers", outPeers.Len()))
//Need to filter peers to check if they support relay
if inPeers.Len() != 0 {
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200)
}
if outPeers.Len() != 0 {
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200)
}
return
}
// ensureMinRelayConnsPerTopic makes sure there are at least D connections per pubsub topic.
// If not, it looks into the peer store to initiate more connections.
// If the peer store doesn't have enough peers, it asks discv5 to find more and tries again in the next cycle.
func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
pm.topicMutex.RLock()
defer pm.topicMutex.RUnlock()
for topicStr, topicInst := range pm.subRelayTopics {
meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()
if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget {
pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen),
zap.Int("targetPeers", pm.OutPeersTarget))
//Find not connected peers.
notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
if notConnectedPeers.Len() == 0 {
pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen
if numPeersToConnect > 0 {
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
}
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
}
}
}
// connectToPeers ensures a minimum of D connections exists for each pubsub topic.
// If not, it initiates connections to additional peers.
// It also checks incoming relay connections and prunes them once they cross InPeersTarget.
func (pm *PeerManager) connectToPeers() {
if pm.RelayEnabled {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Debug("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InPeersTarget {
pm.pruneInRelayConns(inRelayPeers)
}
} else {
//TODO: for now, only connect to filter peers per topic.
//Fetch filter peers from peerStore. TODO: topics for a light node are not available here yet.
//Filter subscribe should notify the peerManager whenever a new topic/shard is subscribed to.
pm.logger.Debug("light mode: nothing to do")
}
}
// connectToSpecifiedPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host)
if peerData == nil {
continue
}
pm.peerConnector.PushToChan(*peerData)
}
}
// getPeersBasedOnconnectionStatus returns the peers for pubsubTopic (or all peers in the peer store if pubsubTopic is empty) whose connectedness matches the given status.
func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) {
var peerList peer.IDSlice
if pubsubTopic == "" {
peerList = pm.host.Peerstore().Peers()
} else {
peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
}
for _, peerID := range peerList {
if pm.host.Network().Connectedness(peerID) == connected {
filteredPeers = append(filteredPeers, peerID)
}
}
return
}
// pruneInRelayConns prunes incoming relay connections that exceed the derived InPeersTarget.
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
//Start disconnecting peers, based on what?
//For now no preference is used
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
pm.logger.Info("peer connections exceed target relay peers, hence pruning",
zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget))
for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("failed to disconnect connection towards peer",
zap.Stringer("peerID", p), zap.Error(err))
continue
}
pm.logger.Debug("successfully disconnected connection towards peer",
zap.Stringer("peerID", p))
}
}
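// processPeerENR extracts capabilities from a discovered peer's ENR: it derives the
// peer's pubsub topics from the relay shard information and returns the Waku protocols
// advertised in the waku2 ENR bit field, registering any service protocols in the
// service slots along the way.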
func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
shards, err := wenr.RelaySharding(p.ENR.Record())
if err != nil {
pm.logger.Error("could not derive relayShards from ENR", zap.Error(err),
zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
} else {
if shards != nil {
p.PubsubTopics = make([]string, 0)
topics := shards.Topics()
for _, topic := range topics {
topicStr := topic.String()
p.PubsubTopics = append(p.PubsubTopics, topicStr)
}
} else {
pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID))
}
}
supportedProtos := []protocol.ID{}
//Identify and specify protocols supported by the peer based on the discovered peer's ENR
enrField, err := wenr.GetWakuEnrBitField(p.ENR)
if err == nil {
for proto, protoENR := range pm.wakuprotoToENRFieldMap {
protoENRField := protoENR.waku2ENRBitField
if protoENRField&enrField != 0 {
supportedProtos = append(supportedProtos, proto)
//Add Service peers to serviceSlots.
pm.addPeerToServiceSlot(proto, p.AddrInfo.ID)
}
}
}
return supportedProtos
}
// AddDiscoveredPeer adds a dynamically discovered peer.
// If the peer's pubsub topics are not known, they and its supported protocols are derived from its ENR, and any service protocols are added to service slots.
func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
//Check if the peer is already present; skip re-adding only if the stored ENR is at least as recent as the discovered one.
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
//Add addresses if existing addresses have expired
existingAddrs := pm.host.Peerstore().Addrs(p.AddrInfo.ID)
if len(existingAddrs) == 0 {
pm.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
}
enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
// Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen)
if err == nil {
if p.ENR != nil {
if enr.Record().Seq() >= p.ENR.Seq() {
return
}
//Peer is already in peer-store but stored ENR is older than discovered one.
pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq()))
} else {
pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID))
}
} else {
//Peer is in peer-store but it doesn't have an enr
pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
zap.Stringer("peer", p.AddrInfo.ID))
}
}
pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID))
supportedProtos := []protocol.ID{}
if len(p.PubsubTopics) == 0 && p.ENR != nil {
// Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics.
supportedProtos = pm.processPeerENR(&p)
}
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)
if p.ENR != nil {
pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR))
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
pm.logger.Error("could not store enr", zap.Error(err),
zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
if connectNow {
pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID))
go pm.peerConnector.PushToChan(p)
}
}
// addPeer adds peer to the peerStore.
// It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
} else {
//Need to re-evaluate the address expiry
// For now expiring them with default addressTTL which is an hour.
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
}
err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
if err != nil {
pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID))
return err
}
if len(protocols) > 0 {
err = pm.host.Peerstore().AddProtocols(ID, protocols...)
if err != nil {
pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID))
return err
}
}
if len(pubSubTopics) == 0 {
// Probably the peer is discovered via DNSDiscovery (for which we don't have pubSubTopic info)
//If pubSubTopic and enr is empty or no shard info in ENR,then set to defaultPubSubTopic
pubSubTopics = []string{relay.DefaultWakuTopic}
}
err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics)
if err != nil {
pm.logger.Error("could not store pubSubTopic", zap.Error(err),
zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics))
}
return nil
}
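// AddrInfoToPeerData builds a service.PeerData for peerID from the addresses currently
// stored in the host's peer store. If those addresses have expired, the peer is removed
// from the peer store and nil is returned.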
func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: addrs,
},
PubsubTopics: pubsubTopics,
}
}
// AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
//Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return nil, err
}
//Add Service peers to serviceSlots.
for _, proto := range protocols {
pm.addPeerToServiceSlot(proto, info.ID)
}
//Add to the peer-store
err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
if err != nil {
return nil, err
}
pData := &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: info.ID,
Addrs: info.Addrs,
},
PubsubTopics: pubsubTopics,
}
return pData, nil
}
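// Illustrative usage; the multiaddr and someProtocolID below are placeholders rather
// than values taken from this codebase:
//
//	addr, _ := ma.NewMultiaddr("/ip4/192.0.2.1/tcp/60000/p2p/<peer-id>")
//	peerData, err := pm.AddPeer(addr, wps.Static, []string{relay.DefaultWakuTopic}, someProtocolID)
//	if err == nil {
//		pm.Connect(peerData)
//	}
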
// Connect initiates a connection to the given peer asynchronously via the peer connector.
func (pm *PeerManager) Connect(pData *service.PeerData) {
go pm.peerConnector.PushToChan(*pData)
}
// RemovePeer deletes peer from the peerStore after disconnecting it.
// It also removes the peer from serviceSlot.
func (pm *PeerManager) RemovePeer(peerID peer.ID) {
pm.host.Peerstore().RemovePeer(peerID)
//Search if this peer is in serviceSlot and if so, remove it from there
// TODO:Add another peer which is statically configured to the serviceSlot.
pm.serviceSlots.removePeer(peerID)
}
// addPeerToServiceSlot adds a peerID to serviceSlot.
// Adding to peerStore is expected to be already done by caller.
// If relay proto is passed, it is not added to serviceSlot.
func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
if proto == relay.WakuRelayID_v200 {
pm.logger.Debug("cannot add Relay peer to service peer slots")
return
}
//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID),
zap.String("service", string(proto)))
// getPeers returns nil for the relay protocol (WakuRelayID_v200), but relay peers never reach this code path.
pm.serviceSlots.getPeers(proto).add(peerID)
}
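// HandleDialError records a failed dial attempt towards peerID: it adds a connection
// backoff, increments the peer's connection-failure count, emits a DialError event,
// and in non-relay mode removes the peer once the failure count reaches maxDialFailures.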
func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}
if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
if pm.host != nil {
pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
}
pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
if pm.evtDialError != nil {
emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
if emitterErr != nil {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}