docker/docker

View on GitHub
distribution/xfer/download.go

Summary

Maintainability
F
3 days
Test Coverage
package xfer // import "github.com/docker/docker/distribution/xfer"

import (
    "context"
    "errors"
    "fmt"
    "io"
    "time"

    "github.com/containerd/log"
    "github.com/docker/distribution"
    "github.com/docker/docker/image"
    "github.com/docker/docker/layer"
    "github.com/docker/docker/pkg/archive"
    "github.com/docker/docker/pkg/ioutils"
    "github.com/docker/docker/pkg/progress"
)

// maxDownloadAttempts is the default limit on download attempts for a layer.
// NewLayerDownloadManager installs it unless an option such as
// WithMaxDownloadAttempts overrides it.
const maxDownloadAttempts = 5

// LayerDownloadManager figures out which layers need to be downloaded, then
// registers and downloads those, taking into account dependencies between
// layers.
type LayerDownloadManager struct {
    // layerStore is queried for layers that already exist and is where
    // downloaded layers get registered.
    layerStore          layer.Store
    // tm starts the transfers; SetConcurrency forwards its limit here.
    tm                  *transferManager
    // waitDuration is the tick interval used while counting down the delay
    // before a retry. NewLayerDownloadManager sets it to time.Second.
    waitDuration        time.Duration
    // maxDownloadAttempts is the number of attempts after which a failing
    // download is given up.
    maxDownloadAttempts int
}

// SetConcurrency sets the max concurrent downloads for each pull. The value
// is forwarded unchanged to the manager's transfer manager.
func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
    ldm.tm.setConcurrency(concurrency)
}

// NewLayerDownloadManager builds a LayerDownloadManager that works against
// layerStore and uses a transfer manager created with concurrencyLimit.
//
// The defaults (a one-second wait duration and maxDownloadAttempts attempts)
// are installed first; the given options are then applied in order, so they
// may override those defaults.
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...DownloadOption) *LayerDownloadManager {
    ldm := new(LayerDownloadManager)
    ldm.layerStore = layerStore
    ldm.tm = newTransferManager(concurrencyLimit)
    ldm.waitDuration = time.Second
    ldm.maxDownloadAttempts = maxDownloadAttempts

    for i := range options {
        options[i](ldm)
    }
    return ldm
}

// DownloadOption sets an option on a LayerDownloadManager. Options are passed
// to NewLayerDownloadManager, which calls each one with the manager it is
// constructing.
type DownloadOption func(*LayerDownloadManager)

// WithMaxDownloadAttempts returns a DownloadOption that replaces the
// download manager's limit on download attempts with max.
func WithMaxDownloadAttempts(max int) DownloadOption {
    setAttempts := func(ldm *LayerDownloadManager) {
        ldm.maxDownloadAttempts = max
    }
    return setAttempts
}

// downloadTransfer is a transfer that additionally records the outcome of a
// layer download and registration.
type downloadTransfer struct {
    transfer

    // layerStore is the store the layer is registered in and later released
    // from.
    layerStore layer.Store
    // layer is the value returned by the registration step.
    layer      layer.Layer
    // err records why the download or registration failed, if it did.
    err        error
}

// result returns the layer resulting from the download, if the download
// and registration were successful. It simply reads the layer and err
// fields; the callers in this file only call it after the transfer's done()
// channel has been closed.
func (d *downloadTransfer) result() (layer.Layer, error) {
    return d.layer, d.err
}

// A DownloadDescriptor references a layer that may need to be downloaded.
type DownloadDescriptor interface {
    // Key returns the key used to deduplicate downloads.
    Key() string
    // ID returns the ID for display purposes.
    ID() string
    // DiffID should return the DiffID for this layer, or an error
    // if it is unknown (for example, if it has not been downloaded
    // before).
    DiffID() (layer.DiffID, error)
    // Download is called to perform the download. It returns a reader for
    // the layer data and a size that the download manager passes on to
    // progress reporting. The download manager may call it again after a
    // failed attempt.
    Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
    // Close is called when the download manager is finished with this
    // descriptor and will not call Download again or read from the reader
    // that Download returned.
    Close()
}

