Browse code

Merge pull request #18051 from calavera/extract_streams

Extract StreamConfig struct out of the daemon package.

Michael Crosby authored on 2015/11/21 06:45:13
Showing 5 changed files
... ...
@@ -19,8 +19,6 @@ import (
19 19
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
20 20
 	"github.com/docker/docker/daemon/network"
21 21
 	derr "github.com/docker/docker/errors"
22
-	"github.com/docker/docker/pkg/broadcaster"
23
-	"github.com/docker/docker/pkg/ioutils"
24 22
 	"github.com/docker/docker/pkg/nat"
25 23
 	"github.com/docker/docker/pkg/promise"
26 24
 	"github.com/docker/docker/pkg/signal"
... ...
@@ -36,17 +34,10 @@ var (
36 36
 	ErrRootFSReadOnly = errors.New("container rootfs is marked read-only")
37 37
 )
38 38
 
39
-type streamConfig struct {
40
-	stdout    *broadcaster.Unbuffered
41
-	stderr    *broadcaster.Unbuffered
42
-	stdin     io.ReadCloser
43
-	stdinPipe io.WriteCloser
44
-}
45
-
46 39
 // CommonContainer holds the fields for a container which are
47 40
 // applicable across all platforms supported by the daemon.
48 41
 type CommonContainer struct {
49
-	streamConfig
42
+	*runconfig.StreamConfig
50 43
 	// embed for Container to support states directly.
51 44
 	*State          `json:"State"` // Needed for remote api version <= 1.11
52 45
 	root            string         // Path to the "home" of the container, including metadata.
... ...
@@ -87,6 +78,7 @@ func newBaseContainer(id, root string) *Container {
87 87
 			execCommands: newExecStore(),
88 88
 			root:         root,
89 89
 			MountPoints:  make(map[string]*volume.MountPoint),
90
+			StreamConfig: runconfig.NewStreamConfig(),
90 91
 		},
91 92
 	}
92 93
 }
... ...
@@ -243,30 +235,6 @@ func (container *Container) getRootResourcePath(path string) (string, error) {
243 243
 	return symlink.FollowSymlinkInScope(filepath.Join(container.root, cleanPath), container.root)
244 244
 }
245 245
 
246
-// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
247
-// to the standard input of the container's active process.
248
-// Container.StdoutPipe and Container.StderrPipe each return a ReadCloser
249
-// which can be used to retrieve the standard output (and error) generated
250
-// by the container's active process. The output (and error) are actually
251
-// copied and delivered to all StdoutPipe and StderrPipe consumers, using
252
-// a kind of "broadcaster".
253
-
254
-func (streamConfig *streamConfig) StdinPipe() io.WriteCloser {
255
-	return streamConfig.stdinPipe
256
-}
257
-
258
-func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser {
259
-	bytesPipe := ioutils.NewBytesPipe(nil)
260
-	streamConfig.stdout.Add(bytesPipe)
261
-	return bytesPipe
262
-}
263
-
264
-func (streamConfig *streamConfig) StderrPipe() io.ReadCloser {
265
-	bytesPipe := ioutils.NewBytesPipe(nil)
266
-	streamConfig.stderr.Add(bytesPipe)
267
-	return bytesPipe
268
-}
269
-
270 246
 // ExitOnNext signals to the monitor that it should not restart the container
271 247
 // after we send the kill signal.
