ipfs/ipfs-cluster

View on GitHub
consensus/raft/raft.go

Summary

Maintainability
A
3 hrs
Test Coverage
package raft

import (
    "context"
    "errors"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "time"

    "github.com/ipfs-cluster/ipfs-cluster/state"

    p2praft "github.com/libp2p/go-libp2p-raft"
    host "github.com/libp2p/go-libp2p/core/host"
    peer "github.com/libp2p/go-libp2p/core/peer"

    hraft "github.com/hashicorp/raft"
    raftboltdb "github.com/hashicorp/raft-boltdb"
    "go.opencensus.io/trace"
)

var raftLogger = &p2praft.HcLogToLogger{}

// ErrWaitingForSelf is returned when we are waiting for ourselves to depart
// the peer set, which won't happen
var errWaitingForSelf = errors.New("waiting for ourselves to depart")

// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
// folder.
// TODO: Maybe include this in Config. Not sure how useful it is to touch
// this anyways.
var RaftMaxSnapshots = 5

// RaftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
var RaftLogCacheSize = 512

// How long we wait for updates during shutdown before snapshotting
var waitForUpdatesShutdownTimeout = 5 * time.Second
var waitForUpdatesInterval = 400 * time.Millisecond

// How many times to retry snapshotting when shutting down
var maxShutdownSnapshotRetries = 5

// raftWrapper wraps the hraft.Raft object and related things like the
// different stores used or the hraft.Configuration.
// Its methods provide functionality for working with Raft.
type raftWrapper struct {
    ctx           context.Context
    cancel        context.CancelFunc
    raft          *hraft.Raft
    config        *Config
    host          host.Host
    serverConfig  hraft.Configuration
    transport     *hraft.NetworkTransport
    snapshotStore hraft.SnapshotStore
    logStore      hraft.LogStore
    stableStore   hraft.StableStore
    boltdb        *raftboltdb.BoltStore
    staging       bool
}

// newRaftWrapper creates a Raft instance and initializes
// everything leaving it ready to use. Note, that Bootstrap() should be called
// to make sure the raft instance is usable.
func newRaftWrapper(
    host host.Host,
    cfg *Config,
    fsm hraft.FSM,
    staging bool,
) (*raftWrapper, error) {

    raftW := &raftWrapper{}
    raftW.config = cfg
    raftW.host = host
    raftW.staging = staging
    // Set correct LocalID
    cfg.RaftConfig.LocalID = hraft.ServerID(host.ID().String())

    df := cfg.GetDataFolder()
    err := makeDataFolder(df)
    if err != nil {
        return nil, err
    }

    raftW.makeServerConfig()

    err = raftW.makeTransport()
    if err != nil {
        return nil, err
    }

    err = raftW.makeStores()
    if err != nil {
        return nil, err
    }

    logger.Debug("creating Raft")
    raftW.raft, err = hraft.NewRaft(
        cfg.RaftConfig,
        fsm,
        raftW.logStore,
        raftW.stableStore,
        raftW.snapshotStore,
        raftW.transport,
    )
    if err != nil {
        logger.Error("initializing raft: ", err)
        return nil, err
    }

    raftW.ctx, raftW.cancel = context.WithCancel(context.Background())
    go raftW.observePeers()

    return raftW, nil
}

// makeDataFolder creates the folder that is meant to store Raft data. Ensures
// we always set 0700 mode.
func makeDataFolder(folder string) error {
    return os.MkdirAll(folder, 0700)
}

func (rw *raftWrapper) makeTransport() (err error) {
    logger.Debug("creating libp2p Raft transport")
    rw.transport, err = p2praft.NewLibp2pTransport(
        rw.host,
        rw.config.NetworkTimeout,
    )
    return err
}

func (rw *raftWrapper) makeStores() error {
    logger.Debug("creating BoltDB store")
    df := rw.config.GetDataFolder()
    store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db"))
    if err != nil {
        return err
    }

    // wraps the store in a LogCache to improve performance.
    // See consul/agent/consul/server.go
    cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store)
    if err != nil {
        return err
    }

    logger.Debug("creating raft snapshot store")
    snapstore, err := hraft.NewFileSnapshotStoreWithLogger(
        df,
        RaftMaxSnapshots,
        raftLogger,
    )
    if err != nil {
        return err
    }

    rw.logStore = cacheStore
    rw.stableStore = store
    rw.snapshotStore = snapstore
    rw.boltdb = store
    return nil
}

