waku-org/go-waku

View on GitHub
waku/v2/peermanager/peer_connector.go

Summary

Maintainability
A
35 mins
Test Coverage
B
80%
package peermanager

// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go

import (
    "context"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"

    "github.com/libp2p/go-libp2p/p2p/discovery/backoff"
    "github.com/waku-org/go-waku/logging"
    "github.com/waku-org/go-waku/waku/v2/onlinechecker"
    wps "github.com/waku-org/go-waku/waku/v2/peerstore"
    "github.com/waku-org/go-waku/waku/v2/service"
    "github.com/waku-org/go-waku/waku/v2/utils"

    "go.uber.org/zap"

    lru "github.com/hashicorp/golang-lru"
)

// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
    // mux is held while appending to subscriptions in Subscribe and around the
    // cache accesses in canDialPeer and addConnectionBackoff.
    mux           sync.Mutex
    // cache maps a peer ID to its *connCacheData (backoff state).
    cache         *lru.TwoQueueCache
    // host is the libp2p host used for dialing; it is assigned through SetHost.
    host          host.Host
    // pm receives discovered peers and dial errors from this connector.
    pm            *PeerManager
    // onlineChecker is consulted by dialPeers; peers are skipped while it reports offline.
    onlineChecker onlinechecker.OnlineChecker

    // paused, when true, stops consumeSubscription from reading subscription channels.
    paused      atomic.Bool
    // dialTimeout bounds each individual connection attempt made by dialPeer.
    dialTimeout time.Duration
    // The embedded service supplies the Context, WaitGroup, GetListeningChan,
    // ErrOnNotRunning, Start and Stop used throughout this file.
    *service.CommonDiscoveryService
    // subscriptions holds subscriptions registered before the connector was running.
    subscriptions []subscription

    // backoff creates a fresh backoff strategy for each peer added to cache.
    backoff backoff.BackoffFactory
    logger  *zap.Logger
}

// subscription pairs a channel of discovered peers with the context that
// bounds how long that channel is consumed (see consumeSubscription).
type subscription struct {
    // ctx stops consumption of ch once it is done.
    ctx context.Context
    // ch delivers the discovered peers.
    ch  <-chan service.PeerData
}

// getBackOff returns the factory for the strategy used to decide how long to
// back off after previously attempting to connect to a peer. The strategy is
// an exponential backoff bounded by one minute and one hour, configured with
// full jitter and its own random source seeded from the global generator.
func getBackOff() backoff.BackoffFactory {
    const (
        shortestWait = time.Minute
        longestWait  = time.Hour
    )
    jitterRng := rand.New(rand.NewSource(rand.Int63()))
    return backoff.NewExponentialBackoff(shortestWait, longestWait, backoff.FullJitter, time.Second, 5.0, 0, jitterRng)
}

// NewPeerConnectionStrategy creates a utility to connect to peers,
// but only if we have not recently tried connecting to them already.
//
// pm is the PeerManager this connector reports to; the new connector is
// registered on it through SetPeerConnector before returning
// onlineChecker is consulted before each dial attempt
// dialTimeout is how long we attempt to connect to a peer before giving up
// logger is the parent logger; the connector logs under the name "discovery-connector"
//
// An error is returned only when the backoff cache cannot be created.
func NewPeerConnectionStrategy(
    pm *PeerManager,
    onlineChecker onlinechecker.OnlineChecker,
    dialTimeout time.Duration,
    logger *zap.Logger,
) (*PeerConnectionStrategy, error) {
    // backoffCacheSize is the size of the TwoQueueCache holding per-peer backoff state
    const backoffCacheSize = 600

    backoffCache, err := lru.New2Q(backoffCacheSize)
    if err != nil {
        return nil, err
    }

    connector := &PeerConnectionStrategy{
        cache:                  backoffCache,
        dialTimeout:            dialTimeout,
        CommonDiscoveryService: service.NewCommonDiscoveryService(),
        onlineChecker:          onlineChecker,
        pm:                     pm,
        backoff:                getBackOff(),
        logger:                 logger.Named("discovery-connector"),
    }
    pm.SetPeerConnector(connector)

    return connector, nil
}

// connCacheData is the per-peer backoff state stored in the connector's cache.
type connCacheData struct {
    // nextTry is the earliest time at which canDialPeer allows another dial.
    nextTry time.Time
    // strat yields, through Delay, the wait applied on each recorded backoff.
    strat   backoff.BackoffStrategy
}

