waku-org/go-waku

View on GitHub
waku/v2/peermanager/peer_discovery.go

Summary

Maintainability
A
1 hr
Test Coverage
B
82%
package peermanager

import (
    "context"
    "errors"

    "github.com/libp2p/go-libp2p/core/protocol"
    "github.com/waku-org/go-waku/waku/v2/discv5"
    wps "github.com/waku-org/go-waku/waku/v2/peerstore"
    waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
    wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
    "github.com/waku-org/go-waku/waku/v2/protocol/relay"
    "github.com/waku-org/go-waku/waku/v2/service"
    "github.com/waku-org/go-waku/waku/v2/utils"
    "go.uber.org/zap"
)

// DiscoverAndConnectToPeers discovers peers using discoveryv5 and connects to the peers.
// It discovers peers till maxCount peers are found for the cluster,shard and protocol or the context passed expires.
func (pm *PeerManager) DiscoverAndConnectToPeers(ctx context.Context, cluster uint16,
    shard uint16, serviceProtocol protocol.ID, maxCount int) error {
    if pm.discoveryService == nil {
        return nil
    }
    peers, err := pm.discoverOnDemand(cluster, shard, serviceProtocol, ctx, maxCount)
    if err != nil {
        return err
    }

    pm.logger.Debug("discovered peers on demand ", zap.Int("noOfPeers", len(peers)))
    connectNow := false
    //Add discovered peers to peerStore and connect to them
    for idx, p := range peers {
        if serviceProtocol != relay.WakuRelayID_v200 && idx <= maxCount {
            //how many connections to initiate? Maybe this could be a config exposed to client API.
            //For now just going ahead with initiating connections with 2 nodes in case of non-relay service peers
            //In case of relay let it go through connectivityLoop
            connectNow = true
        }
        pm.AddDiscoveredPeer(p, connectNow)
    }
    return nil
}

// RegisterWakuProtocol to be used by Waku protocols that could be used for peer discovery
// Which means protoocl should be as defined in waku2 ENR key in https://rfc.vac.dev/spec/31/.
func (pm *PeerManager) RegisterWakuProtocol(proto protocol.ID, bitField uint8) {
    pm.wakuprotoToENRFieldMap[proto] = WakuProtoInfo{waku2ENRBitField: bitField}
}

// OnDemandPeerDiscovery initiates an on demand peer discovery and
// filters peers based on cluster,shard and any wakuservice protocols specified
func (pm *PeerManager) discoverOnDemand(cluster uint16,
    shard uint16, wakuProtocol protocol.ID, ctx context.Context, maxCount int) ([]service.PeerData, error) {
    var peers []service.PeerData

    wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol]
    if !ok {
        pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
        return nil, errors.New("cannot do on demand discovery for non-waku protocol")
    }
    iterator, err := pm.discoveryService.PeerIterator(
        discv5.FilterShard(cluster, shard),
        discv5.FilterCapabilities(wakuProtoInfo.waku2ENRBitField),
    )
    if err != nil {
        pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster),
            zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(err))
        return peers, err
    }

    //Iterate and fill peers.
    defer iterator.Close()

    for iterator.Next() {

        pInfo, err := wenr.EnodeToPeerInfo(iterator.Node())
        if err != nil {
            continue
        }
        pData := service.PeerData{
            Origin:   wps.Discv5,
            ENR:      iterator.Node(),
            AddrInfo: *pInfo,
        }
        peers = append(peers, pData)

        if len(peers) >= maxCount {
            pm.logger.Debug("found required number of nodes, stopping on demand discovery", zap.Uint16("cluster", cluster),
                zap.Uint16("shard", shard), zap.Int("required-nodes", maxCount))
            break
        }

        select {
        case <-ctx.Done():
            pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster),
                zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(ctx.Err()))
            return nil, ctx.Err()
        default:
        }

    }
    return peers, nil
}

func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
    defer utils.LogOnPanic()
    shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
    if err != nil {
        pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
        return
    }
    if len(shardsInfo) > 0 {
        for _, shardInfo := range shardsInfo {
            err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
            if err != nil {
                pm.logger.Warn("failed to discover and connect to peers", zap.Error(err))
            }
        }
    } else {
        pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics))
    }
}