// DigestRegisterer can be implemented by a DownloadDescriptor, and provides a
// Registered method which gets called after a downloaded layer is registered.
// This allows the user of the download manager to know the DiffID of each
// registered layer. This method is called if a type assertion to
// DigestRegisterer is successful. The download manager also calls it for a
// layer that it finds already present in the layer store.
type DigestRegisterer interface {
    // TODO existing implementations in distribution and builder-next swallow errors
    // when registering the diffID. Consider changing the Registered signature
    // to return the error.

    Registered(diffID layer.DiffID)
}

// Download is a blocking function which ensures the requested layers are
// present in the layer store. It uses the string returned by the Key method to
// deduplicate downloads. If a given layer is not already known to be present
// in the layer store, and the key is not used by an in-progress download, the
// Download method is called to get the layer tar data. Layers are then
// registered in the appropriate order.
//
// If every layer is already in the store, the returned RootFS is
// initialRootFS with each layer's DiffID appended. Otherwise the DiffID list
// is rebuilt by walking len(layers) layers down from the top downloaded
// layer. The caller must call the returned release function once it is done
// with the returned RootFS object.
//
// If ctx is done before the top-most download finishes, ctx.Err() is
// returned; if that download failed, its error is returned. In both cases the
// returned release function does nothing.
func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
    var (
        // topLayer is the most recent layer found in the store.
        topLayer       layer.Layer
        // topDownload and watcher belong to the most recently started
        // transfer.
        topDownload    *downloadTransfer
        watcher        *watcher
        // missingLayer becomes true at the first layer that is not found in
        // the store; from then on the store is no longer consulted.
        missingLayer   bool
        // transferKey is the concatenation of the keys of all layers seen
        // so far, so it identifies a layer together with the layers below it.
        transferKey    = ""
        // downloadsByKey maps a descriptor key to the transfer downloading it.
        downloadsByKey = make(map[string]*downloadTransfer)
    )

    rootFS := initialRootFS
    for _, descriptor := range layers {
        key := descriptor.Key()
        transferKey += key

        if !missingLayer {
            // Assume the layer is missing until the store lookup succeeds.
            missingLayer = true
            diffID, err := descriptor.DiffID()
            if err == nil {
                getRootFS := rootFS
                getRootFS.Append(diffID)
                l, err := ldm.layerStore.Get(getRootFS.ChainID())
                if err == nil {
                    // Layer already exists.
                    log.G(ctx).Debugf("Layer already exists: %s", descriptor.ID())
                    progress.Update(progressOutput, descriptor.ID(), "Already exists")
                    // Only the newest found layer is kept; drop the
                    // reference to the one below it.
                    if topLayer != nil {
                        layer.ReleaseAndLog(ldm.layerStore, topLayer)
                    }
                    topLayer = l
                    missingLayer = false
                    rootFS.Append(diffID)
                    // Register this repository as a source of this layer.
                    if withRegistered, ok := descriptor.(DigestRegisterer); ok { // As layerstore may set the driver
                        withRegistered.Registered(diffID)
                    }
                    continue
                }
            }
        }

        // Does this layer have the same data as a previous layer in
        // the stack? If so, avoid downloading it more than once.
        var topDownloadUncasted transfer
        if existingDownload, ok := downloadsByKey[key]; ok {
            xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
            // The arguments of a deferred call are evaluated now, so this
            // releases the current (parent) transfer with its own watcher
            // when Download returns.
            defer topDownload.transfer.release(watcher)
            topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
            topDownload = topDownloadUncasted.(*downloadTransfer)
            continue
        }

        // Layer is not known to exist - download and register it.
        progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")

        var xferFunc doFunc
        if topDownload != nil {
            // Register on top of the previous download's resulting layer.
            xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
            // As above: releases the parent transfer when Download returns.
            defer topDownload.transfer.release(watcher)
        } else {
            // First download in the stack: register on top of the layers
            // found in the store so far.
            xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
        }
        topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
        topDownload = topDownloadUncasted.(*downloadTransfer)
        downloadsByKey[key] = topDownload
    }

    // Nothing had to be downloaded: the caller takes over the reference to
    // topLayer and drops it through the returned release function.
    if topDownload == nil {
        return rootFS, func() {
            if topLayer != nil {
                layer.ReleaseAndLog(ldm.layerStore, topLayer)
            }
        }, nil
    }

    // Won't be using the list built up so far - will generate it
    // from downloaded layers instead.
    rootFS.DiffIDs = []layer.DiffID{}

    // From here on topLayer is released when Download returns, whether it
    // succeeds or fails.
    defer func() {
        if topLayer != nil {
            layer.ReleaseAndLog(ldm.layerStore, topLayer)
        }
    }()

    // Block until the top-most transfer finishes or the caller gives up.
    select {
    case <-ctx.Done():
        topDownload.transfer.release(watcher)
        return rootFS, func() {}, ctx.Err()
    case <-topDownload.done():
        break
    }

    l, err := topDownload.result()
    if err != nil {
        topDownload.transfer.release(watcher)
        return rootFS, func() {}, err
    }

    // Must do this exactly len(layers) times, so we don't include the
    // base layer on Windows.
    for range layers {
        if l == nil {
            topDownload.transfer.release(watcher)
            return rootFS, func() {}, errors.New("internal error: too few parent layers")
        }
        // Walking from the top layer down, so each DiffID is prepended.
        rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
        l = l.Parent()
    }
    // err is nil here; the top transfer stays held until the caller invokes
    // the returned release function.
    return rootFS, func() { topDownload.transfer.release(watcher) }, err
}

