Browse code

Fix stdout premature EOF

Never close attached stream before both stdout and stderr have written
all their buffered contents. Remove stdinCloser because it is not needed
any more as the stream is closed anyway after attach has finished.

Fixes #3631

Signed-off-by: Andy Goldstein <agoldste@redhat.com>

Andy Goldstein authored on 2014/10/04 02:38:44
Showing 3 changed files
... ...
@@ -83,7 +83,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
83 83
 		var (
84 84
 			cStdin           io.ReadCloser
85 85
 			cStdout, cStderr io.Writer
86
-			cStdinCloser     io.Closer
87 86
 		)
88 87
 
89 88
 		if stdin {
... ...
@@ -94,7 +93,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
94 94
 				io.Copy(w, job.Stdin)
95 95
 			}()
96 96
 			cStdin = r
97
-			cStdinCloser = job.Stdin
98 97
 		}
99 98
 		if stdout {
100 99
 			cStdout = job.Stdout
... ...
@@ -103,7 +101,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
103 103
 			cStderr = job.Stderr
104 104
 		}
105 105
 
106
-		<-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdinCloser, cStdout, cStderr)
106
+		<-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdout, cStderr)
107 107
 		// If we are in stdinonce mode, wait for the process to end
108 108
 		// otherwise, simply return
109 109
 		if container.Config.StdinOnce && !container.Config.Tty {
... ...
@@ -113,7 +111,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
113 113
 	return engine.StatusOK
114 114
 }
115 115
 
116
-func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error {
116
+func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
117 117
 	var (
118 118
 		cStdout, cStderr io.ReadCloser
119 119
 		nJobs            int
... ...
@@ -130,10 +128,10 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
130 130
 			go func() {
131 131
 				log.Debugf("attach: stdin: begin")
132 132
 				defer log.Debugf("attach: stdin: end")
133
-				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
134 133
 				if stdinOnce && !tty {
135 134
 					defer cStdin.Close()
136 135
 				} else {
136
+					// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
137 137
 					defer func() {
138 138
 						if cStdout != nil {
139 139
 							cStdout.Close()
... ...
@@ -173,9 +171,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
173 173
 				if stdinOnce && stdin != nil {
174 174
 					defer stdin.Close()
175 175
 				}
176
-				if stdinCloser != nil {
177
-					defer stdinCloser.Close()
178
-				}
179 176
 				_, err := io.Copy(stdout, cStdout)
180 177
 				if err == io.ErrClosedPipe {
181 178
 					err = nil
... ...
@@ -189,9 +184,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
189 189
 	} else {
190 190
 		// Point stdout of container to a no-op writer.
191 191
 		go func() {
192
-			if stdinCloser != nil {
193
-				defer stdinCloser.Close()
194
-			}
195 192
 			if cStdout, err := streamConfig.StdoutPipe(); err != nil {
196 193
 				log.Errorf("attach: stdout pipe: %s", err)
197 194
 			} else {
... ...
@@ -213,9 +205,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
213 213
 				if stdinOnce && stdin != nil {
214 214
 					defer stdin.Close()
215 215
 				}
216
-				if stdinCloser != nil {
217
-					defer stdinCloser.Close()
218
-				}
219 216
 				_, err := io.Copy(stderr, cStderr)
220 217
 				if err == io.ErrClosedPipe {
221 218
 					err = nil
... ...
@@ -229,10 +218,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
229 229
 	} else {
230 230
 		// Point stderr at a no-op writer.
231 231
 		go func() {
232
-			if stdinCloser != nil {
233
-				defer stdinCloser.Close()
234
-			}
235
-
236 232
 			if cStderr, err := streamConfig.StderrPipe(); err != nil {
237 233
 				log.Errorf("attach: stdout pipe: %s", err)
238 234
 			} else {
... ...
@@ -251,8 +236,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
251 251
 			}
252 252
 		}()
253 253
 
254
-		// FIXME: how to clean up the stdin goroutine without the unwanted side effect
255
-		// of closing the passed stdin? Add an intermediary io.Pipe?
256 254
 		for i := 0; i < nJobs; i++ {
257 255
 			log.Debugf("attach: waiting for job %d/%d", i+1, nJobs)
258 256
 			if err := <-errors; err != nil {
... ...
@@ -155,7 +155,6 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
155 155
 	var (
156 156
 		cStdin           io.ReadCloser
157 157
 		cStdout, cStderr io.Writer
158
-		cStdinCloser     io.Closer
159 158
 		execName         = job.Args[0]
160 159
 	)
161 160
 
... ...
@@ -183,10 +182,10 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
183 183
 		r, w := io.Pipe()
184 184
 		go func() {
185 185
 			defer w.Close()
186
+			defer log.Debugf("Closing buffered stdin pipe")
186 187
 			io.Copy(w, job.Stdin)
187 188
 		}()
188 189
 		cStdin = r
189
-		cStdinCloser = job.Stdin
190 190
 	}
191 191
 	if execConfig.OpenStdout {
192 192
 		cStdout = job.Stdout
... ...
@@ -204,7 +203,7 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
204 204
 		execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
205 205
 	}
206 206
 
207
-	attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdinCloser, cStdout, cStderr)
207
+	attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr)
208 208
 
209 209
 	execErr := make(chan error)
210 210
 
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"bufio"
5 5
 	"bytes"
6 6
 	"fmt"
7
+	"io"
7 8
 	"io/ioutil"
8 9
 	"net"
9 10
 	"os"
... ...
@@ -2446,3 +2447,46 @@ func TestRunVolumesCleanPaths(t *testing.T) {
2446 2446
 
2447 2447
 	logDone("run - volume paths are cleaned")
2448 2448
 }
2449
+
2450
+// Regression test for #3631
2451
+func TestRunSlowStdoutConsumer(t *testing.T) {
2452
+	defer deleteAllContainers()
2453
+
2454
+	c := exec.Command("/bin/bash", "-c", dockerBinary+` run --rm -i busybox /bin/sh -c "dd if=/dev/zero of=/foo bs=1024 count=2000 &>/dev/null; catv /foo"`)
2455
+
2456
+	stdout, err := c.StdoutPipe()
2457
+	if err != nil {
2458
+		t.Fatal(err)
2459
+	}
2460
+
2461
+	if err := c.Start(); err != nil {
2462
+		t.Fatal(err)
2463
+	}
2464
+	n, err := consumeSlow(stdout, 10000, 5*time.Millisecond)
2465
+	if err != nil {
2466
+		t.Fatal(err)
2467
+	}
2468
+
2469
+	expected := 2 * 1024 * 2000
2470
+	if n != expected {
2471
+		t.Fatalf("Expected %d, got %d", expected, n)
2472
+	}
2473
+
2474
+	logDone("run - slow consumer")
2475
+}
2476
+
2477
+func consumeSlow(reader io.Reader, chunkSize int, interval time.Duration) (n int, err error) {
2478
+	buffer := make([]byte, chunkSize)
2479
+	for {
2480
+		var readBytes int
2481
+		readBytes, err = reader.Read(buffer)
2482
+		n += readBytes
2483
+		if err != nil {
2484
+			if err == io.EOF {
2485
+				err = nil
2486
+			}
2487
+			return
2488
+		}
2489
+		time.Sleep(interval)
2490
+	}
2491
+}