aergoio/aergo

View on GitHub
chain/chainhandle.go

Summary

Maintainability
F
6 days
Test Coverage
F
45%
/**
 *  @file
 *  @copyright defined in aergo/LICENSE.txt
 */

package chain

import (
    "bytes"
    "container/list"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "math/big"

    "github.com/aergoio/aergo/v2/consensus"
    "github.com/aergoio/aergo/v2/contract"
    "github.com/aergoio/aergo/v2/contract/name"
    "github.com/aergoio/aergo/v2/contract/system"
    "github.com/aergoio/aergo/v2/fee"
    "github.com/aergoio/aergo/v2/internal/enc/base58"
    "github.com/aergoio/aergo/v2/internal/enc/proto"
    "github.com/aergoio/aergo/v2/state"
    "github.com/aergoio/aergo/v2/state/statedb"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
)

var (
    ErrorNoAncestor      = errors.New("not found ancestor")
    ErrBlockOrphan       = errors.New("block is orphan, so not connected in chain")
    ErrBlockCachedErrLRU = errors.New("block is in errored blocks cache")
    ErrStateNoMarker     = errors.New("statedb marker of block is not exists")

    errBlockStale       = errors.New("produced block becomes stale")
    errBlockInvalidFork = errors.New("invalid fork occurred")
    errBlockTimestamp   = errors.New("invalid timestamp")

    InAddBlock      = make(chan struct{}, 1)
    SendBlockReward = sendRewardCoinbase
)

type BlockRewardFn = func(*state.BlockState, []byte) error

type ErrReorg struct {
    err error
}

func (ec *ErrReorg) Error() string {
    return fmt.Sprintf("reorg failed. maybe need reconfiguration. error: %s", ec.err.Error())
}

type ErrBlock struct {
    err   error
    block *types.BlockInfo
}

func (ec *ErrBlock) Error() string {
    return fmt.Sprintf("Error: %s. block(%s, %d)", ec.err.Error(), base58.Encode(ec.block.Hash), ec.block.No)
}

type ErrTx struct {
    err error
    tx  *types.Tx
}

func (ec *ErrTx) Error() string {
    return fmt.Sprintf("error executing tx:%s, tx=%s", ec.err.Error(), base58.Encode(ec.tx.GetHash()))
}

func (cs *ChainService) getBestBlockNo() types.BlockNo {
    return cs.cdb.getBestBlockNo()
}

// GetGenesisInfo returns the information on the genesis block.
func (cs *ChainService) GetGenesisInfo() *types.Genesis {
    return cs.cdb.GetGenesisInfo()
}

func (cs *ChainService) GetBestBlock() (*types.Block, error) {
    return cs.cdb.GetBestBlock()
}

func (cs *ChainService) getBlockByNo(blockNo types.BlockNo) (*types.Block, error) {
    return cs.cdb.GetBlockByNo(blockNo)
}

func (cs *ChainService) GetBlock(blockHash []byte) (*types.Block, error) {
    return cs.getBlock(blockHash)
}

func (cs *ChainService) getBlock(blockHash []byte) (*types.Block, error) {
    return cs.cdb.getBlock(blockHash)
}

func (cs *ChainService) GetHashByNo(blockNo types.BlockNo) ([]byte, error) {
    return cs.getHashByNo(blockNo)
}

func (cs *ChainService) getHashByNo(blockNo types.BlockNo) ([]byte, error) {
    return cs.cdb.getHashByNo(blockNo)
}

func (cs *ChainService) getTx(txHash []byte) (*types.Tx, *types.TxIdx, error) {
    tx, txidx, err := cs.cdb.getTx(txHash)
    if err != nil {
        return nil, nil, err
    }
    block, err := cs.cdb.getBlock(txidx.BlockHash)
    blockInMainChain, err := cs.cdb.GetBlockByNo(block.Header.BlockNo)
    if !bytes.Equal(block.BlockHash(), blockInMainChain.BlockHash()) {
        return tx, nil, errors.New("tx is not in the main chain")
    }
    return tx, txidx, err
}

func (cs *ChainService) getReceipt(txHash []byte) (*types.Receipt, error) {
    tx, i, err := cs.cdb.getTx(txHash)
    if err != nil {
        return nil, err
    }

    block, err := cs.cdb.getBlock(i.BlockHash)
    blockInMainChain, err := cs.cdb.GetBlockByNo(block.Header.BlockNo)
    if !bytes.Equal(block.BlockHash(), blockInMainChain.BlockHash()) {
        return nil, errors.New("cannot find a receipt")
    }

    r, err := cs.cdb.getReceipt(block.BlockHash(), block.GetHeader().BlockNo, i.Idx, cs.cfg.Hardfork)
    if err != nil {
        return r, err
    }
    r.ContractAddress = types.AddressOrigin(r.ContractAddress)
    r.From = tx.GetBody().GetAccount()
    r.To = tx.GetBody().GetRecipient()
    return r, nil
}

