aergoio/aergo

View on GitHub
chain/reorg.go

Summary

Maintainability
B
4 hrs
Test Coverage
C
72%
package chain

import (
    "bytes"
    "errors"
    "fmt"
    "time"

    "github.com/aergoio/aergo/v2/consensus"
    "github.com/aergoio/aergo/v2/contract/system"
    "github.com/aergoio/aergo/v2/internal/enc/base58"
    "github.com/aergoio/aergo/v2/state"
    "github.com/aergoio/aergo/v2/state/statedb"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
)

const (
    // initBlkCount is the initial capacity reserved for the newBlocks and
    // oldBlocks slices of a reorganizer (see newReorganizer).
    initBlkCount = 20
)

var (
    // ErrInvalidReorgMarker is returned by initRecovery when the block numbers
    // of the marker's start/best blocks and the branch top block are not
    // strictly increasing (start < best < top).
    ErrInvalidReorgMarker = errors.New("reorg marker is invalid")
    // ErrMarkerNil is returned by initRecovery when it is given a nil marker.
    ErrMarkerNil          = errors.New("reorg marker is nil")
)

// reorganizer holds the inputs, the gathered block lists and the strategy
// hooks used for a single chain reorganization run (see ChainService.reorg).
type reorganizer struct {
    //input info
    cs         *ChainService
    // bestBlock is the best block of the chain being replaced: the chain DB's
    // current best block in gather, or the block named by marker.BrBestHash
    // in initRecovery.
    bestBlock  *types.Block
    brTopBlock *types.Block //branch top block

    //collected info from chain
    // brStartBlock is the branch root, i.e. the block shared by both chains.
    brStartBlock *types.Block
    // newBlocks and oldBlocks are filled from the top of each chain downward,
    // so index 0 is the highest block; rollforward walks newBlocks in reverse.
    newBlocks    []*types.Block //roll forward target blocks
    oldBlocks    []*types.Block //roll back target blocks

    // marker is the supplied recovery marker, or the one created by newMarker
    // after gathering in a normal run.
    marker *ReorgMarker

    // recover is true when the reorganizer was created with a non-nil marker.
    recover bool

    // Strategy hooks chosen by newReorganizer depending on recover.
    // gatherPostFn is left nil in recovery mode.
    gatherFn       func() error
    gatherPostFn   func()
    executeBlockFn func(bstate *state.BlockState, block *types.Block) error
}

// ErrReorgBlock is an error that refers to a specific block hit during
// reorganization. blockNo and blockHash are optional details that Error()
// appends to msg when they are set.
type ErrReorgBlock struct {
    msg string

    blockNo   uint64
    blockHash []byte
}

// Error implements the error interface.
//
// The message always starts with ec.msg. When blockHash is non-nil the block
// number and the base58-encoded hash are appended; otherwise, when blockNo is
// non-zero, only the block number is appended.
func (ec *ErrReorgBlock) Error() string {
    switch {
    case ec.blockHash != nil:
        return fmt.Sprintf("%s, block:%d,%s", ec.msg, ec.blockNo, base58.Encode(ec.blockHash))
    case ec.blockNo != 0:
        return fmt.Sprintf("%s, block:%d", ec.msg, ec.blockNo)
    default:
        // msg is already a string; no formatting needed.
        return ec.msg
    }
}

var (
    // ErrInvalidBranchRoot is returned by gather when the branch root found
    // is at the current best block number.
    ErrInvalidBranchRoot  = errors.New("best block can't be branch root block")
    // ErrGatherChain is returned by gather when the branch root is found but
    // either the rollforward or the rollback list is empty.
    ErrGatherChain        = errors.New("new/old blocks must exist")
    // ErrNotExistBranchRoot is returned by gather when the walk reaches block
    // number 0 without meeting the main chain.
    ErrNotExistBranchRoot = errors.New("branch root block doesn't exist")
    // ErrInvalidSwapChain and ErrInvalidBlockHeader are not referenced in the
    // visible part of this file.
    ErrInvalidSwapChain   = errors.New("New chain is not longer than old chain")
    ErrInvalidBlockHeader = errors.New("invalid block header")

    // Messages used as ErrReorgBlock.msg by gather.
    errMsgNoBlock         = "block not found in the chain DB"
    errMsgInvalidOldBlock = "rollback target is not valid"
)

