dotcloud/docker

View on GitHub
daemon/cluster/cluster.go

Summary

Maintainability
A
0 mins
Test Coverage
package cluster // import "github.com/docker/docker/daemon/cluster"

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm related.
// Contains methods for connecting and controlling the cluster. Exists always,
// even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. Is present only and
// always if swarm mode is enabled. Implements backoff restart loop in case of
// errors.
//
// NodeState - Information about the current node status including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure cluster(init/join/leave etc). Protects that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configurations
// happen. Different from `controlMutex` because in some cases we need to
// access current cluster state even if the long-running reconfiguration is
// going on. For example network stack may ask for the current cluster state in
// the middle of the shutdown. Any time current cluster state is asked you
// should take the read lock of `cluster.mu`. If you are writing an API
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that node will not be
// shut down until the handler has finished.
//
// NodeRunner implements its internal locks that should not be used outside of
// the struct. Instead, you should just call `nodeRunner.State()` method to get
// the current state of the cluster(still need `cluster.mu.RLock()` to access
// `cluster.nr` reference itself). Most of the changes in NodeRunner happen
// because of an external event(network problem, unexpected swarmkit error) and
// Docker shouldn't take any locks that delay these changes from happening.
//

import (
    "context"
    "fmt"
    "math"
    "net"
    "os"
    "path/filepath"
    "runtime"
    "sync"
    "time"

    "github.com/containerd/log"
    "github.com/docker/docker/api/types/network"
    types "github.com/docker/docker/api/types/swarm"
    "github.com/docker/docker/daemon/cluster/controllers/plugin"
    executorpkg "github.com/docker/docker/daemon/cluster/executor"
    lncluster "github.com/docker/docker/libnetwork/cluster"
    "github.com/docker/docker/pkg/stack"
    swarmapi "github.com/moby/swarmkit/v2/api"
    swarmnode "github.com/moby/swarmkit/v2/node"
    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

const (
    swarmDirName                   = "swarm"
    controlSocket                  = "control.sock"
    swarmConnectTimeout            = 20 * time.Second
    swarmRequestTimeout            = 20 * time.Second
    stateFile                      = "docker-state.json"
    defaultAddr                    = "tcp://0.0.0.0:2377"
    isWindows                      = runtime.GOOS == "windows"
    initialReconnectDelay          = 100 * time.Millisecond
    maxReconnectDelay              = 30 * time.Second
    contextPrefix                  = "com.docker.swarm"
    defaultRecvSizeForListResponse = math.MaxInt32 // the max recv limit grpc <1.4.0
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
    Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
    Root                   string
    Name                   string
    Backend                executorpkg.Backend
    ImageBackend           executorpkg.ImageBackend
    PluginBackend          plugin.Backend
    VolumeBackend          executorpkg.VolumeBackend
    NetworkSubnetsProvider NetworkSubnetsProvider

    // DefaultAdvertiseAddr is the default host/IP or network interface to use
    // if no AdvertiseAddr value is specified.
    DefaultAdvertiseAddr string

    // path to store runtime state, such as the swarm control socket
    RuntimeRoot string

    // WatchStream is a channel to pass watch API notifications to daemon
    WatchStream chan *swarmapi.WatchMessage

    // RaftHeartbeatTick is the number of ticks for heartbeat of quorum members
    RaftHeartbeatTick uint32

    // RaftElectionTick is the number of ticks to elapse before followers propose a new round of leader election
    // This value should be 10x that of RaftHeartbeatTick
    RaftElectionTick uint32
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
    mu           sync.RWMutex
    controlMutex sync.RWMutex // protect init/join/leave user operations
    nr           *nodeRunner
    root         string
    runtimeRoot  string
    config       Config
    configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
    attachers    map[string]*attacher
    watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
    taskID           string
    config           *network.NetworkingConfig
    inProgress       bool
    attachWaitCh     chan *network.NetworkingConfig
    attachCompleteCh chan struct{}
    detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using provided config.
func New(config Config) (*Cluster, error) {
    root := filepath.Join(config.Root, swarmDirName)
    if err := os.MkdirAll(root, 0o700); err != nil {
        return nil, err
    }
    if config.RuntimeRoot == "" {
        config.RuntimeRoot = root
    }
    if config.RaftHeartbeatTick == 0 {
        config.RaftHeartbeatTick = 1
    }
    if config.RaftElectionTick == 0 {
        // 10X heartbeat tick is the recommended ratio according to etcd docs.
        config.RaftElectionTick = 10 * config.RaftHeartbeatTick
    }

    if err := os.MkdirAll(config.RuntimeRoot, 0o700); err != nil {
        return nil, err
    }
    c := &Cluster{
        root:        root,
        config:      config,
        configEvent: make(chan lncluster.ConfigEventType, 10),
        runtimeRoot: config.RuntimeRoot,
        attachers:   make(map[string]*attacher),
        watchStream: config.WatchStream,
    }
    return c, nil
}

// Start the Cluster instance
// TODO The split between New and Start can be join again when the SendClusterEvent
// method is no longer required
func (c *Cluster) Start() error {
    root := filepath.Join(c.config.Root, swarmDirName)

    nodeConfig, err := loadPersistentState(root)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return err
    }

    nr, err := c.newNodeRunner(*nodeConfig)
    if err != nil {
        return err
    }
    c.nr = nr

    timer := time.NewTimer(swarmConnectTimeout)
    defer timer.Stop()

    select {
    case <-timer.C:
        log.G(context.TODO()).Error("swarm component could not be started before timeout was reached")
    case err := <-nr.Ready():
        if err != nil {
            log.G(context.TODO()).WithError(err).Error("swarm component could not be started")
            return nil
        }
    }
    return nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
    if err := c.config.Backend.IsSwarmCompatible(); err != nil {
        return nil, err
    }

    actualLocalAddr := conf.LocalAddr
    if actualLocalAddr == "" {
        // If localAddr was not specified, resolve it automatically
        // based on the route to joinAddr. localAddr can only be left
        // empty on "join".
        listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
        if err != nil {
            return nil, fmt.Errorf("could not parse listen address: %v", err)
        }

        listenAddrIP := net.ParseIP(listenHost)
        if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
            actualLocalAddr = listenHost
        } else {
            if conf.RemoteAddr == "" {
                // Should never happen except using swarms created by
                // old versions that didn't save remoteAddr.
                conf.RemoteAddr = "8.8.8.8:53"
            }
            conn, err := net.Dial("udp", conf.RemoteAddr)
            if err != nil {
                return nil, fmt.Errorf("could not find local IP address: %v", err)
            }
            localHostPort := conn.LocalAddr().String()
            actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
            _ = conn.Close()
        }
    }

    nr := &nodeRunner{cluster: c}
    nr.actualLocalAddr = actualLocalAddr

    if err := nr.Start(conf); err != nil {
        return nil, err
    }

    c.config.Backend.DaemonJoinsCluster(c)

    return nr, nil
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    if c.nr != nil {
        return c.nr.config.ListenAddr
    }
    return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
        advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
        return advertiseHost
    }
    return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    if c.nr != nil {
        return c.nr.config.DataPathAddr
    }
    return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote managers if
