Browse code

Use sync.Pool for logger Messages

This reduces allocs and bytes used per log entry significantly as well
as some improvement to time per log operation.

Each log driver, however, must put messages back in the pool once they
are finished with the message.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2016/12/12 23:54:20
Showing 14 changed files
... ...
@@ -203,7 +203,6 @@ func (l *logStream) Log(msg *logger.Message) error {
203 203
 	l.lock.RLock()
204 204
 	defer l.lock.RUnlock()
205 205
 	if !l.closed {
206
-		// buffer up the data, making sure to copy the Line data
207 206
 		l.messages <- msg
208 207
 	}
209 208
 	return nil
... ...
@@ -347,6 +346,7 @@ func (l *logStream) collectBatch() {
347 347
 				})
348 348
 				bytes += (lineBytes + perEventBytes)
349 349
 			}
350
+			logger.PutMessage(msg)
350 351
 		}
351 352
 	}
352 353
 }
... ...
@@ -76,15 +76,14 @@ func (c *Copier) copySrc(name string, src io.Reader) {
76 76
 			}
77 77
 			// Break up the data that we've buffered up into lines, and log each in turn.
78 78
 			p := 0
79
-			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
79
+			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
80 80
 				select {
81 81
 				case <-c.closed:
82 82
 					return
83 83
 				default:
84
-					msg := &Message{
85
-						Source:    name,
86
-						Timestamp: time.Now().UTC(),
87
-					}
84
+					msg := NewMessage()
85
+					msg.Source = name
86
+					msg.Timestamp = time.Now().UTC()
88 87
 					msg.Line = append(msg.Line, buf[p:p+q]...)
89 88
 
90 89
 					if logErr := c.dst.Log(msg); logErr != nil {
... ...
@@ -98,11 +97,9 @@ func (c *Copier) copySrc(name string, src io.Reader) {
98 98
 			// noting that it's a partial log line.
99 99
 			if eof || (p == 0 && n == len(buf)) {
100 100
 				if p < n {
101
-					msg := &Message{
102
-						Source:    name,
103
-						Timestamp: time.Now().UTC(),
104
-						Partial:   true,
105
-					}
101
+					msg := NewMessage()
102
+					msg.Source = name
103
+					msg.Timestamp = time.Now().UTC()
106 104
 					msg.Line = append(msg.Line, buf[p:n]...)
107 105
 					msg.Partial = true
108 106
 
... ...
@@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) {
208 208
 type BenchmarkLoggerDummy struct {
209 209
 }
210 210
 
211
-func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil }
211
+func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }
212 212
 
213 213
 func (l *BenchmarkLoggerDummy) Close() error { return nil }
214 214
 
... ...
@@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error {
76 76
 		logrus.Error(errorMessage)
77 77
 		return errors.New(errorMessage)
78 78
 	}
79
-	return callEventWriteString(createLogMessage(etwLogger, msg))
79
+	m := createLogMessage(etwLogger, msg)
80
+	logger.PutMessage(msg)
81
+	return callEventWriteString(m)
80 82
 }
81 83
 
82 84
 // Close closes the logger by unregistering the ETW provider.
... ...
@@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error {
151 151
 	for k, v := range f.extra {
152 152
 		data[k] = v
153 153
 	}
154
+
155
+	ts := msg.Timestamp
156
+	logger.PutMessage(msg)
154 157
 	// fluent-logger-golang buffers logs from failures and disconnections,
155 158
 	// and these are transferred again automatically.
156
-	return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
159
+	return f.writer.PostWithTime(f.tag, ts, data)
157 160
 }
158 161
 
159 162
 func (f *fluentd) Close() error {
... ...
@@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error {
194 194
 }
195 195
 
196 196
 func (l *gcplogs) Log(m *logger.Message) error {
197
+	data := string(m.Line)
198
+	ts := m.Timestamp
199
+	logger.PutMessage(m)
200
+
197 201
 	l.logger.Log(logging.Entry{
198
-		Timestamp: m.Timestamp,
202
+		Timestamp: ts,
199 203
 		Payload: &dockerLogEntry{
200 204
 			Instance:  l.instance,
201 205
 			Container: l.container,
202
-			Data:      string(m.Line),
206
+			Data:      data,
203 207
 		},
204 208
 	})
205 209
 	return nil
... ...
@@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
133 133
 		Level:    level,
134 134
 		RawExtra: s.rawExtra,
135 135
 	}
136
+	logger.PutMessage(msg)
136 137
 
137 138
 	if err := s.writer.WriteMessage(&m); err != nil {
138 139
 		return fmt.Errorf("gelf: cannot send GELF message: %v", err)
... ...
@@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error {
105 105
 	if msg.Partial {
106 106
 		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
107 107
 	}
108
+
109
+	line := string(msg.Line)
110
+	logger.PutMessage(msg)
111
+
108 112
 	if msg.Source == "stderr" {
109
-		return journal.Send(string(msg.Line), journal.PriErr, vars)
113
+		return journal.Send(line, journal.PriErr, vars)
110 114
 	}
111
-	return journal.Send(string(msg.Line), journal.PriInfo, vars)
115
+	return journal.Send(line, journal.PriInfo, vars)
112 116
 }
113 117
 
114 118
 func (s *journald) Name() string {
... ...
@@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
100 100
 		Created:  timestamp,
101 101
 		RawAttrs: l.extra,
102 102
 	}).MarshalJSONBuf(l.buf)
103
+	logger.PutMessage(msg)
103 104
 	if err != nil {
104 105
 		l.mu.Unlock()
105 106
 		return err
... ...
@@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error {
61 61
 	for k, v := range f.extra {
62 62
 		data[k] = v
63 63
 	}
64
-	f.writer.Println(f.tag, msg.Timestamp, data)
64
+	ts := msg.Timestamp
65
+	logger.PutMessage(msg)
66
+	f.writer.Println(f.tag, ts, data)
65 67
 	return nil
66 68
 }
67 69
 
... ...
@@ -26,9 +26,24 @@ const (
26 26
 	logWatcherBufferSize = 4096
27 27
 )
28 28
 
29
+var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }}
30
+
31
+// NewMessage returns a new message from the message sync.Pool
32
+func NewMessage() *Message {
33
+	return messagePool.Get().(*Message)
34
+}
35
+
36
+// PutMessage puts the specified message back n the message pool.
37
+// The message fields are reset before putting into the pool.
38
+func PutMessage(msg *Message) {
39
+	msg.reset()
40
+	messagePool.Put(msg)
41
+}
42
+
29 43
 // Message is datastructure that represents piece of output produced by some
30 44
 // container.  The Line member is a slice of an array whose contents can be
31 45
 // changed after a log driver's Log() method returns.
46
+// Any changes made to this struct must also be updated in the `reset` function
32 47
 type Message struct {
33 48
 	Line      []byte
34 49
 	Source    string
... ...
@@ -37,6 +52,16 @@ type Message struct {
37 37
 	Partial   bool
38 38
 }
39 39
 
40
+// reset sets the message back to default values
41
+// This is used when putting a message back into the message pool.
42
+// Any changes to the `Message` struct should be reflected here.
43
+func (m *Message) reset() {
44
+	m.Line = m.Line[:0]
45
+	m.Source = ""
46
+	m.Attrs = nil
47
+	m.Partial = false
48
+}
49
+
40 50
 // LogAttributes is used to hold the extra attributes available in the log message
41 51
 // Primarily used for converting the map type to string and sorting.
42 52
 type LogAttributes map[string]string
... ...
@@ -83,10 +83,18 @@ func (r *RingLogger) Close() error {
83 83
 	r.setClosed()
84 84
 	r.buffer.Close()
85 85
 	// empty out the queue
86
+	var logErr bool
86 87
 	for _, msg := range r.buffer.Drain() {
88
+		if logErr {
89
+			// some error logging a previous message, so re-insert to message pool
90
+			// and assume log driver is hosed
91
+			PutMessage(msg)
92
+			continue
93
+		}
94
+
87 95
 		if err := r.l.Log(msg); err != nil {
88 96
 			logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l)
89
-			break
97
+			logErr = true
90 98
 		}
91 99
 	}
92 100
 	return r.l.Close()
... ...
@@ -336,7 +336,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error {
336 336
 	event.Source = msg.Source
337 337
 
338 338
 	message.Event = &event
339
-
339
+	logger.PutMessage(msg)
340 340
 	return l.queueMessageAsync(message)
341 341
 }
342 342
 
... ...
@@ -354,7 +354,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
354 354
 	event.Source = msg.Source
355 355
 
356 356
 	message.Event = &event
357
-
357
+	logger.PutMessage(msg)
358 358
 	return l.queueMessageAsync(message)
359 359
 }
360 360
 
... ...
@@ -362,7 +362,7 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
362 362
 	message := l.createSplunkMessage(msg)
363 363
 
364 364
 	message.Event = string(append(l.prefix, msg.Line...))
365
-
365
+	logger.PutMessage(msg)
366 366
 	return l.queueMessageAsync(message)
367 367
 }
368 368
 
... ...
@@ -132,10 +132,12 @@ func New(info logger.Info) (logger.Logger, error) {
132 132
 }
133 133
 
134 134
 func (s *syslogger) Log(msg *logger.Message) error {
135
+	line := string(msg.Line)
136
+	logger.PutMessage(msg)
135 137
 	if msg.Source == "stderr" {
136
-		return s.writer.Err(string(msg.Line))
138
+		return s.writer.Err(line)
137 139
 	}
138
-	return s.writer.Info(string(msg.Line))
140
+	return s.writer.Info(line)
139 141
 }
140 142
 
141 143
 func (s *syslogger) Close() error {