Browse code

Improve partial message support in logger

Docker daemon has a 16K buffer for log messages. If a message length
exceeds 16K, it should be split by the logger and merged at the
endpoint.

This change adds `PartialLogMetaData` struct for enhanced partial support
- LastPartial (bool) : indicates if this is the last of all partials.
- ID (string) : unique 32 bit ID. ID is same across all partials.
- Ordinal (int starts at 1) : indicates the position of msg in the series of partials.
Also, the timestamps across partials in the same.

Signed-off-by: Anusha Ragunathan <anusha.ragunathan@docker.com>

Anusha Ragunathan authored on 2017/12/19 11:26:55
Showing 8 changed files
... ...
@@ -25,17 +25,27 @@ type ContainerAttachConfig struct {
25 25
 	MuxStreams bool
26 26
 }
27 27
 
28
+// PartialLogMetaData provides meta data for a partial log message. Messages
29
+// exceeding a predefined size are split into chunks with this metadata. The
30
+// expectation is for the logger endpoints to assemble the chunks using this
31
+// metadata.
32
+type PartialLogMetaData struct {
33
+	Last    bool   //true if this message is last of a partial
34
+	ID      string // identifies group of messages comprising a single record
35
+	Ordinal int    // ordering of message in partial group
36
+}
37
+
28 38
 // LogMessage is datastructure that represents piece of output produced by some
29 39
 // container.  The Line member is a slice of an array whose contents can be
30 40
 // changed after a log driver's Log() method returns.
31 41
 // changes to this struct need to be reflect in the reset method in
32 42
 // daemon/logger/logger.go
