dotcloud/docker

View on GitHub
daemon/logger/local/read.go

Summary

Maintainability
A
1 hr
Test Coverage
package local

import (
    "bytes"
    "context"
    "encoding/binary"
    "fmt"
    "io"

    "github.com/docker/docker/api/types/plugins/logdriver"
    "github.com/docker/docker/daemon/logger"
    "github.com/docker/docker/daemon/logger/loggerutils"
    "github.com/docker/docker/errdefs"
    "github.com/pkg/errors"
)

// maxMsgLen is the maximum size, in bytes, of a logger.Message after
// serialization. Decode rejects any entry whose size header exceeds it.
// logger.defaultBufSize caps the size of the Line field.
const maxMsgLen int = 1e6 // 1MB.

// ReadLogs hands the read request, unchanged, to the driver's log file and
// returns the log watcher it produces.
func (d *driver) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
    watcher := d.logfile.ReadLogs(ctx, config)
    return watcher
}

// getTailReader locates the last req log messages in r and returns a reader
// covering exactly those messages, together with the number of messages found.
//
// Every message is framed by a big-endian uint32 length written both before
// and after the payload (each occupying encodeBinaryLen bytes), which lets the
// function walk r backwards from its end one message at a time. The walk stops
// when req messages have been found or the start of r is reached. Because the
// stop condition is found == req, a req of 0 is never matched and the walk
// continues to the start of r.
//
// Errors:
//   - a negative req yields an InvalidParameter error;
//   - a short read, or a header and footer that disagree on the message size,
//     yields a DataLoss error;
//   - any other read failure is returned wrapped;
//   - ctx is checked before each message and ctx.Err() is returned once it is
//     done.
//
// If r is too small to contain a single framed message, an empty reader and a
// count of 0 are returned.
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) {
    size := r.Size()
    if req < 0 {
        return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
    }

    // A message needs a header, at least one payload byte and a footer.
    if size < (encodeBinaryLen*2)+1 {
        return bytes.NewReader(nil), 0, nil
    }

    const encodeBinaryLen64 = int64(encodeBinaryLen)
    var found int

    buf := make([]byte, encodeBinaryLen)

    // offset points just past the end of the message examined next; it moves
    // towards the start of r as messages are found.
    offset := size
    for {
        select {
        case <-ctx.Done():
            return nil, 0, ctx.Err()
        default:
        }

        // The footer sits in the last encodeBinaryLen bytes of the message.
        n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
        if err != nil && err != io.EOF {
            return nil, 0, errors.Wrap(err, "error reading log message footer")
        }

        if n != encodeBinaryLen {
            return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
        }

        msgLen := binary.BigEndian.Uint32(buf)

        // Step back over footer, payload and header to reach the header and
        // cross-check it against the footer.
        n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
        if err != nil && err != io.EOF {
            return nil, 0, errors.Wrap(err, "error reading log message header")
        }

        if n != encodeBinaryLen {
            return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
        }

        if msgLen != binary.BigEndian.Uint32(buf) {
            return nil, 0, errdefs.DataLoss(errors.New("log message header and footer indicate different message sizes"))
        }

        found++
        offset -= int64(msgLen) + encodeBinaryLen64*2
        if found == req || offset <= 0 {
            break
        }
    }

    // NewSectionReader takes a length, not an end position: the tail spans
    // [offset, size), so its length is size-offset. Passing size here would
    // make the returned reader's Size() too large by offset bytes.
    return io.NewSectionReader(r, offset, size-offset), found, nil
}

// decoder reads size-framed log entries from rdr and turns each one into a
// logger.Message. The buffer and the scratch entry are kept between calls so
// that they can be reused.
type decoder struct {
    // rdr is the source of the framed entries; proto is the scratch entry each
    // record is unmarshalled into (created lazily by Decode).
    rdr   io.Reader
    proto *logdriver.LogEntry
    // buf keeps bytes from rdr.
    buf []byte
    // offset is the position in buf at which the next read continues.
    // If offset > 0, buf[:offset] holds bytes of a record that has been read
    // only partially so far; offset returns to 0 once the record is complete.
    offset int
    // nextMsgLen is the length of the next log message.
    // If nextMsgLen = 0, a new value must be read from rdr.
    nextMsgLen int
}

