package logger

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"testing"
	"time"
)

// TestLoggerJSON is a test logger that writes every message as JSON through
// the embedded encoder. When delay is non-zero, each Log call sleeps for that
// long before encoding, which lets tests simulate a slow log driver.
type TestLoggerJSON struct {
	*json.Encoder
	mu    sync.Mutex
	delay time.Duration
}

// Log optionally sleeps for the configured delay and then JSON-encodes m.
// The mutex serializes Encode calls so concurrent callers do not interleave
// their output.
func (l *TestLoggerJSON) Log(m *Message) error {
	if l.delay > 0 {
		time.Sleep(l.delay)
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.Encode(m)
}

// Close is a no-op and always returns nil.
func (l *TestLoggerJSON) Close() error { return nil }

// Name returns the fixed logger name "json".
func (l *TestLoggerJSON) Name() string { return "json" }

// TestSizedLoggerJSON is a JSON test logger that additionally reports a
// buffer size through BufSize.
type TestSizedLoggerJSON struct {
	*json.Encoder
	mu sync.Mutex
}

// Log JSON-encodes m while holding the mutex.
func (l *TestSizedLoggerJSON) Log(m *Message) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.Encode(m)
}

// Close is a no-op and always returns nil.
func (*TestSizedLoggerJSON) Close() error { return nil }

// Name returns the fixed logger name "sized-json".
func (*TestSizedLoggerJSON) Name() string { return "sized-json" }

// BufSize returns 32 KiB. TestCopierWithSized expects every message line
// produced for this logger to be exactly this long.
func (*TestSizedLoggerJSON) BufSize() int { return 32 * 1024 }

// TestCopier feeds 30 newline-terminated lines plus one unterminated trailing
// line per stream through a Copier, requires the copier to finish within one
// second, and checks the source and content of every message that was logged.
func TestCopier(t *testing.T) {
	stdoutLine := "Line that thinks that it is log line from docker stdout"
	stderrLine := "Line that thinks that it is log line from docker stderr"
	stdoutTrailingLine := "stdout trailing line"
	stderrTrailingLine := "stderr trailing line"

	var stdout bytes.Buffer
	var stderr bytes.Buffer
	for i := 0; i < 30; i++ {
		if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
			t.Fatal(err)
		}
		if _, err := stderr.WriteString(stderrLine + "\n"); err != nil {
			t.Fatal(err)
		}
	}

	// Test remaining lines without line-endings
	if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
		t.Fatal(err)
	}
	if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
		t.Fatal(err)
	}

	var jsonBuf bytes.Buffer

	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}

	c := NewCopier(
		map[string]io.Reader{
			"stdout": &stdout,
			"stderr": &stderr,
		},
		jsonLog)
	c.Run()

	// Wait in a helper goroutine so the test can enforce a deadline.
	wait := make(chan struct{})
	go func() {
		c.Wait()
		close(wait)
	}()
	select {
	case <-time.After(1 * time.Second):
		t.Fatal("Copier failed to do its work in 1 second")
	case <-wait:
	}

	dec := json.NewDecoder(&jsonBuf)
	for {
		var msg Message
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		switch msg.Source {
		case "stdout":
			if line := string(msg.Line); line != stdoutLine && line != stdoutTrailingLine {
				t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stdoutLine, stdoutTrailingLine)
			}
		case "stderr":
			if line := string(msg.Line); line != stderrLine && line != stderrTrailingLine {
				t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stderrLine, stderrTrailingLine)
			}
		default:
			t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
		}
	}
}

// TestCopierLongLines tests long lines without line breaks
func TestCopierLongLines(t *testing.T) {
	// Long lines (should be split at "defaultBufSize")
	stdoutLongLine := strings.Repeat("a", defaultBufSize)
	stderrLongLine := strings.Repeat("b", defaultBufSize)
	stdoutTrailingLine := "stdout trailing line"
	stderrTrailingLine := "stderr trailing line"

	var stdout bytes.Buffer
	var stderr bytes.Buffer

	for i := 0; i < 3; i++ {
		if _, err := stdout.WriteString(stdoutLongLine); err != nil {
			t.Fatal(err)
		}
		if _, err := stderr.WriteString(stderrLongLine); err != nil {
			t.Fatal(err)
		}
	}

	if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
		t.Fatal(err)
	}
	if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
		t.Fatal(err)
	}

	var jsonBuf bytes.Buffer

	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}

	c := NewCopier(
		map[string]io.Reader{
			"stdout": &stdout,
			"stderr": &stderr,
		},
		jsonLog)
	c.Run()

	// Wait in a helper goroutine so the test can enforce a deadline.
	wait := make(chan struct{})
	go func() {
		c.Wait()
		close(wait)
	}()
	select {
	case <-time.After(1 * time.Second):
		t.Fatal("Copier failed to do its work in 1 second")
	case <-wait:
	}

	dec := json.NewDecoder(&jsonBuf)
	for {
		var msg Message
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		switch msg.Source {
		case "stdout":
			if line := string(msg.Line); line != stdoutLongLine && line != stdoutTrailingLine {
				t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
			}
		case "stderr":
			if line := string(msg.Line); line != stderrLongLine && line != stderrTrailingLine {
				t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
			}
		default:
			t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
		}
	}
}

