libcontainerd/remote/client.go
package remote // import "github.com/docker/docker/libcontainerd/remote"
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd"
apievents "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/protobuf"
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
cerrdefs "github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/libcontainerd/queue"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-multierror"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// DockerContainerBundlePath is the label key pointing to the container's bundle path.
// The label value is read back from the container metadata by NewTask, Exec
// and Delete in this file to locate the bundle directory.
const DockerContainerBundlePath = "com.docker/engine.bundle.path"
// client is the libcontainerd client implementation that talks to a
// containerd daemon through a containerd.Client.
type client struct {
	// client is the underlying containerd client all requests go through.
	client *containerd.Client
	// stateDir is the parent directory of the per-container bundle
	// directories (see bundleDir).
	stateDir string
	// logger is tagged with the "module" and "namespace" fields by NewClient.
	logger *log.Entry
	// ns is the containerd namespace handed to NewClient.
	ns string
	// backend receives the task events forwarded by processEvent.
	backend libcontainerdtypes.Backend
	// eventQ holds the event callbacks appended by processEvent, keyed by
	// container ID.
	eventQ queue.Queue
}
// container pairs a containerd container with the client that owns it.
type container struct {
	// client is the libcontainerd client this container was created or loaded by.
	client *client
	// c8dCtr is the underlying containerd container handle.
	c8dCtr containerd.Container
	// v2runcoptions holds the runtime options passed to NewContainer when they
	// are of type *v2runcoptions.Options; it is left nil otherwise, including
	// for containers obtained through LoadContainer.
	v2runcoptions *v2runcoptions.Options
}
// task wraps a containerd.Task together with the container it belongs to.
// The embedded Task's methods are promoted; several are overridden in this
// file to pass their errors through wrapError.
type task struct {
	containerd.Task
	// ctr is the container this task was created from (set by newTask).
	ctr *container
}
// process wraps a containerd.Process (as returned by task.Exec) so that
// Kill, Delete and Status can pass their errors through wrapError.
type process struct {
	containerd.Process
}
// NewClient builds a libcontainerd client on top of an existing containerd
// client. It starts a goroutine that processes the containerd event stream
// for namespace ns and forwards task events to backend b. The returned error
// is always nil.
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	logger := log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns)

	remote := &client{
		client:   cli,
		stateDir: stateDir,
		logger:   logger,
		ns:       ns,
		backend:  b,
	}

	// Event processing runs for as long as ctx is alive.
	go remote.processEventStream(ctx, ns)

	return remote, nil
}
// Version reports the version information returned by the containerd client.
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	version, err := c.client.Version(ctx)
	return version, err
}
// newTask wraps a containerd task so that it remembers its owning container.
func (c *container) newTask(t containerd.Task) *task {
	wrapped := new(task)
	wrapped.Task = t
	wrapped.ctr = c
	return wrapped
}
// AttachTask looks up the container's existing containerd task and attaches
// the caller's streams to it through attachStdio.
//
// If an error is returned after the direct IO was created, that IO is
// cancelled and closed before returning.
func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
	var direct *cio.DirectIO

	// Cleanup relies on the named result err and on direct being assigned by
	// the attach callback below.
	defer func() {
		if err == nil || direct == nil {
			return
		}
		direct.Cancel()
		direct.Close()
	}()

	attach := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// Assign to the captured variables (no :=) so the deferred cleanup
		// above can see the IO that was created.
		direct, err = c.client.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(direct)
	}

	c8dTask, err := c.c8dCtr.Task(ctx, attach)
	if err != nil {
		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}
	return c.newTask(c8dTask), nil
}
// NewContainer creates a containerd container with the given id.
//
// The caller-supplied opts are applied first, followed by the OCI spec, the
// runtime (shim plus runtimeOptions) and the bundle option. An "already
// exists" error from containerd is reported as a conflict; every other error
// goes through wrapError. When runtimeOptions is a *v2runcoptions.Options it
// is remembered on the returned container.
func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
	bundle := c.bundleDir(id)
	c.logger.WithField("bundle", bundle).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

	opts = append(opts,
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(shim, runtimeOptions),
		WithBundle(bundle, ociSpec),
	)

	c8dCtr, err := c.client.NewContainer(ctx, id, opts...)
	if err != nil {
		if !cerrdefs.IsAlreadyExists(err) {
			return nil, wrapError(err)
		}
		return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
	}

	created := &container{client: c, c8dCtr: c8dCtr}
	if runcOpts, ok := runtimeOptions.(*v2runcoptions.Options); ok {
		created.v2runcoptions = runcOpts
	}
	return created, nil
}
// NewTask creates a containerd task for this container.
//
// If checkpointDir is non-empty, its contents are archived and written to the
// content store, and the resulting descriptor is set as the task's
// checkpoint; the temporary content store entry is deleted again when NewTask
// returns. withStdin and the spec's Process.Terminal setting are forwarded to
// newFIFOSet, and attachStdio is handed to createIO to attach the caller's
// streams.
//
// On failure to create the task, stdinCloseSync is closed and any IO that was
// already created is cancelled and closed.
func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
	var (
		checkpoint *types.Descriptor
		t containerd.Task
		rio cio.IO
		// Buffered (capacity 1) so the send at the end of this function does
		// not block waiting for a receiver.
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.NewTask")
	defer span.End()

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		var err error
		checkpoint, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		// (registered before the error checks below so it also runs on the
		// early-return paths; it is a no-op while checkpoint is nil)
		defer func() {
			if checkpoint != nil {
				err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
				if err != nil {
					c.client.logger.WithError(err).WithFields(log.Fields{
						"ref": checkpointDir,
						"digest": checkpoint.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		// This err is scoped to the if statement; the writeContent error is
		// checked right after.
		if err := tar.Close(); err != nil {
			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}

	// Optimization: assume the relevant metadata has not changed in the
	// moment since the container was created. Elide redundant RPC requests
	// to refresh the metadata separately for spec and labels.
	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, errors.Wrap(err, "failed to retrieve metadata")
	}
	bundle := md.Labels[DockerContainerBundlePath]

	var spec specs.Spec
	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
		return nil, errors.Wrap(err, "failed to retrieve spec")
	}
	uid, gid := getSpecUser(&spec)

	taskOpts := []containerd.NewTaskOpts{
		// Hand the checkpoint descriptor (nil when no checkpoint was given) to containerd.
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = checkpoint
			return nil
		},
	}

	if runtime.GOOS != "windows" {
		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			if c.v2runcoptions != nil {
				// Work on a clone so the options stored on the container are not mutated.
				opts := proto.Clone(c.v2runcoptions).(*v2runcoptions.Options)
				opts.IoUid = uint32(uid)
				opts.IoGid = uint32(gid)
				info.Options = opts
			}
			return nil
		})
	} else {
		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
	}

	t, err = c.c8dCtr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)
			// Assigns the outer rio/err so the failure path below can clean up the IO.
			rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
			return rio, err
		},
		taskOpts...,
	)
	if err != nil {
		// Closing the channel tells createIO's stdin closer that no process will arrive.
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
	}

	// Signal c.createIO that it can call CloseIO
	stdinCloseSync <- t

	return c.newTask(t), nil
}
// Start starts the task inside a tracing span, passing any error from
// containerd through wrapError.
func (t *task) Start(ctx context.Context) error {
	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.task.Start")
	defer span.End()

	if err := t.Task.Start(ctx); err != nil {
		return wrapError(err)
	}
	return nil
}
// Exec creates exec process.
//
// The containerd client calls Exec to register the exec config in the shim side.
// When the client calls Start, the shim will create stdin fifo if needs. But
// for the container main process, the stdin fifo will be created in Create not
// the Start call. stdinCloseSync channel should be closed after Start exec
// process.
//
// processID identifies the exec process, spec describes it, withStdin and
// spec.Terminal are forwarded to newFIFOSet, and attachStdio is handed to
// createIO to attach the caller's streams. An "already exists" error from
// containerd is reported as a conflict; other errors go through wrapError.
// If starting the process fails, the process is deleted again using a fresh
// context with a 45 second timeout.
func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
	var (
		p containerd.Process
		rio cio.IO
		// Buffered (capacity 1) so the deferred send below does not block
		// waiting for a receiver.
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, wrapError(err)
	}

	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)

	// Clean up the IO on any failure path; this reads the function-level err,
	// which the statements below assign with "=" rather than ":=".
	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()

	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		// Closing the channel tells createIO's stdin closer that no process will arrive.
		close(stdinCloseSync)
		if cerrdefs.IsAlreadyExists(err) {
			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return nil, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of exec process will be created after p.Start in containerd
	defer func() { stdinCloseSync <- p }()

	if err = p.Start(ctx); err != nil {
		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
		// older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		// Best-effort cleanup: the result of Delete is intentionally not checked.
		p.Delete(ctx)
		return nil, wrapError(err)
	}
	return process{p}, nil
}
// Kill sends signal to the task, passing any error through wrapError.
func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
	if err := t.Task.Kill(ctx, signal); err != nil {
		return wrapError(err)
	}
	return nil
}
// Kill sends signal to the process, passing any error through wrapError.
func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
	if err := p.Process.Kill(ctx, signal); err != nil {
		return wrapError(err)
	}
	return nil
}
// Pause pauses the task, passing any error through wrapError.
func (t *task) Pause(ctx context.Context) error {
	if err := t.Task.Pause(ctx); err != nil {
		return wrapError(err)
	}
	return nil
}
// Resume resumes the task, passing any error through wrapError.
func (t *task) Resume(ctx context.Context) error {
	if err := t.Task.Resume(ctx); err != nil {
		return wrapError(err)
	}
	return nil
}
// Stats fetches the task's metrics from containerd, decodes the payload and
// converts it, together with the sample timestamp, into libcontainerd stats.
func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
	metric, err := t.Metrics(ctx)
	if err != nil {
		return nil, err
	}

	decoded, err := typeurl.UnmarshalAny(metric.Data)
	if err != nil {
		return nil, err
	}

	sampledAt := protobuf.FromTimestamp(metric.Timestamp)
	return libcontainerdtypes.InterfaceToStats(sampledAt, decoded), nil
}
// Summary returns one summary entry per process reported by the task's Pids
// call. It fails on the first process whose details cannot be decoded or
// converted.
func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
	procs, err := t.Pids(ctx)
	if err != nil {
		return nil, err
	}

	var summaries []libcontainerdtypes.Summary
	for _, proc := range procs {
		details, err := typeurl.UnmarshalAny(proc.Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}

		summary, err := summaryFromInterface(details)
		if err != nil {
			return nil, err
		}
		summaries = append(summaries, *summary)
	}
	return summaries, nil
}
// Delete deletes the task and returns its exit status, passing any error
// through wrapError.
func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	status, err := t.Task.Delete(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}
// Delete deletes the process and returns its exit status, passing any error
// through wrapError.
func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	status, err := p.Process.Delete(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}
// Delete removes the container from containerd and then removes its bundle
// directory from disk. Bundle removal is skipped when the
// LIBCONTAINERD_NOCLEAN environment variable is set to "1", and a failure to
// remove the bundle is only logged.
func (c *container) Delete(ctx context.Context) error {
	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	info, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return err
	}
	bundle := info.Labels[DockerContainerBundlePath]

	if err := c.c8dCtr.Delete(ctx); err != nil {
		return wrapError(err)
	}

	if os.Getenv("LIBCONTAINERD_NOCLEAN") == "1" {
		return nil
	}

	if err := os.RemoveAll(bundle); err != nil {
		fields := log.Fields{
			"container": c.c8dCtr.ID(),
			"bundle":    bundle,
		}
		c.client.logger.WithContext(ctx).WithError(err).WithFields(fields).Error("failed to remove state dir")
	}
	return nil
}
// ForceDelete deletes the task with the WithProcessKill option and discards
// the exit status. Any error is passed through wrapError.
func (t *task) ForceDelete(ctx context.Context) error {
	if _, err := t.Task.Delete(ctx, containerd.WithProcessKill); err != nil {
		return wrapError(err)
	}
	return nil
}
// Status returns the task's current status, passing any error through
// wrapError.
func (t *task) Status(ctx context.Context) (containerd.Status, error) {
	status, err := t.Task.Status(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}
// Status returns the process's current status, passing any error through
// wrapError.
func (p process) Status(ctx context.Context) (containerd.Status, error) {
	status, err := p.Process.Status(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}
// getCheckpointOptions returns a checkpoint option that sets the Exit flag on
// runc v2 checkpoint options. If the request carries no options yet and the
// container was created with runc v2 options, a fresh CheckpointOptions value
// is installed first. Options of any other type are left untouched.
func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
	return func(r *containerd.CheckpointTaskInfo) error {
		if c.v2runcoptions != nil && r.Options == nil {
			r.Options = &v2runcoptions.CheckpointOptions{}
		}

		if runcOpts, ok := r.Options.(*v2runcoptions.CheckpointOptions); ok {
			runcOpts.Exit = exit
		}
		return nil
	}
}
// CreateCheckpoint checkpoints the task and unpacks the checkpoint data into
// checkpointDir.
//
// exit is forwarded to the runtime's checkpoint options (see
// getCheckpointOptions). The checkpoint image created in containerd is
// temporary: it is deleted again before CreateCheckpoint returns, whether or
// not unpacking succeeded.
//
// Errors from the checkpoint call itself go through wrapError; failures while
// reading, decoding or unpacking the checkpoint data are returned as system
// errors. An image index without a checkpoint manifest is reported as an
// "invalid checkpoint" system error.
func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
		if err != nil {
			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()

	b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
	}
	var index ocispec.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
	}

	// Pick the first manifest that carries the checkpoint payload.
	var cpDesc *ocispec.Descriptor
	for i := range index.Manifests {
		if index.Manifests[i].MediaType == images.MediaTypeContainerd1Checkpoint {
			cpDesc = &index.Manifests[i]
			break
		}
	}
	if cpDesc == nil {
		// There is no underlying error to wrap at this point. Wrapping a nil
		// error yields nil, which would make this failure look like success,
		// so a new error is created instead.
		return errdefs.System(errors.New("invalid checkpoint"))
	}

	rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()

	if _, err := archive.Apply(ctx, checkpointDir, content.NewReader(rat)); err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
	}
	return nil
}
// LoadContainer looks up an existing containerd container by id and wraps it.
// A "not found" error from containerd is reported as a "no such container"
// not-found error; every other error goes through wrapError.
func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
	c8dCtr, err := c.client.LoadContainer(ctx, id)
	switch {
	case err == nil:
		return &container{client: c, c8dCtr: c8dCtr}, nil
	case cerrdefs.IsNotFound(err):
		return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
	default:
		return nil, wrapError(err)
	}
}
// Task returns the container's existing containerd task without attaching
// any IO, passing any lookup error through wrapError.
func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
	c8dTask, err := c.c8dCtr.Task(ctx, nil)
	if err == nil {
		return c.newTask(c8dTask), nil
	}
	return nil, wrapError(err)
}
// createIO creates the io to be used by a process.
//
// The process may not have been created yet when its stdin gets closed, so
// it is delivered through stdinCloseSync instead of being passed directly:
// callers send the process on the channel once it exists, or close the
// channel if creation failed. When stdin is present, it is wrapped so that
// closing it closes the local pipe and then asks containerd to close the
// process's stdin (at most once).
//
// If attachStdio fails, the direct IO created here is cancelled and closed.
func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		// NOTE: this local shadows the "io" package within this function.
		io *cio.DirectIO
		err error
	)
	io, err = c.client.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}

	if io.Stdin != nil {
		var (
			closeErr error
			// stdinOnce guards the close logic so repeated Close calls
			// return the same closeErr without closing twice.
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				closeErr = pipe.Close()

				select {
				// The process is already available (or the channel was closed).
				case p, ok := <-stdinCloseSync:
					if !ok {
						// Channel closed without a value: there is no process to close stdin on.
						return
					}
					if err := closeStdin(context.Background(), p); err != nil {
						if closeErr != nil {
							closeErr = multierror.Append(closeErr, err)
						} else {
							// Avoid wrapping a single error in a multierror.
							closeErr = err
						}
					}
				default:
					// The process wasn't ready. Close its stdin asynchronously.
					// Errors on this path can only be logged, not returned.
					go func() {
						p, ok := <-stdinCloseSync
						if !ok {
							return
						}
						if err := closeStdin(context.Background(), p); err != nil {
							c.client.logger.WithError(err).
								WithField("container", c.c8dCtr.ID()).
								Error("failed to close container stdin")
						}
					}()
				}
			})
			return closeErr
		})
	}

	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}