// available.
func (c *Cluster) GetRemoteAddressList() []string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.getRemoteAddressList()
}

// GetWatchStream returns the channel to pass changes from store watch API
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.watchStream
}

func (c *Cluster) getRemoteAddressList() []string {
    state := c.currentNodeState()
    if state.swarmNode == nil {
        return []string{}
    }

    nodeID := state.swarmNode.NodeID()
    remotes := state.swarmNode.Remotes()
    addressList := make([]string, 0, len(remotes))
    for _, r := range remotes {
        if r.NodeID != nodeID {
            addressList = append(addressList, r.Addr)
        }
    }
    return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
    return c.configEvent
}

// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
    return c.nr.State()
}

// errNoManager returns error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
    if st.swarmNode == nil {
        if errors.Is(st.err, errSwarmLocked) {
            return errSwarmLocked
        }
        if st.err == errSwarmCertificatesExpired {
            return errSwarmCertificatesExpired
        }
        return errors.WithStack(notAvailableError(`This node is not a swarm manager. Use "docker swarm init" or "docker swarm join" to connect this node to swarm and try again.`))
    }
    if st.swarmNode.Manager() != nil {
        return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
    }
    return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
    c.controlMutex.Lock()
    defer c.controlMutex.Unlock()

    c.mu.Lock()
    node := c.nr
    if node == nil {
        c.mu.Unlock()
        return
    }
    state := c.currentNodeState()
    c.mu.Unlock()

    if state.IsActiveManager() {
        active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
        if err == nil {
            singlenode := active && isLastManager(reachable, unreachable)
            if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
                log.G(context.TODO()).Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
            }
        }
    }

    if err := node.Stop(); err != nil {
        log.G(context.TODO()).Errorf("failed to shut down cluster node: %v", err)
        stack.Dump()
    }

    c.mu.Lock()
    c.nr = nil
    c.mu.Unlock()
}

func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    nodes, err := client.ListNodes(
        ctx, &swarmapi.ListNodesRequest{},
        grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
    )
    if err != nil {
        return false, 0, 0, err
    }
    for _, n := range nodes.Nodes {
        if n.ManagerStatus != nil {
            if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
                reachable++
                if n.ID == currentNodeID {
                    current = true
                }
            }
            if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
                unreachable++
            }
        }
    }
    return
}

func detectLockedError(err error) error {
    if err == swarmnode.ErrInvalidUnlockKey {
        return errors.WithStack(errSwarmLocked)
    }
    return err
}

func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
    c.mu.RLock()
    defer c.mu.RUnlock()

    state := c.currentNodeState()
    if !state.IsActiveManager() {
        return c.errNoManager(state)
    }

    ctx := context.TODO()
    ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
    defer cancel()

    return fn(ctx, state)
}

// SendClusterEvent allows to send cluster events on the configEvent channel
// TODO This method should not be exposed.
// Currently it is used to notify the network controller that the keys are
// available
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    c.configEvent <- event
}