dotcloud/docker

View on GitHub
daemon/containerd/image_push.go

Summary

Maintainability
D
2 days
Test Coverage
package containerd

import (
    "context"
    "fmt"
    "io"
    "strings"
    "sync"
    "time"

    "github.com/containerd/containerd/content"
    "github.com/containerd/containerd/images"
    containerdimages "github.com/containerd/containerd/images"
    containerdlabels "github.com/containerd/containerd/labels"
    "github.com/containerd/containerd/platforms"
    "github.com/containerd/containerd/remotes"
    "github.com/containerd/containerd/remotes/docker"
    cerrdefs "github.com/containerd/errdefs"
    "github.com/containerd/log"
    "github.com/distribution/reference"
    "github.com/docker/docker/api/types/auxprogress"
    "github.com/docker/docker/api/types/events"
    "github.com/docker/docker/api/types/registry"
    dimages "github.com/docker/docker/daemon/images"
    "github.com/docker/docker/errdefs"
    "github.com/docker/docker/pkg/progress"
    "github.com/docker/docker/pkg/streamformatter"
    "github.com/opencontainers/go-digest"
    ocispec "github.com/opencontainers/image-spec/specs-go/v1"
    "github.com/pkg/errors"
    "golang.org/x/sync/semaphore"
)

// PushImage initiates a push operation of the image pointed to by sourceRef.
// If reference is untagged, all tags from the reference repository are pushed.
// Image manifest (or index) is pushed as is, which will probably fail if you
// don't have all content referenced by the index.
// Cross-repo mounts will be attempted for non-existing blobs.
//
// It will also add distribution source labels to the pushed content
// pointing to the new target repository. This will allow subsequent pushes
// to perform cross-repo mounts of the shared content when pushing to a different
// repository on the same registry.
func (i *ImageService) PushImage(ctx context.Context, sourceRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) (retErr error) {
    start := time.Now()
    defer func() {
        if retErr == nil {
            dimages.ImageActions.WithValues("push").UpdateSince(start)
        }
    }()
    out := streamformatter.NewJSONProgressOutput(outStream, false)
    progress.Messagef(out, "", "The push refers to repository [%s]", sourceRef.Name())

    if _, tagged := sourceRef.(reference.Tagged); !tagged {
        if _, digested := sourceRef.(reference.Digested); !digested {
            // Image is not tagged nor digested, that means all tags push was requested.

            // Find all images with the same repository.
            imgs, err := i.getAllImagesWithRepository(ctx, sourceRef)
            if err != nil {
                return err
            }

            if len(imgs) == 0 {
                return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(sourceRef))
            }

            for _, img := range imgs {
                named, err := reference.ParseNamed(img.Name)
                if err != nil {
                    // This shouldn't happen, but log a warning just in case.
                    log.G(ctx).WithFields(log.Fields{
                        "image":     img.Name,
                        "sourceRef": sourceRef,
                    }).Warn("refusing to push an invalid tag")
                    continue
                }

                if err := i.pushRef(ctx, named, platform, metaHeaders, authConfig, out); err != nil {
                    return err
                }
            }

            return nil
        }
    }

    return i.pushRef(ctx, sourceRef, platform, metaHeaders, authConfig, out)
}

