Browse code

Resolve race conditions in attach API call

Signed-off-by: Jim Minter <jminter@redhat.com>

Jim Minter authored on 2017/01/30 21:49:22
Showing 4 changed files
... ...
@@ -365,19 +365,6 @@ func (container *Container) GetExecIDs() []string {
365 365
 	return container.ExecCommands.List()
366 366
 }
367 367
 
368
-// Attach connects to the container's stdio to the client streams
369
-func (container *Container) Attach(cfg *stream.AttachConfig) chan error {
370
-	ctx := container.InitAttachContext()
371
-
372
-	cfg.TTY = container.Config.Tty
373
-	if !container.Config.OpenStdin {
374
-		cfg.Stdin = nil
375
-	}
376
-	cfg.CloseStdin = cfg.Stdin != nil && container.Config.StdinOnce
377
-
378
-	return container.StreamConfig.Attach(ctx, cfg)
379
-}
380
-
381 368
 // ShouldRestart decides whether the daemon should restart the container or not.
382 369
 // This is based on the container's restart policy.
383 370
 func (container *Container) ShouldRestart() bool {
... ...
@@ -33,33 +33,51 @@ type AttachConfig struct {
33 33
 	// For example, this would close the attached container's stdin.
34 34
 	CloseStdin bool
35 35
 
36
+	// UseStd* indicate whether the client has requested to be connected to the
37
+	// given stream or not.  These flags are used instead of checking Std* != nil
38
+	// at points before the client streams Std* are wired up.
39
+	UseStdin, UseStdout, UseStderr bool
40
+
41
+	// CStd* are the streams directly connected to the container
42
+	CStdin           io.WriteCloser
43
+	CStdout, CStderr io.ReadCloser
44
+
36 45
 	// Provide client streams to wire up to
37 46
 	Stdin          io.ReadCloser
38 47
 	Stdout, Stderr io.Writer
39 48
 }
40 49
 
41
-// Attach attaches the stream config to the streams specified in
42
-// the AttachOptions
43
-func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
50
+// AttachStreams attaches the container's streams to the AttachConfig
51
+func (c *Config) AttachStreams(cfg *AttachConfig) {
52
+	if cfg.UseStdin {
53
+		cfg.CStdin = c.StdinPipe()
54
+	}
55
+
56
+	if cfg.UseStdout {
57
+		cfg.CStdout = c.StdoutPipe()
58
+	}
59
+
60
+	if cfg.UseStderr {
61
+		cfg.CStderr = c.StderrPipe()
62
+	}
63
+}
64
+
65
+// CopyStreams starts goroutines to copy data in and out to/from the container
66
+func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error {
44 67
 	var (
45
-		cStdout, cStderr io.ReadCloser
46
-		cStdin           io.WriteCloser
47
-		wg               sync.WaitGroup
48
-		errors           = make(chan error, 3)
68
+		wg     sync.WaitGroup
69
+		errors = make(chan error, 3)
49 70
 	)
50 71
 
51 72
 	if cfg.Stdin != nil {
52
-		cStdin = c.StdinPipe()
53 73
 		wg.Add(1)
54 74
 	}
55 75
 
56 76
 	if cfg.Stdout != nil {
57
-		cStdout = c.StdoutPipe()
58 77
 		wg.Add(1)
59 78
 	}
60 79
 
61 80
 	if cfg.Stderr != nil {
62
-		cStderr = c.StderrPipe()
63 81
 		wg.Add(1)
64 82
 	}
65 83
 
... ...
@@ -72,9 +90,9 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
72 72
 
73 73
 		var err error
74 74
 		if cfg.TTY {
75
-			_, err = copyEscapable(cStdin, cfg.Stdin, cfg.DetachKeys)
75
+			_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
76 76
 		} else {
77
-			_, err = io.Copy(cStdin, cfg.Stdin)
77
+			_, err = io.Copy(cfg.CStdin, cfg.Stdin)
78 78
 		}
79 79
 		if err == io.ErrClosedPipe {
80 80
 			err = nil
... ...
@@ -84,14 +102,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
84 84
 			errors <- err
85 85
 		}
86 86
 		if cfg.CloseStdin && !cfg.TTY {
87
-			cStdin.Close()
87
+			cfg.CStdin.Close()
88 88
 		} else {
89 89
 			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
90
-			if cStdout != nil {
91
-				cStdout.Close()
90
+			if cfg.CStdout != nil {
91
+				cfg.CStdout.Close()
92 92
 			}
93
-			if cStderr != nil {
94
-				cStderr.Close()
93
+			if cfg.CStderr != nil {
94
+				cfg.CStderr.Close()
95 95
 			}
96 96
 		}
97 97
 		logrus.Debug("attach: stdin: end")
... ...
@@ -121,8 +139,8 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
121 121
 		wg.Done()
122 122
 	}
123 123
 
124
-	go attachStream("stdout", cfg.Stdout, cStdout)
125
-	go attachStream("stderr", cfg.Stderr, cStderr)
124
+	go attachStream("stdout", cfg.Stdout, cfg.CStdout)
125
+	go attachStream("stderr", cfg.Stderr, cfg.CStderr)
126 126
 
127 127
 	return promise.Go(func() error {
128 128
 		done := make(chan struct{})
... ...
@@ -134,14 +152,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
134 134
 		case <-done:
135 135
 		case <-ctx.Done():
136 136
 			// close all pipes
137
-			if cStdin != nil {
138
-				cStdin.Close()
137
+			if cfg.CStdin != nil {
138
+				cfg.CStdin.Close()
139 139
 			}
140
-			if cStdout != nil {
141
-				cStdout.Close()
140
+			if cfg.CStdout != nil {
141
+				cfg.CStdout.Close()
142 142
 			}
143
-			if cStderr != nil {
144
-				cStderr.Close()
143
+			if cfg.CStderr != nil {
144
+				cfg.CStderr.Close()
145 145
 			}
146 146
 			<-done
147 147
 		}
... ...
@@ -15,14 +15,6 @@ import (
15 15
 	"github.com/docker/docker/pkg/term"
16 16
 )
17 17
 
18
-type containerAttachConfig struct {
19
-	detachKeys     []byte
20
-	stdin          io.ReadCloser
21
-	stdout, stderr io.Writer
22
-	showHistory    bool
23
-	stream         bool
24
-}
25
-
26 18
 // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig.
27 19
 func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error {
28 20
 	keys := []byte{}
... ...
@@ -43,6 +35,16 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
43 43
 		return errors.NewRequestConflictError(err)
44 44
 	}
45 45
 
46
+	cfg := stream.AttachConfig{
47
+		UseStdin:   c.UseStdin && container.Config.OpenStdin,
48
+		UseStdout:  c.UseStdout,
49
+		UseStderr:  c.UseStderr,
50
+		TTY:        container.Config.Tty,
51
+		CloseStdin: container.Config.StdinOnce,
52
+		DetachKeys: keys,
53
+	}
54
+	container.StreamConfig.AttachStreams(&cfg)
55
+
46 56
 	inStream, outStream, errStream, err := c.GetStreams()
47 57
 	if err != nil {
48 58
 		return err
... ...
@@ -54,48 +56,51 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
54 54
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
55 55
 	}
56 56
 
57
-	var cfg containerAttachConfig
58
-
59
-	if c.UseStdin {
60
-		cfg.stdin = inStream
57
+	if cfg.UseStdin {
58
+		cfg.Stdin = inStream
61 59
 	}
62
-	if c.UseStdout {
63
-		cfg.stdout = outStream
60
+	if cfg.UseStdout {
61
+		cfg.Stdout = outStream
64 62
 	}
65
-	if c.UseStderr {
66
-		cfg.stderr = errStream
63
+	if cfg.UseStderr {
64
+		cfg.Stderr = errStream
67 65
 	}
68 66
 
69
-	cfg.showHistory = c.Logs
70
-	cfg.stream = c.Stream
71
-	cfg.detachKeys = keys
72
-
73
-	if err := daemon.containerAttach(container, &cfg); err != nil {
67
+	if err := daemon.containerAttach(container, &cfg, c.Logs, c.Stream); err != nil {
74 68
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
75 69
 	}
76 70
 	return nil
77 71
 }
78 72
 
79 73
 // ContainerAttachRaw attaches the provided streams to the container's stdio
80
-func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error {
74
+func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool) error {
81 75
 	container, err := daemon.GetContainer(prefixOrName)
82 76
 	if err != nil {
83 77
 		return err
84 78
 	}
85
-	cfg := &containerAttachConfig{
86
-		stdin:  stdin,
87
-		stdout: stdout,
88
-		stderr: stderr,
89
-		stream: stream,
79
+	cfg := stream.AttachConfig{
80
+		UseStdin:   stdin != nil && container.Config.OpenStdin,
81
+		UseStdout:  stdout != nil,
82
+		UseStderr:  stderr != nil,
83
+		TTY:        container.Config.Tty,
84
+		CloseStdin: container.Config.StdinOnce,
85
+	}
86
+	container.StreamConfig.AttachStreams(&cfg)
87
+	if cfg.UseStdin {
88
+		cfg.Stdin = stdin
90 89
 	}
91
-	return daemon.containerAttach(container, cfg)
90
+	if cfg.UseStdout {
91
+		cfg.Stdout = stdout
92
+	}
93
+	if cfg.UseStderr {
94
+		cfg.Stderr = stderr
95
+	}
96
+
97
+	return daemon.containerAttach(container, &cfg, false, doStream)
92 98
 }
93 99
 
94
-func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAttachConfig) error {
95
-	stdin := cfg.stdin
96
-	stdout := cfg.stdout
97
-	stderr := cfg.stderr
98
-	if cfg.showHistory {
100
+func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.AttachConfig, logs, doStream bool) error {
101
+	if logs {
99 102
 		logDriver, err := daemon.getLogger(c)
100 103
 		if err != nil {
101 104
 			return err
... ...
@@ -113,11 +118,11 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
113 113
 				if !ok {
114 114
 					break LogLoop
115 115
 				}
116
-				if msg.Source == "stdout" && stdout != nil {
117
-					stdout.Write(msg.Line)
116
+				if msg.Source == "stdout" && cfg.Stdout != nil {
117
+					cfg.Stdout.Write(msg.Line)
118 118
 				}
119
-				if msg.Source == "stderr" && stderr != nil {
120
-					stderr.Write(msg.Line)
119
+				if msg.Source == "stderr" && cfg.Stderr != nil {
120
+					cfg.Stderr.Write(msg.Line)
121 121
 				}
122 122
 			case err := <-logs.Err:
123 123
 				logrus.Errorf("Error streaming logs: %v", err)
... ...
@@ -128,19 +133,18 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
128 128
 
129 129
 	daemon.LogContainerEvent(c, "attach")
130 130
 
131
-	if !cfg.stream {
131
+	if !doStream {
132 132
 		return nil
133 133
 	}
134 134
 
135
-	var stdinPipe io.ReadCloser
136
-	if stdin != nil {
135
+	if cfg.Stdin != nil {
137 136
 		r, w := io.Pipe()
138
-		go func() {
137
+		go func(stdin io.ReadCloser) {
139 138
 			defer w.Close()
140 139
 			defer logrus.Debug("Closing buffered stdin pipe")
141 140
 			io.Copy(w, stdin)
142
-		}()
143
-		stdinPipe = r
141
+		}(cfg.Stdin)
142
+		cfg.Stdin = r
144 143
 	}
145 144
 
146 145
 	waitChan := make(chan struct{})
... ...
@@ -154,14 +158,8 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
154 154
 		}()
155 155
 	}
156 156
 
157
-	aCfg := &stream.AttachConfig{
158
-		Stdin:      stdinPipe,
159
-		Stdout:     stdout,
160
-		Stderr:     stderr,
161
-		DetachKeys: cfg.detachKeys,
162
-	}
163
-
164
-	err := <-c.Attach(aCfg)
157
+	ctx := c.InitAttachContext()
158
+	err := <-c.StreamConfig.CopyStreams(ctx, cfg)
165 159
 	if err != nil {
166 160
 		if _, ok := err.(stream.DetachError); ok {
167 161
 			daemon.LogContainerEvent(c, "detach")
... ...
@@ -210,15 +210,19 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
210 210
 		return err
211 211
 	}
212 212
 
213
-	attachConfig := &stream.AttachConfig{
213
+	attachConfig := stream.AttachConfig{
214 214
 		TTY:        ec.Tty,
215
+		UseStdin:   cStdin != nil,
216
+		UseStdout:  cStdout != nil,
217
+		UseStderr:  cStderr != nil,
215 218
 		Stdin:      cStdin,
216 219
 		Stdout:     cStdout,
217 220
 		Stderr:     cStderr,
218 221
 		DetachKeys: ec.DetachKeys,
219 222
 		CloseStdin: true,
220 223
 	}
221
-	attachErr := ec.StreamConfig.Attach(ctx, attachConfig)
224
+	ec.StreamConfig.AttachStreams(&attachConfig)
225
+	attachErr := ec.StreamConfig.CopyStreams(ctx, &attachConfig)
222 226
 
223 227
 	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
224 228
 	if err != nil {