Browse code

logger: copy to log driver's bufsize

Log drivers may have an internal buffer size that can be accommodated
by the copier as it is more effective to buffer and send fewer though
larger messages that the log driver can consume.

This eliminates the need for Partial handling for drivers that do not
support the concept (ie: awslogs, which can only have events up to
service limits).

Signed-off-by: Jacob Vallejo <jakeev@amazon.com>

Jacob Vallejo authored on 2017/08/23 02:52:52
Showing 5 changed files
... ...
@@ -245,6 +245,10 @@ func (l *logStream) Name() string {
245 245
 	return name
246 246
 }
247 247
 
248
+func (l *logStream) BufSize() int {
249
+	return maximumBytesPerEvent
250
+}
251
+
248 252
 // Log submits messages for logging by an instance of the awslogs logging driver
249 253
 func (l *logStream) Log(msg *logger.Message) error {
250 254
 	l.lock.RLock()
... ...
@@ -1049,6 +1049,11 @@ func TestCreateTagSuccess(t *testing.T) {
1049 1049
 	}
1050 1050
 }
1051 1051
 
1052
+func TestIsSizedLogger(t *testing.T) {
1053
+	awslogs := &logStream{}
1054
+	assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger")
1055
+}
1056
+
1052 1057
 func BenchmarkUnwrapEvents(b *testing.B) {
1053 1058
 	events := make([]wrappedEvent, maximumLogEventsPerPut)
1054 1059
 	for i := 0; i < maximumLogEventsPerPut; i++ {
... ...
@@ -10,8 +10,13 @@ import (
10 10
 )
11 11
 
12 12
 const (
13
-	bufSize  = 16 * 1024
13
+	// readSize is the maximum bytes read during a single read
14
+	// operation.
14 15
 	readSize = 2 * 1024
16
+
17
+	// defaultBufSize provides a reasonable default for loggers that do
18
+	// not have an external limit to impose on log line size.
19
+	defaultBufSize = 16 * 1024
15 20
 )
16 21
 
17 22
 // Copier can copy logs from specified sources to Logger and attach Timestamp.
... ...
@@ -44,7 +49,13 @@ func (c *Copier) Run() {
44 44
 
45 45
 func (c *Copier) copySrc(name string, src io.Reader) {
46 46
 	defer c.copyJobs.Done()
47
+
48
+	bufSize := defaultBufSize
49
+	if sizedLogger, ok := c.dst.(SizedLogger); ok {
50
+		bufSize = sizedLogger.BufSize()
51
+	}
47 52
 	buf := make([]byte, bufSize)
53
+
48 54
 	n := 0
49 55
 	eof := false
50 56
 
... ...
@@ -31,6 +31,25 @@ func (l *TestLoggerJSON) Close() error { return nil }
31 31
 
32 32
 func (l *TestLoggerJSON) Name() string { return "json" }
33 33
 
34
+type TestSizedLoggerJSON struct {
35
+	*json.Encoder
36
+	mu sync.Mutex
37
+}
38
+
39
+func (l *TestSizedLoggerJSON) Log(m *Message) error {
40
+	l.mu.Lock()
41
+	defer l.mu.Unlock()
42
+	return l.Encode(m)
43
+}
44
+
45
+func (*TestSizedLoggerJSON) Close() error { return nil }
46
+
47
+func (*TestSizedLoggerJSON) Name() string { return "sized-json" }
48
+
49
+func (*TestSizedLoggerJSON) BufSize() int {
50
+	return 32 * 1024
51
+}
52
+
34 53
 func TestCopier(t *testing.T) {
35 54
 	stdoutLine := "Line that thinks that it is log line from docker stdout"
36 55
 	stderrLine := "Line that thinks that it is log line from docker stderr"
... ...
@@ -104,10 +123,9 @@ func TestCopier(t *testing.T) {
104 104
 
105 105
 // TestCopierLongLines tests long lines without line breaks
106 106
 func TestCopierLongLines(t *testing.T) {
107
-	// Long lines (should be split at "bufSize")
108
-	const bufSize = 16 * 1024
109
-	stdoutLongLine := strings.Repeat("a", bufSize)
110
-	stderrLongLine := strings.Repeat("b", bufSize)
107
+	// Long lines (should be split at "defaultBufSize")
108
+	stdoutLongLine := strings.Repeat("a", defaultBufSize)
109
+	stderrLongLine := strings.Repeat("b", defaultBufSize)
111 110
 	stdoutTrailingLine := "stdout trailing line"
112 111
 	stderrTrailingLine := "stderr trailing line"
113 112
 
... ...
@@ -205,6 +223,41 @@ func TestCopierSlow(t *testing.T) {
205 205
 	}
206 206
 }
207 207
 
208
+func TestCopierWithSized(t *testing.T) {
209
+	var jsonBuf bytes.Buffer
210
+	expectedMsgs := 2
211
+	sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
212
+	logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
213
+	c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
214
+
215
+	c.Run()
216
+	// Wait for Copier to finish writing to the buffered logger.
217
+	c.Wait()
218
+	c.Close()
219
+
220
+	recvdMsgs := 0
221
+	dec := json.NewDecoder(&jsonBuf)
222
+	for {
223
+		var msg Message
224
+		if err := dec.Decode(&msg); err != nil {
225
+			if err == io.EOF {
226
+				break
227
+			}
228
+			t.Fatal(err)
229
+		}
230
+		if msg.Source != "stdout" {
231
+			t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout")
232
+		}
233
+		if len(msg.Line) != sizedLogger.BufSize() {
234
+			t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line))
235
+		}
236
+		recvdMsgs++
237
+	}
238
+	if recvdMsgs != expectedMsgs {
239
+		t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
240
+	}
241
+}
242
+
208 243
 type BenchmarkLoggerDummy struct {
209 244
 }
210 245
 
... ...
@@ -81,6 +81,13 @@ type Logger interface {
81 81
 	Close() error
82 82
 }
83 83
 
84
+// SizedLogger is the interface for logging drivers that can control
85
+// the size of buffer used for their messages.
86
+type SizedLogger interface {
87
+	Logger
88
+	BufSize() int
89
+}
90
+
84 91
 // ReadConfig is the configuration passed into ReadLogs.
85 92
 type ReadConfig struct {
86 93
 	Since  time.Time