aergoio/aergo

View on GitHub
p2p/synctx.go

Summary

Maintainability
C
1 day
Test Coverage
C
73%
package p2p

import (
    "container/list"
    "fmt"
    "runtime/debug"
    "sort"
    "time"

    "github.com/aergoio/aergo-lib/log"
    "github.com/aergoio/aergo/v2/internal/enc/proto"
    "github.com/aergoio/aergo/v2/p2p/p2pcommon"
    "github.com/aergoio/aergo/v2/p2p/p2putil"
    "github.com/aergoio/aergo/v2/p2p/subproto"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
    lru "github.com/hashicorp/golang-lru"
)

// minimumTxQueryInterval is half a second (time.Second >> 1). It is the period of
// the ticker (fcTicker) that triggers refineFrontCache.
const minimumTxQueryInterval = time.Second >> 1
// txQueryTimeout is four seconds (time.Second << 2). refineFrontCache treats a
// query whose lastSent is not newer than now-txQueryTimeout as expired.
const txQueryTimeout = time.Second << 2

// unsent is the zero time. Assigning it to incomingTxNotice.lastSent makes the
// notice look expired, so it is picked up for querying on the next refine pass.
var unsent = time.Time{}

// syncTxManager handles operations related to transaction synchronization:
// it tracks tx notices received from peers, queries peers for unknown txs and
// answers get-tx requests coming from peers.
type syncTxManager struct {
    logger *log.Logger

    // sm and actor are handed to the get-txs receivers created by this manager.
    sm        p2pcommon.SyncManager
    actor     p2pcommon.ActorService
    // pm is used to look up a peer by ID before sending a query.
    pm        p2pcommon.PeerManager
    // msgHelper extracts txs from mempool responses.
    msgHelper message.Helper

    // txCache is an LRU cache of txs keyed by tx ID (filled by moveToMPCache).
    txCache *lru.Cache
    // received notice but not in my mempool
    frontCache      map[types.TxID]*incomingTxNotice
    // toNoticeIdQueue holds *queryQueue items: per-peer tx IDs waiting to be queried.
    toNoticeIdQueue *list.List

    // taskChannel carries tasks executed by runManager.
    taskChannel      chan smTask
    // taskQueryChannel carries tasks executed by runQueryLog.
    taskQueryChannel chan smTask
    // finishChannel is closed by Stop to end both goroutines.
    finishChannel    chan struct{}

    // fcTicker drives the periodic call of refineFrontCache.
    fcTicker *time.Ticker
}

// queryQueue is an item of syncTxManager.toNoticeIdQueue: a set of tx IDs that
// were announced by peerID and may later be requested from that peer.
type queryQueue struct {
    peerID types.PeerID
    txIDs  []types.TxID
}

// smTask is a unit of work sent over taskChannel or taskQueryChannel and run by
// the corresponding manager goroutine.
type smTask func()

// newTxSyncManager builds a syncTxManager with empty caches, buffered task
// channels and a ticker firing every minimumTxQueryInterval. It panics when the
// LRU tx cache cannot be created. The returned manager is not running yet; call
// Start to launch its goroutines.
func newTxSyncManager(sm p2pcommon.SyncManager, actor p2pcommon.ActorService, pm p2pcommon.PeerManager, logger *log.Logger) *syncTxManager {
    tm := &syncTxManager{
        logger: logger,
        sm:     sm,
        actor:  actor,
        pm:     pm,

        frontCache:      make(map[types.TxID]*incomingTxNotice),
        toNoticeIdQueue: list.New(),

        taskChannel:      make(chan smTask, 20),
        finishChannel:    make(chan struct{}, 1),
        taskQueryChannel: make(chan smTask, 10),

        msgHelper: message.GetHelper(),
        fcTicker:  time.NewTicker(minimumTxQueryInterval),
    }

    cache, err := lru.New(DefaultGlobalTxCacheSize)
    if err != nil {
        panic("Failed to create p2p tx cache " + err.Error())
    }
    tm.txCache = cache
    return tm
}

