dotcloud/docker

View on GitHub
daemon/containerd/image_import.go

Summary

Maintainability
B
6 hrs
Test Coverage
package containerd

import (
    "bufio"
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "time"

    "github.com/containerd/containerd/content"
    "github.com/containerd/containerd/images"
    "github.com/containerd/containerd/platforms"
    cerrdefs "github.com/containerd/errdefs"
    "github.com/containerd/log"
    "github.com/distribution/reference"
    "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/events"
    "github.com/docker/docker/builder/dockerfile"
    "github.com/docker/docker/errdefs"
    "github.com/docker/docker/image"
    "github.com/docker/docker/pkg/archive"
    "github.com/docker/docker/pkg/pools"
    "github.com/google/uuid"
    imagespec "github.com/moby/docker-image-spec/specs-go/v1"
    "github.com/opencontainers/go-digest"
    "github.com/opencontainers/image-spec/specs-go"
    ocispec "github.com/opencontainers/image-spec/specs-go/v1"
    "github.com/pkg/errors"
)

// ImportImage imports an image, getting the archived layer data from layerReader.
// Layer archive is imported as-is if the compression is gzip or zstd.
// Uncompressed, xz and bzip2 archives are recompressed into gzip.
// The image is tagged with the given reference.
// If the platform is nil, the default host platform is used.
// The message is used as the history comment.
// Image configuration is derived from the dockerfile instructions in changes.
func (i *ImageService) ImportImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) {
    refString := ""
    if ref != nil {
        refString = ref.String()
    }
    logger := log.G(ctx).WithField("ref", refString)

    ctx, release, err := i.client.WithLease(ctx)
    if err != nil {
        return "", errdefs.System(err)
    }
    defer func() {
        if err := release(context.WithoutCancel(ctx)); err != nil {
            logger.WithError(err).Warn("failed to release lease created for import")
        }
    }()

    if platform == nil {
        def := platforms.DefaultSpec()
        platform = &def
    }

    imageConfig, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS)
    if err != nil {
        logger.WithError(err).Debug("failed to process changes")
        return "", errdefs.InvalidParameter(err)
    }

    cs := i.content

    compressedDigest, uncompressedDigest, mt, err := saveArchive(ctx, cs, layerReader)
    if err != nil {
        logger.WithError(err).Debug("failed to write layer blob")
        return "", err
    }
    logger = logger.WithFields(log.Fields{
        "compressedDigest":   compressedDigest,
        "uncompressedDigest": uncompressedDigest,
    })

    size, err := fillUncompressedLabel(ctx, cs, compressedDigest, uncompressedDigest)
    if err != nil {
        logger.WithError(err).Debug("failed to set uncompressed label on the compressed blob")
        return "", err
    }

    compressedRootfsDesc := ocispec.Descriptor{
        MediaType: mt,
        Digest:    compressedDigest,
        Size:      size,
    }

    dockerCfg := containerConfigToDockerOCIImageConfig(imageConfig)
    createdAt := time.Now()
    config := imagespec.DockerOCIImage{
        Image: ocispec.Image{
            Platform: *platform,
            Created:  &createdAt,
            Author:   "",
            RootFS: ocispec.RootFS{
                Type:    "layers",
                DiffIDs: []digest.Digest{uncompressedDigest},
            },
            History: []ocispec.History{
                {
                    Created:    &createdAt,
                    CreatedBy:  "",
                    Author:     "",
                    Comment:    msg,
                    EmptyLayer: false,
                },
            },
        },
        Config: dockerCfg,
    }
    configDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageConfig, config, nil)
    if err != nil {
        return "", err
    }

    manifest := ocispec.Manifest{
        MediaType: ocispec.MediaTypeImageManifest,
        Versioned: specs.Versioned{
            SchemaVersion: 2,
        },
        Config: configDesc,
        Layers: []ocispec.Descriptor{
            compressedRootfsDesc,
        },
    }
    manifestDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageManifest, manifest, map[string]string{
        "containerd.io/gc.ref.content.config": configDesc.Digest.String(),
        "containerd.io/gc.ref.content.l.0":    compressedDigest.String(),
    })
    if err != nil {
        return "", err
    }

    id := image.ID(manifestDesc.Digest.String())
    img := images.Image{
        Name:      refString,
        Target:    manifestDesc,
        CreatedAt: createdAt,
    }
    if img.Name == "" {
        img.Name = danglingImageName(manifestDesc.Digest)
    }

    err = i.saveImage(ctx, img)
    if err != nil {
        logger.WithError(err).Debug("failed to save image")
        return "", err
    }

    err = i.unpackImage(ctx, i.StorageDriver(), img, manifestDesc)
    if err != nil {
        logger.WithError(err).Debug("failed to unpack image")
    } else {
        i.LogImageEvent(id.String(), id.String(), events.ActionImport)
    }

    return id, err
}

