Browse code

Allow awslogs to use non-blocking mode

When the non-blocking mode is specified, awslogs will:

- No longer potentially block on calls to logstream.Log(); instead, it will
return an error if the awslogs buffer is full. This has the effect of
dropping log messages passed to logstream.Log() while the
buffer is full.
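- Initialize the log stream in a background goroutine, retrying with
exponential backoff, instead of synchronously in New(). Buffered messages
are only processed once creation succeeds. This has the effect of allowing
the container to start in the case where CloudWatch Logs is unreachable.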

Both of these changes require --log-opt mode=non-blocking to be
explicitly set and do not modify the default behavior.

Signed-off-by: Cody Roseborough <crrosebo@amazon.com>

Cody Roseborough authored on 2018/03/03 03:52:15
Showing 2 changed files
... ...
@@ -61,6 +61,7 @@ type logStream struct {
61 61
 	logStreamName    string
62 62
 	logGroupName     string
63 63
 	logCreateGroup   bool
64
+	logNonBlocking   bool
64 65
 	multilinePattern *regexp.Regexp
65 66
 	client           api
66 67
 	messages         chan *logger.Message
... ...
@@ -127,6 +128,8 @@ func New(info logger.Info) (logger.Logger, error) {
127 127
 		}
128 128
 	}
129 129
 
130
+	logNonBlocking := info.Config["mode"] == "non-blocking"
131
+
130 132
 	if info.Config[logStreamKey] != "" {
131 133
 		logStreamName = info.Config[logStreamKey]
132 134
 	}
... ...
@@ -140,19 +143,54 @@ func New(info logger.Info) (logger.Logger, error) {
140 140
 	if err != nil {
141 141
 		return nil, err
142 142
 	}
143
+
143 144
 	containerStream := &logStream{
144 145
 		logStreamName:    logStreamName,
145 146
 		logGroupName:     logGroupName,
146 147
 		logCreateGroup:   logCreateGroup,
148
+		logNonBlocking:   logNonBlocking,
147 149
 		multilinePattern: multilinePattern,
148 150
 		client:           client,
149 151
 		messages:         make(chan *logger.Message, 4096),
150 152
 	}
151
-	err = containerStream.create()
152
-	if err != nil {
153
-		return nil, err
153
+
154
+	creationDone := make(chan bool)
155
+	if logNonBlocking {
156
+		go func() {
157
+			backoff := 1
158
+			maxBackoff := 32
159
+			for {
160
+				// If logger is closed we are done
161
+				containerStream.lock.RLock()
162
+				if containerStream.closed {
163
+					containerStream.lock.RUnlock()
164
+					break
165
+				}
166
+				containerStream.lock.RUnlock()
167
+				err := containerStream.create()
168
+				if err == nil {
169
+					break
170
+				}
171
+
172
+				time.Sleep(time.Duration(backoff) * time.Second)
173
+				if backoff < maxBackoff {
174
+					backoff *= 2
175
+				}
176
+				logrus.
177
+					WithError(err).
178
+					WithField("container-id", info.ContainerID).
179
+					WithField("container-name", info.ContainerName).
180
+					Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
181
+			}
182
+			close(creationDone)
183
+		}()
184
+	} else {
185
+		if err = containerStream.create(); err != nil {
186
+			return nil, err
187
+		}
188
+		close(creationDone)
154 189
 	}
155
-	go containerStream.collectBatch()
190
+	go containerStream.collectBatch(creationDone)
156 191
 
157 192
 	return containerStream, nil
158 193
 }
... ...
@@ -294,9 +332,18 @@ func (l *logStream) BufSize() int {
294 294
 func (l *logStream) Log(msg *logger.Message) error {
295 295
 	l.lock.RLock()
296 296
 	defer l.lock.RUnlock()
297
-	if !l.closed {
298
-		l.messages <- msg
297
+	if l.closed {
298
+		return errors.New("awslogs is closed")
299
+	}
300
+	if l.logNonBlocking {
301
+		select {
302
+		case l.messages <- msg:
303
+			return nil
304
+		default:
305
+			return errors.New("awslogs buffer is full")
306
+		}
299 307
 	}
308
+	l.messages <- msg
300 309
 	return nil
301 310
 }
302 311
 
... ...
@@ -322,7 +369,9 @@ func (l *logStream) create() error {
322 322
 				return l.createLogStream()
323 323
 			}
324 324
 		}
325
-		return err
325
+		if err != nil {
326
+			return err
327
+		}
326 328
 	}
327 329
 
328 330
 	return nil
... ...
@@ -399,7 +448,9 @@ var newTicker = func(freq time.Duration) *time.Ticker {
399 399
 // seconds.  When events are ready to be processed for submission to CloudWatch
400 400
 // Logs, the processEvents method is called.  If a multiline pattern is not
401 401
 // configured, log events are submitted to the processEvents method immediately.
402
-func (l *logStream) collectBatch() {
402
+func (l *logStream) collectBatch(created chan bool) {
403
+	// Wait for the logstream/group to be created
404
+	<-created
403 405
 	ticker := newTicker(batchPublishFrequency)
404 406
 	var eventBuffer []byte
405 407
 	var eventBufferTimestamp int64
... ...
@@ -200,6 +200,93 @@ func TestCreateAlreadyExists(t *testing.T) {
200 200
 	}
201 201
 }
202 202
 
203
+func TestLogClosed(t *testing.T) {
204
+	mockClient := newMockClient()
205
+	stream := &logStream{
206
+		client: mockClient,
207
+		closed: true,
208
+	}
209
+	err := stream.Log(&logger.Message{})
210
+	if err == nil {
211
+		t.Fatal("Expected non-nil error")
212
+	}
213
+}
214
+
215
+func TestLogBlocking(t *testing.T) {
216
+	mockClient := newMockClient()
217
+	stream := &logStream{
218
+		client:   mockClient,
219
+		messages: make(chan *logger.Message),
220
+	}
221
+
222
+	errorCh := make(chan error, 1)
223
+	started := make(chan bool)
224
+	go func() {
225
+		started <- true
226
+		err := stream.Log(&logger.Message{})
227
+		errorCh <- err
228
+	}()
229
+	<-started
230
+	select {
231
+	case err := <-errorCh:
232
+		t.Fatal("Expected stream.Log to block: ", err)
233
+	default:
234
+		break
235
+	}
236
+	select {
237
+	case <-stream.messages:
238
+		break
239
+	default:
240
+		t.Fatal("Expected to be able to read from stream.messages but was unable to")
241
+	}
242
+	select {
243
+	case err := <-errorCh:
244
+		if err != nil {
245
+			t.Fatal(err)
246
+		}
247
+	case <-time.After(30 * time.Second):
248
+		t.Fatal("timed out waiting for read")
249
+	}
250
+}
251
+
252
+func TestLogNonBlockingBufferEmpty(t *testing.T) {
253
+	mockClient := newMockClient()
254
+	stream := &logStream{
255
+		client:         mockClient,
256
+		messages:       make(chan *logger.Message, 1),
257
+		logNonBlocking: true,
258
+	}
259
+	err := stream.Log(&logger.Message{})
260
+	if err != nil {
261
+		t.Fatal(err)
262
+	}
263
+}
264
+
265
+func TestLogNonBlockingBufferFull(t *testing.T) {
266
+	mockClient := newMockClient()
267
+	stream := &logStream{
268
+		client:         mockClient,
269
+		messages:       make(chan *logger.Message, 1),
270
+		logNonBlocking: true,
271
+	}
272
+	stream.messages <- &logger.Message{}
273
+	errorCh := make(chan error)
274
+	started := make(chan bool)
275
+	go func() {
276
+		started <- true
277
+		err := stream.Log(&logger.Message{})
278
+		errorCh <- err
279
+	}()
280
+	<-started
281
+	select {
282
+	case err := <-errorCh:
283
+		if err == nil {
284
+			t.Fatal("Expected non-nil error")
285
+		}
286
+	case <-time.After(30 * time.Second):
287
+		t.Fatal("Expected Log call to not block")
288
+	}
289
+}
203 290
 func TestPublishBatchSuccess(t *testing.T) {
204 291
 	mockClient := newMockClient()
205 292
 	stream := &logStream{
... ...
@@ -409,8 +496,9 @@ func TestCollectBatchSimple(t *testing.T) {
409 409
 			C: ticks,
410 410
 		}
411 411
 	}
412
-
413
-	go stream.collectBatch()
412
+	d := make(chan bool)
413
+	close(d)
414
+	go stream.collectBatch(d)
414 415
 
415 416
 	stream.Log(&logger.Message{
416 417
 		Line:      []byte(logline),
... ...
@@ -453,7 +541,9 @@ func TestCollectBatchTicker(t *testing.T) {
453 453
 		}
454 454
 	}
455 455
 
456
-	go stream.collectBatch()
456
+	d := make(chan bool)
457
+	close(d)
458
+	go stream.collectBatch(d)
457 459
 
458 460
 	stream.Log(&logger.Message{
459 461
 		Line:      []byte(logline + " 1"),
... ...
@@ -525,7 +615,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
525 525
 		}
526 526
 	}
527 527
 
528
-	go stream.collectBatch()
528
+	d := make(chan bool)
529
+	close(d)
530
+	go stream.collectBatch(d)
529 531
 
530 532
 	stream.Log(&logger.Message{
531 533
 		Line:      []byte(logline),
... ...
@@ -579,7 +671,9 @@ func BenchmarkCollectBatch(b *testing.B) {
579 579
 			}
580 580
 		}
581 581
 
582
-		go stream.collectBatch()
582
+		d := make(chan bool)
583
+		close(d)
584
+		go stream.collectBatch(d)
583 585
 		stream.logGenerator(10, 100)
584 586
 		ticks <- time.Time{}
585 587
 		stream.Close()
... ...
@@ -609,7 +703,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
609 609
 				C: ticks,
610 610
 			}
611 611
 		}
612
-		go stream.collectBatch()
612
+		d := make(chan bool)
613
+		close(d)
614
+		go stream.collectBatch(d)
613 615
 		stream.logGenerator(10, 100)
614 616
 		ticks <- time.Time{}
615 617
 		stream.Close()
... ...
@@ -639,7 +735,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
639 639
 		}
640 640
 	}
641 641
 
642
-	go stream.collectBatch()
642
+	d := make(chan bool)
643
+	close(d)
644
+	go stream.collectBatch(d)
643 645
 
644 646
 	stream.Log(&logger.Message{
645 647
 		Line:      []byte(logline),
... ...
@@ -701,7 +799,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
701 701
 		}
702 702
 	}
703 703
 
704
-	go stream.collectBatch()
704
+	d := make(chan bool)
705
+	close(d)
706
+	go stream.collectBatch(d)
705 707
 
706 708
 	stream.Log(&logger.Message{
707 709
 		Line:      []byte(logline),
... ...
@@ -749,7 +849,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
749 749
 		}
750 750
 	}
751 751
 
752
-	go stream.collectBatch()
752
+	d := make(chan bool)
753
+	close(d)
754
+	go stream.collectBatch(d)
753 755
 
754 756
 	// Log max event size
755 757
 	longline := strings.Repeat("A", maximumBytesPerEvent)
... ...
@@ -800,7 +902,9 @@ func TestCollectBatchClose(t *testing.T) {
800 800
 		}
801 801
 	}
802 802
 
803
-	go stream.collectBatch()
803
+	d := make(chan bool)
804
+	close(d)
805
+	go stream.collectBatch(d)
804 806
 
805 807
 	stream.Log(&logger.Message{
806 808
 		Line:      []byte(logline),
... ...
@@ -843,7 +947,9 @@ func TestCollectBatchLineSplit(t *testing.T) {
843 843
 		}
844 844
 	}
845 845
 
846
-	go stream.collectBatch()
846
+	d := make(chan bool)
847
+	close(d)
848
+	go stream.collectBatch(d)
847 849
 
848 850
 	longline := strings.Repeat("A", maximumBytesPerEvent)
849 851
 	stream.Log(&logger.Message{
... ...
@@ -890,7 +996,9 @@ func TestCollectBatchMaxEvents(t *testing.T) {
890 890
 		}
891 891
 	}
892 892
 
893
-	go stream.collectBatch()
893
+	d := make(chan bool)
894
+	close(d)
895
+	go stream.collectBatch(d)
894 896
 
895 897
 	line := "A"
896 898
 	for i := 0; i <= maximumLogEventsPerPut; i++ {
... ...
@@ -945,7 +1053,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
945 945
 		}
946 946
 	}
947 947
 
948
-	go stream.collectBatch()
948
+	d := make(chan bool)
949
+	close(d)
950
+	go stream.collectBatch(d)
949 951
 
950 952
 	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
951 953
 	// maxline is the maximum line that could be submitted after
... ...
@@ -1024,7 +1134,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
1024 1024
 		}
1025 1025
 	}
1026 1026
 
1027
-	go stream.collectBatch()
1027
+	d := make(chan bool)
1028
+	close(d)
1029
+	go stream.collectBatch(d)
1028 1030
 
1029 1031
 	times := maximumLogEventsPerPut
1030 1032
 	expectedEvents := []*cloudwatchlogs.InputLogEvent{}