// needReorg reports whether block sits above the current best block of the
// chain DB, i.e. whether its block number is strictly greater than the best
// block number. A debug line is logged when it does.
func (cs *ChainService) needReorg(block *types.Block) bool {
    chainDB := cs.cdb
    candidateNo := block.BlockNo()
    bestNo := chainDB.getBestBlockNo()

    if bestNo >= candidateNo {
        return false
    }

    logger.Debug().
        Uint64("blockNo", candidateNo).
        Uint64("latestNo", bestNo).
        Str("prev", block.ID()).
        Msg("need reorganization")

    return true
}

// newReorganizer builds a reorganizer for the branch ending at topBlock.
//
// A non-nil marker selects recovery mode: the marker is bound to the chain DB,
// validated through initRecovery, and the recovery variants of the gather and
// execute hooks are installed. With a nil marker the normal hooks are used and
// a marker is created after gathering (gatherPostFn = newMarker).
//
// It returns the error from initRecovery, if any.
func newReorganizer(cs *ChainService, topBlock *types.Block, marker *ReorgMarker) (*reorganizer, error) {
    r := &reorganizer{
        cs:         cs,
        brTopBlock: topBlock,
        newBlocks:  make([]*types.Block, 0, initBlkCount),
        oldBlocks:  make([]*types.Block, 0, initBlkCount),
        recover:    marker != nil,
        marker:     marker,
    }

    switch {
    case r.recover:
        marker.setCDB(r.cs.cdb)

        if err := r.initRecovery(marker); err != nil {
            return nil, err
        }

        // gatherPostFn stays nil: the marker already exists in recovery mode.
        r.gatherFn = r.gatherReco
        r.executeBlockFn = cs.executeBlockReco
    default:
        r.gatherFn = r.gather
        r.gatherPostFn = r.newMarker
        r.executeBlockFn = cs.executeBlock
    }

    TestDebugger.Check(DEBUG_CHAIN_RANDOM_STOP, 0, nil)

    return r, nil
}

// reorg switches the chain to the branch whose top is topBlock.
//
// A non-nil marker runs the reorganization in recovery mode (see
// newReorganizer). The steps are, in order:
//  1. gather the rollback/rollforward block lists (and create the marker in a
//     normal run),
//  2. ask consensus whether the reorganization is allowed,
//  3. roll the state back to the branch root,
//  4. execute the new branch blocks,
//  5. swap the chain DB mappings to the new branch,
//  6. record statistics and reset the system parameters from the new state.
//
// Any step's error is returned as is. A swapChain failure other than
// *ErrDebug is logged at Fatal level. If the system account state cannot be
// loaded in the last step, the error is returned wrapped rather than handing
// an invalid state to system.InitSystemParams.
//
// TODO: gather delete request of played tx (1 msg)
func (cs *ChainService) reorg(topBlock *types.Block, marker *ReorgMarker) error {
    logger.Info().Uint64("blockNo", topBlock.GetHeader().GetBlockNo()).Str("hash", topBlock.ID()).
        Bool("recovery", (marker != nil)).Msg("reorg started")

    begT := time.Now()

    reorg, err := newReorganizer(cs, topBlock, marker)
    if err != nil {
        logger.Error().Err(err).Msg("new reorganizer failed")
        return err
    }

    if err = reorg.gatherFn(); err != nil {
        return err
    }

    if reorg.gatherPostFn != nil {
        reorg.gatherPostFn()
    }

    if !cs.NeedReorganization(reorg.brStartBlock.BlockNo()) {
        return consensus.ErrorConsensus{Msg: "reorganization rejected by consensus"}
    }

    if err = reorg.rollback(); err != nil {
        return err
    }

    //it's possible to occur error while executing branch block (forgery)
    if err = reorg.rollforward(); err != nil {
        return err
    }

    if err = reorg.swapChain(); err != nil {
        // Debug-injected errors are handed back without the fatal log.
        if dbgErr, ok := err.(*ErrDebug); ok {
            return dbgErr
        }
        logger.Fatal().Err(err).Msg("reorg failed while swapping chain, it can't recover")
        return err
    }

    cs.stat.updateEvent(ReorgStat, time.Since(begT), reorg.oldBlocks[0], reorg.newBlocks[0], reorg.brStartBlock)

    systemStateDB, err := statedb.GetSystemAccountState(cs.SDB().GetStateDB())
    if err != nil {
        // Previously this error was dropped and systemStateDB was used anyway.
        return fmt.Errorf("loading system account state after reorg: %w", err)
    }
    system.InitSystemParams(systemStateDB, system.RESET)
    logger.Info().Msg("reorg end")

    return nil
}

