Browse code

Make bytesPipe use linear allocations

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>

Tonis Tiigi authored on 2015/09/17 09:51:11
Showing 3 changed files
... ...
@@ -1,15 +1,16 @@
1 1
 package ioutils
2 2
 
3
-const maxCap = 10 * 1e6
3
+const maxCap = 1e6
4 4
 
5
-// BytesPipe is io.ReadWriter which works similary to pipe(queue).
6
-// All written data could be read only once. Also BytesPipe trying to adjust
7
-// internal []byte slice to current needs, so there won't be overgrown buffer
8
-// after highload peak.
5
+// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
6
+// All written data could be read only once. Also BytesPipe is allocating
7
+// and releasing new byte slices to adjust to current needs, so there won't be
8
+// overgrown buffer after high load peak.
9 9
 // BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
10 10
 type BytesPipe struct {
11
-	buf      []byte
12
-	lastRead int
11
+	buf      [][]byte // slice of byte-slices of buffered data
12
+	lastRead int      // index in the first slice to a read point
13
+	bufLen   int      // length of data buffered over the slices
13 14
 }
14 15
 
15 16
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
... ...
@@ -20,63 +21,69 @@ func NewBytesPipe(buf []byte) *BytesPipe {
20 20
 		buf = make([]byte, 0, 64)
21 21
 	}
22 22
 	return &BytesPipe{
23
-		buf: buf[:0],
24
-	}
25
-}
26
-
27
-func (bp *BytesPipe) grow(n int) {
28
-	if len(bp.buf)+n > cap(bp.buf) {
29
-		// not enough space
30
-		var buf []byte
31
-		remain := bp.len()
32
-		if remain+n <= cap(bp.buf)/2 {
33
-			// enough space in current buffer, just move data to head
34
-			copy(bp.buf, bp.buf[bp.lastRead:])
35
-			buf = bp.buf[:remain]
36
-		} else {
37
-			// reallocate buffer
38
-			buf = make([]byte, remain, 2*cap(bp.buf)+n)
39
-			copy(buf, bp.buf[bp.lastRead:])
40
-		}
41
-		bp.buf = buf
42
-		bp.lastRead = 0
23
+		buf: [][]byte{buf[:0]},
43 24
 	}
44 25
 }
45 26
 
46 27
 // Write writes p to BytesPipe.
47
-// It can increase cap of internal []byte slice in a process of writing.
28
+// It can allocate new []byte slices in a process of writing.
48 29
 func (bp *BytesPipe) Write(p []byte) (n int, err error) {
49
-	bp.grow(len(p))
50
-	bp.buf = append(bp.buf, p...)
30
+	for {
31
+		// write data to the last buffer
32
+		b := bp.buf[len(bp.buf)-1]
33
+		// copy data to the current empty allocated area
34
+		n := copy(b[len(b):cap(b)], p)
35
+		// increment buffered data length
36
+		bp.bufLen += n
37
+		// include written data in last buffer
38
+		bp.buf[len(bp.buf)-1] = b[:len(b)+n]
39
+
40
+		// if there was enough room to write all then break
41
+		if len(p) == n {
42
+			break
43
+		}
44
+
45
+		// more data: write to the next slice
46
+		p = p[n:]
47
+		// allocate slice that has twice the size of the last unless maximum reached
48
+		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
49
+		if maxCap < nextCap {
50
+			nextCap = maxCap
51
+		}
52
+		// add new byte slice to the buffers slice and continue writing
53
+		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
54
+	}
51 55
 	return
52 56
 }
53 57
 
54 58
 func (bp *BytesPipe) len() int {
55
-	return len(bp.buf) - bp.lastRead
56
-}
57
-
58
-func (bp *BytesPipe) crop() {
59
-	// shortcut for empty buffer
60
-	if bp.lastRead == len(bp.buf) {
61
-		bp.lastRead = 0
62
-		bp.buf = bp.buf[:0]
63
-	}
64
-	r := bp.len()
65
-	// if we have too large buffer for too small data
66
-	if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
67
-		copy(bp.buf, bp.buf[bp.lastRead:])
68
-		// will use same underlying slice until reach cap
69
-		bp.buf = bp.buf[:r : cap(bp.buf)/2]
70
-		bp.lastRead = 0
71
-	}
59
+	return bp.bufLen - bp.lastRead
72 60
 }
73 61
 
74 62
 // Read reads bytes from BytesPipe.
