docker/docker

View on GitHub
distribution/push_v2.go

Summary

Maintainability
D
3 days
Test Coverage
package distribution // import "github.com/docker/docker/distribution"

import (
    "context"
    "fmt"
    "io"
    "os"
    "runtime"
    "sort"
    "strings"
    "sync"

    "github.com/containerd/log"
    "github.com/distribution/reference"
    "github.com/docker/distribution"
    "github.com/docker/distribution/manifest/schema1"
    "github.com/docker/distribution/manifest/schema2"
    "github.com/docker/distribution/registry/api/errcode"
    "github.com/docker/distribution/registry/client"
    apitypes "github.com/docker/docker/api/types"
    "github.com/docker/docker/distribution/metadata"
    "github.com/docker/docker/distribution/xfer"
    "github.com/docker/docker/layer"
    "github.com/docker/docker/pkg/ioutils"
    "github.com/docker/docker/pkg/progress"
    "github.com/docker/docker/pkg/stringid"
    "github.com/docker/docker/registry"
    "github.com/docker/libtrust"
    "github.com/opencontainers/go-digest"
    "github.com/pkg/errors"
)

const (
    smallLayerMaximumSize  = 100 * (1 << 10) // 100KB
    middleLayerMaximumSize = 10 * (1 << 20)  // 10MB
)

// newPusher creates a new pusher for pushing to a v2 registry.
// The parameters are passed through to the underlying pusher implementation for
// use during the actual push operation.
func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher {
    return &pusher{
        metadataService: metadata.NewV2MetadataService(config.MetadataStore),
        ref:             ref,
        endpoint:        endpoint,
        repoInfo:        repoInfo,
        config:          config,
    }
}

type pusher struct {
    metadataService metadata.V2MetadataService
    ref             reference.Named
    endpoint        registry.APIEndpoint
    repoInfo        *registry.RepositoryInfo
    config          *ImagePushConfig
    repo            distribution.Repository

    // pushState is state built by the Upload functions.
    pushState pushState
}

type pushState struct {
    sync.Mutex
    // remoteLayers is the set of layers known to exist on the remote side.
    // This avoids redundant queries when pushing multiple tags that
    // involve the same layers. It is also used to fill in digest and size
    // information when building the manifest.
    remoteLayers map[layer.DiffID]distribution.Descriptor
    hasAuthInfo  bool
}

// TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
func (p *pusher) push(ctx context.Context) (err error) {
    p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)

    p.repo, err = newRepository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
    p.pushState.hasAuthInfo = p.config.AuthConfig.RegistryToken != "" || (p.config.AuthConfig.Username != "" && p.config.AuthConfig.Password != "")
    if err != nil {
        log.G(ctx).Debugf("Error getting v2 registry: %v", err)
        return err
    }

    if err = p.pushRepository(ctx); err != nil {
        if continueOnError(err, p.endpoint.Mirror) {
            return fallbackError{
                err:         err,
                transportOK: true,
            }
        }
    }
    return err
}

func (p *pusher) pushRepository(ctx context.Context) (err error) {
    if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
        imageID, err := p.config.ReferenceStore.Get(p.ref)
        if err != nil {
            return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
        }

        return p.pushTag(ctx, namedTagged, imageID)
    }

    if !reference.IsNameOnly(p.ref) {
        return errors.New("cannot push a digest reference")
    }

    // Push all tags
    pushed := 0
    for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
        if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
            pushed++
            if err := p.pushTag(ctx, namedTagged, association.ID); err != nil {
                return err
            }
        }
    }

    if pushed == 0 {
        return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name))
    }

    return nil
}

func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
    log.G(ctx).Debugf("Pushing repository: %s", reference.FamiliarString(ref))

    imgConfig, err := p.config.ImageStore.Get(ctx, id)
    if err != nil {
        return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
    }

    rootfs, err := rootFSFromConfig(imgConfig)
    if err != nil {
        return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
    }

    l, err := p.config.LayerStores.Get(rootfs.ChainID())
    if err != nil {
        return fmt.Errorf("failed to get top layer from image: %v", err)
    }
    defer l.Release()

    hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
    if err != nil {
        return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
    }

    var descriptors []xfer.UploadDescriptor

    descriptorTemplate := pushDescriptor{
        metadataService: p.metadataService,
        hmacKey:         hmacKey,
        repoInfo:        p.repoInfo.Name,
        ref:             p.ref,
        endpoint:        p.endpoint,
        repo:            p.repo,
        pushState:       &p.pushState,
    }

    // Loop bounds condition is to avoid pushing the base layer on Windows.
    for range rootfs.DiffIDs {
        descriptor := descriptorTemplate
        descriptor.layer = l
        descriptor.checkedDigests = make(map[digest.Digest]struct{})
        descriptors = append(descriptors, &descriptor)

        l = l.Parent()
    }

    if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
        return err
    }

    // Try schema2 first
    builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
    manifest, err := manifestFromBuilder(ctx, builder, descriptors)
    if err != nil {
        return err
    }

    manSvc, err := p.repo.Manifests(ctx)
    if err != nil {
        return err
    }

    putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
    if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
        if runtime.GOOS == "windows" {
            log.G(ctx).Warnf("failed to upload schema2 manifest: %v", err)
            return err
        }

        // This is a temporary environment variables used in CI to allow pushing
        // manifest v2 schema 1 images to test-registries used for testing *pulling*
        // these images.
        if os.Getenv("DOCKER_ALLOW_SCHEMA1_PUSH_DONOTUSE") == "" {
            if err.Error() == "tag invalid" {
                msg := "[DEPRECATED] support for pushing manifest v2 schema1 images has been removed. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/"
                log.G(ctx).WithError(err).Error(msg)
                return errors.Wrap(err, msg)
            }
            return err
        }

        log.G(ctx).Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)

        // Note: this fallback is deprecated, see log messages below
        manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag())
        if err != nil {
            return err
        }
        pk, err := libtrust.GenerateECP256PrivateKey()
        if err != nil {
            return errors.Wrap(err, "unexpected error generating private key")
        }
        builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), pk, manifestRef, imgConfig)
        manifest, err = manifestFromBuilder(ctx, builder, descriptors)
        if err != nil {
            return err
        }

        if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
            return err
        }

        // schema2 failed but schema1 succeeded
        msg := fmt.Sprintf("[DEPRECATION NOTICE] support for pushing manifest v2 schema1 images will be removed in an upcoming release. Please contact admins of the %s registry NOW to avoid future disruption. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", reference.Domain(ref))
        log.G(ctx).Warn(msg)
        progress.Message(p.config.ProgressOutput, "", msg)
    }

    var canonicalManifest []byte

    switch v := manifest.(type) {
    case *schema1.SignedManifest:
        canonicalManifest = v.Canonical
    case *schema2.DeserializedManifest:
        _, canonicalManifest, err = v.Payload()
        if err != nil {
            return err
        }
    }

    manifestDigest := digest.FromBytes(canonicalManifest)
    progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))

    if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
        return err
    }

    // Signal digest to the trust client so it can sign the
    // push, if appropriate.
    progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)})

    return nil
}

func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
    // descriptors is in reverse order; iterate backwards to get references
    // appended in the right order.
    for i := len(descriptors) - 1; i >= 0; i-- {
        if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil {
            return nil, err
        }
    }

    return builder.Build(ctx)
}

type pushDescriptor struct {
    layer            PushLayer
    metadataService  metadata.V2MetadataService
    hmacKey          []byte
    repoInfo         reference.Named
    ref              reference.Named
    endpoint         registry.APIEndpoint
    repo             distribution.Repository
    pushState        *pushState
    remoteDescriptor distribution.Descriptor
    // a set of digests whose presence has been checked in a target repository
    checkedDigests map[digest.Digest]struct{}
}

func (pd *pushDescriptor) Key() string {
    return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
}

func (pd *pushDescriptor) ID() string {
    return stringid.TruncateID(pd.layer.DiffID().String())
}

func (pd *pushDescriptor) DiffID() layer.DiffID {
    return pd.layer.DiffID()
}

func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
    // Skip foreign layers unless this registry allows nondistributable artifacts.
    if !pd.endpoint.AllowNondistributableArtifacts {
        if fs, ok := pd.layer.(distribution.Describable); ok {
            if d := fs.Descriptor(); len(d.URLs) > 0 {
                progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
                return d, nil
            }
        }
    }

    diffID := pd.DiffID()

    pd.pushState.Lock()
    if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
        // it is already known that the push is not needed and
        // therefore doing a stat is unnecessary
        pd.pushState.Unlock()
        progress.Update(progressOutput, pd.ID(), "Layer already exists")
        return descriptor, nil
    }
    pd.pushState.Unlock()

    maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)

    // Do we have any metadata associated with this layer's DiffID?
    metaData, err := pd.metadataService.GetMetadata(diffID)
    if err == nil {
        // check for blob existence in the target repository
        descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData)
        if exists || err != nil {
            return descriptor, err
        }
    }

    // if digest was empty or not saved, or if blob does not exist on the remote repository,
    // then push the blob.
    bs := pd.repo.Blobs(ctx)

    var layerUpload distribution.BlobWriter

    // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
    candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData)
    isUnauthorizedError := false
    for _, mc := range candidates {
        mountCandidate := mc
        log.G(ctx).Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
        createOpts := []distribution.BlobCreateOption{}

        if len(mountCandidate.SourceRepository) > 0 {
            namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
            if err != nil {
                log.G(ctx).WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef))
                _ = pd.metadataService.Remove(mountCandidate)
                continue
            }

            // Candidates are always under same domain, create remote reference
            // with only path to set mount from with
            remoteRef, err := reference.WithName(reference.Path(namedRef))
            if err != nil {
                log.G(ctx).WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef))
                continue
            }

            canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
            if err != nil {
                log.G(ctx).WithError(err).Error("failed to make canonical reference")
                continue
            }

            createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
        }

        // send the layer
        lu, err := bs.Create(ctx, createOpts...)
        switch err := err.(type) {
        case nil:
            // noop
        case distribution.ErrBlobMounted:
            progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())

            err.Descriptor.MediaType = schema2.MediaTypeLayer

            pd.pushState.Lock()
            pd.pushState.remoteLayers[diffID] = err.Descriptor
            pd.pushState.Unlock()

            // Cache mapping from this layer's DiffID to the blobsum
            if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
                Digest:           err.Descriptor.Digest,
                SourceRepository: pd.repoInfo.Name(),
            }); err != nil {
                return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
            }
            return err.Descriptor, nil
        case errcode.Errors:
            for _, e := range err {
                switch e := e.(type) {
                case errcode.Error:
                    if e.Code == errcode.ErrorCodeUnauthorized {
                        // when unauthorized error that indicate user don't has right to push layer to register
                        log.G(ctx).Debugln("failed to push layer to registry because unauthorized error")
                        isUnauthorizedError = true
                    }
                default:
                }
            }
        default:
            log.G(ctx).Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
        }

        // when error is unauthorizedError and user don't hasAuthInfo that's the case user don't has right to push layer to register
        // and he hasn't login either, in this case candidate cache should be removed
        if len(mountCandidate.SourceRepository) > 0 &&
            !(isUnauthorizedError && !pd.pushState.hasAuthInfo) &&
            (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
                len(mountCandidate.HMAC) == 0) {
            cause := "blob mount failure"
            if err != nil {
                cause = fmt.Sprintf("an error: %v", err.Error())
            }
            log.G(ctx).Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
            _ = pd.metadataService.Remove(mountCandidate)
        }

        if lu != nil {
            // cancel previous upload
            cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
            layerUpload = lu
        }
    }

    if maxExistenceChecks-len(pd.checkedDigests) > 0 {
        // do additional layer existence checks with other known digests if any
        descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData)
        if exists || err != nil {
            return descriptor, err
        }
    }

    log.G(ctx).Debugf("Pushing layer: %s", diffID)
    if layerUpload == nil {
        layerUpload, err = bs.Create(ctx)
        if err != nil {
            return distribution.Descriptor{}, retryOnError(err)
        }
    }
    defer layerUpload.Close()
    // upload the blob
    return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
}

func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
    pd.remoteDescriptor = descriptor
}

func (pd *pushDescriptor) Descriptor() distribution.Descriptor {
    return pd.remoteDescriptor
}

func (pd *pushDescriptor) uploadUsingSession(
    ctx context.Context,
    progressOutput progress.Output,
    diffID layer.DiffID,
    layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
    var reader io.ReadCloser

    contentReader, err := pd.layer.Open()
    if err != nil {
        return distribution.Descriptor{}, retryOnError(err)
    }

    reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, pd.layer.Size(), pd.ID(), "Pushing")

    switch m := pd.layer.MediaType(); m {
    case schema2.MediaTypeUncompressedLayer:
        compressedReader, compressionDone := compress(reader)
        defer func(closer io.Closer) {
            closer.Close()
            <-compressionDone
        }(reader)
        reader = compressedReader
    case schema2.MediaTypeLayer:
    default:
        reader.Close()
        return distribution.Descriptor{}, xfer.DoNotRetry{Err: fmt.Errorf("unsupported layer media type %s", m)}
    }

    digester := digest.Canonical.Digester()
    tee := io.TeeReader(reader, digester.Hash())

    nn, err := layerUpload.ReadFrom(tee)
    reader.Close()
    if err != nil {
        return distribution.Descriptor{}, retryOnError(err)
    }

    pushDigest := digester.Digest()
    if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
        return distribution.Descriptor{}, retryOnError(err)
    }

    log.G(ctx).Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
    progress.Update(progressOutput, pd.ID(), "Pushed")

    // Cache mapping from this layer's DiffID to the blobsum
    if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
        Digest:           pushDigest,
        SourceRepository: pd.repoInfo.Name(),
    }); err != nil {
        return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
    }

    desc := distribution.Descriptor{
        Digest:    pushDigest,
        MediaType: schema2.MediaTypeLayer,
        Size:      nn,
    }

    pd.pushState.Lock()
    pd.pushState.remoteLayers[diffID] = desc
    pd.pushState.Unlock()

    return desc, nil
}

// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func (pd *pushDescriptor) layerAlreadyExists(
    ctx context.Context,
    progressOutput progress.Output,
    diffID layer.DiffID,
    checkOtherRepositories bool,
    maxExistenceCheckAttempts int,
    v2Metadata []metadata.V2Metadata,
) (desc distribution.Descriptor, exists bool, err error) {
    // filter the metadata
    candidates := []metadata.V2Metadata{}
    for _, meta := range v2Metadata {
        if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() {
            continue
        }
        candidates = append(candidates, meta)
    }
    // sort the candidates by similarity
    sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)

    digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
    // an array of unique blob digests ordered from the best mount candidates to worst
    layerDigests := []digest.Digest{}
    for i := 0; i < len(candidates); i++ {
        if len(layerDigests) >= maxExistenceCheckAttempts {
            break
        }
        meta := &candidates[i]
        if _, exists := digestToMetadata[meta.Digest]; exists {
            // keep reference just to the first mapping (the best mount candidate)
            continue
        }
        if _, exists := pd.checkedDigests[meta.Digest]; exists {
            // existence of this digest has already been tested
            continue
        }
        digestToMetadata[meta.Digest] = meta
        layerDigests = append(layerDigests, meta.Digest)
    }

attempts:
    for _, dgst := range layerDigests {
        meta := digestToMetadata[dgst]
        log.G(ctx).Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
        desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
        pd.checkedDigests[meta.Digest] = struct{}{}
        switch err {
        case nil:
            if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
                // cache mapping from this layer's DiffID to the blobsum
                if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
                    Digest:           desc.Digest,
                    SourceRepository: pd.repoInfo.Name(),
                }); err != nil {
                    return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
                }
            }
            desc.MediaType = schema2.MediaTypeLayer
            exists = true
            break attempts
        case distribution.ErrBlobUnknown:
            if meta.SourceRepository == pd.repoInfo.Name() {
                // remove the mapping to the target repository
                pd.metadataService.Remove(*meta)
            }
        default:
            log.G(ctx).WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
        }
    }

    if exists {
        progress.Update(progressOutput, pd.ID(), "Layer already exists")
        pd.pushState.Lock()
        pd.pushState.remoteLayers[diffID] = desc
        pd.pushState.Unlock()
    }

    return desc, exists, nil
}

// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
// source repositories of target registry, maximum number of layer existence checks performed on the target
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency.
func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
    size := layer.Size()
    switch {
    // big blob
    case size > middleLayerMaximumSize:
        // 1st attempt to mount the blob few times
        // 2nd few existence checks with digests associated to any repository
        // then fallback to upload
        return 4, 3, true

    // middle sized blobs; if we could not get the size, assume we deal with middle sized blob
    case size > smallLayerMaximumSize:
        // 1st attempt to mount blobs of average size few times
        // 2nd try at most 1 existence check if there's an existing mapping to the target repository
        // then fallback to upload
        return 3, 1, false

    // small blobs, do a minimum number of checks
    default:
        return 1, 1, false
    }
}

// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
// array is sorted from youngest to oldest. The resulting array will contain only metadata entries having
// registry part of SourceRepository matching the part of repoInfo.
func getRepositoryMountCandidates(
    repoInfo reference.Named,
    hmacKey []byte,
    max int,
    v2Metadata []metadata.V2Metadata,
) []metadata.V2Metadata {
    candidates := []metadata.V2Metadata{}
    for _, meta := range v2Metadata {
        sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
        if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) {
            continue
        }
        // target repository is not a viable candidate
        if meta.SourceRepository == repoInfo.Name() {
            continue
        }
        candidates = append(candidates, meta)
    }

    sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
    if max >= 0 && len(candidates) > max {
        // select the youngest metadata
        candidates = candidates[:max]
    }

    return candidates
}

// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
// candidate "a" is preferred over "b":
//
//  1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
//     "b" was not
//  2. if a number of its repository path components exactly matching path components of target repository is higher
type byLikeness struct {
    arr            []metadata.V2Metadata
    hmacKey        []byte
    pathComponents []string
}

func (bla byLikeness) Less(i, j int) bool {
    aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
    bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
    if aMacMatch != bMacMatch {
        return aMacMatch
    }
    aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
    bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
    return aMatch > bMatch
}

func (bla byLikeness) Swap(i, j int) {
    bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
}
func (bla byLikeness) Len() int { return len(bla.arr) }

func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
    // reverse the metadata array to shift the newest entries to the beginning
    for i := 0; i < len(marr)/2; i++ {
        marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
    }
    // keep equal entries ordered from the youngest to the oldest
    sort.Stable(byLikeness{
        arr:            marr,
        hmacKey:        hmacKey,
        pathComponents: getPathComponents(repoInfo.Name()),
    })
}

// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
func numOfMatchingPathComponents(pth string, matchComponents []string) int {
    pthComponents := getPathComponents(pth)
    i := 0
    for ; i < len(pthComponents) && i < len(matchComponents); i++ {
        if matchComponents[i] != pthComponents[i] {
            return i
        }
    }
    return i
}

func getPathComponents(path string) []string {
    return strings.Split(path, "/")
}

func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
    if layerUpload != nil {
        log.G(ctx).Debugf("cancelling upload of blob %s", dgst)
        err := layerUpload.Cancel(ctx)
        if err != nil {
            log.G(ctx).Warnf("failed to cancel upload: %v", err)
        }
    }
}