Browse code

Merge pull request #36522 from IRCody/awslogs-non-blocking

Allow awslogs to use non-blocking mode

Brian Goff authored on 2018/05/02 05:30:52
Showing 2 changed files
... ...
@@ -61,6 +61,7 @@ type logStream struct {
61 61
 	logStreamName    string
62 62
 	logGroupName     string
63 63
 	logCreateGroup   bool
64
+	logNonBlocking   bool
64 65
 	multilinePattern *regexp.Regexp
65 66
 	client           api
66 67
 	messages         chan *logger.Message
... ...
@@ -129,6 +130,8 @@ func New(info logger.Info) (logger.Logger, error) {
129 129
 		}
130 130
 	}
131 131
 
132
+	logNonBlocking := info.Config["mode"] == "non-blocking"
133
+
132 134
 	if info.Config[logStreamKey] != "" {
133 135
 		logStreamName = info.Config[logStreamKey]
134 136
 	}
... ...
@@ -142,19 +145,54 @@ func New(info logger.Info) (logger.Logger, error) {
142 142
 	if err != nil {
143 143
 		return nil, err
144 144
 	}
145
+
145 146
 	containerStream := &logStream{
146 147
 		logStreamName:    logStreamName,
147 148
 		logGroupName:     logGroupName,
148 149
 		logCreateGroup:   logCreateGroup,
150
+		logNonBlocking:   logNonBlocking,
149 151
 		multilinePattern: multilinePattern,
150 152
 		client:           client,
151 153
 		messages:         make(chan *logger.Message, 4096),
152 154
 	}
153
-	err = containerStream.create()
154
-	if err != nil {
155
-		return nil, err
155
+
156
+	creationDone := make(chan bool)
157
+	if logNonBlocking {
158
+		go func() {
159
+			backoff := 1
160
+			maxBackoff := 32
161
+			for {
162
+				// If logger is closed we are done
163
+				containerStream.lock.RLock()
164
+				if containerStream.closed {
165
+					containerStream.lock.RUnlock()
166
+					break
167
+				}
168
+				containerStream.lock.RUnlock()
169
+				err := containerStream.create()
170
+				if err == nil {
171
+					break
172
+				}
173
+
174
+				time.Sleep(time.Duration(backoff) * time.Second)
175
+				if backoff < maxBackoff {
176
+					backoff *= 2
177
+				}
178
+				logrus.
179
+					WithError(err).
180
+					WithField("container-id", info.ContainerID).
181
+					WithField("container-name", info.ContainerName).
182
+					Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
183
+			}
184
+			close(creationDone)
185
+		}()
186
+	} else {
187
+		if err = containerStream.create(); err != nil {
188
+			return nil, err
189
+		}
190
+		close(creationDone)
156 191
 	}
157
-	go containerStream.collectBatch()
192
+	go containerStream.collectBatch(creationDone)
158 193
 
159 194
 	return containerStream, nil
160 195
 }
... ...
@@ -296,9 +334,18 @@ func (l *logStream) BufSize() int {
296 296
 func (l *logStream) Log(msg *logger.Message) error {
297 297
 	l.lock.RLock()
298 298
 	defer l.lock.RUnlock()
299
-	if !l.closed {
300
-		l.messages <- msg
299
+	if l.closed {
300
+		return errors.New("awslogs is closed")
301
+	}
302
+	if l.logNonBlocking {
303
+		select {
304
+		case l.messages <- msg:
305
+			return nil
306
+		default:
307
+			return errors.New("awslogs buffer is full")
308
+		}
301 309
 	}
310
+	l.messages <- msg
302 311
 	return nil
303 312
 }
304 313
 
... ...
@@ -324,7 +371,9 @@ func (l *logStream) create() error {
324 324
 				return l.createLogStream()
325 325
 			}
326 326
 		}
327
-		return err
327
+		if err != nil {
328
+			return err
329
+		}
328 330
 	}
329 331
 
330 332
 	return nil
... ...
@@ -401,7 +450,9 @@ var newTicker = func(freq time.Duration) *time.Ticker {
401 401
 // seconds.  When events are ready to be processed for submission to CloudWatch
402 402
 // Logs, the processEvents method is called.  If a multiline pattern is not
403 403
 // configured, log events are submitted to the processEvents method immediately.
404
-func (l *logStream) collectBatch() {
404
+func (l *logStream) collectBatch(created chan bool) {
405
+	// Wait for the logstream/group to be created
406
+	<-created
405 407
 	ticker := newTicker(batchPublishFrequency)
406 408
 	var eventBuffer []byte
407 409
 	var eventBufferTimestamp int64
... ...
@@ -201,6 +201,93 @@ func TestCreateAlreadyExists(t *testing.T) {
201 201
 	}
202 202
 }
203 203
 
204
+func TestLogClosed(t *testing.T) {
205
+	mockClient := newMockClient()
206
+	stream := &logStream{
207
+		client: mockClient,
208
+		closed: true,
209
+	}
210
+	err := stream.Log(&logger.Message{})
211
+	if err == nil {
212
+		t.Fatal("Expected non-nil error")
213
+	}
214
+}
215
+
216
+func TestLogBlocking(t *testing.T) {
217
+	mockClient := newMockClient()
218
+	stream := &logStream{
219
+		client:   mockClient,
220
+		messages: make(chan *logger.Message),
221
+	}
222
+
223
+	errorCh := make(chan error, 1)
224
+	started := make(chan bool)
225
+	go func() {
226
+		started <- true
227
+		err := stream.Log(&logger.Message{})
228
+		errorCh <- err
229
+	}()
230
+	<-started
231
+	select {
232
+	case err := <-errorCh:
233
+		t.Fatal("Expected stream.Log to block: ", err)
234
+	default:
235
+		break
236
+	}
237
+	select {
238
+	case <-stream.messages:
239
+		break
240
+	default:
241
+		t.Fatal("Expected to be able to read from stream.messages but was unable to")
242
+	}
243
+	select {
244
+	case err := <-errorCh:
245
+		if err != nil {
246
+			t.Fatal(err)
247
+		}
248
+	case <-time.After(30 * time.Second):
249
+		t.Fatal("timed out waiting for read")
250
+	}
251
+}
252
+
253
+func TestLogNonBlockingBufferEmpty(t *testing.T) {
254
+	mockClient := newMockClient()
255
+	stream := &logStream{
256
+		client:         mockClient,
257
+		messages:       make(chan *logger.Message, 1),
258
+		logNonBlocking: true,
259
+	}
260
+	err := stream.Log(&logger.Message{})
261
+	if err != nil {
262
+		t.Fatal(err)
263
+	}
264
+}
265
+
266
+func TestLogNonBlockingBufferFull(t *testing.T) {
267
+	mockClient := newMockClient()
268
+	stream := &logStream{
269
+		client:         mockClient,
270
+		messages:       make(chan *logger.Message, 1),
271
+		logNonBlocking: true,
272
+	}
273
+	stream.messages <- &logger.Message{}
274
+	errorCh := make(chan error)
275
+	started := make(chan bool)
276
+	go func() {
277
+		started <- true
278
+		err := stream.Log(&logger.Message{})
279
+		errorCh <- err
280
+	}()
281
+	<-started
282
+	select {
283
+	case err := <-errorCh:
284
+		if err == nil {
285
+			t.Fatal("Expected non-nil error")
286
+		}
287
+	case <-time.After(30 * time.Second):
288
+		t.Fatal("Expected Log call to not block")
289
+	}
290
+}
204 291
 func TestPublishBatchSuccess(t *testing.T) {
205 292
 	mockClient := newMockClient()
206 293
 	stream := &logStream{
... ...
@@ -410,8 +497,9 @@ func TestCollectBatchSimple(t *testing.T) {
410 410
 			C: ticks,
411 411
 		}
412 412
 	}
413
-
414
-	go stream.collectBatch()
413
+	d := make(chan bool)
414
+	close(d)
415
+	go stream.collectBatch(d)
415 416
 
416 417
 	stream.Log(&logger.Message{
417 418
 		Line:      []byte(logline),
... ...
@@ -454,7 +542,9 @@ func TestCollectBatchTicker(t *testing.T) {
454 454
 		}
455 455
 	}
456 456
 
457
-	go stream.collectBatch()
457
+	d := make(chan bool)
458
+	close(d)
459
+	go stream.collectBatch(d)
458 460
 
459 461
 	stream.Log(&logger.Message{
460 462
 		Line:      []byte(logline + " 1"),
... ...
@@ -526,7 +616,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
526 526
 		}
527 527
 	}
528 528
 
529
-	go stream.collectBatch()
529
+	d := make(chan bool)
530
+	close(d)
531
+	go stream.collectBatch(d)
530 532
 
531 533
 	stream.Log(&logger.Message{
532 534
 		Line:      []byte(logline),
... ...
@@ -580,7 +672,9 @@ func BenchmarkCollectBatch(b *testing.B) {
580 580
 			}
581 581
 		}
582 582
 
583
-		go stream.collectBatch()
583
+		d := make(chan bool)
584
+		close(d)
585
+		go stream.collectBatch(d)
584 586
 		stream.logGenerator(10, 100)
585 587
 		ticks <- time.Time{}
586 588
 		stream.Close()
... ...
@@ -610,7 +704,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
610 610
 				C: ticks,
611 611
 			}
612 612
 		}
613
-		go stream.collectBatch()
613
+		d := make(chan bool)
614
+		close(d)
615
+		go stream.collectBatch(d)
614 616
 		stream.logGenerator(10, 100)
615 617
 		ticks <- time.Time{}
616 618
 		stream.Close()
... ...
@@ -640,7 +736,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
640 640
 		}
641 641
 	}
642 642
 
643
-	go stream.collectBatch()
643
+	d := make(chan bool)
644
+	close(d)
645
+	go stream.collectBatch(d)
644 646
 
645 647
 	stream.Log(&logger.Message{
646 648
 		Line:      []byte(logline),
... ...
@@ -702,7 +800,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
702 702
 		}
703 703
 	}
704 704
 
705
-	go stream.collectBatch()
705
+	d := make(chan bool)
706
+	close(d)
707
+	go stream.collectBatch(d)
706 708
 
707 709
 	stream.Log(&logger.Message{
708 710
 		Line:      []byte(logline),
... ...
@@ -750,7 +850,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
750 750
 		}
751 751
 	}
752 752
 
753
-	go stream.collectBatch()
753
+	d := make(chan bool)
754
+	close(d)
755
+	go stream.collectBatch(d)
754 756
 
755 757
 	// Log max event size
756 758
 	longline := strings.Repeat("A", maximumBytesPerEvent)
... ...
@@ -801,7 +903,9 @@ func TestCollectBatchClose(t *testing.T) {
801 801
 		}
802 802
 	}
803 803
 
804
-	go stream.collectBatch()
804
+	d := make(chan bool)
805
+	close(d)
806
+	go stream.collectBatch(d)
805 807
 
806 808
 	stream.Log(&logger.Message{
807 809
 		Line:      []byte(logline),
... ...
@@ -844,7 +948,9 @@ func TestCollectBatchLineSplit(t *testing.T) {
844 844
 		}
845 845
 	}
846 846
 
847
-	go stream.collectBatch()
847
+	d := make(chan bool)
848
+	close(d)
849
+	go stream.collectBatch(d)
848 850
 
849 851
 	longline := strings.Repeat("A", maximumBytesPerEvent)
850 852
 	stream.Log(&logger.Message{
... ...
@@ -891,7 +997,9 @@ func TestCollectBatchMaxEvents(t *testing.T) {
891 891
 		}
892 892
 	}
893 893
 
894
-	go stream.collectBatch()
894
+	d := make(chan bool)
895
+	close(d)
896
+	go stream.collectBatch(d)
895 897
 
896 898
 	line := "A"
897 899
 	for i := 0; i <= maximumLogEventsPerPut; i++ {
... ...
@@ -946,7 +1054,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
946 946
 		}
947 947
 	}
948 948
 
949
-	go stream.collectBatch()
949
+	d := make(chan bool)
950
+	close(d)
951
+	go stream.collectBatch(d)
950 952
 
951 953
 	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
952 954
 	// maxline is the maximum line that could be submitted after
... ...
@@ -1025,7 +1135,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
1025 1025
 		}
1026 1026
 	}
1027 1027
 
1028
-	go stream.collectBatch()
1028
+	d := make(chan bool)
1029
+	close(d)
1030
+	go stream.collectBatch(d)
1029 1031
 
1030 1032
 	times := maximumLogEventsPerPut
1031 1033
 	expectedEvents := []*cloudwatchlogs.InputLogEvent{}