// readRecord fills d.buf[:size] from d.rdr, resuming at d.offset.
//
// A read that ends with io.ErrUnexpectedEOF keeps the bytes received so far
// (d.offset is advanced) and is retried, for at most maxDecodeRetry attempts in
// total. Any other error is returned immediately. On success d.offset is reset
// to 0 so that the next record starts at the beginning of the buffer.
func (d *decoder) readRecord(size int) error {
    var lastErr error
    for attempt := 0; attempt < maxDecodeRetry; attempt++ {
        n, err := io.ReadFull(d.rdr, d.buf[d.offset:size])
        d.offset += n

        switch err {
        case nil:
            // The whole record is now in the buffer.
            d.offset = 0
            return nil
        case io.ErrUnexpectedEOF:
            // Partial record: remember the failure and try to read the rest.
            lastErr = err
        default:
            return err
        }
    }

    if lastErr != nil {
        // Every attempt ended with a partial read.
        return lastErr
    }
    d.offset = 0
    return nil
}

// Decode reads the next log entry from the decoder's reader and returns it as
// a logger.Message.
//
// The scratch entry and the buffer are created on first use. When no message
// length is pending, the size header is read first; a length above maxMsgLen
// is rejected, and the buffer is grown or resliced to fit the entry plus its
// trailing size field. If a length is already pending from an earlier call,
// the header step is skipped and only the entry is read.
func (d *decoder) Decode() (*logger.Message, error) {
    switch {
    case d.proto == nil:
        d.proto = new(logdriver.LogEntry)
    default:
        resetProto(d.proto)
    }
    if d.buf == nil {
        d.buf = make([]byte, initialBufSize)
    }

    // A non-zero nextMsgLen means the header was consumed by a previous call.
    if d.nextMsgLen != 0 {
        return d.decodeLogEntry()
    }

    msgLen, err := d.decodeSizeHeader()
    if err != nil {
        return nil, err
    }
    if msgLen > maxMsgLen {
        return nil, fmt.Errorf("log message is too large (%d > %d)", msgLen, maxMsgLen)
    }

    // Room is needed for the payload and the size field that follows it.
    needed := msgLen + encodeBinaryLen
    switch {
    case len(d.buf) < needed:
        d.buf = make([]byte, needed)
    case msgLen <= initialBufSize:
        d.buf = d.buf[:initialBufSize]
    default:
        d.buf = d.buf[:needed]
    }

    d.nextMsgLen = msgLen
    return d.decodeLogEntry()
}

// Reset points the decoder at rdr and discards any partially decoded state.
// It does nothing when rdr is the reader the decoder already uses.
func (d *decoder) Reset(rdr io.Reader) {
    if rdr == d.rdr {
        return
    }
    d.rdr = rdr

    if entry := d.proto; entry != nil {
        resetProto(entry)
    }
    if d.buf != nil {
        // Return the buffer to its initial length, keeping the allocation.
        d.buf = d.buf[:initialBufSize]
    }

    // Forget any partially read record and any pending message length.
    d.offset, d.nextMsgLen = 0, 0
}

// Close drops the decoder's buffer and reader and resets the scratch entry,
// if one has been created.
func (d *decoder) Close() {
    if d.proto != nil {
        resetProto(d.proto)
    }
    d.buf, d.rdr = nil, nil
}

// decodeFunc returns a new decoder that reads from rdr. The decoder's buffer
// and scratch entry are left unset and are allocated on the first Decode.
func decodeFunc(rdr io.Reader) loggerutils.Decoder {
    dec := new(decoder)
    dec.rdr = rdr
    return dec
}

// decodeSizeHeader reads the encodeBinaryLen-byte size header that precedes a
// log entry and returns the message length it encodes (big-endian uint32).
func (d *decoder) decodeSizeHeader() (int, error) {
    if err := d.readRecord(encodeBinaryLen); err != nil {
        return 0, errors.Wrap(err, "could not read a size header")
    }

    header := d.buf[:encodeBinaryLen]
    return int(binary.BigEndian.Uint32(header)), nil
}

// decodeLogEntry reads the entry announced by d.nextMsgLen, together with the
// encodeBinaryLen bytes that follow it, and converts it to a logger.Message.
//
// Once the record has been read, d.nextMsgLen is cleared so that the next
// Decode starts with a new size header. A newline is appended to the line
// unless the message carries partial-log metadata that does not mark it as the
// last part.
func (d *decoder) decodeLogEntry() (*logger.Message, error) {
    size := d.nextMsgLen
    if err := d.readRecord(size + encodeBinaryLen); err != nil {
        return nil, errors.Wrapf(err, "could not read a log entry (size=%d+%d)", size, encodeBinaryLen)
    }
    d.nextMsgLen = 0

    // Only the first size bytes are the entry; the rest is the trailing size field.
    payload := d.buf[:size]
    if err := d.proto.Unmarshal(payload); err != nil {
        return nil, errors.Wrapf(err, "error unmarshalling log entry (size=%d)", size)
    }

    msg := protoToMessage(d.proto)
    if meta := msg.PLogMetaData; meta == nil || meta.Last {
        msg.Line = append(msg.Line, '\n')
    }
    return msg, nil
}