aergoio/aergo

View on GitHub
syncer/blockprocessor.go

Summary

Maintainability
A
1 hr
Test Coverage
B
85%
package syncer

import (
    "bytes"
    "fmt"
    "sort"

    "github.com/aergoio/aergo/v2/chain"
    "github.com/aergoio/aergo/v2/internal/enc/base58"
    "github.com/aergoio/aergo/v2/pkg/component"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
)

type BlockProcessor struct {
    compRequester component.IComponentRequester //for communicate with other service

    blockFetcher *BlockFetcher

    curConnRequest *ConnectTask

    connQueue []*ConnectTask

    prevBlock *types.Block
    curBlock  *types.Block

    targetBlockNo types.BlockNo
    name          string
}

type ConnectTask struct {
    FromPeer types.PeerID
    Blocks   []*types.Block
    firstNo  types.BlockNo
    cur      int
}

func NewBlockProcessor(compRequester component.IComponentRequester, blockFetcher *BlockFetcher, ancestor *types.Block,
    targetNo types.BlockNo) *BlockProcessor {
    return &BlockProcessor{
        compRequester: compRequester,
        blockFetcher:  blockFetcher,
        prevBlock:     ancestor,
        targetBlockNo: targetNo,
        name:          NameBlockProcessor,
    }
}

func (bproc *BlockProcessor) run(msg interface{}) error {
    //TODO in test mode, if syncer receives invalid messages, syncer stop with panic()
    switch msg.(type) {
    case *message.GetBlockChunksRsp:
        if err := bproc.GetBlockChunkRsp(msg.(*message.GetBlockChunksRsp)); err != nil {
            return err
        }
    case *message.AddBlockRsp:
        if err := bproc.AddBlockResponse(msg.(*message.AddBlockRsp)); err != nil {
            return err
        }

        chain.TestDebugger.Check(chain.DEBUG_SYNCER_CRASH, 2, nil)
    default:
        return fmt.Errorf("invalid msg type:%T", msg)
    }

    return nil
}

func (bproc *BlockProcessor) isValidResponse(msg interface{}) error {
    validateBlockChunksRsp := func(msg *message.GetBlockChunksRsp) error {
        var prev []byte
        blocks := msg.Blocks

        if msg.Err != nil {
            logger.Error().Err(msg.Err).Msg("GetBlockChunksRsp has error")
            return msg.Err
        }

        if blocks == nil || len(blocks) == 0 {
            logger.Error().Err(msg.Err).Stringer("peer", types.LogPeerShort(msg.ToWhom)).Msg("GetBlockChunksRsp is empty")
            return &ErrSyncMsg{msg: msg, str: "blocks is empty"}
        }

        for _, block := range blocks {
            if prev != nil && !bytes.Equal(prev, block.GetHeader().GetPrevBlockHash()) {
                logger.Error().Stringer("peer", types.LogPeerShort(msg.ToWhom)).Msg("GetBlockChunksRsp hashes inconsistent")
                return &ErrSyncMsg{msg: msg, str: "blocks hash not matched"}
            }

            prev = block.GetHash()
        }
        return nil
    }

    validateAddBlockRsp := func(msg *message.AddBlockRsp) error {
        if msg.Err != nil {
            return msg.Err
        }

        if msg.BlockHash == nil {
            return &ErrSyncMsg{msg: msg, str: "invalid add block resonse"}
        }

        return nil
    }

    switch msg.(type) {
    case *message.GetBlockChunksRsp:
        if err := validateBlockChunksRsp(msg.(*message.GetBlockChunksRsp)); err != nil {
            return err
        }

    case *message.AddBlockRsp:
        if err := validateAddBlockRsp(msg.(*message.AddBlockRsp)); err != nil {
            return err
        }

    default:
        return fmt.Errorf("invalid msg type:%T", msg)
    }

    return nil
}

func (bproc *BlockProcessor) GetBlockChunkRsp(msg *message.GetBlockChunksRsp) error {
    if err := bproc.isValidResponse(msg); err != nil {
        return bproc.GetBlockChunkRspError(msg, err)
    }

    bf := bproc.blockFetcher

    logger.Debug().Stringer("peer", types.LogPeerShort(msg.ToWhom)).Uint64("startNo", msg.Blocks[0].GetHeader().BlockNo).Int("count", len(msg.Blocks)).Msg("received GetBlockChunkRsp")

    task, err := bf.findFinished(msg, false)
    if err != nil {
        //TODO invalid peer
        logger.Error().Stringer("peer", types.LogPeerShort(msg.ToWhom)).
            Int("count", len(msg.Blocks)).
            Str("from", base58.Encode(msg.Blocks[0].GetHash())).
            Str("to", base58.Encode(msg.Blocks[len(msg.Blocks)-1].GetHash())).
            Msg("dropped unknown block response message")
        return nil
    }

    bf.pushFreePeer(task.syncPeer)

    bf.stat.setMaxChunkRsp(msg.Blocks[len(msg.Blocks)-1])

    bproc.addConnectTask(msg)

    return nil
}

func (bproc *BlockProcessor) GetBlockChunkRspError(msg *message.GetBlockChunksRsp, err error) error {
    bf := bproc.blockFetcher

    logger.Error().Err(err).Stringer("peer", types.LogPeerShort(msg.ToWhom)).Msg("receive GetBlockChunksRsp with error message")

    task, err := bf.findFinished(msg, true)
    if err != nil {
        //TODO invalid peer
        logger.Error().Err(err).Stringer("peer", types.LogPeerShort(msg.ToWhom)).Msg("dropped unknown block error message")
        return nil
    }

    if err := bf.processFailedTask(task, false); err != nil {
        return err
    }

    return nil
}

