Browse code

Improve logging of long log lines

This change updates how we handle long lines of output from the
container. The previous logic used a bufio reader to read entire lines
of output from the container through an intermediate BytesPipe, and that
allowed the container to cause dockerd to consume an unconstrained
amount of memory as it attempted to collect a whole line of output, by
outputting data without newlines.

To avoid that, we replace the bufio reader with our own buffering scheme
that handles log lines up to 16k in length, breaking up anything longer
than that into multiple chunks. If we can dispense with noting this
detail properly at the end of output, we can switch from using
ReadBytes() to using ReadLine() instead. We add a field ("Partial") to
the log message structure to flag when we pass data to the log driver
that did not end with a newline.

The Line member of Message structures that we pass to log drivers is now
a slice into data which can be overwritten between calls to the log
driver's Log() method, so drivers which batch up Messages before
processing them need to take additional care: we add a function
(logger.CopyMessage()) that can be used to create a deep copy of a
Message structure, and modify the awslogs driver to use it.

We update the jsonfile log driver to append a "\n" to the data that it
logs to disk only when the Partial flag is false (it previously did so
unconditionally), to make its "logs" output correctly reproduce the data
as we received it.

Likewise, we modify the journald log driver to add a data field with
value CONTAINER_PARTIAL_MESSAGE=true to entries when the Partial flag is
true, and update its "logs" reader to refrain from appending a "\n" to
the data that it retrieves if it does not see this field/value pair (it
also previously did this unconditionally).

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com> (github: nalind)

Nalin Dahyabhai authored on 2016/05/25 03:12:47
Showing 6 changed files
... ...
@@ -165,7 +165,8 @@ func (l *logStream) Log(msg *logger.Message) error {
165 165
 	l.lock.RLock()
166 166
 	defer l.lock.RUnlock()
167 167
 	if !l.closed {
168
-		l.messages <- msg
168
+		// buffer up the data, making sure to copy the Line data
169
+		l.messages <- logger.CopyMessage(msg)
169 170
 	}
170 171
 	return nil
171 172
 }
... ...
@@ -1,7 +1,6 @@
1 1
 package logger
2 2
 