75 63
 // Data could be read only once.
76
-// Internal []byte slice could be shrinked.
77 64
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
78
-	n = copy(p, bp.buf[bp.lastRead:])
79
-	bp.lastRead += n
80
-	bp.crop()
65
+	for {
66
+		read := copy(p, bp.buf[0][bp.lastRead:])
67
+		n += read
68
+		bp.lastRead += read
69
+		if bp.len() == 0 {
70
+			// we have read everything. reset to the beginning.
71
+			bp.lastRead = 0
72
+			bp.bufLen -= len(bp.buf[0])
73
+			bp.buf[0] = bp.buf[0][:0]
74
+			break
75
+		}
76
+		// break if everything was read
77
+		if len(p) == read {
78
+			break
79
+		}
80
+		// more buffered data and more asked. read from next slice.
81
+		p = p[read:]
82
+		bp.lastRead = 0
83
+		bp.bufLen -= len(bp.buf[0])
84
+		bp.buf[0] = nil     // throw away old slice
85
+		bp.buf = bp.buf[1:] // switch to next
86
+	}
81 87
 	return
82 88
 }
... ...
@@ -1,6 +1,10 @@
1 1
 package ioutils
2 2
 
3
-import "testing"
3
+import (
4
+	"crypto/sha1"
5
+	"encoding/hex"
6
+	"testing"
7
+)
4 8
 
5 9
 func TestBytesPipeRead(t *testing.T) {
6 10
 	buf := NewBytesPipe(nil)
... ...
@@ -49,11 +53,67 @@ func TestBytesPipeWrite(t *testing.T) {
49 49
 	buf.Write([]byte("56"))
50 50
 	buf.Write([]byte("78"))
51 51
 	buf.Write([]byte("90"))
52
-	if string(buf.buf) != "1234567890" {
52
+	if string(buf.buf[0]) != "1234567890" {
53 53
 		t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
54 54
 	}
55 55
 }
56 56
 
57
+// Write and read in different speeds/chunk sizes and check valid data is read.
58
+func TestBytesPipeWriteRandomChunks(t *testing.T) {
59
+	cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{
60
+		{100, 10, 1},
61
+		{1000, 10, 5},
62
+		{1000, 100, 0},
63
+		{1000, 5, 6},
64
+		{10000, 50, 25},
65
+	}
66
+
67
+	testMessage := []byte("this is a random string for testing")
68
+	// random slice sizes to read and write
69
+	writeChunks := []int{25, 35, 15, 20}
70
+	readChunks := []int{5, 45, 20, 25}
71
+
72
+	for _, c := range cases {
73
+		// first pass: write directly to hash
74
+		hash := sha1.New()
75
+		for i := 0; i < c.iterations*c.writesPerLoop; i++ {
76
+			if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
77
+				t.Fatal(err)
78
+			}
79
+		}
80
+		expected := hex.EncodeToString(hash.Sum(nil))
81
+
82
+		// write/read through buffer
83
+		buf := NewBytesPipe(nil)
84
+		hash.Reset()
85
+		for i := 0; i < c.iterations; i++ {
86
+			for w := 0; w < c.writesPerLoop; w++ {
87
+				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
88
+			}
89
+			for r := 0; r < c.readsPerLoop; r++ {
90
+				p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
91
+				n, _ := buf.Read(p)
92
+				hash.Write(p[:n])
93
+			}
94
+		}
95
+		// read rest of the data from buffer
96
+		for i := 0; ; i++ {
97
+			p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
98
+			n, _ := buf.Read(p)
99
+			if n == 0 {
100
+				break
101
+			}
102
+			hash.Write(p[:n])
103
+		}
104
+		actual := hex.EncodeToString(hash.Sum(nil))
105
+
106
+		if expected != actual {
107
+			t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
108
+		}
109
+
110
+	}
111
+}
112
+
57 113
 func BenchmarkBytesPipeWrite(b *testing.B) {
58 114
 	for i := 0; i < b.N; i++ {
59 115
 		buf := NewBytesPipe(nil)
... ...
@@ -5,12 +5,8 @@ import (
5 5
 	"encoding/hex"
6 6
 	"io"
7 7
 	"sync"
8
-
9
-	"github.com/docker/docker/pkg/random"
10 8
 )
11 9
 
12
-var rndSrc = random.NewSource()
13
-
14 10
 type readCloserWrapper struct {
15 11
 	io.Reader
16 12
 	closer func() error