// initRecovery resolves the branch root and the former best block named by
// marker and stores them on the reorganizer.
//
// It returns ErrMarkerNil for a nil marker, the chain DB error if either block
// cannot be loaded, and ErrInvalidReorgMarker unless the block numbers satisfy
// root < best < top (top being reorg.brTopBlock).
func (reorg *reorganizer) initRecovery(marker *ReorgMarker) error {
    if marker == nil {
        return ErrMarkerNil
    }

    top := reorg.brTopBlock
    chainDB := reorg.cs.cdb

    logger.Info().Str("marker", marker.toString()).Msg("new reorganizer")

    root, err := chainDB.getBlock(marker.BrStartHash)
    if err != nil {
        return err
    }

    best, err := chainDB.getBlock(marker.BrBestHash)
    if err != nil {
        return err
    }

    rootNo := root.GetHeader().GetBlockNo()
    bestNo := best.GetHeader().GetBlockNo()
    topNo := top.GetHeader().GetBlockNo()

    // The three heights must be strictly increasing: root < best < top.
    if bestNo >= topNo || rootNo >= bestNo || rootNo >= topNo {
        return ErrInvalidReorgMarker
    }

    reorg.brStartBlock = root
    reorg.bestBlock = best

    return nil
}

// newMarker creates the reorg marker from the reorganizer's current state,
// unless one is already set.
func (reorg *reorganizer) newMarker() {
    if reorg.marker == nil {
        reorg.marker = NewReorgMarker(reorg)
    }
}

// swapChain switches the chain DB from the old chain to the new branch in one
// pass (best effort):
//   - chain height mapping
//   - tx mapping
//   - best block
//
// The reorg marker is written before anything is modified and deleted after
// every step has completed. An error from the marker write, from the tx or
// chain mapping swap, or from a TestDebugger checkpoint is returned
// immediately; in that case the marker delete at the end is not reached.
// NOTE(review): presumably the marker left behind is what lets a restart
// resume through the recovery path — confirm against ReorgMarker.
func (reorg *reorganizer) swapChain() error {
    logger.Info().Msg("swap chain to new branch")

    // Checkpoint 1: before the marker is written.
    if err := TestDebugger.Check(DEBUG_CHAIN_STOP, 1, nil); err != nil {
        return err
    }

    if err := reorg.marker.write(); err != nil {
        return err
    }

    // Checkpoint 2: marker written, nothing swapped yet.
    if err := TestDebugger.Check(DEBUG_CHAIN_STOP, 2, nil); err != nil {
        return err
    }

    // Drop the receipts of the rolled-back blocks (no error is reported).
    reorg.deleteOldReceipts()

    //TODO batch notification of rollforward blocks

    if err := reorg.swapTxMapping(); err != nil {
        return err
    }

    if err := reorg.swapChainMapping(); err != nil {
        return err
    }

    // Checkpoint 3: everything swapped, marker still present.
    if err := TestDebugger.Check(DEBUG_CHAIN_STOP, 3, nil); err != nil {
        return err
    }

    reorg.marker.delete()

    return nil
}