func (cs *ChainService) getReceipts(blockHash []byte) (*types.Receipts, error) {
    block, err := cs.cdb.getBlock(blockHash)
    if err != nil {
        return nil, &ErrNoBlock{blockHash}
    }

    blockInMainChain, err := cs.cdb.GetBlockByNo(block.Header.BlockNo)
    if !bytes.Equal(block.BlockHash(), blockInMainChain.BlockHash()) {
        return nil, errors.New("cannot find a receipt")
    }

    receipts, err := cs.cdb.getReceipts(block.BlockHash(), block.GetHeader().BlockNo, cs.cfg.Hardfork)
    if err != nil {
        return nil, err
    }

    for idx, r := range receipts.Get() {
        r.SetMemoryInfo(blockHash, block.Header.BlockNo, int32(idx))

        r.ContractAddress = types.AddressOrigin(r.ContractAddress)

        for _, tx := range block.GetBody().GetTxs() {
            if bytes.Equal(r.GetTxHash(), tx.GetHash()) {
                r.From = tx.GetBody().GetAccount()
                r.To = tx.GetBody().GetRecipient()
                break
            }
        }
    }

    return receipts, nil
}

func (cs *ChainService) getReceiptsByNo(blockNo types.BlockNo) (*types.Receipts, error) {
    blockInMainChain, err := cs.cdb.GetBlockByNo(blockNo)
    if err != nil {
        return nil, &ErrNoBlock{blockNo}
    }

    block, err := cs.cdb.getBlock(blockInMainChain.BlockHash())
    if !bytes.Equal(block.BlockHash(), blockInMainChain.BlockHash()) {
        return nil, errors.New("cannot find a receipt")
    }

    receipts, err := cs.cdb.getReceipts(block.BlockHash(), block.GetHeader().BlockNo, cs.cfg.Hardfork)
    if err != nil {
        return nil, err
    }

    for idx, r := range receipts.Get() {
        r.SetMemoryInfo(blockInMainChain.BlockHash(), blockNo, int32(idx))

        r.ContractAddress = types.AddressOrigin(r.ContractAddress)

        for _, tx := range block.GetBody().GetTxs() {
            if bytes.Equal(r.GetTxHash(), tx.GetHash()) {
                r.From = tx.GetBody().GetAccount()
                r.To = tx.GetBody().GetRecipient()
                break
            }
        }
    }

    return receipts, nil
}

func (cs *ChainService) getEvents(events *[]*types.Event, blkNo types.BlockNo, filter *types.FilterInfo,
    argFilter []types.ArgFilter) uint64 {
    blkHash, err := cs.cdb.getHashByNo(blkNo)
    if err != nil {
        return 0
    }
    receipts, err := cs.cdb.getReceipts(blkHash, blkNo, cs.cfg.Hardfork)
    if err != nil {
        return 0
    }
    if receipts.BloomFilter(filter) == false {
        return 0
    }
    var totalSize uint64
    for idx, r := range receipts.Get() {
        if r.BloomFilter(filter) == false {
            continue
        }
        for _, e := range r.Events {
            if e.Filter(filter, argFilter) {
                e.SetMemoryInfo(r, blkHash, blkNo, int32(idx))
                *events = append(*events, e)
                totalSize += uint64(proto.Size(e))
            }
        }
    }
    return totalSize
}

const MaxEventSize = 4 * 1024 * 1024

func (cs *ChainService) listEvents(filter *types.FilterInfo) ([]*types.Event, error) {
    from := filter.Blockfrom
    to := filter.Blockto

    if filter.RecentBlockCnt > 0 {
        to = cs.cdb.getBestBlockNo()
        if to <= uint64(filter.RecentBlockCnt) {
            from = 0
        } else {
            from = to - uint64(filter.RecentBlockCnt)
        }
    } else {
        if to == 0 {
            to = cs.cdb.getBestBlockNo()
        }
    }
    err := filter.ValidateCheck(to)
    if err != nil {
        return nil, err
    }
    argFilter, err := filter.GetExArgFilter()
    if err != nil {
        return nil, err
    }
    events := []*types.Event{}
    var totalSize uint64
    if filter.Desc {
        for i := to; i >= from && i != 0; i-- {
            totalSize += cs.getEvents(&events, types.BlockNo(i), filter, argFilter)
            if totalSize > MaxEventSize {
                return nil, errors.New(fmt.Sprintf("too large size of event (%v)", totalSize))
            }
        }
    } else {
        for i := from; i <= to; i++ {
            totalSize += cs.getEvents(&events, types.BlockNo(i), filter, argFilter)
            if totalSize > MaxEventSize {
                return nil, errors.New(fmt.Sprintf("too large size of event (%v)", totalSize))
            }
        }
    }
    return events, nil
}

type chainProcessor struct {
    *ChainService
    block       *types.Block // starting block
    lastBlock   *types.Block
    state       *state.BlockState
    mainChain   *list.List
    isByBP      bool
    isMainChain bool

    add   func(blk *types.Block) error
    apply func(blk *types.Block) error
    run   func() error
}