func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, out progress.Output) (retErr error) {
    leasedCtx, release, err := i.client.WithLease(ctx)
    if err != nil {
        return err
    }
    defer func() {
        if err := release(context.WithoutCancel(leasedCtx)); err != nil {
            log.G(ctx).WithField("image", targetRef).WithError(err).Warn("failed to release lease created for push")
        }
    }()

    img, err := i.images.Get(ctx, targetRef.String())
    if err != nil {
        if cerrdefs.IsNotFound(err) {
            return errdefs.NotFound(fmt.Errorf("tag does not exist: %s", reference.FamiliarString(targetRef)))
        }
        return errdefs.System(err)
    }

    target := img.Target
    if platform != nil {
        target, err = i.getPushDescriptor(ctx, img, platform)
        if err != nil {
            return err
        }
    }

    store := i.content
    resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig, targetRef)
    pp := pushProgress{Tracker: tracker}
    jobsQueue := newJobs()
    finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
        &pp,
        pullProgress{showExists: false, store: store},
    }))
    defer func() {
        finishProgress()
        if retErr == nil {
            if tagged, ok := targetRef.(reference.Tagged); ok {
                progress.Messagef(out, "", "%s: digest: %s size: %d", tagged.Tag(), target.Digest, target.Size)
            }
        }
    }()

    var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads

    mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
    if err != nil {
        return err
    }

    // Create a store which fakes the local existence of possibly mountable blobs.
    // Otherwise they can't be pushed at all.
    realStore := store
    wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
    store = wrapped

    pusher, err := resolver.Pusher(ctx, targetRef.String())
    if err != nil {
        return err
    }

    addLayerJobs := containerdimages.HandlerFunc(
        func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
            switch {
            case containerdimages.IsIndexType(desc.MediaType),
                containerdimages.IsManifestType(desc.MediaType),
                containerdimages.IsConfigType(desc.MediaType):
            default:
                jobsQueue.Add(desc)
            }

            return nil, nil
        },
    )

    handlerWrapper := func(h images.Handler) images.Handler {
        return containerdimages.Handlers(addLayerJobs, h)
    }

    err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
    if err != nil {
        // If push failed because of a missing content, no specific platform was requested
        // and the target is an index, select a platform-specific manifest to push instead.
        if cerrdefs.IsNotFound(err) && containerdimages.IsIndexType(target.MediaType) && platform == nil {
            var newTarget ocispec.Descriptor
            newTarget, err = i.getPushDescriptor(ctx, img, nil)
            if err != nil {
                return err
            }

            // Retry only if the new push candidate is different from the previous one.
            if newTarget.Digest != target.Digest {
                orgTarget := target
                target = newTarget
                pp.TurnNotStartedIntoUnavailable()
                err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)

                if err == nil {
                    progress.Aux(out, auxprogress.ManifestPushedInsteadOfIndex{
                        ManifestPushedInsteadOfIndex: true,
                        OriginalIndex:                orgTarget,
                        SelectedManifest:             newTarget,
                    })
                }
            }
        }

        if err != nil {
            if !cerrdefs.IsNotFound(err) {
                return errdefs.System(err)
            }
            progress.Aux(out, auxprogress.ContentMissing{
                ContentMissing: true,
                Desc:           target,
            })
            return errdefs.NotFound(err)
        }
    }

    appendDistributionSourceLabel(ctx, realStore, targetRef, target)

    i.LogImageEvent(reference.FamiliarString(targetRef), reference.FamiliarName(targetRef), events.ActionPush)

    return nil
}

func (i *ImageService) getPushDescriptor(ctx context.Context, img containerdimages.Image, platform *ocispec.Platform) (ocispec.Descriptor, error) {
    // Allow to override the host platform for testing purposes.
    hostPlatform := i.defaultPlatformOverride
    if hostPlatform == nil {
        hostPlatform = platforms.Default()
    }

    pm := matchAllWithPreference(hostPlatform)
    if platform != nil {
        pm = platforms.OnlyStrict(*platform)
    }

    anyMissing := false

    var bestMatchPlatform ocispec.Platform
    var bestMatch *ImageManifest
    var presentMatchingManifests []*ImageManifest
    err := i.walkReachableImageManifests(ctx, img, func(im *ImageManifest) error {
        available, err := im.CheckContentAvailable(ctx)
        if err != nil {
            return fmt.Errorf("failed to determine availability of image manifest %s: %w", im.Target().Digest, err)
        }

        if !available {
            anyMissing = true
            return nil
        }

        if im.IsAttestation() {
            return nil
        }

        imgPlatform, err := im.ImagePlatform(ctx)
        if err != nil {
            return fmt.Errorf("failed to determine platform of image %s: %w", img.Name, err)
        }

        if !pm.Match(imgPlatform) {
            return nil
        }

        presentMatchingManifests = append(presentMatchingManifests, im)
        if bestMatch == nil || pm.Less(imgPlatform, bestMatchPlatform) {
            bestMatchPlatform = imgPlatform
            bestMatch = im
        }

        return nil
    })
    if err != nil {
        return ocispec.Descriptor{}, err
    }

    switch len(presentMatchingManifests) {
    case 0:
        return ocispec.Descriptor{}, errdefs.NotFound(fmt.Errorf("no suitable image manifest found for platform %s", *platform))
    case 1:
        // Only one manifest is available AND matching the requested platform.

        if platform != nil {
            // Explicit platform was requested
            return presentMatchingManifests[0].Target(), nil
        }

        // No specific platform was requested, but only one manifest is available.
        if anyMissing {
            return presentMatchingManifests[0].Target(), nil
        }

        // Index has only one manifest anyway, select the full index.
        return img.Target, nil
    default:
        if platform == nil {
            if !anyMissing {
                // No specific platform requested, and all manifests are available, select the full index.
                return img.Target, nil
            }

            // No specific platform requested and not all manifests are available.
            // Select the manifest that matches the host platform the best.
            if bestMatch != nil && hostPlatform.Match(bestMatchPlatform) {
                return bestMatch.Target(), nil
            }

            return ocispec.Descriptor{}, errdefs.Conflict(errors.Errorf("multiple matching manifests found but no specific platform requested"))
        }

        return ocispec.Descriptor{}, errdefs.Conflict(errors.Errorf("multiple manifests found for platform %s", *platform))
    }
}