// TestCopierSlow uses a logger that sleeps 100ms per message, so draining all
// 30 lines would take about three seconds. It closes the copier after 150ms
// and requires Wait to return within a further 200ms.
func TestCopierSlow(t *testing.T) {
	stdoutLine := "Line that thinks that it is log line from docker stdout"

	var stdout bytes.Buffer
	for i := 0; i < 30; i++ {
		if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
			t.Fatal(err)
		}
	}

	var jsonBuf bytes.Buffer
	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}

	c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
	c.Run()

	wait := make(chan struct{})
	go func() {
		c.Wait()
		close(wait)
	}()

	// Let the copier get part-way through the input before closing it.
	time.Sleep(150 * time.Millisecond)
	c.Close()

	select {
	case <-time.After(200 * time.Millisecond):
		t.Fatal("failed to exit in time after the copier is closed")
	case <-wait:
	}
}

// TestCopierWithSized feeds exactly expectedMsgs*BufSize() bytes with no line
// breaks through a Copier backed by a sized logger, and checks that the input
// arrives as expectedMsgs messages of BufSize() bytes each.
func TestCopierWithSized(t *testing.T) {
	var jsonBuf bytes.Buffer
	expectedMsgs := 2
	sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
	logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
	c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)

	c.Run()
	// Wait for Copier to finish writing to the buffered logger.
	c.Wait()
	c.Close()

	recvdMsgs := 0
	dec := json.NewDecoder(&jsonBuf)
	for {
		var msg Message
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		if msg.Source != "stdout" {
			t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout")
		}
		if len(msg.Line) != sizedLogger.BufSize() {
			t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line))
		}
		recvdMsgs++
	}
	if recvdMsgs != expectedMsgs {
		t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
	}
}

// BenchmarkLoggerDummy is a logger for benchmarks that discards every message.
type BenchmarkLoggerDummy struct{}

// Log hands m to PutMessage and reports success.
// NOTE(review): PutMessage presumably recycles the message; confirm against
// its definition.
func (l *BenchmarkLoggerDummy) Log(m *Message) error {
	PutMessage(m)
	return nil
}

// Close is a no-op and always returns nil.
func (l *BenchmarkLoggerDummy) Close() error { return nil }

// Name returns the fixed logger name "dummy".
func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }

// The BenchmarkCopier* functions run benchmarkCopier with line lengths from
// 64 bytes to 256 KiB.

func BenchmarkCopier64(b *testing.B)   { benchmarkCopier(b, 1<<6) }
func BenchmarkCopier128(b *testing.B)  { benchmarkCopier(b, 1<<7) }
func BenchmarkCopier256(b *testing.B)  { benchmarkCopier(b, 1<<8) }
func BenchmarkCopier512(b *testing.B)  { benchmarkCopier(b, 1<<9) }
func BenchmarkCopier1K(b *testing.B)   { benchmarkCopier(b, 1<<10) }
func BenchmarkCopier2K(b *testing.B)   { benchmarkCopier(b, 1<<11) }
func BenchmarkCopier4K(b *testing.B)   { benchmarkCopier(b, 1<<12) }
func BenchmarkCopier8K(b *testing.B)   { benchmarkCopier(b, 1<<13) }
func BenchmarkCopier16K(b *testing.B)  { benchmarkCopier(b, 1<<14) }
func BenchmarkCopier32K(b *testing.B)  { benchmarkCopier(b, 1<<15) }
func BenchmarkCopier64K(b *testing.B)  { benchmarkCopier(b, 1<<16) }
func BenchmarkCopier128K(b *testing.B) { benchmarkCopier(b, 1<<17) }
func BenchmarkCopier256K(b *testing.B) { benchmarkCopier(b, 1<<18) }

// piped returns the read end of an OS pipe. A background goroutine writes buf
// to the pipe iterations times, sleeping for delay before each write, and
// then closes the write end so the reader observes EOF.
//
// Write failures are reported with b.Error rather than b.Fatal: the Fatal
// family may only be called from the goroutine running the benchmark, and
// calling it here would also exit the writer before the pipe is closed,
// leaving the reader blocked forever.
func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
	r, w, err := os.Pipe()
	if err != nil {
		b.Fatal(err)
	}
	go func() {
		// Close on every exit path so the reader always reaches EOF. The
		// Close error is deliberately ignored: nothing useful can be done
		// with it, and reporting it could race with the benchmark finishing.
		defer w.Close()
		for i := 0; i < iterations; i++ {
			time.Sleep(delay)
			n, err := w.Write(buf)
			if err != nil {
				b.Error(err)
				return
			}
			if n != len(buf) {
				b.Error(fmt.Errorf("short write: wrote %d of %d bytes", n, len(buf)))
				return
			}
		}
	}()
	return r
}

// benchmarkCopier measures a Copier reading newline-terminated lines of
// length bytes from a pipe into a BenchmarkLoggerDummy. Each iteration
// creates a fresh pipe carrying ten such lines. length must be at least 1.
func benchmarkCopier(b *testing.B, length int) {
	// One line: length-1 'A' bytes followed by a newline.
	buf := bytes.Repeat([]byte{'A'}, length)
	buf[length-1] = '\n'

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c := NewCopier(
			map[string]io.Reader{
				"buffer": piped(b, 10, time.Nanosecond, buf),
			},
			&BenchmarkLoggerDummy{})
		c.Run()
		c.Wait()
		c.Close()
	}
}