func newChainProcessor(block *types.Block, state *state.BlockState, cs *ChainService) (*chainProcessor, error) {
    var isMainChain bool
    var err error

    if isMainChain, err = cs.cdb.isMainChain(block); err != nil {
        return nil, err
    }

    cp := &chainProcessor{
        ChainService: cs,
        block:        block,
        state:        state,
        isByBP:       state != nil,
        isMainChain:  isMainChain,
    }

    if cp.isMainChain {
        cp.apply = cp.execute
    } else {
        cp.apply = cp.addBlock
    }

    if cp.isByBP {
        cp.run = func() error {
            blk := cp.block
            cp.notifyBlockByBP(blk)
            return cp.apply(blk)
        }
    } else {
        cp.run = func() error {
            blk := cp.block

            for blk != nil {
                if err = cp.apply(blk); err != nil {
                    return err
                }

                // Remove a block depending on blk from the orphan cache.
                if blk, err = cp.resolveOrphan(blk); err != nil {
                    return err
                }
            }
            return nil
        }
    }

    return cp, nil
}

func (cp *chainProcessor) addBlock(blk *types.Block) error {
    dbTx := cp.cdb.store.NewTx()
    defer dbTx.Discard()

    if err := cp.cdb.addBlock(dbTx, blk); err != nil {
        return err
    }

    dbTx.Commit()

    if logger.IsDebugEnabled() {
        logger.Debug().Bool("isMainChain", cp.isMainChain).
            Uint64("latest", cp.cdb.getBestBlockNo()).
            Uint64("blockNo", blk.BlockNo()).
            Str("hash", blk.ID()).
            Str("prev_hash", base58.Encode(blk.GetHeader().GetPrevBlockHash())).
            Msg("block added to the block indices")
    }
    cp.lastBlock = blk

    return nil
}

func (cp *chainProcessor) notifyBlockByBP(block *types.Block) {
    if cp.isByBP {
        cp.notifyBlock(block, true)
    }
}

func (cp *chainProcessor) notifyBlockByOther(block *types.Block) {
    if !cp.isByBP {
        logger.Debug().Msg("notify block from other bp")
        cp.notifyBlock(block, false)
    }
}

func checkDebugSleep(isBP bool) {
    if isBP {
        _ = TestDebugger.Check(DEBUG_CHAIN_BP_SLEEP, 0, nil)
    } else {
        _ = TestDebugger.Check(DEBUG_CHAIN_OTHER_SLEEP, 0, nil)
    }
}

func (cp *chainProcessor) executeBlock(block *types.Block) error {
    checkDebugSleep(cp.isByBP)

    err := cp.ChainService.executeBlock(cp.state, block)
    cp.state = nil
    return err
}

func (cp *chainProcessor) execute(block *types.Block) error {
    if !cp.isMainChain {
        return nil
    }

    var err error

    err = cp.executeBlock(block)
    if err != nil {
        logger.Error().Str("error", err.Error()).Str("hash", block.ID()).
            Msg("failed to execute block")
        return err
    }
    //SyncWithConsensus :ga
    //     After executing MemPoolDel in the chain service, MemPoolGet must be executed on the consensus.
    //     To do this, cdb.setLatest() must be executed after MemPoolDel.
    //    In this case, messages of mempool is synchronized in actor message queue.
    if _, err = cp.connectToChain(block); err != nil {
        return err
    }

    cp.notifyBlockByOther(block)

    return nil
}

func (cp *chainProcessor) connectToChain(block *types.Block) (types.BlockNo, error) {
    dbTx := cp.cdb.store.NewTx()
    defer dbTx.Discard()

    // skip to add hash/block if wal of block is already written
    oldLatest := cp.cdb.connectToChain(dbTx, block, cp.isByBP && cp.HasWAL())
    if err := cp.cdb.addTxsOfBlock(&dbTx, block.GetBody().GetTxs(), block.BlockHash()); err != nil {
        return 0, err
    }

    dbTx.Commit()

    return oldLatest, nil
}

func (cp *chainProcessor) reorganize() error {
    // - Reorganize if new bestblock then process Txs
    // - Add block if new bestblock then update context connect next orphan
    if !cp.isMainChain && cp.needReorg(cp.lastBlock) {
        err := cp.reorg(cp.lastBlock, nil)
        if e, ok := err.(consensus.ErrorConsensus); ok {
            logger.Info().Err(e).Msg("reorg stopped by consensus error")
            return nil
        }

        if err != nil {
            logger.Info().Err(err).Msg("reorg stopped by unexpected error")
            return &ErrReorg{err: err}
        }
    }

    return nil
}