33 43
 type LogMessage struct {
34
-	Line      []byte
35
-	Source    string
36
-	Timestamp time.Time
37
-	Attrs     []LogAttr
38
-	Partial   bool
44
+	Line         []byte
45
+	Source       string
46
+	Timestamp    time.Time
47
+	Attrs        []LogAttr
48
+	PLogMetaData *PartialLogMetaData
39 49
 
40 50
 	// Err is an error associated with a message. Completeness of a message
41 51
 	// with Err is not expected, tho it may be partially complete (fields may
... ...
@@ -37,7 +37,7 @@ func (a *pluginAdapter) Log(msg *Message) error {
37 37
 
38 38
 	a.buf.Line = msg.Line
39 39
 	a.buf.TimeNano = msg.Timestamp.UnixNano()
40
-	a.buf.Partial = msg.Partial
40
+	a.buf.Partial = (msg.PLogMetaData != nil)
41 41
 	a.buf.Source = msg.Source
42 42
 
43 43
 	err := a.enc.Encode(&a.buf)
... ...
@@ -6,6 +6,8 @@ import (
6 6
 	"sync"
7 7
 	"time"
8 8
 
9
+	types "github.com/docker/docker/api/types/backend"
10
+	"github.com/docker/docker/pkg/stringid"
9 11
 	"github.com/sirupsen/logrus"
10 12
 )
11 13
 
... ...
@@ -58,6 +60,11 @@ func (c *Copier) copySrc(name string, src io.Reader) {
58 58
 
59 59
 	n := 0
60 60
 	eof := false
61
+	var partialid string
62
+	var partialTS time.Time
63
+	var ordinal int
64
+	firstPartial := true
65
+	hasMorePartial := false
61 66
 
62 67
 	for {
63 68
 		select {
... ...
@@ -87,6 +94,7 @@ func (c *Copier) copySrc(name string, src io.Reader) {
87 87
 			}
88 88
 			// Break up the data that we've buffered up into lines, and log each in turn.
89 89
 			p := 0
90
+
90 91
 			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
91 92
 				select {
92 93
 				case <-c.closed:
... ...
@@ -94,9 +102,23 @@ func (c *Copier) copySrc(name string, src io.Reader) {
94 94
 				default:
95 95
 					msg := NewMessage()
96 96
 					msg.Source = name
97
-					msg.Timestamp = time.Now().UTC()
98 97
 					msg.Line = append(msg.Line, buf[p:p+q]...)
99 98
 
99
+					if hasMorePartial {
100
+						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}
101
+
102
+						// reset
103
+						partialid = ""
104
+						ordinal = 0
105
+						firstPartial = true
106
+						hasMorePartial = false
107
+					}
108
+					if msg.PLogMetaData == nil {
109
+						msg.Timestamp = time.Now().UTC()
110
+					} else {
111
+						msg.Timestamp = partialTS
112
+					}
113
+
100 114
 					if logErr := c.dst.Log(msg); logErr != nil {
101 115
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
102 116
 					}
... ...
@@ -110,9 +132,23 @@ func (c *Copier) copySrc(name string, src io.Reader) {
110 110
 				if p < n {
111 111
 					msg := NewMessage()
112 112
 					msg.Source = name
113
-					msg.Timestamp = time.Now().UTC()
114 113
 					msg.Line = append(msg.Line, buf[p:n]...)
115
-					msg.Partial = true
114
+
115
+					// Generate unique partialID for first partial. Use it across partials.
116
+					// Record timestamp for first partial. Use it across partials.
117
+					// Initialize Ordinal for first partial. Increment it across partials.
118
+					if firstPartial {
119
+						msg.Timestamp = time.Now().UTC()
120
+						partialTS = msg.Timestamp
121
+						partialid = stringid.GenerateRandomID()
122
+						ordinal = 1
123
+						firstPartial = false
124
+					} else {
125
+						msg.Timestamp = partialTS
126
+					}
127
+					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
128
+					ordinal++
129
+					hasMorePartial = true
116 130
 
117 131
 					if logErr := c.dst.Log(msg); logErr != nil {
118 132
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
... ...
@@ -258,6 +258,141 @@ func TestCopierWithSized(t *testing.T) {
258 258
 	}
259 259
 }
260 260
 
261
+func checkIdentical(t *testing.T, msg Message, expectedID string, expectedTS time.Time) {
262
+	if msg.PLogMetaData.ID != expectedID {
263
+		t.Fatalf("IDs are not he same across partials. Expected: %s Received: %s",
264
+			expectedID, msg.PLogMetaData.ID)
265
+	}
266
+	if msg.Timestamp != expectedTS {
267
+		t.Fatalf("Timestamps are not the same across partials. Expected: %v Received: %v",
268
+			expectedTS.Format(time.UnixDate), msg.Timestamp.Format(time.UnixDate))
269
+	}
270
+}
271
+
272
+// Have long lines and make sure that it comes out with PartialMetaData
273
+func TestCopierWithPartial(t *testing.T) {
274
+	stdoutLongLine := strings.Repeat("a", defaultBufSize)
275
+	stderrLongLine := strings.Repeat("b", defaultBufSize)
276
+	stdoutTrailingLine := "stdout trailing line"
277
+	stderrTrailingLine := "stderr trailing line"
278
+	normalStr := "This is an impartial message :)"
279
+
280
+	var stdout bytes.Buffer
281
+	var stderr bytes.Buffer
282
+	var normalMsg bytes.Buffer
283
+
284
+	for i := 0; i < 3; i++ {
285
+		if _, err := stdout.WriteString(stdoutLongLine); err != nil {
286
+			t.Fatal(err)
287
+		}
288
+		if _, err := stderr.WriteString(stderrLongLine); err != nil {
289
+			t.Fatal(err)
290
+		}
291
+	}
292
+
293
+	if _, err := stdout.WriteString(stdoutTrailingLine + "\n"); err != nil {
294
+		t.Fatal(err)
295
+	}
296
+	if _, err := stderr.WriteString(stderrTrailingLine + "\n"); err != nil {
297
+		t.Fatal(err)
298
+	}
299
+	if _, err := normalMsg.WriteString(normalStr + "\n"); err != nil {
300
+		t.Fatal(err)
301
+	}
302
+
303
+	var jsonBuf bytes.Buffer
304
+
305
+	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
306
+
307
+	c := NewCopier(
308
+		map[string]io.Reader{
309
+			"stdout": &stdout,
310
+			"normal": &normalMsg,
311
+			"stderr": &stderr,
312
+		},
313
+		jsonLog)
314
+	c.Run()
315
+	wait := make(chan struct{})
316
+	go func() {
317
+		c.Wait()
318
+		close(wait)
319
+	}()
320
+	select {
321
+	case <-time.After(1 * time.Second):
322
+		t.Fatal("Copier failed to do its work in 1 second")
323
+	case <-wait:
324
+	}
325
+
326
+	dec := json.NewDecoder(&jsonBuf)
327
+	expectedMsgs := 9
328
+	recvMsgs := 0
329
+	var expectedPartID1, expectedPartID2 string
330
+	var expectedTS1, expectedTS2 time.Time
331
+
332
+	for {
333
+		var msg Message
334
+
335
+		if err := dec.Decode(&msg); err != nil {
336
+			if err == io.EOF {
337
+				break
338
+			}
339
+			t.Fatal(err)
340
+		}
341
+		if msg.Source != "stdout" && msg.Source != "stderr" && msg.Source != "normal" {
342
+			t.Fatalf("Wrong Source: %q, should be %q or %q or %q", msg.Source, "stdout", "stderr", "normal")
343
+		}
344
+
345
+		if msg.Source == "stdout" {
346
+			if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
347
+				t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
348
+			}
349
+
350
+			if msg.PLogMetaData.ID == "" {
351
+				t.Fatalf("Expected partial metadata. Got nothing")
352
+			}
353
+
354
+			if msg.PLogMetaData.Ordinal == 1 {
355
+				expectedPartID1 = msg.PLogMetaData.ID
356
+				expectedTS1 = msg.Timestamp
357
+			} else {
358
+				checkIdentical(t, msg, expectedPartID1, expectedTS1)
359
+			}
360
+			if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
361
+				t.Fatalf("Last is not set for last chunk")
362
+			}
363
+		}
364
+
365
+		if msg.Source == "stderr" {
366
+			if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
367
+				t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
368
+			}
369
+
370
+			if msg.PLogMetaData.ID == "" {
371
+				t.Fatalf("Expected partial metadata. Got nothing")
372
+			}
373
+
374
+			if msg.PLogMetaData.Ordinal == 1 {
375
+				expectedPartID2 = msg.PLogMetaData.ID
376
+				expectedTS2 = msg.Timestamp
377
+			} else {
378
+				checkIdentical(t, msg, expectedPartID2, expectedTS2)
379
+			}
380
+			if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
381
+				t.Fatalf("Last is not set for last chunk")
382
+			}
383
+		}
384
+
385
+		if msg.Source == "normal" && msg.PLogMetaData != nil {
386
+			t.Fatalf("Normal messages should not have PartialLogMetaData")
387
+		}
388
+		recvMsgs++
389
+	}
390
+
391
+	if expectedMsgs != recvMsgs {
392
+		t.Fatalf("Expected msgs: %d Recv msgs: %d", expectedMsgs, recvMsgs)
393
+	}
394
+}
395
+
261 396
 type BenchmarkLoggerDummy struct {
262 397
 }
263 398
 
... ...
@@ -108,7 +108,7 @@ func (s *journald) Log(msg *logger.Message) error {
108 108
 	for k, v := range s.vars {
109 109
 		vars[k] = v
110 110
 	}
111
-	if msg.Partial {
111
+	if msg.PLogMetaData != nil {
112 112
 		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
113 113
 	}
114 114
 
... ...
@@ -132,7 +132,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
132 132
 
133 133
 func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
134 134
 	logLine := msg.Line
135
-	if !msg.Partial {
135
+	if msg.PLogMetaData == nil || (msg.PLogMetaData != nil && msg.PLogMetaData.Last) {
136 136
 		logLine = append(msg.Line, '\n')
137 137
 	}
138 138
 	err := (&jsonlog.JSONLogs{
... ...
@@ -60,7 +60,7 @@ func (m *Message) reset() {
60 60
 	m.Line = m.Line[:0]
61 61
 	m.Source = ""
62 62
 	m.Attrs = nil
63
-	m.Partial = false
63
+	m.PLogMetaData = nil
64 64
 
65 65
 	m.Err = nil
66 66
 }
... ...
@@ -6,9 +6,9 @@ import (
6 6
 
7 7
 func (m *Message) copy() *Message {
8 8
 	msg := &Message{
9
-		Source:    m.Source,
10
-		Partial:   m.Partial,
11
-		Timestamp: m.Timestamp,
9
+		Source:       m.Source,
10
+		PLogMetaData: m.PLogMetaData,
11
+		Timestamp:    m.Timestamp,
12 12
 	}
13 13
 
14 14
 	if m.Attrs != nil {