// swapChainMapping points the chain DB's chain meta at the new branch by
// delegating to cdb.swapChainMapping with the rollforward blocks.
//
// In recovery mode the swap is skipped (with a warning) when the chain DB's
// best block already equals the branch top block, i.e. the swap had finished
// before the interruption. The reorg marker is not deleted here; swapChain
// does that after this method returns.
func (reorg *reorganizer) swapChainMapping() error {
    chainDB := reorg.cs.cdb

    logger.Info().Msg("swap chain mapping for new branch")

    curBest, err := chainDB.GetBestBlock()
    if err != nil {
        return err
    }

    alreadySwapped := reorg.recover && bytes.Equal(curBest.GetHash(), reorg.brTopBlock.GetHash())
    if alreadySwapped {
        logger.Warn().Msg("swap of chain mapping has already finished")
        return nil
    }

    if swapErr := chainDB.swapChainMapping(reorg.newBlocks); swapErr != nil {
        return swapErr
    }

    return nil
}

// swapTxMapping moves the tx mapping from the old chain to the new branch.
//
// Steps:
//  1. index every tx of the rollback blocks by tx ID,
//  2. for each rollforward block, lowest first, drop from that index the txs
//     the block also contains and store the block's tx mapping in its own DB
//     transaction,
//  3. delete the mapping of the txs left in the index (present only on the
//     old chain),
//  4. send each of those txs to the mempool service, one message per tx.
//
// It returns the first error from addTxsOfBlock; that block's DB transaction
// is discarded, while blocks stored before it stay committed.
func (reorg *reorganizer) swapTxMapping() error {
    cs := reorg.cs
    cdb := cs.cdb

    // txs of the rolled-back blocks, keyed by tx ID
    staleTxs := make(map[types.TxID]*types.Tx)
    for _, rolledBack := range reorg.oldBlocks {
        for _, tx := range rolledBack.GetBody().GetTxs() {
            staleTxs[types.ToTxID(tx.GetHash())] = tx
        }
    }

    // number of txs found on both chains
    var overwrap int

    // insert new tx mapping, walking the new branch from lowest to highest
    for idx := len(reorg.newBlocks) - 1; idx >= 0; idx-- {
        blk := reorg.newBlocks[idx]
        blkTxs := blk.GetBody().GetTxs()

        for _, tx := range blkTxs {
            id := types.ToTxID(tx.GetHash())
            if _, onBoth := staleTxs[id]; onBoth {
                overwrap++
                delete(staleTxs, id)
            }
        }

        dbTx := cdb.store.NewTx()

        if err := cdb.addTxsOfBlock(&dbTx, blkTxs, blk.BlockHash()); err != nil {
            dbTx.Discard()
            return err
        }

        dbTx.Commit()
    }

    // delete old tx mapping
    bulk := cdb.store.NewBulk()
    defer bulk.DiscardLast()

    for _, tx := range staleTxs {
        bulk.Delete(tx.Hash)
    }

    bulk.Flush()

    //add rollbacked Tx to mempool (except played tx in roll forward)
    logger.Debug().Int("tx count", len(staleTxs)).Int("overwrapped count", overwrap).Msg("tx add to mempool")

    for _, tx := range staleTxs {
        cs.RequestTo(message.MemPoolSvc, &message.MemPoolPut{
            Tx: tx,
        })
    }

    return nil
}