// Bootstrap calls BootstrapCluster on the Raft instance with a valid
// Configuration (generated from InitPeerset) when Raft has no state
// and we are not setting up a staging peer. It returns if Raft
// was boostrapped (true) and an error.
func (rw *raftWrapper) Bootstrap() (bool, error) {
    logger.Debug("checking for existing raft states")
    hasState, err := hraft.HasExistingState(
        rw.logStore,
        rw.stableStore,
        rw.snapshotStore,
    )
    if err != nil {
        return false, err
    }

    if hasState {
        logger.Debug("raft cluster is already initialized")

        // Inform the user that we are working with a pre-existing peerset
        logger.Info("existing Raft state found! raft.InitPeerset will be ignored")
        cf := rw.raft.GetConfiguration()
        if err := cf.Error(); err != nil {
            logger.Debug(err)
            return false, err
        }
        currentCfg := cf.Configuration()
        srvs := ""
        for _, s := range currentCfg.Servers {
            srvs += fmt.Sprintf("        %s\n", s.ID)
        }

        logger.Debugf("Current Raft Peerset:\n%s\n", srvs)
        return false, nil
    }

    if rw.staging {
        logger.Debug("staging servers do not need initialization")
        logger.Info("peer is ready to join a cluster")
        return false, nil
    }

    voters := ""
    for _, s := range rw.serverConfig.Servers {
        voters += fmt.Sprintf("        %s\n", s.ID)
    }

    logger.Infof("initializing raft cluster with the following voters:\n%s\n", voters)

    future := rw.raft.BootstrapCluster(rw.serverConfig)
    if err := future.Error(); err != nil {
        logger.Error("bootstrapping cluster: ", err)
        return true, err
    }
    return true, nil
}

// create Raft servers configuration. The result is used
// by Bootstrap() when it proceeds to Bootstrap.
func (rw *raftWrapper) makeServerConfig() {
    rw.serverConfig = makeServerConf(append(rw.config.InitPeerset, rw.host.ID()))
}

// creates a server configuration with all peers as Voters.
func makeServerConf(peers []peer.ID) hraft.Configuration {
    sm := make(map[string]struct{})

    servers := make([]hraft.Server, 0)

    // Servers are peers + self. We avoid duplicate entries below
    for _, pid := range peers {
        p := pid.String()
        _, ok := sm[p]
        if !ok { // avoid dups
            sm[p] = struct{}{}
            servers = append(servers, hraft.Server{
                Suffrage: hraft.Voter,
                ID:       hraft.ServerID(p),
                Address:  hraft.ServerAddress(p),
            })
        }
    }
    return hraft.Configuration{Servers: servers}
}

// WaitForLeader holds until Raft says we have a leader.
// Returns if ctx is canceled.
func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
    ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForLeader")
    defer span.End()

    ticker := time.NewTicker(time.Second / 2)
    for {
        select {
        case <-ticker.C:
            if l := rw.raft.Leader(); l != "" {
                logger.Debug("waitForleaderTimer")
                logger.Infof("Current Raft Leader: %s", l)
                ticker.Stop()
                return string(l), nil
            }
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}

func (rw *raftWrapper) WaitForVoter(ctx context.Context) error {
    ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForVoter")
    defer span.End()

    logger.Debug("waiting until we are promoted to a voter")

    pid := hraft.ServerID(rw.host.ID().String())
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            logger.Debugf("%s: get configuration", pid)
            configFuture := rw.raft.GetConfiguration()
            if err := configFuture.Error(); err != nil {
                return err
            }

            if isVoter(pid, configFuture.Configuration()) {
                return nil
            }
            logger.Debugf("%s: not voter yet", pid)

            time.Sleep(waitForUpdatesInterval)
        }
    }
}

func isVoter(srvID hraft.ServerID, cfg hraft.Configuration) bool {
    for _, server := range cfg.Servers {
        if server.ID == srvID && server.Suffrage == hraft.Voter {
            return true
        }
    }
    return false
}

// WaitForUpdates holds until Raft has synced to the last index in the log
func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
    ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForUpdates")
    defer span.End()

    logger.Debug("Raft state is catching up to the latest known version. Please wait...")
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            lai := rw.raft.AppliedIndex()
            li := rw.raft.LastIndex()
            logger.Debugf("current Raft index: %d/%d",
                lai, li)
            if lai == li {
                return nil
            }
            time.Sleep(waitForUpdatesInterval)
        }
    }
}

