Browse code

Merge pull request #8571 from ncdc/3631-stdout-premature-eof

Fix stdout premature EOF

Jessie Frazelle authored on 2014/10/30 03:36:32
Showing 3 changed files
... ...
@@ -83,7 +83,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
83 83
 		var (
84 84
 			cStdin           io.ReadCloser
85 85
 			cStdout, cStderr io.Writer
86
-			cStdinCloser     io.Closer
87 86
 		)
88 87
 
89 88
 		if stdin {
... ...
@@ -94,7 +93,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
94 94
 				io.Copy(w, job.Stdin)
95 95
 			}()
96 96
 			cStdin = r
97
-			cStdinCloser = job.Stdin
98 97
 		}
99 98
 		if stdout {
100 99
 			cStdout = job.Stdout
... ...
@@ -103,7 +101,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
103 103
 			cStderr = job.Stderr
104 104
 		}
105 105
 
106
-		<-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdinCloser, cStdout, cStderr)
106
+		<-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdout, cStderr)
107 107
 		// If we are in stdinonce mode, wait for the process to end
108 108
 		// otherwise, simply return
109 109
 		if container.Config.StdinOnce && !container.Config.Tty {
... ...
@@ -113,7 +111,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
113 113
 	return engine.StatusOK
114 114
 }
115 115
 
116
-func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error {
116
+func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
117 117
 	var (
118 118
 		cStdout, cStderr io.ReadCloser
119 119
 		nJobs            int
... ...
@@ -130,10 +128,10 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
130 130
 			go func() {
131 131
 				log.Debugf("attach: stdin: begin")
132 132
 				defer log.Debugf("attach: stdin: end")
133
-				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
134 133
 				if stdinOnce && !tty {
135 134
 					defer cStdin.Close()
136 135
 				} else {
136
+					// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
137 137
 					defer func() {
138 138
 						if cStdout != nil {
139 139
 							cStdout.Close()
... ...
@@ -173,9 +171,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
173 173
 				if stdinOnce && stdin != nil {
174 174
 					defer stdin.Close()
175 175
 				}
176
-				if stdinCloser != nil {
177
-					defer stdinCloser.Close()
178
-				}
179 176
 				_, err := io.Copy(stdout, cStdout)
180 177
 				if err == io.ErrClosedPipe {
181 178
 					err = nil
... ...
@@ -189,9 +184,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
189 189
 	} else {
190 190
 		// Point stdout of container to a no-op writer.
191 191
 		go func() {
192
-			if stdinCloser != nil {
193
-				defer stdinCloser.Close()
194
-			}
195 192
 			if cStdout, err := streamConfig.StdoutPipe(); err != nil {
196 193
 				log.Errorf("attach: stdout pipe: %s", err)
197 194
 			} else {
... ...
@@ -213,9 +205,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
213 213
 				if stdinOnce && stdin != nil {
214 214
 					defer stdin.Close()
215 215
 				}
216
-				if stdinCloser != nil {
217
-					defer stdinCloser.Close()
218
-				}
219 216
 				_, err := io.Copy(stderr, cStderr)
220 217
 				if err == io.ErrClosedPipe {
221 218
 					err = nil
... ...
@@ -229,10 +218,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
229 229
 	} else {
230 230
 		// Point stderr at a no-op writer.
231 231
 		go func() {
232
-			if stdinCloser != nil {
233
-				defer stdinCloser.Close()
234
-			}
235
-
236 232
 			if cStderr, err := streamConfig.StderrPipe(); err != nil {
237 233
 				log.Errorf("attach: stdout pipe: %s", err)
238 234
 			} else {
... ...
@@ -251,8 +236,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t
251 251
 			}
252 252
 		}()
253 253
 
254
-		// FIXME: how to clean up the stdin goroutine without the unwanted side effect
255
-		// of closing the passed stdin? Add an intermediary io.Pipe?
256 254
 		for i := 0; i < nJobs; i++ {
257 255
 			log.Debugf("attach: waiting for job %d/%d", i+1, nJobs)
258 256
 			if err := <-errors; err != nil {
... ...
@@ -155,7 +155,6 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
155 155
 	var (
156 156
 		cStdin           io.ReadCloser
157 157
 		cStdout, cStderr io.Writer
158
-		cStdinCloser     io.Closer
159 158
 		execName         = job.Args[0]
160 159
 	)
161 160
 
... ...
@@ -183,10 +182,10 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
183 183
 		r, w := io.Pipe()
184 184
 		go func() {
185 185
 			defer w.Close()
186
+			defer log.Debugf("Closing buffered stdin pipe")
186 187
 			io.Copy(w, job.Stdin)
187 188
 		}()
188 189
 		cStdin = r
189
-		cStdinCloser = job.Stdin
190 190
 	}
191 191
 	if execConfig.OpenStdout {
192 192
 		cStdout = job.Stdout
... ...
@@ -204,7 +203,7 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
204 204
 		execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
205 205
 	}
206 206
 
207
-	attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdinCloser, cStdout, cStderr)
207
+	attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr)
208 208
 
209 209
 	execErr := make(chan error)
210 210
 
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"bufio"
5 5
 	"bytes"
6 6
 	"fmt"
7
+	"io"
7 8
 	"io/ioutil"
8 9
 	"net"
9 10
 	"os"
... ...
@@ -2446,3 +2447,46 @@ func TestRunVolumesCleanPaths(t *testing.T) {
2446 2446
 
2447 2447
 	logDone("run - volume paths are cleaned")
2448 2448
 }
2449
+
2450
+// Regression test for #3631
2451
+func TestRunSlowStdoutConsumer(t *testing.T) {
2452
+	defer deleteAllContainers()
2453
+
2454
+	c := exec.Command("/bin/bash", "-c", dockerBinary+` run --rm -i busybox /bin/sh -c "dd if=/dev/zero of=/foo bs=1024 count=2000 &>/dev/null; catv /foo"`)
2455
+
2456
+	stdout, err := c.StdoutPipe()
2457
+	if err != nil {
2458
+		t.Fatal(err)
2459
+	}
2460
+
2461
+	if err := c.Start(); err != nil {
2462
+		t.Fatal(err)
2463
+	}
2464
+	n, err := consumeSlow(stdout, 10000, 5*time.Millisecond)
2465
+	if err != nil {
2466
+		t.Fatal(err)
2467
+	}
2468
+
2469
+	expected := 2 * 1024 * 2000
2470
+	if n != expected {
2471
+		t.Fatalf("Expected %d, got %d", expected, n)
2472
+	}
2473
+
2474
+	logDone("run - slow consumer")
2475
+}
2476
+
2477
+func consumeSlow(reader io.Reader, chunkSize int, interval time.Duration) (n int, err error) {
2478
+	buffer := make([]byte, chunkSize)
2479
+	for {
2480
+		var readBytes int
2481
+		readBytes, err = reader.Read(buffer)
2482
+		n += readBytes
2483
+		if err != nil {
2484
+			if err == io.EOF {
2485
+				err = nil
2486
+			}
2487
+			return
2488
+		}
2489
+		time.Sleep(interval)
2490
+	}
2491
+}