docker/docker

View on GitHub
pkg/ioutils/bytespipe.go

Summary

Maintainability
A
25 mins
Test Coverage
package ioutils // import "github.com/docker/docker/pkg/ioutils"

import (
    "errors"
    "io"
    "sync"
)

// maxCap is the highest capacity to use in byte slices that buffer data.
const maxCap = 1e6

// minCap is the lowest capacity to use in byte slices that buffer data
const minCap = 64

// blockThreshold is the minimum number of bytes in the buffer which will cause
// a write to BytesPipe to block when allocating a new slice.
const blockThreshold = 1e6

var (
    // ErrClosed is returned when Write is called on a closed BytesPipe.
    ErrClosed = errors.New("write to closed BytesPipe")

    bufPools     = make(map[int]*sync.Pool)
    bufPoolsLock sync.Mutex
)

// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
type BytesPipe struct {
    mu        sync.Mutex
    wait      *sync.Cond
    buf       []*fixedBuffer
    bufLen    int
    closeErr  error // error to return from next Read. set to nil if not closed.
    readBlock bool  // check read BytesPipe is Wait() or not
}

// NewBytesPipe creates new BytesPipe, initialized by specified slice.
// If buf is nil, then it will be initialized with slice which cap is 64.
// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
func NewBytesPipe() *BytesPipe {
    bp := &BytesPipe{}
    bp.buf = append(bp.buf, getBuffer(minCap))
    bp.wait = sync.NewCond(&bp.mu)
    return bp
}

// Write writes p to BytesPipe.
// It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (int, error) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    written := 0
loop0:
    for {
        if bp.closeErr != nil {
            return written, ErrClosed
        }

        if len(bp.buf) == 0 {
            bp.buf = append(bp.buf, getBuffer(64))
        }
        // get the last buffer
        b := bp.buf[len(bp.buf)-1]

        n, err := b.Write(p)
        written += n
        bp.bufLen += n

        // errBufferFull is an error we expect to get if the buffer is full
        if err != nil && err != errBufferFull {
            bp.wait.Broadcast()
            return written, err
        }

        // if there was enough room to write all then break
        if len(p) == n {
            break
        }

        // more data: write to the next slice
        p = p[n:]

        // make sure the buffer doesn't grow too big from this write
        for bp.bufLen >= blockThreshold {
            if bp.readBlock {
                bp.wait.Broadcast()
            }
            bp.wait.Wait()
            if bp.closeErr != nil {
                continue loop0
            }
        }

        // add new byte slice to the buffers slice and continue writing
        nextCap := b.Cap() * 2
        if nextCap > maxCap {
            nextCap = maxCap
        }
        bp.buf = append(bp.buf, getBuffer(nextCap))
    }
    bp.wait.Broadcast()
    return written, nil
}

// CloseWithError causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) CloseWithError(err error) error {
    bp.mu.Lock()
    if err != nil {
        bp.closeErr = err
    } else {
        bp.closeErr = io.EOF
    }
    bp.wait.Broadcast()
    bp.mu.Unlock()
    return nil
}

// Close causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) Close() error {
    return bp.CloseWithError(nil)
}

// Read reads bytes from BytesPipe.
// Data could be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    if bp.bufLen == 0 {
        if bp.closeErr != nil {
            return 0, bp.closeErr
        }
        bp.readBlock = true
        bp.wait.Wait()
        bp.readBlock = false
        if bp.bufLen == 0 && bp.closeErr != nil {
            return 0, bp.closeErr
        }
    }

    for bp.bufLen > 0 {
        b := bp.buf[0]
        read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
        n += read
        bp.bufLen -= read

        if b.Len() == 0 {
            // it's empty so return it to the pool and move to the next one
            returnBuffer(b)
            bp.buf[0] = nil
            bp.buf = bp.buf[1:]
        }

        if len(p) == read {
            break
        }

        p = p[read:]
    }

    bp.wait.Broadcast()
    return
}

func returnBuffer(b *fixedBuffer) {
    b.Reset()
    bufPoolsLock.Lock()
    pool := bufPools[b.Cap()]
    bufPoolsLock.Unlock()
    if pool != nil {
        pool.Put(b)
    }
}

func getBuffer(size int) *fixedBuffer {
    bufPoolsLock.Lock()
    pool, ok := bufPools[size]
    if !ok {
        pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
        bufPools[size] = pool
    }
    bufPoolsLock.Unlock()
    return pool.Get().(*fixedBuffer)
}