daemon/cluster/executor/container/adapter.go

package container // import "github.com/docker/docker/daemon/cluster/executor/container"

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "io"
    "os"
    "strings"
    "syscall"
    "time"

    "github.com/containerd/log"
    "github.com/distribution/reference"
    "github.com/docker/docker/api/types/backend"
    containertypes "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/events"
    "github.com/docker/docker/api/types/network"
    "github.com/docker/docker/api/types/registry"
    "github.com/docker/docker/container"
    "github.com/docker/docker/daemon"
    "github.com/docker/docker/daemon/cluster/convert"
    executorpkg "github.com/docker/docker/daemon/cluster/executor"
    networkSettings "github.com/docker/docker/daemon/network"
    "github.com/docker/docker/libnetwork"
    volumeopts "github.com/docker/docker/volume/service/opts"
    gogotypes "github.com/gogo/protobuf/types"
    "github.com/moby/swarmkit/v2/agent/exec"
    "github.com/moby/swarmkit/v2/api"
    swarmlog "github.com/moby/swarmkit/v2/log"
    "github.com/opencontainers/go-digest"
    "github.com/pkg/errors"
    "golang.org/x/time/rate"
)

// nodeAttachmentReadyInterval is the interval at which to poll the attachment
// store while waiting for a node's network attachments to become ready.
const nodeAttachmentReadyInterval = 100 * time.Millisecond

// containerAdapter conducts remote operations for a container. Most calls
// are naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
    backend       executorpkg.Backend
    imageBackend  executorpkg.ImageBackend
    volumeBackend executorpkg.VolumeBackend
    container     *containerConfig
    dependencies  exec.DependencyGetter
}

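// newContainerAdapter builds a containerConfig from the task and node
// description and wires it to the provided engine backends. A typical
// (hypothetical) call from a task controller might look like:
//
//	adapter, err := newContainerAdapter(b, i, v, task, node, deps)
//	if err != nil {
//		return err
//	}
//	if err := adapter.pullImage(ctx); err != nil {
//		return err
//	}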
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
    ctnr, err := newContainerConfig(task, node)
    if err != nil {
        return nil, err
    }

    return &containerAdapter{
        container:     ctnr,
        backend:       b,
        imageBackend:  i,
        volumeBackend: v,
        dependencies:  dependencies,
    }, nil
}

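// pullImage pulls the image referenced by the task spec. The pull is skipped
// when the image is referenced by ID, or by digest and already present
// locally. Pull progress is decoded from the daemon's JSON stream and logged
// at debug level, rate-limited to roughly one entry per second unless the
// status changes.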
func (c *containerAdapter) pullImage(ctx context.Context) error {
    spec := c.container.spec()

    // Skip pulling if the image is referenced by image ID.
    if _, err := digest.Parse(spec.Image); err == nil {
        return nil
    }

    // Skip pulling if the image is referenced by digest and already
    // exists locally.
    named, err := reference.ParseNormalizedNamed(spec.Image)
    if err == nil {
        if _, ok := named.(reference.Canonical); ok {
            _, err := c.imageBackend.GetImage(ctx, spec.Image, backend.GetImageOpts{})
            if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
                return err
            }
            if err == nil {
                return nil
            }
        }
    }

    // If the image needs to be pulled, retrieve and decode the registry auth
    // config from the task's pull options.
    var encodedAuthConfig string
    if spec.PullOptions != nil {
        encodedAuthConfig = spec.PullOptions.RegistryAuth
    }

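    // RegistryAuth is expected to be a base64url-encoded JSON serialization
    // of a registry.AuthConfig, roughly of this shape (illustrative values):
    //
    //	{"username":"user","password":"secret","serveraddress":"registry.example.com:5000"}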
    authConfig := &registry.AuthConfig{}
    if encodedAuthConfig != "" {
        if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
            swarmlog.G(ctx).Warnf("invalid authconfig: %v", err)
        }
    }

    pr, pw := io.Pipe()
    metaHeaders := map[string][]string{}
    go func() {
        // TODO LCOW Support: This will need revisiting as
        // the stack is built up to include LCOW support for swarm.

        // Make sure the image has a tag, otherwise it will pull all tags.
        ref := reference.TagNameOnly(named)
        err := c.imageBackend.PullImage(ctx, ref, nil, metaHeaders, authConfig, pw)
        pw.CloseWithError(err)
    }()

    dec := json.NewDecoder(pr)
    dec.UseNumber()
    m := map[string]interface{}{}
    spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)

    lastStatus := ""
    for {
        if err := dec.Decode(&m); err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        l := swarmlog.G(ctx)
        // limit pull progress logs unless the status changes
        if spamLimiter.Allow() || lastStatus != m["status"] {
            // if we have progress details, we have everything we need
            if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
                // first, log the image and status
                l = l.WithFields(log.Fields{
                    "image":  c.container.image(),
                    "status": m["status"],
                })
                // then, if we have progress, log the progress
                if progress["current"] != nil && progress["total"] != nil {
                    l = l.WithFields(log.Fields{
                        "current": progress["current"],
                        "total":   progress["total"],
                    })
                }
            }
            l.Debug("pull in progress")
        }
        // sometimes, we get no useful information at all, and add no fields
        if status, ok := m["status"].(string); ok {
            lastStatus = status
        }
    }

    // if the final stream object contained an error, return it
    if errMsg, ok := m["error"]; ok {
        return fmt.Errorf("%v", errMsg)
    }
    return nil
}

// waitNodeAttachments validates that NetworkAttachments exist on this node
// for every network in use by this task. It blocks until the network
// attachments are ready, or the context times out. If it returns nil, then the
// node's network attachments are all there.
func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
    // to do this, we're going to get the attachment store and try getting the
    // IP address for each network. if any network comes back not existing,
    // we'll wait and try again.
    attachmentStore := c.backend.GetAttachmentStore()
    if attachmentStore == nil {
        return fmt.Errorf("error getting attachment store")
    }

    // essentially, we're long-polling here. this is really sub-optimal, but a
    // better solution based off signaling channels would require a more
    // substantial rearchitecture and probably not be worth our time in terms
    // of performance gains.
    poll := time.NewTicker(nodeAttachmentReadyInterval)
    defer poll.Stop()
    for {
        // set a flag ready to true. if we try to get a network IP that doesn't
        // exist yet, we will set this flag to "false"
        ready := true
        for _, attachment := range c.container.networksAttachments {
            // we only need node attachments (IP address) for overlay networks
            // TODO(dperny): unsure if this will work with other network
            // drivers, but i also don't think other network drivers use the
            // node attachment IP address.
            if attachment.Network.DriverState.Name == "overlay" {
                if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
                    ready = false
                }
            }
        }

        // if everything is ready here, then we can just return no error
        if ready {
            return nil
        }

        // otherwise, try polling again, or wait for context canceled.
        select {
        case <-ctx.Done():
            return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
        case <-poll.C:
        }
    }
}

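// createNetworks creates the networks referenced by the task's network
// attachments. Networks that already exist, and predefined networks, are
// skipped without error.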
func (c *containerAdapter) createNetworks(ctx context.Context) error {
    for name := range c.container.networksAttachments {
        ncr, err := c.container.networkCreateRequest(name)
        if err != nil {
            return err
        }

        if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
            if _, ok := err.(libnetwork.NetworkNameError); ok {
                continue
            }
            // Continue if CreateManagedNetwork returns a PredefinedNetworkError;
            // other callers can still treat it as an error.
            if _, ok := err.(daemon.PredefinedNetworkError); ok {
                continue
            }
            return err
        }
    }

    return nil
}

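// removeNetworks removes the networks referenced by the task's network
// attachments. Networks that still have active endpoints, or that no longer
// exist, are skipped without error.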
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
    var (
        activeEndpointsError *libnetwork.ActiveEndpointsError
        errNoSuchNetwork     libnetwork.ErrNoSuchNetwork
    )

    for name, v := range c.container.networksAttachments {
        if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
            switch {
            case errors.As(err, &activeEndpointsError):
                continue
            case errors.As(err, &errNoSuchNetwork):
                continue
            default:
                swarmlog.G(ctx).Errorf("network %s remove failed: %v", name, err)
                return err
            }
        }
    }

    return nil
}

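// networkAttach updates the attachment for the network attachment container,
// using the first endpoint found in the task's networking config.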
func (c *containerAdapter) networkAttach(ctx context.Context) error {
    config := c.container.createNetworkingConfig(c.backend)

    var (
        networkName string
        networkID   string
    )

    if config != nil {
        for n, epConfig := range config.EndpointsConfig {
            networkName = n
            networkID = epConfig.NetworkID
            break
        }
    }

    return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
}

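// waitForDetach blocks until the backend reports that the network attachment
// container has been detached from its network, or the context is cancelled.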
func (c *containerAdapter) waitForDetach(ctx context.Context) error {
    config := c.container.createNetworkingConfig(c.backend)

    var (
        networkName string
        networkID   string
    )

    if config != nil {
        for n, epConfig := range config.EndpointsConfig {
            networkName = n
            networkID = epConfig.NetworkID
            break
        }
    }

    return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
}

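// create creates the managed container for the task, normalizing the default
// network mode for the local OS, then wires the dependency store and the
// secret, config, and service-config references to the new container.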
func (c *containerAdapter) create(ctx context.Context) error {
    hostConfig := c.container.hostConfig(c.dependencies.Volumes())
    netConfig := c.container.createNetworkingConfig(c.backend)

    // We need to make sure no empty string or "default" NetworkMode is
    // provided to the daemon as it doesn't support them.
    //
    // This is in line with what the ContainerCreate API endpoint does, but
    // unlike that endpoint we can't do that in the ServiceCreate endpoint as
    // the cluster leader and the current node might not be running on the same
    // OS. Since the normalized value isn't the same on Windows and Linux, we
    // need to make this normalization happen once we're sure we won't make a
    // cross-OS API call.
    if hostConfig.NetworkMode == "" || hostConfig.NetworkMode.IsDefault() {
        hostConfig.NetworkMode = networkSettings.DefaultNetwork
        if v, ok := netConfig.EndpointsConfig[network.NetworkDefault]; ok {
            delete(netConfig.EndpointsConfig, network.NetworkDefault)
            netConfig.EndpointsConfig[networkSettings.DefaultNetwork] = v
        }
    }

    var cr containertypes.CreateResponse
    var err error
    if cr, err = c.backend.CreateManagedContainer(ctx, backend.ContainerCreateConfig{
        Name:       c.container.name(),
        Config:     c.container.config(),
        HostConfig: hostConfig,
        // Use the first network in container create
        NetworkingConfig: netConfig,
    }); err != nil {
        return err
    }

    ctr := c.container.task.Spec.GetContainer()
    if ctr == nil {
        return errors.New("unable to get container from task spec")
    }

    if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
        return err
    }

    // configure secrets
    secretRefs := convert.SecretReferencesFromGRPC(ctr.Secrets)
    if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
        return err
    }

    configRefs := convert.ConfigReferencesFromGRPC(ctr.Configs)
    if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
        return err
    }

    return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
}

// checkMounts ensures that the provided mounts won't have any host-specific
// problems at startup. For example, we disallow bind mounts whose source path
// does not exist, which is slightly different from the container API.
func (c *containerAdapter) checkMounts() error {
    spec := c.container.spec()
    for _, mount := range spec.Mounts {
        switch mount.Type {
        case api.MountTypeBind:
            if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
                return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
            }
        }
    }

    return nil
}

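// start validates host-dependent mounts and then starts the container.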
func (c *containerAdapter) start(ctx context.Context) error {
    if err := c.checkMounts(); err != nil {
        return err
    }

    return c.backend.ContainerStart(ctx, c.container.name(), "", "")
}

func (c *containerAdapter) inspect(ctx context.Context) (containertypes.InspectResponse, error) {
    cs, err := c.backend.ContainerInspect(ctx, c.container.name(), backend.ContainerInspectOptions{})
    if ctx.Err() != nil {
        return containertypes.InspectResponse{}, ctx.Err()
    }
    if err != nil {
        return containertypes.InspectResponse{}, err
    }
    return *cs, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shut down by cancelling the context.
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
    swarmlog.G(ctx).Debugf("waiting on events")
    buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
    eventsq := make(chan events.Message, len(buffer))

    for _, event := range buffer {
        eventsq <- event
    }

    go func() {
        defer c.backend.UnsubscribeFromEvents(l)

        for {
            select {
            case ev := <-l:
                jev, ok := ev.(events.Message)
                if !ok {
                    swarmlog.G(ctx).Warnf("unexpected event message: %q", ev)
                    continue
                }
                select {
                case eventsq <- jev:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return eventsq
}

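// wait returns a channel that delivers the container's state once it is no
// longer running.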
func (c *containerAdapter) wait(ctx context.Context) (<-chan container.StateStatus, error) {
    return c.backend.ContainerWait(ctx, c.container.nameOrID(), container.WaitConditionNotRunning)
}

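// shutdown stops the container, using the task's stop grace period as the
// timeout when one is set.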
func (c *containerAdapter) shutdown(ctx context.Context) error {
    options := containertypes.StopOptions{}
    // Leave Timeout unset by default so the daemon uses the container's own
    // stop timeout; only set it when the spec provides a stop grace period.
    if spec := c.container.spec(); spec.StopGracePeriod != nil {
        timeout := int(spec.StopGracePeriod.Seconds)
        options.Timeout = &timeout
    }
    return c.backend.ContainerStop(ctx, c.container.name(), options)
}

func (c *containerAdapter) terminate(ctx context.Context) error {
    return c.backend.ContainerKill(c.container.name(), syscall.SIGKILL.String())
}

func (c *containerAdapter) remove(ctx context.Context) error {
    return c.backend.ContainerRm(c.container.name(), &backend.ContainerRmConfig{
        RemoveVolume: true,
        ForceRemove:  true,
    })
}

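// createVolumes creates any volume mounts that specify a volume driver so
// that they exist on the engine before the container starts.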
func (c *containerAdapter) createVolumes(ctx context.Context) error {
    // Create plugin volumes that are embedded inside a Mount
    for _, mount := range c.container.task.Spec.GetContainer().Mounts {
        mount := mount
        if mount.Type != api.MountTypeVolume {
            continue
        }

        if mount.VolumeOptions == nil {
            continue
        }

        if mount.VolumeOptions.DriverConfig == nil {
            continue
        }

        req := c.container.volumeCreateRequest(&mount)

        // Check if this volume exists on the engine
        if _, err := c.volumeBackend.Create(ctx, req.Name, req.Driver,
            volumeopts.WithCreateOptions(req.DriverOpts),
            volumeopts.WithCreateLabels(req.Labels),
        ); err != nil {
            // TODO(amitshukla): Today, volume create through the engine api does not return an error
            // when the named volume with the same parameters already exists.
            // It returns an error if the driver name is different - that is a valid error
            return err
        }
    }

    return nil
}

// waitClusterVolumes blocks until the VolumeGetter returns a path for each
// cluster volume in use by this task
func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error {
    for _, attached := range c.container.task.Volumes {
        // for every attachment, try until we succeed or until the context
        // is canceled.
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                // continue through the code.
            }
            path, err := c.dependencies.Volumes().Get(attached.ID)
            if err == nil && path != "" {
                // break out of the inner-most loop
                break
            }
        }
    }
    swarmlog.G(ctx).Debug("volumes ready")
    return nil
}

func (c *containerAdapter) activateServiceBinding() error {
    return c.backend.ActivateContainerServiceBinding(c.container.name())
}

func (c *containerAdapter) deactivateServiceBinding() error {
    return c.backend.DeactivateContainerServiceBinding(c.container.name())
}

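// logs subscribes to the container's logs, translating the swarm log
// subscription options into engine API log options.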
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
    apiOptions := &containertypes.LogsOptions{
        Follow: options.Follow,

        // Always say yes to Timestamps and Details; the decision of whether
        // to return these to the user is made much higher up the stack.
        Timestamps: true,
        Details:    true,
    }

    if options.Since != nil {
        since, err := gogotypes.TimestampFromProto(options.Since)
        if err != nil {
            return nil, err
        }
        // Format since as seconds.nanoseconds because that is what the docker
        // container logs interface expects;
        // see github.com/docker/docker/api/types/time.ParseTimestamps.
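        // (For example, 2021-01-01T00:00:00Z becomes "1609459200.000000000".)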
        apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
    }

    if options.Tail < 0 {
        // See protobuf documentation for details of how this works.
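        // With this encoding, a Tail of -1 maps to "0" (no lines) and a Tail
        // of -11 maps to "10" (the last ten lines).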
        apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
    } else if options.Tail > 0 {
        return nil, errors.New("tail relative to start of logs not supported via docker API")
    }

    if len(options.Streams) == 0 {
        // empty == all
        apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
    } else {
        for _, stream := range options.Streams {
            switch stream {
            case api.LogStreamStdout:
                apiOptions.ShowStdout = true
            case api.LogStreamStderr:
                apiOptions.ShowStderr = true
            }
        }
    }
    msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
    if err != nil {
        return nil, err
    }
    return msgs, nil
}