consensus/impl/raftv2/raftserver.go
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raftv2
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/aergoio/aergo/v2/chain"
"github.com/aergoio/aergo/v2/consensus"
"github.com/aergoio/aergo/v2/internal/enc/proto"
"github.com/aergoio/aergo/v2/p2p/p2pcommon"
"github.com/aergoio/aergo/v2/pkg/component"
"github.com/aergoio/aergo/v2/types"
"github.com/aergoio/aergo/v2/types/message"
"github.com/aergoio/etcd/etcdserver/stats"
etcdtypes "github.com/aergoio/etcd/pkg/types"
raftlib "github.com/aergoio/etcd/raft"
"github.com/aergoio/etcd/raft/raftpb"
"github.com/aergoio/etcd/rafthttp"
"github.com/aergoio/etcd/snap"
)
const (
HasNoLeader uint64 = 0
)
// noinspection ALL
var (
raftLogger raftlib.Logger
)
var (
ErrRaftNotReady = errors.New("raft library is not initialized")
ErrCCAlreadyApplied = errors.New("conf change entry is already applied")
ErrInvalidMember = errors.New("member of conf change is invalid")
ErrCCAlreadyAdded = errors.New("member has already been added")
ErrCCAlreadyRemoved = errors.New("member has already been removed")
ErrCCNoMemberToRemove = errors.New("there is no member to remove")
ErrEmptySnapshot = errors.New("received empty snapshot")
ErrInvalidRaftIdentity = errors.New("raft identity is not set")
ErrProposeNilBlock = errors.New("proposed block is nil")
)
const (
BackendP2P = "aergop2p"
BackendHTTP = "http"
)
func init() {
raftLogger = NewRaftLogger(logger)
}
// raftServer drives the raft node and publishes committed block entries over commitC.
type raftServer struct {
*component.ComponentHub
sync.RWMutex
pa p2pcommon.PeerAccessor
cluster *Cluster
confChangeC <-chan *consensus.ConfChangePropose // proposed cluster config changes
commitC chan *commitEntry // entries committed to log (k,v)
errorC chan error // errors from raft session
id uint64 // client ID for raft session
join bool // node is joining an existing cluster
UseBackup bool // use backup chaindb datafiles to join an existing cluster
getSnapshot func() ([]byte, error)
lastIndex uint64 // index of log at start
snapshotIndex uint64
appliedIndex uint64
// raft backing for the commit/error channel
node raftlib.Node
raftStorage *raftlib.MemoryStorage
//wal *wal.WAL
walDB *WalDB
snapshotter *ChainSnapshotter
snapFrequency uint64
transport Transporter
stopc chan struct{} // signals proposal channel closed
curTerm uint64
leaderStatus LeaderStatus
promotable bool
tickMS time.Duration
confState *raftpb.ConfState
commitProgress CommitProgress
}
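// LeaderStatus caches the current leader, the term in which it was observed, whether this node is the leader,
// and how many times leadership has changed. It is updated from raft soft states by updateLeader.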
type LeaderStatus struct {
sync.RWMutex
Leader uint64
Term uint64
leaderChanged uint64
IsLeader bool
}
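// commitEntry is a committed raft log entry delivered over commitC.
// An entry with a nil block is a ready marker indicating that replay of existing entries is complete.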
type commitEntry struct {
block *types.Block
index uint64
term uint64
}
func (ce *commitEntry) IsReadyMarker() bool {
return ce.block == nil
}
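// CommitProgress tracks the last block entry requested over commitC and the last block entry actually connected to the chain.
// The gap between the two decides whether a new block may be proposed (see IsReadyToPropose).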
type CommitProgress struct {
sync.Mutex
connect commitEntry // last connected entry to chain
request commitEntry // last requested entry to commitC
}
func (cp *CommitProgress) UpdateConnect(ce *commitEntry) {
logger.Debug().Uint64("term", ce.term).Uint64("index", ce.index).Uint64("no", ce.block.BlockNo()).Str("hash", ce.block.ID()).Msg("set progress of last connected block")
cp.Lock()
defer cp.Unlock()
cp.connect = *ce
}
func (cp *CommitProgress) UpdateRequest(ce *commitEntry) {
logger.Debug().Uint64("term", ce.term).Uint64("index", ce.index).Uint64("no", ce.block.BlockNo()).Str("hash", ce.block.ID()).Msg("set progress of last requested block")
cp.Lock()
defer cp.Unlock()
cp.request = *ce
}
func (cp *CommitProgress) GetConnect() *commitEntry {
cp.Lock()
defer cp.Unlock()
return &cp.connect
}
func (cp *CommitProgress) GetRequest() *commitEntry {
cp.Lock()
defer cp.Unlock()
return &cp.request
}
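// IsReadyToPropose reports whether every requested commit entry has already been connected to the chain,
// i.e. there is no pending request left.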
func (cp *CommitProgress) IsReadyToPropose() bool {
cp.Lock()
defer cp.Unlock()
if cp.request.block == nil {
return true
}
var connNo, reqNo uint64
reqNo = cp.request.block.BlockNo()
if cp.connect.block != nil {
connNo = cp.connect.block.BlockNo()
}
if reqNo <= connNo {
return true
}
logger.Debug().Uint64("requested", reqNo).Uint64("connected", connNo).Msg("remain pending request to connect")
return false
}
func RecoverExit() {
if r := recover(); r != nil {
logger.Error().Str("callstack", string(debug.Stack())).Msg("panic occurred in raft server")
os.Exit(10)
}
}
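// makeConfig builds the raft configuration for this node. CheckQuorum is enabled and proposal forwarding is
// disabled, so followers drop proposals instead of forwarding them to the leader.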
func makeConfig(nodeID uint64, storage *raftlib.MemoryStorage) *raftlib.Config {
c := &raftlib.Config{
ID: nodeID,
ElectionTick: ElectionTickCount,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
Logger: raftLogger,
CheckQuorum: true,
DisableProposalForwarding: true,
}
return c
}
// newRaftServer initiates a raft instance and returns a raftServer that
// publishes committed log entries over the provided commit channel and
// accepts cluster configuration changes over confChangeC. All log entries are
// replayed over the commit channel, followed by a nil message (to indicate
// the channel is current), then new log entries. To shut down, close
// confChangeC and read errorC.
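//
// A minimal usage sketch (illustrative only; the hub, cluster, getSnapshot, chainWal and peerAccessor
// values are placeholders for objects wired up elsewhere in this package):
//
//	confChangeC := make(chan *consensus.ConfChangePropose)
//	commitC := make(chan *commitEntry)
//	rs := newRaftServer(hub, cluster, false, false, getSnapshot, time.Millisecond*30, confChangeC, commitC, false, chainWal)
//	rs.SetPeerAccessor(peerAccessor)
//	rs.Start()
//	for ce := range commitC {
//		if ce.IsReadyMarker() {
//			continue // replay of existing entries is complete
//		}
//		// connect ce.block to the chain, then record it via rs.commitProgress.UpdateConnect(ce)
//	}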
func newRaftServer(hub *component.ComponentHub,
cluster *Cluster,
join bool, useBackup bool,
getSnapshot func() ([]byte, error),
tickMS time.Duration,
confChangeC chan *consensus.ConfChangePropose,
commitC chan *commitEntry,
delayPromote bool,
chainWal consensus.ChainWAL) *raftServer {
errorC := make(chan error, 1)
rs := &raftServer{
ComponentHub: hub,
RWMutex: sync.RWMutex{},
cluster: cluster,
walDB: NewWalDB(chainWal),
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
join: join,
UseBackup: useBackup,
getSnapshot: getSnapshot,
snapFrequency: ConfSnapFrequency,
stopc: make(chan struct{}),
// rest of structure populated after WAL replay
promotable: true,
tickMS: tickMS,
commitProgress: CommitProgress{},
}
if delayPromote {
rs.SetPromotable(false)
}
rs.snapshotter = newChainSnapshotter(nil, rs.ComponentHub, rs.cluster, rs.walDB, func() uint64 { return rs.GetLeader() })
return rs
}
func (rs *raftServer) SetPeerAccessor(pa p2pcommon.PeerAccessor) {
rs.pa = pa
rs.snapshotter.setPeerAccessor(pa)
}
func (rs *raftServer) SetPromotable(val bool) {
rs.Lock()
defer rs.Unlock()
rs.promotable = val
}
func (rs *raftServer) GetPromotable() bool {
rs.RLock()
defer rs.RUnlock()
val := rs.promotable
return val
}
func (rs *raftServer) Start() {
go rs.startRaft()
}
func (rs *raftServer) makeStartPeers() ([]raftlib.Peer, error) {
return rs.cluster.getStartPeers()
}
type RaftServerState int
const (
RaftServerStateRestart = iota
RaftServerStateNewCluster
RaftServerStateJoinCluster
)
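// startRaft determines how to start the raft node: restart from an existing WAL, start by joining an
// existing cluster (optionally reusing backup chain data), or bootstrap a new cluster. It then starts
// the transport and the main serve loop.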
func (rs *raftServer) startRaft() {
var node raftlib.Node
getState := func() RaftServerState {
hasWal, err := rs.walDB.HasWal(rs.cluster.identity)
if err != nil {
logger.Info().Err(err).Msg("wal database of raft is missing or not valid")
}
switch {
case hasWal:
return RaftServerStateRestart
case rs.join:
return RaftServerStateJoinCluster
default:
return RaftServerStateNewCluster
}
}
isEmptyLog := func() bool {
var (
last uint64
err error
)
if last, err = rs.walDB.GetRaftEntryLastIdx(); err != nil {
return true
}
// If a joining node crashed before writing a snapshot, the last index may be 0 while no snapshot exists yet.
if last == 0 {
if tmpsnap, err := rs.walDB.GetSnapshot(); tmpsnap == nil || err != nil {
return true
}
}
return false
}
switch getState() {
case RaftServerStateRestart:
logger.Info().Msg("raft restart from wal")
rs.cluster.ResetMembers()
if isEmptyLog() {
logger.Info().Msg("there is no log, so import cluster information from remote. This server may have been added and terminated before the first synchronization was completed")
if _, err := rs.ImportExistingCluster(); err != nil {
logger.Fatal().Err(err).Str("mine", rs.cluster.toString()).Msg("failed to import existing cluster info")
}
}
node = rs.restartNode(false)
case RaftServerStateJoinCluster:
logger.Info().Msg("raft start and join existing cluster")
var (
hardstateinfo *types.HardStateInfo
err error
)
rs.cluster.ResetMembers()
// get cluster info from existing cluster member and hardstate of bestblock
if hardstateinfo, err = rs.ImportExistingCluster(); err != nil {
logger.Fatal().Err(err).Str("mine", rs.cluster.toString()).Msg("failed to import existing cluster info")
}
if rs.UseBackup {
logger.Info().Msg("raft use given backup as wal")
if err := rs.walDB.ResetWAL(hardstateinfo); err != nil {
logger.Fatal().Err(err).Msg("reset wal failed for raft")
}
if err := rs.SaveIdentity(); err != nil {
logger.Fatal().Err(err).Msg("failed to save identity")
}
node = rs.restartNode(true)
logger.Info().Msg("raft restarted from backup")
} else {
node = rs.startNode(nil)
}
case RaftServerStateNewCluster:
logger.Info().Bool("usebackup", rs.UseBackup).Msg("raft start and make new cluster")
if rs.UseBackup {
rs.walDB.ClearWAL()
}
var startPeers []raftlib.Peer
startPeers, err := rs.makeStartPeers()
if err != nil {
logger.Fatal().Err(err).Msg("failed to make raft peer list")
}
node = rs.startNode(startPeers)
}
// need locking for sync with consensusAccessor
rs.setNodeSync(node)
rs.startTransport()
go rs.serveChannels()
}
func (rs *raftServer) ImportExistingCluster() (*types.HardStateInfo, error) {
logger.Info().Msg("import cluster information from remote")
// get cluster info from existing cluster member and hardstate of bestblock
existCluster, hardstateinfo, err := rs.GetExistingCluster()
if err != nil {
logger.Fatal().Err(err).Str("mine", rs.cluster.toString()).Msg("failed to get existing cluster info")
}
if hardstateinfo != nil {
logger.Info().Str("hardstate", hardstateinfo.ToString()).Msg("received hard state of best hash from remote cluster")
}
// config validate
if !rs.cluster.ValidateAndMergeExistingCluster(existCluster) {
logger.Fatal().Str("existcluster", existCluster.toString()).Str("mycluster", rs.cluster.toString()).Msg("this cluster configuration is not compatible with existing cluster")
}
return hardstateinfo, nil
}
func (rs *raftServer) ID() uint64 {
return rs.cluster.NodeID()
}
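// startNode bootstraps a brand-new raft node. It requires an empty blockchain (unless backup data is reused)
// and no previously saved raft identity, assigns and persists this node's identity, and then starts the raft
// state machine with the given initial peers.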
func (rs *raftServer) startNode(startPeers []raftlib.Peer) raftlib.Node {
var (
blk *types.Block
err error
id *consensus.RaftIdentity
)
validateEmpty := func() {
if blk, err = rs.walDB.GetBestBlock(); err != nil {
logger.Fatal().Err(err).Msg("failed to get best block, so failed to start raft server")
}
if blk.BlockNo() > 0 {
if rs.UseBackup {
logger.Info().Uint64("best no", blk.BlockNo()).Str("best hash", blk.ID()).Msg("start from existing block chain")
} else {
logger.Fatal().Err(err).Msg("blockchain data is not empty, so failed to start raft server")
}
}
if id, err = rs.walDB.GetIdentity(); err == nil && id != nil {
logger.Fatal().Err(err).Str("id", id.ToString()).Msg("raft identity is already written. so failed to start raft server")
}
}
validateEmpty()
if err := rs.cluster.SetThisNodeID(); err != nil {
logger.Fatal().Err(err).Msg("failed to set id of this node")
}
// when joining an existing cluster, the cluster ID is already set
if rs.cluster.ClusterID() == InvalidClusterID {
rs.cluster.GenerateID(rs.UseBackup)
}
if err := rs.SaveIdentity(); err != nil {
logger.Fatal().Err(err).Str("identity", rs.cluster.identity.ToString()).Msg("failed to save identity")
}
rs.raftStorage = raftlib.NewMemoryStorage()
c := makeConfig(rs.ID(), rs.raftStorage)
logger.Info().Msg("raft node start")
return raftlib.StartNode(c, startPeers)
}
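// restartNode restarts the raft node from the persisted snapshot and WAL. When join is true, cluster
// membership is not recovered from the temporary snapshot; it must come from the remote cluster instead.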
func (rs *raftServer) restartNode(join bool) raftlib.Node {
snapshot, err := rs.loadSnapshot()
if err != nil {
logger.Fatal().Err(err).Msg("failed to read snapshot")
}
if err := rs.replayWAL(snapshot); err != nil {
logger.Fatal().Err(err).Msg("replay wal failed for raft")
}
// members of the cluster will be loaded from the snapshot or the wal.
// When restarting after a join, the cluster must not be recovered from the temporary snapshot since its members are empty;
// instead the node must use the cluster info received from a remote server.
if !join && snapshot != nil {
if _, err := rs.cluster.Recover(snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to recover cluster from snapshot")
}
}
c := makeConfig(rs.ID(), rs.raftStorage)
logger.Info().Msg("raft node restart")
return raftlib.RestartNode(c)
}
func (rs *raftServer) startTransport() {
//rs.transport = rs.createHttpTransporter()
rs.transport = rs.createAergoP2PTransporter()
if err := rs.transport.Start(); err != nil {
logger.Fatal().Err(err).Msg("failed to start raft transport")
}
for _, member := range rs.cluster.Members().MapByID {
if rs.cluster.NodeID() != member.ID {
rs.transport.AddPeer(etcdtypes.ID(member.ID), member.GetPeerID(), []string{member.Address})
}
}
}
func (rs *raftServer) createHttpTransporter() Transporter {
transporter := &HttpTransportWrapper{Transport: rafthttp.Transport{
ID: etcdtypes.ID(rs.ID()),
ClusterID: etcdtypes.ID(rs.cluster.ClusterID()),
Raft: rs,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.FormatUint(rs.ID(), 10)),
Snapshotter: rs.snapshotter,
ErrorC: rs.errorC,
}}
transporter.SetLogger(httpLogger)
return transporter
}
func (rs *raftServer) createAergoP2PTransporter() Transporter {
future := rs.RequestFuture(message.P2PSvc, message.GetRaftTransport{Cluster: rs.cluster}, time.Second<<4, "getbackend")
result, err := future.Result()
if err != nil {
logger.Panic().Err(err).Msg("failed to get backend")
}
return result.(Transporter)
}
func (rs *raftServer) SaveIdentity() error {
if rs.cluster.ClusterID() == 0 || rs.cluster.NodeID() == consensus.InvalidMemberID || len(rs.cluster.NodeName()) == 0 || len(rs.cluster.NodePeerID()) == 0 {
logger.Error().Str("identity", rs.cluster.identity.ToString()).Msg("failed to save raft identity. identity is invalid")
return ErrInvalidRaftIdentity
}
if err := rs.walDB.WriteIdentity(&rs.cluster.identity); err != nil {
logger.Fatal().Err(err).Msg("failed to write raft identity to wal")
return err
}
return nil
}
func (rs *raftServer) setNodeSync(node raftlib.Node) {
rs.Lock()
defer rs.Unlock()
rs.node = node
}
func (rs *raftServer) getNodeSync() raftlib.Node {
var node raftlib.Node
rs.RLock()
defer rs.RUnlock()
node = rs.node
return node
}
// stop stops the transport, closes all channels, and stops raft.
func (rs *raftServer) stop() {
logger.Info().Msg("stop raft server")
rs.stopHTTP()
close(rs.commitC)
close(rs.errorC)
rs.node.Stop()
}
func (rs *raftServer) stopHTTP() {
rs.transport.Stop()
}
func (rs *raftServer) writeError(err error) {
logger.Error().Err(err).Msg("error occurred in raft server")
}
// TODO timeout handling with context
func (rs *raftServer) Propose(block *types.Block) error {
if block == nil {
return ErrProposeNilBlock
}
logger.Debug().Msg("propose block")
if data, err := marshalEntryData(block); err == nil {
// blocks until accepted by raft state machine
if err := rs.node.Propose(context.TODO(), data); err != nil {
return err
}
logger.Debug().Int("len", len(data)).Msg("proposed data to raft node")
} else {
logger.Fatal().Err(err).Msg("proposed data is invalid")
}
return nil
}
func (rs *raftServer) saveConfChangeState(id uint64, state types.ConfChangeState, errCC error) error {
var errStr string
if errCC != nil {
errStr = errCC.Error()
}
pr := types.ConfChangeProgress{State: state, Err: errStr, Members: rs.cluster.appliedMembers.ToMemberAttrArray()}
return rs.walDB.WriteConfChangeProgress(id, &pr)
}
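// serveConfChange forwards membership change proposals received on confChangeC to the raft node and records
// their progress. When the channel is closed, it signals stopc so that the server shuts down.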
func (rs *raftServer) serveConfChange() {
handleConfChange := func(propose *consensus.ConfChangePropose) {
var err error
err = rs.node.ProposeConfChange(context.TODO(), *propose.Cc)
if err != nil {
logger.Error().Err(err).Msg("failed to propose configure change")
rs.cluster.AfterConfChange(propose.Cc, nil, err)
}
err = rs.saveConfChangeState(propose.Cc.ID, types.ConfChangeState_CONF_CHANGE_STATE_PROPOSED, err)
if err != nil {
logger.Error().Err(err).Msg("failed to save progress of configure change")
}
}
// send proposals over raft
for rs.confChangeC != nil {
select {
case confChangePropose, ok := <-rs.confChangeC:
if !ok {
rs.confChangeC = nil
} else {
handleConfChange(confChangePropose)
}
}
}
// client closed channel; shutdown raft if not already
close(rs.stopc)
}
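// serveChannels is the main raft event loop. It drives election/heartbeat ticks and, for each Ready,
// persists the hard state and entries to the WAL, applies and publishes any incoming snapshot, appends
// entries to the in-memory raft storage, publishes committed entries over commitC, triggers log compaction,
// and finally advances the node. Outgoing messages are sent before persistence on the leader and after
// persistence on followers.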
func (rs *raftServer) serveChannels() {
defer RecoverExit()
snapshot, err := rs.raftStorage.Snapshot()
if err != nil {
logger.Panic().Err(err).Msg("failed to get snapshot")
}
rs.setConfState(&snapshot.Metadata.ConfState)
rs.setSnapshotIndex(snapshot.Metadata.Index)
rs.setAppliedIndex(snapshot.Metadata.Index)
ticker := time.NewTicker(rs.tickMS)
defer ticker.Stop()
go rs.serveConfChange()
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
if rs.GetPromotable() {
rs.node.Tick()
}
// store raft entries to walDB, then publish over commit channel
case rd := <-rs.node.Ready():
if len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || !raftlib.IsEmptyHardState(rd.HardState) || rd.SoftState != nil {
logger.Debug().Int("entries", len(rd.Entries)).Int("commitentries", len(rd.CommittedEntries)).Str("hardstate", types.RaftHardStateToString(rd.HardState)).Msg("ready to process")
}
if rs.IsLeader() {
if err := rs.processMessages(rd.Messages); err != nil {
logger.Fatal().Err(err).Msg("leader process message error")
}
}
if err := rs.walDB.SaveEntry(rd.HardState, rd.Entries); err != nil {
logger.Fatal().Err(err).Msg("failed to save entry to wal")
}
if !raftlib.IsEmptySnap(rd.Snapshot) {
if err := rs.walDB.WriteSnapshot(&rd.Snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to save snapshot to wal")
}
if err := rs.raftStorage.ApplySnapshot(rd.Snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to apply snapshot")
}
if err := rs.publishSnapshot(rd.Snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to publish snapshot")
}
}
if err := rs.raftStorage.Append(rd.Entries); err != nil {
logger.Fatal().Err(err).Msg("failed to append new entries to raft log")
}
if !rs.IsLeader() {
if err := rs.processMessages(rd.Messages); err != nil {
logger.Fatal().Err(err).Msg("process message error")
}
}
if ok := rs.publishEntries(rs.entriesToApply(rd.CommittedEntries)); !ok {
rs.stop()
return
}
rs.triggerSnapshot()
// A new block must be created only after all committed blocks have been connected
if !raftlib.IsEmptyHardState(rd.HardState) {
rs.updateTerm(rd.HardState.Term)
}
if rd.SoftState != nil {
rs.updateLeader(rd.SoftState)
}
rs.node.Advance()
case err := <-rs.errorC:
rs.writeError(err)
return
case <-rs.stopc:
rs.stop()
return
}
}
}
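// processMessages sends outgoing raft messages through the transport. MsgSnap messages are converted into
// snap.Message and sent separately via SendSnapshot; the original message is neutralized (To = 0) so that the
// normal Send path ignores it.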
func (rs *raftServer) processMessages(msgs []raftpb.Message) error {
var err error
var tmpSnapMsg *snap.Message
snapMsgs := make([]*snap.Message, 0)
// reset MsgSnap to send snap.Message
for i, msg := range msgs {
if msg.Type == raftpb.MsgSnap {
tmpSnapMsg, err = rs.makeSnapMessage(&msg)
if err != nil {
return err
}
snapMsgs = append(snapMsgs, tmpSnapMsg)
msgs[i].To = 0
}
}
rs.transport.Send(msgs)
for _, tmpSnapMsg := range snapMsgs {
rs.transport.SendSnapshot(*tmpSnapMsg)
}
return nil
}
func (rs *raftServer) makeSnapMessage(msg *raftpb.Message) (*snap.Message, error) {
if msg.Type != raftpb.MsgSnap {
return nil, ErrNotMsgSnap
}
/*
// make snapshot with last progress of raftserver
snapshot, err := rs.snapshotter.createSnapshot(rs.prevProgress, rs.confState)
if err != nil {
return nil, err
}
msg.Snapshot = *snapshot
*/
// TODO add cluster info to snapshot.data
logger.Debug().Uint64("term", msg.Snapshot.Metadata.Term).Uint64("index", msg.Snapshot.Metadata.Index).Msg("send merged snapshot message")
// the pipe carries only a small placeholder; the actual chain snapshot is transferred separately by the snapshotter
pr, pw := io.Pipe()
go func() {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, int32(1))
if err != nil {
logger.Fatal().Err(err).Msg("raft pipe binary write err")
}
n, err := pw.Write(buf.Bytes())
if err == nil {
logger.Debug().Msgf("wrote database snapshot out [total bytes: %d]", n)
} else {
logger.Error().Msgf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
}
if err := pw.CloseWithError(err); err != nil {
logger.Fatal().Err(err).Msg("raft pipe close error")
}
}()
return snap.NewMessage(*msg, pr, 4), nil
}
func (rs *raftServer) loadSnapshot() (*raftpb.Snapshot, error) {
snapshot, err := rs.walDB.GetSnapshot()
if err != nil {
logger.Fatal().Err(err).Msg("error loading snapshot")
return nil, err
}
if snapshot == nil {
logger.Info().Msg("snapshot does not exist")
return nil, nil
}
snapdata := &consensus.SnapshotData{}
err = snapdata.Decode(snapshot.Data)
if err != nil {
logger.Fatal().Err(err).Msg("error decoding snapshot")
return nil, err
}
logger.Info().Str("meta", consensus.SnapToString(snapshot, snapdata)).Msg("loaded snapshot meta")
return snapshot, nil
}
// replayWAL replays WAL entries into the raft instance.
func (rs *raftServer) replayWAL(snapshot *raftpb.Snapshot) error {
logger.Info().Msg("replaying WAL")
identity, st, ents, err := rs.walDB.ReadAll(snapshot)
if err != nil {
logger.Fatal().Err(err).Msg("failed to read WAL")
return err
}
if err := rs.cluster.RecoverIdentity(identity); err != nil {
logger.Fatal().Err(err).Msg("failed to recover raft identity from wal")
}
rs.raftStorage = raftlib.NewMemoryStorage()
if snapshot != nil {
if err := rs.raftStorage.ApplySnapshot(*snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to apply snapshot while replaying wal")
}
}
if err := rs.raftStorage.SetHardState(*st); err != nil {
logger.Fatal().Err(err).Msg("failed to set hard state while replaying wal")
}
// append to storage so raft starts at the right place in log
if err := rs.raftStorage.Append(ents); err != nil {
logger.Fatal().Err(err).Msg("failed to append entries while replaying wal")
}
// send nil once lastIndex is published so client knows commit channel is current
if len(ents) > 0 {
rs.lastIndex = ents[len(ents)-1].Index
}
rs.updateTerm(st.Term)
logger.Info().Uint64("lastindex", rs.lastIndex).Msg("replaying WAL done")
return nil
}
/*
// createSnapshot make marshalled data of chain & cluster info
func (rs *raftServer) createSnapshot() ([]byte, error) {
// this snapshot is used when reboot and initialize raft log
if rs.prevProgress.isEmpty() {
logger.Fatal().Msg("last applied block is nil")
}
snapBlock := rs.prevProgress.block
logger.Info().Str("hash", snapBlock.ID()).Uint64("no", snapBlock.BlockNo()).Msg("create new snapshot of chain")
snap := consensus.NewChainSnapshot(snapBlock)
if snap == nil {
panic("new snap failed")
}
return snap.Encode()
}*/
// triggerSnapshot creates a snapshot and compacts the raft log storage.
// raft can not wait until the last applied entry is committed, so the snapshot must be created from the current best block.
//
// @ MatchBlockAndCluster
//
// The snapshot uses the current state of the cluster and confstate, but the last applied block may not be committed yet,
// so raft uses the last committed block. Because of this, some conf change logs can cause errors on a node that receives the snapshot.
func (rs *raftServer) triggerSnapshot() {
ce := rs.commitProgress.GetConnect()
newSnapshotIndex, snapBlock := ce.index, ce.block
if newSnapshotIndex == 0 || rs.confState == nil {
return
}
if len(rs.confState.Nodes) == 0 {
// TODO Fatal -> Error after test
logger.Fatal().Msg("confstate node is empty for snapshot")
return
}
if newSnapshotIndex-rs.snapshotIndex <= rs.snapFrequency {
return
}
logger.Info().Uint64("applied", rs.appliedIndex).Uint64("new snap index", newSnapshotIndex).Uint64("last snapshot index", rs.snapshotIndex).Msg("start snapshot")
// make snapshot data of best block
snapdata, err := rs.snapshotter.createSnapshotData(rs.cluster, snapBlock, rs.confState)
if err != nil {
logger.Fatal().Err(err).Msg("failed to create snapshot data from prev block")
}
data, err := snapdata.Encode()
if err != nil {
logger.Fatal().Err(err).Msg("failed to marshal snapshot data")
}
// snapshot.data is not used for the actual snapshot transfer; at transmission time a new message is generated with the information available at that moment and sent.
snapshot, err := rs.raftStorage.CreateSnapshot(newSnapshotIndex, rs.confState, data)
if err != nil {
logger.Fatal().Err(err).Msg("failed to create snapshot")
}
// save snapshot to wal
if err := rs.walDB.WriteSnapshot(&snapshot); err != nil {
logger.Fatal().Err(err).Msg("failed to write snapshot")
}
compactIndex := uint64(1)
if newSnapshotIndex > ConfSnapshotCatchUpEntriesN {
compactIndex = newSnapshotIndex - ConfSnapshotCatchUpEntriesN
}
if err := rs.raftStorage.Compact(compactIndex); err != nil {
if err == raftlib.ErrCompacted {
return
}
logger.Fatal().Err(err).Uint64("index", compactIndex).Msg("failed to compact raft log")
}
logger.Info().Uint64("index", compactIndex).Msg("compacted raftLog.at index")
rs.setSnapshotIndex(newSnapshotIndex)
_ = chain.TestDebugger.Check(chain.DEBUG_RAFT_SNAP_FREQ, 0,
func(freq int) error {
rs.snapFrequency = uint64(freq)
return nil
})
}
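// publishSnapshot applies a snapshot received from a remote peer: it updates the conf state and indexes,
// recovers cluster membership from the snapshot (re-adding transport peers if membership changed), and
// records the snapshot block as the last connected block.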
func (rs *raftServer) publishSnapshot(snapshotToSave raftpb.Snapshot) error {
updateProgress := func() error {
var snapdata = &consensus.SnapshotData{}
err := snapdata.Decode(snapshotToSave.Data)
if err != nil {
logger.Error().Msg("failed to unmarshal snapshot data to progress")
return err
}
block, err := rs.walDB.GetBlockByNo(snapdata.Chain.No)
if err != nil {
logger.Fatal().Msg("failed to get synchronized block")
return err
}
rs.commitProgress.UpdateConnect(&commitEntry{block: block, index: snapshotToSave.Metadata.Index, term: snapshotToSave.Metadata.Term})
return nil
}
if raftlib.IsEmptySnap(snapshotToSave) {
return ErrEmptySnapshot
}
logger.Info().Uint64("index", rs.snapshotIndex).Str("snap", consensus.SnapToString(&snapshotToSave, nil)).Msg("publishing snapshot at index")
defer logger.Info().Uint64("index", rs.snapshotIndex).Msg("finished publishing snapshot at index")
if snapshotToSave.Metadata.Index <= rs.appliedIndex {
logger.Fatal().Msgf("snapshot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rs.appliedIndex)
}
//rs.commitC <- nil // trigger kvstore to load snapshot
rs.setConfState(&snapshotToSave.Metadata.ConfState)
rs.setSnapshotIndex(snapshotToSave.Metadata.Index)
rs.setAppliedIndex(snapshotToSave.Metadata.Index)
var (
isEqual bool
err error
)
if isEqual, err = rs.cluster.Recover(&snapshotToSave); err != nil {
return err
}
if !isEqual {
rs.recoverTransport()
}
return updateProgress()
}
func (rs *raftServer) recoverTransport() {
logger.Info().Msg("remove all peers")
rs.transport.RemoveAllPeers()
for _, m := range rs.cluster.AppliedMembers().MapByID {
if m.ID == rs.cluster.NodeID() {
continue
}
logger.Info().Str("member", m.ToString()).Msg("add raft peer")
rs.transport.AddPeer(etcdtypes.ID(uint64(m.ID)), m.GetPeerID(), []string{m.Address})
}
}
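// entriesToApply filters out committed entries that have already been applied and returns only those with an
// index greater than appliedIndex.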
func (rs *raftServer) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
if len(ents) == 0 {
return
}
firstIdx := ents[0].Index
if firstIdx > rs.appliedIndex+1 {
logger.Fatal().Msgf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rs.appliedIndex)
}
if rs.appliedIndex-firstIdx+1 < uint64(len(ents)) {
nents = ents[rs.appliedIndex-firstIdx+1:]
}
return nents
}
var (
ErrInvCCType = errors.New("invalid conf change type")
)
func unmarshalConfChangeEntry(entry *raftpb.Entry) (*raftpb.ConfChange, *consensus.Member, error) {
var cc raftpb.ConfChange
if err := cc.Unmarshal(entry.Data); err != nil {
logger.Fatal().Err(err).Uint64("idx", entry.Index).Uint64("term", entry.Term).Msg("failed to unmarshal conf change entry")
return nil, nil, err
}
// skip confChange of empty context
if len(cc.Context) == 0 {
return nil, nil, nil
}
var member = consensus.Member{}
if err := json.Unmarshal(cc.Context, &member); err != nil {
logger.Fatal().Err(err).Uint64("idx", entry.Index).Uint64("term", entry.Term).Msg("failed to unmarshal context of conf change entry")
return nil, nil, err
}
return &cc, &member, nil
}
func (rs *raftServer) ValidateConfChangeEntry(entry *raftpb.Entry) (*raftpb.ConfChange, *consensus.Member, error) {
// TODO XXX validate from current cluster configure
var cc *raftpb.ConfChange
var member *consensus.Member
var err error
alreadyApplied := func(entry *raftpb.Entry) bool {
return rs.cluster.appliedTerm >= entry.Term || rs.cluster.appliedIndex >= entry.Index
}
cc, member, err = unmarshalConfChangeEntry(entry)
if err != nil {
logger.Fatal().Err(err).Str("entry", entry.String()).Uint64("requestID", cc.ID).Msg("failed to unmarshal conf change")
}
if alreadyApplied(entry) {
return cc, member, ErrCCAlreadyApplied
}
if err = rs.cluster.validateChangeMembership(cc, member, true); err != nil {
return cc, member, err
}
return cc, member, nil
}
// TODO refactoring by cc.Type
//
// separate unmarshal & apply[type]
//
// applyConfChange returns false if this node is removed from cluster
func (rs *raftServer) applyConfChange(ent *raftpb.Entry) bool {
var cc *raftpb.ConfChange
var member *consensus.Member
var err error
postWork := func(err error) bool {
if err != nil {
cc.NodeID = raftlib.None
rs.node.ApplyConfChange(*cc)
}
if cc.ID != 0 {
if err = rs.saveConfChangeState(cc.ID, types.ConfChangeState_CONF_CHANGE_STATE_APPLIED, err); err != nil {
logger.Error().Err(err).Msg("failed to save conf change status")
}
rs.cluster.AfterConfChange(cc, member, err)
}
return true
}
// ConfChanges may be applied more than once. This is because cluster information is more up-to-date than block information when a snapshot is received.
if cc, member, err = rs.ValidateConfChangeEntry(ent); err != nil {
logger.Warn().Err(err).Str("entry", types.RaftEntryToString(ent)).Str("cluster", rs.cluster.toString()).Msg("failed to validate conf change")
return postWork(err)
}
rs.confState = rs.node.ApplyConfChange(*cc)
logger.Info().Uint64("requestID", cc.ID).Str("type", cc.Type.String()).Str("member", member.ToString()).Msg("publish conf change entry")
switch cc.Type {
case raftpb.ConfChangeAddNode:
if err := rs.cluster.addMember(member, true); err != nil {
logger.Fatal().Str("member", member.ToString()).Msg("failed to add member to cluster")
}
if len(cc.Context) > 0 && rs.ID() != cc.NodeID {
rs.transport.AddPeer(etcdtypes.ID(cc.NodeID), member.GetPeerID(), []string{member.Address})
} else {
logger.Debug().Msg("skip adding myself as a peer for addnode")
}
case raftpb.ConfChangeRemoveNode:
if err := rs.cluster.removeMember(member); err != nil {
logger.Fatal().Str("member", member.ToString()).Msg("failed to remove member from cluster")
}
if cc.NodeID == rs.ID() {
logger.Info().Msg("I've been removed from the cluster! Shutting down.")
return false
}
rs.transport.RemovePeer(etcdtypes.ID(cc.NodeID))
}
logger.Debug().Uint64("requestID", cc.ID).Str("cluster", rs.cluster.toString()).Msg("after conf changed")
return postWork(nil)
}
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rs *raftServer) publishEntries(ents []raftpb.Entry) bool {
isDuplicateCommit := func(block *types.Block) bool {
lastReq := rs.commitProgress.GetRequest()
if lastReq != nil && lastReq.block.BlockNo() >= block.BlockNo() {
if StopDupCommit {
logger.Fatal().Str("last", lastReq.block.ID()).Str("dup", block.ID()).Uint64("no", block.BlockNo()).Msg("fork occurred by invalid commit entry")
} else {
logger.Debug().Str("last", lastReq.block.ID()).Str("dup", block.ID()).Uint64("no", block.BlockNo()).Msg("skip commit entry of smaller index")
}
return true
}
return false
}
for i := range ents {
logger.Info().Uint64("idx", ents[i].Index).Uint64("term", ents[i].Term).Str("type", ents[i].Type.String()).Int("datalen", len(ents[i].Data)).Msg("publish entry")
switch ents[i].Type {
case raftpb.EntryNormal:
var block *types.Block
var err error
if len(ents[i].Data) != 0 {
if block, err = unmarshalEntryData(ents[i].Data); err != nil {
logger.Fatal().Err(err).Uint64("idx", ents[i].Index).Uint64("term", ents[i].Term).Msg("commit entry is corrupted")
continue
}
if block != nil {
if isDuplicateCommit(block) {
continue
}
logger.Info().Str("hash", block.ID()).Uint64("no", block.BlockNo()).Msg("commit normal block entry")
rs.commitProgress.UpdateRequest(&commitEntry{block: block, index: ents[i].Index, term: ents[i].Term})
}
}
select {
case rs.commitC <- &commitEntry{block: block, index: ents[i].Index, term: ents[i].Term}:
case <-rs.stopc:
return false
}
case raftpb.EntryConfChange:
if !rs.applyConfChange(&ents[i]) {
return false
}
}
// after commit, update appliedIndex
rs.setAppliedIndex(ents[i].Index)
}
return true
}
func (rs *raftServer) setSnapshotIndex(idx uint64) {
logger.Debug().Uint64("index", idx).Msg("raft server set snapshotIndex")
rs.snapshotIndex = idx
}
func (rs *raftServer) setAppliedIndex(idx uint64) {
logger.Debug().Uint64("index", idx).Msg("raft server set appliedIndex")
rs.appliedIndex = idx
}
func (rs *raftServer) setConfState(state *raftpb.ConfState) {
logger.Debug().Str("state", consensus.ConfStateToString(state)).Msg("raft server set confstate")
rs.confState = state
}
func (rs *raftServer) Process(ctx context.Context, m raftpb.Message) error {
node := rs.getNodeSync()
if node == nil {
return ErrRaftNotReady
}
return node.Step(ctx, m)
}
func (rs *raftServer) IsIDRemoved(id uint64) bool {
return rs.cluster.IsIDRemoved(id)
}
func (rs *raftServer) ReportUnreachable(id uint64) {
logger.Debug().Str("toID", EtcdIDToString(id)).Msg("report unreachable")
rs.node.ReportUnreachable(id)
}
func (rs *raftServer) ReportSnapshot(id uint64, status raftlib.SnapshotStatus) {
logger.Info().Str("toID", EtcdIDToString(id)).Bool("isSucceed", status == raftlib.SnapshotFinish).Msg("finished sending snapshot")
rs.node.ReportSnapshot(id, status)
}
func (rs *raftServer) WaitStartup() {
logger.Debug().Msg("raft start wait")
for s := range rs.commitC {
if s == nil {
break
}
}
logger.Debug().Msg("raft start succeed")
}
// updateTerm is called only from the raftserver goroutine, so it doesn't need a lock.
func (rs *raftServer) updateTerm(term uint64) {
rs.curTerm = term
}
func (rs *raftServer) updateLeader(softState *raftlib.SoftState) {
if softState.Lead != rs.GetLeader() {
rs.leaderStatus.Lock()
defer rs.leaderStatus.Unlock()
rs.leaderStatus.Leader = softState.Lead
if rs.curTerm == 0 {
logger.Fatal().Msg("term must not be 0")
}
rs.leaderStatus.Term = rs.curTerm
rs.leaderStatus.IsLeader = rs.checkLeader()
rs.leaderStatus.leaderChanged++
logger.Info().Uint64("term", rs.curTerm).Str("ID", EtcdIDToString(rs.ID())).Str("leader", EtcdIDToString(softState.Lead)).Msg("leader changed")
} else {
logger.Info().Uint64("term", rs.curTerm).Str("ID", EtcdIDToString(rs.ID())).Str("leader", EtcdIDToString(softState.Lead)).Msg("soft state leader unchanged")
}
}
func (rs *raftServer) GetLeader() uint64 {
rs.leaderStatus.RLock()
defer rs.leaderStatus.RUnlock()
return rs.leaderStatus.Leader
}
func (rs *raftServer) checkLeader() bool {
return rs.ID() != consensus.InvalidMemberID && rs.ID() == rs.leaderStatus.Leader
}
func (rs *raftServer) IsLeader() bool {
rs.leaderStatus.RLock()
defer rs.leaderStatus.RUnlock()
return rs.leaderStatus.IsLeader
}
func (rs *raftServer) GetLeaderStatus() LeaderStatus {
rs.leaderStatus.RLock()
defer rs.leaderStatus.RUnlock()
tmpStatus := rs.leaderStatus
return tmpStatus
}
// IsTermLeader returns true if this node is leader of given term
func (rs *raftServer) IsLeaderOfTerm(term uint64) bool {
status := rs.GetLeaderStatus()
return status.IsLeader && status.Term == term
}
func (rs *raftServer) Status() raftlib.Status {
node := rs.getNodeSync()
if node == nil {
return raftlib.Status{}
}
return node.Status()
}
type MemberProgressState int32
const (
MemberProgressStateHealthy MemberProgressState = iota
MemberProgressStateSlow
MemberProgressStateSyncing
MemberProgressStateUnknown
)
var (
MemberProgressStateNames = map[MemberProgressState]string{
MemberProgressStateHealthy: "MemberProgressStateHealthy",
MemberProgressStateSlow: "MemberProgressStateSlow",
MemberProgressStateSyncing: "MemberProgressStateSyncing",
MemberProgressStateUnknown: "MemberProgressStateUnknown",
}
)
type MemberProgress struct {
MemberID uint64
Status MemberProgressState
LogDifference uint64
progress raftlib.Progress
}
type ClusterProgress struct {
N int
MemberProgresses map[uint64]*MemberProgress
}
func (cp *ClusterProgress) ToString() string {
buf := fmt.Sprintf("{ Total: %d, Members[", cp.N)
for _, mp := range cp.MemberProgresses {
buf = buf + mp.ToString()
}
buf = buf + "] }"
return buf
}
func (cp *MemberProgress) ToString() string {
return fmt.Sprintf("{ id: %x, Status: \"%s\", LogDifference: %d }", cp.MemberID, MemberProgressStateNames[cp.Status], cp.LogDifference)
}
func (rs *raftServer) GetLastIndex() (uint64, error) {
return rs.raftStorage.LastIndex()
}
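// GetClusterProgress reports, from the leader's point of view, the replication state of each member:
// syncing while a snapshot is being sent, slow when probing or lagging more than MaxSlowNodeGap entries
// behind the leader's last index, and healthy otherwise. On a non-leader it returns an empty progress.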
func (rs *raftServer) GetClusterProgress() (*ClusterProgress, error) {
getProgressState := func(raftProgress *raftlib.Progress, lastLeaderIndex uint64, nodeID uint64, leadID uint64) MemberProgressState {
isLeader := nodeID == leadID
if !isLeader {
// syncing
if raftProgress.State == raftlib.ProgressStateSnapshot {
return MemberProgressStateSyncing
}
// slow
// - Even if the node state is ProgressStateReplicate, if the node's match index is much lower than the leader's last index, it is considered slow.
var isSlowFollower bool
if lastLeaderIndex > raftProgress.Match && (lastLeaderIndex-raftProgress.Match) > MaxSlowNodeGap {
isSlowFollower = true
}
if raftProgress.State == raftlib.ProgressStateProbe || isSlowFollower {
return MemberProgressStateSlow
}
}
// normal
return MemberProgressStateHealthy
}
var (
lastIdx uint64
err error
)
prog := ClusterProgress{}
node := rs.getNodeSync()
if node == nil || !rs.IsLeader() {
return &prog, nil
}
status := node.Status()
n := len(status.Progress)
if n == 0 {
return &prog, nil
}
statusBytes, err := status.MarshalJSON()
if err != nil {
logger.Error().Err(err).Msg("failed to marshal raft status")
} else {
logger.Debug().Str("status", string(statusBytes)).Msg("raft status")
}
if lastIdx, err = rs.GetLastIndex(); err != nil {
logger.Error().Err(err).Msg("failed to get last raft index on leader")
return &prog, err
}
prog.MemberProgresses = make(map[uint64]*MemberProgress)
prog.N = n
for id, nodeProgress := range status.Progress {
prog.MemberProgresses[id] = &MemberProgress{MemberID: id, Status: getProgressState(&nodeProgress, lastIdx, rs.cluster.NodeID(), id), LogDifference: lastIdx - nodeProgress.Match, progress: nodeProgress}
}
return &prog, nil
}
// GetExistingCluster returns information of existing cluster.
// It requests member info from all peers.
func (rs *raftServer) GetExistingCluster() (*Cluster, *types.HardStateInfo, error) {
var (
cl *Cluster
hardstate *types.HardStateInfo
err error
bestHash []byte
bestBlk *types.Block
)
getBestHash := func() []byte {
if bestBlk, err = rs.walDB.GetBestBlock(); err != nil {
logger.Fatal().Msg("failed to get best block of my chain to get existing cluster info")
}
if bestBlk.BlockNo() == 0 {
return nil
}
logger.Info().Str("hash", bestBlk.ID()).Uint64("no", bestBlk.BlockNo()).Msg("best block of blockchain")
return bestBlk.BlockHash()
}
bestHash = getBestHash()
for i := 1; i <= MaxTryGetCluster; i++ {
cl, hardstate, err = GetClusterInfo(rs.ComponentHub, bestHash)
if err != nil {
if err != ErrGetClusterTimeout && i != MaxTryGetCluster {
logger.Error().Err(err).Int("try", i).Msg("failed to get cluster info; retrying after sleep")
time.Sleep(time.Second * 10)
} else {
logger.Warn().Err(err).Int("try", i).Msg("failed to get cluster info")
}
continue
}
return cl, hardstate, nil
}
return nil, nil, ErrGetClusterFail
}
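// marshalEntryData encodes a block into the byte payload of a raft log entry; unmarshalEntryData below
// performs the reverse.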
func marshalEntryData(block *types.Block) ([]byte, error) {
var data []byte
var err error
if data, err = proto.Encode(block); err != nil {
logger.Fatal().Err(err).Msg("proposed data is invalid")
}
return data, nil
}
var (
ErrUnmarshal = errors.New("failed to unmarshal log entry data")
)
func unmarshalEntryData(data []byte) (*types.Block, error) {
block := &types.Block{}
if err := proto.Decode(data, block); err != nil {
return block, ErrUnmarshal
}
return block, nil
}
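// raftHttpWrapper adapts the raftServer to the transport layer, translating peer IDs into raft member IDs
// for message processing, unreachability reports, and snapshot status reports.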
type raftHttpWrapper struct {
bf *BlockFactory
raftServer *raftServer
}
func (rhw *raftHttpWrapper) Process(ctx context.Context, peerID types.PeerID, m raftpb.Message) error {
return rhw.raftServer.Process(ctx, m)
}
func (rhw *raftHttpWrapper) IsIDRemoved(peerID types.PeerID) bool {
if member := rhw.raftServer.cluster.Members().getMemberByPeerID(peerID); member != nil {
return rhw.raftServer.IsIDRemoved(member.ID)
}
return true
}
func (rhw *raftHttpWrapper) ReportUnreachable(peerID types.PeerID) {
if member := rhw.raftServer.cluster.Members().getMemberByPeerID(peerID); member != nil {
rhw.raftServer.ReportUnreachable(member.ID)
}
}
func (rhw *raftHttpWrapper) ReportSnapshot(peerID types.PeerID, status raftlib.SnapshotStatus) {
if member := rhw.raftServer.cluster.Members().getMemberByPeerID(peerID); member != nil {
rhw.raftServer.ReportSnapshot(member.ID, status)
}
}
func (rhw *raftHttpWrapper) GetMemberByID(id uint64) *consensus.Member {
return rhw.raftServer.cluster.Members().getMember(id)
}
func (rhw *raftHttpWrapper) GetMemberByPeerID(peerID types.PeerID) *consensus.Member {
return rhw.raftServer.cluster.Members().getMemberByPeerID(peerID)
}
func (rhw *raftHttpWrapper) SaveFromRemote(r io.Reader, id uint64, msg raftpb.Message) (int64, error) {
return rhw.raftServer.snapshotter.SaveFromRemote(r, id, msg)
}