aergoio/aergo

View on GitHub
consensus/chain/tx.go

Summary

Maintainability
B
5 hrs
Test Coverage
F
7%
/**
 *  @file
 *  @copyright defined in aergo/LICENSE.txt
 */

package chain

import (
    "context"
    "errors"
    "time"

    "github.com/aergoio/aergo-lib/log"
    "github.com/aergoio/aergo/v2/chain"
    "github.com/aergoio/aergo/v2/contract"
    "github.com/aergoio/aergo/v2/internal/enc/proto"
    "github.com/aergoio/aergo/v2/pkg/component"
    "github.com/aergoio/aergo/v2/state"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
)

var (
    // ErrBestBlock indicates that the best block is being changed in
    // chainservice soon.
    ErrBestBlock = errors.New("best block changed in chainservice")

    logger = log.NewLogger("consensus")
)

// FetchTXs requests to mempool and returns types.Tx array.
func FetchTXs(hs component.ICompSyncRequester, maxBlockBodySize uint32) []types.Transaction {
    //bf.RequestFuture(message.MemPoolSvc, &message.MemPoolGenerateSampleTxs{MaxCount: 3}, time.Second)
    result, err := hs.RequestFuture(message.MemPoolSvc,
        &message.MemPoolGet{MaxBlockBodySize: maxBlockBodySize}, time.Second,
        "consensus/util/info.FetchTXs").Result()
    if err != nil {
        logger.Info().Err(err).Msg("can't fetch transactions from mempool")
        return make([]types.Transaction, 0)
    }

    return result.(*message.MemPoolGetRsp).Txs
}

// TxOp is an interface used by GatherTXs for apply some transaction related operation.
type TxOp interface {
    Apply(bState *state.BlockState, tx types.Transaction) error
}

// TxOpFn is the type of arguments for CompositeTxDo.
type TxOpFn func(bState *state.BlockState, tx types.Transaction) error

// Apply applies f to tx.
func (f TxOpFn) Apply(bState *state.BlockState, tx types.Transaction) error {
    return f(bState, tx)
}

// NewCompTxOp returns a function which applies each function in fn.
func NewCompTxOp(fn ...TxOp) TxOp {
    return TxOpFn(func(bState *state.BlockState, tx types.Transaction) error {
        for _, f := range fn {
            var err error
            if err = f.Apply(bState, tx); err != nil {
                return err
            }
        }

        // If TxOp executes tx, it has a resulting BlockState. The final
        // BlockState must be sent to the chain service receiver.
        return nil
    })
}

func newBlockLimitOp(maxBlockBodySize uint32) TxOpFn {
    // Caution: the closure below captures the local variable 'size.' Generate
    // it whenever needed. Don't reuse it!
    size := 0
    return TxOpFn(func(bState *state.BlockState, tx types.Transaction) error {
        if size += proto.Size(tx.GetTx()); uint32(size) > maxBlockBodySize {
            return errBlockSizeLimit
        }
        return nil
    })
}

// Lock acquires the chain lock in a blocking mode.
func Lock() {
    chain.InAddBlock <- struct{}{}
}

// LockNonblock acquires the chain lock in a non-blocking mode. It returns
// ErrBestBlock upon failure.
func LockNonblock() error {
    select {
    case chain.InAddBlock <- struct{}{}:
        return nil
    default:
        return ErrBestBlock
    }
}

// Unlock release the chain lock.
func Unlock() {
    <-chain.InAddBlock
}

// GatherTXs returns transactions from txIn. The selection is done by applying
// txDo.
func (g *BlockGenerator) GatherTXs() ([]types.Transaction, error) {
    var (
        bState = g.bState

        nCollected int
        nCand      int
    )

    if logger.IsDebugEnabled() {
        logger.Debug().Msg("start gathering tx")
    }

    if err := LockNonblock(); err != nil {
        return nil, ErrBestBlock
    }
    defer Unlock()

    txIn := g.fetchTXs(g.hs, g.maxBlockBodySize)
    nCand = len(txIn)

    txRes := make([]types.Transaction, 0, nCand)

    defer func() {
        logger.Info().
            Int("candidates", nCand).
            Int("collected", nCollected).
            Msg("transactions collected")
        contract.CloseDatabase()
    }()

    // block generation timeout check. this function works like BlockFactory#checkBpTimeout()
    checkBGTimeout := NewCompTxOp(
        TxOpFn(func(bState *state.BlockState, txIn types.Transaction) error {
            select {
            case <-g.ctx.Done():
                // TODO use function Cause() for precise control, later. cause can be used in go1.20 and later
                causeErr := g.ctx.Err()
                //causeErr := context.Cause(g.ctx)
                switch causeErr {
                case context.Canceled: // Only quitting of Aergo triggers Canceled error for now.
                    return ErrQuit
                default:
                    return ErrTimeout{Kind: "block"}
                }
            default:
                return nil
            }
        }),
    )

    if nCand > 0 {
        op := NewCompTxOp(checkBGTimeout, g.txOp)

        for i, tx := range txIn {
            // process the transaction
            err := op.Apply(bState, tx)

            //don't include tx that error is occurred
            if e, ok := err.(ErrTimeout); ok {
                logger.Debug().Msg("finishing gathering tx due to time limit")
                err = e
                break
            } else if cause, ok := err.(*contract.VmTimeoutError); ok {
                logger.Debug().Msg("stop gathering tx and cancel last tx due to time limit")
                // Mark the rejected TX by timeout. The marked TX will be
                // forced to be the first TX of the next block. By doing this,
                // the TX may have a chance to use the maximum block execution
                // time. If the TX is rejected by timeout even with this, it
                // may be evicted from the mempool after checking the actual
                // execution time.
                if g.tteEnabled() {
                    g.setRejected(tx, cause, i == 0)
                }

                err = ErrTimeout{Kind: "contract"}

                break
            } else if err == errBlockSizeLimit {
                if logger.IsDebugEnabled() {
                    logger.Debug().Msg("stop gathering tx due to size limit")
                }
                break
            } else if err != nil {
                logger.Debug().Err(err).Int("idx", i).Stringer("hash", types.LogBase58(tx.GetHash())).Msg("skip error tx")

                //FIXME handling system error (panic?)
                // ex) gas error/nonce error skip, but other system error panic
                continue
            }

            txRes = append(txRes, tx)
        }

        nCollected = len(txRes)
    }

    // Warning: This line must be run even with 0 gathered TXs, since the
    // function below includes voting reward as well as BP reward.
    if err := chain.SendBlockReward(bState, chain.CoinbaseAccount); err != nil {
        return nil, err
    }

    if err := contract.SaveRecoveryPoint(bState); err != nil {
        return nil, err
    }

    if err := bState.Update(); err != nil {
        return nil, err
    }

    return txRes, nil
}