// Subscribe registers a channel on which discovered peers are pushed.
// Consumption of ch is bounded by ctx. When the connector is not running yet
// the subscription is only recorded, to be consumed once it starts; otherwise
// a consumer goroutine is launched immediately.
func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
    sub := subscription{ctx: ctx, ch: ch}

    if c.ErrOnNotRunning() == nil {
        // Already running: consume the subscription in its own goroutine,
        // tracked by the service WaitGroup.
        c.WaitGroup().Add(1)
        go func() {
            defer utils.LogOnPanic()
            defer c.WaitGroup().Done()
            c.consumeSubscription(sub)
        }()
        return
    }

    // Not running yet: remember the subscription for later.
    c.mux.Lock()
    c.subscriptions = append(c.subscriptions, sub)
    c.mux.Unlock()
}

// consumeSubscription forwards peers received on s.ch to the PeerManager until
// the connector's context or the subscription's context is done, or until
// s.ch is closed.
//
// While the connector is paused nothing is read from s.ch; the paused flag is
// re-checked once per second. Both the paused wait and the receive wait return
// as soon as either context is done, so shutdown is not delayed by a sleep.
func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
    for {
        // Give cancellation priority over any peer that may already be
        // waiting on s.ch.
        select {
        case <-c.Context().Done():
            return
        case <-s.ctx.Done():
            return
        default:
        }

        if c.isPaused() {
            // Wait before looking at the paused flag again, but stay
            // responsive to cancellation while waiting.
            select {
            case <-c.Context().Done():
                return
            case <-s.ctx.Done():
                return
            case <-time.After(1 * time.Second):
            }
            continue
        }

        select {
        case <-c.Context().Done():
            return
        case <-s.ctx.Done():
            return
        case p, ok := <-s.ch:
            if !ok {
                return
            }
            // A peer is not necessarily connected as soon as it is discovered;
            // an immediate connection is requested only while the number of
            // connected peers is below the PeerManager's outgoing target.
            // Otherwise the PeerManager is expected to push it when needed.
            triggerImmediateConnection := len(c.host.Network().Peers()) < c.pm.OutPeersTarget
            c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)
        case <-time.After(1 * time.Second):
            // Periodic wake-up so the loop re-evaluates the paused flag even
            // when no peers arrive.
        }
    }
}

// SetHost sets the host to be able to mount or consume a protocol.
// The connector uses this host to dial peers and to query connection state.
// NOTE(review): the constructor does not assign a host, so this presumably has
// to be called before Start — confirm against callers.
func (c *PeerConnectionStrategy) SetHost(h host.Host) {
    c.host = h
}

// Start launches the peer-connector by delegating to the embedded
// CommonDiscoveryService.Start, with c.start as the start-up callback.
// Once started, peers handed to the connector are dialed unless they are
// within their backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
    return c.CommonDiscoveryService.Start(ctx, c.start)

}
// start is the start-up callback passed to the embedded service by Start.
// It launches the dialPeers goroutine and then begins consuming the
// subscriptions that were registered before start-up. It always returns nil.
func (c *PeerConnectionStrategy) start() error {
    // Account for the dialPeers goroutine; dialPeers calls Done when it exits.
    c.WaitGroup().Add(1)

    go c.dialPeers()

    c.consumeSubscriptions()

    return nil
}

// Stop terminates the peer-connector by delegating to the embedded
// CommonDiscoveryService.Stop; no connector-specific cleanup callback is needed.
func (c *PeerConnectionStrategy) Stop() {
    c.CommonDiscoveryService.Stop(func() {})
}

// isPaused reports whether the connector is currently paused (see SetPaused).
func (c *PeerConnectionStrategy) isPaused() bool {
    return c.paused.Load()
}

// SetPaused pauses or resumes the connector. While paused, subscription
// consumers stop reading discovered peers from their channels.
func (c *PeerConnectionStrategy) SetPaused(paused bool) {
    c.paused.Store(paused)
}

// consumeSubscriptions starts a consumer goroutine for every subscription that
// was stored because Subscribe was called before the peerConnector had
// started, and then forgets the stored subscriptions.
func (c *PeerConnectionStrategy) consumeSubscriptions() {
    // Subscribe appends to c.subscriptions while holding c.mux, so the pending
    // list is taken and cleared under the same lock. The goroutines are
    // started after the lock is released.
    c.mux.Lock()
    pending := c.subscriptions
    c.subscriptions = nil
    c.mux.Unlock()

    for _, subs := range pending {
        c.WaitGroup().Add(1)
        go func(s subscription) {
            defer utils.LogOnPanic()
            defer c.WaitGroup().Done()
            c.consumeSubscription(s)
        }(subs)
    }
}

