Browse code

Decouple logfile from tailfile.

This makes it so consumers of `LogFile` must pass in a function that
returns an io.Reader scoped to the requested number of lines to tail.

This is also much more efficient when tailing a large number of lines.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2018/04/06 01:39:28
Showing 6 changed files
... ...
@@ -110,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
110 110
 		return b, nil
111 111
 	}
112 112
 
113
-	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
113
+	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader)
114 114
 	if err != nil {
115 115
 		return nil, err
116 116
 	}
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"bytes"
5 5
 	"compress/gzip"
6 6
 	"encoding/json"
7
+	"fmt"
7 8
 	"io/ioutil"
8 9
 	"os"
9 10
 	"path/filepath"
... ...
@@ -107,7 +108,10 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
107 107
 		ContainerID: "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657",
108 108
 		LogPath:     tmp.Join("container.log"),
109 109
 		Config: map[string]string{
110
-			"labels": "first,second",
110
+			"labels":   "first,second",
111
+			"max-file": "10",
112
+			"compress": "true",
113
+			"max-size": "20m",
111 114
 		},
112 115
 		ContainerLabels: map[string]string{
113 116
 			"first":  "label_value",
... ...
@@ -117,21 +121,34 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
117 117
 	assert.NilError(b, err)
118 118
 	defer jsonlogger.Close()
119 119
 
120
-	msg := &logger.Message{
121
-		Line:      []byte("Line that thinks that it is log line from docker\n"),
122
-		Source:    "stderr",
123
-		Timestamp: time.Now().UTC(),
124
-	}
125
-
126
-	buf := bytes.NewBuffer(nil)
127
-	assert.NilError(b, marshalMessage(msg, nil, buf))
128
-	b.SetBytes(int64(buf.Len()))
120
+	t := time.Now().UTC()
121
+	for _, data := range [][]byte{
122
+		[]byte(""),
123
+		[]byte("a short string"),
124
+		bytes.Repeat([]byte("a long string"), 100),
125
+		bytes.Repeat([]byte("a really long string"), 10000),
126
+	} {
127
+		b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
128
+			testMsg := &logger.Message{
129
+				Line:      data,
130
+				Source:    "stderr",
131
+				Timestamp: t,
132
+			}
129 133
 
130
-	b.ResetTimer()
131
-	for i := 0; i < b.N; i++ {
132
-		if err := jsonlogger.Log(msg); err != nil {
133
-			b.Fatal(err)
134
-		}
134
+			buf := bytes.NewBuffer(nil)
135
+			assert.NilError(b, marshalMessage(testMsg, nil, buf))
136
+			b.SetBytes(int64(buf.Len()))
137
+			b.ResetTimer()
138
+			for i := 0; i < b.N; i++ {
139
+				msg := logger.NewMessage()
140
+				msg.Line = testMsg.Line
141
+				msg.Timestamp = testMsg.Timestamp
142
+				msg.Source = testMsg.Source
143
+				if err := jsonlogger.Log(msg); err != nil {
144
+					b.Fatal(err)
145
+				}
146
+			}
147
+		})
135 148
 	}
136 149
 }
137 150
 
... ...
@@ -1,12 +1,16 @@
1 1
 package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"encoding/json"
5 6
 	"io"
6 7
 
7 8
 	"github.com/docker/docker/api/types/backend"
8 9
 	"github.com/docker/docker/daemon/logger"
9 10
 	"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
11
+	"github.com/docker/docker/daemon/logger/loggerutils"
12
+	"github.com/docker/docker/pkg/tailfile"
13
+	"github.com/sirupsen/logrus"
10 14
 )
11 15
 
12 16
 const maxJSONDecodeRetry = 20000