func (cs *ChainService) addBlockInternal(newBlock *types.Block, usedBState *state.BlockState, peerID types.PeerID) (err error, cache bool) {
    if !cs.VerifyTimestamp(newBlock) {
        return &ErrBlock{
            err: errBlockTimestamp,
            block: &types.BlockInfo{
                Hash: newBlock.BlockHash(),
                No:   newBlock.BlockNo(),
            },
        }, false
    }

    var (
        bestBlock  *types.Block
        savedBlock *types.Block
    )

    if bestBlock, err = cs.cdb.GetBestBlock(); err != nil {
        return err, false
    }

    // The newly produced block becomes stale because the more block(s) are
    // connected to the blockchain so that the best block is changed. In this
    // case, newBlock is rejected because it is unlikely that newBlock belongs
    // to the main branch. Warning: the condition 'usedBState != nil' is used
    // to check whether newBlock is produced by the current node itself. Later,
    // more explicit condition may be needed instead of this.
    if usedBState != nil && newBlock.PrevID() != bestBlock.ID() {
        return &ErrBlock{
            err: errBlockStale,
            block: &types.BlockInfo{
                Hash: newBlock.BlockHash(),
                No:   newBlock.BlockNo(),
            },
        }, false
    }

    //Fork should never occur in raft.
    checkFork := func(block *types.Block) error {
        if cs.IsForkEnable() {
            return nil
        }
        if usedBState != nil {
            return nil
        }

        savedBlock, err = cs.getBlockByNo(newBlock.GetHeader().GetBlockNo())
        if err == nil {
            /* TODO change to error after testing */
            logger.Fatal().Str("newblock", newBlock.ID()).Str("savedblock", savedBlock.ID()).Msg("drop block making invalid fork")
            return &ErrBlock{
                err:   errBlockInvalidFork,
                block: &types.BlockInfo{Hash: newBlock.BlockHash(), No: newBlock.BlockNo()},
            }
        }

        return nil
    }

    if err := checkFork(newBlock); err != nil {
        return err, false
    }

    if !newBlock.ValidChildOf(bestBlock) {
        return fmt.Errorf("invalid chain id - best: %v, current: %v",
            bestBlock.GetHeader().GetChainID(), newBlock.GetHeader().GetChainID()), false
    }

    if err := cs.VerifySign(newBlock); err != nil {
        return err, true
    }

    // handle orphan
    if cs.isOrphan(newBlock) {
        if usedBState != nil {
            return fmt.Errorf("block received from BP can not be orphan"), false
        }
        err := cs.handleOrphan(newBlock, bestBlock, peerID)
        if err == nil {
            return nil, false
        }

        return err, false
    }

    // try to acquire lock
    select {
    case InAddBlock <- struct{}{}:
    }
    defer func() {
        <-InAddBlock
    }()

    cp, err := newChainProcessor(newBlock, usedBState, cs)
    if err != nil {
        return err, true
    }

    if err := cp.run(); err != nil {
        return err, true
    }

    // TODO: reorganization should be done before chain execution to avoid an
    // unnecessary chain execution & rollback.
    if err := cp.reorganize(); err != nil {
        return err, true
    }

    logger.Info().Uint64("best", cs.cdb.getBestBlockNo()).Str("hash", newBlock.ID()).Msg("block added successfully")

    return nil, true
}

func (cs *ChainService) addBlock(newBlock *types.Block, usedBState *state.BlockState, peerID types.PeerID) error {
    hashID := types.ToHashID(newBlock.BlockHash())

    if cs.errBlocks.Contains(hashID) {
        return ErrBlockCachedErrLRU
    }

    var err error

    if cs.IsConnectedBlock(newBlock) {
        logger.Warn().Str("hash", newBlock.ID()).Uint64("no", newBlock.BlockNo()).Msg("block is already connected")
        return nil
    }

    var needCache bool
    err, needCache = cs.addBlockInternal(newBlock, usedBState, peerID)
    if err != nil {
        if needCache {
            evicted := cs.errBlocks.Add(hashID, newBlock)
            logger.Error().Err(err).Bool("evicted", evicted).Uint64("no", newBlock.GetHeader().BlockNo).
                Str("hash", newBlock.ID()).Msg("add errored block to errBlocks lru")
        }
        // err must be returned regardless of the value of needCache.
        return err
    }

    return nil
}

func (cs *ChainService) CountTxsInChain() int {
    var txCount int

    blk, err := cs.GetBestBlock()
    if err != nil {
        return -1
    }

    var no uint64
    for {
        no = blk.GetHeader().GetBlockNo()
        if no == 0 {
            break
        }

        txCount += len(blk.GetBody().GetTxs())

        blk, err = cs.getBlock(blk.GetHeader().GetPrevBlockHash())
        if err != nil {
            txCount = -1
            break
        }
    }

    return txCount
}

type TxExecFn func(bState *state.BlockState, tx types.Transaction) error
type ValidatePostFn func() error
type ValidateSignWaitFn func() error

type blockExecutor struct {
    *state.BlockState
    sdb              *state.ChainStateDB
    execTx           TxExecFn
    txs              []*types.Tx
    validatePost     ValidatePostFn
    coinbaseAccount  []byte
    commitOnly       bool
    verifyOnly       bool
    validateSignWait ValidateSignWaitFn
    bi               *types.BlockHeaderInfo
}

