Browse code

Reduce allocations for logfile reader

Before this change, the log decoder function provided by the log driver
to logfile would not be able to re-use buffers, causing undeeded
allocations and memory bloat for dockerd.

This change introduces an interface that allows the log driver to manage
it's memory usge more effectively.
This only affects json-file and local log drivers.

`json-file` still is not great just because of how the json decoder in the
stdlib works.
`local` is significantly improved.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2020/04/09 04:24:31
Showing 7 changed files
... ...
@@ -21,5 +21,7 @@ func (jl *JSONLog) Reset() {
21 21
 	jl.Log = ""
22 22
 	jl.Stream = ""
23 23
 	jl.Created = time.Time{}
24
-	jl.Attrs = make(map[string]string)
24
+	for k := range jl.Attrs {
25
+		delete(jl.Attrs, k)
26
+	}
25 27
 }
... ...
@@ -60,35 +60,65 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
60 60
 	return msg, nil
61 61
 }
62 62
 
63
-// decodeFunc is used to create a decoder for the log file reader
64
-func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
65
-	l := &jsonlog.JSONLog{}
66
-	dec := json.NewDecoder(rdr)
67
-	return func() (msg *logger.Message, err error) {
68
-		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
69
-			msg, err = decodeLogLine(dec, l)
70
-			if err == nil || err == io.EOF {
71
-				break
72
-			}
73
-
74
-			logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
75
-			// try again, could be due to a an incomplete json object as we read
76
-			if _, ok := err.(*json.SyntaxError); ok {
77
-				dec = json.NewDecoder(rdr)
78
-				continue
79
-			}
80
-
81
-			// io.ErrUnexpectedEOF is returned from json.Decoder when there is
82
-			// remaining data in the parser's buffer while an io.EOF occurs.
83
-			// If the json logger writes a partial json log entry to the disk
84
-			// while at the same time the decoder tries to decode it, the race condition happens.
85
-			if err == io.ErrUnexpectedEOF {
86
-				reader := io.MultiReader(dec.Buffered(), rdr)
87
-				dec = json.NewDecoder(reader)
88
-				continue
89
-			}
63
+type decoder struct {
64
+	rdr io.Reader
65
+	dec *json.Decoder
66
+	jl  *jsonlog.JSONLog
67
+}
68
+
69
+func (d *decoder) Reset(rdr io.Reader) {
70
+	d.rdr = rdr
71
+	d.dec = nil
72
+	if d.jl != nil {
73
+		d.jl.Reset()
74
+	}
75
+}
76
+
77
+func (d *decoder) Close() {
78
+	d.dec = nil
79
+	d.rdr = nil
80
+	d.jl = nil
81
+}
82
+
83
+func (d *decoder) Decode() (msg *logger.Message, err error) {
84
+	if d.dec == nil {
85
+		d.dec = json.NewDecoder(d.rdr)
86
+	}
87
+	if d.jl == nil {
88
+		d.jl = &jsonlog.JSONLog{}
89
+	}
90
+	for retries := 0; retries < maxJSONDecodeRetry; retries++ {
91
+		msg, err = decodeLogLine(d.dec, d.jl)
92
+		if err == nil || err == io.EOF {
93
+			break
94
+		}
95
+
96
+		logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
97
+		// try again, could be due to a an incomplete json object as we read
98
+		if _, ok := err.(*json.SyntaxError); ok {
99
+			d.dec = json.NewDecoder(d.rdr)
100
+			continue
101
+		}
102
+
103
+		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
104
+		// remaining data in the parser's buffer while an io.EOF occurs.
105
+		// If the json logger writes a partial json log entry to the disk
106
+		// while at the same time the decoder tries to decode it, the race condition happens.
107
+		if err == io.ErrUnexpectedEOF {
108
+			d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr)
109
+			d.dec = json.NewDecoder(d.rdr)
110
+			continue
90 111
 		}
91
-		return msg, err
112
+	}
113
+	return msg, err
114
+}
115
+
116
+// decodeFunc is used to create a decoder for the log file reader
117
+func decodeFunc(rdr io.Reader) loggerutils.Decoder {
118
+	return &decoder{
119
+		rdr: rdr,
120
+		dec: nil,
121
+		jl:  nil,
92 122
 	}
93 123
 }
