Browse code

awslogs: account for UTF-8 normalization in limits

The CloudWatch Logs API defines its limits in terms of bytes, but its
inputs in terms of UTF-8 encoded strings. Byte-sequences which are not
valid UTF-8 encodings are normalized to the Unicode replacement
character U+FFFD, which is a 3-byte sequence in UTF-8. This replacement
can cause the input to grow, exceeding the API limit and causing failed
API calls.

This commit adds logic for counting the effective byte length after
normalization and splitting input without splitting valid UTF-8
byte-sequences into two invalid byte-sequences.

Fixes https://github.com/moby/moby/issues/37747

Signed-off-by: Samuel Karp <skarp@amazon.com>

Samuel Karp authored on 2018/10/06 08:30:41
Showing 2 changed files
... ...
@@ -11,6 +11,7 @@ import (
11 11
 	"strings"
12 12
 	"sync"
13 13
 	"time"
14
+	"unicode/utf8"
14 15
 
15 16
 	"github.com/aws/aws-sdk-go/aws"
16 17
 	"github.com/aws/aws-sdk-go/aws/awserr"
... ...
@@ -46,6 +47,10 @@ const (
46 46
 	maximumLogEventsPerPut = 10000
47 47
 
48 48
 	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
49
+	// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
50
+	// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8.  To compensate for that and to avoid
51
+	// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
52
+	// this replacement happens.
49 53
 	maximumBytesPerEvent = 262144 - perEventBytes
50 54
 
51 55
 	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
... ...
@@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) {
495 495
 			}
496 496
 			line := msg.Line
497 497
 			if l.multilinePattern != nil {
498
-				if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
498
+				lineEffectiveLen := effectiveLen(string(line))
499
+				if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
499 500
 					// This is a new log event or we will exceed max bytes per event
500 501
 					// so flush the current eventBuffer to events and reset timestamp
501 502
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
502 503
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
503 504
 					eventBuffer = eventBuffer[:0]
504 505
 				}
505
-				// Append new line if event is less than max event size
506
-				if len(line) < maximumBytesPerEvent {
506
+				// Append newline if event is less than max event size
507
+				if lineEffectiveLen < maximumBytesPerEvent {
507 508
 					line = append(line, "\n"...)
508 509
 				}
509 510
 				eventBuffer = append(eventBuffer, line...)
... ...
@@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) {
524 524
 // batch (defined in maximumBytesPerPut).  Log messages are split by the maximum
525 525
 // bytes per event (defined in maximumBytesPerEvent).  There is a fixed per-event
526 526
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
527
-// batch-calculations.
528
-func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
529
-	for len(events) > 0 {
527
+// batch-calculations.  Because the events are interpreted as UTF-8 encoded
528
+// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
529
+// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8.  To
530
+// compensate for that and to avoid splitting valid UTF-8 characters into
531
+// invalid byte sequences, we calculate the length of each event assuming that
532
+// this replacement happens.
533
+func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
534
+	for len(bytes) > 0 {
530 535
 		// Split line length so it does not exceed the maximum
531
-		lineBytes := len(events)
532
-		if lineBytes > maximumBytesPerEvent {
533
-			lineBytes = maximumBytesPerEvent
534
-		}
535
-		line := events[:lineBytes]
536
-
536
+		splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
537
+		line := bytes[:splitOffset]
537 538
 		event := wrappedEvent{
538 539
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
539 540
 				Message:   aws.String(string(line)),
... ...
@@ -544,7 +551,7 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int
544 544
 
545 545
 		added := batch.add(event, lineBytes)
546 546
 		if added {
547
-			events = events[lineBytes:]
547
+			bytes = bytes[splitOffset:]
548 548
 		} else {
549 549
 			l.publishBatch(batch)
550 550
 			batch.reset()
... ...
@@ -552,6 +559,37 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int
552 552
 	}
553 553
 }
554 554
 
555
+// effectiveLen counts the effective number of bytes in the string, after
556
+// UTF-8 normalization.  UTF-8 normalization includes replacing bytes that do
557
+// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode
558
+// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as
559
+// utf8.RuneError)
560
+func effectiveLen(line string) int {
561
+	effectiveBytes := 0
562
+	for _, rune := range line {
563
+		effectiveBytes += utf8.RuneLen(rune)
564
+	}
565
+	return effectiveBytes
566
+}
567
+
568
+// findValidSplit finds the byte offset to split a string without breaking valid
569
+// Unicode codepoints given a maximum number of total bytes.  findValidSplit
570
+// returns the byte offset for splitting a string or []byte, as well as the
571
+// effective number of bytes if the string were normalized to replace invalid
572
+// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8
573
+// sequence, represented in Go as utf8.RuneError)
574
+func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
575
+	for offset, rune := range line {
576
+		splitOffset = offset
577
+		if effectiveBytes+utf8.RuneLen(rune) > maxBytes {
578
+			return splitOffset, effectiveBytes
579
+		}
580
+		effectiveBytes += utf8.RuneLen(rune)
581
+	}
582
+	splitOffset = len(line)
583
+	return
584
+}
585
+
555 586
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
556 587
 // accounting for sequencing requirements (each request must reference the
557 588
 // sequence token returned by the previous request).
... ...
@@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) {
938 938
 	}
939 939
 }
940 940
 
941
+func TestEffectiveLen(t *testing.T) {
942
+	tests := []struct {
943
+		str            string
944
+		effectiveBytes int
945
+	}{
946
+		{"Hello", 5},
947
+		{string([]byte{1, 2, 3, 4}), 4},
948
+		{"🙃", 4},
949
+		{string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12},
950
+		{"He\xff\xffo", 9},
951
+		{"", 0},
952
+	}
953
+	for i, tc := range tests {
954
+		t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
955
+			assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str))
956
+		})
957
+	}
958
+}
959
+
960
+func TestFindValidSplit(t *testing.T) {
961
+	tests := []struct {
962
+		str               string
963
+		maxEffectiveBytes int
964
+		splitOffset       int
965
+		effectiveBytes    int
966
+	}{
967
+		{"", 10, 0, 0},
968
+		{"Hello", 6, 5, 5},
969
+		{"Hello", 2, 2, 2},
970
+		{"Hello", 0, 0, 0},
971
+		{"🙃", 3, 0, 0},
972
+		{"🙃", 4, 4, 4},
973
+		{string([]byte{'a', 0xFF}), 2, 1, 1},
974
+		{string([]byte{'a', 0xFF}), 4, 2, 4},
975
+	}
976
+	for i, tc := range tests {
977
+		t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
978
+			splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes)
979
+			assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset")
980
+			assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes")
981
+			t.Log(tc.str[:tc.splitOffset])
982
+			t.Log(tc.str[tc.splitOffset:])
983
+		})
984
+	}
985
+}
986
+
987
+func TestProcessEventEmoji(t *testing.T) {
988
+	stream := &logStream{}
989
+	batch := &eventBatch{}
990
+	bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
991
+	stream.processEvent(batch, bytes, 0)
992
+	assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
993
+	assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message))
994
+	assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message))
995
+}
996
+
941 997
 func TestCollectBatchLineSplit(t *testing.T) {
942 998
 	mockClient := newMockClient()
943 999
 	stream := &logStream{
... ...
@@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) {
987 987
 	}
988 988
 }