// dumpOldBlocks writes one debug log line (hash and block number) for every
// rollback target block.
func (reorg *reorganizer) dumpOldBlocks() {
    for i := range reorg.oldBlocks {
        old := reorg.oldBlocks[i]
        blockNo := old.GetHeader().GetBlockNo()

        logger.Debug().Str("hash", old.ID()).Uint64("blockNo", blockNo).
            Msg("dump rollback block")
    }
}

// gather finds the branch root and collects the rollforward/rollback target
// blocks.
//
// Starting at the branch top block it walks parent links downward. Every
// branch block visited is appended to reorg.newBlocks. Once the walk is at or
// below the current best height, the main chain block of the same number is
// appended to reorg.oldBlocks, until both chains hold the same block hash at
// that height; that block becomes reorg.brStartBlock. Both lists therefore
// run from highest to lowest block and exclude the branch root.
//
// Errors:
//   - the error from cdb.GetBestBlock or cdb.getBlock, unchanged
//   - *ErrReorgBlock(errMsgNoBlock) if a main chain block cannot be read by number
//   - *ErrReorgBlock(errMsgInvalidOldBlock) if a parent's number is not child-1
//   - ErrInvalidBranchRoot if the root is at the current best height
//   - ErrGatherChain if the root is found but either list is empty
//   - ErrNotExistBranchRoot if block number 0 is reached without a match
func (reorg *reorganizer) gather() error {
    //find branch root block , gather rollforward Target block
    var err error
    cdb := reorg.cs.cdb

    bestBlock, err := cdb.GetBestBlock()
    if err != nil {
        return err
    }
    reorg.bestBlock = bestBlock

    // brBlock/brBlockNo are the cursor on the branch, starting at its top.
    brBlock := reorg.brTopBlock
    brBlockNo := brBlock.BlockNo()

    curBestNo := cdb.getBestBlockNo()

    for {
        // Heights above the current best have no main chain block to compare.
        if brBlockNo <= curBestNo {
            mainBlock, err := cdb.GetBlockByNo(brBlockNo)
            // One must be able to look up any main chain block by its block
            // no from the chain DB.
            if err != nil {
                return &ErrReorgBlock{errMsgNoBlock, brBlockNo, nil}
            }

            //found branch root
            if bytes.Equal(brBlock.BlockHash(), mainBlock.BlockHash()) {
                if curBestNo == brBlockNo {
                    return ErrInvalidBranchRoot
                }
                if len(reorg.newBlocks) == 0 || len(reorg.oldBlocks) == 0 {
                    return ErrGatherChain
                }
                reorg.brStartBlock = brBlock

                logger.Debug().Str("hash", brBlock.ID()).Uint64("blockNo", brBlockNo).
                    Msg("found branch root block")

                return nil
            }

            //gather rollback target
            logger.Debug().Str("hash", mainBlock.ID()).Uint64("blockNo", brBlockNo).
                Msg("gather rollback target")
            reorg.oldBlocks = append(reorg.oldBlocks, mainBlock)
        }

        // brBlockNo is unsigned, so this is reached only at block number 0:
        // there is no parent left to follow.
        if brBlockNo <= 0 {
            break
        }

        //gather rollforward target
        logger.Debug().Str("hash", brBlock.ID()).Uint64("blockNo", brBlockNo).
            Msg("gather rollforward target")
        reorg.newBlocks = append(reorg.newBlocks, brBlock)

        //get prev block from branch
        if brBlock, err = cdb.getBlock(brBlock.GetHeader().GetPrevBlockHash()); err != nil {
            return err
        }

        // The parent must be exactly one block below the child.
        prevBrBlockNo := brBlock.GetHeader().GetBlockNo()
        if brBlockNo-1 != prevBrBlockNo {
            return &ErrReorgBlock{errMsgInvalidOldBlock, prevBrBlockNo, brBlock.BlockHash()}
        }
        brBlockNo = brBlock.GetHeader().GetBlockNo()
    }

    return ErrNotExistBranchRoot
}

