manager/state/raft/raft.go
package raft
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock"
"github.com/docker/go-events"
"github.com/docker/go-metrics"
"github.com/gogo/protobuf/proto"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/ca"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/raftselector"
"github.com/moby/swarmkit/v2/manager/state"
"github.com/moby/swarmkit/v2/manager/state/raft/membership"
"github.com/moby/swarmkit/v2/manager/state/raft/storage"
"github.com/moby/swarmkit/v2/manager/state/raft/transport"
"github.com/moby/swarmkit/v2/manager/state/store"
"github.com/moby/swarmkit/v2/watch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/pkg/v3/idutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
var (
// ErrNoRaftMember is thrown when the node is not yet part of a raft cluster
ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
// ErrConfChangeRefused is returned when there is an issue with the configuration change
ErrConfChangeRefused = errors.New("raft: propose configuration change refused")
// ErrApplyNotSpecified is returned during the creation of a raft node when no apply method was provided
ErrApplyNotSpecified = errors.New("raft: apply method was not specified")
// ErrSetHardState is returned when the node fails to set the hard state
ErrSetHardState = errors.New("raft: failed to set the hard state for log append entry")
// ErrStopped is returned when an operation was submitted but the node was stopped in the meantime
ErrStopped = errors.New("raft: failed to process the request: node is stopped")
// ErrLostLeadership is returned when an operation was submitted but the node lost leader status before it became committed
ErrLostLeadership = errors.New("raft: failed to process the request: node lost leader status")
// ErrRequestTooLarge is returned when a raft internal message is too large to be sent
ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
// ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum
ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
// ErrNoClusterLeader is thrown when the cluster has no elected leader
ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
// ErrMemberUnknown is sent in response to a message from an
// unrecognized peer.
ErrMemberUnknown = errors.New("raft: member unknown")
// work around lint
lostQuorumMessage = "The swarm does not have a leader. It's possible that too few managers are online. Make sure more than half of the managers are online."
errLostQuorum = errors.New(lostQuorumMessage)
// Timer to capture ProposeValue() latency.
proposeLatencyTimer metrics.Timer
)
// LeadershipState indicates whether the node is a leader or follower.
type LeadershipState int
const (
// IsLeader indicates that the node is a raft leader.
IsLeader LeadershipState = iota
// IsFollower indicates that the node is a raft follower.
IsFollower
// lostQuorumTimeout is the number of ticks that can elapse with no
// leader before LeaderConn starts returning an error right away.
lostQuorumTimeout = 10
)
// EncryptionKeys are the current and, if necessary, pending DEKs with which to
// encrypt raft data
type EncryptionKeys struct {
CurrentDEK []byte
PendingDEK []byte
}
// EncryptionKeyRotator is an interface to find out if any keys need rotating.
type EncryptionKeyRotator interface {
GetKeys() EncryptionKeys
UpdateKeys(EncryptionKeys) error
NeedsRotation() bool
RotationNotify() chan struct{}
}
// Node represents the Raft Node useful
// configuration.
type Node struct {
raftNode raft.Node
cluster *membership.Cluster
transport *transport.Transport
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NodeOptions
reqIDGen *idutil.Generator
wait *wait
campaignWhenAble bool
signalledLeadership uint32
isMember uint32
bootstrapMembers []*api.RaftMember
// waitProp waits for all the proposals to be terminated before
// shutting down the node.
waitProp sync.WaitGroup
confState raftpb.ConfState
appliedIndex uint64
snapshotMeta raftpb.SnapshotMetadata
writtenWALIndex uint64
ticker clock.Ticker
doneCh chan struct{}
// RemovedFromRaft notifies about node deletion from raft cluster
RemovedFromRaft chan struct{}
cancelFunc func()
removeRaftOnce sync.Once
leadershipBroadcast *watch.Queue
// used to coordinate shutdown
// Lock should be used only in stop(), all other functions should use RLock.
stopMu sync.RWMutex
// used for membership management checks
membershipLock sync.Mutex
// synchronizes access to n.opts.Addr, and makes sure the address is not
// updated concurrently with JoinAndStart.
addrLock sync.Mutex
snapshotInProgress chan raftpb.SnapshotMetadata
asyncTasks sync.WaitGroup
// stopped chan is used for notifying grpc handlers that raft node going
// to stop.
stopped chan struct{}
raftLogger *storage.EncryptedRaftLogger
keyRotator EncryptionKeyRotator
rotationQueued bool
clearData bool
// waitForAppliedIndex stores the index of the last log that was written using
// an raft DEK during a raft DEK rotation, so that we won't finish a rotation until
// a snapshot covering that index has been written encrypted with the new raft DEK
waitForAppliedIndex uint64
ticksWithNoLeader uint32
}
// NodeOptions provides node-level options.
type NodeOptions struct {
// ID is the node's ID, from its certificate's CN field.
ID string
// Addr is the address of this node's listener
Addr string
// ForceNewCluster defines if we have to force a new cluster
// because we are recovering from a backup data directory.
ForceNewCluster bool
// JoinAddr is the cluster to join. May be an empty string to create
// a standalone cluster.
JoinAddr string
// ForceJoin tells us to join even if already part of a cluster.
ForceJoin bool
// Config is the raft config.
Config *raft.Config
// StateDir is the directory to store durable state.
StateDir string
// TickInterval interval is the time interval between raft ticks.
TickInterval time.Duration
// ClockSource is a Clock interface to use as a time base.
// Leave this nil except for tests that are designed not to run in real
// time.
ClockSource clock.Clock
// SendTimeout is the timeout on the sending messages to other raft
// nodes. Leave this as 0 to get the default value.
SendTimeout time.Duration
TLSCredentials credentials.TransportCredentials
KeyRotator EncryptionKeyRotator
// DisableStackDump prevents Run from dumping goroutine stacks when the
// store becomes stuck.
DisableStackDump bool
// FIPS specifies whether the raft encryption should be FIPS compliant
FIPS bool
}
func init() {
rand.Seed(time.Now().UnixNano())
ns := metrics.NewNamespace("swarm", "raft", nil)
proposeLatencyTimer = ns.NewTimer("transaction_latency", "Raft transaction latency.")
metrics.Register(ns)
}
// NewNode generates a new Raft node
func NewNode(opts NodeOptions) *Node {
cfg := opts.Config
if cfg == nil {
cfg = DefaultNodeConfig()
}
if opts.TickInterval == 0 {
opts.TickInterval = time.Second
}
if opts.SendTimeout == 0 {
opts.SendTimeout = 2 * time.Second
}
raftStore := raft.NewMemoryStorage()
n := &Node{
cluster: membership.NewCluster(),
raftStore: raftStore,
opts: opts,
Config: &raft.Config{
ElectionTick: cfg.ElectionTick,
HeartbeatTick: cfg.HeartbeatTick,
Storage: raftStore,
MaxSizePerMsg: cfg.MaxSizePerMsg,
MaxInflightMsgs: cfg.MaxInflightMsgs,
Logger: cfg.Logger,
CheckQuorum: cfg.CheckQuorum,
},
doneCh: make(chan struct{}),
RemovedFromRaft: make(chan struct{}),
stopped: make(chan struct{}),
leadershipBroadcast: watch.NewQueue(),
keyRotator: opts.KeyRotator,
}
n.memoryStore = store.NewMemoryStore(n)
if opts.ClockSource == nil {
n.ticker = clock.NewClock().NewTicker(opts.TickInterval)
} else {
n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
}
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()
n.cancelFunc = func(n *Node) func() {
var cancelOnce sync.Once
return func() {
cancelOnce.Do(func() {
close(n.stopped)
})
}
}(n)
return n
}
// IsIDRemoved reports if member with id was removed from cluster.
// Part of transport.Raft interface.
func (n *Node) IsIDRemoved(id uint64) bool {
return n.cluster.IsIDRemoved(id)
}
// NodeRemoved signals that node was removed from cluster and should stop.
// Part of transport.Raft interface.
func (n *Node) NodeRemoved() {
n.removeRaftOnce.Do(func() {
atomic.StoreUint32(&n.isMember, 0)
close(n.RemovedFromRaft)
})
}
// ReportSnapshot reports snapshot status to underlying raft node.
// Part of transport.Raft interface.
func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
n.raftNode.ReportSnapshot(id, status)
}
// ReportUnreachable reports to underlying raft node that member with id is
// unreachable.
// Part of transport.Raft interface.
func (n *Node) ReportUnreachable(id uint64) {
n.raftNode.ReportUnreachable(id)
}
// SetAddr provides the raft node's address. This can be used in cases where
// opts.Addr was not provided to NewNode, for example when a port was not bound
// until after the raft node was created.
func (n *Node) SetAddr(ctx context.Context, addr string) error {
n.addrLock.Lock()
defer n.addrLock.Unlock()
n.opts.Addr = addr
if !n.IsMember() {
return nil
}
newRaftMember := &api.RaftMember{
RaftID: n.Config.ID,
NodeID: n.opts.ID,
Addr: addr,
}
if err := n.cluster.UpdateMember(n.Config.ID, newRaftMember); err != nil {
return err
}
// If the raft node is running, submit a configuration change
// with the new address.
// TODO(aaronl): Currently, this node must be the leader to
// submit this configuration change. This works for the initial
// use cases (single-node cluster late binding ports, or calling
// SetAddr before joining a cluster). In the future, we may want
// to support having a follower proactively change its remote
// address.
leadershipCh, cancelWatch := n.SubscribeLeadership()
defer cancelWatch()
ctx, cancelCtx := n.WithContext(ctx)
defer cancelCtx()
isLeader := atomic.LoadUint32(&n.signalledLeadership) == 1
for !isLeader {
select {
case leadershipChange := <-leadershipCh:
if leadershipChange == IsLeader {
isLeader = true
}
case <-ctx.Done():
return ctx.Err()
}
}
return n.updateNodeBlocking(ctx, n.Config.ID, addr)
}
// WithContext returns context which is cancelled when parent context cancelled
// or node is stopped.
func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
go func() {
select {
case <-ctx.Done():
case <-n.stopped:
cancel()
}
}()
return ctx, cancel
}
func (n *Node) initTransport() {
transportConfig := &transport.Config{
HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval,
SendTimeout: n.opts.SendTimeout,
Credentials: n.opts.TLSCredentials,
Raft: n,
}
n.transport = transport.New(transportConfig)
}
// JoinAndStart joins and starts the raft server
func (n *Node) JoinAndStart(ctx context.Context) (err error) {
ctx, cancel := n.WithContext(ctx)
defer func() {
cancel()
if err != nil {
n.stopMu.Lock()
// to shutdown transport
n.cancelFunc()
n.stopMu.Unlock()
n.done()
} else {
atomic.StoreUint32(&n.isMember, 1)
}
}()
loadAndStartErr := n.loadAndStart(ctx, n.opts.ForceNewCluster)
if loadAndStartErr != nil && loadAndStartErr != storage.ErrNoWAL {
return loadAndStartErr
}
snapshot, err := n.raftStore.Snapshot()
// Snapshot never returns an error
if err != nil {
panic("could not get snapshot of raft store")
}
n.confState = snapshot.Metadata.ConfState
n.appliedIndex = snapshot.Metadata.Index
n.snapshotMeta = snapshot.Metadata
n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
n.addrLock.Lock()
defer n.addrLock.Unlock()
// override the module field entirely, since etcd/raft is not exactly a submodule
n.Config.Logger = log.G(ctx).WithField("module", "raft")
// restore from snapshot
if loadAndStartErr == nil {
if n.opts.JoinAddr != "" && n.opts.ForceJoin {
if err := n.joinCluster(ctx); err != nil {
return errors.Wrap(err, "failed to rejoin cluster")
}
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}
if n.opts.JoinAddr == "" {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
peer, err := n.newRaftLogs(n.opts.ID)
if err != nil {
return err
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
return nil
}
// join to existing cluster
if err := n.joinCluster(ctx); err != nil {
return err
}
if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}
func (n *Node) joinCluster(ctx context.Context) error {
if n.opts.Addr == "" {
return errors.New("attempted to join raft cluster without knowing own address")
}
conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := api.NewRaftMembershipClient(conn)
joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
defer joinCancel()
resp, err := client.Join(joinCtx, &api.JoinRequest{
Addr: n.opts.Addr,
})
if err != nil {
return err
}
n.Config.ID = resp.RaftID
n.bootstrapMembers = resp.Members
return nil
}
// DefaultNodeConfig returns the default config for a
// raft node that can be modified and customized
func DefaultNodeConfig() *raft.Config {
return &raft.Config{
HeartbeatTick: 1,
// Recommended value in etcd/raft is 10 x (HeartbeatTick).
// Lower values were seen to have caused instability because of
// frequent leader elections when running on flakey networks.
ElectionTick: 10,
MaxSizePerMsg: math.MaxUint16,
MaxInflightMsgs: 256,
Logger: log.L,
CheckQuorum: true,
}
}
// DefaultRaftConfig returns a default api.RaftConfig.
func DefaultRaftConfig() api.RaftConfig {
return api.RaftConfig{
KeepOldSnapshots: 0,
SnapshotInterval: 10000,
LogEntriesForSlowFollowers: 500,
// Recommended value in etcd/raft is 10 x (HeartbeatTick).
// Lower values were seen to have caused instability because of
// frequent leader elections when running on flakey networks.
HeartbeatTick: 1,
ElectionTick: 10,
}
}
// MemoryStore returns the memory store that is kept in sync with the raft log.
func (n *Node) MemoryStore() *store.MemoryStore {
return n.memoryStore
}
func (n *Node) done() {
n.cluster.Clear()
n.ticker.Stop()
n.leadershipBroadcast.Close()
n.cluster.PeersBroadcast.Close()
n.memoryStore.Close()
if n.transport != nil {
n.transport.Stop()
}
close(n.doneCh)
}
// ClearData tells the raft node to delete its WALs, snapshots, and keys on
// shutdown.
func (n *Node) ClearData() {
n.clearData = true
}
// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
ctx, cancel := context.WithCancel(ctx)
for _, node := range n.bootstrapMembers {
if err := n.registerNode(node); err != nil {
log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
}
}
defer func() {
cancel()
n.stop(ctx)
if n.clearData {
// Delete WAL and snapshots, since they are no longer
// usable.
if err := n.raftLogger.Clear(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to move wal after node removal")
}
// clear out the DEKs
if err := n.keyRotator.UpdateKeys(EncryptionKeys{}); err != nil {
log.G(ctx).WithError(err).Error("could not remove DEKs")
}
}
n.done()
}()
// Flag that indicates if this manager node is *currently* the raft leader.
wasLeader := false
transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)
for {
select {
case <-n.ticker.C():
n.raftNode.Tick()
if n.leader() == raft.None {
atomic.AddUint32(&n.ticksWithNoLeader, 1)
} else {
atomic.StoreUint32(&n.ticksWithNoLeader, 0)
}
case rd := <-n.raftNode.Ready():
raftConfig := n.getCurrentRaftConfig()
// Save entries to storage
if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
return errors.Wrap(err, "failed to save entries to storage")
}
// If the memory store lock has been held for too long,
// transferring leadership is an easy way to break out of it.
if wasLeader &&
(rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
n.memoryStore.Wedged() &&
transferLeadershipLimit.Allow() {
log.G(ctx).Error("Attempting to transfer leadership")
if !n.opts.DisableStackDump {
stackDump()
}
transferee, err := n.transport.LongestActive()
if err != nil {
log.G(ctx).WithError(err).Error("failed to get longest-active member")
} else {
log.G(ctx).Error("data store lock held too long - transferring leadership")
n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
}
}
for _, msg := range rd.Messages {
// if the message is a snapshot, before we send it, we should
// overwrite the original ConfState from the snapshot with the
// current one
if msg.Type == raftpb.MsgSnap {
msg.Snapshot.Metadata.ConfState = n.confState
}
// Send raft messages to peers
if err := n.transport.Send(msg); err != nil {
log.G(ctx).WithError(err).Error("failed to send message to member")
}
}
// Apply snapshot to memory store. The snapshot
// was applied to the raft store in
// saveToStorage.
if !raft.IsEmptySnap(rd.Snapshot) {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
}
n.appliedIndex = rd.Snapshot.Metadata.Index
n.snapshotMeta = rd.Snapshot.Metadata
n.confState = rd.Snapshot.Metadata.ConfState
}
// If we cease to be the leader, we must cancel any
// proposals that are currently waiting for a quorum to
// acknowledge them. It is still possible for these to
// become committed, but if that happens we will apply
// them as any follower would.
// It is important that we cancel these proposals before
// calling processCommitted, so processCommitted does
// not deadlock.
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
log.G(ctx).Error("soft state changed, node no longer a leader, resetting and cancelling all waits")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Publish(IsFollower)
}
// It is important that we set n.signalledLeadership to 0
// before calling n.wait.cancelAll. When a new raft
// request is registered, it checks n.signalledLeadership
// afterwards, and cancels the registration if it is 0.
// If cancelAll was called first, this call might run
// before the new request registers, but
// signalledLeadership would be set after the check.
// Setting signalledLeadership before calling cancelAll
// ensures that if a new request is registered during
// this transition, it will either be cancelled by
// cancelAll, or by its own check of signalledLeadership.
n.wait.cancelAll()
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
// Node just became a leader.
wasLeader = true
}
}
// Process committed entries
for _, entry := range rd.CommittedEntries {
if err := n.processCommitted(ctx, entry); err != nil {
log.G(ctx).WithError(err).Error("failed to process committed entries")
}
}
// in case the previous attempt to update the key failed
n.maybeMarkRotationFinished(ctx)
// Trigger a snapshot every once in awhile
if n.snapshotInProgress == nil &&
(n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
n.triggerSnapshot(ctx, raftConfig)
}
if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Publish(IsLeader)
}
}
// Advance the state machine
n.raftNode.Advance()
// On the first startup, or if we are the only
// registered member after restoring from the state,
// campaign to be the leader.
if n.campaignWhenAble {
members := n.cluster.Members()
if len(members) >= 1 {
n.campaignWhenAble = false
}
if len(members) == 1 && members[n.Config.ID] != nil {
n.raftNode.Campaign(ctx)
}
}
case snapshotMeta := <-n.snapshotInProgress:
raftConfig := n.getCurrentRaftConfig()
if snapshotMeta.Index > n.snapshotMeta.Index {
n.snapshotMeta = snapshotMeta
if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
}
}
n.snapshotInProgress = nil
n.maybeMarkRotationFinished(ctx)
if n.rotationQueued && n.needsSnapshot(ctx) {
// there was a key rotation that took place before while the snapshot
// was in progress - we have to take another snapshot and encrypt with the new key
n.rotationQueued = false
n.triggerSnapshot(ctx, raftConfig)
}
case <-n.keyRotator.RotationNotify():
// There are 2 separate checks: rotationQueued, and n.needsSnapshot().
// We set rotationQueued so that when we are notified of a rotation, we try to
// do a snapshot as soon as possible. However, if there is an error while doing
// the snapshot, we don't want to hammer the node attempting to do snapshots over
// and over. So if doing a snapshot fails, wait until the next entry comes in to
// try again.
switch {
case n.snapshotInProgress != nil:
n.rotationQueued = true
case n.needsSnapshot(ctx):
n.triggerSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-ctx.Done():
return nil
}
}
}
func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
snapCluster, err := n.clusterSnapshot(data)
if err != nil {
return err
}
oldMembers := n.cluster.Members()
for _, member := range snapCluster.Members {
delete(oldMembers, member.RaftID)
}
for _, removedMember := range snapCluster.Removed {
n.cluster.RemoveMember(removedMember)
n.transport.RemovePeer(removedMember)
delete(oldMembers, removedMember)
}
for id, member := range oldMembers {
n.cluster.ClearMember(id)
if err := n.transport.RemovePeer(member.RaftID); err != nil {
log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID)
}
}
for _, node := range snapCluster.Members {
if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil {
log.G(ctx).WithError(err).Error("failed to register node from snapshot")
}
}
return nil
}
func (n *Node) needsSnapshot(ctx context.Context) bool {
if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil {
n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
// written with the new key to supercede any WAL written with an old DEK.
n.waitForAppliedIndex = n.writtenWALIndex
// if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
// snapshot index, because the rotation cannot be completed until the next snapshot
if n.waitForAppliedIndex <= n.snapshotMeta.Index {
n.waitForAppliedIndex = n.snapshotMeta.Index + 1
}
log.G(ctx).Debugf(
"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
}
}
result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
if result {
log.G(ctx).Debugf(
"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
n.waitForAppliedIndex, n.appliedIndex)
}
return result
}
func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
// this means we tried to rotate - so finish the rotation
if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
} else {
log.G(ctx).Debugf(
"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
n.snapshotMeta.Index, n.waitForAppliedIndex)
n.waitForAppliedIndex = 0
if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
}
}
}
}
func (n *Node) getCurrentRaftConfig() api.RaftConfig {
raftConfig := DefaultRaftConfig()
n.memoryStore.View(func(readTx store.ReadTx) {
clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
if err == nil && len(clusters) == 1 {
raftConfig = clusters[0].Spec.Raft
}
})
return raftConfig
}
// Cancel interrupts all ongoing proposals, and prevents new ones from
// starting. This is useful for the shutdown sequence because it allows
// the manager to shut down raft-dependent services that might otherwise
// block on shutdown if quorum isn't met. Then the raft node can be completely
// shut down once no more code is using it.
func (n *Node) Cancel() {
n.cancelFunc()
}
// Done returns channel which is closed when raft node is fully stopped.
func (n *Node) Done() <-chan struct{} {
return n.doneCh
}
func (n *Node) stop(ctx context.Context) {
n.stopMu.Lock()
defer n.stopMu.Unlock()
n.Cancel()
n.waitProp.Wait()
n.asyncTasks.Wait()
n.raftNode.Stop()
n.ticker.Stop()
n.raftLogger.Close(ctx)
atomic.StoreUint32(&n.isMember, 0)
// TODO(stevvooe): Handle ctx.Done()
}
// isLeader checks if we are the leader or not, without the protection of lock
func (n *Node) isLeader() bool {
if !n.IsMember() {
return false
}
if n.Status().Lead == n.Config.ID {
return true
}
return false
}
// IsLeader checks if we are the leader or not, with the protection of lock
func (n *Node) IsLeader() bool {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
return n.isLeader()
}
// leader returns the id of the leader, without the protection of lock and
// membership check, so it's caller task.
func (n *Node) leader() uint64 {
return n.Status().Lead
}
// Leader returns the id of the leader, with the protection of lock
func (n *Node) Leader() (uint64, error) {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return raft.None, ErrNoRaftMember
}
leader := n.leader()
if leader == raft.None {
return raft.None, ErrNoClusterLeader
}
return leader, nil
}
// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
func (n *Node) ReadyForProposals() bool {
return atomic.LoadUint32(&n.signalledLeadership) == 1
}
func (n *Node) caughtUp() bool {
// obnoxious function that always returns a nil error
lastIndex, _ := n.raftStore.LastIndex()
return n.appliedIndex >= lastIndex
}
// Join asks to a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
// is called from an aspiring member to an existing member
func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
}
fields := log.Fields{
"node.id": nodeInfo.NodeID,
"method": "(*Node).Join",
"raft_id": fmt.Sprintf("%x", n.Config.ID),
}
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
logger := log.G(ctx).WithFields(fields)
logger.Debug("")
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
n.membershipLock.Lock()
defer n.membershipLock.Unlock()
if !n.IsMember() {
return nil, status.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
}
if !n.isLeader() {
return nil, status.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
}
remoteAddr := req.Addr
// If the joining node sent an address like 0.0.0.0:4242, automatically
// determine its actual address based on the GRPC connection. This
// avoids the need for a prospective member to know its own address.
requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
}
requestIP := net.ParseIP(requestHost)
if requestIP != nil && requestIP.IsUnspecified() {
remoteHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
if err != nil {
return nil, err
}
remoteAddr = net.JoinHostPort(remoteHost, requestPort)
}
// We do not bother submitting a configuration change for the
// new member if we can't contact it back using its address
if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
return nil, err
}
// If the peer is already a member of the cluster, we will only update
// its information, not add it as a new member. Adding it again would
// cause the quorum to be computed incorrectly.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
if remoteAddr == m.Addr {
return n.joinResponse(m.RaftID), nil
}
updatedRaftMember := &api.RaftMember{
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: remoteAddr,
}
if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil {
return nil, err
}
if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil {
logger.WithError(err).Error("failed to update node address")
return nil, err
}
logger.Info("updated node address")
return n.joinResponse(m.RaftID), nil
}
}
// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}
err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
if err != nil {
logger.WithError(err).Errorf("failed to add member %x", raftID)
return nil, err
}
logger.Debug("node joined")
return n.joinResponse(raftID), nil
}
func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
var nodes []*api.RaftMember
for _, node := range n.cluster.Members() {
nodes = append(nodes, &api.RaftMember{
RaftID: node.RaftID,
NodeID: node.NodeID,
Addr: node.Addr,
})
}
return &api.JoinResponse{Members: nodes, RaftID: raftID}
}
// checkHealth tries to contact an aspiring member through its advertised address
// and checks if its raft server is running.
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
if err != nil {
return err
}
defer conn.Close()
if timeout != 0 {
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = tctx
}
healthClient := api.NewHealthClient(conn)
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
}
if resp.Status != api.HealthCheckResponse_SERVING {
return fmt.Errorf("health check returned status %s", resp.Status.String())
}
return nil
}
// addMember submits a configuration change to add a new member on the raft cluster.
func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
node := api.RaftMember{
RaftID: raftID,
NodeID: nodeID,
Addr: addr,
}
meta, err := node.Marshal()
if err != nil {
return err
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: raftID,
Context: meta,
}
// Wait for a raft round to process the configuration change
return n.configure(ctx, cc)
}
// updateNodeBlocking runs synchronous job to update node address in whole cluster.
func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
m := n.cluster.GetMember(id)
if m == nil {
return errors.Errorf("member %x is not found for update", id)
}
node := api.RaftMember{
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: addr,
}
meta, err := node.Marshal()
if err != nil {
return err
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeUpdateNode,
NodeID: id,
Context: meta,
}
// Wait for a raft round to process the configuration change
return n.configure(ctx, cc)
}
// UpdateNode submits a configuration change to change a member's address.
func (n *Node) UpdateNode(id uint64, addr string) {
ctx, cancel := n.WithContext(context.Background())
defer cancel()
// spawn updating info in raft in background to unblock transport
go func() {
if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
log.G(ctx).WithFields(log.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
}
}()
}
// Leave asks to a member of the raft to remove
// us from the raft cluster. This method is called
// from a member who is willing to leave its raft
// membership to an active member of the raft
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
if req.Node == nil {
return nil, status.Errorf(codes.InvalidArgument, "no node information provided")
}
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
}
ctx, cancel := n.WithContext(ctx)
defer cancel()
fields := log.Fields{
"node.id": nodeInfo.NodeID,
"method": "(*Node).Leave",
"raft_id": fmt.Sprintf("%x", n.Config.ID),
}
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log.G(ctx).WithFields(fields).Debug("")
if err := n.removeMember(ctx, req.Node.RaftID); err != nil {
return nil, err
}
return &api.LeaveResponse{}, nil
}
// CanRemoveMember checks if a member can be removed from
// the context of the current node.
func (n *Node) CanRemoveMember(id uint64) bool {
members := n.cluster.Members()
nreachable := 0 // reachable managers after removal
for _, m := range members {
if m.RaftID == id {
continue
}
// Local node from where the remove is issued
if m.RaftID == n.Config.ID {
nreachable++
continue
}
if n.transport.Active(m.RaftID) {
nreachable++
}
}
nquorum := (len(members)-1)/2 + 1
return nreachable >= nquorum
}
func (n *Node) removeMember(ctx context.Context, id uint64) error {
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return ErrNoRaftMember
}
if !n.isLeader() {
return ErrLostLeadership
}
n.membershipLock.Lock()
defer n.membershipLock.Unlock()
if !n.CanRemoveMember(id) {
return ErrCannotRemoveMember
}
cc := raftpb.ConfChange{
ID: id,
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
Context: []byte(""),
}
return n.configure(ctx, cc)
}
// TransferLeadership attempts to transfer leadership to a different node,
// and wait for the transfer to happen.
func (n *Node) TransferLeadership(ctx context.Context) error {
ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
defer cancelTransfer()
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return ErrNoRaftMember
}
if !n.isLeader() {
return ErrLostLeadership
}
transferee, err := n.transport.LongestActive()
if err != nil {
return errors.Wrap(err, "failed to get longest-active member")
}
start := time.Now()
n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
ticker := time.NewTicker(n.opts.TickInterval / 10)
defer ticker.Stop()
var leader uint64
for {
leader = n.leader()
if leader != raft.None && leader != n.Config.ID {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
return nil
}
// RemoveMember submits a configuration change to remove a member from the raft cluster
// after checking if the operation would not result in a loss of quorum.
func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
ctx, cancel := n.WithContext(ctx)
defer cancel()
return n.removeMember(ctx, id)
}
// processRaftMessageLogger is used to lazily create a logger for
// ProcessRaftMessage. Usually nothing will be logged, so it is useful to avoid
// formatting strings and allocating a logger when it won't be used.
func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
fields := log.Fields{
"method": "(*Node).ProcessRaftMessage",
}
if n.IsMember() {
fields["raft_id"] = fmt.Sprintf("%x", n.Config.ID)
}
if msg != nil && msg.Message != nil {
fields["from"] = fmt.Sprintf("%x", msg.Message.From)
}
return log.G(ctx).WithFields(fields)
}
//nolint:unused // currently unused, but should be used again; see TODO in Node.ProcessRaftMessage
func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
// too early
if !n.IsMember() {
return nil
}
p, ok := peer.FromContext(ctx)
if !ok {
return nil
}
oldAddr, err := n.transport.PeerAddr(id)
if err != nil {
return err
}
if oldAddr == "" {
// Don't know the address of the peer yet, so can't report an
// update.
return nil
}
newHost, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return err
}
_, officialPort, err := net.SplitHostPort(oldAddr)
if err != nil {
return err
}
newAddr := net.JoinHostPort(newHost, officialPort)
return n.transport.UpdatePeerAddr(id, newAddr)
}
// StreamRaftMessage is the server endpoint for streaming Raft messages.
// It accepts a stream of raft messages to be processed on this raft member,
// returning a StreamRaftMessageResponse when processing of the streamed
// messages is complete.
// It is called from the Raft leader, which uses it to stream messages
// to this raft member.
// A single stream corresponds to a single raft message,
// which may be disassembled and streamed by the sender
// as individual messages. Therefore, each of the messages
// received by the stream will have the same raft message type and index.
// Currently, only messages of type raftpb.MsgSnap can be disassembled, sent
// and received on the stream.
func (n *Node) StreamRaftMessage(stream api.Raft_StreamRaftMessageServer) error {
// recvdMsg is the current messasge received from the stream.
// assembledMessage is where the data from recvdMsg is appended to.
var recvdMsg, assembledMessage *api.StreamRaftMessageRequest
var err error
// First message index.
var raftMsgIndex uint64
for {
recvdMsg, err = stream.Recv()
if err == io.EOF {
break
} else if err != nil {
log.G(stream.Context()).WithError(err).Error("error while reading from stream")
return err
}
// Initialized the message to be used for assembling
// the raft message.
if assembledMessage == nil {
// For all message types except raftpb.MsgSnap,
// we don't expect more than a single message
// on the stream so we'll get an EOF on the next Recv()
// and go on to process the received message.
assembledMessage = recvdMsg
raftMsgIndex = recvdMsg.Message.Index
continue
}
// Verify raft message index.
if recvdMsg.Message.Index != raftMsgIndex {
errMsg := fmt.Sprintf("Raft message chunk with index %d is different from the previously received raft message index %d",
recvdMsg.Message.Index, raftMsgIndex)
log.G(stream.Context()).Errorf(errMsg)
return status.Errorf(codes.InvalidArgument, "%s", errMsg)
}
// Verify that multiple message received on a stream
// can only be of type raftpb.MsgSnap.
if recvdMsg.Message.Type != raftpb.MsgSnap {
errMsg := fmt.Sprintf("Raft message chunk is not of type %d",
raftpb.MsgSnap)
log.G(stream.Context()).Errorf(errMsg)
return status.Errorf(codes.InvalidArgument, "%s", errMsg)
}
// Append the received snapshot data.
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
}
// We should have the complete snapshot. Verify and process.
if err == io.EOF {
_, err = n.ProcessRaftMessage(stream.Context(), &api.ProcessRaftMessageRequest{Message: assembledMessage.Message})
if err == nil {
// Translate the response of ProcessRaftMessage() from
// ProcessRaftMessageResponse to StreamRaftMessageResponse if needed.
return stream.SendAndClose(&api.StreamRaftMessageResponse{})
}
}
return err
}
// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
return &api.ProcessRaftMessageResponse{}, nil
}
// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
return nil, status.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
}
ctx, cancel := n.WithContext(ctx)
defer cancel()
// TODO(aaronl): Address changes are temporarily disabled.
// See https://github.com/docker/docker/issues/30455.
// This should be reenabled in the future with additional
// safeguards (perhaps storing multiple addresses per node).
// if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
// log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
// }
// Reject vote requests from unreachable peers
if msg.Message.Type == raftpb.MsgVote {
member := n.cluster.GetMember(msg.Message.From)
if member == nil {
n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
return &api.ProcessRaftMessageResponse{}, nil
}
if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil
}
}
if msg.Message.Type == raftpb.MsgProp {
// We don't accept forwarded proposals. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
return &api.ProcessRaftMessageResponse{}, nil
}
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if n.IsMember() {
if msg.Message.To != n.Config.ID {
n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
return &api.ProcessRaftMessageResponse{}, nil
}
if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
}
}
return &api.ProcessRaftMessageResponse{}, nil
}
// ResolveAddress returns the address reaching for a given node ID.
func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
if !n.IsMember() {
return nil, ErrNoRaftMember
}
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
}
fields := log.Fields{
"node.id": nodeInfo.NodeID,
"method": "(*Node).ResolveAddress",
"raft_id": fmt.Sprintf("%x", n.Config.ID),
}
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log.G(ctx).WithFields(fields).Debug("")
member := n.cluster.GetMember(msg.RaftID)
if member == nil {
return nil, status.Errorf(codes.NotFound, "member %x not found", msg.RaftID)
}
return &api.ResolveAddressResponse{Addr: member.Addr}, nil
}
func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
leader, err := n.Leader()
if err != nil {
return nil, err
}
if leader == n.Config.ID {
return nil, raftselector.ErrIsLeader
}
conn, err := n.transport.PeerConn(leader)
if err != nil {
return nil, errors.Wrap(err, "failed to get connection to leader")
}
return conn, nil
}
// LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader
// if current machine is leader.
func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
cc, err := n.getLeaderConn()
if err == nil {
return cc, nil
}
if err == raftselector.ErrIsLeader {
return nil, err
}
if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
return nil, errLostQuorum
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cc, err := n.getLeaderConn()
if err == nil {
return cc, nil
}
if err == raftselector.ErrIsLeader {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// registerNode registers a new node on the cluster memberlist
func (n *Node) registerNode(node *api.RaftMember) error {
if n.cluster.IsIDRemoved(node.RaftID) {
return nil
}
member := &membership.Member{}
existingMember := n.cluster.GetMember(node.RaftID)
if existingMember != nil {
// Member already exists
// If the address is different from what we thought it was,
// update it. This can happen if we just joined a cluster
// and are adding ourself now with the remotely-reachable
// address.
if existingMember.Addr != node.Addr {
if node.RaftID != n.Config.ID {
if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
return err
}
}
member.RaftMember = node
n.cluster.AddMember(member)
}
return nil
}
// Avoid opening a connection to the local node
if node.RaftID != n.Config.ID {
if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
return err
}
}
member.RaftMember = node
err := n.cluster.AddMember(member)
if err != nil {
if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
}
return err
}
return nil
}
// ProposeValue calls Propose on the underlying raft library(etcd/raft) and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
defer metrics.StartTimer(proposeLatencyTimer)()
ctx, cancel := n.WithContext(ctx)
defer cancel()
_, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
return err
}
// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return nil
}
status := n.Status()
return &api.Version{Index: status.Commit}
}
// ChangesBetween returns the changes starting after "from", up to and
// including "to". If these changes are not available because the log
// has been compacted, an error will be returned.
func (n *Node) ChangesBetween(from, to api.Version) ([]state.Change, error) {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if from.Index > to.Index {
return nil, errors.New("versions are out of order")
}
if !n.IsMember() {
return nil, ErrNoRaftMember
}
// never returns error
last, _ := n.raftStore.LastIndex()
if to.Index > last {
return nil, errors.New("last version is out of bounds")
}
pbs, err := n.raftStore.Entries(from.Index+1, to.Index+1, math.MaxUint64)
if err != nil {
return nil, err
}
var changes []state.Change
for _, pb := range pbs {
if pb.Type != raftpb.EntryNormal || pb.Data == nil {
continue
}
r := &api.InternalRaftRequest{}
err := proto.Unmarshal(pb.Data, r)
if err != nil {
return nil, errors.Wrap(err, "error umarshalling internal raft request")
}
if r.Action != nil {
changes = append(changes, state.Change{StoreActions: r.Action, Version: api.Version{Index: pb.Index}})
}
}
return changes, nil
}
// SubscribePeers subscribes to peer updates in cluster. It sends always full
// list of peers.
func (n *Node) SubscribePeers() (q chan events.Event, cancel func()) {
return n.cluster.PeersBroadcast.Watch()
}
// GetMemberlist returns the current list of raft members in the cluster.
func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
memberlist := make(map[uint64]*api.RaftMember)
members := n.cluster.Members()
leaderID, err := n.Leader()
if err != nil {
leaderID = raft.None
}
for id, member := range members {
reachability := api.RaftMemberStatus_REACHABLE
leader := false
if member.RaftID != n.Config.ID {
if !n.transport.Active(member.RaftID) {
reachability = api.RaftMemberStatus_UNREACHABLE
}
}
if member.RaftID == leaderID {
leader = true
}
memberlist[id] = &api.RaftMember{
RaftID: member.RaftID,
NodeID: member.NodeID,
Addr: member.Addr,
Status: api.RaftMemberStatus{
Leader: leader,
Reachability: reachability,
},
}
}
return memberlist
}
// Status returns status of underlying etcd.Node.
func (n *Node) Status() raft.Status {
return n.raftNode.Status()
}
// GetMemberByNodeID returns member information based
// on its generic Node ID.
func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
members := n.cluster.Members()
for _, member := range members {
if member.NodeID == nodeID {
return member
}
}
return nil
}
// GetNodeIDByRaftID returns the generic Node ID of a member given its raft ID.
// It returns ErrMemberUnknown if the raft ID is unknown.
func (n *Node) GetNodeIDByRaftID(raftID uint64) (string, error) {
if member, ok := n.cluster.Members()[raftID]; ok {
return member.NodeID, nil
}
// this is the only possible error value that should be returned; the
// manager code depends on this. if you need to add more errors later, make
// sure that you update the callers of this method accordingly
return "", ErrMemberUnknown
}
// IsMember checks if the raft node has effectively joined
// a cluster of existing members.
func (n *Node) IsMember() bool {
return atomic.LoadUint32(&n.isMember) == 1
}
// Saves a log entry to our Store
func (n *Node) saveToStorage(
ctx context.Context,
raftConfig *api.RaftConfig,
hardState raftpb.HardState,
entries []raftpb.Entry,
snapshot raftpb.Snapshot,
) (err error) {
if !raft.IsEmptySnap(snapshot) {
if err := n.raftLogger.SaveSnapshot(snapshot); err != nil {
return errors.Wrap(err, "failed to save snapshot")
}
if err := n.raftLogger.GC(snapshot.Metadata.Index, snapshot.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("unable to clean old snapshots and WALs")
}
if err = n.raftStore.ApplySnapshot(snapshot); err != nil {
return errors.Wrap(err, "failed to apply snapshot on raft node")
}
}
if err := n.raftLogger.SaveEntries(hardState, entries); err != nil {
return errors.Wrap(err, "failed to save raft log entries")
}
if len(entries) > 0 {
lastIndex := entries[len(entries)-1].Index
if lastIndex > n.writtenWALIndex {
n.writtenWALIndex = lastIndex
}
}
if err = n.raftStore.Append(entries); err != nil {
return errors.Wrap(err, "failed to append raft log entries")
}
return nil
}
// processInternalRaftRequest proposes a value to be appended to the raft log.
// It calls Propose() on etcd/raft, which calls back into the raft FSM,
// which then sends a message to each of the participating nodes
// in the raft group to apply a log entry and then waits for it to be applied
// on this node. It will block until the this node:
// 1. Gets the necessary replies back from the participating nodes and also performs the commit itself, or
// 2. There is an error, or
// 3. Until the raft node finalizes all the proposals on node shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.IsMember() {
n.stopMu.RUnlock()
return nil, ErrStopped
}
n.waitProp.Add(1)
defer n.waitProp.Done()
n.stopMu.RUnlock()
r.ID = n.reqIDGen.Next()
// This must be derived from the context which is cancelled by stop()
// to avoid a deadlock on shutdown.
waitCtx, cancel := context.WithCancel(ctx)
ch := n.wait.register(r.ID, cb, cancel)
// Do this check after calling register to avoid a race.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
log.G(ctx).Error("node is no longer leader, aborting propose")
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
data, err := r.Marshal()
if err != nil {
n.wait.cancel(r.ID)
return nil, err
}
if len(data) > store.MaxTransactionBytes {
n.wait.cancel(r.ID)
return nil, ErrRequestTooLarge
}
err = n.raftNode.Propose(waitCtx, data)
if err != nil {
n.wait.cancel(r.ID)
return nil, err
}
select {
case x, ok := <-ch:
if !ok {
// Wait notification channel was closed. This should only happen if the wait was cancelled.
log.G(ctx).Error("wait cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Error("wait cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
case <-waitCtx.Done():
n.wait.cancel(r.ID)
// If we can read from the channel, wait item was triggered. Otherwise it was cancelled.
x, ok := <-ch
if !ok {
log.G(ctx).WithError(waitCtx.Err()).Error("wait context cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Error("wait context cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
case <-ctx.Done():
n.wait.cancel(r.ID)
// if channel is closed, wait item was canceled, otherwise it was triggered
x, ok := <-ch
if !ok {
return nil, ctx.Err()
}
return x.(proto.Message), nil
}
}
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It will block
// until the change is performed or there is an error.
func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
cc.ID = n.reqIDGen.Next()
ctx, cancel := context.WithCancel(ctx)
ch := n.wait.register(cc.ID, nil, cancel)
if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil {
n.wait.cancel(cc.ID)
return err
}
select {
case x := <-ch:
if err, ok := x.(error); ok {
return err
}
if x != nil {
log.G(ctx).Panic("raft: configuration change error, return type should always be error")
}
return nil
case <-ctx.Done():
n.wait.cancel(cc.ID)
return ctx.Err()
}
}
func (n *Node) processCommitted(ctx context.Context, entry raftpb.Entry) error {
// Process a normal entry
if entry.Type == raftpb.EntryNormal && entry.Data != nil {
if err := n.processEntry(ctx, entry); err != nil {
return err
}
}
// Process a configuration change (add/remove node)
if entry.Type == raftpb.EntryConfChange {
n.processConfChange(ctx, entry)
}
n.appliedIndex = entry.Index
return nil
}
func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
r := &api.InternalRaftRequest{}
err := proto.Unmarshal(entry.Data, r)
if err != nil {
return err
}
if !n.wait.trigger(r.ID, r) {
// There was no wait on this ID, meaning we don't have a
// transaction in progress that would be committed to the
// memory store by the "trigger" call. This could mean that:
// 1. Startup is in progress, and the raft WAL is being parsed,
// processed and applied to the store, or
// 2. Either a different node wrote this to raft,
// or we wrote it before losing the leader
// position and cancelling the transaction. This entry still needs
// to be committed since other nodes have already committed it.
// Create a new transaction to commit this entry.
// It should not be possible for processInternalRaftRequest
// to be running in this situation, but out of caution we
// cancel any current invocations to avoid a deadlock.
// TODO(anshul) This call is likely redundant, remove after consideration.
n.wait.cancelAll()
err := n.memoryStore.ApplyStoreActions(r.Action)
if err != nil {
log.G(ctx).WithError(err).Error("failed to apply actions from raft")
}
}
return nil
}
func (n *Node) processConfChange(ctx context.Context, entry raftpb.Entry) {
var (
err error
cc raftpb.ConfChange
)
if err := proto.Unmarshal(entry.Data, &cc); err != nil {
n.wait.trigger(cc.ID, err)
}
if err := n.cluster.ValidateConfigurationChange(cc); err != nil {
n.wait.trigger(cc.ID, err)
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
err = n.applyAddNode(cc)
case raftpb.ConfChangeUpdateNode:
err = n.applyUpdateNode(ctx, cc)
case raftpb.ConfChangeRemoveNode:
err = n.applyRemoveNode(ctx, cc)
}
if err != nil {
n.wait.trigger(cc.ID, err)
}
n.confState = *n.raftNode.ApplyConfChange(cc)
n.wait.trigger(cc.ID, nil)
}
// applyAddNode is called when we receive a ConfChange
// from a member in the raft cluster, this adds a new
// node to the existing raft cluster
func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
member := &api.RaftMember{}
err := proto.Unmarshal(cc.Context, member)
if err != nil {
return err
}
// ID must be non zero
if member.RaftID == 0 {
return nil
}
return n.registerNode(member)
}
// applyUpdateNode is called when we receive a ConfChange from a member in the
// raft cluster which update the address of an existing node.
func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error {
newMember := &api.RaftMember{}
err := proto.Unmarshal(cc.Context, newMember)
if err != nil {
return err
}
if newMember.RaftID == n.Config.ID {
return nil
}
if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil {
return err
}
return n.cluster.UpdateMember(newMember.RaftID, newMember)
}
// applyRemoveNode is called when we receive a ConfChange
// from a member in the raft cluster, this removes a node
// from the existing raft cluster
func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err error) {
// If the node from where the remove is issued is
// a follower and the leader steps down, Campaign
// to be the leader.
if cc.NodeID == n.leader() && !n.isLeader() {
if err = n.raftNode.Campaign(ctx); err != nil {
return err
}
}
if cc.NodeID == n.Config.ID {
// wait for the commit ack to be sent before closing connection
n.asyncTasks.Wait()
n.NodeRemoved()
} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
return err
}
return n.cluster.RemoveMember(cc.NodeID)
}
// SubscribeLeadership returns channel to which events about leadership change
// will be sent in form of raft.LeadershipState. Also cancel func is returned -
// it should be called when listener is no longer interested in events.
func (n *Node) SubscribeLeadership() (q chan events.Event, cancel func()) {
return n.leadershipBroadcast.Watch()
}
// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
var ents []raftpb.Entry
next := index + 1
found := false
for _, id := range ids {
if id == self {
found = true
continue
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
data, err := cc.Marshal()
if err != nil {
log.L.WithError(err).Panic("marshal configuration change should never fail")
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: data,
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}
if !found {
node := &api.RaftMember{RaftID: self}
meta, err := node.Marshal()
if err != nil {
log.L.WithError(err).Panic("marshal member should never fail")
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: self,
Context: meta,
}
data, err := cc.Marshal()
if err != nil {
log.L.WithError(err).Panic("marshal configuration change should never fail")
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: data,
Term: term,
Index: next,
}
ents = append(ents, e)
}
return ents
}
// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]struct{})
if snap != nil {
for _, id := range snap.Metadata.ConfState.Voters {
ids[id] = struct{}{}
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
}
if snap != nil && e.Index < snap.Metadata.Index {
continue
}
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
log.L.WithError(err).Panic("unmarshal configuration change should never fail")
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = struct{}{}
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
log.L.Panic("ConfChange Type should be either ConfChangeAddNode, or ConfChangeRemoveNode, or ConfChangeUpdateNode!")
}
}
var sids []uint64
for id := range ids {
sids = append(sids, id)
}
return sids
}
func (n *Node) reqTimeout() time.Duration {
return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
}
// stackDump outputs the runtime stack to os.StdErr.
//
// It is based on Moby's stack.Dump(); https://github.com/moby/moby/blob/471fd27709777d2cce3251129887e14e8bb2e0c7/pkg/stack/stackdump.go#L41-L57
func stackDump() {
var (
buf []byte
stackSize int
)
bufferLen := 16384
for stackSize == len(buf) {
buf = make([]byte, bufferLen)
stackSize = runtime.Stack(buf, true)
bufferLen *= 2
}
buf = buf[:stackSize]
_, _ = os.Stderr.Write(buf)
}