func newBlockExecutor(cs *ChainService, bState *state.BlockState, block *types.Block, verifyOnly bool) (*blockExecutor, error) {
    var exec TxExecFn
    var validateSignWait ValidateSignWaitFn
    var bi *types.BlockHeaderInfo

    commitOnly := false

    // The DPoS block factory executes transactions during block generation. In
    // such a case it sends block with block state so that bState != nil. On the
    // contrary, the block propagated from the network is not half-executed.
    // Hence, we need a new block state and tx executor (execTx).
    if bState == nil {
        if err := cs.validator.ValidateBlock(block); err != nil {
            return nil, err
        }

        bState = state.NewBlockState(
            cs.sdb.OpenNewStateDB(cs.sdb.GetRoot()),
            state.SetPrevBlockHash(block.GetHeader().GetPrevBlockHash()),
        )
        bi = types.NewBlockHeaderInfo(block)
        // FIXME currently the verify only function is allowed long execution time,
        exec = NewTxExecutor(context.Background(), cs.ChainConsensus, cs.cdb, bi, contract.ChainService)

        validateSignWait = func() error {
            return cs.validator.WaitVerifyDone()
        }
    } else {
        logger.Debug().Uint64("block no", block.BlockNo()).Msg("received block from block factory")
        // In this case (bState != nil), the transactions has already been
        // executed by the block factory.
        commitOnly = true
    }
    bState.SetGasPrice(system.GetGasPrice())
    bState.Receipts().SetHardFork(cs.cfg.Hardfork, block.BlockNo())

    return &blockExecutor{
        BlockState:      bState,
        sdb:             cs.sdb,
        execTx:          exec,
        txs:             block.GetBody().GetTxs(),
        coinbaseAccount: block.GetHeader().GetCoinbaseAccount(),
        validatePost: func() error {
            return cs.validator.ValidatePost(bState.GetRoot(), bState.Receipts(), block)
        },
        commitOnly:       commitOnly,
        verifyOnly:       verifyOnly,
        validateSignWait: validateSignWait,
        bi:               bi,
    }, nil
}

// NewTxExecutor returns a new TxExecFn.
func NewTxExecutor(execCtx context.Context, ccc consensus.ChainConsensusCluster, cdb contract.ChainAccessor, bi *types.BlockHeaderInfo, executionMode int) TxExecFn {
    return func(bState *state.BlockState, tx types.Transaction) error {
        if bState == nil {
            logger.Error().Msg("bstate is nil in txExec")
            return ErrGatherChain
        }
        if bi.ForkVersion < 0 {
            logger.Error().Err(ErrInvalidBlockHeader).Msgf("ChainID.ForkVersion = %d", bi.ForkVersion)
            return ErrInvalidBlockHeader
        }
        blockSnap := bState.Snapshot()

        err := executeTx(execCtx, ccc, cdb, bState, tx, bi, executionMode)
        if err != nil {
            logger.Error().Err(err).Str("hash", base58.Encode(tx.GetHash())).Msg("tx failed")
            if err2 := bState.Rollback(blockSnap); err2 != nil {
                logger.Panic().Err(err).Msg("failed to rollback block state")
            }

            return err
        }
        return nil
    }
}

func (e *blockExecutor) execute() error {
    // Receipt must be committed unconditionally.
    if !e.commitOnly {
        defer contract.CloseDatabase()
        logger.Trace().Int("txCount", len(e.txs)).Msg("executing txs")
        for _, tx := range e.txs {
            // execute the transaction
            if err := e.execTx(e.BlockState, types.NewTransaction(tx)); err != nil {
                //FIXME maybe system error. restart or panic
                // all txs have executed successfully in BP node
                return err
            }
        }

        if e.validateSignWait != nil {
            if err := e.validateSignWait(); err != nil {
                return err
            }
        }

        //TODO check result of verifying txs
        if err := SendBlockReward(e.BlockState, e.coinbaseAccount); err != nil {
            return err
        }

        if err := contract.SaveRecoveryPoint(e.BlockState); err != nil {
            return err
        }

        if err := e.Update(); err != nil {
            return err
        }
    }

    if err := e.validatePost(); err != nil {
        // TODO write verbose tx result if debug log is enabled
        return err
    }

    // TODO: sync status of bstate and cdb what to do if cdb.commit fails after

    if !e.verifyOnly {
        if err := e.commit(); err != nil {
            return err
        }
    }

    logger.Debug().Msg("block executor finished")
    return nil
}

func (e *blockExecutor) commit() error {
    if err := e.BlockState.Commit(); err != nil {
        return err
    }

    //TODO: after implementing BlockRootHash, remove statedb.lastest
    if err := e.sdb.UpdateRoot(e.BlockState); err != nil {
        return err
    }

    return nil
}