// saveArchive saves the archive from bufRd to the content store, compressing it if necessary.
// Returns compressed blob digest, digest of the uncompressed data and media type of the stored blob.
func saveArchive(ctx context.Context, cs content.Store, layerReader io.Reader) (digest.Digest, digest.Digest, string, error) {
    // Wrap the reader in buffered reader to allow peeks.
    p := pools.BufioReader32KPool
    bufRd := p.Get(layerReader)
    defer p.Put(bufRd)

    compression, err := detectCompression(bufRd)
    if err != nil {
        return "", "", "", err
    }

    var uncompressedReader io.Reader = bufRd
    switch compression {
    case archive.Gzip, archive.Zstd:
        // If the input is already a compressed layer, just save it as is.
        mediaType := ocispec.MediaTypeImageLayerGzip
        if compression == archive.Zstd {
            mediaType = ocispec.MediaTypeImageLayerZstd
        }

        compressedDigest, uncompressedDigest, err := writeCompressedBlob(ctx, cs, mediaType, bufRd)
        if err != nil {
            return "", "", "", err
        }

        return compressedDigest, uncompressedDigest, mediaType, nil
    case archive.Bzip2, archive.Xz:
        r, err := archive.DecompressStream(bufRd)
        if err != nil {
            return "", "", "", errdefs.InvalidParameter(err)
        }
        defer r.Close()
        uncompressedReader = r
        fallthrough
    case archive.Uncompressed:
        mediaType := ocispec.MediaTypeImageLayerGzip
        compression := archive.Gzip

        compressedDigest, uncompressedDigest, err := compressAndWriteBlob(ctx, cs, compression, mediaType, uncompressedReader)
        if err != nil {
            return "", "", "", err
        }

        return compressedDigest, uncompressedDigest, mediaType, nil
    }

    return "", "", "", errdefs.InvalidParameter(errors.New("unsupported archive compression"))
}

// writeCompressedBlob writes the blob and simultaneously computes the digest of the uncompressed data.
func writeCompressedBlob(ctx context.Context, cs content.Store, mediaType string, bufRd *bufio.Reader) (digest.Digest, digest.Digest, error) {
    pr, pw := io.Pipe()
    defer pw.Close()
    defer pr.Close()

    c := make(chan digest.Digest)
    // Start copying the blob to the content store from the pipe and tee it to the pipe.
    go func() {
        compressedDigest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, io.TeeReader(bufRd, pw))
        pw.CloseWithError(err)
        c <- compressedDigest
    }()

    digester := digest.Canonical.Digester()

    // Decompress the piped blob.
    decompressedStream, err := archive.DecompressStream(pr)
    if err == nil {
        // Feed the digester with decompressed data.
        _, err = io.Copy(digester.Hash(), decompressedStream)
        decompressedStream.Close()
    }
    pr.CloseWithError(err)

    compressedDigest := <-c
    if err != nil {
        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            return "", "", errdefs.Cancelled(err)
        }
        return "", "", errdefs.System(err)
    }

    uncompressedDigest := digester.Digest()
    return compressedDigest, uncompressedDigest, nil
}

// compressAndWriteBlob compresses the uncompressedReader and stores it in the content store.
func compressAndWriteBlob(ctx context.Context, cs content.Store, compression archive.Compression, mediaType string, uncompressedLayerReader io.Reader) (digest.Digest, digest.Digest, error) {
    pr, pw := io.Pipe()
    defer pr.Close()
    defer pw.Close()

    compressor, err := archive.CompressStream(pw, compression)
    if err != nil {
        return "", "", errdefs.InvalidParameter(err)
    }

    writeChan := make(chan digest.Digest)
    // Start copying the blob to the content store from the pipe.
    go func() {
        dgst, err := writeBlobAndReturnDigest(ctx, cs, mediaType, pr)
        pr.CloseWithError(err)
        writeChan <- dgst
    }()

    // Copy archive to the pipe and tee it to a digester.
    // This will feed the pipe the above goroutine is reading from.
    uncompressedDigester := digest.Canonical.Digester()
    readFromInputAndDigest := io.TeeReader(uncompressedLayerReader, uncompressedDigester.Hash())
    _, err = io.Copy(compressor, readFromInputAndDigest)
    compressor.Close()
    pw.CloseWithError(err)

    compressedDigest := <-writeChan
    if err != nil {
        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            return "", "", errdefs.Cancelled(err)
        }
        return "", "", errdefs.System(err)
    }

    return compressedDigest, uncompressedDigester.Digest(), err
}

