logger: copy to log driver's bufsize, fixes #34887
| ... | ... |
@@ -275,6 +275,10 @@ func (l *logStream) Name() string {
|
| 275 | 275 |
return name |
| 276 | 276 |
} |
| 277 | 277 |
|
| 278 |
+func (l *logStream) BufSize() int {
|
|
| 279 |
+ return maximumBytesPerEvent |
|
| 280 |
+} |
|
| 281 |
+ |
|
| 278 | 282 |
// Log submits messages for logging by an instance of the awslogs logging driver |
| 279 | 283 |
func (l *logStream) Log(msg *logger.Message) error {
|
| 280 | 284 |
l.lock.RLock() |
| ... | ... |
@@ -1052,6 +1052,11 @@ func TestCreateTagSuccess(t *testing.T) {
|
| 1052 | 1052 |
} |
| 1053 | 1053 |
} |
| 1054 | 1054 |
|
| 1055 |
+func TestIsSizedLogger(t *testing.T) {
|
|
| 1056 |
+ awslogs := &logStream{}
|
|
| 1057 |
+ assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger") |
|
| 1058 |
+} |
|
| 1059 |
+ |
|
| 1055 | 1060 |
func BenchmarkUnwrapEvents(b *testing.B) {
|
| 1056 | 1061 |
events := make([]wrappedEvent, maximumLogEventsPerPut) |
| 1057 | 1062 |
for i := 0; i < maximumLogEventsPerPut; i++ {
|
| ... | ... |
@@ -10,8 +10,13 @@ import ( |
| 10 | 10 |
) |
| 11 | 11 |
|
| 12 | 12 |
const ( |
| 13 |
- bufSize = 16 * 1024 |
|
| 13 |
+ // readSize is the maximum bytes read during a single read |
|
| 14 |
+ // operation. |
|
| 14 | 15 |
readSize = 2 * 1024 |
| 16 |
+ |
|
| 17 |
+ // defaultBufSize provides a reasonable default for loggers that do |
|
| 18 |
+ // not have an external limit to impose on log line size. |
|
| 19 |
+ defaultBufSize = 16 * 1024 |
|
| 15 | 20 |
) |
| 16 | 21 |
|
| 17 | 22 |
// Copier can copy logs from specified sources to Logger and attach Timestamp. |
| ... | ... |
@@ -44,7 +49,13 @@ func (c *Copier) Run() {
|
| 44 | 44 |
|
| 45 | 45 |
func (c *Copier) copySrc(name string, src io.Reader) {
|
| 46 | 46 |
defer c.copyJobs.Done() |
| 47 |
+ |
|
| 48 |
+ bufSize := defaultBufSize |
|
| 49 |
+ if sizedLogger, ok := c.dst.(SizedLogger); ok {
|
|
| 50 |
+ bufSize = sizedLogger.BufSize() |
|
| 51 |
+ } |
|
| 47 | 52 |
buf := make([]byte, bufSize) |
| 53 |
+ |
|
| 48 | 54 |
n := 0 |
| 49 | 55 |
eof := false |
| 50 | 56 |
|
| ... | ... |
@@ -31,6 +31,25 @@ func (l *TestLoggerJSON) Close() error { return nil }
|
| 31 | 31 |
|
| 32 | 32 |
func (l *TestLoggerJSON) Name() string { return "json" }
|
| 33 | 33 |
|
| 34 |
+type TestSizedLoggerJSON struct {
|
|
| 35 |
+ *json.Encoder |
|
| 36 |
+ mu sync.Mutex |
|
| 37 |
+} |
|
| 38 |
+ |
|
| 39 |
+func (l *TestSizedLoggerJSON) Log(m *Message) error {
|
|
| 40 |
+ l.mu.Lock() |
|
| 41 |
+ defer l.mu.Unlock() |
|
| 42 |
+ return l.Encode(m) |
|
| 43 |
+} |
|
| 44 |
+ |
|
| 45 |
+func (*TestSizedLoggerJSON) Close() error { return nil }
|
|
| 46 |
+ |
|
| 47 |
+func (*TestSizedLoggerJSON) Name() string { return "sized-json" }
|
|
| 48 |
+ |
|
| 49 |
+func (*TestSizedLoggerJSON) BufSize() int {
|
|
| 50 |
+ return 32 * 1024 |
|
| 51 |
+} |
|
| 52 |
+ |
|
| 34 | 53 |
func TestCopier(t *testing.T) {
|
| 35 | 54 |
stdoutLine := "Line that thinks that it is log line from docker stdout" |
| 36 | 55 |
stderrLine := "Line that thinks that it is log line from docker stderr" |
| ... | ... |
@@ -104,10 +123,9 @@ func TestCopier(t *testing.T) {
|
| 104 | 104 |
|
| 105 | 105 |
// TestCopierLongLines tests long lines without line breaks |
| 106 | 106 |
func TestCopierLongLines(t *testing.T) {
|
| 107 |
- // Long lines (should be split at "bufSize") |
|
| 108 |
- const bufSize = 16 * 1024 |
|
| 109 |
- stdoutLongLine := strings.Repeat("a", bufSize)
|
|
| 110 |
- stderrLongLine := strings.Repeat("b", bufSize)
|
|
| 107 |
+ // Long lines (should be split at "defaultBufSize") |
|
| 108 |
+ stdoutLongLine := strings.Repeat("a", defaultBufSize)
|
|
| 109 |
+ stderrLongLine := strings.Repeat("b", defaultBufSize)
|
|
| 111 | 110 |
stdoutTrailingLine := "stdout trailing line" |
| 112 | 111 |
stderrTrailingLine := "stderr trailing line" |
| 113 | 112 |
|
| ... | ... |
@@ -205,6 +223,41 @@ func TestCopierSlow(t *testing.T) {
|
| 205 | 205 |
} |
| 206 | 206 |
} |
| 207 | 207 |
|
| 208 |
+func TestCopierWithSized(t *testing.T) {
|
|
| 209 |
+ var jsonBuf bytes.Buffer |
|
| 210 |
+ expectedMsgs := 2 |
|
| 211 |
+ sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
|
|
| 212 |
+ logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
|
|
| 213 |
+ c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
|
|
| 214 |
+ |
|
| 215 |
+ c.Run() |
|
| 216 |
+ // Wait for Copier to finish writing to the buffered logger. |
|
| 217 |
+ c.Wait() |
|
| 218 |
+ c.Close() |
|
| 219 |
+ |
|
| 220 |
+ recvdMsgs := 0 |
|
| 221 |
+ dec := json.NewDecoder(&jsonBuf) |
|
| 222 |
+ for {
|
|
| 223 |
+ var msg Message |
|
| 224 |
+ if err := dec.Decode(&msg); err != nil {
|
|
| 225 |
+ if err == io.EOF {
|
|
| 226 |
+ break |
|
| 227 |
+ } |
|
| 228 |
+ t.Fatal(err) |
|
| 229 |
+ } |
|
| 230 |
+ if msg.Source != "stdout" {
|
|
| 231 |
+ t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout")
|
|
| 232 |
+ } |
|
| 233 |
+ if len(msg.Line) != sizedLogger.BufSize() {
|
|
| 234 |
+ t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line))
|
|
| 235 |
+ } |
|
| 236 |
+ recvdMsgs++ |
|
| 237 |
+ } |
|
| 238 |
+ if recvdMsgs != expectedMsgs {
|
|
| 239 |
+ t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
|
|
| 240 |
+ } |
|
| 241 |
+} |
|
| 242 |
+ |
|
| 208 | 243 |
type BenchmarkLoggerDummy struct {
|
| 209 | 244 |
} |
| 210 | 245 |
|
| ... | ... |
@@ -78,6 +78,13 @@ type Logger interface {
|
| 78 | 78 |
Close() error |
| 79 | 79 |
} |
| 80 | 80 |
|
| 81 |
+// SizedLogger is the interface for logging drivers that can control |
|
| 82 |
+// the size of buffer used for their messages. |
|
| 83 |
+type SizedLogger interface {
|
|
| 84 |
+ Logger |
|
| 85 |
+ BufSize() int |
|
| 86 |
+} |
|
| 87 |
+ |
|
| 81 | 88 |
// ReadConfig is the configuration passed into ReadLogs. |
| 82 | 89 |
type ReadConfig struct {
|
| 83 | 90 |
Since time.Time |