daemon/logger/loggertest/logreader.go
package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"gotest.tools/v3/assert/opt"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
)
type syncer interface {
// Sync commits the current logs to stable storage such that the most
// recently-logged message can be immediately read back by a LogReader.
Sync() error
}
func syncLogger(t *testing.T, l logger.Logger) {
if sl, ok := l.(syncer); ok {
assert.NilError(t, sl.Sync())
}
}
// Reader tests that a logger.LogReader implementation behaves as it should.
type Reader struct {
// Factory returns a function which constructs loggers for the container
// specified in info. Each call to the returned function must yield a
// distinct logger instance which can read back logs written by earlier
// instances.
Factory func(*testing.T, logger.Info) func(*testing.T) logger.Logger
}
var compareLog cmp.Options = []cmp.Option{
// Not all log drivers can round-trip timestamps at full nanosecond
// precision.
opt.TimeWithThreshold(time.Millisecond),
// The json-log driver does not round-trip PLogMetaData and API users do
// not expect it.
cmpopts.IgnoreFields(logger.Message{}, "PLogMetaData"),
cmp.Transformer("string", func(b []byte) string { return string(b) }),
}
// TestTail tests the behavior of the LogReader's tail implementation.
func (tr Reader) TestTail(t *testing.T) {
t.Run("Live", func(t *testing.T) { tr.testTail(t, true) })
t.Run("LiveEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, true) })
t.Run("Stopped", func(t *testing.T) { tr.testTail(t, false) })
t.Run("StoppedEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, false) })
}
func makeTestMessages() []*logger.Message {
return []*logger.Message{
{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")},
{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{ID: "aaaaaaaa", Ordinal: 1, Last: true}},
{Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("to be..."), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 1}},
{Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}},
{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))},
{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")},
{Source: "stdout", Timestamp: time.Now().Add(-1 * 90 * time.Minute), Line: []byte("someone adjusted the clock")},
}
}
func (tr Reader) testTail(t *testing.T, live bool) {
t.Parallel()
factory := tr.Factory(t, logger.Info{
ContainerID: "tailtest0000",
ContainerName: "logtail",
})
l := factory(t)
if live {
defer func() { assert.NilError(t, l.Close()) }()
}
mm := makeTestMessages()
expected := logMessages(t, l, mm)
if !live {
// Simulate reading from a stopped container.
assert.NilError(t, l.Close())
l = factory(t)
defer func() { assert.NilError(t, l.Close()) }()
}
lr := l.(logger.LogReader)
t.Run("Exact", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})
t.Run("LessThanAvailable", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 2})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog)
})
t.Run("MoreThanAvailable", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 100})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})
t.Run("All", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})
t.Run("Since", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
})
t.Run("MoreThanSince", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
})
t.Run("LessThanSince", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog)
})
t.Run("Until", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog)
})
t.Run("SinceAndUntil", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog)
})
}
func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
t.Parallel()
factory := tr.Factory(t, logger.Info{
ContainerID: "tailemptytest",
ContainerName: "logtail",
})
l := factory(t)
if !live {
assert.NilError(t, l.Close())
l = factory(t)
}
defer func() { assert.NilError(t, l.Close()) }()
for _, tt := range []struct {
name string
cfg logger.ReadConfig
}{
{name: "Zero", cfg: logger.ReadConfig{}},
{name: "All", cfg: logger.ReadConfig{Tail: -1}},
{name: "Tail", cfg: logger.ReadConfig{Tail: 42}},
{name: "Since", cfg: logger.ReadConfig{Since: time.Unix(1, 0)}},
{name: "Until", cfg: logger.ReadConfig{Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
{name: "SinceAndUntil", cfg: logger.ReadConfig{Since: time.Unix(1, 0), Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty())
})
}
}
// TestFollow tests the LogReader's follow implementation.
//
// The LogReader is expected to be able to follow an arbitrary number of
// messages at a high rate with no dropped messages.
func (tr Reader) TestFollow(t *testing.T) {
// Reader sends all logs and closes after logger is closed
// - Starting from empty log (like run)
for i, tail := range []int{-1, 0, 1, 42} {
i, tail := i, tail
t.Run(fmt.Sprintf("FromEmptyLog/Tail=%d", tail), func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: fmt.Sprintf("followstart%d", i),
ContainerName: fmt.Sprintf("logloglog%d", i),
})(t)
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: tail, Follow: true})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
mm := makeTestMessages()
expected := logMessages(t, l, mm)
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
}
t.Run("AttachMidStream", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followmiddle",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
expected := logMessages(t, l, mm[0:1])
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected = append(expected, logMessages(t, l, mm[1:])...)
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("Since", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followsince0",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected := logMessages(t, l, mm)[2:]
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("Until", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followuntil0",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected := logMessages(t, l, mm)[:2]
defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("SinceAndUntil", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followbounded",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected := logMessages(t, l, mm)[1:2]
defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("Tail=0", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followtail00",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
logMessages(t, l, mm[0:2])
syncLogger(t, l)
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 0, Follow: true})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected := logMessages(t, l, mm[2:])
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("Tail>0", func(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "followtail00",
ContainerName: "logloglog",
})(t)
mm := makeTestMessages()
expected := logMessages(t, l, mm[0:2])[1:]
syncLogger(t, l)
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 1, Follow: true})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected = append(expected, logMessages(t, l, mm[2:])...)
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("MultipleStarts", func(t *testing.T) {
t.Parallel()
factory := tr.Factory(t, logger.Info{
ContainerID: "startrestart",
ContainerName: "startmeup",
})
mm := makeTestMessages()
l := factory(t)
expected := logMessages(t, l, mm[:3])
assert.NilError(t, l.Close())
l = factory(t)
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true})
defer lw.ConsumerGone()
doneReading := make(chan struct{})
var logs []*logger.Message
go func() {
defer close(doneReading)
logs = readAll(t, lw)
}()
expected = append(expected, logMessages(t, l, mm[3:])...)
assert.NilError(t, l.Close())
<-doneReading
assert.DeepEqual(t, logs, expected, compareLog)
})
t.Run("Concurrent", tr.TestConcurrent)
}
// TestConcurrent tests the Logger and its LogReader implementation for
// race conditions when logging from multiple goroutines concurrently.
func (tr Reader) TestConcurrent(t *testing.T) {
t.Parallel()
l := tr.Factory(t, logger.Info{
ContainerID: "logconcurrent0",
ContainerName: "logconcurrent123",
})(t)
// Split test messages
stderrMessages := []*logger.Message{}
stdoutMessages := []*logger.Message{}
for _, m := range makeTestMessages() {
if m.Source == "stdout" {
stdoutMessages = append(stdoutMessages, m)
} else if m.Source == "stderr" {
stderrMessages = append(stderrMessages, m)
}
}
// Follow all logs
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: -1})
defer lw.ConsumerGone()
// Log concurrently from two sources and close log
wg := &sync.WaitGroup{}
logAll := func(msgs []*logger.Message) {
defer wg.Done()
for _, m := range msgs {
assert.Check(t, l.Log(copyLogMessage(m)), "failed to log message %+v", m)
}
}
closed := make(chan struct{})
wg.Add(2)
go logAll(stdoutMessages)
go logAll(stderrMessages)
go func() {
defer close(closed)
defer l.Close()
wg.Wait()
}()
defer func() {
// Make sure log gets closed before we return
// so the temporary dir can be deleted
select {
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for logger to close")
case <-closed:
}
}()
// Check if the message count, order and content is equal to what was logged
for {
l := readMessage(t, lw)
if l == nil {
break
}
var messages *[]*logger.Message
if l.Source == "stdout" {
messages = &stdoutMessages
} else if l.Source == "stderr" {
messages = &stderrMessages
} else {
t.Fatalf("Corrupted message.Source = %q", l.Source)
}
expectedMsg := transformToExpected((*messages)[0])
assert.DeepEqual(t, *expectedMsg, *l, compareLog)
*messages = (*messages)[1:]
}
assert.Check(t, is.Len(stdoutMessages, 0), "expected stdout messages were not read")
assert.Check(t, is.Len(stderrMessages, 0), "expected stderr messages were not read")
}
// logMessages logs messages to l and returns a slice of messages as would be
// expected to be read back. The message values are not modified and the
// returned slice of messages are deep-copied.
func logMessages(t *testing.T, l logger.Logger, messages []*logger.Message) []*logger.Message {
t.Helper()
var expected []*logger.Message
for _, m := range messages {
// Copy the log message because the underlying log writer resets
// the log message and returns it to a buffer pool.
assert.NilError(t, l.Log(copyLogMessage(m)))
runtime.Gosched()
expect := transformToExpected(m)
expected = append(expected, expect)
}
return expected
}
// Existing API consumers expect a newline to be appended to
// messages other than nonterminal partials as that matches the
// existing behavior of the json-file log driver.
func transformToExpected(m *logger.Message) *logger.Message {
// Copy the log message again so as not to mutate the input.
copy := copyLogMessage(m)
if m.PLogMetaData == nil || m.PLogMetaData.Last {
copy.Line = append(copy.Line, '\n')
}
return copy
}
func copyLogMessage(src *logger.Message) *logger.Message {
dst := logger.NewMessage()
dst.Source = src.Source
dst.Timestamp = src.Timestamp
dst.Attrs = src.Attrs
dst.Err = src.Err
dst.Line = append(dst.Line, src.Line...)
if src.PLogMetaData != nil {
lmd := *src.PLogMetaData
dst.PLogMetaData = &lmd
}
return dst
}
func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message {
t.Helper()
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
select {
case <-timeout.C:
t.Error("timed out waiting for message")
return nil
case err, open := <-lw.Err:
t.Errorf("unexpected receive on lw.Err: err=%v, open=%v", err, open)
return nil
case msg, open := <-lw.Msg:
if !open {
select {
case err, open := <-lw.Err:
t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open)
default:
}
return nil
}
assert.Assert(t, msg != nil)
t.Logf("[%v] %s: %s", msg.Timestamp, msg.Source, msg.Line)
return msg
}
}
func readAll(t *testing.T, lw *logger.LogWatcher) []*logger.Message {
t.Helper()
var msgs []*logger.Message
for {
m := readMessage(t, lw)
if m == nil {
return msgs
}
msgs = append(msgs, m)
}
}