// closeStdin asks containerd to close the stdin of process p. An error whose
// message contains "transport is closing" is treated as success.
func closeStdin(ctx context.Context, p containerd.Process) error {
	err := p.CloseIO(ctx, containerd.WithStdinCloser)
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "transport is closing") {
		return nil
	}
	return err
}
// processEvent appends a callback to the event queue, keyed by the event's
// container ID, that delivers the event to the backend. A delivery failure is
// logged and not propagated.
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	deliver := func() {
		if err := c.backend.ProcessEvent(ei.ContainerID, et, ei); err != nil {
			fields := log.Fields{
				"container":  ei.ContainerID,
				"event":      et,
				"event-info": ei,
			}
			c.logger.WithContext(ctx).WithError(err).WithFields(fields).Error("failed to process event")
		}
	}
	c.eventQ.Append(ei.ContainerID, deliver)
}
// waitServe polls containerd until it reports that it is serving. It returns
// true once containerd is serving, and false when ctx is done or the serving
// check fails with a context cancellation or deadline error. Other errors
// from the check are logged and the check is retried after a short delay.
func (c *client) waitServe(ctx context.Context) bool {
	const retryDelay = 100 * time.Millisecond

	// Start with a stopped, drained timer; it is armed with Reset on every
	// iteration below.
	timer := time.NewTimer(retryDelay)
	if !timer.Stop() {
		<-timer.C
	}
	defer timer.Stop()

	// `IsServing` will actually block until the service is ready.
	// However it can return early, so we'll loop with a delay to handle it.
	for {
		ready, err := c.client.IsServing(ctx)
		switch {
		case err == nil:
			// Nothing to report.
		case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
			return false
		default:
			log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
		}

		if ready {
			return true
		}

		timer.Reset(retryDelay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			return false
		}
	}
}
// processEventStream subscribes to containerd task events for namespace ns
// and forwards them to the backend through processEvent. It runs until the
// subscription reports an error or ends.
//
// When the subscription fails with anything other than a gRPC Canceled
// status, it waits for containerd to serve again (waitServe) and, if that
// succeeds, restarts itself in a new goroutine using the original ctx.
func (c *client) processEventStream(ctx context.Context, ns string) {
	// Create a new context specifically for this subscription.
	// The context must be cancelled to cancel the subscription.
	// In cases where we have to restart event stream processing,
	// we'll need the original context b/c this one will be cancelled
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")

	c.logger.Debug("processing event stream")

	for {
		select {
		case err := <-errC:
			if err != nil {
				errStatus, ok := status.FromError(err)
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("Failed to get event")
					c.logger.Info("Waiting for containerd to be ready to restart event processing")
					if c.waitServe(ctx) {
						// Restart with the original ctx; subCtx is cancelled by the deferred cancel on return.
						go c.processEventStream(ctx, ns)
						return
					}
				}
				// Reached for a Canceled status, or when waitServe gave up.
				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
			}
			return
		case ev := <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}

			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}

			c.logger.WithField("topic", ev.Topic).Debug("event")

			// Translate the containerd event into a libcontainerd event.
			switch t := v.(type) {
			case *apievents.TaskCreate:
				// For create/start events the process ID is the container ID.
				c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID: t.ContainerID,
					Pid: t.Pid,
				})
			case *apievents.TaskStart:
				c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID: t.ContainerID,
					Pid: t.Pid,
				})
			case *apievents.TaskExit:
				c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID: t.ID,
					Pid: t.Pid,
					ExitCode: t.ExitStatus,
					ExitedAt: protobuf.FromTimestamp(t.ExitedAt),
				})
			case *apievents.TaskOOM:
				c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskExecAdded:
				c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID: t.ExecID,
				})
			case *apievents.TaskExecStarted:
				c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID: t.ExecID,
					Pid: t.Pid,
				})
			case *apievents.TaskPaused:
				c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskResumed:
				c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskDelete:
				// Task deletions are only logged, never forwarded to the backend.
				c.logger.WithFields(log.Fields{
					"topic": ev.Topic,
					"type": reflect.TypeOf(t),
					"container": t.ContainerID,
				}).Info("ignoring event")
			default:
				c.logger.WithFields(log.Fields{
					"topic": ev.Topic,
					"type": reflect.TypeOf(t),
				}).Info("ignoring event")
			}
		}
	}
}
// writeContent streams r into the containerd content store under reference
// ref and commits it with a "containerd.io/gc.root" label set to the current
// UTC time. It returns a descriptor carrying mediaType, the digest of the
// written content and the number of bytes copied.
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	w, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer w.Close()

	written, err := io.Copy(w, r)
	if err != nil {
		return nil, err
	}

	gcRoot := content.WithLabels(map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	})
	if err := w.Commit(ctx, 0, "", gcRoot); err != nil {
		return nil, err
	}

	desc := &types.Descriptor{
		MediaType: mediaType,
		Digest:    w.Digest().String(),
		Size:      written,
	}
	return desc, nil
}
// bundleDir returns the bundle directory for container id, located directly
// under the client's state directory.
func (c *client) bundleDir(id string) string {
	dir := filepath.Join(c.stateDir, id)
	return dir
}
// wrapError converts containerd "not found" errors into errdefs.NotFound
// errors. An error qualifies when containerd's errdefs classifies it as not
// found, or when its message contains one of a few known "not found"
// phrases. A nil error stays nil and any other error is returned unchanged.
func wrapError(err error) error {
	if err == nil {
		return nil
	}
	if cerrdefs.IsNotFound(err) {
		return errdefs.NotFound(err)
	}

	// Fall back to matching on the message text.
	text := err.Error()
	notFound := strings.Contains(text, "container does not exist") ||
		strings.Contains(text, "not found") ||
		strings.Contains(text, "no such container")
	if notFound {
		return errdefs.NotFound(err)
	}
	return err
}