Browse code

awslogs: Use batching type for ergonomics and correct counting

The previous bytes counter had been moved out of scope and was not
counting the total number of bytes in the batch. This type encapsulates
the counter together with the batch for correct accounting and code ergonomics.

Signed-off-by: Jacob Vallejo <jakeev@amazon.com>

Jacob Vallejo authored on 2017/12/01 09:17:17
Showing 3 changed files
... ...
@@ -95,6 +95,17 @@ func init() {
95 95
 	}
96 96
 }
97 97
 
98
+// eventBatch holds the events that are batched for submission and the
99
+// associated data about it.
100
+//
101
+// Warning: this type is not threadsafe and must not be used
102
+// concurrently. This type is expected to be consumed in a single
103
+// goroutine and never concurrently.
104
+type eventBatch struct {
105
+	batch []wrappedEvent
106
+	bytes int
107
+}
108
+
98 109
 // New creates an awslogs logger using the configuration passed in on the
99 110
 // context.  Supported context configuration variables are awslogs-region,
100 111
 // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
... ...
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
389 389
 // Logs, the processEvents method is called.  If a multiline pattern is not
390 390
 // configured, log events are submitted to the processEvents method immediately.
391 391
 func (l *logStream) collectBatch() {
392
-	timer := newTicker(batchPublishFrequency)
393
-	var events []wrappedEvent
392
+	ticker := newTicker(batchPublishFrequency)
394 393
 	var eventBuffer []byte
395 394
 	var eventBufferTimestamp int64
395
+	var batch = newEventBatch()
396 396
 	for {
397 397
 		select {
398
-		case t := <-timer.C:
398
+		case t := <-ticker.C:
399 399
 			// If event buffer is older than batch publish frequency flush the event buffer
400 400
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
401 401
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
402 402
 				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
403 403
 				eventBufferNegative := eventBufferAge < 0
404 404
 				if eventBufferExpired || eventBufferNegative {
405
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
405
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
406 406
 					eventBuffer = eventBuffer[:0]
407 407
 				}
408 408
 			}
409
-			l.publishBatch(events)
410
-			events = events[:0]
409
+			l.publishBatch(batch)
410
+			batch.reset()
411 411
 		case msg, more := <-l.messages:
412 412
 			if !more {
413 413
 				// Flush event buffer and release resources
414
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
414
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
415 415
 				eventBuffer = eventBuffer[:0]
416
-				l.publishBatch(events)
417
-				events = events[:0]
416
+				l.publishBatch(batch)
417
+				batch.reset()
418 418
 				return
419 419
 			}
420 420
 			if eventBufferTimestamp == 0 {
... ...
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
425 425
 				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
426 426
 					// This is a new log event or we will exceed max bytes per event
427 427
 					// so flush the current eventBuffer to events and reset timestamp
428
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
428
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
429 429
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
430 430
 					eventBuffer = eventBuffer[:0]
431 431
 				}
... ...
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
434 434
 				eventBuffer = append(eventBuffer, processedLine...)
435 435
 				logger.PutMessage(msg)
436 436
 			} else {
437
-				events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
437
+				l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
438 438
 				logger.PutMessage(msg)
439 439
 			}
440 440
 		}
... ...
@@ -450,8 +461,7 @@ func (l *logStream) collectBatch() {
450 450
 // bytes per event (defined in maximumBytesPerEvent).  There is a fixed per-event
451 451
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
452 452
 // batch-calculations.
453
-func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
454
-	bytes := 0
453
+func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
455 454
 	for len(unprocessedLine) > 0 {
456 455
 		// Split line length so it does not exceed the maximum
457 456
 		lineBytes := len(unprocessedLine)
... ...
@@ -459,38 +469,33 @@ func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte,
459 459
 			lineBytes = maximumBytesPerEvent
460 460
 		}
461 461
 		line := unprocessedLine[:lineBytes]
462
-		unprocessedLine = unprocessedLine[lineBytes:]
463
-		if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
464
-			// Publish an existing batch if it's already over the maximum number of events or if adding this
465
-			// event would push it over the maximum number of total bytes.
466
-			l.publishBatch(events)
467
-			events = events[:0]
468
-			bytes = 0
469
-		}
470
-		events = append(events, wrappedEvent{
462
+
463
+		event := wrappedEvent{
471 464
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
472 465
 				Message:   aws.String(string(line)),
473 466
 				Timestamp: aws.Int64(timestamp),
474 467
 			},
475
-			insertOrder: len(events),
476
-		})
477
-		bytes += (lineBytes + perEventBytes)
468
+			insertOrder: batch.count(),
469
+		}
470
+
471
+		added := batch.add(event, lineBytes)
472
+		if added {
473
+			unprocessedLine = unprocessedLine[lineBytes:]
474
+		} else {
475
+			l.publishBatch(batch)
476
+			batch.reset()
477
+		}
478 478
 	}
479
-	return events
480 479
 }
481 480
 
482 481
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
483 482
 // accounting for sequencing requirements (each request must reference the
484 483
 // sequence token returned by the previous request).