989 989
 
990
+func TestCollectBatchLineSplitWithBinary(t *testing.T) {
991
+	mockClient := newMockClient()
992
+	stream := &logStream{
993
+		client:        mockClient,
994
+		logGroupName:  groupName,
995
+		logStreamName: streamName,
996
+		sequenceToken: aws.String(sequenceToken),
997
+		messages:      make(chan *logger.Message),
998
+	}
999
+	mockClient.putLogEventsResult <- &putLogEventsResult{
1000
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
1001
+			NextSequenceToken: aws.String(nextSequenceToken),
1002
+		},
1003
+	}
1004
+	var ticks = make(chan time.Time)
1005
+	newTicker = func(_ time.Duration) *time.Ticker {
1006
+		return &time.Ticker{
1007
+			C: ticks,
1008
+		}
1009
+	}
1010
+
1011
+	d := make(chan bool)
1012
+	close(d)
1013
+	go stream.collectBatch(d)
1014
+
1015
+	longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
1016
+	stream.Log(&logger.Message{
1017
+		Line:      []byte(longline + "\xFD"),
1018
+		Timestamp: time.Time{},
1019
+	})
1020
+
1021
+	// no ticks
1022
+	stream.Close()
1023
+
1024
+	argument := <-mockClient.putLogEventsArgument
1025
+	if argument == nil {
1026
+		t.Fatal("Expected non-nil PutLogEventsInput")
1027
+	}
1028
+	if len(argument.LogEvents) != 2 {
1029
+		t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
1030
+	}
1031
+	if *argument.LogEvents[0].Message != longline {
1032
+		t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
1033
+	}
1034
+	if *argument.LogEvents[1].Message != "\xFD" {
1035
+		t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message)
1036
+	}
1037
+}
1038
+
990 1039
 func TestCollectBatchMaxEvents(t *testing.T) {
991 1040
 	mockClient := newMockClientBuffered(1)
992 1041
 	stream := &logStream{
... ...
@@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
1125 1125
 	}
1126 1126
 }