94 124
 
... ...
@@ -75,19 +75,21 @@ func TestEncodeDecode(t *testing.T) {
75 75
 	assert.Assert(t, marshalMessage(m2, nil, buf))
76 76
 	assert.Assert(t, marshalMessage(m3, nil, buf))
77 77
 
78
-	decode := decodeFunc(buf)
79
-	msg, err := decode()
78
+	dec := decodeFunc(buf)
79
+	defer dec.Close()
80
+
81
+	msg, err := dec.Decode()
80 82
 	assert.NilError(t, err)
81 83
 	assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line))
82 84
 
83
-	msg, err = decode()
85
+	msg, err = dec.Decode()
84 86
 	assert.NilError(t, err)
85 87
 	assert.Assert(t, string(msg.Line) == "hello 2\n")
86 88
 
87
-	msg, err = decode()
89
+	msg, err = dec.Decode()
88 90
 	assert.NilError(t, err)
89 91
 	assert.Assert(t, string(msg.Line) == "hello 3\n")
90 92
 
91
-	_, err = decode()
93
+	_, err = dec.Decode()
92 94
 	assert.Assert(t, err == io.EOF)
93 95
 }
... ...
@@ -1,21 +1,18 @@
1 1
 package local
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"context"
5 6
 	"encoding/binary"
7
+	"fmt"
8
+	"io"
6 9
 	"io/ioutil"
7 10
 	"os"
8 11
 	"path/filepath"
12
+	"strings"
9 13
 	"testing"
10 14
 	"time"
11 15
 
12
-	"bytes"
13
-	"fmt"
14
-
15
-	"strings"
16
-
17
-	"io"
18
-
19 16
 	"github.com/docker/docker/api/types/backend"
20 17
 	"github.com/docker/docker/api/types/plugins/logdriver"
21 18
 	"github.com/docker/docker/daemon/logger"
... ...
@@ -96,51 +96,81 @@ func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io
96 96
 	return io.NewSectionReader(r, offset, size), found, nil
97 97
 }
98 98
 