func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
    ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForPeer")
    defer span.End()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            peers, err := rw.Peers(ctx)
            if err != nil {
                return err
            }

            if len(peers) == 1 && pid == peers[0] && depart {
                return errWaitingForSelf
            }

            found := find(peers, pid)

            // departing
            if depart && !found {
                return nil
            }

            // joining
            if !depart && found {
                return nil
            }

            time.Sleep(50 * time.Millisecond)
        }
    }
}

// Snapshot tells Raft to take a snapshot.
func (rw *raftWrapper) Snapshot() error {
    future := rw.raft.Snapshot()
    err := future.Error()
    if err != nil && err.Error() != hraft.ErrNothingNewToSnapshot.Error() {
        return err
    }
    return nil
}

// snapshotOnShutdown attempts to take a snapshot before a shutdown.
// Snapshotting might fail if the raft applied index is not the last index.
// This waits for the updates and tries to take a snapshot when the
// applied index is up to date.
// It will retry if the snapshot still fails, in case more updates have arrived.
// If waiting for updates times-out, it will not try anymore, since something
// is wrong. This is a best-effort solution as there is no way to tell Raft
// to stop processing entries because we want to take a snapshot before
// shutting down.
func (rw *raftWrapper) snapshotOnShutdown() error {
    var err error
    for i := 0; i < maxShutdownSnapshotRetries; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
        err = rw.WaitForUpdates(ctx)
        cancel()
        if err != nil {
            logger.Warn("timed out waiting for state updates before shutdown. Snapshotting may fail")
            return rw.Snapshot()
        }

        err = rw.Snapshot()
        if err == nil {
            return nil // things worked
        }

        // There was an error
        err = errors.New("could not snapshot raft: " + err.Error())
        logger.Warnf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
    }
    return err
}

// Shutdown shutdown Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown(ctx context.Context) error {
    _, span := trace.StartSpan(ctx, "consensus/raft/Shutdown")
    defer span.End()

    errMsgs := ""

    rw.cancel()

    err := rw.snapshotOnShutdown()
    if err != nil {
        errMsgs += err.Error() + ".\n"
    }

    future := rw.raft.Shutdown()
    err = future.Error()
    if err != nil {
        errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
    }

    err = rw.boltdb.Close() // important!
    if err != nil {
        errMsgs += "could not close boltdb: " + err.Error()
    }

    if errMsgs != "" {
        return errors.New(errMsgs)
    }

    return nil
}

// AddPeer adds a peer to Raft
func (rw *raftWrapper) AddPeer(ctx context.Context, peer string) error {
    ctx, span := trace.StartSpan(ctx, "consensus/raft/AddPeer")
    defer span.End()

    // Check that we don't have it to not waste
    // log entries if so.
    peers, err := rw.Peers(ctx)
    if err != nil {
        return err
    }
    if find(peers, peer) {
        logger.Infof("%s is already a raft peer", peer)
        return nil
    }

    future := rw.raft.AddVoter(
        hraft.ServerID(peer),
        hraft.ServerAddress(peer),
        0,
        0,
    ) // TODO: Extra cfg value?
    err = future.Error()
    if err != nil {
        logger.Error("raft cannot add peer: ", err)
    }
    return err
}

// RemovePeer removes a peer from Raft
func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
    ctx, span := trace.StartSpan(ctx, "consensus/RemovePeer")
    defer span.End()

    // Check that we have it to not waste
    // log entries if we don't.
    peers, err := rw.Peers(ctx)
    if err != nil {
        return err
    }
    if !find(peers, peer) {
        logger.Infof("%s is not among raft peers", peer)
        return nil
    }

    if len(peers) == 1 && peers[0] == peer {
        return errors.New("cannot remove ourselves from a 1-peer cluster")
    }

    rmFuture := rw.raft.RemoveServer(
        hraft.ServerID(peer),
        0,
        0,
    ) // TODO: Extra cfg value?
    err = rmFuture.Error()
    if err != nil {
        logger.Error("raft cannot remove peer: ", err)
        return err
    }

    return nil
}

// Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown.
func (rw *raftWrapper) Leader(ctx context.Context) string {
    _, span := trace.StartSpan(ctx, "consensus/raft/Leader")
    defer span.End()

    return string(rw.raft.Leader())
}

func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) {
    _, span := trace.StartSpan(ctx, "consensus/raft/Peers")
    defer span.End()

    ids := make([]string, 0)

    configFuture := rw.raft.GetConfiguration()
    if err := configFuture.Error(); err != nil {
        return nil, err
    }

    for _, server := range configFuture.Configuration().Servers {
        ids = append(ids, string(server.ID))
    }

    return ids, nil
}

// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir.  It returns the snapshot's metadata, and a reader
// to the snapshot's bytes
func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) {
    store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
    if err != nil {
        return nil, nil, err
    }
    snapMetas, err := store.List()
    if err != nil {
        return nil, nil, err
    }
    if len(snapMetas) == 0 { // no error if snapshot isn't found
        return nil, nil, nil
    }
    meta, r, err := store.Open(snapMetas[0].ID)
    if err != nil {
        return nil, nil, err
    }
    return meta, r, nil
}

// LastStateRaw returns the bytes of the last snapshot stored, its metadata,
// and a flag indicating whether any snapshot was found.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
    // Read most recent snapshot
    dataFolder := cfg.GetDataFolder()
    if _, err := os.Stat(dataFolder); os.IsNotExist(err) {
        // nothing to read
        return nil, false, nil
    }

    meta, r, err := latestSnapshot(dataFolder)
    if err != nil {
        return nil, false, err
    }
    if meta == nil { // no snapshots could be read
        return nil, false, nil
    }
    return r, true, nil
}

// SnapshotSave saves the provided state to a snapshot in the
// raft data path.  Old raft data is backed up and replaced
// by the new snapshot.  pids contains the config-specified
// peer ids to include in the snapshot metadata if no snapshot exists
// from which to copy the raft metadata
func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error {
    dataFolder := cfg.GetDataFolder()
    err := makeDataFolder(dataFolder)
    if err != nil {
        return err
    }
    meta, _, err := latestSnapshot(dataFolder)
    if err != nil {
        return err
    }

    // make a new raft snapshot
    var raftSnapVersion hraft.SnapshotVersion = 1 // As of hraft v1.0.0 this is always 1
    configIndex := uint64(1)
    var raftIndex uint64
    var raftTerm uint64
    var srvCfg hraft.Configuration
    if meta != nil {
        raftIndex = meta.Index
        raftTerm = meta.Term
        srvCfg = meta.Configuration
        CleanupRaft(cfg)
    } else {
        // Begin the log after the index of a fresh start so that
        // the snapshot's state propagate's during bootstrap
        raftIndex = uint64(2)
        raftTerm = uint64(1)
        srvCfg = makeServerConf(pids)
    }

    snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
    if err != nil {
        return err
    }
    _, dummyTransport := hraft.NewInmemTransport("")

    sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
    if err != nil {
        return err
    }

    err = p2praft.EncodeSnapshot(newState, sink)
    if err != nil {
        sink.Cancel()
        return err
    }
    err = sink.Close()
    if err != nil {
        return err
    }
    return nil
}

// CleanupRaft moves the current data folder to a backup location
func CleanupRaft(cfg *Config) error {
    dataFolder := cfg.GetDataFolder()
    keep := cfg.BackupsRotate

    meta, _, err := latestSnapshot(dataFolder)
    if meta == nil && err == nil {
        // no snapshots at all. Avoid creating backups
        // from empty state folders.
        logger.Infof("cleaning empty Raft data folder (%s)", dataFolder)
        os.RemoveAll(dataFolder)
        return nil
    }

    logger.Infof("cleaning and backing up Raft data folder (%s)", dataFolder)
    dbh := newDataBackupHelper(dataFolder, keep)
    err = dbh.makeBackup()
    if err != nil {
        logger.Warn(err)
        logger.Warn("the state could not be cleaned properly")
        logger.Warn("manual intervention may be needed before starting cluster again")
    }
    return nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
    return CleanupRaft(rw.config)
}

func find(s []string, elem string) bool {
    for _, selem := range s {
        if selem == elem {
            return true
        }
    }
    return false
}

func (rw *raftWrapper) observePeers() {
    obsCh := make(chan hraft.Observation, 1)
    defer close(obsCh)

    observer := hraft.NewObserver(obsCh, true, func(o *hraft.Observation) bool {
        po, ok := o.Data.(hraft.PeerObservation)
        return ok && po.Removed
    })

    rw.raft.RegisterObserver(observer)
    defer rw.raft.DeregisterObserver(observer)

    for {
        select {
        case obs := <-obsCh:
            pObs := obs.Data.(hraft.PeerObservation)
            logger.Info("raft peer departed. Removing from peerstore: ", pObs.Peer.ID)
            pID, err := peer.Decode(string(pObs.Peer.ID))
            if err != nil {
                logger.Error(err)
                continue
            }
            rw.host.Peerstore().ClearAddrs(pID)
        case <-rw.ctx.Done():
            logger.Debug("stopped observing raft peers")
            return
        }
    }
}