Browse code

Extract StreamConfig struct out of the daemon package.

This is a small configuration struct used in two scenarios:

1. To attach I/O pipes to a running containers.
2. To attach to execution processes inside running containers.

Although they are similar, keeping the struct in the same package
than exec and container can generate cycled dependencies if we
move any of them outside the daemon, like we want to do
with the container.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/11/18 09:21:44
Showing 5 changed files
... ...
@@ -19,8 +19,6 @@ import (
19 19
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
20 20
 	"github.com/docker/docker/daemon/network"
21 21
 	derr "github.com/docker/docker/errors"
22
-	"github.com/docker/docker/pkg/broadcaster"
23
-	"github.com/docker/docker/pkg/ioutils"
24 22
 	"github.com/docker/docker/pkg/nat"
25 23
 	"github.com/docker/docker/pkg/promise"
26 24
 	"github.com/docker/docker/pkg/signal"
... ...
@@ -36,17 +34,10 @@ var (
36 36
 	ErrRootFSReadOnly = errors.New("container rootfs is marked read-only")
37 37
 )
38 38
 
39
-type streamConfig struct {
40
-	stdout    *broadcaster.Unbuffered
41
-	stderr    *broadcaster.Unbuffered
42
-	stdin     io.ReadCloser
43
-	stdinPipe io.WriteCloser
44
-}
45
-
46 39
 // CommonContainer holds the fields for a container which are
47 40
 // applicable across all platforms supported by the daemon.
48 41
 type CommonContainer struct {
49
-	streamConfig
42
+	*runconfig.StreamConfig
50 43
 	// embed for Container to support states directly.
51 44
 	*State          `json:"State"` // Needed for remote api version <= 1.11
52 45
 	root            string         // Path to the "home" of the container, including metadata.
... ...
@@ -87,6 +78,7 @@ func newBaseContainer(id, root string) *Container {
87 87
 			execCommands: newExecStore(),
88 88
 			root:         root,
89 89
 			MountPoints:  make(map[string]*volume.MountPoint),
90
+			StreamConfig: runconfig.NewStreamConfig(),
90 91
 		},
91 92
 	}
92 93
 }
... ...
@@ -243,30 +235,6 @@ func (container *Container) getRootResourcePath(path string) (string, error) {
243 243
 	return symlink.FollowSymlinkInScope(filepath.Join(container.root, cleanPath), container.root)
244 244
 }
245 245
 
246
-// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
247
-// to the standard input of the container's active process.
248
-// Container.StdoutPipe and Container.StderrPipe each return a ReadCloser
249
-// which can be used to retrieve the standard output (and error) generated
250
-// by the container's active process. The output (and error) are actually
251
-// copied and delivered to all StdoutPipe and StderrPipe consumers, using
252
-// a kind of "broadcaster".
253
-
254
-func (streamConfig *streamConfig) StdinPipe() io.WriteCloser {
255
-	return streamConfig.stdinPipe
256
-}
257
-
258
-func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser {
259
-	bytesPipe := ioutils.NewBytesPipe(nil)
260
-	streamConfig.stdout.Add(bytesPipe)
261
-	return bytesPipe
262
-}
263
-
264
-func (streamConfig *streamConfig) StderrPipe() io.ReadCloser {
265
-	bytesPipe := ioutils.NewBytesPipe(nil)
266
-	streamConfig.stderr.Add(bytesPipe)
267
-	return bytesPipe
268
-}
269
-
270 246
 // ExitOnNext signals to the monitor that it should not restart the container
271 247
 // after we send the kill signal.
