Browse code

Don't append new line for maximum sized events

Signed-off-by: Justin Menga <justin.menga@gmail.com>

Justin Menga authored on 2018/01/21 10:29:55
Showing 2 changed files
... ...
@@ -410,7 +410,7 @@ func (l *logStream) collectBatch() {
410 410
 			// If event buffer is older than batch publish frequency flush the event buffer
411 411
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
412 412
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
413
-				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
413
+				eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond)
414 414
 				eventBufferNegative := eventBufferAge < 0
415 415
 				if eventBufferExpired || eventBufferNegative {
416 416
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
... ...
@@ -431,21 +431,23 @@ func (l *logStream) collectBatch() {
431 431
 			if eventBufferTimestamp == 0 {
432 432
 				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
433 433
 			}
434
-			unprocessedLine := msg.Line
434
+			line := msg.Line
435 435
 			if l.multilinePattern != nil {
436
-				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
436
+				if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
437 437
 					// This is a new log event or we will exceed max bytes per event
438 438
 					// so flush the current eventBuffer to events and reset timestamp
439 439
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
440 440
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
441 441
 					eventBuffer = eventBuffer[:0]
442 442
 				}
443
-				// Append new line
444
-				processedLine := append(unprocessedLine, "\n"...)
445
-				eventBuffer = append(eventBuffer, processedLine...)
443
+				// Append new line if event is less than max event size
444
+				if len(line) < maximumBytesPerEvent {
445
+					line = append(line, "\n"...)
446
+				}
447
+				eventBuffer = append(eventBuffer, line...)
446 448
 				logger.PutMessage(msg)
447 449
 			} else {
448
-				l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
450
+				l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
449 451
 				logger.PutMessage(msg)
450 452
 			}
451 453
 		}
... ...
@@ -461,14 +463,14 @@ func (l *logStream) collectBatch() {
461 461
 // bytes per event (defined in maximumBytesPerEvent).  There is a fixed per-event
462 462
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
463 463
 // batch-calculations.
464
-func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
465
-	for len(unprocessedLine) > 0 {
464
+func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
465
+	for len(events) > 0 {
466 466
 		// Split line length so it does not exceed the maximum
467
-		lineBytes := len(unprocessedLine)
467
+		lineBytes := len(events)
468 468
 		if lineBytes > maximumBytesPerEvent {
469 469
 			lineBytes = maximumBytesPerEvent
470 470
 		}
471
-		line := unprocessedLine[:lineBytes]
471
+		line := events[:lineBytes]
472 472
 
473 473
 		event := wrappedEvent{
474 474
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
... ...
@@ -480,7 +482,7 @@ func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, time
480 480
 
481 481
 		added := batch.add(event, lineBytes)
482 482
 		if added {
483
-			unprocessedLine = unprocessedLine[lineBytes:]
483
+			events = events[lineBytes:]
484 484
 		} else {
485 485
 			l.publishBatch(batch)
486 486
 			batch.reset()
... ...
@@ -726,6 +726,59 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
726 726
 	stream.Close()
727 727
 }
728 728
 
729
+func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
730
+	mockClient := newMockClient()
731
+	multilinePattern := regexp.MustCompile("xxxx")
732
+	stream := &logStream{
733
+		client:           mockClient,
734
+		logGroupName:     groupName,
735
+		logStreamName:    streamName,
736
+		multilinePattern: multilinePattern,
737
+		sequenceToken:    aws.String(sequenceToken),
738
+		messages:         make(chan *logger.Message),
739
+	}
740
+	mockClient.putLogEventsResult <- &putLogEventsResult{
741
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
742
+			NextSequenceToken: aws.String(nextSequenceToken),
743
+		},
744
+	}
745
+	ticks := make(chan time.Time)
746
+	newTicker = func(_ time.Duration) *time.Ticker {
747
+		return &time.Ticker{
748
+			C: ticks,
749
+		}
750
+	}
751
+
752
+	go stream.collectBatch()
753
+
754
+	// Log max event size
755
+	longline := strings.Repeat("A", maximumBytesPerEvent)
756
+	stream.Log(&logger.Message{
757
+		Line:      []byte(longline),
758
+		Timestamp: time.Now(),
759
+	})
760
+
761
+	// Log short event
762
+	shortline := strings.Repeat("B", 100)
763
+	stream.Log(&logger.Message{
764
+		Line:      []byte(shortline),
765
+		Timestamp: time.Now(),
766
+	})
767
+
768
+	// Fire ticker
769
+	ticks <- time.Now().Add(batchPublishFrequency)
770
+
771
+	// Verify multiline events
772
+	// We expect a maximum sized event with no new line characters and a
773
+	// second short event with a new line character at the end
774
+	argument := <-mockClient.putLogEventsArgument
775
+	assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
776
+	assert.Equal(t, 2, len(argument.LogEvents), "Expected two events")
777
+	assert.Equal(t, longline, *argument.LogEvents[0].Message, "Received incorrect multiline message")
778
+	assert.Equal(t, shortline+"\n", *argument.LogEvents[1].Message, "Received incorrect multiline message")
779
+	stream.Close()
780
+}
781
+
729 782
 func TestCollectBatchClose(t *testing.T) {
730 783
 	mockClient := newMockClient()
731 784
 	stream := &logStream{