Browse code

Move attach code to stream package

This cleans up attach a little bit, and moves it out of the container
package.
Really `AttachStream` is a method on `*stream.Config`, so moved if from
a package level function to one bound to `Config`.
In addition, uses a config struct rather than passing around tons and
tons of arguments.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2017/01/20 01:02:51
Showing 5 changed files
... ...
@@ -31,7 +31,6 @@ import (
31 31
 	"github.com/docker/docker/opts"
32 32
 	"github.com/docker/docker/pkg/idtools"
33 33
 	"github.com/docker/docker/pkg/ioutils"
34
-	"github.com/docker/docker/pkg/promise"
35 34
 	"github.com/docker/docker/pkg/signal"
36 35
 	"github.com/docker/docker/pkg/symlink"
37 36
 	"github.com/docker/docker/restartmanager"
... ...
@@ -58,13 +57,6 @@ var (
58 58
 	errInvalidNetwork  = fmt.Errorf("invalid network settings while building port map info")
59 59
 )
60 60
 
61
-// DetachError is special error which returned in case of container detach.
62
-type DetachError struct{}
63
-
64
-func (DetachError) Error() string {
65
-	return "detached from container"
66
-}
67
-
68 61
 // CommonContainer holds the fields for a container which are
69 62
 // applicable across all platforms supported by the daemon.
70 63
 type CommonContainer struct {
... ...
@@ -373,183 +365,17 @@ func (container *Container) GetExecIDs() []string {
373 373
 	return container.ExecCommands.List()
374 374
 }
375 375
 
376
-// Attach connects to the container's TTY, delegating to standard
377
-// streams or websockets depending on the configuration.
378
-func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
376
+// Attach connects to the container's stdio to the client streams
377
+func (container *Container) Attach(cfg *stream.AttachConfig) chan error {
379 378
 	ctx := container.InitAttachContext()
380
-	return AttachStreams(ctx, container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys)
381
-}
382
-
383
-// AttachStreams connects streams to a TTY.
384
-// Used by exec too. Should this move somewhere else?
385
-func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
386
-	var (
387
-		cStdout, cStderr io.ReadCloser
388
-		cStdin           io.WriteCloser
389
-		wg               sync.WaitGroup
390
-		errors           = make(chan error, 3)
391
-	)
392 379
 
393
-	if stdin != nil && openStdin {
394
-		cStdin = streamConfig.StdinPipe()
395
-		wg.Add(1)
380
+	cfg.TTY = container.Config.Tty
381
+	if !container.Config.OpenStdin {
382
+		cfg.Stdin = nil
396 383
 	}
384
+	cfg.CloseStdin = cfg.Stdin != nil && container.Config.StdinOnce
397 385
 
398
-	if stdout != nil {
399
-		cStdout = streamConfig.StdoutPipe()
400
-		wg.Add(1)
401
-	}
402
-
403
-	if stderr != nil {
404
-		cStderr = streamConfig.StderrPipe()
405
-		wg.Add(1)
406
-	}
407
-
408
-	// Connect stdin of container to the http conn.
409
-	go func() {
410
-		if stdin == nil || !openStdin {
411
-			return
412
-		}
413
-		logrus.Debug("attach: stdin: begin")
414
-
415
-		var err error
416
-		if tty {
417
-			_, err = copyEscapable(cStdin, stdin, keys)
418
-		} else {
419
-			_, err = io.Copy(cStdin, stdin)
420
-		}
421
-		if err == io.ErrClosedPipe {
422
-			err = nil
423
-		}
424
-		if err != nil {
425
-			logrus.Errorf("attach: stdin: %s", err)
426
-			errors <- err
427
-		}
428
-		if stdinOnce && !tty {
429
-			cStdin.Close()
430
-		} else {
431
-			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
432
-			if cStdout != nil {
433
-				cStdout.Close()
434
-			}
435
-			if cStderr != nil {
436
-				cStderr.Close()
437
-			}
438
-		}
439
-		logrus.Debug("attach: stdin: end")
440
-		wg.Done()
441
-	}()
442
-
443
-	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
444
-		if stream == nil {
445
-			return
446
-		}
447
-
448
-		logrus.Debugf("attach: %s: begin", name)
449
-		_, err := io.Copy(stream, streamPipe)
450
-		if err == io.ErrClosedPipe {
451
-			err = nil
452
-		}
453
-		if err != nil {
454
-			logrus.Errorf("attach: %s: %v", name, err)
455
-			errors <- err
456
-		}
457
-		// Make sure stdin gets closed
458
-		if stdin != nil {
459
-			stdin.Close()
460
-		}
461
-		streamPipe.Close()
462
-		logrus.Debugf("attach: %s: end", name)
463
-		wg.Done()
464
-	}
465
-
466
-	go attachStream("stdout", stdout, cStdout)
467
-	go attachStream("stderr", stderr, cStderr)
468
-
469
-	return promise.Go(func() error {
470
-		done := make(chan struct{})
471
-		go func() {
472
-			wg.Wait()
473
-			close(done)
474
-		}()
475
-		select {
476
-		case <-done:
477
-		case <-ctx.Done():
478
-			// close all pipes
479
-			if cStdin != nil {
480
-				cStdin.Close()
481
-			}
482
-			if cStdout != nil {
483
-				cStdout.Close()
484
-			}
485
-			if cStderr != nil {
486
-				cStderr.Close()
487
-			}
488
-			<-done
489
-		}
490
-		close(errors)
491
-		for err := range errors {
492
-			if err != nil {
493
-				return err
494
-			}
495
-		}
496
-		return nil
497
-	})
498
-}
499
-
500
-// Code c/c from io.Copy() modified to handle escape sequence
501
-func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
502
-	if len(keys) == 0 {
503
-		// Default keys : ctrl-p ctrl-q
504
-		keys = []byte{16, 17}
505
-	}
506
-	buf := make([]byte, 32*1024)
507
-	for {
508
-		nr, er := src.Read(buf)
509
-		if nr > 0 {
510
-			// ---- Docker addition
511
-			preservBuf := []byte{}
512
-			for i, key := range keys {
513
-				preservBuf = append(preservBuf, buf[0:nr]...)
514
-				if nr != 1 || buf[0] != key {
515
-					break
516
-				}
517
-				if i == len(keys)-1 {
518
-					src.Close()
519
-					return 0, DetachError{}
520
-				}
521
-				nr, er = src.Read(buf)
522
-			}
523
-			var nw int
524
-			var ew error
525
-			if len(preservBuf) > 0 {
526
-				nw, ew = dst.Write(preservBuf)
527
-				nr = len(preservBuf)
528
-			} else {
529
-				// ---- End of docker
530
-				nw, ew = dst.Write(buf[0:nr])
531
-			}
532
-			if nw > 0 {
533
-				written += int64(nw)
534
-			}
535
-			if ew != nil {
536
-				err = ew
537
-				break
538
-			}
539
-			if nr != nw {
540
-				err = io.ErrShortWrite
541
-				break
542
-			}
543
-		}
544
-		if er == io.EOF {
545
-			break
546
-		}
547
-		if er != nil {
548
-			err = er
549
-			break
550
-		}
551
-	}
552
-	return written, err
386
+	return container.StreamConfig.Attach(ctx, cfg)
553 387
 }
