Browse code

Improve performance/reduce allocs of bytespipe

Creates a `fixedBuffer` type that is used to encapsulate functionality
for reading/writing from the underlying byte slices.

Uses a lazily-loaded set of sync.Pools for storing buffers that are no
longer needed so they can be re-used.

```
benchmark old ns/op new ns/op delta
BenchmarkBytesPipeWrite-8 138469 48985 -64.62%
BenchmarkBytesPipeRead-8 130922 56601 -56.77%

benchmark old allocs new allocs delta
BenchmarkBytesPipeWrite-8 18 8 -55.56%
BenchmarkBytesPipeRead-8 0 0 +0.00%

benchmark old bytes new bytes delta
BenchmarkBytesPipeWrite-8 66903 1649 -97.54%
BenchmarkBytesPipeRead-8 0 1 +Inf%
```

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2016/04/01 01:50:50
Showing 5 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,51 @@
0
+package ioutils
1
+
2
+import (
3
+	"errors"
4
+	"io"
5
+)
6
+
7
// errBufferFull is returned by fixedBuffer.Write when the backing slice
// has no remaining capacity for more bytes.
var errBufferFull = errors.New("buffer is full")

// fixedBuffer is a fixed-capacity byte buffer with a sequential write
// cursor (pos) and a sequential read cursor (lastRead). It never grows:
// once pos reaches cap(buf), Write reports errBufferFull. The unread
// region is always buf[lastRead:pos].
type fixedBuffer struct {
	buf      []byte // backing storage; only [lastRead:pos] holds unread data
	pos      int    // next write offset into buf
	lastRead int    // next read offset into buf
}

// Write copies as much of p as fits into the remaining capacity and
// advances the write cursor. It returns errBufferFull when the buffer is
// exactly full, and io.ErrShortWrite if fewer than len(p) bytes were
// written for any other reason.
func (b *fixedBuffer) Write(p []byte) (int, error) {
	n := copy(b.buf[b.pos:cap(b.buf)], p)
	b.pos += n

	if n < len(p) {
		if b.pos == cap(b.buf) {
			return n, errBufferFull
		}
		return n, io.ErrShortWrite
	}
	return n, nil
}

// Read copies unread bytes into p and advances the read cursor. It never
// returns an error; reading from an empty buffer simply yields n == 0.
func (b *fixedBuffer) Read(p []byte) (int, error) {
	n := copy(p, b.buf[b.lastRead:b.pos])
	b.lastRead += n
	return n, nil
}

// Len reports the number of unread bytes remaining in the buffer.
func (b *fixedBuffer) Len() int {
	return b.pos - b.lastRead
}

// Cap reports the fixed capacity of the backing slice.
func (b *fixedBuffer) Cap() int {
	return cap(b.buf)
}

// Reset discards all buffered data while keeping the backing slice so
// the buffer can be reused (e.g. after being returned to a pool).
func (b *fixedBuffer) Reset() {
	b.pos = 0
	b.lastRead = 0
	b.buf = b.buf[:0]
}