// TODO: Refactoring: batch
func (cs *ChainService) executeBlock(bstate *state.BlockState, block *types.Block) error {
    // Caution: block must belong to the main chain.
    logger.Debug().Str("hash", block.ID()).Uint64("no", block.GetHeader().BlockNo).Msg("start to execute")

    var (
        bestBlock *types.Block
        err       error
    )

    if bestBlock, err = cs.cdb.GetBestBlock(); err != nil {
        return err
    }

    // Check consensus info validity
    if err = cs.IsBlockValid(block, bestBlock); err != nil {
        return err
    }
    bstate = bstate.SetPrevBlockHash(block.GetHeader().GetPrevBlockHash())
    // TODO refactoring: receive execute function as argument (executeBlock or executeBlockReco)
    ex, err := newBlockExecutor(cs, bstate, block, false)
    if err != nil {
        return err
    }

    // contract & state DB update is done during execution.
    if err := ex.execute(); err != nil {
        cs.Update(bestBlock)
        return err
    }

    if len(ex.BlockState.Receipts().Get()) != 0 {
        cs.cdb.writeReceipts(block.BlockHash(), block.BlockNo(), ex.BlockState.Receipts())
    }

    cs.notifyEvents(block, ex.BlockState)

    cs.Update(block)

    logger.Debug().Uint64("no", block.GetHeader().BlockNo).Msg("end to execute")

    return nil
}

// verifyBlock execute block and verify state root but doesn't save data to database.
// ChainVerifier use this function.
func (cs *ChainService) verifyBlock(block *types.Block) error {
    var (
        err error
        ex  *blockExecutor
    )

    // Caution: block must belong to the main chain.
    logger.Debug().Str("hash", block.ID()).Uint64("no", block.GetHeader().BlockNo).Msg("start to verify")

    ex, err = newBlockExecutor(cs, nil, block, true)
    if err != nil {
        return err
    }

    // contract & state DB update is done during execution.
    if err = ex.execute(); err != nil {
        return err
    }

    // set root of sdb to block root hash
    if err = cs.sdb.SetRoot(block.GetHeader().GetBlocksRootHash()); err != nil {
        return fmt.Errorf("failed to set root of sdb(no=%d,hash=%v)", block.BlockNo(), block.ID())
    }

    logger.Debug().Uint64("no", block.GetHeader().BlockNo).Msg("end verify")

    return nil
}

// TODO: Refactoring: batch
func (cs *ChainService) executeBlockReco(_ *state.BlockState, block *types.Block) error {
    // Caution: block must belong to the main chain.
    logger.Debug().Str("hash", block.ID()).Uint64("no", block.GetHeader().BlockNo).Msg("start to execute for reco")

    var (
        bestBlock *types.Block
        err       error
    )

    if bestBlock, err = cs.cdb.GetBestBlock(); err != nil {
        return err
    }

    // Check consensus info validity
    // TODO remove bestblock
    if err = cs.IsBlockValid(block, bestBlock); err != nil {
        return err
    }

    if !cs.sdb.GetStateDB().HasMarker(block.GetHeader().GetBlocksRootHash()) {
        logger.Error().Str("hash", block.ID()).Uint64("no", block.GetHeader().GetBlockNo()).Msg("state marker does not exist")
        return ErrStateNoMarker
    }

    // move stateroot
    if err := cs.sdb.SetRoot(block.GetHeader().GetBlocksRootHash()); err != nil {
        return fmt.Errorf("failed to set sdb(branchRoot:no=%d,hash=%v)", block.GetHeader().GetBlockNo(),
            block.ID())
    }

    cs.Update(block)

    logger.Debug().Uint64("no", block.GetHeader().BlockNo).Msg("end to execute for reco")

    return nil
}

func (cs *ChainService) notifyEvents(block *types.Block, bstate *state.BlockState) {
    blkNo := block.GetHeader().GetBlockNo()
    blkHash := block.BlockHash()

    logger.Debug().Uint64("no", blkNo).Msg("add event from executed block")

    cs.RequestTo(message.MemPoolSvc, &message.MemPoolDel{
        Block: block,
    })

    cs.TellTo(message.RPCSvc, block)

    events := []*types.Event{}
    for idx, receipt := range bstate.Receipts().Get() {
        for _, e := range receipt.Events {
            e.SetMemoryInfo(receipt, blkHash, blkNo, int32(idx))
            events = append(events, e)
        }
    }

    if len(events) != 0 {
        cs.TellTo(message.RPCSvc, events)
    }
}

const maxRetSize = 1024

func adjustRv(ret string) string {
    if len(ret) > maxRetSize {
        modified, _ := json.Marshal(ret[:maxRetSize-4] + " ...")

        return string(modified)
    }
    return ret
}

func resetAccount(account *state.AccountState, fee *big.Int, nonce *uint64) error {
    account.Reset()
    if fee != nil {
        if account.Balance().Cmp(fee) < 0 {
            return &types.InternalError{Reason: "fee is greater than balance"}
        }
        account.SubBalance(fee)
    }
    if nonce != nil {
        account.SetNonce(*nonce)
    }
    return account.PutState()
}