// Start launches the two worker goroutines of the manager: runManager, which
// executes tasks from taskChannel and the periodic front cache refinement, and
// runQueryLog, which executes tasks from taskQueryChannel.
func (tm *syncTxManager) Start() {
    go tm.runManager()
    go tm.runQueryLog()
}

// Stop signals both worker goroutines to exit by closing finishChannel.
// It must be called at most once, since closing an already closed channel panics.
func (tm *syncTxManager) Stop() {
    close(tm.finishChannel)
}

// runManager is the main loop of the tx sync manager. It runs until
// finishChannel is closed, executing tasks received on taskChannel and calling
// refineFrontCache on every fcTicker tick.
//
// A panic raised by a task or by refineFrontCache is recovered and logged, and
// the loop ends. The ticker is stopped on every exit path, including that one.
func (tm *syncTxManager) runManager() {
    // Deferred calls run last-in first-out: the panic is recovered and logged
    // first, then the ticker is released.
    defer tm.fcTicker.Stop()
    defer func() {
        if panicMsg := recover(); panicMsg != nil {
            tm.logger.Warn().Str("callStack", string(debug.Stack())).Str("errMsg", fmt.Sprintf("%v", panicMsg)).Msg("panic ocurred tx sync task")
        }
    }()
    tm.logger.Debug().Msg("syncTXManager started")

    for {
        select {
        case <-tm.fcTicker.C:
            // periodic retry of getTransaction queries
            tm.refineFrontCache()
        case task := <-tm.taskChannel:
            task()
        case <-tm.finishChannel:
            tm.logger.Debug().Msg("syncTXManager finished")
            return
        }
    }
}
// runQueryLog is the loop of the query goroutine. It executes the tasks received
// on taskQueryChannel until finishChannel is closed. A panic raised by a task is
// recovered and logged, and ends the loop.
func (tm *syncTxManager) runQueryLog() {
    defer func() {
        panicMsg := recover()
        if panicMsg == nil {
            return
        }
        tm.logger.Warn().Str("callStack", string(debug.Stack())).Str("errMsg", fmt.Sprintf("%v", panicMsg)).Msg("panic occurred handle get tx queries")
    }()
    tm.logger.Debug().Msg("syncTXManager starting query routine")

    for {
        select {
        case <-tm.finishChannel:
            tm.logger.Debug().Msg("syncTXManager finished query routine")
            return
        case queryTask := <-tm.taskQueryChannel:
            queryTask()
        }
    }
}

// registerTxNotice enqueues a task that moves every given tx from the front
// cache to the tx cache. The send blocks while taskChannel is full.
func (tm *syncTxManager) registerTxNotice(txs []*types.Tx) {
    cacheAll := func() {
        for i := range txs {
            tm.moveToMPCache(txs[i])
        }
    }
    tm.taskChannel <- cacheAll
}

// Pre-allocated slices that reduce memory allocation. These buffers are shared
// package-level state and must only be used inside the syncTxManager goroutine.
var (
    // for general usage
    // addBuf backs the "new comer" list in HandleNewTxNotice and the "done" list in refineFrontCache.
    addBuf         = make([]types.TxID, 0, DefaultPeerTxQueueSize)
    // dupBuf backs the "duplicated" list in HandleNewTxNotice and the "expired" list in refineFrontCache.
    dupBuf         = make([]types.TxID, 0, DefaultPeerTxQueueSize)
    // queuedBuf backs the "queued" list in HandleNewTxNotice and the "queue again" list in refineFrontCache.
    queuedBuf      = make([]types.TxID, 0, DefaultPeerTxQueueSize)
    // cleanupCounter counts refine passes that found an empty notice queue; every 20th triggers a cleanup.
    cleanupCounter = 0
    // idsBuf holds one buffer per individual peer
    idsBuf    = make([][]types.TxID, 0, 10)
    // bufOffset is the index of the next unused buffer in idsBuf; it is reset on each refine pass.
    bufOffset = 0
)