272 248
 func (container *Container) ExitOnNext() {
... ...
@@ -372,10 +340,10 @@ func (container *Container) getExecIDs() []string {
372 372
 // Attach connects to the container's TTY, delegating to standard
373 373
 // streams or websockets depending on the configuration.
374 374
 func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
375
-	return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
375
+	return attach(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
376 376
 }
377 377
 
378
-func attach(streamConfig *streamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
378
+func attach(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
379 379
 	var (
380 380
 		cStdout, cStderr io.ReadCloser
381 381
 		cStdin           io.WriteCloser
... ...
@@ -32,12 +32,10 @@ import (
32 32
 	"github.com/docker/docker/graph"
33 33
 	"github.com/docker/docker/image"
34 34
 	"github.com/docker/docker/pkg/archive"
35
-	"github.com/docker/docker/pkg/broadcaster"
36 35
 	"github.com/docker/docker/pkg/discovery"
37 36
 	"github.com/docker/docker/pkg/fileutils"
38 37
 	"github.com/docker/docker/pkg/graphdb"
39 38
 	"github.com/docker/docker/pkg/idtools"
40
-	"github.com/docker/docker/pkg/ioutils"
41 39
 	"github.com/docker/docker/pkg/jsonmessage"
42 40
 	"github.com/docker/docker/pkg/mount"
43 41
 	"github.com/docker/docker/pkg/namesgenerator"
... ...
@@ -205,15 +203,11 @@ func (daemon *Daemon) Register(container *Container) error {
205 205
 	}
206 206
 
207 207
 	// Attach to stdout and stderr
208
-	container.stderr = new(broadcaster.Unbuffered)
209
-	container.stdout = new(broadcaster.Unbuffered)
210
-	// Attach to stdin
211 208
 	if container.Config.OpenStdin {
212
-		container.stdin, container.stdinPipe = io.Pipe()
209
+		container.NewInputPipes()
213 210
 	} else {
214
-		container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
211
+		container.NewNopInputPipe()
215 212
 	}
216
-	// done
217 213
 	daemon.containers.Add(container.ID, container)
218 214
 
219 215
 	// don't update the Suffixarray if we're starting up
... ...
@@ -2,7 +2,6 @@ package daemon
2 2
 
3 3
 import (
4 4
 	"io"
5
-	"io/ioutil"
6 5
 	"strings"
7 6
 	"sync"
8 7
 	"time"
... ...
@@ -10,8 +9,6 @@ import (
10 10
 	"github.com/Sirupsen/logrus"
11 11
 	"github.com/docker/docker/daemon/execdriver"
12 12
 	derr "github.com/docker/docker/errors"
13
-	"github.com/docker/docker/pkg/broadcaster"
14
-	"github.com/docker/docker/pkg/ioutils"
15 13
 	"github.com/docker/docker/pkg/pools"
16 14
 	"github.com/docker/docker/pkg/promise"
17 15
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -28,12 +25,12 @@ type ExecConfig struct {
28 28
 	Running       bool
29 29
 	ExitCode      int
30 30
 	ProcessConfig *execdriver.ProcessConfig
31
-	streamConfig
32
-	OpenStdin  bool
33
-	OpenStderr bool
34
-	OpenStdout bool
35
-	Container  *Container
36
-	canRemove  bool
31
+	OpenStdin     bool
32
+	OpenStderr    bool
33
+	OpenStdout    bool
34
+	streamConfig  *runconfig.StreamConfig
35
+	Container     *Container
36
+	canRemove     bool
37 37
 
38 38
 	// waitStart will be closed immediately after the exec is really started.
39 39
 	waitStart chan struct{}
... ...
@@ -170,7 +167,7 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro
170 170
 		OpenStdin:     config.AttachStdin,
171 171
 		OpenStdout:    config.AttachStdout,
172 172
 		OpenStderr:    config.AttachStderr,
173
-		streamConfig:  streamConfig{},
173
+		streamConfig:  runconfig.NewStreamConfig(),
174 174
 		ProcessConfig: processConfig,
175 175
 		Container:     container,
176 176
 		Running:       false,
... ...
@@ -225,16 +222,13 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
225 225
 		cStderr = stderr
226 226
 	}
227 227
 
228
-	ec.streamConfig.stderr = new(broadcaster.Unbuffered)
229
-	ec.streamConfig.stdout = new(broadcaster.Unbuffered)
230
-	// Attach to stdin
231 228
 	if ec.OpenStdin {
232
-		ec.streamConfig.stdin, ec.streamConfig.stdinPipe = io.Pipe()
229
+		ec.streamConfig.NewInputPipes()
233 230
 	} else {
234
-		ec.streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
231
+		ec.streamConfig.NewNopInputPipe()
235 232
 	}
236 233
 
237
-	attachErr := attach(&ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
234
+	attachErr := attach(ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
238 235
 
239 236
 	execErr := make(chan error)
240 237
 
... ...
@@ -354,23 +348,17 @@ func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error {
354 354
 }
355 355
 
356 356
 func (d *Daemon) monitorExec(container *Container, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error {
357
-	pipes := execdriver.NewPipes(ExecConfig.streamConfig.stdin, ExecConfig.streamConfig.stdout, ExecConfig.streamConfig.stderr, ExecConfig.OpenStdin)
357
+	pipes := execdriver.NewPipes(ExecConfig.streamConfig.Stdin(), ExecConfig.streamConfig.Stdout(), ExecConfig.streamConfig.Stderr(), ExecConfig.OpenStdin)
358 358
 	exitCode, err := d.Exec(container, ExecConfig, pipes, callback)
359 359
 	if err != nil {
360 360
 		logrus.Errorf("Error running command in existing container %s: %s", container.ID, err)
361 361
 	}
362 362
 	logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode)
363
-	if ExecConfig.OpenStdin {
364
-		if err := ExecConfig.streamConfig.stdin.Close(); err != nil {
365
-			logrus.Errorf("Error closing stdin while running in %s: %s", container.ID, err)
366
-		}
367
-	}
368
-	if err := ExecConfig.streamConfig.stdout.Clean(); err != nil {
369
-		logrus.Errorf("Error closing stdout while running in %s: %s", container.ID, err)
370
-	}
371
-	if err := ExecConfig.streamConfig.stderr.Clean(); err != nil {
372
-		logrus.Errorf("Error closing stderr while running in %s: %s", container.ID, err)
363
+
364
+	if err := ExecConfig.streamConfig.CloseStreams(); err != nil {
365
+		logrus.Errorf("%s: %s", container.ID, err)
373 366
 	}
367
+
374 368
 	if ExecConfig.ProcessConfig.Terminal != nil {
375 369
 		if err := ExecConfig.ProcessConfig.Terminal.Close(); err != nil {
376 370
 			logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err)
... ...
@@ -158,7 +158,7 @@ func (m *containerMonitor) Start() error {
158 158
 			return err
159 159
 		}
160 160
 
161
-		pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin)
161
+		pipes := execdriver.NewPipes(m.container.Stdin(), m.container.Stdout(), m.container.Stderr(), m.container.Config.OpenStdin)
162 162
 
163 163
 		m.logEvent("start")
164 164
 
... ...
@@ -329,18 +329,8 @@ func (m *containerMonitor) resetContainer(lock bool) {
329 329
 		defer container.Unlock()
330 330
 	}
331 331
 
332
-	if container.Config.OpenStdin {
333
-		if err := container.stdin.Close(); err != nil {
334
-			logrus.Errorf("%s: Error close stdin: %s", container.ID, err)
335
-		}
336
-	}
337
-
338
-	if err := container.stdout.Clean(); err != nil {
339
-		logrus.Errorf("%s: Error close stdout: %s", container.ID, err)
340
-	}
341
-
342
-	if err := container.stderr.Clean(); err != nil {
343
-		logrus.Errorf("%s: Error close stderr: %s", container.ID, err)
332
+	if err := container.CloseStreams(); err != nil {
333
+		logrus.Errorf("%s: %s", container.ID, err)
344 334
 	}
345 335
 
346 336
 	if container.command != nil && container.command.ProcessConfig.Terminal != nil {
... ...
@@ -351,7 +341,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
351 351
 
352 352
 	// Re-create a brand new stdin pipe once the container exited
353 353
 	if container.Config.OpenStdin {
354
-		container.stdin, container.stdinPipe = io.Pipe()
354
+		container.NewInputPipes()
355 355
 	}
356 356
 
357 357
 	if container.logDriver != nil {
358 358
new file mode 100644
... ...
@@ -0,0 +1,107 @@
0
+package runconfig
1
+
2
+import (
3
+	"fmt"
4
+	"io"
5
+	"io/ioutil"
6
+	"strings"
7
+
8
+	"github.com/docker/docker/pkg/broadcaster"
9
+	"github.com/docker/docker/pkg/ioutils"
10
+)
11
+
12
+// StreamConfig holds information about I/O streams managed together.
13
+//
14
+// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
15
+// to the standard input of the streamConfig's active process.
16
+// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
17
+// which can be used to retrieve the standard output (and error) generated
18
+// by the container's active process. The output (and error) are actually
19
+// copied and delivered to all StdoutPipe and StderrPipe consumers, using
20
+// a kind of "broadcaster".
21
+type StreamConfig struct {
22
+	stdout    *broadcaster.Unbuffered
23
+	stderr    *broadcaster.Unbuffered
24
+	stdin     io.ReadCloser
25
+	stdinPipe io.WriteCloser
26
+}
27
+
28
+// NewStreamConfig creates a stream config and initializes
29
+// the standard err and standard out to new unbuffered broadcasters.
30
+func NewStreamConfig() *StreamConfig {
31
+	return &StreamConfig{
32
+		stderr: new(broadcaster.Unbuffered),
33
+		stdout: new(broadcaster.Unbuffered),
34
+	}
35
+}
36
+
37
+// Stdout returns the standard output in the configuration.
38
+func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered {
39
+	return streamConfig.stdout
40
+}
41
+
42
+// Stderr returns the standard error in the configuration.
43
+func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered {
44
+	return streamConfig.stderr
45
+}
46
+
47
+// Stdin returns the standard input in the configuration.
48
+func (streamConfig *StreamConfig) Stdin() io.ReadCloser {
49
+	return streamConfig.stdin
50
+}
51
+
52
+// StdinPipe returns an input writer pipe as an io.WriteCloser.
53
+func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
54
+	return streamConfig.stdinPipe
55
+}
56
+
57
+// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
58
+// It adds this new out pipe to the Stdout broadcaster.
59
+func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
60
+	bytesPipe := ioutils.NewBytesPipe(nil)
61
+	streamConfig.stdout.Add(bytesPipe)
62
+	return bytesPipe
63
+}
64
+
65
+// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
66
+// It adds this new err pipe to the Stderr broadcaster.
67
+func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
68
+	bytesPipe := ioutils.NewBytesPipe(nil)
69
+	streamConfig.stderr.Add(bytesPipe)
70
+	return bytesPipe
71
+}
72
+
73
+// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
74
+func (streamConfig *StreamConfig) NewInputPipes() {
75
+	streamConfig.stdin, streamConfig.stdinPipe = io.Pipe()
76
+}
77
+
78
+// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
79
+func (streamConfig *StreamConfig) NewNopInputPipe() {
80
+	streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
81
+}
82
+
83
+// CloseStreams ensures that the configured streams are properly closed.
84
+func (streamConfig *StreamConfig) CloseStreams() error {
85
+	var errors []string
86
+
87
+	if streamConfig.stdin != nil {
88
+		if err := streamConfig.stdin.Close(); err != nil {
89
+			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
90
+		}
91
+	}
92
+
93
+	if err := streamConfig.stdout.Clean(); err != nil {
94
+		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
95
+	}
96
+
97
+	if err := streamConfig.stderr.Clean(); err != nil {
98
+		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
99
+	}
100
+
101
+	if len(errors) > 0 {
102
+		return fmt.Errorf(strings.Join(errors, "\n"))
103
+	}
104
+
105
+	return nil
106
+}