... ...
@@ -63,14 +67,14 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
63 63
 	return func() (msg *logger.Message, err error) {
64 64
 		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
65 65
 			msg, err = decodeLogLine(dec, l)
66
-			if err == nil {
66
+			if err == nil || err == io.EOF {
67 67
 				break
68 68
 			}
69 69
 
70
+			logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
70 71
 	// try again, could be due to an incomplete json object as we read
71 72
 			if _, ok := err.(*json.SyntaxError); ok {
72 73
 				dec = json.NewDecoder(rdr)
73
-				retries++
74 74
 				continue
75 75
 			}
76 76
 
... ...
@@ -81,9 +85,13 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
81 81
 			if err == io.ErrUnexpectedEOF {
82 82
 				reader := io.MultiReader(dec.Buffered(), rdr)
83 83
 				dec = json.NewDecoder(reader)
84
-				retries++
84
+				continue
85 85
 			}
86 86
 		}
87 87
 		return msg, err
88 88
 	}
89 89
 }
90
+
91
+func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
92
+	return tailfile.NewTailReader(ctx, r, req)
93
+}
... ...
@@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"io"
5 6
 	"testing"
6 7
 	"time"
7 8
 
... ...
@@ -62,3 +63,32 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
62 62
 		}
63 63
 	}
64 64
 }
65
+
66
+func TestEncodeDecode(t *testing.T) {
67
+	t.Parallel()
68
+
69
+	m1 := &logger.Message{Line: []byte("hello 1"), Timestamp: time.Now(), Source: "stdout"}
70
+	m2 := &logger.Message{Line: []byte("hello 2"), Timestamp: time.Now(), Source: "stdout"}
71
+	m3 := &logger.Message{Line: []byte("hello 3"), Timestamp: time.Now(), Source: "stdout"}
72
+
73
+	buf := bytes.NewBuffer(nil)
74
+	assert.Assert(t, marshalMessage(m1, nil, buf))
75
+	assert.Assert(t, marshalMessage(m2, nil, buf))
76
+	assert.Assert(t, marshalMessage(m3, nil, buf))
77
+
78
+	decode := decodeFunc(buf)
79
+	msg, err := decode()
80
+	assert.Assert(t, err)
81
+	assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line))
82
+
83
+	msg, err = decode()
84
+	assert.Assert(t, err)
85
+	assert.Assert(t, string(msg.Line) == "hello 2\n")
86
+
87
+	msg, err = decode()
88
+	assert.Assert(t, err)
89
+	assert.Assert(t, string(msg.Line) == "hello 3\n")
90
+
91
+	_, err = decode()
92
+	assert.Assert(t, err == io.EOF)
93
+}
... ...
@@ -1,7 +1,6 @@
1 1
 package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
2 2
 
3 3
 import (
4
-	"bytes"
5 4
 	"compress/gzip"
6 5
 	"context"
7 6
 	"encoding/json"
... ...
@@ -14,11 +13,9 @@ import (
14 14
 	"time"
15 15
 
16 16
 	"github.com/docker/docker/daemon/logger"
17
-	"github.com/docker/docker/daemon/logger/loggerutils/multireader"
18 17
 	"github.com/docker/docker/pkg/filenotify"
19 18
 	"github.com/docker/docker/pkg/pools"
20 19
 	"github.com/docker/docker/pkg/pubsub"
21
-	"github.com/docker/docker/pkg/tailfile"
22 20
 	"github.com/fsnotify/fsnotify"
23 21
 	"github.com/pkg/errors"
24 22
 	"github.com/sirupsen/logrus"
... ...
@@ -92,13 +89,27 @@ type LogFile struct {
92 92
 	notifyRotate    *pubsub.Publisher
93 93
 	marshal         logger.MarshalFunc
94 94
 	createDecoder   makeDecoderFunc
95
+	getTailReader   GetTailReaderFunc
95 96
 	perms           os.FileMode
96 97
 }
97 98
 
98 99
 type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
99 100
 
101
+// SizeReaderAt defines a ReaderAt that also reports its size.
102
+// This is used for tailing log files.
103
+type SizeReaderAt interface {
104
+	io.ReaderAt
105
+	Size() int64
106
+}
107
+
108
+// GetTailReaderFunc is used to truncate a reader to only read as much as is required
109
+// in order to get the passed in number of log lines.
110
+// It returns the sectioned reader, the number of lines that the section reader
111
+// contains, and any error that occurs.
112
+type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
113
+
100 114
 // NewLogFile creates new LogFile
101
-func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
115
+func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
102 116
 	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
103 117
 	if err != nil {
104 118
 		return nil, err
... ...
@@ -120,6 +131,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
120 120
 		marshal:         marshaller,
121 121
 		createDecoder:   decodeFunc,
122 122
 		perms:           perms,
123
+		getTailReader:   getTailReader,
123 124
 	}, nil
124 125
 }
125 126
 
... ...
@@ -309,33 +321,45 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
309 309
 	}