// getIDsBuf returns the idx-th per-peer buffer as an empty slice. Buffers are
// created on demand with capacity DefaultPeerTxQueueSize; the returned slice
// shares its backing array with the buffer kept in idsBuf.
func getIDsBuf(idx int) []types.TxID {
    for missing := idx + 1 - len(idsBuf); missing > 0; missing-- {
        idsBuf = append(idsBuf, make([]types.TxID, 0, DefaultPeerTxQueueSize))
    }
    buf := idsBuf[idx]
    return buf[:0]
}

// HandleNewTxNotice enqueues a task that classifies the tx IDs announced by peer:
//   - IDs already in txCache are counted as duplicated and ignored,
//   - IDs already in frontCache get the peer appended as another source and are
//     queued to be queried later,
//   - all other IDs are registered in frontCache and requested from peer at once.
//
// The data argument is not used. The send blocks while taskChannel is full.
func (tm *syncTxManager) HandleNewTxNotice(peer p2pcommon.RemotePeer, txIDs []types.TxID, data *types.NewTransactionsNotice) {
    task := func() {
        var (
            senderID = peer.ID()
            received = time.Now()
            fresh    = addBuf[:0]
            known    = dupBuf[:0]
            waiting  = queuedBuf[:0]
        )

        for _, id := range txIDs {
            // For a strict check the chain service would have to be queried; that is
            // skipped because it is too time consuming.
            if tm.txCache.Contains(id) {
                // mempool has the tx already
                known = append(known, id)
            } else if pending, found := tm.frontCache[id]; found {
                // another peer sent the notice already, so remember this peer as a later source
                appendPeerID(pending, senderID)
                waiting = append(waiting, id)
            } else {
                tm.frontCache[id] = &incomingTxNotice{hash: id, created: received, lastSent: received}
                fresh = append(fresh, id)
            }
        }

        if n := len(fresh); n > 0 {
            // fresh lives in a shared scratch buffer, so move its content into the
            // caller supplied slice before handing it over.
            if n <= len(txIDs) {
                txIDs = txIDs[:copy(txIDs, fresh)]
            }
            tm.sendGetTx(peer, txIDs)
        }
        if n := len(waiting); n > 0 {
            // the queue keeps the slice, so it needs its own copy
            owned := make([]types.TxID, n)
            copy(owned, waiting)
            tm.toNoticeIdQueue.PushBack(&queryQueue{peerID: senderID, txIDs: owned})
        }

        tm.logger.Trace().Stringer(p2putil.LogPeerID, types.LogPeerShort(senderID)).Int("newCnt", len(fresh)).Int("queCnt", len(waiting)).Int("dupCnt", len(known)).Array("newComer", types.NewLogTxIDsMarshaller(fresh, 10)).Array("duplicated", types.NewLogTxIDsMarshaller(known, 10)).Array("queued", types.NewLogTxIDsMarshaller(waiting, 10)).Int("frontCacheSize", len(tm.frontCache)).Msg("push txs, to query next time")
    }
    tm.taskChannel <- task
}

// sendGetTxs logs the requested IDs at debug level and starts a get-txs receiver
// that asks peer for them.
func (tm *syncTxManager) sendGetTxs(peer p2pcommon.RemotePeer, ids []types.TxID) {
    hashesToLog := types.NewLogTxIDsMarshaller(ids, 10)
    tm.logger.Debug().Int("tx_cnt", len(ids)).Array("hashes", hashesToLog).Msg("syncManager request back unknown tx hashes")
    NewGetTxsReceiver(tm.actor, peer, tm.sm, tm.logger, ids, p2pcommon.DefaultActorMsgTTL).StartGet()
}

