aergoio/aergo

View on GitHub
consensus/impl/dpos/dpos.go

Summary

Maintainability
B
4 hrs
Test Coverage
F
4%
/**
 *  @file
 *  @copyright defined in aergo/LICENSE.txt
 */

package dpos

import (
    "context"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "math/big"
    "time"

    "github.com/aergoio/aergo-lib/log"
    "github.com/aergoio/aergo/v2/chain"
    "github.com/aergoio/aergo/v2/config"
    "github.com/aergoio/aergo/v2/consensus"
    "github.com/aergoio/aergo/v2/consensus/impl/dpos/bp"
    "github.com/aergoio/aergo/v2/consensus/impl/dpos/slot"
    "github.com/aergoio/aergo/v2/contract/system"
    "github.com/aergoio/aergo/v2/p2p/p2pkey"
    "github.com/aergoio/aergo/v2/pkg/component"
    "github.com/aergoio/aergo/v2/state"
    "github.com/aergoio/aergo/v2/state/statedb"
    "github.com/aergoio/aergo/v2/types"
)

var (
    logger = log.NewLogger("dpos")

    // blockProducers is the number of block producers
    blockProducers          uint16
    majorityCount           uint16
    initialBpElectionPeriod types.BlockNo

    lastJob = &lastSlot{}
)

type lastSlot struct {
    //    sync.Mutex
    s *slot.Slot
}

func (l *lastSlot) get() *slot.Slot {
    //    l.Lock()
    //    defer l.Unlock()
    return l.s
}

func (l *lastSlot) set(s *slot.Slot) {
    //    l.Lock()
    //    defer l.Unlock()
    l.s = s
}

// DPoS is the main data structure of DPoS consensus
type DPoS struct {
    *Status
    consensus.ChainDB
    *component.ComponentHub
    bpc  *bp.Cluster
    bf   *BlockFactory
    quit chan interface{}
}

// Status shows DPoS consensus's current status
type bpInfo struct {
    consensus.ChainDB
    bestBlock *types.Block
    slot      *slot.Slot
}

func (bi *bpInfo) updateBestBlock() *types.Block {
    block, _ := bi.GetBestBlock()
    if block != nil {
        bi.bestBlock = block
    }

    return block
}

type bfWork struct {
    execCtx context.Context
    bpi     *bpInfo
}

// GetName returns the name of the consensus.
func GetName() string {
    return consensus.ConsensusName[consensus.ConsensusDPOS]
}

// GetConstructor build and returns consensus.Constructor from New function.
func GetConstructor(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainDB,
    sdb *state.ChainStateDB) consensus.Constructor {
    return func() (consensus.Consensus, error) {
        return New(cfg, hub, cdb, sdb)
    }
}

func getStateDB(cfg *config.Config, cdb consensus.ChainDB, sdb *state.ChainStateDB) (*statedb.StateDB, error) {
    if cfg.Blockchain.VerifyOnly {
        vprInitBlockNo := func(blockNo types.BlockNo) types.BlockNo {
            if blockNo == 0 {
                return blockNo
            }
            return blockNo - 1
        }

        // Initialize the voting power ranking.
        if block, err := cdb.GetBlockByNo(vprInitBlockNo(cfg.Blockchain.VerifyBlock)); err != nil {
            return nil, err
        } else {
            return sdb.OpenNewStateDB(block.GetHeader().GetBlocksRootHash()), nil
        }
    }
    return sdb.GetStateDB(), nil
}

// New returns a new DPos object
func New(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainDB,
    sdb *state.ChainStateDB) (consensus.Consensus, error) {

    chain.DecorateBlockRewardFn(sendVotingReward)

    bpc, err := bp.NewCluster(cdb)
    if err != nil {
        return nil, err
    }

    var state *statedb.StateDB
    if state, err = getStateDB(cfg, cdb, sdb); err != nil {
        return nil, err
    }

    if err = InitVPR(state); err != nil {
        return nil, err
    }

    Init(bpc.Size())

    quitC := make(chan interface{})

    return &DPoS{
        Status:       NewStatus(bpc, cdb, sdb, cfg.Blockchain.ForceResetHeight),
        ComponentHub: hub,
        ChainDB:      cdb,
        bpc:          bpc,
        bf:           NewBlockFactory(hub, sdb, quitC, cfg.Hardfork, cfg.Consensus.NoTimeoutTxEviction),
        quit:         quitC,
    }, nil
}

