aergoio/aergo
syncer/blockfetcher.go

package syncer

import (
    "bytes"
    "container/list"
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/aergoio/aergo/v2/internal/enc/base58"
    "github.com/aergoio/aergo/v2/pkg/component"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
    "github.com/rs/zerolog"
)

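// BlockFetcher fetches block bodies for the hash sets delivered by the hash fetcher.
// It splits each hash set into fetch tasks, schedules the tasks on free peers, and
// passes block responses to the block processor.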
type BlockFetcher struct {
    compRequester component.IComponentRequester // used to communicate with other services

    ctx *types.SyncContext

    quitCh chan interface{}

    hfCh chan *HashSet

    curHashSet *HashSet

    runningQueue TaskQueue
    pendingQueue TaskQueue
    retryQueue   SortedTaskQueue

    responseCh chan interface{} // BlockResponse and AddBlockResponse messages
    peers      *PeerSet

    blockProcessor *BlockProcessor

    name string

    maxFetchSize   int
    maxFetchTasks  int
    maxPendingConn int

    debug bool

    stat BlockFetcherStat

    waitGroup *sync.WaitGroup
    isRunning bool

    cfg *SyncerConfig
}

type BlockFetcherStat struct {
    maxRspBlock  atomic.Value
    lastAddBlock atomic.Value
}

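// SyncPeer is a peer available for block fetching, together with its failure history.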
type SyncPeer struct {
    No      int
    ID      types.PeerID
    FailCnt int
    IsErr   bool
}

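// TaskQueue is a FIFO queue of fetch tasks backed by container/list.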
type TaskQueue struct {
    list.List
}

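// SortedTaskQueue keeps fetch tasks ordered by their starting block number.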
type SortedTaskQueue struct {
    TaskQueue
}

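// Push inserts the task so that the queue remains sorted by startNo in ascending order.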
func (squeue *SortedTaskQueue) Push(task *FetchTask) {
    var bigElem *list.Element

    for e := squeue.Front(); e != nil; e = e.Next() {
        // insert before the first task that starts at a higher block number
        curTask := e.Value.(*FetchTask)
        if curTask.startNo > task.startNo {
            bigElem = e
            break
        }
    }

    if bigElem != nil {
        squeue.InsertBefore(task, bigElem)
    } else {
        squeue.PushBack(task)
    }
}

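// FetchTask is a request to fetch a consecutive range of blocks, identified by their
// hashes, from a single peer.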
type FetchTask struct {
    count   int
    hashes  []message.BlockHash
    startNo types.BlockNo

    syncPeer *SyncPeer

    started time.Time
    retry   int
}

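// PeerSet tracks the peers usable for fetching, keeping free and bad peers in separate lists.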
type PeerSet struct {
    total int
    free  int
    bad   int

    freePeers *list.List
    badPeers  *list.List
}

var (
    schedTick            = time.Millisecond * 100
    DfltFetchTimeOut     = time.Second * 30
    DfltBlockFetchSize   = 100
    MaxPeerFailCount     = 3
    DfltBlockFetchTasks  = 5
    MaxBlockPendingTasks = 10
)

var (
    ErrAllPeerBad       = errors.New("BlockFetcher: no available peers")
    ErrQuitBlockFetcher = errors.New("BlockFetcher quit")
)

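// newBlockFetcher creates a BlockFetcher for the given sync context and configuration.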
func newBlockFetcher(ctx *types.SyncContext, compRequester component.IComponentRequester, cfg *SyncerConfig) *BlockFetcher {
    bf := &BlockFetcher{ctx: ctx, compRequester: compRequester, name: NameBlockFetcher, cfg: cfg}

    bf.quitCh = make(chan interface{})
    bf.hfCh = make(chan *HashSet)
    bf.responseCh = make(chan interface{}, cfg.maxBlockReqTasks*2) // buffered for safety; in normal situations only one slot is used

    bf.peers = newPeerSet()
    bf.maxFetchSize = cfg.maxBlockReqSize
    bf.maxFetchTasks = cfg.maxBlockReqTasks
    bf.maxPendingConn = cfg.maxPendingConn

    bf.blockProcessor = NewBlockProcessor(compRequester, bf, ctx.CommonAncestor, ctx.TargetNo)

    bf.blockProcessor.connQueue = make([]*ConnectTask, 0, 16)

    bf.runningQueue.Init()
    bf.pendingQueue.Init()
    bf.retryQueue.Init()

    return bf
}

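// Start runs the block fetcher loop in a new goroutine. In debug mode it runs a dummy
// loop that only drains the hash-set channel.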
func (bf *BlockFetcher) Start() {
    bf.waitGroup = &sync.WaitGroup{}
    bf.waitGroup.Add(1)

    schedTicker := time.NewTicker(schedTick)

    bf.isRunning = true

    if bf.cfg.debugContext != nil && bf.cfg.debugContext.debugHashFetcher {
        testRun := func() {
            logger.Debug().Msg("BlockFetcher dummy mode started")

            defer bf.waitGroup.Done()

            for {
                if bf.cfg.debugContext.BfWaitTime > 0 {
                    logger.Debug().Msg("BlockFetcher sleep")
                    time.Sleep(bf.cfg.debugContext.BfWaitTime)
                    logger.Debug().Msg("BlockFetcher wakeup")
                }
                select {
                case <-bf.hfCh:
                case <-bf.quitCh:
                    logger.Debug().Msg("BlockFetcher dummy mode exited")
                    return
                }
            }
        }

        go testRun()
        return
    }

    run := func() {
        defer RecoverSyncer(NameBlockFetcher, bf.GetSeq(), bf.compRequester, func() { bf.waitGroup.Done() })

        logger.Debug().Msg("start block fetcher")

        if err := bf.init(); err != nil {
            stopSyncer(bf.compRequester, bf.GetSeq(), bf.name, err)
            return
        }

        logger.Debug().Msg("block fetcher loop start")

        for {
            select {
            case <-schedTicker.C:
                if err := bf.checkTaskTimeout(); err != nil {
                    logger.Error().Err(err).Msg("failed checkTaskTimeout")
                    stopSyncer(bf.compRequester, bf.GetSeq(), bf.name, err)
                    return
                }

            case msg, ok := <-bf.responseCh:
                if !ok {
                    logger.Info().Msg("BlockFetcher responseCh is closed. Syncer may be stopping.")
                    return
                }

                err := bf.blockProcessor.run(msg)
                if err != nil {
                    logger.Error().Err(err).Msg("invalid block response message")
                    stopSyncer(bf.compRequester, bf.GetSeq(), bf.name, err)
                    return
                }

            case <-bf.quitCh:
                logger.Info().Msg("BlockFetcher quit#1")
                return
            }

            //TODO stop the scheduler when all tasks are done
            if err := bf.schedule(); err != nil {
                if err == ErrQuitBlockFetcher {
                    logger.Info().Msg("BlockFetcher exited while schedule")
                    return
                }

                logger.Error().Err(err).Msg("BlockFetcher schedule failed & finished")
                stopSyncer(bf.compRequester, bf.GetSeq(), bf.name, err)
                return
            }
        }
    }

    go run()
}

func (bf *BlockFetcher) init() error {
    setPeers := func() error {
        result, err := bf.compRequester.RequestToFutureResult(message.P2PSvc, &message.GetPeers{}, dfltTimeout, "BlockFetcher init")
        if err != nil {
            logger.Error().Err(err).Msg("failed to get peers information")
            return err
        }

        msg := result.(*message.GetPeersRsp)

        for _, peerElem := range msg.Peers {
            state := peerElem.State
            if state.Get() == types.RUNNING {
                bf.peers.addNew(types.PeerID(peerElem.Addr.PeerID))
            }
        }

        if bf.peers.freePeers.Len() != bf.peers.free {
            panic(fmt.Sprintf("free peer len mismatch %d,%d", bf.peers.freePeers.Len(), bf.peers.free))
        }

        return nil
    }

    logger.Debug().Msg("block fetcher init")

    if err := setPeers(); err != nil {
        logger.Error().Err(err).Msg("failed to set peers")
        return err
    }

    return nil
}

func (bf *BlockFetcher) GetSeq() uint64 {
    return bf.ctx.Seq
}

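// schedule assigns candidate tasks to free peers until it runs out of free peers or
// candidate tasks, or hits the running-task or pending-connect limits.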
func (bf *BlockFetcher) schedule() error {
    for bf.peers.free > 0 {
        //check max concurrent running task count
        curRunning := bf.runningQueue.Len()
        if curRunning >= bf.maxFetchTasks {
            //logger.Debug().Int("runnig", curRunning).Int("pending", bf.pendingQueue.Len()).Msg("max running")
            return nil
        }

        //check no task
        candTask, err := bf.searchCandidateTask()
        if err != nil {
            logger.Error().Err(err).Msg("failed to search candidate task")
            return err
        }
        if candTask == nil {
            return nil
        }

        //check max pending connect
        //    if the next task is a retry task, it must run; it may be the next block to connect
        curPendingConn := len(bf.blockProcessor.connQueue)
        if curPendingConn >= bf.maxPendingConn && candTask.retry <= 0 {
            return nil
        }

        freePeer, err := bf.popFreePeer()
        if err != nil {
            logger.Error().Err(err).Msg("failed to get free peer")
            return err
        }
        if freePeer == nil {
            panic("free peer can't be nil")
        }

        bf.popNextTask(candTask)
        if candTask == nil {
            panic("task can't be nil")
        }

        logger.Debug().Int("pendingConn", curPendingConn).Int("running", curRunning).Msg("schedule")
        bf.runTask(candTask, freePeer)
    }

    return nil
}

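// checkTaskTimeout moves timed-out tasks from the running queue to the retry queue and
// records the peer failure.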
func (bf *BlockFetcher) checkTaskTimeout() error {
    now := time.Now()
    var next *list.Element

    for e := bf.runningQueue.Front(); e != nil; e = next {
        task := e.Value.(*FetchTask)
        next = e.Next()

        if !task.isTimeOut(now, bf.cfg.fetchTimeOut) {
            continue
        }

        bf.runningQueue.Remove(e)

        if err := bf.processFailedTask(task, false); err != nil {
            return err
        }

        logger.Error().Uint64("StartNo", task.startNo).Str("start", base58.Encode(task.hashes[0])).Int("count", task.count).Int("runqueue", bf.runningQueue.Len()).Int("pendingqueue", bf.pendingQueue.Len()).
            Msg("timed-out task pushed to retry queue")

        //time.Sleep(10000*time.Second)
    }

    return nil
}

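// processFailedTask marks the task's peer as failed and pushes the task onto the retry
// queue. It returns ErrAllPeerBad when no usable peer remains.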
func (bf *BlockFetcher) processFailedTask(task *FetchTask, isErr bool) error {
    logBadPeer := func(peer *SyncPeer, peers *PeerSet, cfg *SyncerConfig) {
        if cfg != nil && cfg.debugContext != nil {
            cfg.debugContext.logBadPeers[peer.No] = true
        }
    }

    logger.Error().Int("peerno", task.syncPeer.No).Uint64("StartNo", task.startNo).Str("start", base58.Encode(task.hashes[0])).Msg("task failed, moving to retry queue")

    failPeer := task.syncPeer

    logBadPeer(failPeer, bf.peers, bf.cfg)

    bf.peers.processPeerFail(failPeer, isErr)

    task.retry++
    task.syncPeer = nil

    //TODO sort by time to avoid deadlock
    bf.retryQueue.Push(task)

    if bf.peers.isAllBad() {
        return ErrAllPeerBad
    }

    return nil
}

func (bf *BlockFetcher) popNextTask(task *FetchTask) {
    logger.Debug().Int("retry", task.retry).Uint64("StartNo", task.startNo).Str("start", base58.Encode(task.hashes[0])).Str("end", base58.Encode(task.hashes[task.count-1])).
        Int("tasks retry", bf.retryQueue.Len()).Int("tasks pending", bf.pendingQueue.Len()).Msg("next fetchtask")

    var poppedTask *FetchTask
    if task.retry > 0 {
        poppedTask = bf.retryQueue.Pop()
    } else {
        poppedTask = bf.pendingQueue.Pop()
    }

    if poppedTask != task {
        logger.Panic().Uint64("next task", task.startNo).Uint64("popped task", poppedTask.startNo).
            Int("retry", task.retry).Msg("peeked task is not popped task")
        panic("peeked task is not popped task")
    }
}

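// searchCandidateTask peeks the next task to run, preferring the retry queue. When both
// queues are empty it pulls a new hash set from the hash fetcher and converts it into
// pending tasks.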
func (bf *BlockFetcher) searchCandidateTask() (*FetchTask, error) {
    getNewHashSet := func() (*HashSet, error) {
        if bf.curHashSet == nil { //blocking
            logger.Info().Msg("BlockFetcher waiting first hashset")

            select {
            case hashSet := <-bf.hfCh:
                return hashSet, nil
            case <-bf.quitCh:
                logger.Debug().Msg("BlockFetcher quitCh#2")
                return nil, ErrQuitBlockFetcher
            }
        } else {
            select { //nonblocking
            case hashSet := <-bf.hfCh:
                return hashSet, nil
            case <-bf.quitCh:
                logger.Debug().Msg("BlockFetcher quitCh#3")
                return nil, ErrQuitBlockFetcher
            default:
                //logger.Debug().Msg("BlockFetcher has no input HashSet")
                return nil, nil
            }
        }
    }

    addNewFetchTasks := func(hashSet *HashSet) {
        start, end := 0, 0
        count := hashSet.Count

        logger.Debug().Uint64("startno", hashSet.StartNo).Str("start", base58.Encode(hashSet.Hashes[0])).Int("count", hashSet.Count).Msg("add new fetchtasks from HashSet")

        for start < count {
            end = start + bf.maxFetchSize
            if end > count {
                end = count
            }

            task := &FetchTask{count: end - start, hashes: hashSet.Hashes[start:end], startNo: hashSet.StartNo + uint64(start), retry: 0}

            logger.Debug().Uint64("StartNo", task.startNo).Int("count", task.count).Msg("add fetchtask")

            bf.pendingQueue.PushBack(task)

            start = end
        }
        logger.Debug().Int("pendingqueue", bf.pendingQueue.Len()).Msg("addNewTasks end")
    }

    var newTask *FetchTask

    if bf.retryQueue.Len() > 0 {
        newTask = bf.retryQueue.Peek()
    } else {
        if bf.pendingQueue.Len() == 0 {
            logger.Debug().Msg("pendingqueue is empty")

            hashSet, err := getNewHashSet()
            if err != nil {
                logger.Debug().Err(err).Msg("failed to get new hashset")
                return nil, err
            }
            if hashSet == nil {
                logger.Debug().Msg("BlockFetcher no hashSet")
                return nil, nil
            }

            logger.Debug().Uint64("startno", hashSet.StartNo).Array("hashes", &LogBlockHashesMarshaller{hashSet.Hashes, 10}).Str("start", base58.Encode(hashSet.Hashes[0])).Int("count", hashSet.Count).Msg("BlockFetcher got hashset")

            bf.curHashSet = hashSet
            addNewFetchTasks(hashSet)
        }

        newTask = bf.pendingQueue.Peek()
    }

    //logger.Debug().Int("retry", newTask.retry).Uint64("StartNo", newTask.startNo).Str("start", enc.ToString(newTask.hashes[0])).Str("end", enc.ToString(newTask.hashes[newTask.count-1])).
    //    Int("tasks retry", bf.retryQueue.Len()).Int("tasks pending", bf.pendingQueue.Len()).Msg("candidate fetchtask")

    return newTask, nil
}

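// LogBlockHashesMarshaller logs block hashes as a zerolog array, truncating the output
// to at most limit entries.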
type LogBlockHashesMarshaller struct {
    arr   []message.BlockHash
    limit int
}

func (m LogBlockHashesMarshaller) MarshalZerologArray(a *zerolog.Array) {
    size := len(m.arr)
    if size > m.limit {
        for i := 0; i < m.limit-1; i++ {
            a.Str(base58.Encode(m.arr[i]))
        }
        a.Str(fmt.Sprintf("(and %d more)", size-m.limit+1))
    } else {
        for _, element := range m.arr {
            a.Str(base58.Encode(element))
        }
    }
}

func (bf *BlockFetcher) popFreePeer() (*SyncPeer, error) {
    setDebugAllPeerBad := func(err error, cfg *SyncerConfig) {
        if err == ErrAllPeerBad && cfg != nil && cfg.debugContext != nil {
            debugCtx := cfg.debugContext
            debugCtx.logAllPeersBad = true
        }
    }

    freePeer, err := bf.peers.popFree()
    if err != nil {
        setDebugAllPeerBad(err, bf.cfg)
        logger.Error().Err(err).Msg("pop free peer failed")
        return nil, err
    }

    if freePeer != nil {
        logger.Debug().Int("peerno", freePeer.No).Int("free", bf.peers.free).Int("total", bf.peers.total).Int("bad", bf.peers.bad).Msg("popped free peer")
    } else {
        logger.Debug().Int("free", bf.peers.free).Int("total", bf.peers.total).Int("bad", bf.peers.bad).Msg("no free peer available")
    }

    return freePeer, nil
}

func (bf *BlockFetcher) pushFreePeer(syncPeer *SyncPeer) {
    bf.peers.pushFree(syncPeer)

    logger.Debug().Int("peerno", syncPeer.No).Int("free", bf.peers.free).Msg("pushed free peer")
}

func (bf *BlockFetcher) runTask(task *FetchTask, peer *SyncPeer) {
    task.started = time.Now()
    task.syncPeer = peer
    bf.runningQueue.PushBack(task)

    logger.Debug().Int("peerno", task.syncPeer.No).Int("count", task.count).Uint64("StartNo", task.startNo).Str("start", base58.Encode(task.hashes[0])).Int("runqueue", bf.runningQueue.Len()).Msg("send block fetch request")

    bf.compRequester.TellTo(message.P2PSvc, &message.GetBlockChunks{Seq: bf.GetSeq(), GetBlockInfos: message.GetBlockInfos{ToWhom: peer.ID, Hashes: task.hashes}, TTL: DfltFetchTimeOut})
}

// TODO refactoring matchFunc
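// findFinished removes and returns the running task matching the response: when peerMatch
// is true (error responses) it matches by peer only, otherwise by peer and block hashes.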
func (bf *BlockFetcher) findFinished(msg *message.GetBlockChunksRsp, peerMatch bool) (*FetchTask, error) {
    count := len(msg.Blocks)

    var next *list.Element
    for e := bf.runningQueue.Front(); e != nil; e = next {
        task := e.Value.(*FetchTask)
        next = e.Next()

            //error response: match by peer only
        if peerMatch {
            if task.isPeerMatched(msg.ToWhom) {
                bf.runningQueue.Remove(e)

                logger.Debug().Stringer("peer", types.LogPeerShort(msg.ToWhom)).Err(msg.Err).Str("start", base58.Encode(task.hashes[0])).Int("count", task.count).Int("runqueue", bf.runningQueue.Len()).Msg("task finished with error")
                return task, nil
            }
        } else {
            //success response: match by peer and block hashes
            if task.isMatched(msg.ToWhom, msg.Blocks, count) {
                bf.runningQueue.Remove(e)

                logger.Debug().Uint64("StartNo", task.startNo).Str("start", base58.Encode(task.hashes[0])).Int("count", task.count).Int("runqueue", bf.runningQueue.Len()).
                    Msg("task finished")

                return task, nil
            }
        }
    }

    return nil, &ErrSyncMsg{msg: msg}
}

func (bf *BlockFetcher) handleBlockRsp(msg interface{}) error {
    if bf == nil {
        return nil
    }

    bf.responseCh <- msg
    return nil
}

func (bf *BlockFetcher) stop() {
    if bf == nil {
        return
    }

    //logger.Info().Bool("isrunning", bf.isRunning).Bool("isnil", bf.quitCh== nil).Msg("BlockFetcher stop")

    if bf.isRunning {
        logger.Info().Msg("BlockFetcher stop#1")

        close(bf.quitCh)
        close(bf.hfCh)

        bf.waitGroup.Wait()
        bf.isRunning = false
    }
    logger.Info().Msg("BlockFetcher stopped")
}

func (stat *BlockFetcherStat) setMaxChunkRsp(lastBlock *types.Block) {
    curMaxRspBlock := stat.getMaxChunkRsp()

    if curMaxRspBlock == nil || curMaxRspBlock.GetHeader().BlockNo < lastBlock.GetHeader().BlockNo {
        stat.maxRspBlock.Store(lastBlock)
        logger.Debug().Uint64("no", lastBlock.GetHeader().BlockNo).Msg("last block chunk response")
    }
}

func (stat *BlockFetcherStat) setLastAddBlock(block *types.Block) {
    stat.lastAddBlock.Store(block)
    logger.Debug().Uint64("no", block.GetHeader().BlockNo).Msg("last block add response")
}

func (stat *BlockFetcherStat) getMaxChunkRsp() *types.Block {
    aopv := stat.maxRspBlock.Load()
    if aopv != nil {
        return aopv.(*types.Block)
    }

    return nil
}

func (stat *BlockFetcherStat) getLastAddBlock() *types.Block {
    aopv := stat.lastAddBlock.Load()
    if aopv != nil {
        return aopv.(*types.Block)
    }

    return nil
}

func newPeerSet() *PeerSet {
    ps := &PeerSet{}

    ps.freePeers = list.New()
    ps.badPeers = list.New()

    return ps
}

func (ps *PeerSet) isAllBad() bool {
	return ps.total == ps.bad
}

func (ps *PeerSet) addNew(peerID types.PeerID) {
    peerno := ps.total
    ps.pushFree(&SyncPeer{No: peerno, ID: peerID})
    ps.total++

    logger.Info().Stringer("peer", types.LogPeerShort(peerID)).Int("peerno", peerno).Int("no", ps.total).Msg("new peer added")
}

func (ps *PeerSet) pushFree(freePeer *SyncPeer) {
    ps.freePeers.PushBack(freePeer)
    ps.free++

    logger.Info().Int("no", freePeer.No).Int("free", ps.free).Msg("free peer added")
}

func (ps *PeerSet) popFree() (*SyncPeer, error) {
    if ps.isAllBad() {
        logger.Error().Msg("all peers are bad")
        return nil, ErrAllPeerBad
    }

    elem := ps.freePeers.Front()
    if elem == nil {
        return nil, nil
    }

    ps.freePeers.Remove(elem)
    ps.free--

    if ps.freePeers.Len() != ps.free {
        panic(fmt.Sprintf("free peer len mismatch %d,%d", ps.freePeers.Len(), ps.free))
    }

    freePeer := elem.Value.(*SyncPeer)
    logger.Debug().Int("peerno", freePeer.No).Int("free", ps.free).Msg("free peer popped")
    return freePeer, nil
}

func (ps *PeerSet) processPeerFail(failPeer *SyncPeer, isErr bool) {
    //TODO handle connection closed
    failPeer.FailCnt++
    failPeer.IsErr = isErr

    logger.Error().Int("peerno", failPeer.No).Int("failcnt", failPeer.FailCnt).Int("maxfailcnt", MaxPeerFailCount).Bool("iserr", failPeer.IsErr).Msg("peer failed")

    if isErr || failPeer.FailCnt >= MaxPeerFailCount {
        ps.badPeers.PushBack(failPeer)
        ps.bad++

        if ps.badPeers.Len() != ps.bad {
            panic(fmt.Sprintf("bad peer len mismatch %d,%d", ps.badPeers.Len(), ps.bad))
        }

        logger.Error().Int("peerno", failPeer.No).Int("total", ps.total).Int("free", ps.free).Int("bad", ps.bad).Msg("peer move to bad")
    } else {
        ps.freePeers.PushBack(failPeer)
        ps.free++

        logger.Error().Int("peerno", failPeer.No).Int("total", ps.total).Int("free", ps.free).Int("bad", ps.bad).Msg("peer move to free")
    }
}

func (tq *TaskQueue) Pop() *FetchTask {
    elem := tq.Front()
    if elem == nil {
        return nil
    }

    tq.Remove(elem)
    return elem.Value.(*FetchTask)
}

func (tq *TaskQueue) Peek() *FetchTask {
    elem := tq.Front()
    if elem == nil {
        return nil
    }

    return elem.Value.(*FetchTask)
}

func (task *FetchTask) isTimeOut(now time.Time, timeout time.Duration) bool {
    if now.Sub(task.started) > timeout {
        logger.Info().Int("peerno", task.syncPeer.No).Uint64("startno", task.startNo).Str("start", base58.Encode(task.hashes[0])).Int("count", task.count).Msg("FetchTask peer timed out")
        return true
    }

    return false
}

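// isMatched reports whether the response came from the task's peer and contains exactly
// the requested block hashes.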
func (task *FetchTask) isMatched(peerID types.PeerID, blocks []*types.Block, count int) bool {
    startHash, endHash := blocks[0].GetHash(), blocks[len(blocks)-1].GetHash()

    if task.count != count ||
        task.syncPeer.ID != peerID ||
        !bytes.Equal(task.hashes[0], startHash) ||
        !bytes.Equal(task.hashes[len(task.hashes)-1], endHash) {
        return false
    }

    for i, block := range blocks {
        if !bytes.Equal(task.hashes[i], block.GetHash()) {
            logger.Info().Int("peerno", task.syncPeer.No).Str("hash", base58.Encode(task.hashes[0])).Int("idx", i).Msg("task hash mismatch")
            return false
        }
    }

    return true
}

func (task *FetchTask) isPeerMatched(peerID types.PeerID) bool {
	return task.syncPeer.ID == peerID
}