// makeDownloadFunc returns a function that performs the layer download and
// registration. If parentDownload is non-nil, it waits for that download to
// complete before the registration step, and registers the downloaded data
// on top of parentDownload's resulting layer. Otherwise, it registers the
// layer on top of the ChainID given by parentLayer.
//
// A failed download is retried until ldm.maxDownloadAttempts attempts have
// been made, counting down attempt*5 ticks of ldm.waitDuration before each
// retry. It is not retried when the transfer's context is done, or when the
// error is (or wraps) a DoNotRetry. Any failure is stored in the err field of
// the returned transfer; on success its layer field holds the registered
// layer.
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) doFunc {
    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
        d := &downloadTransfer{
            transfer:   newTransfer(),
            layerStore: ldm.layerStore,
        }

        go func() {
            // Closing progressChan is the last thing this goroutine does on
            // every exit path.
            defer close(progressChan)

            progressOutput := progress.ChanOutput(progressChan)

            // Only report "Waiting" if the transfer cannot start right away.
            select {
            case <-start:
            default:
                progress.Update(progressOutput, descriptor.ID(), "Waiting")
                <-start
            }

            if parentDownload != nil {
                // Did the parent download already fail or get
                // cancelled?
                select {
                case <-parentDownload.done():
                    _, err := parentDownload.result()
                    if err != nil {
                        d.err = err
                        return
                    }
                default:
                }
            }

            var (
                downloadReader io.ReadCloser
                size           int64
                err            error
                attempt        = 1
            )

            defer descriptor.Close()

            for {
                downloadReader, size, err = descriptor.Download(d.transfer.context(), progressOutput)
                if err == nil {
                    break
                }

                // If an error was returned because the context
                // was cancelled, we shouldn't retry.
                select {
                case <-d.transfer.context().Done():
                    d.err = err
                    return
                default:
                }

                // errors.As also matches a DoNotRetry that another error
                // wraps, which a plain type assertion would miss and
                // therefore retry.
                var doNotRetry DoNotRetry
                if errors.As(err, &doNotRetry) || attempt >= ldm.maxDownloadAttempts {
                    log.G(context.TODO()).Errorf("Download failed after %d attempts: %v", attempt, err)
                    d.err = err
                    return
                }

                log.G(context.TODO()).Infof("Download failed, retrying (%d/%d): %v", attempt, ldm.maxDownloadAttempts, err)
                // The countdown grows with every failed attempt.
                delay := attempt * 5
                ticker := time.NewTicker(ldm.waitDuration)
                attempt++

            selectLoop:
                for {
                    suffix := "s"
                    if delay == 1 {
                        suffix = ""
                    }
                    progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, suffix)
                    select {
                    case <-ticker.C:
                        delay--
                        if delay == 0 {
                            ticker.Stop()
                            break selectLoop
                        }
                    case <-d.transfer.context().Done():
                        ticker.Stop()
                        d.err = errors.New("download cancelled during retry delay")
                        return
                    }
                }
            }

            // The download phase is over; what follows is registration.
            close(inactive)

            if parentDownload != nil {
                select {
                case <-d.transfer.context().Done():
                    d.err = errors.New("layer registration cancelled")
                    downloadReader.Close()
                    return
                case <-parentDownload.done():
                }

                l, err := parentDownload.result()
                if err != nil {
                    d.err = err
                    downloadReader.Close()
                    return
                }
                parentLayer = l.ChainID()
            }

            // From here on the reader is closed through this wrapper.
            reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
            defer reader.Close()

            inflatedLayerData, err := archive.DecompressStream(reader)
            if err != nil {
                d.err = fmt.Errorf("could not get decompression stream: %v", err)
                return
            }
            defer inflatedLayerData.Close()

            var src distribution.Descriptor
            if fs, ok := descriptor.(distribution.Describable); ok {
                src = fs.Descriptor()
            }
            if ds, ok := d.layerStore.(layer.DescribableStore); ok {
                d.layer, err = ds.RegisterWithDescriptor(inflatedLayerData, parentLayer, src)
            } else {
                d.layer, err = d.layerStore.Register(inflatedLayerData, parentLayer)
            }
            if err != nil {
                // Report a cancellation as such rather than as a
                // registration failure.
                select {
                case <-d.transfer.context().Done():
                    d.err = errors.New("layer registration cancelled")
                default:
                    d.err = fmt.Errorf("failed to register layer: %v", err)
                }
                return
            }

            progress.Update(progressOutput, descriptor.ID(), "Pull complete")

            if withRegistered, ok := descriptor.(DigestRegisterer); ok {
                withRegistered.Registered(d.layer.DiffID())
            }

            // Release the registered layer once the transfer itself has
            // been released. The wait runs in its own goroutine so that
            // this one can return, and run its deferred calls, right away.
            go func() {
                <-d.transfer.released()
                if d.layer != nil {
                    layer.ReleaseAndLog(d.layerStore, d.layer)
                }
            }()
        }()

        return d
    }
}

