Browse code

Refactor bufReader to use BytesPipe

Signed-off-by: Alexander Morozov <lk4d4@docker.com>

Alexander Morozov authored on 2015/09/10 01:48:34
Showing 2 changed files
... ...
@@ -1,13 +1,10 @@
1 1
 package ioutils
2 2
 
3 3
 import (
4
-	"bytes"
5 4
 	"crypto/sha256"
6 5
 	"encoding/hex"
7 6
 	"io"
8
-	"math/rand"
9 7
 	"sync"
10
-	"time"
11 8
 
12 9
 	"github.com/docker/docker/pkg/random"
13 10
 )
... ...
@@ -58,31 +55,19 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
58 58
 // expanding buffer.
59 59
 type bufReader struct {
60 60
 	sync.Mutex
61
-	buf                  *bytes.Buffer
62
-	reader               io.Reader
63
-	err                  error
64
-	wait                 sync.Cond
65
-	drainBuf             []byte
66
-	reuseBuf             []byte
67
-	maxReuse             int64
68
-	resetTimeout         time.Duration
69
-	bufLenResetThreshold int64
70
-	maxReadDataReset     int64
61
+	buf      io.ReadWriter
62
+	reader   io.Reader
63
+	err      error
64
+	wait     sync.Cond
65
+	drainBuf []byte
71 66
 }
72 67
 
73 68
 // NewBufReader returns a new bufReader.
74 69
 func NewBufReader(r io.Reader) io.ReadCloser {
75
-	timeout := rand.New(rndSrc).Intn(90)
76
-
77 70
 	reader := &bufReader{
78
-		buf:                  &bytes.Buffer{},
79
-		drainBuf:             make([]byte, 1024),
80
-		reuseBuf:             make([]byte, 4096),
81
-		maxReuse:             1000,
82
-		resetTimeout:         time.Duration(timeout) * time.Second,
83
-		bufLenResetThreshold: 100 * 1024,
84
-		maxReadDataReset:     10 * 1024 * 1024,
85
-		reader:               r,
71
+		buf:      NewBytesPipe(nil),
72
+		reader:   r,
73
+		drainBuf: make([]byte, 1024),
86 74
 	}
87 75
 	reader.wait.L = &reader.Mutex
88 76
 	go reader.drain()
... ...
@@ -90,7 +75,7 @@ func NewBufReader(r io.Reader) io.ReadCloser {
90 90
 }
91 91
 
92 92
 // NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
93
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser {
93
+func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
94 94
 	reader := &bufReader{
95 95
 		buf:      buffer,
96 96
 		drainBuf: drainBuffer,
... ...
@@ -102,94 +87,19 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *
102 102
 }
103 103
 
104 104
 func (r *bufReader) drain() {
105
-	var (
106
-		duration       time.Duration
107
-		lastReset      time.Time
108
-		now            time.Time
109
-		reset          bool
110
-		bufLen         int64
111
-		dataSinceReset int64
112
-		maxBufLen      int64
113
-		reuseBufLen    int64
114
-		reuseCount     int64
115
-	)
116
-	reuseBufLen = int64(len(r.reuseBuf))
117
-	lastReset = time.Now()
118 105
 	for {
119 106
 		n, err := r.reader.Read(r.drainBuf)
120
-		dataSinceReset += int64(n)
121 107
 		r.Lock()
122
-		bufLen = int64(r.buf.Len())
123
-		if bufLen > maxBufLen {
124
-			maxBufLen = bufLen
125
-		}
126
-
127
-		// Avoid unbounded growth of the buffer over time.
128
-		// This has been discovered to be the only non-intrusive
129
-		// solution to the unbounded growth of the buffer.
130
-		// Alternative solutions such as compression, multiple
131
-		// buffers, channels and other similar pieces of code
132
-		// were reducing throughput, overall Docker performance
133
-		// or simply crashed Docker.
134
-		// This solution releases the buffer when specific
135
-		// conditions are met to avoid the continuous resizing
136
-		// of the buffer for long lived containers.
137
-		//
138
-		// Move data to the front of the buffer if it's
139
-		// smaller than what reuseBuf can store
140
-		if bufLen > 0 && reuseBufLen >= bufLen {
141
-			n, _ := r.buf.Read(r.reuseBuf)
142
-			r.buf.Write(r.reuseBuf[0:n])
143
-			// Take action if the buffer has been reused too many
144
-			// times and if there's data in the buffer.
145
-			// The timeout is also used as means to avoid doing
146
-			// these operations more often or less often than
147
-			// required.
148
-			// The various conditions try to detect heavy activity
149
-			// in the buffer which might be indicators of heavy
150
-			// growth of the buffer.
151
-		} else if reuseCount >= r.maxReuse && bufLen > 0 {
152
-			now = time.Now()
153
-			duration = now.Sub(lastReset)
154
-			timeoutReached := duration >= r.resetTimeout
155
-
156
-			// The timeout has been reached and the
157
-			// buffered data couldn't be moved to the front
158
-			// of the buffer, so the buffer gets reset.
159
-			if timeoutReached && bufLen > reuseBufLen {
160
-				reset = true
161
-			}
162
-			// The amount of buffered data is too high now,
163
-			// reset the buffer.
164
-			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
165
-				reset = true
166
-			}
167
-			// Reset the buffer if a certain amount of
168
-			// data has gone through the buffer since the
169
-			// last reset.
170
-			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
171
-				reset = true
172
-			}
173
-			// The buffered data is moved to a fresh buffer,
174
-			// swap the old buffer with the new one and
175
-			// reset all counters.
176
-			if reset {
177
-				newbuf := &bytes.Buffer{}
178
-				newbuf.ReadFrom(r.buf)
179
-				r.buf = newbuf
180
-				lastReset = now
181
-				reset = false
182
-				dataSinceReset = 0
183
-				maxBufLen = 0
184
-				reuseCount = 0
185
-			}
186
-		}
187 108
 		if err != nil {
188 109
 			r.err = err
189 110
 		} else {
190
-			r.buf.Write(r.drainBuf[0:n])
111
+			if n == 0 {
112
+				// nothing written, no need to signal
113
+				r.Unlock()
114
+				continue
115
+			}
116
+			r.buf.Write(r.drainBuf[:n])
191 117
 		}
192
-		reuseCount++
193 118
 		r.wait.Signal()
194 119
 		r.Unlock()
195 120
 		callSchedulerIfNecessary()
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"io/ioutil"
8 8
 	"strings"
9 9
 	"testing"
10
+	"time"
10 11
 )
11 12
 
12 13
 // Implement io.Reader
... ...
@@ -61,8 +62,8 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
61 61
 	reader, writer := io.Pipe()
62 62
 
63 63
 	drainBuffer := make([]byte, 1024)
64
-	buffer := bytes.Buffer{}
65
-	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer)
64
+	buffer := NewBytesPipe(nil)
65
+	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
66 66
 
67 67
 	// Write everything down to a Pipe
68 68
 	// Usually, a pipe should block but because of the buffered reader,
... ...
@@ -76,7 +77,11 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
76 76
 
77 77
 	// Drain the reader *after* everything has been written, just to verify
78 78
 	// it is indeed buffering
79
-	<-done
79
+	select {
80
+	case <-done:
81
+	case <-time.After(1 * time.Second):
82
+		t.Fatal("timeout")
83
+	}
80 84
 
81 85
 	output, err := ioutil.ReadAll(bufreader)
82 86
 	if err != nil {