310 310
 
311 311
 	if config.Tail != 0 {
312
+		// TODO(@cpuguy83): Instead of opening every file, only get the files which
313
+		// are needed to tail.
314
+		// This is especially costly when compression is enabled.
312 315
 		files, err := w.openRotatedFiles(config)
316
+		w.mu.RUnlock()
313 317
 		if err != nil {
314
-			w.mu.RUnlock()
315 318
 			watcher.Err <- err
316 319
 			return
317 320
 		}
318
-		w.mu.RUnlock()
319
-		seekers := make([]io.ReadSeeker, 0, len(files)+1)
320
-		for _, f := range files {
321
-			seekers = append(seekers, f)
322
-		}
323
-		if currentChunk.Size() > 0 {
324
-			seekers = append(seekers, currentChunk)
325
-		}
326
-		if len(seekers) > 0 {
327
-			tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
321
+
322
+		closeFiles := func() {
323
+			for _, f := range files {
324
+				f.Close()
325
+				fileName := f.Name()
326
+				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
327
+					err := w.filesRefCounter.Dereference(fileName)
328
+					if err != nil {
329
+						logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
330
+					}
331
+				}
332
+			}
328 333
 		}
334
+
335
+		readers := make([]SizeReaderAt, 0, len(files)+1)
329 336
 		for _, f := range files {
330
-			f.Close()
331
-			fileName := f.Name()
332
-			if strings.HasSuffix(fileName, tmpLogfileSuffix) {
333
-				err := w.filesRefCounter.Dereference(fileName)
334
-				if err != nil {
335
-					logrus.Errorf("Failed to dereference log file %q: %v", fileName, err)
336
-				}
337
+			stat, err := f.Stat()
338
+			if err != nil {
339
+				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
340
+				closeFiles()
341
+				return
337 342
 			}
343
+			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
338 344
 		}
345
+		if currentChunk.Size() > 0 {
346
+			readers = append(readers, currentChunk)
347
+		}
348
+
349
+		tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
350
+		closeFiles()
339 351
 
340 352
 		w.mu.RLock()
341 353
 	}
... ...
@@ -454,19 +478,39 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
454 454
 	return io.NewSectionReader(f, 0, size), nil
455 455
 }
456 456
 
457
-type decodeFunc func() (*logger.Message, error)
457
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
458
+	nLines := config.Tail
459
+
460
+	ctx, cancel := context.WithCancel(context.Background())
461
+	defer cancel()
462
+	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
463
+	go func() {
464
+		select {
465
+		case <-ctx.Done():
466
+		case <-watcher.WatchClose():
467
+			cancel()
468
+		}
469
+	}()
470
+
471
+	readers := make([]io.Reader, 0, len(files))
458 472
 
459
-func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
460
-	var rdr io.Reader = f
461 473
 	if config.Tail > 0 {
462
-		ls, err := tailfile.TailFile(f, config.Tail)
463
-		if err != nil {
464
-			watcher.Err <- err
465
-			return
474
+		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
475
+			tail, n, err := getTailReader(ctx, files[i], nLines)
476
+			if err != nil {
477
+				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
478
+				return
479
+			}
480
+			nLines -= n
481
+			readers = append([]io.Reader{tail}, readers...)
482
+		}
483
+	} else {
484
+		for _, r := range files {
485
+			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
466 486
 		}
467
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
468 487
 	}