3 3
 import (
4
-	"bufio"
5 4
 	"bytes"
6 5
 	"io"
7 6
 	"sync"
... ...
@@ -10,8 +9,13 @@ import (
10 10
 	"github.com/Sirupsen/logrus"
11 11
 )
12 12
 
13
+const (
14
+	bufSize  = 16 * 1024
15
+	readSize = 2 * 1024
16
+)
17
+
13 18
 // Copier can copy logs from specified sources to Logger and attach Timestamp.
14
-// Writes are concurrent, so you need implement some sync in your logger
19
+// Writes are concurrent, so you need implement some sync in your logger.
15 20
 type Copier struct {
16 21
 	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
17 22
 	srcs     map[string]io.Reader
... ...
@@ -39,30 +43,76 @@ func (c *Copier) Run() {
39 39
 
40 40
 func (c *Copier) copySrc(name string, src io.Reader) {
41 41
 	defer c.copyJobs.Done()
42
-	reader := bufio.NewReader(src)
42
+	buf := make([]byte, bufSize)
43
+	n := 0
44
+	eof := false
45
+	msg := &Message{Source: name}
43 46
 
44 47
 	for {
45 48
 		select {
46 49
 		case <-c.closed:
47 50
 			return
48 51
 		default:
49
-			line, err := reader.ReadBytes('\n')
50
-			line = bytes.TrimSuffix(line, []byte{'\n'})
51
-
52
-			// ReadBytes can return full or partial output even when it failed.
53
-			// e.g. it can return a full entry and EOF.
54
-			if err == nil || len(line) > 0 {
55
-				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
56
-					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
57
-				}
52
+			// Work out how much more data we are okay with reading this time.
53
+			upto := n + readSize
54
+			if upto > cap(buf) {
55
+				upto = cap(buf)
58 56
 			}
59
-
60
-			if err != nil {
61
-				if err != io.EOF {
62
-					logrus.Errorf("Error scanning log stream: %s", err)
57
+			// Try to read that data.
58
+			if upto > n {
59
+				read, err := src.Read(buf[n:upto])
60
+				if err != nil {
61
+					if err != io.EOF {
62
+						logrus.Errorf("Error scanning log stream: %s", err)
63
+						return
64
+					}
65
+					eof = true
63 66
 				}
67
+				n += read
68
+			}
69
+			// If we have no data to log, and there's no more coming, we're done.
70
+			if n == 0 && eof {
64 71
 				return
65 72
 			}
73
+			// Break up the data that we've buffered up into lines, and log each in turn.
74
+			p := 0
75
+			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
76
+				msg.Line = buf[p : p+q]
77
+				msg.Timestamp = time.Now().UTC()
78
+				msg.Partial = false
79
+				select {
80
+				case <-c.closed:
81
+					return
82
+				default:
83
+					if logErr := c.dst.Log(msg); logErr != nil {
84
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
85
+					}
86
+				}
87
+				p += q + 1
88
+			}
89
+			// If there's no more coming, or the buffer is full but
90
+			// has no newlines, log whatever we haven't logged yet,
91
+			// noting that it's a partial log line.
92
+			if eof || (p == 0 && n == len(buf)) {
93
+				if p < n {
94
+					msg.Line = buf[p:n]
95
+					msg.Timestamp = time.Now().UTC()
96
+					msg.Partial = true
97
+					if logErr := c.dst.Log(msg); logErr != nil {
98
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
99
+					}
100
+					p = 0
101
+					n = 0
102
+				}
103
+				if eof {
104
+					return
105
+				}
106
+			}
107
+			// Move any unlogged data to the front of the buffer in preparation for another read.
108
+			if p > 0 {
109
+				copy(buf[0:], buf[p:n])
110
+				n -= p
111
+			}
66 112
 		}
67 113
 	}
68 114
 }
... ...
@@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error {
84 84
 }
85 85
 
86 86
 func (s *journald) Log(msg *logger.Message) error {
87
+	vars := map[string]string{}
88
+	for k, v := range s.vars {
89
+		vars[k] = v
90
+	}
91
+	if msg.Partial {
92
+		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
93
+	}
87 94
 	if msg.Source == "stderr" {
88
-		return journal.Send(string(msg.Line), journal.PriErr, s.vars)
95
+		return journal.Send(string(msg.Line), journal.PriErr, vars)
89 96
 	}
90
-	return journal.Send(string(msg.Line), journal.PriInfo, s.vars)
97
+	return journal.Send(string(msg.Line), journal.PriInfo, vars)
91 98
 }
92 99
 
93 100
 func (s *journald) Name() string {
... ...
@@ -12,11 +12,15 @@ package journald
12 12
 // #include <time.h>
13 13
 // #include <unistd.h>
14 14
 //
15
-//static int get_message(sd_journal *j, const char **msg, size_t *length)
15
+//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
16 16
 //{
17 17
 //	int rc;
18
+//	size_t plength;
18 19
 //	*msg = NULL;
19 20
 //	*length = 0;
21
+//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
22
+//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
23
+//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
20 24
 //	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
21 25
 //	if (rc == 0) {
22 26
 //		if (*length > 8) {
... ...
@@ -167,7 +171,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
167 167
 	var msg, data, cursor *C.char
168 168
 	var length C.size_t
169 169
 	var stamp C.uint64_t
170
-	var priority C.int
170
+	var priority, partial C.int
171 171
 
172 172
 	// Walk the journal from here forward until we run out of new entries.
173 173
 drain:
... ...
@@ -183,7 +187,7 @@ drain:
183 183
 			}
184 184
 		}
185 185
 		// Read and send the logged message, if there is one to read.
186
-		i := C.get_message(j, &msg, &length)
186
+		i := C.get_message(j, &msg, &length, &partial)
187 187
 		if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
188 188
 			// Read the entry's timestamp.
189 189
 			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
... ...
@@ -191,7 +195,10 @@ drain:
191 191
 			}
192 192
 			// Set up the time and text of the entry.
193 193
 			timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
194
-			line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...)
194
+			line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
195
+			if partial == 0 {
196
+				line = append(line, "\n"...)
197
+			}
195 198
 			// Recover the stream name by mapping
196 199
 			// from the journal priority back to
197 200
 			// the stream that we would have
... ...
@@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
90 90
 		return err
91 91
 	}
92 92
 	l.mu.Lock()
93
+	logline := msg.Line
94
+	if !msg.Partial {
95
+		logline = append(msg.Line, '\n')
96
+	}
93 97
 	err = (&jsonlog.JSONLogs{
94
-		Log:      append(msg.Line, '\n'),
98
+		Log:      logline,
95 99
 		Stream:   msg.Source,
96 100
 		Created:  timestamp,
97 101
 		RawAttrs: l.extra,
... ...
@@ -25,12 +25,33 @@ const (
25 25
 	logWatcherBufferSize = 4096
26 26
 )
27 27
 
28
-// Message is datastructure that represents record from some container.
28
+// Message is datastructure that represents piece of output produced by some
29
+// container.  The Line member is a slice of an array whose contents can be
30
+// changed after a log driver's Log() method returns.
29 31
 type Message struct {
30 32
 	Line      []byte
31 33
 	Source    string
32 34
 	Timestamp time.Time
33 35
 	Attrs     LogAttributes
36
+	Partial   bool
37
+}
38
+
39
+// CopyMessage creates a copy of the passed-in Message which will remain
40
+// unchanged if the original is changed.  Log drivers which buffer Messages
41
+// rather than dispatching them during their Log() method should use this
42
+// function to obtain a Message whose Line member's contents won't change.
43
+func CopyMessage(msg *Message) *Message {
44
+	m := new(Message)
45
+	m.Line = make([]byte, len(msg.Line))
46
+	copy(m.Line, msg.Line)
47
+	m.Source = msg.Source
48
+	m.Timestamp = msg.Timestamp
49
+	m.Partial = msg.Partial
50
+	m.Attrs = make(LogAttributes)
51
+	for k, v := range m.Attrs {
52
+		m.Attrs[k] = v
53
+	}
54
+	return m
34 55
 }
35 56
 
36 57
 // LogAttributes is used to hold the extra attributes available in the log message