daemon/cluster/executor/container/executor.go

package container // import "github.com/docker/docker/daemon/cluster/executor/container"

import (
    "context"
    "fmt"
    "sort"
    "strings"
    "sync"

    "github.com/containerd/log"
    "github.com/docker/docker/api/types/filters"
    "github.com/docker/docker/api/types/network"
    swarmtypes "github.com/docker/docker/api/types/swarm"
    "github.com/docker/docker/daemon/cluster/controllers/plugin"
    "github.com/docker/docker/daemon/cluster/convert"
    executorpkg "github.com/docker/docker/daemon/cluster/executor"
    clustertypes "github.com/docker/docker/daemon/cluster/provider"
    "github.com/docker/docker/libnetwork"
    networktypes "github.com/docker/docker/libnetwork/types"
    "github.com/moby/swarmkit/v2/agent"
    "github.com/moby/swarmkit/v2/agent/exec"
    "github.com/moby/swarmkit/v2/api"
    "github.com/moby/swarmkit/v2/api/naming"
    swarmlog "github.com/moby/swarmkit/v2/log"
    "github.com/moby/swarmkit/v2/template"
    "github.com/pkg/errors"
)

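// executor implements swarmkit's exec.Executor interface on top of the
// engine's container, image, plugin, and volume backends.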
type executor struct {
    backend       executorpkg.Backend
    imageBackend  executorpkg.ImageBackend
    pluginBackend plugin.Backend
    volumeBackend executorpkg.VolumeBackend
    dependencies  exec.DependencyManager
    mutex         sync.Mutex // This mutex protects the following node field
    node          *api.NodeDescription

    // nodeObj holds a copy of the swarmkit Node object from the time of the
    // last call to executor.Configure. This allows us to discover which
    // network attachments the node previously had, which further allows us to
    // determine which, if any, need to be removed. nodeObj is not protected by
    // a mutex, because it is only written to in the method (Configure) that it
    // is read from. If that changes, it may need to be guarded.
    nodeObj *api.Node
}

// NewExecutor returns an executor from the docker client.
func NewExecutor(b executorpkg.Backend, p plugin.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend) exec.Executor {
    return &executor{
        backend:       b,
        pluginBackend: p,
        imageBackend:  i,
        volumeBackend: v,
        dependencies:  agent.NewDependencyManager(convert.SwarmPluginGetter(b.PluginGetter())),
    }
}

// Describe returns the underlying node description from the docker client.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    info, err := e.backend.SystemInfo(ctx)
    if err != nil {
        return nil, err
    }

    plugins := map[api.PluginDescription]struct{}{}
    addPlugins := func(typ string, names []string) {
        for _, name := range names {
            plugins[api.PluginDescription{
                Type: typ,
                Name: name,
            }] = struct{}{}
        }
    }

    // add v1 plugins
    addPlugins("Volume", info.Plugins.Volume)
    // Add builtin driver "overlay" (the only builtin multi-host driver) to
    // the plugin list by default.
    addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
    addPlugins("Authorization", info.Plugins.Authorization)
    addPlugins("Log", info.Plugins.Log)

    // add v2 plugins
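    // if listing v2 plugins fails, they are simply omitted from the node
    // description rather than failing Describe.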
    v2Plugins, err := e.backend.PluginManager().List(filters.NewArgs())
    if err == nil {
        for _, plgn := range v2Plugins {
            for _, typ := range plgn.Config.Interface.Types {
                if typ.Prefix != "docker" || !plgn.Enabled {
                    continue
                }
                plgnTyp := typ.Capability
                switch typ.Capability {
                case "volumedriver":
                    plgnTyp = "Volume"
                case "networkdriver":
                    plgnTyp = "Network"
                case "logdriver":
                    plgnTyp = "Log"
                }

                plugins[api.PluginDescription{
                    Type: plgnTyp,
                    Name: plgn.Name,
                }] = struct{}{}
            }
        }
    }

    pluginFields := make([]api.PluginDescription, 0, len(plugins))
    for k := range plugins {
        pluginFields = append(pluginFields, k)
    }

    sort.Sort(sortedPlugins(pluginFields))

    // parse []string labels into a map[string]string
    labels := map[string]string{}
    for _, l := range info.Labels {
        k, v, ok := strings.Cut(l, "=")
        // this will take the last value in the list for a given key
        // ideally, one shouldn't assign multiple values to the same key
        if ok {
            labels[k] = v
        }
    }

    // TODO(dperny): don't ignore the error here
    csiInfo, _ := e.Volumes().Plugins().NodeInfo(ctx)

    description := &api.NodeDescription{
        Hostname: info.Name,
        Platform: &api.Platform{
            Architecture: info.Architecture,
            OS:           info.OSType,
        },
        Engine: &api.EngineDescription{
            EngineVersion: info.ServerVersion,
            Labels:        labels,
            Plugins:       pluginFields,
        },
        Resources: &api.Resources{
            NanoCPUs:    int64(info.NCPU) * 1e9,
            MemoryBytes: info.MemTotal,
            Generic:     convert.GenericResourcesToGRPC(info.GenericResources),
        },
        CSIInfo: csiInfo,
    }

    // Save the node information in the executor field
    e.mutex.Lock()
    e.node = description
    e.mutex.Unlock()

    return description, nil
}

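// Configure applies the node's network attachments: it sets up (or releases)
// the ingress network, removes managed networks whose load-balancer
// attachments are no longer present, and records the node object for the next
// call.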
func (e *executor) Configure(ctx context.Context, node *api.Node) error {
    var ingressNA *api.NetworkAttachment
    attachments := make(map[string]string)

    for _, na := range node.Attachments {
        if na == nil || na.Network == nil || len(na.Addresses) == 0 {
            // this should not happen, but we got a panic here and don't have a
            // good idea about what the underlying data structure looks like.
            swarmlog.G(ctx).WithField("NetworkAttachment", fmt.Sprintf("%#v", na)).Warn("skipping nil or malformed node network attachment entry")
            continue
        }

        if na.Network.Spec.Ingress {
            ingressNA = na
        }

        attachments[na.Network.ID] = na.Addresses[0]
    }

    // discover which, if any, attachments have been removed.
    //
    // we aren't responsible directly for creating these networks. that is
    // handled indirectly when a container using that network is created.
    // however, when it comes time to remove the network, none of the relevant
    // tasks may exist anymore. this means we should go ahead and try to remove
    // any network we know to no longer be in use.

    // removeAttachments maps the network ID to a boolean. This boolean
    // indicates whether the attachment in question is totally removed (true),
    // or has just had its IP changed (false)
    removeAttachments := make(map[string]bool)

    // the first time we Configure, nodeObj will be nil, because it will not be
    // set yet. in that case, skip this check.
    if e.nodeObj != nil {
        for _, na := range e.nodeObj.Attachments {
            // same thing as above, check sanity of the attachments so we don't
            // get a panic.
            if na == nil || na.Network == nil || len(na.Addresses) == 0 {
                swarmlog.G(ctx).WithField("NetworkAttachment", fmt.Sprintf("%#v", na)).Warn("skipping nil or malformed node network attachment entry")
                continue
            }

            // now, check if the attachment exists and shares the same IP address.
            if ip, ok := attachments[na.Network.ID]; !ok || na.Addresses[0] != ip {
                // if the map entry exists, then the network still exists, and the
                // IP must be what has changed
                removeAttachments[na.Network.ID] = !ok
            }
        }
    }

    if (ingressNA == nil) && (node.Attachment != nil) && (len(node.Attachment.Addresses) > 0) {
        ingressNA = node.Attachment
        attachments[ingressNA.Network.ID] = ingressNA.Addresses[0]
    }

    if ingressNA == nil {
        e.backend.ReleaseIngress()
        return e.backend.GetAttachmentStore().ResetAttachments(attachments)
    }

    options := network.CreateOptions{
        Driver: ingressNA.Network.DriverState.Name,
        IPAM: &network.IPAM{
            Driver: ingressNA.Network.IPAM.Driver.Name,
        },
        Options: ingressNA.Network.DriverState.Options,
        Ingress: true,
    }

    for _, ic := range ingressNA.Network.IPAM.Configs {
        c := network.IPAMConfig{
            Subnet:  ic.Subnet,
            IPRange: ic.Range,
            Gateway: ic.Gateway,
        }
        options.IPAM.Config = append(options.IPAM.Config, c)
    }

    _, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
        ID: ingressNA.Network.ID,
        CreateRequest: network.CreateRequest{
            Name:          ingressNA.Network.Spec.Annotations.Name,
            CreateOptions: options,
        },
    }, ingressNA.Addresses[0])
    if err != nil {
        return err
    }

    var (
        activeEndpointsError *libnetwork.ActiveEndpointsError
        errNoSuchNetwork     libnetwork.ErrNoSuchNetwork
    )

    // now, finally, remove any network LB attachments that we no longer have.
    for nw, gone := range removeAttachments {
        err := e.backend.DeleteManagedNetwork(nw)
        switch {
        case err == nil:
            continue
        case errors.As(err, &activeEndpointsError):
            // this is the purpose of the boolean in the map. it's literally
            // just to log an appropriate, informative error. I'm unsure if
            // this can ever actually occur, but we need to know if it does.
            if gone {
                swarmlog.G(ctx).Warnf("network %s should be removed, but still has active attachments", nw)
            } else {
                swarmlog.G(ctx).Warnf("network %s should have its node LB IP changed, but cannot be removed because of active attachments", nw)
            }
            continue
        case errors.As(err, &errNoSuchNetwork):
            // NoSuchNetworkError indicates the network is already gone.
            continue
        default:
            swarmlog.G(ctx).Errorf("network %s remove failed: %v", nw, err)
        }
    }

    // now update our copy of the node object, reset the attachment store, and
    // return
    e.nodeObj = node

    return e.backend.GetAttachmentStore().ResetAttachments(attachments)
}

// Controller returns a docker container runner.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
    dependencyGetter := template.NewTemplatedDependencyGetter(agent.Restrict(e.dependencies, t), t, nil)

    // Get the node description from the executor field
    e.mutex.Lock()
    nodeDescription := e.node
    e.mutex.Unlock()

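    // Attachment tasks are handled by a dedicated network attacher controller
    // rather than a container controller.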
    if t.Spec.GetAttachment() != nil {
        return newNetworkAttacherController(e.backend, e.imageBackend, e.volumeBackend, t, nodeDescription, dependencyGetter)
    }

    var ctlr exec.Controller
    switch r := t.Spec.GetRuntime().(type) {
    case *api.TaskSpec_Generic:
        swarmlog.G(context.TODO()).WithFields(log.Fields{
            "kind":     r.Generic.Kind,
            "type_url": r.Generic.Payload.TypeUrl,
        }).Debug("custom runtime requested")
        runtimeKind, err := naming.Runtime(t.Spec)
        if err != nil {
            return ctlr, err
        }
        switch runtimeKind {
        case string(swarmtypes.RuntimePlugin):
            if !e.backend.HasExperimental() {
                return ctlr, fmt.Errorf("runtime type %q only supported in experimental", swarmtypes.RuntimePlugin)
            }
            c, err := plugin.NewController(e.pluginBackend, t)
            if err != nil {
                return ctlr, err
            }
            ctlr = c
        default:
            return ctlr, fmt.Errorf("unsupported runtime type: %q", runtimeKind)
        }
    case *api.TaskSpec_Container:
        c, err := newController(e.backend, e.imageBackend, e.volumeBackend, t, nodeDescription, dependencyGetter)
        if err != nil {
            return ctlr, err
        }
        ctlr = c
    default:
        return ctlr, fmt.Errorf("unsupported runtime: %q", r)
    }

    return ctlr, nil
}

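// SetNetworkBootstrapKeys converts the swarm encryption keys to libnetwork's
// representation and hands them to the backend.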
func (e *executor) SetNetworkBootstrapKeys(keys []*api.EncryptionKey) error {
    nwKeys := []*networktypes.EncryptionKey{}
    for _, key := range keys {
        nwKey := &networktypes.EncryptionKey{
            Subsystem:   key.Subsystem,
            Algorithm:   int32(key.Algorithm),
            Key:         make([]byte, len(key.Key)),
            LamportTime: key.LamportTime,
        }
        copy(nwKey.Key, key.Key)
        nwKeys = append(nwKeys, nwKey)
    }
    e.backend.SetNetworkBootstrapKeys(nwKeys)

    return nil
}

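// Secrets returns the secrets manager from the executor's dependency manager.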
func (e *executor) Secrets() exec.SecretsManager {
    return e.dependencies.Secrets()
}

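// Configs returns the configs manager from the executor's dependency manager.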
func (e *executor) Configs() exec.ConfigsManager {
    return e.dependencies.Configs()
}

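// Volumes returns the volumes manager from the executor's dependency manager.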
func (e *executor) Volumes() exec.VolumesManager {
    return e.dependencies.Volumes()
}

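// sortedPlugins implements sort.Interface for api.PluginDescription, ordering
// by type and then by name so the plugin list in the node description is
// deterministic.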
type sortedPlugins []api.PluginDescription

func (sp sortedPlugins) Len() int { return len(sp) }

func (sp sortedPlugins) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }

func (sp sortedPlugins) Less(i, j int) bool {
    if sp[i].Type != sp[j].Type {
        return sp[i].Type < sp[j].Type
    }
    return sp[i].Name < sp[j].Name
}