func appendDistributionSourceLabel(ctx context.Context, realStore content.Store, targetRef reference.Named, target ocispec.Descriptor) {
    appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
    if err != nil {
        // This shouldn't happen at this point because the reference would have to be invalid
        // and if it was, then it would error out earlier.
        log.G(ctx).WithError(err).Warn("failed to create an handler that appends distribution source label to pushed content")
        return
    }

    handler := presentChildrenHandler(realStore, appendSource)
    if err := containerdimages.Dispatch(ctx, handler, nil, target); err != nil {
        // Shouldn't happen, but even if it would fail, then make it only a warning
        // because it doesn't affect the pushed data.
        log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
    }
}

// findMissingMountable will walk the target descriptor recursively and return
// missing contents with their distribution source which could potentially
// be cross-repo mounted.
func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
    target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
) (map[digest.Digest]distributionSource, error) {
    mountableBlobs := map[digest.Digest]distributionSource{}
    var mutex sync.Mutex

    sources, err := getDigestSources(ctx, store, target.Digest)
    if err != nil {
        if !errdefs.IsNotFound(err) {
            return nil, err
        }
        log.G(ctx).WithField("target", target).Debug("distribution source label not found")
        return mountableBlobs, nil
    }

    handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
        _, err := store.Info(ctx, desc.Digest)
        if err != nil {
            if !cerrdefs.IsNotFound(err) {
                return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
            }

            for _, source := range sources {
                if canBeMounted(desc.MediaType, targetRef, source) {
                    mutex.Lock()
                    mountableBlobs[desc.Digest] = source
                    mutex.Unlock()
                    queue.Add(desc)
                    break
                }
            }
            return nil, nil
        }

        return containerdimages.Children(ctx, store, desc)
    }

    err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
    if err != nil {
        return nil, err
    }

    return mountableBlobs, nil
}

func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
    info, err := store.Info(ctx, digest)
    if err != nil {
        if cerrdefs.IsNotFound(err) {
            return nil, errdefs.NotFound(err)
        }
        return nil, errdefs.System(err)
    }

    sources := extractDistributionSources(info.Labels)
    if sources == nil {
        return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", containerdlabels.LabelDistributionSource, digest.String()))
    }

    return sources, nil
}

func extractDistributionSources(labels map[string]string) []distributionSource {
    var sources []distributionSource

    // Check if this blob has a distributionSource label
    // if yes, read it as source
    for k, v := range labels {
        if reg := strings.TrimPrefix(k, containerdlabels.LabelDistributionSource); reg != k {
            for _, repo := range strings.Split(v, ",") {
                ref, err := reference.ParseNamed(reg + "/" + repo)
                if err != nil {
                    continue
                }

                sources = append(sources, distributionSource{
                    registryRef: ref,
                })
            }
        }
    }

    return sources
}

type distributionSource struct {
    registryRef reference.Named
}

// ToAnnotation returns key and value
func (source distributionSource) ToAnnotation() (string, string) {
    domain := reference.Domain(source.registryRef)
    v := reference.Path(source.registryRef)
    return containerdlabels.LabelDistributionSource + domain, v
}

func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
    return reference.WithDigest(source.registryRef, dgst)
}

// canBeMounted returns if the content with given media type can be cross-repo
// mounted when pushing it to a remote reference ref.
func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
    if containerdimages.IsManifestType(mediaType) {
        return false
    }
    if containerdimages.IsIndexType(mediaType) {
        return false
    }

    reg := reference.Domain(targetRef)
    // Remove :port suffix from domain
    // containerd distribution source label doesn't store port
    if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
        reg = reg[:portIdx]
    }

    // If the source registry is the same as the one we are pushing to
    // then the cross-repo mount will work.
    return reg == reference.Domain(source.registryRef)
}