1127 1127
 
1128
+func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
1129
+	expectedPuts := 2
1130
+	mockClient := newMockClientBuffered(expectedPuts)
1131
+	stream := &logStream{
1132
+		client:        mockClient,
1133
+		logGroupName:  groupName,
1134
+		logStreamName: streamName,
1135
+		sequenceToken: aws.String(sequenceToken),
1136
+		messages:      make(chan *logger.Message),
1137
+	}
1138
+	for i := 0; i < expectedPuts; i++ {
1139
+		mockClient.putLogEventsResult <- &putLogEventsResult{
1140
+			successResult: &cloudwatchlogs.PutLogEventsOutput{
1141
+				NextSequenceToken: aws.String(nextSequenceToken),
1142
+			},
1143
+		}
1144
+	}
1145
+
1146
+	var ticks = make(chan time.Time)
1147
+	newTicker = func(_ time.Duration) *time.Ticker {
1148
+		return &time.Ticker{
1149
+			C: ticks,
1150
+		}
1151
+	}
1152
+
1153
+	d := make(chan bool)
1154
+	close(d)
1155
+	go stream.collectBatch(d)
1156
+
1157
+	// maxline is the maximum line that could be submitted after
1158
+	// accounting for its overhead.
1159
+	maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
1160
+	// This will be split and batched up to the `maximumBytesPerPut'
1161
+	// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
1162
+	// should also tolerate an offset within that range.
1163
+	stream.Log(&logger.Message{
1164
+		Line:      []byte(maxline),
1165
+		Timestamp: time.Time{},
1166
+	})
1167
+	stream.Log(&logger.Message{
1168
+		Line:      []byte("B"),
1169
+		Timestamp: time.Time{},
1170
+	})
1171
+
1172
+	// no ticks, guarantee batch by size (and chan close)
1173
+	stream.Close()
1174
+
1175
+	argument := <-mockClient.putLogEventsArgument
1176
+	if argument == nil {
1177
+		t.Fatal("Expected non-nil PutLogEventsInput")
1178
+	}
1179
+
1180
+	// Should total to the maximum allowed bytes.
1181
+	eventBytes := 0
1182
+	for _, event := range argument.LogEvents {
1183
+		eventBytes += effectiveLen(*event.Message)
1184
+	}
1185
+	eventsOverhead := len(argument.LogEvents) * perEventBytes
1186
+	payloadTotal := eventBytes + eventsOverhead
1187
+	// lowestMaxBatch allows the payload to be offset if the messages
1188
+	// don't lend themselves to align with the maximum event size.
1189
+	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
1190
+
1191
+	if payloadTotal > maximumBytesPerPut {
1192
+		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
1193
+	}
1194
+	if payloadTotal < lowestMaxBatch {
1195
+		t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
1196
+	}
1197
+
1198
+	argument = <-mockClient.putLogEventsArgument
1199
+	message := *argument.LogEvents[len(argument.LogEvents)-1].Message
1200
+	if message[len(message)-1:] != "B" {
1201
+		t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
1202
+	}
1203
+}
1204
+
1128 1205
 func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
1129 1206
 	mockClient := newMockClient()
1130 1207
 	stream := &logStream{