99
-func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
100
-	proto := &logdriver.LogEntry{}
101
-	buf := make([]byte, initialBufSize)
102
-
103
-	return func() (*logger.Message, error) {
104
-		var (
105
-			read int
106
-			err  error
107
-		)
108
-
109
-		resetProto(proto)
110
-
111
-		for i := 0; i < maxDecodeRetry; i++ {
112
-			var n int
113
-			n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen])
114
-			if err != nil {
115
-				if err != io.ErrUnexpectedEOF {
116
-					return nil, errors.Wrap(err, "error reading log message length")
117
-				}
118
-				read += n
119
-				continue
99
+type decoder struct {
100
+	rdr   io.Reader
101
+	proto *logdriver.LogEntry
102
+	buf   []byte
103
+}
104
+
105
+func (d *decoder) Decode() (*logger.Message, error) {
106
+	if d.proto == nil {
107
+		d.proto = &logdriver.LogEntry{}
108
+	} else {
109
+		resetProto(d.proto)
110
+	}
111
+	if d.buf == nil {
112
+		d.buf = make([]byte, initialBufSize)
113
+	}
114
+	var (
115
+		read int
116
+		err  error
117
+	)
118
+
119
+	for i := 0; i < maxDecodeRetry; i++ {
120
+		var n int
121
+		n, err = io.ReadFull(d.rdr, d.buf[read:encodeBinaryLen])
122
+		if err != nil {
123
+			if err != io.ErrUnexpectedEOF {
124
+				return nil, errors.Wrap(err, "error reading log message length")
120 125
 			}
121 126
 			read += n
122
-			break
123
-		}
124
-		if err != nil {
125
-			return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
127
+			continue
126 128
 		}
129
+		read += n
130
+		break
131
+	}
132
+	if err != nil {
133
+		return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
134
+	}
127 135
 
128
-		msgLen := int(binary.BigEndian.Uint32(buf[:read]))
136
+	msgLen := int(binary.BigEndian.Uint32(d.buf[:read]))
129 137
 
130
-		if len(buf) < msgLen+encodeBinaryLen {
131
-			buf = make([]byte, msgLen+encodeBinaryLen)
138
+	if len(d.buf) < msgLen+encodeBinaryLen {
139
+		d.buf = make([]byte, msgLen+encodeBinaryLen)
140
+	} else {
141
+		if msgLen <= initialBufSize {
142
+			d.buf = d.buf[:initialBufSize]
132 143
 		} else {
133
-			if msgLen <= initialBufSize {
134
-				buf = buf[:initialBufSize]
135
-			} else {
136
-				buf = buf[:msgLen+encodeBinaryLen]
137
-			}
144
+			d.buf = d.buf[:msgLen+encodeBinaryLen]
138 145
 		}
146
+	}
139 147
 
140
-		return decodeLogEntry(rdr, proto, buf, msgLen)
148
+	return decodeLogEntry(d.rdr, d.proto, d.buf, msgLen)
149
+}
150
+
151
+func (d *decoder) Reset(rdr io.Reader) {
152
+	d.rdr = rdr
153
+	if d.proto != nil {
154
+		resetProto(d.proto)
155
+	}
156
+	if d.buf != nil {
157
+		d.buf = d.buf[:initialBufSize]
141 158
 	}
142 159
 }
143 160
 
161
+func (d *decoder) Close() {
162
+	d.buf = d.buf[:0]
163
+	d.buf = nil
164
+	if d.proto != nil {
165
+		resetProto(d.proto)
166
+	}
167
+	d.rdr = nil
168
+}
169
+
170
+func decodeFunc(rdr io.Reader) loggerutils.Decoder {
171
+	return &decoder{rdr: rdr}
172
+}
173
+
144 174
 func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) {
145 175
 	var (
146 176
 		read int
... ...
@@ -89,12 +89,25 @@ type LogFile struct {
89 89
 	filesRefCounter refCounter // keep reference-counted of decompressed files
90 90
 	notifyRotate    *pubsub.Publisher
91 91
 	marshal         logger.MarshalFunc
92
-	createDecoder   makeDecoderFunc
92
+	createDecoder   MakeDecoderFn
93 93
 	getTailReader   GetTailReaderFunc
94 94
 	perms           os.FileMode
95 95
 }
96 96
 
97
-type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
97
+// MakeDecoderFn creates a decoder
98
+type MakeDecoderFn func(rdr io.Reader) Decoder
99
+
100
+// Decoder is for reading logs
101
+// It is created by the log reader by calling the `MakeDecoderFunc`
102
+type Decoder interface {
103
+	// Reset resets the decoder
104
+	// Reset is called for certain events, such as log rotations
105
+	Reset(io.Reader)
106
+	// Decode decodes the next log messeage from the stream
107
+	Decode() (*logger.Message, error)
108
+	// Close signals to the decoder that it can release whatever resources it was using.
109
+	Close()
110
+}
98 111
 
99 112
 // SizeReaderAt defines a ReaderAt that also reports its size.
100 113
 // This is used for tailing log files.
... ...
@@ -110,7 +123,7 @@ type SizeReaderAt interface {
110 110
 type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
111 111
 
112 112
 // NewLogFile creates new LogFile
113
-func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
113
+func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
114 114
 	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
115 115
 	if err != nil {
116 116
 		return nil, err
... ...
@@ -314,6 +327,9 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
314 314
 	}
315 315
 	defer currentFile.Close()
316 316
 
317
+	dec := w.createDecoder(nil)
318
+	defer dec.Close()
319
+
317 320
 	currentChunk, err := newSectionReader(currentFile)
318 321
 	if err != nil {
319 322
 		w.mu.RUnlock()
... ...
@@ -359,7 +375,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
359 359
 			readers = append(readers, currentChunk)
360 360
 		}
361 361
 
362
-		tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
362
+		tailFiles(readers, watcher, dec, w.getTailReader, config)
363 363
 		closeFiles()
364 364
 
365 365
 		w.mu.RLock()
... ...
@@ -373,7 +389,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
373 373
 
374 374
 	notifyRotate := w.notifyRotate.Subscribe()
375 375
 	defer w.notifyRotate.Evict(notifyRotate)
376
-	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
376
+	followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
377 377
 }
378 378
 
379 379
 func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
... ...
@@ -479,7 +495,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
479 479
 	return io.NewSectionReader(f, 0, size), nil
480 480
 }
481 481
 
482
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
482
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
483 483
 	nLines := config.Tail
484 484
 
485 485
 	ctx, cancel := context.WithCancel(context.Background())
... ...
@@ -512,9 +528,10 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
512 512
 	}
513 513
 
514 514
 	rdr := io.MultiReader(readers...)
515
-	decodeLogLine := createDecoder(rdr)
515
+	dec.Reset(rdr)
516
+
516 517
 	for {
517
-		msg, err := decodeLogLine()
518
+		msg, err := dec.Decode()
518 519
 		if err != nil {
519 520
 			if errors.Cause(err) != io.EOF {
520 521
 				watcher.Err <- err
... ...
@@ -535,8 +552,8 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
535 535
 	}
536 536
 }
537 537
 
538
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
539
-	decodeLogLine := createDecoder(f)
538
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
539
+	dec.Reset(f)
540 540
 
541 541
 	name := f.Name()
542 542
 	fileWatcher, err := watchFile(name)
... ...
@@ -567,7 +584,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
567 567
 		if err := fileWatcher.Add(name); err != nil {
568 568
 			return err
569 569
 		}
570
-		decodeLogLine = createDecoder(f)
570
+		dec.Reset(f)
571 571
 		return nil
572 572
 	}
573 573
 
... ...
@@ -578,7 +595,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
578 578
 		case e := <-fileWatcher.Events():
579 579
 			switch e.Op {
580 580
 			case fsnotify.Write:
581
-				decodeLogLine = createDecoder(f)
581
+				dec.Reset(f)
582 582
 				return nil
583 583
 			case fsnotify.Rename, fsnotify.Remove:
584 584
 				select {
... ...
@@ -648,7 +665,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
648 648
 
649 649
 	// main loop
650 650
 	for {
651
-		msg, err := decodeLogLine()
651
+		msg, err := dec.Decode()
652 652
 		if err != nil {
653 653
 			if err := handleDecodeErr(err); err != nil {
654 654
 				if err == errDone {
... ...
@@ -15,6 +15,32 @@ import (
15 15
 	"gotest.tools/v3/assert"
16 16
 )
17 17
 
18
+type testDecoder struct {
19
+	rdr     io.Reader
20
+	scanner *bufio.Scanner
21
+}
22
+
23
+func (d *testDecoder) Decode() (*logger.Message, error) {
24
+	if d.scanner == nil {
25
+		d.scanner = bufio.NewScanner(d.rdr)
26
+	}
27
+	if !d.scanner.Scan() {
28
+		return nil, d.scanner.Err()
29
+	}
30
+	// some comment
31
+	return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
32
+}
33
+
34
+func (d *testDecoder) Reset(rdr io.Reader) {
35
+	d.rdr = rdr
36
+	d.scanner = bufio.NewScanner(rdr)
37
+}
38
+
39
+func (d *testDecoder) Close() {
40
+	d.rdr = nil
41
+	d.scanner = nil
42
+}
43
+
18 44
 func TestTailFiles(t *testing.T) {
19 45
 	s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
20 46
 	s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
... ...
@@ -22,27 +48,18 @@ func TestTailFiles(t *testing.T) {
22 22
 
23 23
 	files := []SizeReaderAt{s1, s2, s3}
24 24
 	watcher := logger.NewLogWatcher()
25
-	createDecoder := func(r io.Reader) func() (*logger.Message, error) {
26
-		scanner := bufio.NewScanner(r)
27
-		return func() (*logger.Message, error) {
28
-			if !scanner.Scan() {
29
-				return nil, scanner.Err()
30
-			}
31
-			// some comment
32
-			return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
33
-		}
34
-	}
35 25
 
36 26
 	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
37 27
 		return tailfile.NewTailReader(ctx, r, lines)
38 28
 	}
29
+	dec := &testDecoder{}
39 30
 
40 31
 	for desc, config := range map[string]logger.ReadConfig{} {
41 32
 		t.Run(desc, func(t *testing.T) {
42 33
 			started := make(chan struct{})
43 34
 			go func() {
44 35
 				close(started)
45
-				tailFiles(files, watcher, createDecoder, tailReader, config)
36
+				tailFiles(files, watcher, dec, tailReader, config)
46 37
 			}()
47 38
 			<-started
48 39
 		})
... ...
@@ -52,7 +69,7 @@ func TestTailFiles(t *testing.T) {
52 52
 	started := make(chan struct{})
53 53
 	go func() {
54 54
 		close(started)
55
-		tailFiles(files, watcher, createDecoder, tailReader, config)
55
+		tailFiles(files, watcher, dec, tailReader, config)
56 56
 	}()
57 57
 	<-started
58 58
 
... ...
@@ -77,6 +94,15 @@ func TestTailFiles(t *testing.T) {
77 77
 	}
78 78
 }
79 79
 
80
+type dummyDecoder struct{}
81
+
82
+func (dummyDecoder) Decode() (*logger.Message, error) {
83
+	return &logger.Message{}, nil
84
+}
85
+
86
+func (dummyDecoder) Close()          {}
87
+func (dummyDecoder) Reset(io.Reader) {}
88
+
80 89
 func TestFollowLogsConsumerGone(t *testing.T) {
81 90
 	lw := logger.NewLogWatcher()
82 91
 
... ...
@@ -87,16 +113,12 @@ func TestFollowLogsConsumerGone(t *testing.T) {
87 87
 		os.Remove(f.Name())
88 88
 	}()
89 89
 
90
-	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
91
-		return func() (*logger.Message, error) {
92
-			return &logger.Message{}, nil
93
-		}
94
-	}
90
+	dec := dummyDecoder{}
95 91
 
96 92
 	followLogsDone := make(chan struct{})
97 93
 	var since, until time.Time
98 94
 	go func() {
99
-		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
95
+		followLogs(f, lw, make(chan interface{}), dec, since, until)
100 96
 		close(followLogsDone)
101 97
 	}()
102 98
 
... ...
@@ -118,6 +140,18 @@ func TestFollowLogsConsumerGone(t *testing.T) {
118 118
 	}
119 119
 }
120 120
 
121
+type dummyWrapper struct {
122
+	dummyDecoder
123
+	fn func() error
124
+}
125
+
126
+func (d *dummyWrapper) Decode() (*logger.Message, error) {
127
+	if err := d.fn(); err != nil {
128
+		return nil, err
129
+	}
130
+	return d.dummyDecoder.Decode()
131
+}
132
+
121 133
 func TestFollowLogsProducerGone(t *testing.T) {
122 134
 	lw := logger.NewLogWatcher()
123 135
 
... ...
@@ -126,25 +160,25 @@ func TestFollowLogsProducerGone(t *testing.T) {
126 126
 	defer os.Remove(f.Name())
127 127
 
128 128
 	var sent, received, closed int
129
-	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
130
-		return func() (*logger.Message, error) {
131
-			if closed == 1 {
132
-				closed++
133
-				t.Logf("logDecode() closed after sending %d messages\n", sent)
134
-				return nil, io.EOF
135
-			} else if closed > 1 {
136
-				t.Fatal("logDecode() called after closing!")
137
-				return nil, io.EOF
138
-			}
129
+	dec := &dummyWrapper{fn: func() error {
130
+		switch closed {
131
+		case 0:
139 132
 			sent++
140
-			return &logger.Message{}, nil
133
+			return nil
134
+		case 1:
135
+			closed++
136
+			t.Logf("logDecode() closed after sending %d messages\n", sent)
137
+			return io.EOF
138
+		default:
139
+			t.Fatal("logDecode() called after closing!")
140
+			return io.EOF
141 141
 		}
142
-	}
142
+	}}
143 143
 	var since, until time.Time
144 144
 
145 145
 	followLogsDone := make(chan struct{})
146 146
 	go func() {
147
-		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
147
+		followLogs(f, lw, make(chan interface{}), dec, since, until)
148 148
 		close(followLogsDone)
149 149
 	}()
150 150