// maxActiveDials is the upper bound dialPeers applies to the number of
// dialPeer goroutines running at the same time.
const maxActiveDials = 5

// canDialPeer reports whether pi may be dialed now: true when the peer has no
// backoff entry in the cache or when its backoff period has elapsed, false
// while the current time is still before the entry's nextTry.
//
// The original note here states that c.cache is thread safe on its own; the
// mutex is taken only to cover canDialPeer being queried twice for the same peer.
func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
    c.mux.Lock()
    defer c.mux.Unlock()

    entry, found := c.cache.Get(pi.ID)
    if !found {
        // No previous attempt recorded for this peer.
        return true
    }

    cached := entry.(*connCacheData)
    current := time.Now()
    if current.Before(cached.nextTry) {
        c.logger.Debug("Skipping connecting to peer due to backoff strategy",
            logging.UTCTime("currentTime", current), logging.UTCTime("until", cached.nextTry))
        return false
    }

    c.logger.Debug("Proceeding with connecting to peer",
        logging.UTCTime("currentTime", current), logging.UTCTime("nextTry", cached.nextTry))
    return true
}

// addConnectionBackoff records a backoff for peerID. If the peer already has
// an entry in the cache, its nextTry is pushed forward by the next delay of
// its existing strategy. Otherwise a new entry with a fresh strategy from the
// backoff factory is created and stored.
func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
    c.mux.Lock()
    defer c.mux.Unlock()

    if entry, found := c.cache.Get(peerID); found {
        known := entry.(*connCacheData)
        known.nextTry = time.Now().Add(known.strat.Delay())
        return
    }

    fresh := &connCacheData{strat: c.backoff()}
    fresh.nextTry = time.Now().Add(fresh.strat.Delay())
    c.logger.Debug("Initializing connectionCache for peer ",
        logging.HostID("peerID", peerID), logging.UTCTime("until", fresh.nextTry))
    c.cache.Add(peerID, fresh)
}

// dialPeers is the connector's dial loop. It receives peers from the service's
// listening channel and starts a dialPeer goroutine for each peer that passes
// all checks, until the service context is done or the channel is closed.
//
// A peer is skipped when the online checker reports offline, when it is this
// host itself, has an empty ID, is already connected, or is still within its
// backoff period. The number of concurrent dials is limited to
// min(OutPeersTarget, maxActiveDials), and never less than one.
func (c *PeerConnectionStrategy) dialPeers() {
    defer utils.LogOnPanic()
    defer c.WaitGroup().Done()

    maxGoRoutines := c.pm.OutPeersTarget
    if maxGoRoutines > maxActiveDials {
        maxGoRoutines = maxActiveDials
    }
    // A zero-capacity semaphore would block the first dial forever, because
    // slots are only released by dialPeer goroutines that have already
    // started. A negative capacity would make the channel creation panic.
    if maxGoRoutines < 1 {
        maxGoRoutines = 1
    }

    // sem is a counting semaphore: one slot per in-flight dial.
    sem := make(chan struct{}, maxGoRoutines)

    for {
        select {
        case <-c.Context().Done():
            return
        case pd, ok := <-c.GetListeningChan():
            if !ok {
                return
            }

            if !c.onlineChecker.IsOnline() {
                continue
            }

            addrInfo := pd.AddrInfo

            if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
                c.host.Network().Connectedness(addrInfo.ID) == network.Connected {
                continue
            }

            if !c.canDialPeer(addrInfo) {
                continue
            }

            // Wait for a free dial slot, but give up if the service is
            // stopped while all slots are busy.
            select {
            case sem <- struct{}{}:
            case <-c.Context().Done():
                return
            }
            c.WaitGroup().Add(1)
            go c.dialPeer(addrInfo, sem)
        }
    }
}

// dialPeer attempts a single connection to pi, bounded by dialTimeout and by
// the service context. A failure is reported to the PeerManager through
// HandleDialError; on success the peer's connection-failure count is reset in
// the peerstore.
//
// sem is the dial semaphore owned by dialPeers. The slot acquired for this
// dial is released when dialPeer returns, on every exit path.
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
    defer utils.LogOnPanic()
    defer c.WaitGroup().Done()
    // Release the dial slot via defer so it cannot be leaked by an early exit.
    defer func() { <-sem }()

    ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
    defer cancel()

    if err := c.host.Connect(ctx, pi); err != nil {
        c.pm.HandleDialError(err, pi.ID)
        return
    }
    c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
}