swarmd/dockerexec/adapter.go
package dockerexec

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	engineapi "github.com/docker/docker/client"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/moby/swarmkit/v2/agent/exec"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	"github.com/pkg/errors"
	"golang.org/x/time/rate"
)

// containerAdapter conducts remote operations for a container. All calls
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	client    engineapi.APIClient
	container *containerConfig
	secrets   exec.SecretGetter
}
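
// newContainerAdapter builds an adapter for the given task, deriving its
// containerConfig from the node description and task spec.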
func newContainerAdapter(client engineapi.APIClient, nodeDescription *api.NodeDescription, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) {
	ctnr, err := newContainerConfig(nodeDescription, task)
	if err != nil {
		return nil, err
	}

	return &containerAdapter{
		client:    client,
		container: ctnr,
		secrets:   secrets,
	}, nil
}
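
// noopPrivilegeFn is a no-op privilege request function: registry credentials
// are passed up front via RegistryAuth, so no further lookup is performed.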
func noopPrivilegeFn() (string, error) { return "", nil }
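
// imagePullOptions builds the pull options for this container's image,
// carrying over any registry auth recorded in the task's PullOptions.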
func (c *containerConfig) imagePullOptions() types.ImagePullOptions {
	var registryAuth string
	if c.spec().PullOptions != nil {
		registryAuth = c.spec().PullOptions.RegistryAuth
	}

	return types.ImagePullOptions{
		// if the image needs to be pulled, the auth config will be retrieved and updated
		RegistryAuth:  registryAuth,
		PrivilegeFunc: noopPrivilegeFn,
	}
}
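
// pullImage pulls the task's image and drains the progress stream, logging
// rate-limited progress updates until the pull completes or fails.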
func (c *containerAdapter) pullImage(ctx context.Context) error {
	rc, err := c.client.ImagePull(ctx, c.container.image(), c.container.imagePullOptions())
	if err != nil {
		return err
	}
	// close the progress stream once the pull has been processed
	defer rc.Close()

	dec := json.NewDecoder(rc)
	dec.UseNumber()
	m := map[string]interface{}{}
	spamLimiter := rate.NewLimiter(rate.Every(1000*time.Millisecond), 1)

	lastStatus := ""
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		l := log.G(ctx)
		// limit pull progress logs unless the status changes
		if spamLimiter.Allow() || lastStatus != m["status"] {
			// if we have progress details, we have everything we need
			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
				// first, log the image and status
				l = l.WithFields(log.Fields{
					"image":  c.container.image(),
					"status": m["status"],
				})

				// then, if we have progress, log the progress
				if progress["current"] != nil && progress["total"] != nil {
					l = l.WithFields(log.Fields{
						"current": progress["current"],
						"total":   progress["total"],
					})
				}
			}
			l.Debug("pull in progress")
		}
		// sometimes, we get no useful information at all, and add no fields
		if status, ok := m["status"].(string); ok {
			lastStatus = status
		}
	}

	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return errors.Errorf("%v", errMsg)
	}
	return nil
}
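
// createNetworks creates the task's networks on the engine, treating
// "already exists" errors as success so the call is idempotent.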
func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for _, network := range c.container.networks() {
		opts, err := c.container.networkCreateOptions(network)
		if err != nil {
			return err
		}

		if _, err := c.client.NetworkCreate(ctx, network, opts); err != nil {
			if isNetworkExistError(err, network) {
				continue
			}
			return err
		}
	}

	return nil
}
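
// removeNetworks removes the task's networks, skipping networks that still
// have active endpoints attached.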
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	for _, nid := range c.container.networks() {
		if err := c.client.NetworkRemove(ctx, nid); err != nil {
			if isActiveEndpointError(err) {
				continue
			}

			log.G(ctx).Errorf("network %s remove failed", nid)
			return err
		}
	}

	return nil
}
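
// create creates the container on the engine using the adapter's generated
// container, host, and networking configuration.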
func (c *containerAdapter) create(ctx context.Context) error {
	_, err := c.client.ContainerCreate(ctx,
		c.container.config(),
		c.container.hostConfig(),
		c.container.networkingConfig(),
		nil,
		c.container.name(),
	)

	return err
}
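
// start starts the previously created container.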
func (c *containerAdapter) start(ctx context.Context) error {
	// TODO(nishanttotla): Consider adding checkpoint handling later
	return c.client.ContainerStart(ctx, c.container.name(), types.ContainerStartOptions{})
}
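
// inspect returns the engine's view of the container.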
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	return c.client.ContainerInspect(ctx, c.container.name())
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shut down by cancelling the context.
//
// A chan struct{} is returned that will be closed if the event processing
// fails and needs to be restarted.
func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <-chan struct{}, error) {
	// TODO(stevvooe): Move this to a single, global event dispatch. For
	// now, we create a connection per container.
	var (
		eventsq = make(chan events.Message)
		closed  = make(chan struct{})
	)

	log.G(ctx).Debugf("waiting on events")

	// TODO(stevvooe): For long running tasks, it is likely that we will have
	// to restart this under failure.
	eventCh, errCh := c.client.Events(ctx, types.EventsOptions{
		Since:   "0",
		Filters: c.container.eventFilter(),
	})

	go func() {
		defer close(closed)

		for {
			select {
			case msg := <-eventCh:
				select {
				case eventsq <- msg:
				case <-ctx.Done():
					return
				}
			case err := <-errCh:
				log.G(ctx).WithError(err).Error("error from events stream")
				return
			case <-ctx.Done():
				// exit
				return
			}
		}
	}()

	return eventsq, closed, nil
}
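
// shutdown stops the container, honoring the task's StopGracePeriod and
// defaulting the stop timeout to 10 seconds when none is set.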
func (c *containerAdapter) shutdown(ctx context.Context) error {
	// Default stop grace period to 10s.
	// TODO(thaJeztah): this should probably not set a default and leave it to the daemon to pick
	// a default (which could be in the container's config or daemon config)
	// see https://github.com/docker/cli/commit/86c30e6a0d153c2a99d9d12385179b22e8b7b935
	stopgraceSeconds := 10
	spec := c.container.spec()
	if spec.StopGracePeriod != nil {
		stopgraceFromProto, _ := gogotypes.DurationFromProto(spec.StopGracePeriod)
		stopgraceSeconds = int(stopgraceFromProto.Seconds())
	}
	return c.client.ContainerStop(ctx, c.container.name(), container.StopOptions{Timeout: &stopgraceSeconds})
}
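
// terminate forcibly kills the container using the engine's default kill signal.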
func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.client.ContainerKill(ctx, c.container.name(), "")
}
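
// remove force-removes the container and requests removal of its volumes.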
func (c *containerAdapter) remove(ctx context.Context) error {
	return c.client.ContainerRemove(ctx, c.container.name(), types.ContainerRemoveOptions{
		RemoveVolumes: true,
		Force:         true,
	})
}
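
// createVolumes creates the volumes embedded in the task's mounts whose
// options specify a volume driver; other mounts are skipped here.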
func (c *containerAdapter) createVolumes(ctx context.Context) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.spec().Mounts {
		if mount.Type != api.MountTypeVolume {
			continue
		}

		// only create the volume here when its options specify a volume driver
		if mount.VolumeOptions == nil {
			continue
		}
		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}

		req := c.container.volumeCreateRequest(&mount)
		if _, err := c.client.VolumeCreate(ctx, *req); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}
	}

	return nil
}
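
// logs returns a stream of the container's logs, translating the swarm log
// subscription options into engine ContainerLogs options.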
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
	conf := c.container.config()
	if conf != nil && conf.Tty {
		return nil, errors.New("logs not supported on services with TTY")
	}

	apiOptions := types.ContainerLogsOptions{
		Follow:     options.Follow,
		Timestamps: true,
		Details:    false,
	}

	if options.Since != nil {
		since, err := gogotypes.TimestampFromProto(options.Since)
		if err != nil {
			return nil, err
		}
		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
	}

	if options.Tail < 0 {
		// See protobuf documentation for details of how this works.
		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
	} else if options.Tail > 0 {
		return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
	}

	if len(options.Streams) == 0 {
		// empty == all
		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
	} else {
		for _, stream := range options.Streams {
			switch stream {
			case api.LogStreamStdout:
				apiOptions.ShowStdout = true
			case api.LogStreamStderr:
				apiOptions.ShowStderr = true
			}
		}
	}

	return c.client.ContainerLogs(ctx, c.container.name(), apiOptions)
}

// TODO(mrjana/stevvooe): There is no proper error code for network not found
// error in engine-api. Resort to string matching until engine-api is fixed.
func isActiveEndpointError(err error) bool {
	return strings.Contains(err.Error(), "has active endpoints")
}
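
// isNetworkExistError reports whether err indicates that a network with the
// given name already exists.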
func isNetworkExistError(err error, name string) bool {
	return strings.Contains(err.Error(), fmt.Sprintf("network with name %s already exists", name))
}
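
// isContainerCreateNameConflict reports whether err indicates that a container
// with the same name already exists.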
func isContainerCreateNameConflict(err error) bool {
	return strings.Contains(err.Error(), "Conflict. The name")
}
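
// isUnknownContainer reports whether err indicates that the container does not exist.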
func isUnknownContainer(err error) bool {
	return strings.Contains(err.Error(), "No such container:")
}
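
// isStoppedContainer reports whether err indicates that the container is already stopped.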
func isStoppedContainer(err error) bool {
	return strings.Contains(err.Error(), "is already stopped")
}