// gatherReco rebuilds the rollback and rollforward block lists for a recovery
// run, using the branch root and best block already stored on the reorganizer
// (set by initRecovery from the marker).
//
// Each list is collected by following parent links from its top block while
// the block number is above the branch root's, so both lists run from highest
// to lowest and exclude the root. If a parent cannot be loaded, the error is
// returned and the list being built keeps the blocks collected so far.
func (reorg *reorganizer) gatherReco() error {
    cdb := reorg.cs.cdb
    root := reorg.brStartBlock

    // collect walks down from `from` to just above the branch root.
    collect := func(from *types.Block, stage string) ([]*types.Block, error) {
        collected := make([]*types.Block, 0)

        cur := from
        for cur.GetHeader().GetBlockNo() > root.GetHeader().GetBlockNo() {
            collected = append(collected, cur)

            logger.Debug().Str("stage", stage).Str("hash", cur.ID()).Uint64("blockNo", cur.GetHeader().GetBlockNo()).
                Msg("gather target for reco")

            parent, err := cdb.getBlock(cur.GetHeader().GetPrevBlockHash())
            if err != nil {
                return collected, err
            }
            cur = parent
        }

        return collected, nil
    }

    var err error

    if reorg.oldBlocks, err = collect(reorg.bestBlock, "rollback"); err != nil {
        return err
    }

    if reorg.newBlocks, err = collect(reorg.brTopBlock, "rollforward"); err != nil {
        return err
    }

    return nil
}

// rollback resets the state DB root to the branch root block's state root
// (GetBlocksRootHash of its header) and then passes the branch root block to
// ChainService.Update.
//
// If SetRoot fails, the returned error names the branch root block and wraps
// the underlying cause so that callers can inspect it with errors.Is/As.
func (reorg *reorganizer) rollback() error {
    brStartBlock := reorg.brStartBlock
    brStartBlockNo := brStartBlock.GetHeader().GetBlockNo()

    logger.Info().Str("hash", brStartBlock.ID()).Uint64("no", brStartBlockNo).Msg("rollback chain to branch start block")

    if err := reorg.cs.sdb.SetRoot(brStartBlock.GetHeader().GetBlocksRootHash()); err != nil {
        // Keep the original cause instead of discarding it.
        return fmt.Errorf("failed to rollback sdb(branchRoot:no=%d,hash=%v): %w", brStartBlockNo,
            brStartBlock.ID(), err)
    }

    reorg.cs.Update(brStartBlock)

    return nil
}

// deleteOldReceipts removes the receipts of every rollback target block in a
// single chain DB transaction and commits it. No error is reported.
func (reorg *reorganizer) deleteOldReceipts() {
    chainDB := reorg.cs.cdb
    tx := chainDB.NewTx()

    for i := range reorg.oldBlocks {
        old := reorg.oldBlocks[i]
        chainDB.deleteReceipts(&tx, old.GetHash(), old.BlockNo())
    }

    tx.Commit()
}

// rollforward executes the new branch blocks on top of the branch root.
//
// reorg.newBlocks is ordered from highest to lowest, so it is walked in
// reverse to execute the lowest block first. Each block goes through
// reorg.executeBlockFn with a nil block state. The first failure is logged
// together with its cause and returned; the remaining blocks are not executed.
func (reorg *reorganizer) rollforward() error {
    logger.Info().Bool("recover", reorg.recover).Msg("rollforward chain started")

    for i := len(reorg.newBlocks) - 1; i >= 0; i-- {
        newBlock := reorg.newBlocks[i]
        newBlockNo := newBlock.GetHeader().GetBlockNo()

        if err := reorg.executeBlockFn(nil, newBlock); err != nil {
            logger.Error().Err(err).Bool("recover", reorg.recover).Str("hash", newBlock.ID()).Uint64("no", newBlockNo).
                Msg("failed to execute block in reorg")
            return err
        }
    }

    return nil
}