func executeTx(execCtx context.Context, ccc consensus.ChainConsensusCluster, cdb contract.ChainAccessor, bs *state.BlockState, tx types.Transaction, bi *types.BlockHeaderInfo, executionMode int) error {
    var (
        txBody    = tx.GetBody()
        isQuirkTx = types.IsQuirkTx(tx.GetHash())
        account   []byte
        recipient []byte
        err       error
    )

    if account, err = name.Resolve(bs, txBody.GetAccount(), isQuirkTx); err != nil {
        return err
    }

    if tx.HasVerifedAccount() {
        txAcc := tx.GetVerifedAccount()
        tx.RemoveVerifedAccount()
        if !bytes.Equal(txAcc, account) {
            return types.ErrSignNotMatch
        }
    }

    err = tx.Validate(bi.ChainIdHash(), IsPublic())
    if err != nil {
        return err
    }

    sender, err := state.GetAccountState(account, bs.StateDB)
    if err != nil {
        return err
    }

    err = tx.ValidateWithSenderState(sender.State(), bs.GasPrice, bi.ForkVersion)
    if err != nil {
        return err
    }

    if recipient, err = name.Resolve(bs, txBody.Recipient, isQuirkTx); err != nil {
        return err
    }
    var receiver *state.AccountState
    status := "SUCCESS"
    if len(recipient) > 0 {
        receiver, err = state.GetAccountState(recipient, bs.StateDB)
        if receiver != nil && txBody.Type == types.TxType_REDEPLOY {
            status = "RECREATED"
            receiver.SetRedeploy()
        }
    } else {
        receiver, err = state.CreateAccountState(contract.CreateContractID(txBody.Account, txBody.Nonce), bs.StateDB)
        status = "CREATED"
    }
    if err != nil {
        return err
    }

    var txFee *big.Int
    var rv string
    var events []*types.Event
    switch txBody.Type {
    case types.TxType_NORMAL, types.TxType_REDEPLOY, types.TxType_TRANSFER, types.TxType_CALL, types.TxType_DEPLOY:
        rv, events, txFee, err = contract.Execute(execCtx, bs, cdb, tx.GetTx(), sender, receiver, bi, executionMode, false)
        sender.SubBalance(txFee)
    case types.TxType_GOVERNANCE:
        txFee = new(big.Int).SetUint64(0)
        events, err = executeGovernanceTx(ccc, bs, txBody, sender, receiver, bi)
        if err != nil {
            logger.Warn().Err(err).Str("txhash", base58.Encode(tx.GetHash())).Msg("governance tx Error")
        }
    case types.TxType_FEEDELEGATION:
        err = tx.ValidateMaxFee(receiver.Balance(), bs.GasPrice, bi.ForkVersion)
        if err != nil {
            return err
        }

        var contractState *statedb.ContractState
        contractState, err = statedb.OpenContractState(receiver.ID(), receiver.State(), bs.StateDB)
        if err != nil {
            return err
        }
        err = contract.CheckFeeDelegation(recipient, bs, bi, cdb, contractState, txBody.GetPayload(),
            tx.GetHash(), txBody.GetAccount(), txBody.GetAmount())
        if err != nil {
            if err != types.ErrNotAllowedFeeDelegation {
                logger.Warn().Err(err).Str("txhash", base58.Encode(tx.GetHash())).Msg("checkFeeDelegation Error")
                return err
            }
            return types.ErrNotAllowedFeeDelegation
        }
        rv, events, txFee, err = contract.Execute(execCtx, bs, cdb, tx.GetTx(), sender, receiver, bi, executionMode, true)
        receiver.SubBalance(txFee)
    }

    if err != nil {
        // Reset events on error
        if bi.ForkVersion >= 3 {
            events = nil
        }

        if !contract.IsRuntimeError(err) {
            return err
        }
        if txBody.Type != types.TxType_FEEDELEGATION || sender.AccountID() == receiver.AccountID() {
            sErr := resetAccount(sender, txFee, &txBody.Nonce)
            if sErr != nil {
                return sErr
            }
        } else {
            sErr := resetAccount(sender, nil, &txBody.Nonce)
            if sErr != nil {
                return sErr
            }
            sErr = resetAccount(receiver, txFee, nil)
            if sErr != nil {
                return sErr
            }
        }
        status = "ERROR"
        rv = err.Error()
    } else {
        if txBody.Type != types.TxType_FEEDELEGATION {
            if sender.Balance().Sign() < 0 {
                return &types.InternalError{Reason: "fee is greater than balance"}
            }
        } else {
            if receiver.Balance().Sign() < 0 {
                return &types.InternalError{Reason: "fee is greater than balance"}
            }
        }
        sender.SetNonce(txBody.Nonce)
        err = sender.PutState()
        if err != nil {
            return err
        }
        if sender.AccountID() != receiver.AccountID() {
            err = receiver.PutState()
            if err != nil {
                return err
            }
        }
        rv = adjustRv(rv)
    }
    bs.BpReward.Add(&bs.BpReward, txFee)

    receipt := types.NewReceipt(receiver.ID(), status, rv)
    receipt.FeeUsed = txFee.Bytes()
    receipt.TxHash = tx.GetHash()
    receipt.Events = events
    receipt.FeeDelegation = txBody.Type == types.TxType_FEEDELEGATION
    isGovernance := txBody.Type == types.TxType_GOVERNANCE
    receipt.GasUsed = fee.ReceiptGasUsed(bi.ForkVersion, isGovernance, txFee, bs.GasPrice)

    return bs.AddReceipt(receipt)
}