// HandleGetTxReq schedules the handling of a get-transactions request from peer
// on the query goroutine. It never blocks: when taskQueryChannel is full it
// returns p2pcommon.SyncManagerBusyError, otherwise nil.
func (tm *syncTxManager) HandleGetTxReq(peer p2pcommon.RemotePeer, msgID p2pcommon.MsgID, data *types.GetTransactionsRequest) error {
    answer := func() {
        tm.handleTxReq(peer, msgID, data.Hashes)
    }

    select {
    case tm.taskQueryChannel <- answer:
        return nil
    default:
        return p2pcommon.SyncManagerBusyError
    }
}

// retryGetTx enqueues a task that converts hashes to tx IDs and pushes them back
// to the front cache for peerID, so that they are queried again. The log message
// says it is for txs that could not be fetched because the server was busy.
func (tm *syncTxManager) retryGetTx(peerID types.PeerID, hashes [][]byte) {
    tm.taskChannel <- func() {
        txIDs := make([]types.TxID, 0, len(hashes))
        for _, raw := range hashes {
            txIDs = append(txIDs, types.ToTxID(raw))
        }
        tm.logger.Debug().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Array("txIDs", types.NewLogTxIDsMarshaller(txIDs, 10)).Msg("push txs that are failed to get by server busy")
        tm.pushBackToFrontCache(peerID, txIDs)
    }
}

// pushBackToFrontCache is called when a query failed because the remote peer was
// busy. For every tx ID still present in the front cache it registers peerID as
// a source again and resets lastSent to unsent, which makes the tx eligible for
// querying on the next refine pass. When at least one ID was found, the whole
// txIDs list is put at the front of the notice queue.
func (tm *syncTxManager) pushBackToFrontCache(peerID types.PeerID, txIDs []types.TxID) {
    requeue := false
    for _, txID := range txIDs {
        // only the front cache is searched
        info, pending := tm.frontCache[txID]
        if !pending {
            continue
        }
        appendPeerID(info, peerID)
        info.lastSent = unsent
        requeue = true
    }
    if !requeue {
        return
    }
    tm.toNoticeIdQueue.PushFront(&queryQueue{peerID: peerID, txIDs: txIDs})
}

// burnFailedTxFrontCache handles tx IDs whose query could not be sent. A tx that
// still has other peers to ask gets lastSent reset to unsent; a tx without any
// remaining peer is dropped from the front cache. IDs that are not in the front
// cache are ignored. The peerID argument is not used.
func (tm *syncTxManager) burnFailedTxFrontCache(peerID types.PeerID, txIDs []types.TxID) {
    for _, txID := range txIDs {
        info, found := tm.frontCache[txID]
        switch {
        case !found:
            // only the front cache is searched
        case len(info.peers) > 0:
            // let the next refine pass send the query to another peer
            info.lastSent = unsent
        default:
            delete(tm.frontCache, txID)
        }
    }
}

