status-im/status-go

peers/topicpool.go

package peers

import (
    "container/heap"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/discv5"
    "github.com/ethereum/go-ethereum/p2p/enode"

    "github.com/status-im/status-go/common"
    "github.com/status-im/status-go/discovery"
    "github.com/status-im/status-go/params"
)

const (
    // notQueuedIndex marks an item that is not queued in the heap queue.
    notQueuedIndex = -1
)

// maxCachedPeersMultiplier is the factor by which the peers max limit is multiplied
// to get the maximum number of cached peers allowed.
var maxCachedPeersMultiplier = 1

// maxPendingPeersMultiplier is the factor by which the peers max limit is multiplied
// to get the maximum number of pending peers allowed.
var maxPendingPeersMultiplier = 2
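
// For example, with limits.Max == 3 a topic pool allows at most 3 cached peers
// and 6 pending peers.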

// TopicPoolInterface describes the TopicPool interface.
type TopicPoolInterface interface {
    StopSearch(server *p2p.Server)
    BelowMin() bool
    SearchRunning() bool
    StartSearch(server *p2p.Server) error
    ConfirmDropped(server *p2p.Server, nodeID enode.ID) bool
    AddPeerFromTable(server *p2p.Server) *discv5.Node
    MaxReached() bool
    ConfirmAdded(server *p2p.Server, nodeID enode.ID)
    isStopped() bool
    Topic() discv5.Topic
    SetLimits(limits params.Limits)
    setStopSearchTimeout(delay time.Duration)
    readyToStopSearch() bool
}

// Clock abstracts the time source used by TopicPool so that tests can
// substitute a deterministic implementation.
type Clock interface {
    Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }
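
// A minimal fake clock (hypothetical, for tests only) can implement Clock so that
// discoveredTime values and the stop-search timeout become deterministic, e.g.:
//
//     type fakeClock struct{ now time.Time }
//
//     func (c *fakeClock) Now() time.Time { return c.now }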

// newTopicPool returns a new instance of TopicPool.
func newTopicPool(discovery discovery.Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
    pool := TopicPool{
        discovery:            discovery,
        topic:                topic,
        limits:               limits,
        fastMode:             fastMode,
        slowMode:             slowMode,
        fastModeTimeout:      DefaultTopicFastModeTimeout,
        pendingPeers:         make(map[enode.ID]*peerInfoItem),
        discoveredPeersQueue: make(peerPriorityQueue, 0),
        discoveredPeers:      make(map[enode.ID]bool),
        connectedPeers:       make(map[enode.ID]*peerInfo),
        cache:                cache,
        maxCachedPeers:       limits.Max * maxCachedPeersMultiplier,
        maxPendingPeers:      limits.Max * maxPendingPeersMultiplier,
        clock:                realClock{},
    }
    heap.Init(&pool.discoveredPeersQueue)

    return &pool
}
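
// A typical construction looks roughly like the following (illustrative values;
// the discovery instance d and the cache come from the caller, and the slow/fast
// durations are examples only):
//
//     pool := newTopicPool(d, discv5.Topic("some-topic"), params.Limits{Min: 2, Max: 3},
//         30*time.Second /* slow */, 500*time.Millisecond /* fast */, cache)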

// TopicPool manages peers for a topic.
type TopicPool struct {
    discovery discovery.Discovery

    // configuration
    topic           discv5.Topic
    limits          params.Limits
    fastMode        time.Duration
    slowMode        time.Duration
    fastModeTimeout time.Duration

    mu     sync.RWMutex   // guards the mutable state below
    discWG sync.WaitGroup // tracks the discovery goroutine started by StartSearch
    poolWG sync.WaitGroup // tracks the peer-handling and fast-mode timer goroutines
    quit   chan struct{}  // closed by StopSearch to stop the background goroutines

    running int32 // 1 while a search is running; accessed atomically

    currentMode           time.Duration      // sync mode currently in use (fastMode or slowMode)
    period                chan time.Duration // notifies the discovery loop about sync mode changes
    fastModeTimeoutCancel chan struct{}      // closed to cancel the fast-mode timeout goroutine

    pendingPeers         map[enode.ID]*peerInfoItem // contains found and requested to be connected peers but not confirmed
    discoveredPeersQueue peerPriorityQueue          // priority queue to find the most recently discovered peers; does not contain peers already requested to connect
    discoveredPeers      map[enode.ID]bool          // remembers which peers have already been discovered and are enqueued
    connectedPeers       map[enode.ID]*peerInfo     // currently connected peers

    stopSearchTimeout *time.Time

    maxPendingPeers int
    maxCachedPeers  int
    cache           *Cache

    clock Clock
}

func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
    if _, ok := t.pendingPeers[peer.NodeID()]; ok {
        return
    }
    t.pendingPeers[peer.NodeID()] = &peerInfoItem{
        peerInfo: peer,
        index:    notQueuedIndex,
    }

    // maxPendingPeers = 0 means no limits.
    if t.maxPendingPeers == 0 || t.maxPendingPeers >= len(t.pendingPeers) {
        return
    }

    var oldestPeer *peerInfo
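    // Find the pending peer with the oldest discoveredTime; it is evicted below so
    // that the pending set stays within maxPendingPeers.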
    for _, i := range t.pendingPeers {
        if oldestPeer != nil && oldestPeer.discoveredTime.Before(i.peerInfo.discoveredTime) {
            continue
        }

        oldestPeer = i.peerInfo
    }

    t.removeFromPendingPeers(oldestPeer.NodeID())
}

// addToQueue adds the passed peer to the queue if it is already pending.
func (t *TopicPool) addToQueue(peer *peerInfo) {
    if p, ok := t.pendingPeers[peer.NodeID()]; ok {
        if _, ok := t.discoveredPeers[peer.NodeID()]; ok {
            return
        }

        heap.Push(&t.discoveredPeersQueue, p)
        t.discoveredPeers[peer.NodeID()] = true
    }
}

func (t *TopicPool) popFromQueue() *peerInfo {
    if t.discoveredPeersQueue.Len() == 0 {
        return nil
    }
    item := heap.Pop(&t.discoveredPeersQueue).(*peerInfoItem)
    item.index = notQueuedIndex
    delete(t.discoveredPeers, item.peerInfo.NodeID())
    return item.peerInfo
}

func (t *TopicPool) removeFromPendingPeers(nodeID enode.ID) {
    peer, ok := t.pendingPeers[nodeID]
    if !ok {
        return
    }
    delete(t.pendingPeers, nodeID)
    if peer.index != notQueuedIndex {
        heap.Remove(&t.discoveredPeersQueue, peer.index)
        delete(t.discoveredPeers, nodeID)
    }
}

func (t *TopicPool) updatePendingPeer(nodeID enode.ID) {
    peer, ok := t.pendingPeers[nodeID]
    if !ok {
        return
    }
    peer.discoveredTime = t.clock.Now()
    if peer.index != notQueuedIndex {
        heap.Fix(&t.discoveredPeersQueue, peer.index)
    }
}

func (t *TopicPool) movePeerFromPoolToConnected(nodeID enode.ID) {
    peer, ok := t.pendingPeers[nodeID]
    if !ok {
        return
    }
    t.removeFromPendingPeers(nodeID)
    t.connectedPeers[nodeID] = peer.peerInfo
}

// SearchRunning returns true if the search is running.
func (t *TopicPool) SearchRunning() bool {
    return atomic.LoadInt32(&t.running) == 1
}

// MaxReached returns true if we are connected to the max number of peers.
func (t *TopicPool) MaxReached() bool {
    t.mu.RLock()
    defer t.mu.RUnlock()
    return len(t.connectedPeers) == t.limits.Max
}

// BelowMin returns true if current number of peers is below min limit.
func (t *TopicPool) BelowMin() bool {
    t.mu.RLock()
    defer t.mu.RUnlock()
    return len(t.connectedPeers) < t.limits.Min
}

// maxCachedPeersReached returns true if max number of cached peers is reached.
func (t *TopicPool) maxCachedPeersReached() bool {
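    // maxCachedPeers == 0 means there is nothing to cache, so the threshold counts
    // as reached immediately.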
    if t.maxCachedPeers == 0 {
        return true
    }
    peers := t.cache.GetPeersRange(t.topic, t.maxCachedPeers)

    return len(peers) >= t.maxCachedPeers
}

// setStopSearchTimeout sets the timeout after which the current topic search is
// stopped, unless a timeout has already been set.
func (t *TopicPool) setStopSearchTimeout(delay time.Duration) {
    if t.stopSearchTimeout != nil {
        return
    }
    now := t.clock.Now().Add(delay)
    t.stopSearchTimeout = &now
}

// isStopSearchDelayExpired returns true if the timeout to stop the current topic
// search has elapsed.
func (t *TopicPool) isStopSearchDelayExpired() bool {
    if t.stopSearchTimeout == nil {
        return false
    }
    return t.stopSearchTimeout.Before(t.clock.Now())
}

// readyToStopSearch returns true if all conditions to stop the search are met.
func (t *TopicPool) readyToStopSearch() bool {
    return t.isStopSearchDelayExpired() || t.maxCachedPeersReached()
}

// updateSyncMode changes the sync mode depending on the current number
// of connected peers and limits.
func (t *TopicPool) updateSyncMode() {
    newMode := t.slowMode
    if len(t.connectedPeers) < t.limits.Min {
        newMode = t.fastMode
    }
    t.setSyncMode(newMode)
}

func (t *TopicPool) setSyncMode(mode time.Duration) {
    if mode == t.currentMode {
        return
    }

    t.period <- mode
    t.currentMode = mode

    // if selected mode is fast mode and fast mode timeout was not set yet,
    // do it now
    if mode == t.fastMode && t.fastModeTimeoutCancel == nil {
        t.fastModeTimeoutCancel = t.limitFastMode(t.fastModeTimeout)
    }
    // remove fast mode timeout as slow mode is selected now
    if mode == t.slowMode && t.fastModeTimeoutCancel != nil {
        close(t.fastModeTimeoutCancel)
        t.fastModeTimeoutCancel = nil
    }
}

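// limitFastMode starts a timer goroutine that switches the pool back to slow mode
// once timeout elapses, unless the returned cancel channel is closed first.
// A zero timeout disables the limit and nil is returned.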
func (t *TopicPool) limitFastMode(timeout time.Duration) chan struct{} {
    if timeout == 0 {
        return nil
    }

    cancel := make(chan struct{})

    t.poolWG.Add(1)
    go func() {
        defer common.LogOnPanic()
        defer t.poolWG.Done()

        select {
        case <-time.After(timeout):
            t.mu.Lock()
            t.setSyncMode(t.slowMode)
            t.mu.Unlock()
        case <-cancel:
            return
        }
    }()

    return cancel
}

// ConfirmAdded is called when a peer has been added by the p2p server.
//  1. Skip the peer if it is not in our peer table.
//  2. Add the peer to the cache.
//  3. Disconnect the peer if it connected after we reached the max limit of peers
//     (we can't know in advance whether a peer will connect, which is why we allow
//     the limit to overflow for a short duration).
//  4. Switch the search to slow mode if it is running.
func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID enode.ID) {
    t.mu.Lock()
    defer t.mu.Unlock()

    peerInfoItem, ok := t.pendingPeers[nodeID]
    inbound := !ok || !peerInfoItem.added

    log.Debug("peer added event", "peer", nodeID.String(), "inbound", inbound)

    if inbound {
        return
    }

    peer := peerInfoItem.peerInfo // get explicit reference

    // established connection means that the node
    // is a viable candidate for a connection and can be cached
    if err := t.cache.AddPeer(peer.node, t.topic); err != nil {
        log.Error("failed to persist a peer", "error", err)
    }

    t.movePeerFromPoolToConnected(nodeID)
    // if the upper limit is already reached, drop this peer
    if len(t.connectedPeers) > t.limits.Max {
        log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic)
        peer.dismissed = true
        t.removeServerPeer(server, peer)
        return
    }

    // make sure `dismissed` is reset
    peer.dismissed = false

    // A peer was added so check if we can switch to slow mode.
    if t.SearchRunning() {
        t.updateSyncMode()
    }
}

// ConfirmDropped is called when the server receives a drop event.
// 1. Skip the peer if it is not in our peer table.
// 2. If we requested the disconnect ourselves, re-add the peer to the pending pool.
// 3. If the connected count drops below the min limit, switch to fast mode.
// 4. Delete the peer from the cache and the peer table.
// Returns false if the peer is not in our table or if we requested its removal.
// Otherwise the peer is removed and true is returned.
func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID enode.ID) bool {
    t.mu.Lock()
    defer t.mu.Unlock()

    // either inbound or connected from another topic
    peer, exist := t.connectedPeers[nodeID]
    if !exist {
        return false
    }

    log.Debug("disconnect", "ID", nodeID, "dismissed", peer.dismissed)

    delete(t.connectedPeers, nodeID)
    // Peer was removed by us because exceeded the limit.
    // Add it back to the pool as it can be useful in the future.
    if peer.dismissed {
        t.addToPendingPeers(peer)
        // use queue for peers that weren't added to p2p server
        t.addToQueue(peer)
        return false
    }

    // If there was a network error, this event will be received
    // but the peer won't be removed from the static nodes set.
    // That's why we need to call `removeServerPeer` manually.
    t.removeServerPeer(server, peer)

    if err := t.cache.RemovePeer(nodeID, t.topic); err != nil {
        log.Error("failed to remove peer from cache", "error", err)
    }

    // As we removed a peer, update a sync strategy if needed.
    if t.SearchRunning() {
        t.updateSyncMode()
    }

    return true
}

// AddPeerFromTable checks if there is a valid peer in local table and adds it to a server.
func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node {
    // Take the full lock: popFromQueue mutates the discovered-peers queue and map,
    // so a read lock is not sufficient here.
    t.mu.Lock()
    defer t.mu.Unlock()

    // The most recently discovered peer is popped from the queue.
    // If it did not expire yet, it will be added to the server.
    // TODO(adam): investigate if it's worth to keep the peer in the queue
    // until the server confirms it is added and in the meanwhile only adjust its priority.
    peer := t.popFromQueue()
    if peer != nil && t.clock.Now().Before(peer.discoveredTime.Add(expirationPeriod)) {
        t.addServerPeer(server, peer)
        return peer.node
    }

    return nil
}

// StartSearch creates discv5 queries and runs a loop to consume found peers.
func (t *TopicPool) StartSearch(server *p2p.Server) error {
    if atomic.LoadInt32(&t.running) == 1 {
        return nil
    }
    if !t.discovery.Running() {
        return ErrDiscv5NotRunning
    }
    atomic.StoreInt32(&t.running, 1)

    t.mu.Lock()
    defer t.mu.Unlock()

    t.quit = make(chan struct{})
    t.stopSearchTimeout = nil

    // `period` is used to notify about the current sync mode.
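    // It is buffered so that setSyncMode below can publish a mode change before the
    // discovery goroutine starts draining it.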
    t.period = make(chan time.Duration, 2)
    // use fast sync mode at the beginning
    t.setSyncMode(t.fastMode)

    // peers management
    found := make(chan *discv5.Node, 5) // 5 is a reasonable buffer for concurrently found nodes
    lookup := make(chan bool, 10)       // sufficiently buffered; only prevents blocking on lookup notifications

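    // Seed the found channel with up to 5 peers from the cache so that previously
    // known peers are dialed before fresh discovery results arrive.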
    for _, peer := range t.cache.GetPeersRange(t.topic, 5) {
        log.Debug("adding a peer from cache", "peer", peer)
        found <- peer
    }

    t.discWG.Add(1)
    go func() {
        defer common.LogOnPanic()
        if err := t.discovery.Discover(string(t.topic), t.period, found, lookup); err != nil {
            log.Error("error searching foro", "topic", t.topic, "err", err)
        }
        t.discWG.Done()
    }()
    t.poolWG.Add(1)
    go func() {
        defer common.LogOnPanic()
        t.handleFoundPeers(server, found, lookup)
        t.poolWG.Done()
    }()

    return nil
}

func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) {
    selfID := discv5.PubkeyID(server.Self().Pubkey())
    for {
        select {
        case <-t.quit:
            return
        case <-lookup:
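            // lookup notifications are drained but otherwise ignored; the channel
            // only exists to keep the discovery goroutine from blocking.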
        case node := <-found:
            if node.ID == selfID {
                continue
            }
            if err := t.processFoundNode(server, node); err != nil {
                log.Error("failed to process found node", "node", node, "error", err)
            }
        }
    }
}

// processFoundNode is called when a node is discovered by a kademlia search query.
// Two important conditions:
//  1. Every time a node is processed we need to update its discoveredTime;
//     the peer will later be considered valid only if it was discovered < 60m ago.
//  2. If the peer is already connected or the max limit is reached, the peer is not
//     added to the p2p server.
func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) error {
    t.mu.Lock()
    defer t.mu.Unlock()

    pk, err := node.ID.Pubkey()
    if err != nil {
        return err
    }

    nodeID := enode.PubkeyToIDV4(pk)

    log.Debug("peer found", "ID", nodeID, "topic", t.topic)

    // peer is already connected so update only discoveredTime
    if peer, ok := t.connectedPeers[nodeID]; ok {
        peer.discoveredTime = t.clock.Now()
        return nil
    }

    if _, ok := t.pendingPeers[nodeID]; ok {
        t.updatePendingPeer(nodeID)
    } else {
        t.addToPendingPeers(&peerInfo{
            discoveredTime: t.clock.Now(),
            node:           node,
            publicKey:      pk,
        })
    }
    log.Debug(
        "adding peer to a server", "peer", node.ID.String(),
        "connected", len(t.connectedPeers), "max", t.maxCachedPeers)

    // This can happen when the monotonic clock is not precise enough and
    // multiple peers get added at the same clock time, resulting in all
    // of them having the same discoveredTime.
    // At which point a random peer will be removed, sometimes being the
    // peer we just added.
    // We could make sure that the latest added peer is not removed,
    // but this is simpler, and peers will be fresh enough as resolution
    // should be quite high (ms at least).
    // This has been reported on Windows builds only:
    // https://github.com/status-im/nim-status-client/issues/522
    if t.pendingPeers[nodeID] == nil {
        log.Debug("peer added has just been removed", "peer", nodeID)
        return nil
    }

    // the upper limit is not reached, so let's add this peer
    if len(t.connectedPeers) < t.maxCachedPeers {
        t.addServerPeer(server, t.pendingPeers[nodeID].peerInfo)
    } else {
        t.addToQueue(t.pendingPeers[nodeID].peerInfo)
    }

    return nil
}

func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
    info.added = true
    n := enode.NewV4(info.publicKey, info.node.IP, int(info.node.TCP), int(info.node.UDP))
    server.AddPeer(n)
}

func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) {
    info.added = false
    n := enode.NewV4(info.publicKey, info.node.IP, int(info.node.TCP), int(info.node.UDP))
    server.RemovePeer(n)
}

func (t *TopicPool) isStopped() bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.currentMode == 0
}

// StopSearch stops the topic search and shuts down the associated goroutines and channels.
func (t *TopicPool) StopSearch(server *p2p.Server) {
    if !atomic.CompareAndSwapInt32(&t.running, 1, 0) {
        return
    }
    if t.quit == nil {
        return
    }
    select {
    case <-t.quit:
        return
    default:
    }
    log.Debug("stoping search", "topic", t.topic)
    close(t.quit)
    t.mu.Lock()
    if t.fastModeTimeoutCancel != nil {
        close(t.fastModeTimeoutCancel)
        t.fastModeTimeoutCancel = nil
    }
    t.currentMode = 0
    t.mu.Unlock()
    // wait for poolWG to exit because it writes to period channel
    t.poolWG.Wait()
    close(t.period)
    t.discWG.Wait()
}

// Topic exposes the internal discovery topic.
func (t *TopicPool) Topic() discv5.Topic {
    return t.topic
}

// SetLimits sets the limits for the current TopicPool.
func (t *TopicPool) SetLimits(limits params.Limits) {
    t.mu.Lock()
    defer t.mu.Unlock()

    t.limits = limits
}