// makeDownloadFuncFromDownload returns a function that registers a layer
// whose data is taken from an existing download rather than fetched again.
// It waits for parentDownload and then sourceDownload to complete, and
// re-registers the tar stream of sourceDownload's layer on top of
// parentDownload's layer. No progress output is written, because it would
// interfere with the progress reporting for sourceDownload, which has the
// same Key.
//
// Any failure, including cancellation of the transfer's context while
// waiting, is stored in the err field of the returned transfer.
func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) doFunc {
    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
        dl := &downloadTransfer{
            transfer:   newTransfer(),
            layerStore: ldm.layerStore,
        }

        go func() {
            defer close(progressChan)

            // Nothing is downloaded here, so the transfer is marked
            // inactive as soon as it has been started.
            <-start
            close(inactive)

            // Wait for the parent layer, unless cancelled first.
            select {
            case <-dl.transfer.context().Done():
                dl.err = errors.New("layer registration cancelled")
                return
            case <-parentDownload.done():
            }

            parent, err := parentDownload.result()
            if err != nil {
                dl.err = err
                return
            }
            parentChainID := parent.ChainID()

            // sourceDownload is expected to be finished once
            // parentDownload is, but wait for it explicitly anyway.
            select {
            case <-dl.transfer.context().Done():
                dl.err = errors.New("layer registration cancelled")
                return
            case <-sourceDownload.done():
            }

            source, err := sourceDownload.result()
            if err != nil {
                dl.err = err
                return
            }

            tarStream, err := source.TarStream()
            if err != nil {
                dl.err = err
                return
            }
            defer tarStream.Close()

            // Carry over the source layer's descriptor when it has one.
            var srcDescriptor distribution.Descriptor
            if describable, ok := source.(distribution.Describable); ok {
                srcDescriptor = describable.Descriptor()
            }

            switch store := dl.layerStore.(type) {
            case layer.DescribableStore:
                dl.layer, err = store.RegisterWithDescriptor(tarStream, parentChainID, srcDescriptor)
            default:
                dl.layer, err = store.Register(tarStream, parentChainID)
            }
            if err != nil {
                dl.err = fmt.Errorf("failed to register layer: %v", err)
                return
            }

            if registerer, ok := descriptor.(DigestRegisterer); ok {
                registerer.Registered(dl.layer.DiffID())
            }

            // Release the registered layer once the transfer itself has
            // been released; the wait runs in its own goroutine so this one
            // can return immediately.
            go func() {
                <-dl.transfer.released()
                if dl.layer == nil {
                    return
                }
                layer.ReleaseAndLog(dl.layerStore, dl.layer)
            }()
        }()

        return dl
    }
}