485
-func (l *logStream) publishBatch(events []wrappedEvent) {
486
-	if len(events) == 0 {
484
+func (l *logStream) publishBatch(batch *eventBatch) {
485
+	if batch.isEmpty() {
487 486
 		return
488 487
 	}
489
-
490
-	// events in a batch must be sorted by timestamp
491
-	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
492
-	sort.Sort(byTimestamp(events))
493
-	cwEvents := unwrapEvents(events)
488
+	cwEvents := unwrapEvents(batch.events())
494 489
 
495 490
 	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
496 491
 
... ...
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
615 615
 	}
616 616
 	return cwEvents
617 617
 }
618
+
619
+func newEventBatch() *eventBatch {
620
+	return &eventBatch{
621
+		batch: make([]wrappedEvent, 0),
622
+		bytes: 0,
623
+	}
624
+}
625
+
626
+// events returns a slice of wrappedEvents sorted in order of their
627
+// timestamps and then by their insertion order (see `byTimestamp`).
628
+//
629
+// Warning: this method is not threadsafe and must not be used
630
+// concurrently.
631
+func (b *eventBatch) events() []wrappedEvent {
632
+	sort.Sort(byTimestamp(b.batch))
633
+	return b.batch
634
+}
635
+
636
+// add adds an event to the batch of events accounting for the
637
+// necessary overhead for an event to be logged. False is returned,
638
+// and the batch is left unchanged, if the event cannot be added to
639
+// the batch due to service limits.
640
+//
641
+// Warning: this method is not threadsafe and must not be used
642
+// concurrently.
643
+func (b *eventBatch) add(event wrappedEvent, size int) bool {
644
+	addBytes := size + perEventBytes
645
+
646
+	// verify we are still within service limits
647
+	switch {
648
+	case len(b.batch)+1 > maximumLogEventsPerPut:
649
+		return false
650
+	case b.bytes+addBytes > maximumBytesPerPut:
651
+		return false
652
+	}
653
+
654
+	b.bytes += addBytes
655
+	b.batch = append(b.batch, event)
656
+
657
+	return true
658
+}
659
+
660
+// count is the number of batched events.  Warning: this method
661
+// is not threadsafe and must not be used concurrently.
662
+func (b *eventBatch) count() int {
663
+	return len(b.batch)
664
+}
665
+
666
+// size is the total number of bytes that the batch represents.
667
+//
668
+// Warning: this method is not threadsafe and must not be used
669
+// concurrently.
670
+func (b *eventBatch) size() int {
671
+	return b.bytes
672
+}
673
+
674
+func (b *eventBatch) isEmpty() bool {
675
+	zeroEvents := b.count() == 0
676
+	zeroSize := b.size() == 0
677
+	return zeroEvents && zeroSize
678
+}
679
+
680
+// reset prepares the batch for reuse.
681
+func (b *eventBatch) reset() {
682
+	b.bytes = 0
683
+	b.batch = b.batch[:0]
684
+}
... ...
@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
49 49
 	}
50 50
 }
51 51
 
52
+func testEventBatch(events []wrappedEvent) *eventBatch {
53
+	batch := newEventBatch()
54
+	for _, event := range events {
55
+		eventlen := len([]byte(*event.inputLogEvent.Message))
56
+		batch.add(event, eventlen)
57
+	}
58
+	return batch
59
+}
60
+
52 61
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
53 62
 	info := logger.Info{
54 63
 		Config: map[string]string{
... ...
@@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
212 212
 		},
213 213
 	}
214 214
 
215
-	stream.publishBatch(events)
215
+	stream.publishBatch(testEventBatch(events))
216 216
 	if stream.sequenceToken == nil {
217 217
 		t.Fatal("Expected non-nil sequenceToken")
218 218
 	}
... ...
@@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
257 257
 		},
258 258
 	}
259 259
 
260
-	stream.publishBatch(events)
260
+	stream.publishBatch(testEventBatch(events))
261 261
 	if stream.sequenceToken == nil {
262 262
 		t.Fatal("Expected non-nil sequenceToken")
263 263
 	}
... ...
@@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
291 291
 		},
292 292
 	}
293 293
 
294
-	stream.publishBatch(events)
294
+	stream.publishBatch(testEventBatch(events))
295 295
 	if stream.sequenceToken == nil {
296 296
 		t.Fatal("Expected non-nil sequenceToken")
297 297
 	}
... ...
@@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
354 354
 		},
355 355
 	}
356 356
 
357
-	stream.publishBatch(events)
357
+	stream.publishBatch(testEventBatch(events))
358 358
 	if stream.sequenceToken == nil {
359 359
 		t.Fatal("Expected non-nil sequenceToken")
360 360
 	}
... ...
@@ -859,7 +868,8 @@ func TestCollectBatchMaxEvents(t *testing.T) {
859 859
 }
860 860
 