func sendVotingReward(bState *state.BlockState, dummy []byte) error {
    vrSeed := func(stateRoot []byte) int64 {
        return int64(binary.LittleEndian.Uint64(stateRoot))
    }

    // calc reward
    vaultAccountState, err := state.GetAccountState([]byte(types.AergoVault), bState.StateDB)
    if err != nil {
        logger.Info().Err(err).Msg("skip voting reward")
        return nil
    }
    vaultBalance := vaultAccountState.Balance()
    if vaultBalance.Cmp(types.NewZeroAmount()) == 0 {
        return nil
    }
    reward := system.GetVotingRewardAmount()
    if vaultBalance.Cmp(reward) < 0 {
        reward = new(big.Int).Set(vaultBalance)
    }

    // pick winner
    winner, err := system.PickVotingRewardWinner(vrSeed(bState.PrevBlockHash()))
    if err != nil {
        logger.Debug().Err(err).Msg("no voting reward winner")
        return nil
    }
    winnerAccountState, err := state.GetAccountState(winner, bState.StateDB)
    if err != nil {
        logger.Info().Err(err).Msg("skip voting reward")
        return nil
    }

    // send reward ( vault -> winner )
    err = state.SendBalance(vaultAccountState, winnerAccountState, reward)
    if err != nil {
        logger.Info().Err(err).Msg("send voting reward failed")
        return nil
    }

    if err = winnerAccountState.PutState(); err != nil {
        return err
    }
    if err = vaultAccountState.PutState(); err != nil {
        return err
    }

    bState.SetConsensus(winner)

    logger.Debug().
        Str("address", types.EncodeAddress(winner)).
        Str("amount", reward.String()).
        Str("new balance", winnerAccountState.Balance().String()).
        Str("vault balance", vaultBalance.String()).
        Msg("voting reward winner appointed")

    return nil
}

func InitVPR(sdb *statedb.StateDB) error {
    s, err := statedb.GetSystemAccountState(sdb)
    if err != nil {
        return err
    }
    return system.InitVotingPowerRank(s)
}

// Init initializes the DPoS parameters.
func Init(bpCount uint16) {
    blockProducers = bpCount
    majorityCount = blockProducers*2/3 + 1
    // Collect voting for BPs during 10 rounds.
    initialBpElectionPeriod = types.BlockNo(blockProducers) * 10
    slot.Init(consensus.BlockIntervalSec)
}

// Ticker returns a time.Ticker for the main consensus loop.
func (dpos *DPoS) Ticker() *time.Ticker {
    return time.NewTicker(tickDuration())
}

func tickDuration() time.Duration {
    return consensus.BlockInterval / 100
}

// QueueJob send a block triggering information to jq.
func (dpos *DPoS) QueueJob(now time.Time, jq chan<- interface{}) {
    bpi := dpos.getBpInfo(now)
    if bpi != nil {
        jq <- bpi
        lastJob.set(bpi.slot)
    }
}

// BlockFactory returns the BlockFactory interface in dpos.
func (dpos *DPoS) BlockFactory() consensus.BlockFactory {
    return dpos.bf
}

func (dpos *DPoS) GetType() consensus.ConsensusType {
    return consensus.ConsensusDPOS
}

// IsTransactionValid checks the DPoS consensus level validity of a transaction
func (dpos *DPoS) IsTransactionValid(tx *types.Tx) bool {
    // TODO: put a transaction validity check code here.
    return true
}

// QuitChan returns the channel from which consensus-related goroutines check when
// shutdown is initiated.
func (dpos *DPoS) QuitChan() chan interface{} {
    return dpos.quit
}

func (dpos *DPoS) bpid() types.PeerID {
    return p2pkey.NodeID()
}

// VerifyTimestamp checks the validity of the block timestamp.
func (dpos *DPoS) VerifyTimestamp(block *types.Block) bool {

    if ts := block.GetHeader().GetTimestamp(); slot.NewFromUnixNano(ts).IsFuture() {
        logger.Error().Str("BP", block.BPID2Str()).Str("id", block.ID()).
            Time("timestamp", time.Unix(0, ts)).Msg("block has a future timestamp")
        return false
    }

    // Reject the blocks with no <= LIB since it cannot lead to a
    // reorganization.
    if dpos.Status != nil && block.BlockNo() <= dpos.libNo() {
        logger.Error().Str("BP", block.BPID2Str()).Str("id", block.ID()).
            Uint64("block no", block.BlockNo()).Uint64("lib no", dpos.libNo()).
            Msg("too small block number (<= LIB number)")
        return false
    }

    return true
}

// VerifySign reports the validity of the block signature.
func (dpos *DPoS) VerifySign(block *types.Block) error {
    valid, err := block.VerifySign()
    if !valid || err != nil {
        return &consensus.ErrorConsensus{Msg: "bad block signature", Err: err}
    }
    return nil
}