554 388
 
555 389
 // ShouldRestart decides whether the daemon should restart the container or not.
556 390
new file mode 100644
... ...
@@ -0,0 +1,209 @@
0
+package stream
1
+
2
+import (
3
+	"io"
4
+	"sync"
5
+
6
+	"golang.org/x/net/context"
7
+
8
+	"github.com/Sirupsen/logrus"
9
+	"github.com/docker/docker/pkg/promise"
10
+)
11
+
12
+// DetachError is special error which returned in case of container detach.
13
+type DetachError struct{}
14
+
15
+func (DetachError) Error() string {
16
+	return "detached from container"
17
+}
18
+
19
+// AttachConfig is the config struct used to attach a client to a stream's stdio
20
+type AttachConfig struct {
21
+	// Tells the attach copier that the stream's stdin is a TTY and to look for
22
+	// escape sequences in stdin to detach from the stream.
23
+	// When true the escape sequence is not passed to the underlying stream
24
+	TTY bool
25
+	// Specifies the detach keys the client will be using
26
+	// Only useful when `TTY` is true
27
+	DetachKeys []byte
28
+
29
+	// CloseStdin signals that once done, stdin for the attached stream should be closed
30
+	// For example, this would close the attached container's stdin.
31
+	CloseStdin bool
32
+
33
+	// Provide client streams to wire up to
34
+	Stdin          io.ReadCloser
35
+	Stdout, Stderr io.Writer
36
+}
37
+
38
+// Attach attaches the stream config to the streams specified in
39
+// the AttachOptions
40
+func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
41
+	var (
42
+		cStdout, cStderr io.ReadCloser
43
+		cStdin           io.WriteCloser
44
+		wg               sync.WaitGroup
45
+		errors           = make(chan error, 3)
46
+	)
47
+
48
+	if cfg.Stdin != nil {
49
+		cStdin = c.StdinPipe()
50
+		wg.Add(1)
51
+	}
52
+
53
+	if cfg.Stdout != nil {
54
+		cStdout = c.StdoutPipe()
55
+		wg.Add(1)
56
+	}
57
+
58
+	if cfg.Stderr != nil {
59
+		cStderr = c.StderrPipe()
60
+		wg.Add(1)
61
+	}
62
+
63
+	// Connect stdin of container to the attach stdin stream.
64
+	go func() {
65
+		if cfg.Stdin == nil {
66
+			return
67
+		}
68
+		logrus.Debug("attach: stdin: begin")
69
+
70
+		var err error
71
+		if cfg.TTY {
72
+			_, err = copyEscapable(cStdin, cfg.Stdin, cfg.DetachKeys)
73
+		} else {
74
+			_, err = io.Copy(cStdin, cfg.Stdin)
75
+		}
76
+		if err == io.ErrClosedPipe {
77
+			err = nil
78
+		}
79
+		if err != nil {
80
+			logrus.Errorf("attach: stdin: %s", err)
81
+			errors <- err
82
+		}
83
+		if cfg.CloseStdin && !cfg.TTY {
84
+			cStdin.Close()
85
+		} else {
86
+			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
87
+			if cStdout != nil {
88
+				cStdout.Close()
89
+			}
90
+			if cStderr != nil {
91
+				cStderr.Close()
92
+			}
93
+		}
94
+		logrus.Debug("attach: stdin: end")
95
+		wg.Done()
96
+	}()
97
+
98
+	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
99
+		if stream == nil {
100
+			return
101
+		}
102
+
103
+		logrus.Debugf("attach: %s: begin", name)
104
+		_, err := io.Copy(stream, streamPipe)
105
+		if err == io.ErrClosedPipe {
106
+			err = nil
107
+		}
108
+		if err != nil {
109
+			logrus.Errorf("attach: %s: %v", name, err)
110
+			errors <- err
111
+		}
112
+		// Make sure stdin gets closed
113
+		if cfg.Stdin != nil {
114
+			cfg.Stdin.Close()
115
+		}
116
+		streamPipe.Close()
117
+		logrus.Debugf("attach: %s: end", name)
118
+		wg.Done()
119
+	}
120
+
121
+	go attachStream("stdout", cfg.Stdout, cStdout)
122
+	go attachStream("stderr", cfg.Stderr, cStderr)
123
+
124
+	return promise.Go(func() error {
125
+		done := make(chan struct{})
126
+		go func() {
127
+			wg.Wait()
128
+			close(done)
129
+		}()
130
+		select {
131
+		case <-done:
132
+		case <-ctx.Done():
133
+			// close all pipes
134
+			if cStdin != nil {
135
+				cStdin.Close()
136
+			}
137
+			if cStdout != nil {
138
+				cStdout.Close()
139
+			}
140
+			if cStderr != nil {
141
+				cStderr.Close()
142
+			}
143
+			<-done
144
+		}
145
+		close(errors)
146
+		for err := range errors {
147
+			if err != nil {
148
+				return err
149
+			}
150
+		}
151
+		return nil
152
+	})
153
+}
154
+
155
+// Code c/c from io.Copy() modified to handle escape sequence
156
+func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
157
+	if len(keys) == 0 {
158
+		// Default keys : ctrl-p ctrl-q
159
+		keys = []byte{16, 17}
160
+	}
161
+	buf := make([]byte, 32*1024)
162
+	for {
163
+		nr, er := src.Read(buf)
164
+		if nr > 0 {
165
+			// ---- Docker addition
166
+			preservBuf := []byte{}
167
+			for i, key := range keys {
168
+				preservBuf = append(preservBuf, buf[0:nr]...)
169
+				if nr != 1 || buf[0] != key {
170
+					break
171
+				}
172
+				if i == len(keys)-1 {
173
+					src.Close()
174
+					return 0, DetachError{}
175
+				}
176
+				nr, er = src.Read(buf)
177
+			}
178
+			var nw int
179
+			var ew error
180
+			if len(preservBuf) > 0 {
181
+				nw, ew = dst.Write(preservBuf)
182
+				nr = len(preservBuf)
183
+			} else {
184
+				// ---- End of docker
185
+				nw, ew = dst.Write(buf[0:nr])
186
+			}
187
+			if nw > 0 {
188
+				written += int64(nw)
189
+			}
190
+			if ew != nil {
191
+				err = ew
192
+				break
193
+			}
194
+			if nr != nw {
195
+				err = io.ErrShortWrite
196
+				break
197
+			}
198
+		}
199
+		if er == io.EOF {
200
+			break
201
+		}
202
+		if er != nil {
203
+			err = er
204
+			break
205
+		}
206
+	}
207
+	return written, err
208
+}
... ...
@@ -62,6 +62,7 @@ func (c *Config) StdinPipe() io.WriteCloser {
62 62
 
63 63
 // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
64 64
 // It adds this new out pipe to the Stdout broadcaster.
65
+// This will block stdout if unconsumed.
65 66
 func (c *Config) StdoutPipe() io.ReadCloser {
66 67
 	bytesPipe := ioutils.NewBytesPipe()
67 68
 	c.stdout.Add(bytesPipe)
... ...
@@ -70,6 +71,7 @@ func (c *Config) StdoutPipe() io.ReadCloser {
70 70
 
71 71
 // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
72 72
 // It adds this new err pipe to the Stderr broadcaster.
73
+// This will block stderr if unconsumed.
73 74
 func (c *Config) StderrPipe() io.ReadCloser {
74 75
 	bytesPipe := ioutils.NewBytesPipe()
75 76
 	c.stderr.Add(bytesPipe)
... ...
@@ -9,11 +9,20 @@ import (
9 9
 	"github.com/docker/docker/api/errors"
10 10
 	"github.com/docker/docker/api/types/backend"
11 11
 	"github.com/docker/docker/container"
12
+	"github.com/docker/docker/container/stream"
12 13
 	"github.com/docker/docker/daemon/logger"
13 14
 	"github.com/docker/docker/pkg/stdcopy"
14 15
 	"github.com/docker/docker/pkg/term"
15 16
 )
16 17
 
18
+type containerAttachConfig struct {
19
+	detachKeys     []byte
20
+	stdin          io.ReadCloser
21
+	stdout, stderr io.Writer
22
+	showHistory    bool
23
+	stream         bool
24
+}
25
+
17 26
 // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig.
18 27
 func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error {
19 28
 	keys := []byte{}
... ...
@@ -45,20 +54,23 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
45 45
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
46 46
 	}
47 47
 
48
-	var stdin io.ReadCloser
49
-	var stdout, stderr io.Writer
48
+	var cfg containerAttachConfig
50 49
 
51 50
 	if c.UseStdin {
52
-		stdin = inStream
51
+		cfg.stdin = inStream
53 52
 	}
54 53
 	if c.UseStdout {
55
-		stdout = outStream
54
+		cfg.stdout = outStream
56 55
 	}
57 56
 	if c.UseStderr {
58
-		stderr = errStream
57
+		cfg.stderr = errStream
59 58
 	}
60 59
 
61
-	if err := daemon.containerAttach(container, stdin, stdout, stderr, c.Logs, c.Stream, keys); err != nil {
60
+	cfg.showHistory = c.Logs
61
+	cfg.stream = c.Stream
62
+	cfg.detachKeys = keys
63
+
64
+	if err := daemon.containerAttach(container, &cfg); err != nil {
62 65
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
63 66
 	}
64 67
 	return nil
... ...
@@ -70,11 +82,20 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose
70 70
 	if err != nil {
71 71
 		return err
72 72
 	}
73
-	return daemon.containerAttach(container, stdin, stdout, stderr, false, stream, nil)
73
+	cfg := &containerAttachConfig{
74
+		stdin:  stdin,
75
+		stdout: stdout,
76
+		stderr: stderr,
77
+		stream: stream,
78
+	}
79
+	return daemon.containerAttach(container, cfg)
74 80
 }
75 81
 
76
-func (daemon *Daemon) containerAttach(c *container.Container, stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool, keys []byte) error {
77
-	if logs {
82
+func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAttachConfig) error {
83
+	stdin := cfg.stdin
84
+	stdout := cfg.stdout
85
+	stderr := cfg.stderr
86
+	if cfg.showHistory {
78 87
 		logDriver, err := daemon.getLogger(c)
79 88
 		if err != nil {
80 89
 			return err
... ...
@@ -107,41 +128,47 @@ func (daemon *Daemon) containerAttach(c *container.Container, stdin io.ReadClose
107 107
 
108 108
 	daemon.LogContainerEvent(c, "attach")
109 109
 
110
-	//stream
111
-	if stream {
112
-		var stdinPipe io.ReadCloser
113
-		if stdin != nil {
114
-			r, w := io.Pipe()
115
-			go func() {
116
-				defer w.Close()
117
-				defer logrus.Debug("Closing buffered stdin pipe")
118
-				io.Copy(w, stdin)
119
-			}()
120
-			stdinPipe = r
121
-		}
122
-
123
-		waitChan := make(chan struct{})
124
-		if c.Config.StdinOnce && !c.Config.Tty {
125
-			go func() {
126
-				c.WaitStop(-1 * time.Second)
127
-				close(waitChan)
128
-			}()
129
-		}
110
+	if !cfg.stream {
111
+		return nil
112
+	}
130 113
 
131
-		err := <-c.Attach(stdinPipe, stdout, stderr, keys)
132
-		if err != nil {
133
-			if _, ok := err.(container.DetachError); ok {
134
-				daemon.LogContainerEvent(c, "detach")
135
-			} else {
136
-				logrus.Errorf("attach failed with error: %v", err)
137
-			}
138
-		}
114
+	var stdinPipe io.ReadCloser
115
+	if stdin != nil {
116
+		r, w := io.Pipe()
117
+		go func() {
118
+			defer w.Close()
119
+			defer logrus.Debug("Closing buffered stdin pipe")
120
+			io.Copy(w, stdin)
121
+		}()
122
+		stdinPipe = r
123
+	}
139 124
 
140
-		// If we are in stdinonce mode, wait for the process to end
141
-		// otherwise, simply return
142
-		if c.Config.StdinOnce && !c.Config.Tty {
125
+	waitChan := make(chan struct{})
126
+	if c.Config.StdinOnce && !c.Config.Tty {
127
+		defer func() {
143 128
 			<-waitChan
129
+		}()
130
+		go func() {
131
+			c.WaitStop(-1 * time.Second)
132
+			close(waitChan)
133
+		}()
134
+	}
135
+
136
+	aCfg := &stream.AttachConfig{
137
+		Stdin:      stdinPipe,
138
+		Stdout:     stdout,
139
+		Stderr:     stderr,
140
+		DetachKeys: cfg.detachKeys,
141
+	}
142
+
143
+	err := <-c.Attach(aCfg)
144
+	if err != nil {
145
+		if _, ok := err.(stream.DetachError); ok {
146
+			daemon.LogContainerEvent(c, "detach")
147
+		} else {
148
+			logrus.Errorf("attach failed with error: %v", err)
144 149
 		}
145 150
 	}
151
+
146 152
 	return nil
147 153
 }
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"github.com/docker/docker/api/types"
14 14
 	"github.com/docker/docker/api/types/strslice"
15 15
 	"github.com/docker/docker/container"
16
+	"github.com/docker/docker/container/stream"
16 17
 	"github.com/docker/docker/daemon/exec"
17 18
 	"github.com/docker/docker/libcontainerd"
18 19
 	"github.com/docker/docker/pkg/pools"
... ...
@@ -209,7 +210,15 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
209 209
 		return err
210 210
 	}
211 211
 
212
-	attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
212
+	attachConfig := &stream.AttachConfig{
213
+		TTY:        ec.Tty,
214
+		Stdin:      cStdin,
215
+		Stdout:     cStdout,
216
+		Stderr:     cStderr,
217
+		DetachKeys: ec.DetachKeys,
218
+		CloseStdin: true,
219
+	}
220
+	attachErr := ec.StreamConfig.Attach(ctx, attachConfig)
213 221
 
214 222
 	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
215 223
 	if err != nil {
... ...
@@ -233,7 +242,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
233 233
 		return fmt.Errorf("context cancelled")
234 234
 	case err := <-attachErr:
235 235
 		if err != nil {
236
-			if _, ok := err.(container.DetachError); !ok {
236
+			if _, ok := err.(stream.DetachError); !ok {
237 237
 				return fmt.Errorf("exec attach failed with error: %v", err)
238 238
 			}
239 239
 			d.LogContainerEvent(c, "exec_detach")