// writeBlobAndReturnDigest writes a blob to the content store and returns the digest.
func writeBlobAndReturnDigest(ctx context.Context, cs content.Store, mt string, reader io.Reader) (digest.Digest, error) {
    digester := digest.Canonical.Digester()
    if err := content.WriteBlob(ctx, cs, uuid.New().String(), io.TeeReader(reader, digester.Hash()), ocispec.Descriptor{MediaType: mt}); err != nil {
        return "", errdefs.System(err)
    }
    return digester.Digest(), nil
}

// saveImage creates an image in the ImageService or updates it if it exists.
func (i *ImageService) saveImage(ctx context.Context, img images.Image) error {
    if _, err := i.images.Update(ctx, img); err != nil {
        if cerrdefs.IsNotFound(err) {
            if _, err := i.images.Create(ctx, img); err != nil {
                return errdefs.Unknown(err)
            }
        } else {
            return errdefs.Unknown(err)
        }
    }

    return nil
}

// unpackImage unpacks the platform-specific manifest of a image into the snapshotter.
func (i *ImageService) unpackImage(ctx context.Context, snapshotter string, img images.Image, manifestDesc ocispec.Descriptor) error {
    c8dImg, err := i.NewImageManifest(ctx, img, manifestDesc)
    if err != nil {
        return err
    }

    if err := c8dImg.Unpack(ctx, snapshotter); err != nil {
        if !cerrdefs.IsAlreadyExists(err) {
            return errdefs.System(fmt.Errorf("failed to unpack image: %w", err))
        }
    }

    return nil
}

// detectCompression dectects the reader compression type.
func detectCompression(bufRd *bufio.Reader) (archive.Compression, error) {
    bs, err := bufRd.Peek(10)
    if err != nil && err != io.EOF {
        // Note: we'll ignore any io.EOF error because there are some odd
        // cases where the layer.tar file will be empty (zero bytes) and
        // that results in an io.EOF from the Peek() call. So, in those
        // cases we'll just treat it as a non-compressed stream and
        // that means just create an empty layer.
        // See Issue 18170
        return archive.Uncompressed, errdefs.Unknown(err)
    }

    return archive.DetectCompression(bs), nil
}

// fillUncompressedLabel sets the uncompressed digest label on the compressed blob metadata
// and returns the compressed blob size.
func fillUncompressedLabel(ctx context.Context, cs content.Store, compressedDigest digest.Digest, uncompressedDigest digest.Digest) (int64, error) {
    info, err := cs.Info(ctx, compressedDigest)
    if err != nil {
        return 0, errdefs.Unknown(errors.Wrapf(err, "couldn't open previously written blob"))
    }
    size := info.Size
    info.Labels = map[string]string{"containerd.io/uncompressed": uncompressedDigest.String()}

    _, err = cs.Update(ctx, info, "labels.*")
    if err != nil {
        return 0, errdefs.System(errors.Wrapf(err, "couldn't set uncompressed label"))
    }
    return size, nil
}

// storeJson marshals the provided object as json and stores it.
func storeJson(ctx context.Context, cs content.Ingester, mt string, obj interface{}, labels map[string]string) (ocispec.Descriptor, error) {
    configData, err := json.Marshal(obj)
    if err != nil {
        return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
    }
    configDigest := digest.FromBytes(configData)
    if err != nil {
        return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
    }
    desc := ocispec.Descriptor{
        MediaType: mt,
        Digest:    configDigest,
        Size:      int64(len(configData)),
    }

    var opts []content.Opt
    if labels != nil {
        opts = append(opts, content.WithLabels(labels))
    }

    err = content.WriteBlob(ctx, cs, configDigest.String(), bytes.NewReader(configData), desc, opts...)
    if err != nil {
        return ocispec.Descriptor{}, errdefs.System(err)
    }
    return desc, nil
}