// IsBlockValid checks the DPoS consensus level validity of a block
func (dpos *DPoS) IsBlockValid(block *types.Block, bestBlock *types.Block) error {
    id, err := block.BPID()
    if err != nil {
        return &consensus.ErrorConsensus{Msg: "bad public key in block", Err: err}
    }

    idx := dpos.bpc.BpID2Index(id)
    ns := block.GetHeader().GetTimestamp()
    s := slot.NewFromUnixNano(ns)
    // Check whether the BP ID is one of the current BP members and its
    // corresponding BP index is consistent with the block timestamp.
    if !s.IsFor(idx, dpos.bpc.Size()) {
        return &consensus.ErrorConsensus{
            Msg: fmt.Sprintf("BP %v (idx: %v) is not permitted for the time slot %v (%v)",
                block.BPID2Str(), idx, time.Unix(0, ns), s.NextBpIndex(dpos.bpc.Size())),
        }
    }

    return nil
}

func (dpos *DPoS) bpIdx() bp.Index {
    return dpos.bpc.BpID2Index(dpos.bpid())
}

func (dpos *DPoS) getBpInfo(now time.Time) *bpInfo {
    s := slot.Time(now)

    if !s.IsFor(dpos.bpIdx(), dpos.bpc.Size()) {
        return nil
    }

    // already queued slot.
    if slot.Equal(s, lastJob.get()) {
        return nil
    }

    block, _ := dpos.GetBestBlock()
    if block == nil {
        return nil
    }

    if !isBpTiming(block, s) {
        return nil
    }

    return &bpInfo{
        ChainDB:   dpos.ChainDB,
        bestBlock: block,
        slot:      s,
    }
}

// ConsensusInfo returns the basic DPoS-related info.
func (dpos *DPoS) ConsensusInfo() *types.ConsensusInfo {
    withLock := func(fn func()) {
        dpos.RLock()
        defer dpos.RUnlock()
        fn()
    }

    ci := &types.ConsensusInfo{Type: GetName()}
    withLock(func() {
        ci.Bps = dpos.bpc.BPs()

    })

    if dpos.done {
        var lpbNo types.BlockNo

        withLock(func() {
            lpbNo = dpos.lpbNo()
        })

        if lpbNo > 0 {
            if block, err := dpos.GetBlockByNo(lpbNo); err == nil {
                type lpbInfo struct {
                    BPID      string
                    Height    types.BlockNo
                    Hash      string
                    Timestamp string
                }
                s := struct {
                    NodeID              string
                    RecentBlockProduced lpbInfo
                }{
                    NodeID: dpos.bf.ID,
                    RecentBlockProduced: lpbInfo{
                        BPID:      block.BPID2Str(),
                        Height:    lpbNo,
                        Hash:      block.ID(),
                        Timestamp: block.Localtime().String(),
                    },
                }
                if m, err := json.Marshal(s); err == nil {
                    ci.Info = string(m)
                }
            }
        }
    }

    return ci
}

var dummyRaft consensus.DummyRaftAccessor

func (dpos *DPoS) RaftAccessor() consensus.AergoRaftAccessor {
    return &dummyRaft
}

func isBpTiming(block *types.Block, s *slot.Slot) bool {
    blockSlot := slot.NewFromUnixNano(block.Header.Timestamp)
    // The block corresponding to the current slot has already been generated.
    if slot.LessEqual(s, blockSlot) {
        return false
    }

    // Check whether the remaining time is enough until the next block
    // generation time.
    if !slot.IsNextTo(s, blockSlot) && !s.TimesUp() {
        return false
    }

    timeLeft := s.RemainingTimeMS()
    if timeLeft < 0 {
        logger.Debug().Int64("remaining time", timeLeft).Msg("no time left to produce block")
        return false
    }

    return true
}

func (dpos *DPoS) NeedNotify() bool {
    return true
}

func (dpos *DPoS) HasWAL() bool {
    return false
}

func (dpos *DPoS) IsForkEnable() bool {
    return true
}

func (dpos *DPoS) IsConnectedBlock(block *types.Block) bool {
    _, err := dpos.ChainDB.GetBlock(block.BlockHash())
    if err == nil {
        return true
    }

    return false
}

func (dpos *DPoS) ConfChange(req *types.MembershipChange) (*consensus.Member, error) {
    return nil, consensus.ErrNotSupportedMethod
}

func (dpos *DPoS) ConfChangeInfo(requestID uint64) (*types.ConfChangeProgress, error) {
    return nil, consensus.ErrNotSupportedMethod
}

func (dpos *DPoS) MakeConfChangeProposal(req *types.MembershipChange) (*consensus.ConfChangePropose, error) {
    return nil, consensus.ErrNotSupportedMethod
}

func (dpos *DPoS) ClusterInfo(bestBlockHash []byte) *types.GetClusterInfoResponse {
    return &types.GetClusterInfoResponse{ChainID: nil, Error: consensus.ErrNotSupportedMethod.Error(), MbrAttrs: nil, HardStateInfo: nil}
}

func ValidateGenesis(genesis *types.Genesis) error {
    return nil
}