
View on GitHub


2 hrs
Test Coverage
package gossip

import (


const (
    // This is the target size for the packs of transactions sent by txsyncLoop.
    // A pack can get larger than this if a single transactions exceeds this size.
    txsyncPackSize = 100 * 1024

type txsync struct {
    p   *peer
    txs []*types.Transaction

// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
    var txs types.Transactions
    pending, _ := pm.txpool.Pending()
    for _, batch := range pending {
        txs = append(txs, batch...)
    if len(txs) == 0 {
    select {
    case pm.txsyncCh <- &txsync{p, txs}:
    case <-pm.quitSync:

// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
    var (
        pending = make(map[enode.ID]*txsync)
        sending = false               // whether a send is active
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send

    // send starts a sending a pack of transactions from the sync.
    send := func(s *txsync) {
        // Fill pack with transactions up to the target size.
        size := common.StorageSize(0)
        pack.p = s.p
        pack.txs = pack.txs[:0]
        for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
            pack.txs = append(pack.txs, s.txs[i])
            size += s.txs[i].Size()
        // Remove the transactions that will be sent.
        s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
        if len(s.txs) == 0 {
            delete(pending, s.p.ID())
        // Send the pack in the background.
        s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
        sending = true
        go func() { done <- pack.p.SendTransactions(pack.txs) }()

    // pick chooses the next pending sync.
    pick := func() *txsync {
        if len(pending) == 0 {
            return nil
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
        return nil

    for {
        select {
        case s := <-pm.txsyncCh:
            pending[s.p.ID()] = s
            if !sending {
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("Transaction send failed", "err", err)
                delete(pending, pack.p.ID())
            // Schedule the next send.
            if s := pick(); s != nil {
        case <-pm.quitSync:

// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and events as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of sync mechanisms
    for {
        select {
        case <-pm.newPeerCh:
        case <-pm.noMorePeers: