dotcloud/docker

View on GitHub
distribution/push.go

Summary

Maintainability
A
1 hr
Test Coverage
package distribution // import "github.com/docker/docker/distribution"

import (
    "bufio"
    "compress/gzip"
    "context"
    "fmt"
    "io"

    "github.com/containerd/log"
    "github.com/distribution/reference"
    "github.com/docker/docker/api/types/events"
    "github.com/docker/docker/pkg/progress"
)

const compressionBufSize = 32768

// Push initiates a push operation on ref. ref is the specific variant of the
// image to push. If no tag is provided, all tags are pushed.
func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error {
    // FIXME: Allow to interrupt current push when new push of same image is done.

    // Resolve the Repository name from fqn to RepositoryInfo
    repoInfo, err := config.RegistryService.ResolveRepository(ref)
    if err != nil {
        return err
    }

    endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
    if err != nil {
        return err
    }

    progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())

    associations := config.ReferenceStore.ReferencesByName(repoInfo.Name)
    if len(associations) == 0 {
        return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
    }

    var (
        lastErr error

        // confirmedTLSRegistries is a map indicating which registries
        // are known to be using TLS. There should never be a plaintext
        // retry for any of these.
        confirmedTLSRegistries = make(map[string]struct{})
    )

    for _, endpoint := range endpoints {
        if endpoint.URL.Scheme != "https" {
            if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
                log.G(ctx).Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
                continue
            }
        }

        log.G(ctx).Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL)

        if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil {
            // Was this push cancelled? If so, don't try to fall
            // back.
            select {
            case <-ctx.Done():
            default:
                if fallbackErr, ok := err.(fallbackError); ok {
                    if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
                        confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
                    }
                    err = fallbackErr.err
                    lastErr = err
                    log.G(ctx).Infof("Attempting next endpoint for push after error: %v", err)
                    continue
                }
            }

            log.G(ctx).Errorf("Not continuing with push after error: %v", err)
            return err
        }

        config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPush)
        return nil
    }

    if lastErr == nil {
        lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
    }
    return lastErr
}

// compress returns an io.ReadCloser which will supply a compressed version of
// the provided Reader. The caller must close the ReadCloser after reading the
// compressed data.
//
// Note that this function returns a reader instead of taking a writer as an
// argument so that it can be used with httpBlobWriter's ReadFrom method.
// Using httpBlobWriter's Write method would send a PATCH request for every
// Write call.
//
// The second return value is a channel that gets closed when the goroutine
// is finished. This allows the caller to make sure the goroutine finishes
// before it releases any resources connected with the reader that was
// passed in.
func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
    compressionDone := make(chan struct{})

    pipeReader, pipeWriter := io.Pipe()
    // Use a bufio.Writer to avoid excessive chunking in HTTP request.
    bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
    compressor := gzip.NewWriter(bufWriter)

    go func() {
        _, err := io.Copy(compressor, in)
        if err == nil {
            err = compressor.Close()
        }
        if err == nil {
            err = bufWriter.Flush()
        }
        if err != nil {
            pipeWriter.CloseWithError(err)
        } else {
            pipeWriter.Close()
        }
        close(compressionDone)
    }()

    return pipeReader, compressionDone
}