469 488
 
489
+	rdr := io.MultiReader(readers...)
470 490
 	decodeLogLine := createDecoder(rdr)
471 491
 	for {
472 492
 		msg, err := decodeLogLine()
... ...
@@ -483,7 +527,7 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec
483 483
 			return
484 484
 		}
485 485
 		select {
486
-		case <-watcher.WatchClose():
486
+		case <-ctx.Done():
487 487
 			return
488 488
 		case watcher.Msg <- msg:
489 489
 		}
... ...
@@ -664,3 +708,14 @@ func watchFile(name string) (filenotify.FileWatcher, error) {
664 664
 	}
665 665
 	return fileWatcher, nil
666 666
 }
667
+
668
+type wrappedReaderAt struct {
669
+	io.ReaderAt
670
+	pos int64
671
+}
672
+
673
+func (r *wrappedReaderAt) Read(p []byte) (int, error) {
674
+	n, err := r.ReaderAt.ReadAt(p, r.pos)
675
+	r.pos += int64(n)
676
+	return n, err
677
+}
667 678
new file mode 100644
... ...
@@ -0,0 +1,76 @@
0
+package loggerutils
1
+
2
+import (
3
+	"bufio"
4
+	"context"
5
+	"io"
6
+	"strings"
7
+	"testing"
8
+	"time"
9
+
10
+	"github.com/docker/docker/daemon/logger"
11
+	"github.com/docker/docker/pkg/tailfile"
12
+	"gotest.tools/assert"
13
+)
14
+
15
+func TestTailFiles(t *testing.T) {
16
+	s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
17
+	s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
18
+	s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
19
+
20
+	files := []SizeReaderAt{s1, s2, s3}
21
+	watcher := logger.NewLogWatcher()
22
+	createDecoder := func(r io.Reader) func() (*logger.Message, error) {
23
+		scanner := bufio.NewScanner(r)
24
+		return func() (*logger.Message, error) {
25
+			if !scanner.Scan() {
26
+				return nil, scanner.Err()
27
+			}
28
+			// wrap the scanned line in a logger.Message; the timestamp value is irrelevant for this test
29
+			return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
30
+		}
31
+	}
32
+
33
+	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
34
+		return tailfile.NewTailReader(ctx, r, lines)
35
+	}
36
+
37
+	for desc, config := range map[string]logger.ReadConfig{} {
38
+		t.Run(desc, func(t *testing.T) {
39
+			started := make(chan struct{})
40
+			go func() {
41
+				close(started)
42
+				tailFiles(files, watcher, createDecoder, tailReader, config)
43
+			}()
44
+			<-started
45
+		})
46
+	}
47
+
48
+	config := logger.ReadConfig{Tail: 2}
49
+	started := make(chan struct{})
50
+	go func() {
51
+		close(started)
52
+		tailFiles(files, watcher, createDecoder, tailReader, config)
53
+	}()
54
+	<-started
55
+
56
+	select {
57
+	case <-time.After(60 * time.Second):
58
+		t.Fatal("timeout waiting for tail line")
59
+	case err := <-watcher.Err:
60
+		assert.Assert(t, err)
61
+	case msg := <-watcher.Msg:
62
+		assert.Assert(t, msg != nil)
63
+		assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
64
+	}
65
+
66
+	select {
67
+	case <-time.After(60 * time.Second):
68
+		t.Fatal("timeout waiting for tail line")
69
+	case err := <-watcher.Err:
70
+		assert.Assert(t, err)
71
+	case msg := <-watcher.Msg:
72
+		assert.Assert(t, msg != nil)
73
+		assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
74
+	}
75
+}