272 248
 func (container *Container) ExitOnNext() {
... ...
@@ -372,10 +340,10 @@ func (container *Container) getExecIDs() []string {
372 372
 // Attach connects to the container's TTY, delegating to standard
373 373
 // streams or websockets depending on the configuration.
374 374
 func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
375
-	return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
375
+	return attach(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
376 376
 }
377 377
 
378
-func attach(streamConfig *streamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
378
+func attach(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
379 379
 	var (
380 380
 		cStdout, cStderr io.ReadCloser
381 381
 		cStdin           io.WriteCloser
... ...
@@ -32,12 +32,10 @@ import (
32 32
 	"github.com/docker/docker/graph"
33 33
 	"github.com/docker/docker/image"
34 34
 	"github.com/docker/docker/pkg/archive"
35
-	"github.com/docker/docker/pkg/broadcaster"
36 35
 	"github.com/docker/docker/pkg/discovery"
37 36
 	"github.com/docker/docker/pkg/fileutils"
38 37
 	"github.com/docker/docker/pkg/graphdb"
39 38
 	"github.com/docker/docker/pkg/idtools"
40
-	"github.com/docker/docker/pkg/ioutils"
41 39
 	"github.com/docker/docker/pkg/jsonmessage"
42 40
 	"github.com/docker/docker/pkg/mount"
43 41
 	"github.com/docker/docker/pkg/namesgenerator"
... ...
@@ -205,15 +203,11 @@ func (daemon *Daemon) Register(container *Container) error {
205 205
 	}
206 206
 
207 207
 	// Attach to stdout and stderr
208
-	container.stderr = new(broadcaster.Unbuffered)
209
-	container.stdout = new(broadcaster.Unbuffered)
210
-	// Attach to stdin
211 208
 	if container.Config.OpenStdin {
212
-		container.stdin, container.stdinPipe = io.Pipe()
209
+		container.NewInputPipes()
213 210
 	} else {
214
-		container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
211
+		container.NewNopInputPipe()
215 212
 	}
216
-	// done
217 213
 	daemon.containers.Add(container.ID, container)
218 214
 
219 215
 	// don't update the Suffixarray if we're starting up
... ...
@@ -2,7 +2,6 @@ package daemon
2 2
 
3 3
 import (
4 4
 	"io"
5
-	"io/ioutil"
6 5
 	"strings"
7 6
 	"sync"
8 7
 	"time"
... ...
@@ -10,8 +9,6 @@ import (
10 10
 	"github.com/Sirupsen/logrus"
11 11
 	"github.com/docker/docker/daemon/execdriver"
12 12
 	derr "github.com/docker/docker/errors"
13
-	"github.com/docker/docker/pkg/broadcaster"
14
-	"github.com/docker/docker/pkg/ioutils"
15 13
 	"github.com/docker/docker/pkg/pools"
16 14
 	"github.com/docker/docker/pkg/promise"
17 15
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -28,12 +25,12 @@ type ExecConfig struct {
28 28
 	Running       bool
29 29
 	ExitCode      int
30 30
 	ProcessConfig *execdriver.ProcessConfig
31
-	streamConfig
32
-	OpenStdin  bool
33
-	OpenStderr bool
34
-	OpenStdout bool
35
-	Container  *Container
36
-	canRemove  bool
31
+	OpenStdin     bool
32
+	OpenStderr    bool
33
+	OpenStdout    bool
34
+	streamConfig  *runconfig.StreamConfig
35
+	Container     *Container
36
+	canRemove     bool
37 37
 
38 38
 	// waitStart will be closed immediately after the exec is really started.
39 39
 	waitStart chan struct{}
... ...
@@ -170,7 +167,7 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro
170 170
 		OpenStdin:     config.AttachStdin,
171 171
 		OpenStdout:    config.AttachStdout,
172 172
 		OpenStderr:    config.AttachStderr,
173
-		streamConfig:  streamConfig{},
173
+		streamConfig:  runconfig.NewStreamConfig(),
174 174
 		ProcessConfig: processConfig,
175 175
 		Container:     container,
176 176
 		Running:       false,
... ...
@@ -225,16 +222,13 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
225 225
 		cStderr = stderr
226 226
 	}
227 227
 
228
-	ec.streamConfig.stderr = new(broadcaster.Unbuffered)
229
-	ec.streamConfig.stdout = new(broadcaster.Unbuffered)
230
-	// Attach to stdin
231 228
 	if ec.OpenStdin {
232
-		ec.streamConfig.stdin, ec.streamConfig.stdinPipe = io.Pipe()
229
+		ec.streamConfig.NewInputPipes()
233 230
 	} else {
234
-		ec.streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
231
+		ec.streamConfig.NewNopInputPipe()
235 232
 	}
236 233
 
237
-	attachErr := attach(&ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
234
+	attachErr := attach(ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
238 235
 
239 236
 	execErr := make(chan error)
240 237
 
... ...
@@ -354,23 +348,17 @@ func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error {
354 354
 }
355 355
 
356 356
 func (d *Daemon) monitorExec(container *Container, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error {
357
-	pipes := execdriver.NewPipes(ExecConfig.streamConfig.stdin, ExecConfig.streamConfig.stdout, ExecConfig.streamConfig.stderr, ExecConfig.OpenStdin)
357
+	pipes := execdriver.NewPipes(ExecConfig.streamConfig.Stdin(), ExecConfig.streamConfig.Stdout(), ExecConfig.streamConfig.Stderr(), ExecConfig.OpenStdin)
358 358
 	exitCode, err := d.Exec(container, ExecConfig, pipes, callback)
359 359
 	if err != nil {
360 360
 		logrus.Errorf("Error running command in existing container %s: %s", container.ID, err)
361 361
 	}
362 362
 	logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode)
363
-	if ExecConfig.OpenStdin {
364
-		if err := ExecConfig.streamConfig.stdin.Close(); err != nil {
365
-			logrus.Errorf("Error closing stdin while running in %s: %s", container.ID, err)
366
-		}
367
-	}
368
-	if err := ExecConfig.streamConfig.stdout.Clean(); err != nil {
369
-		logrus.Errorf("Error closing stdout while running in %s: %s", container.ID, err)
370
-	}
371
-	if err := ExecConfig.streamConfig.stderr.Clean(); err != nil {
372
-		logrus.Errorf("Error closing stderr while running in %s: %s", container.ID, err)
363
+
364
+	if err := ExecConfig.streamConfig.CloseStreams(); err != nil {
365
+		logrus.Errorf("%s: %s", container.ID, err)
373 366
 	}
367
+
374 368
 	if ExecConfig.ProcessConfig.Terminal != nil {
375 369
 		if err := ExecConfig.ProcessConfig.Terminal.Close(); err != nil {
376 370
 			logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err)
... ...
@@ -158,7 +158,7 @@ func (m *containerMonitor) Start() error {
158 158
 			return err
159 159
 		}
160 160
 
161
-		pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin)
161
+		pipes := execdriver.NewPipes(m.container.Stdin(), m.container.Stdout(), m.container.Stderr(), m.container.Config.OpenStdin)
162 162
 
163 163
 		m.logEvent("start")
164 164
 
... ...
@@ -329,18 +329,8 @@ func (m *containerMonitor) resetContainer(lock bool) {
329 329
 		defer container.Unlock()
330 330
 	}
331 331
 
332
-	if container.Config.OpenStdin {
333
-		if err := container.stdin.Close(); err != nil {
334
-			logrus.Errorf("%s: Error close stdin: %s", container.ID, err)
335
-		}
336
-	}
337
-
338
-	if err := container.stdout.Clean(); err != nil {
339
-		logrus.Errorf("%s: Error close stdout: %s", container.ID, err)
340
-	}
341
-
342
-	if err := container.stderr.Clean(); err != nil {
343
-		logrus.Errorf("%s: Error close stderr: %s", container.ID, err)
332
+	if err := container.CloseStreams(); err != nil {
333
+		logrus.Errorf("%s: %s", container.ID, err)
344 334
 	}
345 335
 
346 336
 	if container.command != nil && container.command.ProcessConfig.Terminal != nil {
... ...
@@ -351,7 +341,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
351 351
 
352 352
 	// Re-create a brand new stdin pipe once the container exited
353 353
 	if container.Config.OpenStdin {
354
-		container.stdin, container.stdinPipe = io.Pipe()
354
+		container.NewInputPipes()
355 355
 	}
356 356
 
357 357
 	if container.logDriver != nil {
358 358
new file mode 100644
... ...
@@ -0,0 +1,107 @@
0
+package runconfig
1
+
2
+import (
3
+	"fmt"
4
+	"io"
5
+	"io/ioutil"
6
+	"strings"
7
+
8
+	"github.com/docker/docker/pkg/broadcaster"
9
+	"github.com/docker/docker/pkg/ioutils"
10
+)
11
+
12
+// StreamConfig holds information about I/O streams managed together.
13
+//
14
+// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
15
+// to the standard input of the streamConfig's active process.
16
+// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
17
+// which can be used to retrieve the standard output (and error) generated
18
+// by the container's active process. The output (and error) are actually
19
+// copied and delivered to all StdoutPipe and StderrPipe consumers, using
20
+// a kind of "broadcaster".
21
+type StreamConfig struct {
22
+	stdout    *broadcaster.Unbuffered
23
+	stderr    *broadcaster.Unbuffered
24
+	stdin     io.ReadCloser
25
+	stdinPipe io.WriteCloser
26
+}
27
+
28
+// NewStreamConfig creates a stream config and initializes
29
+// the standard err and standard out to new unbuffered broadcasters.
30
+func NewStreamConfig() *StreamConfig {
31
+	return &StreamConfig{
32
+		stderr: new(broadcaster.Unbuffered),
33
+		stdout: new(broadcaster.Unbuffered),
34
+	}
35
+}
36
+
37
+// Stdout returns the standard output in the configuration.
38
+func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered {
39
+	return streamConfig.stdout
40
+}
41
+
42
+// Stderr returns the standard error in the configuration.
43
+func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered {
44
+	return streamConfig.stderr
45
+}
46
+
47
+// Stdin returns the standard input in the configuration.
48
+func (streamConfig *StreamConfig) Stdin() io.ReadCloser {
49
+	return streamConfig.stdin
50
+}
51
+
52
+// StdinPipe returns an input writer pipe as an io.WriteCloser.
53
+func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
54
+	return streamConfig.stdinPipe
55
+}
56
+
57
+// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
58
+// It adds this new out pipe to the Stdout broadcaster.
59
+func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
60
+	bytesPipe := ioutils.NewBytesPipe(nil)
61
+	streamConfig.stdout.Add(bytesPipe)
62
+	return bytesPipe
63
+}
64
+
65
+// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
66
+// It adds this new err pipe to the Stderr broadcaster.
67
+func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
68
+	bytesPipe := ioutils.NewBytesPipe(nil)
69
+	streamConfig.stderr.Add(bytesPipe)
70
+	return bytesPipe
71
+}
72
+
73
+// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
74
+func (streamConfig *StreamConfig) NewInputPipes() {
75
+	streamConfig.stdin, streamConfig.stdinPipe = io.Pipe()
76
+}
77
+
78
+// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
79
+func (streamConfig *StreamConfig) NewNopInputPipe() {
80
+	streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
81
+}
82
+
83
+// CloseStreams ensures that the configured streams are properly closed.
84
+func (streamConfig *StreamConfig) CloseStreams() error {
85
+	var errors []string
86
+
87
+	if streamConfig.stdin != nil {
88
+		if err := streamConfig.stdin.Close(); err != nil {
89
+			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
90
+		}
91
+	}
92
+
93
+	if err := streamConfig.stdout.Clean(); err != nil {
94
+		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
95
+	}
96
+
97
+	if err := streamConfig.stderr.Clean(); err != nil {
98
+		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
99
+	}
100
+
101
+	if len(errors) > 0 {
102
+		return fmt.Errorf(strings.Join(errors, "\n"))
103
+	}
104
+
105
+	return nil
106
+}