dotcloud/docker

daemon/cluster/swarm.go

package cluster // import "github.com/docker/docker/daemon/cluster"

import (
    "context"
    "fmt"
    "net"
    "strings"
    "time"

    "github.com/containerd/log"
    "github.com/docker/docker/api/types/backend"
    "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/filters"
    types "github.com/docker/docker/api/types/swarm"
    "github.com/docker/docker/daemon/cluster/convert"
    "github.com/docker/docker/errdefs"
    "github.com/docker/docker/opts"
    "github.com/docker/docker/pkg/stack"
    swarmapi "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/manager/encryption"
    swarmnode "github.com/moby/swarmkit/v2/node"
    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
    c.controlMutex.Lock()
    defer c.controlMutex.Unlock()
    if c.nr != nil {
        if req.ForceNewCluster {

            // Take c.mu temporarily to wait for presently running
            // API handlers to finish before shutting down the node.
            c.mu.Lock()
            if !c.nr.nodeState.IsManager() {
                c.mu.Unlock()
                return "", errSwarmNotManager
            }
            c.mu.Unlock()

            if err := c.nr.Stop(); err != nil {
                return "", err
            }
        } else {
            return "", errSwarmExists
        }
    }

    if err := validateAndSanitizeInitRequest(&req); err != nil {
        return "", errdefs.InvalidParameter(err)
    }

    listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
    if err != nil {
        return "", errdefs.InvalidParameter(err)
    }

    advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
    if err != nil {
        return "", err
    }

    dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
    if err != nil {
        return "", err
    }

    localAddr := listenHost

    // If the local address is undetermined, the advertise address
    // will be used as the local address if it belongs to this system.
    // If the advertise address is not local, we try to find a system
    // address to use as the local address. If this fails, we give up
    // and ask the user to pass the listen address.
    if net.ParseIP(localAddr).IsUnspecified() {
        advertiseIP := net.ParseIP(advertiseHost)

        found := false
        for _, systemIP := range listSystemIPs() {
            if systemIP.Equal(advertiseIP) {
                localAddr = advertiseIP.String()
                found = true
                break
            }
        }

        if !found {
            ip, err := c.resolveSystemAddr()
            if err != nil {
                log.G(context.TODO()).Warnf("Could not find a local address: %v", err)
                return "", errMustSpecifyListenAddr
            }
            localAddr = ip.String()
        }
    }

    if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil {
        return "", err
    }

    port, err := getDataPathPort(req.DataPathPort)
    if err != nil {
        return "", err
    }

    nr, err := c.newNodeRunner(nodeStartConfig{
        forceNewCluster:    req.ForceNewCluster,
        autolock:           req.AutoLockManagers,
        LocalAddr:          localAddr,
        ListenAddr:         net.JoinHostPort(listenHost, listenPort),
        AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
        DataPathAddr:       dataPathAddr,
        DefaultAddressPool: req.DefaultAddrPool,
        SubnetSize:         req.SubnetSize,
        availability:       req.Availability,
        DataPathPort:       port,
    })
    if err != nil {
        return "", err
    }
    c.mu.Lock()
    c.nr = nr
    c.mu.Unlock()

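    // Wait for the node runner to signal readiness. On failure, discard the
    // runner and, unless this was a --force-new-cluster re-init, clear any
    // persisted swarm state.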
    if err := <-nr.Ready(); err != nil {
        c.mu.Lock()
        c.nr = nil
        c.mu.Unlock()
        if !req.ForceNewCluster { // if the first attempt fails, don't keep state
            if err := clearPersistentState(c.root); err != nil {
                return "", err
            }
        }
        return "", err
    }
    state := nr.State()
    if state.swarmNode == nil { // should never happen but protect from panic
        return "", errors.New("invalid cluster state for spec initialization")
    }
    if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
        return "", err
    }
    return state.NodeID(), nil
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
    c.controlMutex.Lock()
    defer c.controlMutex.Unlock()
    c.mu.Lock()
    if c.nr != nil {
        c.mu.Unlock()
        return errors.WithStack(errSwarmExists)
    }
    c.mu.Unlock()

    if err := validateAndSanitizeJoinRequest(&req); err != nil {
        return errdefs.InvalidParameter(err)
    }

    listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
    if err != nil {
        return err
    }

    var advertiseAddr string
    if req.AdvertiseAddr != "" {
        advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
        // For joining, we don't need to provide an advertise address,
        // since the remote side can detect it.
        if err == nil {
            advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
        }
    }

    dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
    if err != nil {
        return err
    }

    nr, err := c.newNodeRunner(nodeStartConfig{
        RemoteAddr:    req.RemoteAddrs[0],
        ListenAddr:    net.JoinHostPort(listenHost, listenPort),
        AdvertiseAddr: advertiseAddr,
        DataPathAddr:  dataPathAddr,
        joinAddr:      req.RemoteAddrs[0],
        joinToken:     req.JoinToken,
        availability:  req.Availability,
    })
    if err != nil {
        return err
    }

    c.mu.Lock()
    c.nr = nr
    c.mu.Unlock()

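    // Wait for the node to become ready, bounded by the swarm connect timeout.
    // Hitting the timeout typically means none of the remote managers could be
    // reached.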
    timeout := time.NewTimer(swarmConnectTimeout)
    defer timeout.Stop()

    select {
    case <-timeout.C:
        return errSwarmJoinTimeoutReached
    case err := <-nr.Ready():
        if err != nil {
            c.mu.Lock()
            c.nr = nil
            c.mu.Unlock()
            if err := clearPersistentState(c.root); err != nil {
                return err
            }
        }
        return err
    }
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
    var swarm types.Swarm
    if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
        s, err := c.inspect(ctx, state)
        if err != nil {
            return err
        }
        swarm = s
        return nil
    }); err != nil {
        return types.Swarm{}, err
    }
    return swarm, nil
}

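// inspect reads the cluster object through the control client of the given
// node state and converts it to the API type.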
func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
    s, err := getSwarm(ctx, state.controlClient)
    if err != nil {
        return types.Swarm{}, err
    }
    return convert.SwarmFromGRPC(*s), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
    return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
        swarm, err := getSwarm(ctx, state.controlClient)
        if err != nil {
            return err
        }

        // Validate spec name.
        if spec.Annotations.Name == "" {
            spec.Annotations.Name = "default"
        } else if spec.Annotations.Name != "default" {
            return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
        }

        // In Update, the client should provide the complete spec of the swarm,
        // including Name and Labels. If a field is specified as 0 or nil, the
        // default value from swarmkit will be used.
        clusterSpec, err := convert.SwarmSpecToGRPC(spec)
        if err != nil {
            return errdefs.InvalidParameter(err)
        }

        _, err = state.controlClient.UpdateCluster(
            ctx,
            &swarmapi.UpdateClusterRequest{
                ClusterID: swarm.ID,
                Spec:      &clusterSpec,
                ClusterVersion: &swarmapi.Version{
                    Index: version,
                },
                Rotation: swarmapi.KeyRotation{
                    WorkerJoinToken:  flags.RotateWorkerToken,
                    ManagerJoinToken: flags.RotateManagerToken,
                    ManagerUnlockKey: flags.RotateManagerUnlockKey,
                },
            },
        )
        return err
    })
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
    var resp *swarmapi.GetUnlockKeyResponse
    if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
        client := swarmapi.NewCAClient(state.grpcConn)

        r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
        if err != nil {
            return err
        }
        resp = r
        return nil
    }); err != nil {
        return "", err
    }
    if len(resp.UnlockKey) == 0 {
        // no key
        return "", nil
    }
    return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
    c.controlMutex.Lock()
    defer c.controlMutex.Unlock()

    c.mu.RLock()
    state := c.currentNodeState()

    if !state.IsActiveManager() {
        // when the manager is not active, return an error
        // unless the swarm is locked.
        if err := c.errNoManager(state); err != errSwarmLocked {
            c.mu.RUnlock()
            return err
        }
    } else {
        // when the manager is active, the swarm cannot be locked, so return a "not locked" error
        c.mu.RUnlock()
        return notLockedError{}
    }

    // execution only reaches this point when the swarm is locked
    nr := c.nr
    c.mu.RUnlock()

    key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
    if err != nil {
        return errdefs.InvalidParameter(err)
    }

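    // Restart the node runner with the parsed key so it can decrypt the swarm
    // data that is encrypted at rest.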
    config := nr.config
    config.lockKey = key
    if err := nr.Stop(); err != nil {
        return err
    }
    nr, err = c.newNodeRunner(config)
    if err != nil {
        return err
    }

    c.mu.Lock()
    c.nr = nr
    c.mu.Unlock()

    if err := <-nr.Ready(); err != nil {
        if errors.Is(err, errSwarmLocked) {
            return invalidUnlockKey{}
        }
        return errors.Errorf("swarm component could not be started: %v", err)
    }
    return nil
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(ctx context.Context, force bool) error {
    c.controlMutex.Lock()
    defer c.controlMutex.Unlock()

    c.mu.Lock()
    nr := c.nr
    if nr == nil {
        c.mu.Unlock()
        return errors.WithStack(errNoSwarm)
    }

    state := c.currentNodeState()

    c.mu.Unlock()

    if errors.Is(state.err, errSwarmLocked) && !force {
        // leaving a locked swarm without --force is not allowed
        return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
    }

    if state.IsManager() && !force {
        msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
        if state.IsActiveManager() {
            active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
            if err == nil {
                if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
                    if isLastManager(reachable, unreachable) {
                        msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
                        return errors.WithStack(notAvailableError(msg))
                    }
                    msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
                }
            }
        } else {
            msg += "Doing so may lose the consensus of your cluster. "
        }

        msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
        return errors.WithStack(notAvailableError(msg))
    }
    // release readers in here
    if err := nr.Stop(); err != nil {
        log.G(ctx).Errorf("failed to shut down cluster node: %v", err)
        stack.Dump()
        return err
    }

    c.mu.Lock()
    c.nr = nil
    c.mu.Unlock()

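    // Remove the containers that were created for swarm tasks on this node
    // (identified by the com.docker.swarm.node.id label).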
    if nodeID := state.NodeID(); nodeID != "" {
        nodeContainers, err := c.listContainerForNode(ctx, nodeID)
        if err != nil {
            return err
        }
        for _, id := range nodeContainers {
            if err := c.config.Backend.ContainerRm(id, &backend.ContainerRmConfig{ForceRemove: true}); err != nil {
                log.G(ctx).Errorf("error removing %v: %v", id, err)
            }
        }
    }

    // todo: cleanup optional?
    if err := clearPersistentState(c.root); err != nil {
        return err
    }
    c.config.Backend.DaemonLeavesCluster()
    return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info(ctx context.Context) types.Info {
    info := types.Info{
        NodeAddr: c.GetAdvertiseAddress(),
    }
    c.mu.RLock()
    defer c.mu.RUnlock()

    state := c.currentNodeState()
    info.LocalNodeState = state.status
    if state.err != nil {
        info.Error = state.err.Error()
    }

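    // Bound the time spent querying the manager for cluster and node details.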
    ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
    defer cancel()

    if state.IsActiveManager() {
        info.ControlAvailable = true
        swarm, err := c.inspect(ctx, state)
        if err != nil {
            info.Error = err.Error()
        }

        info.Cluster = &swarm.ClusterInfo

        if r, err := state.controlClient.ListNodes(
            ctx, &swarmapi.ListNodesRequest{},
            grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
        ); err != nil {
            info.Error = err.Error()
        } else {
            info.Nodes = len(r.Nodes)
            for _, n := range r.Nodes {
                if n.ManagerStatus != nil {
                    info.Managers = info.Managers + 1
                }
            }
        }

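        // Warn when the cluster has exactly two managers: losing either one
        // breaks quorum, so this configuration provides no fault tolerance.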
        switch info.LocalNodeState {
        case types.LocalNodeStateInactive, types.LocalNodeStateLocked, types.LocalNodeStateError:
            // nothing to do
        default:
            if info.Managers == 2 {
                const warn string = `WARNING: Running Swarm in a two-manager configuration. This configuration provides
         no fault tolerance, and poses a high risk to lose control over the cluster.
         Refer to https://docs.docker.com/engine/swarm/admin_guide/ to configure the
         Swarm for fault-tolerance.`

                info.Warnings = append(info.Warnings, warn)
            }
        }
    }

    if state.swarmNode != nil {
        for _, r := range state.swarmNode.Remotes() {
            info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
        }
        info.NodeID = state.swarmNode.NodeID()
    }

    return info
}

// Status returns a textual representation of the node's swarm status and role (manager/worker)
func (c *Cluster) Status() string {
    c.mu.RLock()
    s := c.currentNodeState()
    c.mu.RUnlock()

    state := string(s.status)
    if s.status == types.LocalNodeStateActive {
        if s.IsActiveManager() || s.IsManager() {
            state += "/manager"
        } else {
            state += "/worker"
        }
    }
    return state
}

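// validateAndSanitizeInitRequest validates the listen address and defaults the
// swarm spec name to "default", rejecting any other name.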
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
    var err error
    req.ListenAddr, err = validateAddr(req.ListenAddr)
    if err != nil {
        return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
    }

    if req.Spec.Annotations.Name == "" {
        req.Spec.Annotations.Name = "default"
    } else if req.Spec.Annotations.Name != "default" {
        return errors.New(`swarm spec must be named "default"`)
    }

    return nil
}

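// validateAndSanitizeJoinRequest validates the listen address and every remote
// address in the request; at least one remote address is required.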
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
    var err error
    req.ListenAddr, err = validateAddr(req.ListenAddr)
    if err != nil {
        return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
    }
    if len(req.RemoteAddrs) == 0 {
        return errors.New("at least 1 RemoteAddr is required to join")
    }
    for i := range req.RemoteAddrs {
        req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
        if err != nil {
            return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
        }
    }
    return nil
}

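// validateAddr rejects empty addresses and normalizes the address to host:port
// form, stripping any "tcp://" prefix. Addresses that fail to parse as TCP
// addresses are currently returned unchanged (see the TODO below).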
func validateAddr(addr string) (string, error) {
    if addr == "" {
        return addr, errors.New("invalid empty address")
    }
    newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
    if err != nil {
        // TODO(thaJeztah) why are we ignoring the error here? Is this to allow "non-tcp" addresses?
        return addr, nil
    }
    return strings.TrimPrefix(newaddr, "tcp://"), nil
}

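// initClusterSpec waits for the control socket of the freshly started node,
// polls until the cluster object appears, merges the user-provided spec on top
// of the swarmkit defaults, and pushes the result with UpdateCluster. The whole
// sequence is bounded by a 5-second timeout.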
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    for conn := range node.ListenControlSocket(ctx) {
        if ctx.Err() != nil {
            return ctx.Err()
        }
        if conn != nil {
            client := swarmapi.NewControlClient(conn)
            var cluster *swarmapi.Cluster
            for i := 0; ; i++ {
                lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
                if err != nil {
                    return fmt.Errorf("error on listing clusters: %v", err)
                }
                if len(lcr.Clusters) == 0 {
                    if i < 10 {
                        time.Sleep(200 * time.Millisecond)
                        continue
                    }
                    return errors.New("empty list of clusters was returned")
                }
                cluster = lcr.Clusters[0]
                break
            }
            // In init, we take the initial default values from swarmkit, and merge
            // any non-nil or non-zero value from spec into the GRPC spec. This leaves
            // the default values alone.
            // Note that this is different from Update(), where we expect the user to
            // specify the complete spec of the cluster (as they already know the
            // existing one and know which fields to update).
            clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
            if err != nil {
                return fmt.Errorf("error updating cluster settings: %v", err)
            }
            _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
                ClusterID:      cluster.ID,
                ClusterVersion: &cluster.Meta.Version,
                Spec:           &clusterSpec,
            })
            if err != nil {
                return fmt.Errorf("error updating cluster settings: %v", err)
            }
            return nil
        }
    }
    return ctx.Err()
}

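// listContainerForNode returns the IDs of all containers on this daemon that
// are labeled with the given swarm node ID.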
func (c *Cluster) listContainerForNode(ctx context.Context, nodeID string) ([]string, error) {
    var ids []string
    containers, err := c.config.Backend.Containers(ctx, &container.ListOptions{
        Filters: filters.NewArgs(filters.Arg("label", "com.docker.swarm.node.id="+nodeID)),
    })
    if err != nil {
        return []string{}, err
    }
    for _, c := range containers {
        ids = append(ids, c.ID)
    }
    return ids, nil
}