distribution/xfer/transfer.go

package xfer // import "github.com/docker/docker/distribution/xfer"

import (
    "context"
    "runtime"
    "sync"

    "github.com/docker/docker/pkg/progress"
    "github.com/pkg/errors"
)

// DoNotRetry is an error wrapper indicating that the error cannot be resolved
// with a retry.
type DoNotRetry struct {
    Err error
}

// Error returns the stringified representation of the encapsulated error.
func (e DoNotRetry) Error() string {
    return e.Err.Error()
}

// IsDoNotRetryError returns true if the error is caused by a DoNotRetry error,
// meaning the transfer should not be retried.
func IsDoNotRetryError(err error) bool {
    var dnr DoNotRetry
    return errors.As(err, &dnr)
}
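
// exampleRetryLoop is a hypothetical sketch, not part of the upstream file,
// showing how IsDoNotRetryError is meant to be consumed: a retry loop keeps
// retrying transient failures but stops as soon as the error it sees wraps
// DoNotRetry. The attempt parameter and the bare loop are illustrative only.
func exampleRetryLoop(attempt func() error) error {
    for {
        err := attempt()
        if err == nil {
            return nil
        }
        if IsDoNotRetryError(err) {
            // Permanent failure; give up immediately.
            return err
        }
        // Transient failure; try again.
    }
}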

// watcher is returned by watch and can be passed to release to stop watching.
type watcher struct {
    // signalChan is used to signal to the watcher goroutine that
    // new progress information is available, or that the transfer
    // has finished.
    signalChan chan struct{}
    // releaseChan signals to the watcher goroutine that the watcher
    // should be detached.
    releaseChan chan struct{}
    // running remains open as long as the watcher is watching the
    // transfer. It gets closed if the transfer finishes or the
    // watcher is detached.
    running chan struct{}
}

// transfer represents an in-progress transfer.
type transfer interface {
    watch(progressOutput progress.Output) *watcher
    release(*watcher)
    context() context.Context
    close()
    done() <-chan struct{}
    released() <-chan struct{}
    broadcast(mainProgressChan <-chan progress.Progress)
}

type xfer struct {
    mu sync.Mutex

    ctx    context.Context
    cancel context.CancelFunc

    // watchers keeps track of the goroutines monitoring progress output,
    // indexed by the channels that release them.
    watchers map[chan struct{}]*watcher

    // lastProgress is the most recently received progress event.
    lastProgress progress.Progress
    // hasLastProgress is true when lastProgress has been set.
    hasLastProgress bool

    // running remains open as long as the transfer is in progress.
    running chan struct{}
    // releasedChan stays open until all watchers release the transfer and
    // the transfer is no longer tracked by the transferManager.
    releasedChan chan struct{}

    // broadcastDone is true if the main progress channel has closed.
    broadcastDone bool
    // closed is true if close has been called.
    closed bool
    // broadcastSyncChan allows watchers to "ping" the broadcasting
    // goroutine and wait for it to deplete its input channel. This ensures
    // a detaching watcher won't miss an event that was sent before it
    // started detaching.
    broadcastSyncChan chan struct{}
}

// newTransfer creates a new transfer.
func newTransfer() transfer {
    t := &xfer{
        watchers:          make(map[chan struct{}]*watcher),
        running:           make(chan struct{}),
        releasedChan:      make(chan struct{}),
        broadcastSyncChan: make(chan struct{}),
    }

    // This uses context.Background instead of a caller-supplied context
    // so that a transfer won't be cancelled automatically if the client
    // which requested it is ^C'd (there could be other viewers).
    t.ctx, t.cancel = context.WithCancel(context.Background())

    return t
}

// broadcast copies progress output from mainProgressChan to all watchers.
func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) {
    for {
        var (
            p  progress.Progress
            ok bool
        )
        select {
        case p, ok = <-mainProgressChan:
        default:
            // We've depleted the channel, so now we can handle
            // reads on broadcastSyncChan to let detaching watchers
            // know we're caught up.
            select {
            case <-t.broadcastSyncChan:
                continue
            case p, ok = <-mainProgressChan:
            }
        }

        t.mu.Lock()
        if ok {
            t.lastProgress = p
            t.hasLastProgress = true
            for _, w := range t.watchers {
                select {
                case w.signalChan <- struct{}{}:
                default:
                }
            }
        } else {
            t.broadcastDone = true
        }
        t.mu.Unlock()
        if !ok {
            close(t.running)
            return
        }
    }
}

// watch adds a watcher to the transfer. The supplied progress.Output receives
// progress updates until the transfer finishes or the watcher is released.
func (t *xfer) watch(progressOutput progress.Output) *watcher {
    t.mu.Lock()
    defer t.mu.Unlock()

    w := &watcher{
        releaseChan: make(chan struct{}),
        signalChan:  make(chan struct{}),
        running:     make(chan struct{}),
    }

    t.watchers[w.releaseChan] = w

    if t.broadcastDone {
        close(w.running)
        return w
    }

    go func() {
        defer func() {
            close(w.running)
        }()
        var (
            done           bool
            lastWritten    progress.Progress
            hasLastWritten bool
        )
        for {
            t.mu.Lock()
            hasLastProgress := t.hasLastProgress
            lastProgress := t.lastProgress
            t.mu.Unlock()

            // Make sure we don't write the last progress item
            // twice.
            if hasLastProgress && (!done || !hasLastWritten || lastProgress != lastWritten) {
                progressOutput.WriteProgress(lastProgress)
                lastWritten = lastProgress
                hasLastWritten = true
            }

            if done {
                return
            }

            select {
            case <-w.signalChan:
            case <-w.releaseChan:
                done = true
                // Since the watcher is going to detach, make
                // sure the broadcaster is caught up so we
                // don't miss anything.
                select {
                case t.broadcastSyncChan <- struct{}{}:
                case <-t.running:
                }
            case <-t.running:
                done = true
            }
        }
    }()

    return w
}

// release is the inverse of watch: it indicates that the watcher no longer
// wants to be notified about the progress of the transfer. All calls to watch
// must be paired with later calls to release so that the lifecycle of the
// transfer is properly managed.
func (t *xfer) release(watcher *watcher) {
    t.mu.Lock()
    delete(t.watchers, watcher.releaseChan)

    if len(t.watchers) == 0 {
        if t.closed {
            // releasedChan may have been closed already if all
            // watchers were released, then another one was added
            // while waiting for a previous watcher goroutine to
            // finish.
            select {
            case <-t.releasedChan:
            default:
                close(t.releasedChan)
            }
        } else {
            t.cancel()
        }
    }
    t.mu.Unlock()

    close(watcher.releaseChan)
    // Block until the watcher goroutine completes
    <-watcher.running
}

// done returns a channel which is closed if the transfer completes or is
// cancelled. Note that having 0 watchers causes a transfer to be cancelled.
func (t *xfer) done() <-chan struct{} {
    // Note that this doesn't return t.ctx.Done() because that channel will
    // be closed the moment cancel is called, and we need to return a
    // channel that blocks until a cancellation is actually acknowledged by
    // the transfer function.
    return t.running
}

// released returns a channel which is closed once all watchers release the
// transfer AND the transfer is no longer tracked by the transferManager.
func (t *xfer) released() <-chan struct{} {
    return t.releasedChan
}

// context returns the context associated with the transfer.
func (t *xfer) context() context.Context {
    return t.ctx
}

// close is called by the transferManager when the transfer is no longer
// being tracked.
func (t *xfer) close() {
    t.mu.Lock()
    t.closed = true
    if len(t.watchers) == 0 {
        close(t.releasedChan)
    }
    t.mu.Unlock()
}

// doFunc is a function called by the transferManager to actually perform
// a transfer. It should be non-blocking. It should wait until the start channel
// is closed before transferring any data. If the function closes inactive, that
// signals to the transferManager that the job is no longer actively moving
// data - for example, it may be waiting for a dependent transfer to finish.
// This prevents it from taking up a slot.
type doFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer
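
// exampleDoFunc is a hypothetical sketch, not part of the upstream file, of a
// minimal doFunc. It returns a transfer immediately and does its work in a
// goroutine: wait for start (or cancellation), emit a progress event, and
// close progressChan so the broadcast goroutine terminates. The "example" ID
// stands in for real download/upload logic; inactive is never closed because
// this sketch never goes idle waiting on another transfer.
func exampleDoFunc(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
    t := newTransfer()
    go func() {
        // Closing progressChan makes broadcast mark the transfer done.
        defer close(progressChan)

        progressOutput := progress.ChanOutput(progressChan)

        select {
        case <-start:
        case <-t.context().Done():
            progress.Update(progressOutput, "example", "Cancelled")
            return
        }

        // A real implementation would move data here, checking
        // t.context() periodically for cancellation.
        progress.Update(progressOutput, "example", "Complete")
    }()
    return t
}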

// transferManager is used by LayerDownloadManager and LayerUploadManager to
// schedule and deduplicate transfers. It is up to the transferManager
// to make the scheduling and concurrency decisions.
type transferManager struct {
    mu sync.Mutex

    concurrencyLimit int
    activeTransfers  int
    transfers        map[string]transfer
    waitingTransfers []chan struct{}
}

// newTransferManager returns a new transferManager.
func newTransferManager(concurrencyLimit int) *transferManager {
    return &transferManager{
        concurrencyLimit: concurrencyLimit,
        transfers:        make(map[string]transfer),
    }
}

// setConcurrency sets the concurrency limit.
func (tm *transferManager) setConcurrency(concurrency int) {
    tm.mu.Lock()
    tm.concurrencyLimit = concurrency
    tm.mu.Unlock()
}

// transfer checks if a transfer matching the given key is in progress. If not,
// it starts one by calling xferFunc. The caller supplies a channel which
// receives progress output from the transfer.
func (tm *transferManager) transfer(key string, xferFunc doFunc, progressOutput progress.Output) (transfer, *watcher) {
    tm.mu.Lock()
    defer tm.mu.Unlock()

    for {
        xfer, present := tm.transfers[key]
        if !present {
            break
        }
        // transfer is already in progress.
        watcher := xfer.watch(progressOutput)

        select {
        case <-xfer.context().Done():
            // We don't want to watch a transfer that has been cancelled.
            // Wait for it to be removed from the map and try again.
            xfer.release(watcher)
            tm.mu.Unlock()
            // The goroutine that removes this transfer from the
            // map is also waiting for xfer.done(), so yield to it.
            // This could be avoided by adding a Closed method
            // to transfer to allow explicitly waiting for it to be
            // removed from the map, but forcing a scheduling round in
            // this very rare case seems better than bloating the
            // interface definition.
            runtime.Gosched()
            <-xfer.done()
            tm.mu.Lock()
        default:
            return xfer, watcher
        }
    }

    start := make(chan struct{})
    inactive := make(chan struct{})

    if tm.concurrencyLimit == 0 || tm.activeTransfers < tm.concurrencyLimit {
        close(start)
        tm.activeTransfers++
    } else {
        tm.waitingTransfers = append(tm.waitingTransfers, start)
    }

    mainProgressChan := make(chan progress.Progress)
    xfer := xferFunc(mainProgressChan, start, inactive)
    watcher := xfer.watch(progressOutput)
    go xfer.broadcast(mainProgressChan)
    tm.transfers[key] = xfer

    // When the transfer is finished, remove from the map.
    go func() {
        for {
            select {
            case <-inactive:
                tm.mu.Lock()
                tm.inactivate(start)
                tm.mu.Unlock()
                inactive = nil
            case <-xfer.done():
                tm.mu.Lock()
                if inactive != nil {
                    tm.inactivate(start)
                }
                delete(tm.transfers, key)
                tm.mu.Unlock()
                xfer.close()
                return
            }
        }
    }()

    return xfer, watcher
}
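
// exampleTransfer is a hypothetical sketch, not part of the upstream file, of
// how a caller such as LayerDownloadManager drives the transferManager: start
// or join a transfer under a deduplication key, then pair the implicit watch
// with a release when either the caller's context ends or the transfer itself
// finishes. The key, exampleDoFunc, and this flow are illustrative only.
func exampleTransfer(ctx context.Context, tm *transferManager, progressOutput progress.Output) {
    t, w := tm.transfer("example-key", exampleDoFunc, progressOutput)

    select {
    case <-ctx.Done():
        // The caller gave up. Dropping the last watcher cancels the
        // transfer's context.
        t.release(w)
    case <-t.done():
        // The transfer finished (or was cancelled); release must still
        // be called to balance the watch.
        t.release(w)
    }
}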

func (tm *transferManager) inactivate(start chan struct{}) {
    // If the transfer was started, remove it from the activeTransfers
    // count.
    select {
    case <-start:
        // Start next transfer if any are waiting
        if len(tm.waitingTransfers) != 0 {
            close(tm.waitingTransfers[0])
            tm.waitingTransfers = tm.waitingTransfers[1:]
        } else {
            tm.activeTransfers--
        }
    default:
    }
}