// handleTxReq answers a get-transactions request: it looks up the requested txs
// (first in txCache, then in the mempool service) and sends them back to
// remotePeer as one or more GetTXsResponse messages, split so that each message
// stays within p2pcommon.MaxPayloadLength. The last message always has
// HasNext false; its status is NOT_FOUND when no requested tx was found at all.
//
// This function must be called only if a ticket can be retrieved.
func (tm *syncTxManager) handleTxReq(remotePeer p2pcommon.RemotePeer, mID p2pcommon.MsgID, reqHashes [][]byte) {
    // NOTE size estimation is tied to protobuf3 it should be changed when protobuf is changed.
    // find transactions from chainservice
    idx := 0
    status := types.ResultStatus_OK
    var hashes, mpReqs []types.TxHash
    var txInfos []*types.Tx
    var reqIDs = make([]types.TxID, len(reqHashes))
    var txs = make(map[types.TxID]*types.Tx)
    // NOTE(review): the base size is the constant named for the get-block response — confirm it is intended here.
    payloadSize := subproto.EmptyGetBlockResponseSize
    var txSize, fieldSize int

    // bucket is the maximum number of hashes sent in a single mempool request
    bucket := message.MaxReqestHashes
    var futures []interface{}

    var inCache, inMempool = 0, 0
    // 1. first check in cache
    // hits are stored in txs, misses are collected in mpReqs for the mempool lookup
    for i, h := range reqHashes {
        reqIDs[i] = types.ToTxID(h)
        tx, ok := tm.txCache.Get(reqIDs[i])
        if ok {
            txs[reqIDs[i]] = tx.(*types.Tx)
            inCache++
        } else {
            mpReqs = append(mpReqs, h)
        }
    }

    // 2. ask the mempool service for the cache misses, in batches of bucket hashes.
    // A request that fails to be issued is silently skipped (no future is recorded).
    for _, h := range mpReqs {
        hashes = append(hashes, h)
        if len(hashes) == bucket {
            if f, err := tm.actor.CallRequestDefaultTimeout(message.MemPoolSvc,
                &message.MemPoolExistEx{Hashes: hashes}); err == nil {
                futures = append(futures, f)
            }
            hashes = nil
        }
    }
    // send the last, partially filled batch
    if hashes != nil {
        if f, err := tm.actor.CallRequestDefaultTimeout(message.MemPoolSvc,
            &message.MemPoolExistEx{Hashes: hashes}); err == nil {
            futures = append(futures, f)
        }
    }
    // hashes is reused below to collect the hashes of the response being built
    hashes = nil
    idx = 0
    // 3. collect the txs returned by the mempool; nil entries are skipped
    for _, f := range futures {
        if tmp, err := tm.msgHelper.ExtractTxsFromResponseAndError(f, nil); err == nil {
            for _, tx := range tmp {
                if tx == nil {
                    continue
                }
                txs[types.ToTxID(tx.Hash)] = tx
                inMempool++
            }
        } else {
            tm.logger.Debug().Err(err).Msg("ErrExtract tx in future")
        }
    }
    // 4. build the responses, walking the IDs in the order they were requested
    msgCnt := 0
    for _, tid := range reqIDs {
        tx, ok := txs[tid]
        if !ok {
            continue
        }
        hash := tx.GetHash()
        txSize = proto.Size(tx)

        // estimated encoded size of this tx plus its hash, including field descriptors
        fieldSize = txSize + p2putil.CalculateFieldDescSize(txSize)
        fieldSize += len(hash) + p2putil.CalculateFieldDescSize(len(hash))

        // adding this tx would exceed the payload limit: flush what was collected so far
        if uint32(payloadSize+fieldSize) > p2pcommon.MaxPayloadLength {
            // send partial list
            resp := &types.GetTransactionsResponse{
                Status: status,
                Hashes: hashes,
                Txs:    txInfos, HasNext: true}
            tm.logger.Trace().Int(p2putil.LogTxCount, len(hashes)).
                Str(p2putil.LogOrgReqID, mID.String()).Msg("Sending partial response")

            remotePeer.SendMessage(remotePeer.MF().
                NewMsgResponseOrder(mID, p2pcommon.GetTXsResponse, resp))
            msgCnt++
            hashes, txInfos, payloadSize = nil, nil, subproto.EmptyGetBlockResponseSize
        }

        hashes = append(hashes, hash)
        txInfos = append(txInfos, tx)
        payloadSize += fieldSize
        idx++
    }
    // generate response message
    if 0 == idx {
        // if no tx is found, set status to not found
        status = types.ResultStatus_NOT_FOUND
    }
    // the final message is sent even when it carries no tx
    resp := &types.GetTransactionsResponse{
        Status: status,
        Hashes: hashes,
        Txs:    txInfos, HasNext: false}
    tm.logger.Trace().Int(p2putil.LogTxCount, len(hashes)).
        Str(p2putil.LogOrgReqID, mID.String()).Str(p2putil.LogRespStatus, status.String()).Msg("Sending last part response")
    remotePeer.SendMessage(remotePeer.MF().NewMsgResponseOrder(mID, p2pcommon.GetTXsResponse, resp))
    msgCnt++
    tm.logger.Debug().Int("respMsgCnt", msgCnt).
        Int("inCache", inCache).Int("inMempool", inMempool).
        Str(p2putil.LogOrgReqID, mID.String()).Str(p2putil.LogRespStatus, status.String()).
        Msg("handled getTx query")
}