func DecorateBlockRewardFn(fn BlockRewardFn) {
    SendBlockReward = func(bState *state.BlockState, coinbaseAccount []byte) error {
        if err := fn(bState, coinbaseAccount); err != nil {
            return err
        }

        return sendRewardCoinbase(bState, coinbaseAccount)
    }
}

func sendRewardCoinbase(bState *state.BlockState, coinbaseAccount []byte) error {
    bpReward := &bState.BpReward
    if bpReward.Cmp(new(big.Int).SetUint64(0)) <= 0 || coinbaseAccount == nil {
        logger.Debug().Str("reward", bpReward.String()).Msg("coinbase is skipped")
        return nil
    }

    // add bp reward to coinbase account
    coinbaseAccountState, err := state.GetAccountState(coinbaseAccount, bState.StateDB)
    if err != nil {
        return err
    }
    coinbaseAccountState.AddBalance(bpReward)
    err = coinbaseAccountState.PutState()
    if err != nil {
        return err
    }

    logger.Debug().Str("reward", bpReward.String()).
        Str("newbalance", coinbaseAccountState.Balance().String()).Msg("send reward to coinbase account")

    return nil
}

// find an orphan block which is the child of the added block
func (cs *ChainService) resolveOrphan(block *types.Block) (*types.Block, error) {
    hash := block.BlockHash()

    orphanID := types.ToBlockID(hash)
    orphan, exists := cs.op.cache[orphanID]
    if !exists {
        return nil, nil
    }

    orphanBlock := orphan.Block

    if (block.GetHeader().GetBlockNo() + 1) != orphanBlock.GetHeader().GetBlockNo() {
        return nil, fmt.Errorf("invalid orphan block no (p=%d, c=%d)", block.GetHeader().GetBlockNo(),
            orphanBlock.GetHeader().GetBlockNo())
    }

    logger.Info().Str("parent", block.ID()).
        Str("orphan", orphanBlock.ID()).
        Msg("connect orphan")

    if err := cs.op.removeOrphan(orphanID); err != nil {
        return nil, err
    }

    return orphanBlock, nil
}

func (cs *ChainService) isOrphan(block *types.Block) bool {
    prevhash := block.Header.PrevBlockHash
    _, err := cs.getBlock(prevhash)

    return err != nil
}

func (cs *ChainService) handleOrphan(block *types.Block, bestBlock *types.Block, peerID types.PeerID) error {
    err := cs.addOrphan(block)
    if err != nil {
        logger.Error().Err(err).Str("hash", block.ID()).Msg("add orphan block failed")

        return err
    }

    cs.RequestTo(message.SyncerSvc, &message.SyncStart{PeerID: peerID, TargetNo: block.GetHeader().GetBlockNo()})

    return nil
}

func (cs *ChainService) addOrphan(block *types.Block) error {
    return cs.op.addOrphan(block)
}

func (cs *ChainService) findAncestor(Hashes [][]byte) (*types.BlockInfo, error) {
    // 1. check endpoint is on main chain (or, return nil)
    logger.Debug().Int("len", len(Hashes)).Msg("find ancestor")

    var mainhash []byte
    var mainblock *types.Block
    var err error
    // 2. get the highest block of Hashes hash on main chain
    for _, hash := range Hashes {
        // need to be short
        mainblock, err = cs.cdb.getBlock(hash)
        if err != nil {
            mainblock = nil
            continue
        }
        // get main hash with same block height
        mainhash, err = cs.cdb.getHashByNo(
            types.BlockNo(mainblock.GetHeader().GetBlockNo()))
        if err != nil {
            mainblock = nil
            continue
        }

        if bytes.Equal(mainhash, mainblock.BlockHash()) {
            break
        }
        mainblock = nil
    }

    // TODO: handle the case that can't find the hash in main chain
    if mainblock == nil {
        logger.Debug().Msg("Can't search same ancestor")
        return nil, ErrorNoAncestor
    }

    return &types.BlockInfo{Hash: mainblock.BlockHash(), No: mainblock.GetHeader().GetBlockNo()}, nil
}

func (cs *ChainService) setSkipMempool(isSync bool) {
    //don't use mempool if sync is in progress
    cs.validator.signVerifier.SetSkipMempool(isSync)
}