861 861
 func TestCollectBatchMaxTotalBytes(t *testing.T) {
862
-	mockClient := newMockClientBuffered(1)
862
+	expectedPuts := 2
863
+	mockClient := newMockClientBuffered(expectedPuts)
863 864
 	stream := &logStream{
864 865
 		client:        mockClient,
865 866
 		logGroupName:  groupName,
... ...
@@ -867,11 +877,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
867 867
 		sequenceToken: aws.String(sequenceToken),
868 868
 		messages:      make(chan *logger.Message),
869 869
 	}
870
-	mockClient.putLogEventsResult <- &putLogEventsResult{
871
-		successResult: &cloudwatchlogs.PutLogEventsOutput{
872
-			NextSequenceToken: aws.String(nextSequenceToken),
873
-		},
870
+	for i := 0; i < expectedPuts; i++ {
871
+		mockClient.putLogEventsResult <- &putLogEventsResult{
872
+			successResult: &cloudwatchlogs.PutLogEventsOutput{
873
+				NextSequenceToken: aws.String(nextSequenceToken),
874
+			},
875
+		}
874 876
 	}
877
+
875 878
 	var ticks = make(chan time.Time)
876 879
 	newTicker = func(_ time.Duration) *time.Ticker {
877 880
 		return &time.Ticker{
... ...
@@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
881 881
 
882 882
 	go stream.collectBatch()
883 883
 
884
-	longline := strings.Repeat("A", maximumBytesPerPut)
884
+	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
885
+	// maxline is the maximum line that could be submitted after
886
+	// accounting for its overhead.
887
+	maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
888
+	// This will be split and batched up to the `maximumBytesPerPut'
889
+	// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
890
+	// should also tolerate an offset within that range.
885 891
 	stream.Log(&logger.Message{
886
-		Line:      []byte(longline + "B"),
892
+		Line:      []byte(maxline[:len(maxline)/2]),
893
+		Timestamp: time.Time{},
894
+	})
895
+	stream.Log(&logger.Message{
896
+		Line:      []byte(maxline[len(maxline)/2:]),
897
+		Timestamp: time.Time{},
898
+	})
899
+	stream.Log(&logger.Message{
900
+		Line:      []byte("B"),
887 901
 		Timestamp: time.Time{},
888 902
 	})
889 903
 
890
-	// no ticks
904
+	// no ticks, guarantee batch by size (and chan close)
891 905
 	stream.Close()
892 906
 
893 907
 	argument := <-mockClient.putLogEventsArgument
894 908
 	if argument == nil {
895 909
 		t.Fatal("Expected non-nil PutLogEventsInput")
896 910
 	}
897
-	bytes := 0
911
+
912
+	// Should total to the maximum allowed bytes.
913
+	eventBytes := 0
898 914
 	for _, event := range argument.LogEvents {
899
-		bytes += len(*event.Message)
915
+		eventBytes += len(*event.Message)
916
+	}
917
+	eventsOverhead := len(argument.LogEvents) * perEventBytes
918
+	payloadTotal := eventBytes + eventsOverhead
919
+	// lowestMaxBatch allows the payload to be offset if the messages
920
+	// don't lend themselves to align with the maximum event size.
921
+	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
922
+
923
+	if payloadTotal > maximumBytesPerPut {
924
+		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
900 925
 	}
901
-	if bytes > maximumBytesPerPut {
902
-		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
926
+	if payloadTotal < lowestMaxBatch {
927
+		t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
903 928
 	}
904 929
 
905 930
 	argument = <-mockClient.putLogEventsArgument
906 931
 	if len(argument.LogEvents) != 1 {
907 932
 		t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
908 933
 	}
909
-	message := *argument.LogEvents[0].Message
934
+	message := *argument.LogEvents[len(argument.LogEvents)-1].Message
910 935
 	if message[len(message)-1:] != "B" {
911 936
 		t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
912 937
 	}
... ...
@@ -1,6 +1,10 @@
1 1
 package awslogs
2 2
 
3
-import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
3
+import (
4
+	"fmt"
5
+
6
+	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
7
+)
4 8
 
5 9
 type mockcwlogsclient struct {
6 10
 	createLogGroupArgument  chan *cloudwatchlogs.CreateLogGroupInput
... ...
@@ -67,7 +71,30 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput)
67 67
 		LogGroupName:  input.LogGroupName,
68 68
 		LogStreamName: input.LogStreamName,
69 69
 	}
70
+
71
+	// Intended mock output
70 72
 	output := <-m.putLogEventsResult
73
+
74
+	// Check enforced limits in mock
75
+	totalBytes := 0
76
+	for _, evt := range events {
77
+		if evt.Message == nil {
78
+			continue
79
+		}
80
+		eventBytes := len([]byte(*evt.Message))
81
+		if eventBytes > maximumBytesPerEvent {
82
+			// exceeded per event message size limits
83
+			return nil, fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent)
84
+		}
85
+		// total event bytes including overhead
86
+		totalBytes += eventBytes + perEventBytes
87
+	}
88
+
89
+	if totalBytes > maximumBytesPerPut {
90
+		// exceeded per put maximum size limit
91
+		return nil, fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut)
92
+	}
93
+
71 94
 	return output.successResult, output.errorResult
72 95
 }
73 96