// refineFrontCache is the periodic maintenance pass of the manager, called on
// every fcTicker tick from the manager goroutine.
//
// When the notice queue is empty it only counts the pass, and on every 20th such
// pass cleans up expired front cache entries. Otherwise it walks the notice
// queue and decides, for every queued tx ID of every peer:
//   - the tx is no longer in the front cache: the ID is dropped,
//   - a query is in flight and not timed out yet: the ID stays queued,
//   - the query timed out and no peer is left to ask: the tx is removed from the
//     front cache and the ID is dropped,
//   - otherwise the tx is requested from the queue entry's peer if that peer is
//     still a registered source of the tx; if not, the ID stays queued.
//
// At most DefaultPeerTxQueueSize IDs are requested from a single peer per pass.
// IDs collected for a peer that is no longer connected are handed to
// burnFailedTxFrontCache.
func (tm *syncTxManager) refineFrontCache() {
    now := time.Now()
    expireTime := now.Add(-txQueryTimeout)
    if tm.toNoticeIdQueue.Len() == 0 { // nothing to resend
        cleanupCounter++
        if cleanupCounter%20 == 0 {
            cleanupCounter = 0
            if len(tm.frontCache) > 0 {
                tm.cleanupFrontCache(expireTime)
            }
        }
        return
    }
    tm.logger.Trace().Int("noticeQueues", tm.toNoticeIdQueue.Len()).Int("frontCache", len(tm.frontCache)).Msg("refining front cache")

    // init: reuse the shared scratch buffers and restart the per-peer buffer allocation
    expired := dupBuf[:0]
    done := addBuf[:0]
    bufOffset = 0

    // assume peer is all available for now
    sendMap := make(map[types.PeerID]*[]types.TxID)
    // find txs that should query to peers
    // tx in front cache has tri-state: unsent, waitingResp, expiredWaiting

    // next is fetched before the element may be removed from the list
    var next *list.Element
    for e := tm.toNoticeIdQueue.Front(); e != nil; e = next {
        next = e.Next()
        queAgain := queuedBuf[:0]
        queuedIDs := e.Value.(*queryQueue)
        toSend := tm.allocIDSlice(queuedIDs.peerID, sendMap)
        if len(*toSend) >= DefaultPeerTxQueueSize {
            // list is full. skip this peer
            continue
        }

        idSize := len(queuedIDs.txIDs)
        toSendCnt := 0
        for j := 0; j < idSize; j++ {
            txID := queuedIDs.txIDs[j]
            info := tm.frontCache[txID]
            if info == nil { // tx is done to mempool or block. this txid is safe to delete
                done = append(done, txID)
                continue
            }
            if info.lastSent.After(expireTime) {
                // txs that wait for getTXResp and not expired will wait more time.
                queAgain = append(queAgain, txID)
                continue
            }
            if len(info.peers) == 0 {
                // remove old or unsent tx that has no peer to query.
                expired = append(expired, txID)
                delete(tm.frontCache, txID)
                // The tx is gone and has no source left, so it must not be queued
                // again: addToList could never succeed for it.
                continue
            }

            if tm.addToList(info, queuedIDs.peerID, toSend) {
                info.lastSent = now
                toSendCnt++
                if len(*toSend) >= DefaultPeerTxQueueSize {
                    // the query for this peer is full; keep the untouched rest for the next pass
                    queAgain = append(queAgain, queuedIDs.txIDs[j+1:]...)
                    break
                }
            } else {
                queAgain = append(queAgain, txID)
            }
        }

        // if not all txs is filled, the unsent will be pushed front to try send in next turn.
        if len(queAgain) > 0 {
            // reuse allocated slice; queAgain never holds more IDs than the entry had
            toQueue := queuedIDs.txIDs[:len(queAgain)]
            copy(toQueue, queAgain)
            tm.logger.Trace().Array("queAgain", types.NewLogTxIDsMarshaller(toQueue, 10)).Msg("syncManager enqueue txIDs again that waiting for response")

            e.Value = &queryQueue{peerID: queuedIDs.peerID, txIDs: toQueue}
        } else {
            tm.toNoticeIdQueue.Remove(e)
        }
    }

    if len(expired) > 0 {
        tm.logger.Debug().Array("done", types.NewLogTxIDsMarshaller(done, 10)).Array("expired", types.NewLogTxIDsMarshaller(expired, 10)).Msg("syncManager deletes txIDs that are not needed anymore")
    }

    for peerID, idsP := range sendMap {
        ids := *idsP
        if len(ids) == 0 {
            // no tx to send
            continue
        }
        if peer, ok := tm.pm.GetPeer(peerID); ok {
            tm.sendGetTx(peer, ids)
        } else {
            // peer probably disconnected.
            tm.logger.Debug().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Array("hashes", types.NewLogTxIDsMarshaller(ids, 10)).Msg("syncManager failed to send get tx, since peer is disconnected just before")
            toRetry := make([]types.TxID, len(ids))
            copy(toRetry, ids)
            tm.burnFailedTxFrontCache(peerID, toRetry)
        }
    }
}

