dotcloud/docker
daemon/cluster/noderunner.go

package cluster // import "github.com/docker/docker/daemon/cluster"

import (
    "context"
    "fmt"
    "path/filepath"
    "strings"
    "sync"
    "time"

    "github.com/containerd/log"
    types "github.com/docker/docker/api/types/swarm"
    "github.com/docker/docker/daemon/cluster/convert"
    "github.com/docker/docker/daemon/cluster/executor/container"
    lncluster "github.com/docker/docker/libnetwork/cluster"
    "github.com/docker/docker/libnetwork/cnmallocator"
    swarmapi "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/manager/allocator/networkallocator"
    swarmnode "github.com/moby/swarmkit/v2/node"
    "github.com/pkg/errors"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with a backoff delay if needed.
type nodeRunner struct {
    nodeState
    mu             sync.RWMutex
    done           chan struct{} // closed when swarmNode exits
    ready          chan struct{} // closed when swarmNode becomes active
    reconnectDelay time.Duration
    config         nodeStartConfig

    repeatedRun     bool
    cancelReconnect func()
    stopping        bool
    cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}
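
// The typical lifecycle is Start, then waiting on the channel returned by
// Ready, then Stop. A minimal sketch, assuming a configured *Cluster value
// named c (the variable name and config values below are illustrative, not
// taken from this file):
//
//    nr := &nodeRunner{cluster: c}
//    if err := nr.Start(nodeStartConfig{ListenAddr: "0.0.0.0:2377"}); err != nil {
//        return err
//    }
//    if err := <-nr.Ready(); err != nil { // nil once the node became active
//        return err
//    }
//    defer nr.Stop()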

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in JSON. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
    // LocalAddr is this machine's local IP or hostname, if specified.
    LocalAddr string
    // RemoteAddr is the address that was given to "swarm join". It is used
    // to find LocalAddr if necessary.
    RemoteAddr string
    // ListenAddr is the address we bind to, including a port.
    ListenAddr string
    // AdvertiseAddr is the address other nodes should connect to,
    // including a port.
    AdvertiseAddr string
    // DataPathAddr is the address that has to be used for the data path
    DataPathAddr string
    // DefaultAddressPool contains a list of subnets
    DefaultAddressPool []string
    // SubnetSize contains the subnet size of DefaultAddressPool
    SubnetSize uint32
    // DataPathPort contains the data path port (VXLAN UDP port) number used for data traffic.
    DataPathPort uint32
    // JoinInProgress is set to true if a join operation has started, but
    // not completed yet.
    JoinInProgress bool

    joinAddr        string
    forceNewCluster bool
    joinToken       string
    lockKey         []byte
    autolock        bool
    availability    types.NodeAvailability
}
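
// For illustration, a join configuration might look like the following; the
// addresses and token are made-up values, and only the exported fields above
// end up in the JSON written by savePersistentState:
//
//    conf := nodeStartConfig{
//        RemoteAddr:    "192.168.1.20:2377",
//        ListenAddr:    "0.0.0.0:2377",
//        AdvertiseAddr: "192.168.1.10:2377",
//        joinAddr:      "192.168.1.20:2377",
//        joinToken:     "SWMTKN-...", // unexported, so never persisted
//    }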

// Ready returns a channel that is closed once the swarmkit node becomes
// active. If the node exits before that, the node's error (possibly nil) is
// sent on the channel before it is closed.
func (n *nodeRunner) Ready() chan error {
    c := make(chan error, 1)
    n.mu.RLock()
    ready, done := n.ready, n.done
    n.mu.RUnlock()
    go func() {
        select {
        case <-ready:
        case <-done:
        }
        select {
        case <-ready:
        default:
            n.mu.RLock()
            c <- n.err
            n.mu.RUnlock()
        }
        close(c)
    }()
    return c
}
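
// Callers usually combine Ready with a timeout or cancellation signal rather
// than blocking indefinitely. A sketch (the 20 second timeout and the nr
// variable are illustrative):
//
//    select {
//    case err := <-nr.Ready():
//        if err != nil {
//            return err // the node exited before becoming active
//        }
//    case <-time.After(20 * time.Second):
//        return errors.New("timed out waiting for the swarm node to become ready")
//    }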

// Start resets the reconnect delay to its initial value and starts a new
// swarmkit node with the given configuration.
func (n *nodeRunner) Start(conf nodeStartConfig) error {
    n.mu.Lock()
    defer n.mu.Unlock()

    n.reconnectDelay = initialReconnectDelay

    return n.start(conf)
}

// start launches the swarmkit node and the goroutines that track its control
// socket, readiness, and exit. Callers must hold n.mu.
func (n *nodeRunner) start(conf nodeStartConfig) error {
    var control string
    if isWindows {
        control = `\\.\pipe\` + controlSocket
    } else {
        control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
    }

    joinAddr := conf.joinAddr
    if joinAddr == "" && conf.JoinInProgress {
        // We must have been restarted while trying to join a cluster.
        // Continue trying to join instead of forming our own cluster.
        joinAddr = conf.RemoteAddr
    }

    // Hostname is not set here. Instead, it is obtained from
    // the node description that is reported periodically.
    swarmnodeConfig := swarmnode.Config{
        ForceNewCluster:    conf.forceNewCluster,
        ListenControlAPI:   control,
        ListenRemoteAPI:    conf.ListenAddr,
        AdvertiseRemoteAPI: conf.AdvertiseAddr,
        NetworkConfig: &networkallocator.Config{
            DefaultAddrPool: conf.DefaultAddressPool,
            SubnetSize:      conf.SubnetSize,
            VXLANUDPPort:    conf.DataPathPort,
        },
        JoinAddr:  joinAddr,
        StateDir:  n.cluster.root,
        JoinToken: conf.joinToken,
        Executor: container.NewExecutor(
            n.cluster.config.Backend,
            n.cluster.config.PluginBackend,
            n.cluster.config.ImageBackend,
            n.cluster.config.VolumeBackend,
        ),
        HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
        // Recommended value in etcd/raft is 10 x (HeartbeatTick).
        // Lower values were seen to cause instability because of
        // frequent leader elections when running on flaky networks.
        ElectionTick:     n.cluster.config.RaftElectionTick,
        UnlockKey:        conf.lockKey,
        AutoLockManagers: conf.autolock,
        PluginGetter:     convert.SwarmPluginGetter(n.cluster.config.Backend.PluginGetter()),
        NetworkProvider:  cnmallocator.NewProvider(n.cluster.config.Backend.PluginGetter()),
    }
    if conf.availability != "" {
        avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]
        if !ok {
            return fmt.Errorf("invalid Availability: %q", conf.availability)
        }
        swarmnodeConfig.Availability = swarmapi.NodeSpec_Availability(avail)
    }
    node, err := swarmnode.New(&swarmnodeConfig)
    if err != nil {
        return err
    }
    if err := node.Start(context.Background()); err != nil {
        return err
    }

    n.done = make(chan struct{})
    n.ready = make(chan struct{})
    n.swarmNode = node
    if conf.joinAddr != "" {
        conf.JoinInProgress = true
    }
    n.config = conf
    savePersistentState(n.cluster.root, conf)

    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        n.handleNodeExit(node)
        cancel()
    }()

    go n.handleReadyEvent(ctx, node, n.ready)
    go n.handleControlSocketChange(ctx, node)

    return nil
}

// handleControlSocketChange updates the runner's gRPC clients whenever the
// node's control socket connection changes, and notifies the cluster of the
// change.
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
    for conn := range node.ListenControlSocket(ctx) {
        n.mu.Lock()
        if n.grpcConn != conn {
            if conn == nil {
                n.controlClient = nil
                n.logsClient = nil
            } else {
                n.controlClient = swarmapi.NewControlClient(conn)
                n.logsClient = swarmapi.NewLogsClient(conn)
                // push store changes to daemon
                go n.watchClusterEvents(ctx, conn)
            }
        }
        n.grpcConn = conn
        n.mu.Unlock()
        n.cluster.SendClusterEvent(lncluster.EventSocketChange)
    }
}

// watchClusterEvents subscribes to store changes for node, service, network,
// secret, and config objects, and forwards them to the cluster's watch stream
// until the watch breaks or ctx is cancelled.
func (n *nodeRunner) watchClusterEvents(ctx context.Context, conn *grpc.ClientConn) {
    client := swarmapi.NewWatchClient(conn)
    watch, err := client.Watch(ctx, &swarmapi.WatchRequest{
        Entries: []*swarmapi.WatchRequest_WatchEntry{
            {
                Kind:   "node",
                Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
            },
            {
                Kind:   "service",
                Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
            },
            {
                Kind:   "network",
                Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
            },
            {
                Kind:   "secret",
                Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
            },
            {
                Kind:   "config",
                Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
            },
        },
        IncludeOldObject: true,
    })
    if err != nil {
        log.G(ctx).WithError(err).Error("failed to watch cluster store")
        return
    }
    for {
        msg, err := watch.Recv()
        if err != nil {
            // store watch is broken
            errStatus, ok := status.FromError(err)
            if !ok || errStatus.Code() != codes.Canceled {
                log.G(ctx).WithError(err).Error("failed to receive changes from store watch API")
            }
            return
        }
        select {
        case <-ctx.Done():
            return
        case n.cluster.watchStream <- msg:
        }
    }
}

// handleReadyEvent closes the ready channel once the swarmkit node becomes
// active, persisting the completed join if one was in progress.
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
    select {
    case <-node.Ready():
        n.mu.Lock()
        n.err = nil
        if n.config.JoinInProgress {
            n.config.JoinInProgress = false
            savePersistentState(n.cluster.root, n.config)
        }
        n.mu.Unlock()
        close(ready)
    case <-ctx.Done():
    }
    n.cluster.SendClusterEvent(lncluster.EventNodeReady)
}

// handleNodeExit records the node's exit error, closes the done channel, and
// schedules a restart with backoff if the node had become ready or this was
// not its first run.
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
    err := detectLockedError(node.Err(context.Background()))
    if err != nil {
        log.G(context.TODO()).Errorf("cluster exited with error: %v", err)
    }
    n.mu.Lock()
    n.swarmNode = nil
    n.err = err
    close(n.done)
    select {
    case <-n.ready:
        // there is a case where a node can be promoted to manager while
        // another node is leaving the cluster. the node being promoted, by
        // random chance, picks the IP of the node being demoted as the one it
        // tries to connect to. in this case, the promotion will fail, and the
        // whole swarm Node object packs it in.
        //
        // when the Node object is relaunched by this code, because it has
        // joinAddr in the config, it attempts again to connect to the same
        // no-longer-manager node, and crashes again. this continues forever.
        //
        // to avoid this case, in this block, we remove JoinAddr from the
        // config.
        n.config.joinAddr = ""
        n.enableReconnectWatcher()
    default:
        if n.repeatedRun {
            n.enableReconnectWatcher()
        }
    }
    n.repeatedRun = true
    n.mu.Unlock()
}

// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
    n.mu.Lock()
    if n.cancelReconnect != nil { // between restarts
        n.cancelReconnect()
        n.cancelReconnect = nil
    }
    if n.swarmNode == nil {
        // even though the swarm node is nil we still may need
        // to send a node leave event to perform any cleanup required.
        if n.cluster != nil {
            n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
        }
        n.mu.Unlock()
        return nil
    }
    n.stopping = true
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    n.mu.Unlock()
    if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
        return err
    }
    n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
    <-n.done
    return nil
}

// State returns a snapshot of the current node state, deriving the local node
// status from the runner's error and readiness. A nil receiver reports an
// inactive node.
func (n *nodeRunner) State() nodeState {
    if n == nil {
        return nodeState{status: types.LocalNodeStateInactive}
    }
    n.mu.RLock()
    defer n.mu.RUnlock()

    ns := n.nodeState

    if ns.err != nil || n.cancelReconnect != nil {
        if errors.Is(ns.err, errSwarmLocked) {
            ns.status = types.LocalNodeStateLocked
        } else {
            ns.status = types.LocalNodeStateError
        }
    } else {
        select {
        case <-n.ready:
            ns.status = types.LocalNodeStateActive
        default:
            ns.status = types.LocalNodeStatePending
        }
    }

    return ns
}

// enableReconnectWatcher schedules a restart of the swarm node after the
// current reconnect delay, doubling the delay up to maxReconnectDelay.
// Callers must hold n.mu.
func (n *nodeRunner) enableReconnectWatcher() {
    if n.stopping {
        return
    }
    n.reconnectDelay *= 2
    if n.reconnectDelay > maxReconnectDelay {
        n.reconnectDelay = maxReconnectDelay
    }
    log.G(context.TODO()).Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
    delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
    n.cancelReconnect = cancel

    go func() {
        <-delayCtx.Done()
        if delayCtx.Err() != context.DeadlineExceeded {
            return
        }
        n.mu.Lock()
        defer n.mu.Unlock()
        if n.stopping {
            return
        }

        if err := n.start(n.config); err != nil {
            n.err = err
        }
    }()
}
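
// The net effect is a capped exponential backoff: with d being the initial
// reconnect delay set in Start, successive failed runs wait roughly
//
//    2d, 4d, 8d, ... up to maxReconnectDelay
//
// before the next restart attempt (the concrete values of the delay constants
// live elsewhere in this package).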

// nodeState represents information about the current state of the cluster and
// provides access to the gRPC clients.
type nodeState struct {
    swarmNode       *swarmnode.Node
    grpcConn        *grpc.ClientConn
    controlClient   swarmapi.ControlClient
    logsClient      swarmapi.LogsClient
    status          types.LocalNodeState
    actualLocalAddr string
    err             error
}

// IsActiveManager returns true if the node is a manager ready to accept
// control requests. It is safe to access the client properties if this
// returns true.
func (ns nodeState) IsActiveManager() bool {
    return ns.controlClient != nil
}

// IsManager returns true if the node is a manager.
func (ns nodeState) IsManager() bool {
    return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}

// NodeID returns the node's ID, or an empty string if the node is inactive.
func (ns nodeState) NodeID() string {
    if ns.swarmNode != nil {
        return ns.swarmNode.NodeID()
    }
    return ""
}
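
// A typical consumer checks the snapshot before touching the clients it
// exposes. A minimal sketch (nr, ctx, and the ListNodes call are illustrative
// and not part of this file):
//
//    state := nr.State()
//    if !state.IsActiveManager() {
//        return errors.New("this node is not a swarm manager")
//    }
//    resp, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{})
//    if err != nil {
//        return err
//    }
//    // resp.Nodes now holds the cluster's node list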