func (bproc *BlockProcessor) AddBlockResponse(msg *message.AddBlockRsp) error {
    if err := bproc.isValidResponse(msg); err != nil {
        logger.Info().Err(err).Uint64("no", msg.BlockNo).Str("hash", base58.Encode(msg.BlockHash)).Msg("block connect failed")
        return err
    }

    curBlock := bproc.curBlock
    curNo := curBlock.GetHeader().BlockNo
    curHash := curBlock.GetHash()

    if curNo != msg.BlockNo || !bytes.Equal(curHash, msg.BlockHash) {
        logger.Error().Uint64("curNo", curNo).Uint64("msgNo", msg.BlockNo).
            Str("curHash", base58.Encode(curHash)).Str("msgHash", base58.Encode(msg.BlockHash)).
            Msg("invalid add block response")
        return &ErrSyncMsg{msg: msg, str: "drop unknown add response"}
    }

    logger.Info().Uint64("no", msg.BlockNo).Str("hash", base58.Encode(msg.BlockHash)).Msg("block connect succeed")

    bproc.blockFetcher.stat.setLastAddBlock(curBlock)

    if curBlock.BlockNo() == bproc.targetBlockNo {
        logger.Info().Msg("succeed to add last block, request stopping syncer")
        stopSyncer(bproc.compRequester, bproc.blockFetcher.GetSeq(), bproc.name, nil)
    }

    bproc.prevBlock = curBlock
    bproc.curBlock = nil

    block := bproc.getNextBlockToConnect()

    if block != nil {
        bproc.connectBlock(block)
    }

    return nil
}

func (bproc *BlockProcessor) addConnectTask(msg *message.GetBlockChunksRsp) {
    req := &ConnectTask{FromPeer: msg.ToWhom, Blocks: msg.Blocks, firstNo: msg.Blocks[0].GetHeader().BlockNo, cur: 0}

    logger.Debug().Uint64("firstno", req.firstNo).Int("count", len(req.Blocks)).Msg("add connect task to queue")

    bproc.pushToConnQueue(req)

    block := bproc.getNextBlockToConnect()

    if block != nil {
        bproc.connectBlock(block)
    }
}

func (bproc *BlockProcessor) getNextBlockToConnect() *types.Block {
    //already prev request is running, don't request any more
    if bproc.curBlock != nil {
        return nil
    }

    //request next block of current Request
    if bproc.curConnRequest != nil {
        req := bproc.curConnRequest
        req.cur++

        if req.cur >= len(req.Blocks) {
            logger.Debug().Msg("current connect task is finished")
            bproc.curConnRequest = nil
        }
    }

    //pop from pending request
    if bproc.curConnRequest == nil {
        nextReq := bproc.popFromConnQueue()
        if nextReq == nil {
            return nil
        }

        bproc.curConnRequest = nextReq
    }

    next := bproc.curConnRequest.cur
    nextBlock := bproc.curConnRequest.Blocks[next]

    logger.Debug().Uint64("no", nextBlock.GetHeader().BlockNo).Str("hash", nextBlock.ID()).
        Int("idx in req", next).Msg("next block to connect")

    bproc.curBlock = nextBlock

    return nextBlock
}

func (bproc *BlockProcessor) connectBlock(block *types.Block) {
    if block == nil {
        return
    }

    logger.Info().Uint64("no", block.GetHeader().BlockNo).
        Str("hash", base58.Encode(block.GetHash())).
        Msg("request connecting block to chainsvc")

    bproc.compRequester.RequestTo(message.ChainSvc, &message.AddBlock{PeerID: "", Block: block, Bstate: nil, IsSync: true})
}

func (bproc *BlockProcessor) pushToConnQueue(newReq *ConnectTask) {
    sortedList := bproc.connQueue

    index := sort.Search(len(sortedList), func(i int) bool { return sortedList[i].firstNo > newReq.firstNo })
    sortedList = append(sortedList, &ConnectTask{})
    copy(sortedList[index+1:], sortedList[index:])
    sortedList[index] = newReq

    bproc.connQueue = sortedList

    logger.Info().Int("len", len(bproc.connQueue)).Uint64("firstno", newReq.firstNo).
        Str("firstHash", base58.Encode(newReq.Blocks[0].GetHash())).
        Msg("add new task to connect queue")
}

func (bproc *BlockProcessor) popFromConnQueue() *ConnectTask {
    sortedList := bproc.connQueue
    if len(sortedList) == 0 {
        logger.Debug().Msg("connect queue is empty. so wait new connect task")
        return nil
    }

    //check if first task is next block
    firstReq := sortedList[0]
    if bproc.prevBlock != nil &&
        firstReq.firstNo != (bproc.prevBlock.BlockNo()+1) {
        logger.Debug().Uint64("first", firstReq.firstNo).Uint64("prev", bproc.prevBlock.BlockNo()).Msg("next block is not fetched yet")
        return nil
    }

    newReq := sortedList[0]
    sortedList = sortedList[1:]
    bproc.connQueue = sortedList

    logger.Info().Int("len", len(sortedList)).Uint64("firstno", newReq.firstNo).
        Str("firstHash", base58.Encode(newReq.Blocks[0].GetHash())).
        Msg("pop task from connect queue")

    return newReq
}