// sendGetTx logs the requested IDs at trace level and starts a get-txs receiver
// that asks peer for them.
func (tm *syncTxManager) sendGetTx(peer p2pcommon.RemotePeer, ids []types.TxID) {
    hashesToLog := types.NewLogTxIDsMarshaller(ids, 10)
    tm.logger.Trace().Str(p2putil.LogPeerName, peer.Name()).Array("hashes", hashesToLog).Msg("syncManager try to get tx to remote peer")
    // the receiver creates and sends the request message
    NewGetTxsReceiver(tm.actor, peer, tm.sm, tm.logger, ids, p2pcommon.DefaultActorMsgTTL).StartGet()
}

// allocIDSlice returns the ID list registered for peerID in sendMap. When the
// peer has none yet, the next unused pre-allocated buffer is taken (advancing
// bufOffset), registered for the peer and returned.
func (tm *syncTxManager) allocIDSlice(peerID types.PeerID, sendMap map[types.PeerID]*[]types.TxID) *[]types.TxID {
    if registered, ok := sendMap[peerID]; ok {
        return registered
    }
    buf := getIDsBuf(bufOffset)
    bufOffset++
    sendMap[peerID] = &buf
    return &buf
}

// addToList checks whether target is one of the peers registered as a source of
// the tx. If so, the peer is removed from info.peers, the tx ID is appended to
// ids and true is returned. Otherwise nothing changes and false is returned.
func (tm *syncTxManager) addToList(info *incomingTxNotice, target types.PeerID, ids *[]types.TxID) bool {
    for i := range info.peers {
        if !types.IsSamePeerID(info.peers[i], target) {
            continue
        }
        // remove peerID from wait queue
        info.peers = append(info.peers[:i], info.peers[i+1:]...)
        *ids = append(*ids, info.hash)
        return true
    }
    return false
}