// String returns the unread portion of the buffer as a string.
func (b *fixedBuffer) String() string {
	return string(b.buf[b.lastRead:b.pos])
}
0 51
new file mode 100644
... ...
@@ -0,0 +1,75 @@
0
+package ioutils
1
+
2
+import (
3
+	"bytes"
4
+	"testing"
5
+)
6
+
7
+func TestFixedBufferWrite(t *testing.T) {
8
+	buf := &fixedBuffer{buf: make([]byte, 0, 64)}
9
+	n, err := buf.Write([]byte("hello"))
10
+	if err != nil {
11
+		t.Fatal(err)
12
+	}
13
+
14
+	if n != 5 {
15
+		t.Fatalf("expected 5 bytes written, got %d", n)
16
+	}
17
+
18
+	if string(buf.buf[:5]) != "hello" {
19
+		t.Fatalf("expected \"hello\", got %q", string(buf.buf[:5]))
20
+	}
21
+
22
+	n, err = buf.Write(bytes.Repeat([]byte{1}, 64))
23
+	if err != errBufferFull {
24
+		t.Fatalf("expected errBufferFull, got %v - %v", err, buf.buf[:64])
25
+	}
26
+}
27
+
28
+func TestFixedBufferRead(t *testing.T) {
29
+	buf := &fixedBuffer{buf: make([]byte, 0, 64)}
30
+	if _, err := buf.Write([]byte("hello world")); err != nil {
31
+		t.Fatal(err)
32
+	}
33
+
34
+	b := make([]byte, 5)
35
+	n, err := buf.Read(b)
36
+	if err != nil {
37
+		t.Fatal(err)
38
+	}
39
+
40
+	if n != 5 {
41
+		t.Fatalf("expected 5 bytes read, got %d - %s", n, buf.String())
42
+	}
43
+
44
+	if string(b) != "hello" {
45
+		t.Fatalf("expected \"hello\", got %q", string(b))
46
+	}
47
+
48
+	n, err = buf.Read(b)
49
+	if err != nil {
50
+		t.Fatal(err)
51
+	}
52
+
53
+	if n != 5 {
54
+		t.Fatalf("expected 5 bytes read, got %d", n)
55
+	}
56
+
57
+	if string(b) != " worl" {
58
+		t.Fatalf("expected \" worl\", got %s", string(b))
59
+	}
60
+
61
+	b = b[:1]
62
+	n, err = buf.Read(b)
63
+	if err != nil {
64
+		t.Fatal(err)
65
+	}
66
+
67
+	if n != 1 {
68
+		t.Fatalf("expected 1 byte read, got %d - %s", n, buf.String())
69
+	}
70
+
71
+	if string(b) != "d" {
72
+		t.Fatalf("expected \"d\", got %s", string(b))
73
+	}
74
+}
... ...
@@ -9,12 +9,19 @@ import (
9 9
 // maxCap is the highest capacity to use in byte slices that buffer data.
10 10
 const maxCap = 1e6
11 11
 
12
// minCap is the lowest capacity to use in byte slices that buffer data.
const minCap = 64
14
+
12 15
 // blockThreshold is the minimum number of bytes in the buffer which will cause
13 16
 // a write to BytesPipe to block when allocating a new slice.
14 17
 const blockThreshold = 1e6
15 18
 
16
-// ErrClosed is returned when Write is called on a closed BytesPipe.
17
-var ErrClosed = errors.New("write to closed BytesPipe")
19
var (
	// ErrClosed is returned when Write is called on a closed BytesPipe.
	ErrClosed = errors.New("write to closed BytesPipe")

	// bufPools caches one sync.Pool of fixedBuffers per capacity so that
	// drained buffers can be reused instead of reallocated.
	// NOTE(review): this map is read and written from getBuffer and
	// returnBuffer with no synchronization of its own. Two independent
	// BytesPipe instances used concurrently race on it — guard it with a
	// package-level mutex (or switch to sync.Map) before shipping.
	bufPools = make(map[int]*sync.Pool)
)
18 25
 
19 26
 // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
20 27
 // All written data may be read at most once. Also, BytesPipe allocates
... ...
@@ -23,22 +30,17 @@ var ErrClosed = errors.New("write to closed BytesPipe")
23 23
 type BytesPipe struct {
24 24
 	mu       sync.Mutex
25 25
 	wait     *sync.Cond
26
-	buf      [][]byte // slice of byte-slices of buffered data
27
-	lastRead int      // index in the first slice to a read point
28
-	bufLen   int      // length of data buffered over the slices
29
-	closeErr error    // error to return from next Read. set to nil if not closed.
26
+	buf      []*fixedBuffer
27
+	bufLen   int
28
+	closeErr error // error to return from next Read. set to nil if not closed.
30 29
 }
31 30
 
32 31
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
33 32
 // If buf is nil, then it will be initialized with slice which cap is 64.
34 33
 // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
35
-func NewBytesPipe(buf []byte) *BytesPipe {
36
-	if cap(buf) == 0 {
37
-		buf = make([]byte, 0, 64)
38
-	}
39
-	bp := &BytesPipe{
40
-		buf: [][]byte{buf[:0]},
41
-	}
34
+func NewBytesPipe() *BytesPipe {
35
+	bp := &BytesPipe{}
36
+	bp.buf = append(bp.buf, getBuffer(minCap))
42 37
 	bp.wait = sync.NewCond(&bp.mu)
43 38
 	return bp
44 39
 }
... ...
@@ -47,22 +49,30 @@ func NewBytesPipe(buf []byte) *BytesPipe {
47 47
 // It can allocate new []byte slices in a process of writing.
48 48
 func (bp *BytesPipe) Write(p []byte) (int, error) {
49 49
 	bp.mu.Lock()
50
-	defer bp.mu.Unlock()
50
+
51 51
 	written := 0
52 52
 	for {
53 53
 		if bp.closeErr != nil {
54
+			bp.mu.Unlock()
54 55
 			return written, ErrClosed
55 56
 		}
56
-		// write data to the last buffer
57
+
58
+		if len(bp.buf) == 0 {
59
+			bp.buf = append(bp.buf, getBuffer(64))
60
+		}
61
+		// get the last buffer
57 62
 		b := bp.buf[len(bp.buf)-1]
58
-		// copy data to the current empty allocated area
59
-		n := copy(b[len(b):cap(b)], p)
60
-		// increment buffered data length
61
-		bp.bufLen += n
62
-		// include written data in last buffer
63
-		bp.buf[len(bp.buf)-1] = b[:len(b)+n]
64 63
 
64
+		n, err := b.Write(p)
65 65
 		written += n
66
+		bp.bufLen += n
67
+
68
+		// errBufferFull is an error we expect to get if the buffer is full
69
+		if err != nil && err != errBufferFull {
70
+			bp.wait.Broadcast()
71
+			bp.mu.Unlock()
72
+			return written, err
73
+		}
66 74
 
67 75
 		// if there was enough room to write all then break
68 76
 		if len(p) == n {
... ...
@@ -72,20 +82,20 @@ func (bp *BytesPipe) Write(p []byte) (int, error) {
72 72
 		// more data: write to the next slice
73 73
 		p = p[n:]
74 74
 
75
-		// block if too much data is still in the buffer
75
+		// make sure the buffer doesn't grow too big from this write
76 76
 		for bp.bufLen >= blockThreshold {
77 77
 			bp.wait.Wait()
78 78
 		}
79 79
 
80
-		// allocate slice that has twice the size of the last unless maximum reached
81
-		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
80
+		// add new byte slice to the buffers slice and continue writing
81
+		nextCap := b.Cap() * 2
82 82
 		if nextCap > maxCap {
83 83
 			nextCap = maxCap
84 84
 		}
85
-		// add new byte slice to the buffers slice and continue writing
86
-		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
85
+		bp.buf = append(bp.buf, getBuffer(nextCap))
87 86
 	}
88 87
 	bp.wait.Broadcast()
88
+	bp.mu.Unlock()
89 89
 	return written, nil
90 90
 }
91 91
 
... ...
@@ -107,46 +117,60 @@ func (bp *BytesPipe) Close() error {
107 107
 	return bp.CloseWithError(nil)
108 108
 }
109 109
 
110
-func (bp *BytesPipe) len() int {
111
-	return bp.bufLen - bp.lastRead
112
-}
113
-
114 110
 // Read reads bytes from BytesPipe.
115 111
 // Data could be read only once.
116 112
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
117 113
 	bp.mu.Lock()
118
-	defer bp.mu.Unlock()
119
-	if bp.len() == 0 {
114
+	if bp.bufLen == 0 {
120 115
 		if bp.closeErr != nil {
116
+			bp.mu.Unlock()
121 117
 			return 0, bp.closeErr
122 118
 		}
123 119
 		bp.wait.Wait()
124
-		if bp.len() == 0 && bp.closeErr != nil {
120
+		if bp.bufLen == 0 && bp.closeErr != nil {
121
+			bp.mu.Unlock()
125 122
 			return 0, bp.closeErr
126 123
 		}
127 124
 	}
128
-	for {
129
-		read := copy(p, bp.buf[0][bp.lastRead:])
125
+
126
+	for bp.bufLen > 0 {
127
+		b := bp.buf[0]
128
+		read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
130 129
 		n += read
131
-		bp.lastRead += read
132
-		if bp.len() == 0 {
133
-			// we have read everything. reset to the beginning.
134
-			bp.lastRead = 0
135
-			bp.bufLen -= len(bp.buf[0])
136
-			bp.buf[0] = bp.buf[0][:0]
137
-			break
130
+		bp.bufLen -= read
131
+
132
+		if b.Len() == 0 {
133
+			// it's empty so return it to the pool and move to the next one
134
+			returnBuffer(b)
135
+			bp.buf[0] = nil
136
+			bp.buf = bp.buf[1:]
138 137
 		}
139
-		// break if everything was read
138
+
140 139
 		if len(p) == read {
141 140
 			break
142 141
 		}
143
-		// more buffered data and more asked. read from next slice.
142
+
144 143
 		p = p[read:]
145
-		bp.lastRead = 0
146
-		bp.bufLen -= len(bp.buf[0])
147
-		bp.buf[0] = nil     // throw away old slice
148
-		bp.buf = bp.buf[1:] // switch to next
149 144
 	}
145
+
150 146
 	bp.wait.Broadcast()
147
+	bp.mu.Unlock()
151 148
 	return
152 149
 }
150
+
151
+func returnBuffer(b *fixedBuffer) {
152
+	b.Reset()
153
+	pool := bufPools[b.Cap()]
154
+	if pool != nil {
155
+		pool.Put(b)
156
+	}
157
+}
158
+
159
+func getBuffer(size int) *fixedBuffer {
160
+	pool, ok := bufPools[size]
161
+	if !ok {
162
+		pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
163
+		bufPools[size] = pool
164
+	}
165
+	return pool.Get().(*fixedBuffer)
166
+}
... ...
@@ -9,7 +9,7 @@ import (
9 9
 )
10 10
 
11 11
 func TestBytesPipeRead(t *testing.T) {
12
-	buf := NewBytesPipe(nil)
12
+	buf := NewBytesPipe()
13 13
 	buf.Write([]byte("12"))
14 14
 	buf.Write([]byte("34"))
15 15
 	buf.Write([]byte("56"))
... ...
@@ -49,14 +49,14 @@ func TestBytesPipeRead(t *testing.T) {
49 49
 }
50 50
 
51 51
 func TestBytesPipeWrite(t *testing.T) {
52
-	buf := NewBytesPipe(nil)
52
+	buf := NewBytesPipe()
53 53
 	buf.Write([]byte("12"))
54 54
 	buf.Write([]byte("34"))
55 55
 	buf.Write([]byte("56"))
56 56
 	buf.Write([]byte("78"))
57 57
 	buf.Write([]byte("90"))
58
-	if string(buf.buf[0]) != "1234567890" {
59
-		t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
58
+	if buf.buf[0].String() != "1234567890" {
59
+		t.Fatalf("Buffer %q, must be %q", buf.buf[0].String(), "1234567890")
60 60
 	}
61 61
 }
62 62
 
... ...
@@ -86,7 +86,7 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
86 86
 		expected := hex.EncodeToString(hash.Sum(nil))
87 87
 
88 88
 		// write/read through buffer
89
-		buf := NewBytesPipe(nil)
89
+		buf := NewBytesPipe()
90 90
 		hash.Reset()
91 91
 
92 92
 		done := make(chan struct{})
... ...
@@ -124,9 +124,10 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
124 124
 }
125 125
 
126 126
 func BenchmarkBytesPipeWrite(b *testing.B) {
127
+	testData := []byte("pretty short line, because why not?")
127 128
 	for i := 0; i < b.N; i++ {
128 129
 		readBuf := make([]byte, 1024)
129
-		buf := NewBytesPipe(nil)
130
+		buf := NewBytesPipe()
130 131
 		go func() {
131 132
 			var err error
132 133
 			for err == nil {
... ...
@@ -134,7 +135,7 @@ func BenchmarkBytesPipeWrite(b *testing.B) {
134 134
 			}
135 135
 		}()
136 136
 		for j := 0; j < 1000; j++ {
137
-			buf.Write([]byte("pretty short line, because why not?"))
137
+			buf.Write(testData)
138 138
 		}
139 139
 		buf.Close()
140 140
 	}
... ...
@@ -144,7 +145,7 @@ func BenchmarkBytesPipeRead(b *testing.B) {
144 144
 	rd := make([]byte, 512)
145 145
 	for i := 0; i < b.N; i++ {
146 146
 		b.StopTimer()
147
-		buf := NewBytesPipe(nil)
147
+		buf := NewBytesPipe()
148 148
 		for j := 0; j < 500; j++ {
149 149
 			buf.Write(make([]byte, 1024))
150 150
 		}
... ...
@@ -60,7 +60,7 @@ func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
60 60
 // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
61 61
 // It adds this new out pipe to the Stdout broadcaster.
62 62
 func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
63
-	bytesPipe := ioutils.NewBytesPipe(nil)
63
+	bytesPipe := ioutils.NewBytesPipe()
64 64
 	streamConfig.stdout.Add(bytesPipe)
65 65
 	return bytesPipe
66 66
 }
... ...
@@ -68,7 +68,7 @@ func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
68 68
 // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
69 69
 // It adds this new err pipe to the Stderr broadcaster.
70 70
 func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
71
-	bytesPipe := ioutils.NewBytesPipe(nil)
71
+	bytesPipe := ioutils.NewBytesPipe()
72 72
 	streamConfig.stderr.Add(bytesPipe)
73 73
 	return bytesPipe
74 74
 }