Fantom-foundation/go-lachesis
gossip/handler.go

package gossip

import (
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    notify "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/ethereum/go-ethereum/rlp"

    "github.com/Fantom-foundation/go-lachesis/eventcheck"
    "github.com/Fantom-foundation/go-lachesis/evmcore"
    "github.com/Fantom-foundation/go-lachesis/gossip/fetcher"
    "github.com/Fantom-foundation/go-lachesis/gossip/ordering"
    "github.com/Fantom-foundation/go-lachesis/gossip/packsdownloader"
    "github.com/Fantom-foundation/go-lachesis/hash"
    "github.com/Fantom-foundation/go-lachesis/inter"
    "github.com/Fantom-foundation/go-lachesis/inter/idx"
    "github.com/Fantom-foundation/go-lachesis/logger"
)

const (
    softResponseLimitSize = 2 * 1024 * 1024    // Target maximum size of returned events, or other data.
    softLimitItems        = 250                // Target maximum number of events or transactions per request/response
    hardLimitItems        = softLimitItems * 4 // Hard maximum number of events or transactions per request/response

    // txChanSize is the size of the channel listening for NewTxsNotify.
    // The number is referenced from the size of the tx pool.
    txChanSize = 4096

    // eventsBuffSize is the maximum number of events in the ordering buffer.
    eventsBuffSize = 1500
)

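// errResp wraps a protocol error code and a formatted description into a single error.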
func errResp(code errCode, format string, v ...interface{}) error {
    return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}

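// checkLenLimits rejects empty messages and messages that contain more than hardLimitItems items.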
func checkLenLimits(size int, v interface{}) error {
    if size <= 0 {
        return errResp(ErrEmptyMessage, "%v", v)
    }
    if size > hardLimitItems {
        return errResp(ErrMsgTooLarge, "%v", v)
    }
    return nil
}

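// dagNotifier provides subscriptions to consensus notifications (new epochs, new packs
// and self-emitted events) used by the broadcast loops.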
type dagNotifier interface {
    SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription
    SubscribeNewPack(ch chan<- idx.Pack) notify.Subscription
    SubscribeNewEmitted(ch chan<- *inter.Event) notify.Subscription
}

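// ProtocolManager implements the Fantom gossip protocol: it maintains the peer set,
// fetches and orders incoming DAG events, and broadcasts events, progress and
// transactions to the peers.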
type ProtocolManager struct {
    config *Config

    synced uint32 // Flag whether we're considered synchronised (enables transaction processing and event broadcasting)

    txpool   txPool
    maxPeers int

    peers *peerSet

    serverPool *serverPool

    txsCh  chan evmcore.NewTxsNotify
    txsSub notify.Subscription

    downloader *packsdownloader.PacksDownloader
    fetcher    *fetcher.Fetcher
    buffer     *ordering.EventBuffer

    store    *Store
    engine   Consensus
    engineMu *sync.RWMutex

    isMigration func() bool

    notifier         dagNotifier
    emittedEventsCh  chan *inter.Event
    emittedEventsSub notify.Subscription
    newPacksCh       chan idx.Pack
    newPacksSub      notify.Subscription
    newEpochsCh      chan idx.Epoch
    newEpochsSub     notify.Subscription

    // channels for fetcher, syncer, txsyncLoop
    newPeerCh   chan *peer
    txsyncCh    chan *txsync
    quitSync    chan struct{}
    noMorePeers chan struct{}

    // wait groups are used for graceful shutdown of the broadcast loops
    // and the peer handlers
    loopsWg sync.WaitGroup
    wg      sync.WaitGroup

    logger.Instance
}

// NewProtocolManager returns a new Fantom sub-protocol manager. The Fantom sub-protocol manages peers
// capable of communicating with the Fantom network.
func NewProtocolManager(
    config *Config,
    notifier dagNotifier,
    txpool txPool,
    engineMu *sync.RWMutex,
    checkers *eventcheck.Checkers,
    s *Store,
    engine Consensus,
    serverPool *serverPool,
    isMigration func() bool,
) (
    *ProtocolManager,
    error,
) {
    // Create the protocol manager with the base fields
    pm := &ProtocolManager{
        config:      config,
        notifier:    notifier,
        txpool:      txpool,
        store:       s,
        engine:      engine,
        peers:       newPeerSet(),
        serverPool:  serverPool,
        engineMu:    engineMu,
        newPeerCh:   make(chan *peer),
        noMorePeers: make(chan struct{}),
        txsyncCh:    make(chan *txsync),
        quitSync:    make(chan struct{}),
        isMigration: isMigration,

        Instance: logger.MakeInstance(),
    }

    pm.SetName("PM")

    pm.fetcher, pm.buffer = pm.makeFetcher(checkers)
    pm.downloader = packsdownloader.New(pm.fetcher, pm.onlyNotConnectedEvents, pm.removePeer)

    return pm, nil
}

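// makeFetcher creates the event fetcher and its ordering buffer. Incoming events are
// validated with the basic and epoch checkers before buffering, then with the parents
// and gas-power checkers once all their parents are connected; fully connected events
// are fed into the consensus engine.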
func (pm *ProtocolManager) makeFetcher(checkers *eventcheck.Checkers) (*fetcher.Fetcher, *ordering.EventBuffer) {
    // checkers
    firstCheck := func(e *inter.Event) error {
        if err := checkers.Basiccheck.Validate(e); err != nil {
            return err
        }
        if err := checkers.Epochcheck.Validate(e); err != nil {
            return err
        }
        return nil
    }
    bufferedCheck := func(e *inter.Event, parents []*inter.EventHeaderData) error {
        var selfParent *inter.EventHeaderData
        if e.SelfParent() != nil {
            selfParent = parents[0]
        }
        if err := checkers.Parentscheck.Validate(e, parents); err != nil {
            return err
        }
        if err := checkers.Gaspowercheck.Validate(e, selfParent); err != nil {
            return err
        }
        return nil
    }

    // DAG callbacks
    buffer := ordering.New(eventsBuffSize, ordering.Callback{

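        // Process is called when all the event's parents are connected. It feeds the
        // event into the consensus engine and, if the node is synced, broadcasts it.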
        Process: func(e *inter.Event) error {
            now := time.Now()
            pm.engineMu.Lock()
            defer pm.engineMu.Unlock()

            start := time.Now()
            err := pm.engine.ProcessEvent(e)
            if err != nil {
                return err
            }
            log.Info("New event", "id", e.Hash(), "parents", len(e.Parents), "by", e.Creator, "frame", inter.FmtFrame(e.Frame, e.IsRoot), "txs", e.Transactions.Len(), "t", time.Since(start))

            // If the event is indeed in our own graph, announce it
            if atomic.LoadUint32(&pm.synced) != 0 { // announce only if synced up
                passedSinceEvent := now.Sub(e.ClaimedTime.Time())
                pm.BroadcastEvent(e, passedSinceEvent)
            }

            return nil
        },

        Drop: func(e *inter.Event, peer string, err error) {
            if eventcheck.IsBan(err) {
                log.Warn("Incoming event rejected", "event", e.Hash().String(), "creator", e.Creator, "err", err)
                pm.removePeer(peer)
            }
        },

        Exists: func(id hash.Event) bool {
            return pm.store.HasEventHeader(id)
        },

        Get: func(id hash.Event) *inter.EventHeaderData {
            return pm.store.GetEventHeader(id.Epoch(), id)
        },

        Check: bufferedCheck,
    })

    newFetcher := fetcher.New(fetcher.Callback{
        PushEvent:      buffer.PushEvent,
        OnlyInterested: pm.onlyInterestedEvents,
        DropPeer:       pm.removePeer,
        FirstCheck:     firstCheck,
        HeavyCheck:     checkers.Heavycheck,
    })
    return newFetcher, buffer
}

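// onlyNotConnectedEvents filters out event IDs which are already stored locally.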
func (pm *ProtocolManager) onlyNotConnectedEvents(ids hash.Events) hash.Events {
    if len(ids) == 0 {
        return ids
    }

    notConnected := make(hash.Events, 0, len(ids))
    for _, id := range ids {
        if pm.store.HasEventHeader(id) {
            continue
        }
        notConnected.Add(id)
    }
    return notConnected
}

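// onlyInterestedEvents filters out event IDs which belong to another epoch or are
// already buffered or stored.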
func (pm *ProtocolManager) onlyInterestedEvents(ids hash.Events) hash.Events {
    if len(ids) == 0 {
        return ids
    }
    epoch := pm.engine.GetEpoch()

    interested := make(hash.Events, 0, len(ids))
    for _, id := range ids {
        if id.Epoch() != epoch {
            continue
        }
        if pm.buffer.IsBuffered(id) || pm.store.HasEventHeader(id) {
            continue
        }
        interested.Add(id)
    }
    return interested
}

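// makeProtocol constructs the p2p.Protocol descriptor for the given protocol version.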
func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
    length, ok := protocolLengths[version]
    if !ok {
        panic("makeProtocol for unknown version")
    }

    return p2p.Protocol{
        Name:    protocolName,
        Version: version,
        Length:  length,
        Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
            var entry *poolEntry
            peer := pm.newPeer(int(version), p, rw)
            if pm.serverPool != nil {
                entry = pm.serverPool.connect(peer, peer.Node())
            }
            peer.poolEntry = entry
            select {
            case pm.newPeerCh <- peer:
                pm.wg.Add(1)
                defer pm.wg.Done()
                err := pm.handle(peer)
                if entry != nil {
                    pm.serverPool.disconnect(entry)
                }
                return err
            case <-pm.quitSync:
                if entry != nil {
                    pm.serverPool.disconnect(entry)
                }
                return p2p.DiscQuitting
            }
        },
        NodeInfo: func() interface{} {
            return pm.NodeInfo()
        },
        PeerInfo: func(id enode.ID) interface{} {
            if p := pm.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                return p.Info()
            }
            return nil
        },
    }
}

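// removePeer unregisters the given peer from the downloader and the peer set,
// and disconnects it at the networking layer.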
func (pm *ProtocolManager) removePeer(id string) {
    // Short circuit if the peer was already removed
    peer := pm.peers.Peer(id)
    if peer == nil {
        return
    }
    log.Debug("Removing peer", "peer", id)

    // Unregister the peer from the downloader and peer set
    _ = pm.downloader.UnregisterPeer(id)
    if err := pm.peers.Unregister(id); err != nil {
        log.Error("Peer removal failed", "peer", id, "err", err)
    }
    // Hard disconnect at the networking layer
    peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

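// Start subscribes to tx pool and consensus notifications, launches the broadcast
// and sync loops, and starts the fetcher.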
func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    // broadcast transactions
    pm.txsCh = make(chan evmcore.NewTxsNotify, txChanSize)
    pm.txsSub = pm.txpool.SubscribeNewTxsNotify(pm.txsCh)

    pm.loopsWg.Add(1)
    go pm.txBroadcastLoop()

    if pm.notifier != nil {
        // broadcast self-emitted events
        pm.emittedEventsCh = make(chan *inter.Event, 4)
        pm.emittedEventsSub = pm.notifier.SubscribeNewEmitted(pm.emittedEventsCh)
        // broadcast packs
        pm.newPacksCh = make(chan idx.Pack, 4)
        pm.newPacksSub = pm.notifier.SubscribeNewPack(pm.newPacksCh)
        // epoch changes
        pm.newEpochsCh = make(chan idx.Epoch, 4)
        pm.newEpochsSub = pm.notifier.SubscribeNewEpoch(pm.newEpochsCh)

        pm.loopsWg.Add(3)
        go pm.emittedBroadcastLoop()
        go pm.progressBroadcastLoop()
        go pm.onNewEpochLoop()
    }

    // start sync handlers
    go pm.syncer()
    go pm.txsyncLoop()
    pm.fetcher.Start()
}

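// Stop terminates the downloader, the fetcher and the broadcast loops, disconnects
// all peers and waits for the peer handler goroutines to exit.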
func (pm *ProtocolManager) Stop() {
    log.Info("Stopping Fantom protocol")

    pm.downloader.Terminate()
    pm.fetcher.Stop()

    pm.txsSub.Unsubscribe() // quits txBroadcastLoop
    if pm.notifier != nil {
        pm.emittedEventsSub.Unsubscribe() // quits emittedBroadcastLoop
        pm.newPacksSub.Unsubscribe()      // quits progressBroadcastLoop
        pm.newEpochsSub.Unsubscribe()     // quits onNewEpochLoop
    }
    // Wait for the subscription loops to come down.
    pm.loopsWg.Wait()

    // Quit the sync loop.
    // After this send has completed, no new peers will be accepted.
    pm.noMorePeers <- struct{}{}
    close(pm.quitSync)

    // Disconnect existing sessions.
    // This also closes the gate for any new registrations on the peer set.
    // sessions which are already established but not added to pm.peers yet
    // will exit when they try to register.
    pm.peers.Close()

    // Wait for all peer handler goroutines to come down.
    pm.wg.Wait()

    log.Info("Fantom protocol stopped")
}

func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
    return newPeer(pv, p, rw)
}

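// myProgress returns the current sync progress of the local node.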
func (pm *ProtocolManager) myProgress() PeerProgress {
    blockI, block := pm.engine.LastBlock()
    epoch := pm.engine.GetEpoch()
    return PeerProgress{
        Epoch:        epoch,
        NumOfBlocks:  blockI,
        LastBlock:    block,
        LastPackInfo: pm.store.GetPackInfoOrDefault(epoch, pm.store.GetPacksNumOrDefault(epoch)-1),
    }
}

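// highestPeerProgress returns the highest known progress (by number of blocks)
// among the local node and its connected peers.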
func (pm *ProtocolManager) highestPeerProgress() PeerProgress {
    peers := pm.peers.List()
    max := pm.myProgress()
    for _, peer := range peers {
        if max.NumOfBlocks < peer.progress.NumOfBlocks {
            max = peer.progress
        }
    }
    return max
}

// handle is the callback invoked to manage the life cycle of a peer. When
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
    // Ignore maxPeers if this is a trusted peer
    if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
        return p2p.DiscTooManyPeers
    }
    p.Log().Debug("Peer connected", "name", p.Name())

    // Execute the handshake
    var (
        genesis    = pm.engine.GetGenesisHash()
        myProgress = pm.myProgress()
    )
    if err := p.Handshake(pm.config.Net.NetworkID, myProgress, genesis); err != nil {
        p.Log().Debug("Handshake failed", "err", err)
        return err
    }
    //if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
    //    rw.Init(p.version)
    //}
    // Register the peer locally
    if err := pm.peers.Register(p); err != nil {
        p.Log().Warn("Peer registration failed", "err", err)
        return err
    }
    defer pm.removePeer(p.id)

    // Propagate existing transactions. New transactions appearing
    // after this will be sent via broadcasts.
    pm.syncTransactions(p)

    // Handle incoming messages until the connection is torn down
    for {
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug("Message handling failed", "err", err)
            return err
        }
    }
}

// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer, and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > protocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize)
    }
    defer msg.Discard()

    myEpoch := pm.engine.GetEpoch()
    peerDwnlr := pm.downloader.Peer(p.id)

    // Handle the message depending on its contents
    switch {
    case msg.Code == EthStatusMsg:
        // Status messages should never arrive after the handshake
        return errResp(ErrExtraStatusMsg, "uncontrolled status message")

    case msg.Code == ProgressMsg:
        if pm.isMigration() {
            break
        }
        var progress PeerProgress
        if err := msg.Decode(&progress); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        p.SetProgress(progress)
        if progress.Epoch == myEpoch {
            atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done on any peer which has the same epoch
        }

        // notify downloader about new peer's epoch
        _ = pm.downloader.RegisterPeer(packsdownloader.Peer{
            ID:               p.id,
            Epoch:            p.progress.Epoch,
            RequestPack:      p.RequestPack,
            RequestPackInfos: p.RequestPackInfos,
        }, myEpoch)
        peerDwnlr = pm.downloader.Peer(p.id)

        if peerDwnlr != nil && progress.LastPackInfo.Index > 0 {
            _ = peerDwnlr.NotifyPackInfo(p.progress.Epoch, progress.LastPackInfo.Index, progress.LastPackInfo.Heads, time.Now())
        }

    case msg.Code == NewEventHashesMsg:
        if pm.isMigration() {
            break
        }
        if pm.fetcher.Overloaded() {
            break
        }
        // Fresh events arrived, make sure we have a valid and fresh graph to handle them
        if atomic.LoadUint32(&pm.synced) == 0 {
            break
        }
        var announces hash.Events
        if err := msg.Decode(&announces); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(announces), announces); err != nil {
            return err
        }
        if len(announces) > 0 && announces[0].Lamport() > pm.store.GetHighestLamport()+eventsBuffSize {
            break
        }
        // Mark the hashes as present at the remote node
        for _, id := range announces {
            p.MarkEvent(id)
        }
        // Schedule all the unknown hashes for retrieval
        _ = pm.fetcher.Notify(p.id, announces, time.Now(), p.RequestEvents)

    case msg.Code == EventsMsg:
        if pm.isMigration() {
            break
        }
        slept := time.Duration(0)
        for pm.fetcher.Overloaded() && slept < time.Second {
            time.Sleep(time.Millisecond * 5)
            slept += time.Millisecond * 5
        }
        if slept >= time.Second {
            break
        }
        var events []*inter.Event
        if err := msg.Decode(&events); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(events), events); err != nil {
            return err
        }
        if len(events) > 0 && events[0].Lamport > pm.store.GetHighestLamport()+eventsBuffSize {
            break
        }
        // Mark the hashes as present at the remote node
        for _, e := range events {
            p.MarkEvent(e.Hash())
        }
        _ = pm.fetcher.Enqueue(p.id, events, time.Now(), p.RequestEvents)

    case msg.Code == EvmTxMsg:
        if pm.isMigration() {
            break
        }
        // Transactions arrived, make sure we have a valid and fresh graph to handle them
        if atomic.LoadUint32(&pm.synced) == 0 {
            break
        }
        // Transactions can be processed, parse all of them and deliver to the pool
        var txs []*types.Transaction
        if err := msg.Decode(&txs); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        for i, tx := range txs {
            // Validate and mark the remote transaction
            if tx == nil {
                return errResp(ErrDecode, "transaction %d is nil", i)
            }
            p.MarkTransaction(tx.Hash())
        }
        pm.txpool.AddRemotes(txs)

    case msg.Code == GetEventsMsg:
        var requests hash.Events
        if err := msg.Decode(&requests); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(requests), requests); err != nil {
            return err
        }

        rawEvents := make([]rlp.RawValue, 0, len(requests))
        ids := make(hash.Events, 0, len(requests))
        size := 0
        for _, id := range requests {
            if raw := pm.store.GetEventRLP(id); raw != nil {
                rawEvents = append(rawEvents, raw)
                ids = append(ids, id)
                size += len(raw)
            } else {
                pm.Log.Debug("requested event not found", "hash", id)
            }
            if size >= softResponseLimitSize {
                break
            }
        }
        if len(rawEvents) != 0 {
            _ = p.SendEventsRLP(rawEvents, ids)
        }

    case msg.Code == GetPackInfosMsg:
        var request getPackInfosData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(request.Indexes), request); err != nil {
            return err
        }

        packsNum, ok := pm.store.GetPacksNum(request.Epoch)
        if !ok {
            // no packs in the requested epoch
            break
        }

        rawPackInfos := make([]rlp.RawValue, 0, len(request.Indexes))
        size := 0
        for _, index := range request.Indexes {
            if index >= packsNum {
                // return only pinned and existing packs
                continue
            }

            if raw := pm.store.GetPackInfoRLP(request.Epoch, index); raw != nil {
                rawPackInfos = append(rawPackInfos, raw)
                size += len(raw)
            }
            if size >= softResponseLimitSize {
                break
            }
        }
        if len(rawPackInfos) != 0 {
            _ = p.SendPackInfosRLP(&packInfosDataRLP{
                Epoch:           request.Epoch,
                TotalNumOfPacks: packsNum,
                RawInfos:        rawPackInfos,
            })
        }

    case msg.Code == GetPackMsg:
        var request getPackData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }

        if request.Epoch > myEpoch {
            // short circuit if future epoch
            break
        }

        ids := make(hash.Events, 0, softLimitItems)
        for i, id := range pm.store.GetPack(request.Epoch, request.Index) {
            ids = append(ids, id)
            if i >= softLimitItems {
                break
            }
        }
        if len(ids) != 0 {
            _ = p.SendPack(&packData{
                Epoch: request.Epoch,
                Index: request.Index,
                IDs:   ids,
            })
        }

    case msg.Code == PackInfosMsg:
        if peerDwnlr == nil {
            break
        }
        if pm.isMigration() {
            break
        }

        var infos packInfosData
        if err := msg.Decode(&infos); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(infos.Infos), infos); err != nil {
            return err
        }

        // notify about number of packs this peer has
        _ = peerDwnlr.NotifyPacksNum(infos.Epoch, infos.TotalNumOfPacks)

        for _, info := range infos.Infos {
            if len(info.Heads) == 0 {
                return errResp(ErrEmptyMessage, "%v", msg)
            }
            // Mark the hashes as present at the remote node
            for _, id := range info.Heads {
                p.MarkEvent(id)
            }
            // Notify downloader about new packInfo
            _ = peerDwnlr.NotifyPackInfo(infos.Epoch, info.Index, info.Heads, time.Now())
        }

    case msg.Code == PackMsg:
        if peerDwnlr == nil {
            break
        }
        if pm.isMigration() {
            break
        }

        var pack packData
        if err := msg.Decode(&pack); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        if err := checkLenLimits(len(pack.IDs), pack); err != nil {
            return err
        }
        if len(pack.IDs) == 0 {
            return errResp(ErrEmptyMessage, "%v", msg)
        }
        // Mark the hashes as present at the remote node
        for _, id := range pack.IDs {
            p.MarkEvent(id)
        }
        // Notify downloader about new pack
        _ = peerDwnlr.NotifyPack(pack.Epoch, pack.Index, pack.IDs, time.Now(), p.RequestEvents)

    default:
        return errResp(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}

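// decideBroadcastAggressiveness returns the number of peers which should receive the full event;
// the remaining peers are sent only the event's hash. The decision is based on the event's size,
// how much time has passed since its creation, and the configured latency/throughput importance ratio.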
func (pm *ProtocolManager) decideBroadcastAggressiveness(size int, passed time.Duration, peersNum int) int {
    percents := 100
    maxPercents := 1000000 * percents
    latencyVsThroughputTradeoff := maxPercents
    cfg := pm.config.Protocol
    if cfg.ThroughputImportance != 0 {
        latencyVsThroughputTradeoff = (cfg.LatencyImportance * percents) / cfg.ThroughputImportance
    }

    byteCost := time.Millisecond / 2
    broadcastCost := passed + time.Duration(size)*byteCost
    broadcastAllCostTarget := time.Duration(latencyVsThroughputTradeoff) * (700 * time.Millisecond) / time.Duration(percents)
    broadcastSqrtCostTarget := broadcastAllCostTarget * 20

    fullRecipients := 0
    if latencyVsThroughputTradeoff >= maxPercents {
        // edge case
        fullRecipients = peersNum
    } else if latencyVsThroughputTradeoff <= 0 {
        // edge case
        fullRecipients = 0
    } else if broadcastCost <= broadcastAllCostTarget {
        // if the event is small or was created recently, send the full event to everyone
        fullRecipients = peersNum
    } else if broadcastCost <= broadcastSqrtCostTarget || passed == 0 {
        // if the event is big but was created recently, send the full event to a subset of peers
        fullRecipients = int(math.Sqrt(float64(peersNum)))
        if fullRecipients < 4 {
            fullRecipients = 4
        }
    }
    if fullRecipients > peersNum {
        fullRecipients = peersNum
    }
    return fullRecipients
}

// BroadcastEvent will either propagate an event to a subset of its peers, or
// will only announce its availability (depending on what's requested).
func (pm *ProtocolManager) BroadcastEvent(event *inter.Event, passed time.Duration) int {
    if passed < 0 {
        passed = 0
    }
    id := event.Hash()
    peers := pm.peers.PeersWithoutEvent(id)
    if len(peers) == 0 {
        log.Trace("Event is already known to all peers", "hash", id)
        return 0
    }

    fullRecipients := pm.decideBroadcastAggressiveness(event.Size(), passed, len(peers))

    // Broadcast the full event to a subset of peers
    fullBroadcast := peers[:fullRecipients]
    for _, peer := range fullBroadcast {
        peer.AsyncSendEvents(inter.Events{event})
    }
    // Broadcast only the event hash to the rest of the peers
    hashBroadcast := peers[fullRecipients:]
    for _, peer := range hashBroadcast {
        peer.AsyncSendNewEventHashes(hash.Events{event.Hash()})
    }
    log.Trace("Broadcast event", "hash", id, "fullRecipients", len(fullBroadcast), "hashRecipients", len(hashBroadcast))
    return len(peers)
}

// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
    if len(txs) > softLimitItems {
        txs = txs[:softLimitItems]
    }

    var txset = make(map[*peer]types.Transactions)

    // Broadcast transactions to the peers that are not known to have them yet
    for _, tx := range txs {
        peers := pm.peers.PeersWithoutTx(tx.Hash())
        for _, peer := range peers {
            txset[peer] = append(txset[peer], tx)
        }
        log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
    }
    // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
    for peer, txs := range txset {
        peer.AsyncSendTransactions(txs)
    }
}

// Emitted events broadcast loop
func (pm *ProtocolManager) emittedBroadcastLoop() {
    defer pm.loopsWg.Done()
    for {
        select {
        case emitted := <-pm.emittedEventsCh:
            pm.BroadcastEvent(emitted, 0)
        // Err() channel will be closed when unsubscribing.
        case <-pm.emittedEventsSub.Err():
            return
        }
    }
}

// Progress broadcast loop
func (pm *ProtocolManager) progressBroadcastLoop() {
    defer pm.loopsWg.Done()
    // the loop stops automatically on unsubscribe
    prevProgress := pm.myProgress()
    for {
        select {
        case <-pm.newPacksCh:
            // broadcast the previous progress, not the most recent one,
            // so that peers can receive all of the pack's events before this node announces it
            for _, peer := range pm.peers.List() {
                err := peer.SendProgress(prevProgress)
                if err != nil {
                    log.Warn("Failed to send progress status", "peer", peer.id, "err", err)
                }
            }
            prevProgress = pm.myProgress()
        // Err() channel will be closed when unsubscribing.
        case <-pm.newPacksSub.Err():
            return
        }
    }
}

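// onNewEpochLoop handles epoch changes: it marks the node as synced if some peer is on
// the same epoch, clears the ordering buffer and notifies the packs downloader.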
func (pm *ProtocolManager) onNewEpochLoop() {
    defer pm.loopsWg.Done()
    for {
        select {
        case myEpoch := <-pm.newEpochsCh:
            peerEpoch := func(peer string) idx.Epoch {
                p := pm.peers.Peer(peer)
                if p == nil {
                    return 0
                }
                return p.progress.Epoch
            }
            if atomic.LoadUint32(&pm.synced) == 0 {
                synced := false
                for _, peer := range pm.peers.List() {
                    if peer.progress.Epoch == myEpoch {
                        synced = true
                    }
                }
                // Mark initial sync done on any peer which has the same epoch
                if synced {
                    atomic.StoreUint32(&pm.synced, 1)
                }
            }
            pm.buffer.Clear()
            pm.downloader.OnNewEpoch(myEpoch, peerEpoch)
        // Err() channel will be closed when unsubscribing.
        case <-pm.newEpochsSub.Err():
            return
        }
    }
}

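// txBroadcastLoop broadcasts new transactions from the tx pool to the peers.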
func (pm *ProtocolManager) txBroadcastLoop() {
    defer pm.loopsWg.Done()
    for {
        select {
        case notification := <-pm.txsCh:
            pm.BroadcastTxs(notification.Txs)

        // Err() channel will be closed when unsubscribing.
        case <-pm.txsSub.Err():
            return
        }
    }
}

// NodeInfo represents a short summary of the sub-protocol metadata
// known about the host peer.
type NodeInfo struct {
    Network     uint64      `json:"network"` // network ID
    Genesis     common.Hash `json:"genesis"` // SHA3 hash of the host's genesis object
    Epoch       idx.Epoch   `json:"epoch"`
    NumOfBlocks idx.Block   `json:"blocks"`
    //Config  *params.ChainConfig `json:"config"`  // Chain configuration for the fork rules
}

// NodeInfo retrieves some protocol metadata about the running host node.
func (pm *ProtocolManager) NodeInfo() *NodeInfo {
    numOfBlocks, _ := pm.engine.LastBlock()
    return &NodeInfo{
        Network:     pm.config.Net.NetworkID,
        Genesis:     pm.engine.GetGenesisHash(),
        Epoch:       pm.engine.GetEpoch(),
        NumOfBlocks: numOfBlocks,
    }
}