// assignTxToPeer picks the first registered peer of the tx whose list in sendMap
// holds fewer than DefaultPeerTxQueueSize IDs, appends the tx ID to that list,
// removes the peer from info.peers and returns true. It returns false when no
// such peer exists. A peer without a list gets the next pre-allocated buffer.
func (tm *syncTxManager) assignTxToPeer(info *incomingTxNotice, sendMap map[types.PeerID][]types.TxID) bool {
    for i, candidate := range info.peers {
        pending, exists := sendMap[candidate]
        if !exists {
            pending = getIDsBuf(bufOffset)
            bufOffset++
        }
        if len(pending) < DefaultPeerTxQueueSize {
            sendMap[candidate] = append(pending, info.hash)
            info.peers = append(info.peers[:i], info.peers[i+1:]...)
            return true
        }
        // this peer reached the max count of a single query; try the next one
    }
    return false
}

// moveToMPCache removes the tx from the front cache, since it is no longer
// pending, and stores it in the LRU tx cache under its tx ID.
func (tm *syncTxManager) moveToMPCache(tx *types.Tx) {
    txID := types.ToTxID(tx.Hash)
    delete(tm.frontCache, txID)
    tm.txCache.Add(txID, tx)
    tm.logger.Trace().Str("txID", txID.String()).Msg("syncManager caches tx")
}

// cleanupFrontCache removes front cache entries that have no peer left to query
// and whose last query is not newer than expireTime. At most 10000 entries are
// inspected per call, so a single call may clean only part of the cache.
func (tm *syncTxManager) cleanupFrontCache(expireTime time.Time) {
    const maxInspected = 10000

    inspected, removed := 0, 0
    for txID, info := range tm.frontCache {
        stale := len(info.peers) == 0 && !info.lastSent.After(expireTime)
        if stale {
            // old or unsent tx that has no peer to query
            delete(tm.frontCache, txID)
            removed++
        }
        inspected++
        if inspected >= maxInspected {
            break
        }
    }
    tm.logger.Debug().Int("testCnt", inspected).Int("expireCnt", removed).Msg("syncManager clean up some of expired items in frontCache")
}

// appendPeerID registers peerID as a further source of the tx. When the list
// reaches inTxPeerBufSize entries after the append, the oldest entry is dropped.
func appendPeerID(info *incomingTxNotice, peerID types.PeerID) {
    sources := append(info.peers, peerID)
    if len(sources) >= inTxPeerBufSize {
        sources = sources[1:]
    }
    info.peers = sources
}

// incomingTxNotice is a front cache entry: a tx that was announced by a peer but
// has not been stored in the tx cache yet.
type incomingTxNotice struct {
    // hash is the ID of the announced tx.
    hash     types.TxID
    // created is the time the first notice of the tx was handled.
    created  time.Time
    // lastSent is the time of the last query for the tx, or unsent to force a new query.
    lastSent time.Time
    // trial is not referenced by the code visible in this file.
    trial    int
    // peers are further peers that announced the tx and can still be queried.
    peers    []types.PeerID
}

// By is the type of a "less" function that defines the ordering of its
// incomingTxNotice arguments.
type By func(p1, p2 *incomingTxNotice) bool

// Sort sorts notices in place, using the receiver as the ordering function.
func (by By) Sort(notices []incomingTxNotice) {
    sort.Sort(&txSorter{notices: notices, by: by})
}

// txSorter joins a By function and a slice of incomingTxNotice to be sorted.
// It implements sort.Interface.
type txSorter struct {
    notices []incomingTxNotice
    by      func(p1, p2 *incomingTxNotice) bool // Closure used in the Less method.
}

// Len is part of sort.Interface. It returns the number of notices to sort.
func (s *txSorter) Len() int {
    return len(s.notices)
}

// Swap is part of sort.Interface. It exchanges the notices at indexes i and j.
func (s *txSorter) Swap(i, j int) {
    s.notices[i], s.notices[j] = s.notices[j], s.notices[i]
}

// Less is part of sort.Interface. It is implemented by calling the "by" closure
// in the sorter with pointers to the notices at indexes i and j.
func (s *txSorter) Less(i, j int) bool {
    return s.by(&s.notices[i], &s.notices[j])
}