Browse code

Merge pull request #29370 from cpuguy83/move_streamconfig_112

[1.12.x] Move `StreamConfig` out of `runconfig`

Victor Vieux authored on 2016/12/14 09:47:51
Showing 8 changed files
... ...
@@ -16,6 +16,7 @@ import (
16 16
 	"golang.org/x/net/context"
17 17
 
18 18
 	"github.com/Sirupsen/logrus"
19
+	"github.com/docker/docker/container/stream"
19 20
 	"github.com/docker/docker/daemon/exec"
20 21
 	"github.com/docker/docker/daemon/logger"
21 22
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
... ...
@@ -59,7 +60,7 @@ func (DetachError) Error() string {
59 59
 // CommonContainer holds the fields for a container which are
60 60
 // applicable across all platforms supported by the daemon.
61 61
 type CommonContainer struct {
62
-	*runconfig.StreamConfig
62
+	StreamConfig *stream.Config
63 63
 	// embed for Container to support states directly.
64 64
 	*State          `json:"State"` // Needed for remote api version <= 1.11
65 65
 	Root            string         `json:"-"` // Path to the "home" of the container, including metadata.
... ...
@@ -102,7 +103,7 @@ func NewBaseContainer(id, root string) *Container {
102 102
 			ExecCommands:  exec.NewStore(),
103 103
 			Root:          root,
104 104
 			MountPoints:   make(map[string]*volume.MountPoint),
105
-			StreamConfig:  runconfig.NewStreamConfig(),
105
+			StreamConfig:  stream.NewConfig(),
106 106
 			attachContext: &attachContext{},
107 107
 		},
108 108
 	}
... ...
@@ -367,7 +368,7 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr
367 367
 
368 368
 // AttachStreams connects streams to a TTY.
369 369
 // Used by exec too. Should this move somewhere else?
370
-func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
370
+func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
371 371
 	var (
372 372
 		cStdout, cStderr io.ReadCloser
373 373
 		cStdin           io.WriteCloser
... ...
@@ -997,6 +998,26 @@ func (container *Container) startLogging() error {
997 997
 	return nil
998 998
 }
999 999
 
1000
+// StdinPipe gets the stdin stream of the container
1001
+func (container *Container) StdinPipe() io.WriteCloser {
1002
+	return container.StreamConfig.StdinPipe()
1003
+}
1004
+
1005
+// StdoutPipe gets the stdout stream of the container
1006
+func (container *Container) StdoutPipe() io.ReadCloser {
1007
+	return container.StreamConfig.StdoutPipe()
1008
+}
1009
+
1010
+// StderrPipe gets the stderr stream of the container
1011
+func (container *Container) StderrPipe() io.ReadCloser {
1012
+	return container.StreamConfig.StderrPipe()
1013
+}
1014
+
1015
+// CloseStreams closes the container's stdio streams
1016
+func (container *Container) CloseStreams() error {
1017
+	return container.StreamConfig.CloseStreams()
1018
+}
1019
+
1000 1020
 // InitializeStdio is called by libcontainerd to connect the stdio.
1001 1021
 func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
1002 1022
 	if err := container.startLogging(); err != nil {
... ...
@@ -1006,7 +1027,7 @@ func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
1006 1006
 
1007 1007
 	container.StreamConfig.CopyToPipe(iop)
1008 1008
 
1009
-	if container.Stdin() == nil && !container.Config.Tty {
1009
+	if container.StreamConfig.Stdin() == nil && !container.Config.Tty {
1010 1010
 		if iop.Stdin != nil {
1011 1011
 			if err := iop.Stdin.Close(); err != nil {
1012 1012
 				logrus.Warnf("error closing stdin: %+v", err)
... ...
@@ -23,7 +23,7 @@ func (container *Container) Reset(lock bool) {
23 23
 
24 24
 	// Re-create a brand new stdin pipe once the container exited
25 25
 	if container.Config.OpenStdin {
26
-		container.NewInputPipes()
26
+		container.StreamConfig.NewInputPipes()
27 27
 	}
28 28
 
29 29
 	if container.LogDriver != nil {
30 30
new file mode 100644
... ...
@@ -0,0 +1,143 @@
0
+package stream
1
+
2
+import (
3
+	"fmt"
4
+	"io"
5
+	"io/ioutil"
6
+	"strings"
7
+	"sync"
8
+
9
+	"github.com/Sirupsen/logrus"
10
+	"github.com/docker/docker/libcontainerd"
11
+	"github.com/docker/docker/pkg/broadcaster"
12
+	"github.com/docker/docker/pkg/ioutils"
13
+	"github.com/docker/docker/pkg/pools"
14
+)
15
+
16
+// Config holds information about I/O streams managed together.
17
+//
18
+// config.StdinPipe returns a WriteCloser which can be used to feed data
19
+// to the standard input of the streamConfig's active process.
20
+// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
21
+// which can be used to retrieve the standard output (and error) generated
22
+// by the container's active process. The output (and error) are actually
23
+// copied and delivered to all StdoutPipe and StderrPipe consumers, using
24
+// a kind of "broadcaster".
25
+type Config struct {
26
+	sync.WaitGroup
27
+	stdout    *broadcaster.Unbuffered
28
+	stderr    *broadcaster.Unbuffered
29
+	stdin     io.ReadCloser
30
+	stdinPipe io.WriteCloser
31
+}
32
+
33
+// NewConfig creates a stream config and initializes
34
+// the standard err and standard out to new unbuffered broadcasters.
35
+func NewConfig() *Config {
36
+	return &Config{
37
+		stderr: new(broadcaster.Unbuffered),
38
+		stdout: new(broadcaster.Unbuffered),
39
+	}
40
+}
41
+
42
+// Stdout returns the standard output in the configuration.
43
+func (c *Config) Stdout() *broadcaster.Unbuffered {
44
+	return c.stdout
45
+}
46
+
47
+// Stderr returns the standard error in the configuration.
48
+func (c *Config) Stderr() *broadcaster.Unbuffered {
49
+	return c.stderr
50
+}
51
+
52
+// Stdin returns the standard input in the configuration.
53
+func (c *Config) Stdin() io.ReadCloser {
54
+	return c.stdin
55
+}
56
+
57
+// StdinPipe returns an input writer pipe as an io.WriteCloser.
58
+func (c *Config) StdinPipe() io.WriteCloser {
59
+	return c.stdinPipe
60
+}
61
+
62
+// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
63
+// It adds this new out pipe to the Stdout broadcaster.
64
+func (c *Config) StdoutPipe() io.ReadCloser {
65
+	bytesPipe := ioutils.NewBytesPipe()
66
+	c.stdout.Add(bytesPipe)
67
+	return bytesPipe
68
+}
69
+
70
+// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
71
+// It adds this new err pipe to the Stderr broadcaster.
72
+func (c *Config) StderrPipe() io.ReadCloser {
73
+	bytesPipe := ioutils.NewBytesPipe()
74
+	c.stderr.Add(bytesPipe)
75
+	return bytesPipe
76
+}
77
+
78
+// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
79
+func (c *Config) NewInputPipes() {
80
+	c.stdin, c.stdinPipe = io.Pipe()
81
+}
82
+
83
+// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
84
+func (c *Config) NewNopInputPipe() {
85
+	c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
86
+}
87
+
88
+// CloseStreams ensures that the configured streams are properly closed.
89
+func (c *Config) CloseStreams() error {
90
+	var errors []string
91
+
92
+	if c.stdin != nil {
93
+		if err := c.stdin.Close(); err != nil {
94
+			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
95
+		}
96
+	}
97
+
98
+	if err := c.stdout.Clean(); err != nil {
99
+		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
100
+	}
101
+
102
+	if err := c.stderr.Clean(); err != nil {
103
+		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
104
+	}
105
+
106
+	if len(errors) > 0 {
107
+		return fmt.Errorf(strings.Join(errors, "\n"))
108
+	}
109
+
110
+	return nil
111
+}
112
+
113
+// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
114
+func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) {
115
+	copyFunc := func(w io.Writer, r io.Reader) {
116
+		c.Add(1)
117
+		go func() {
118
+			if _, err := pools.Copy(w, r); err != nil {
119
+				logrus.Errorf("stream copy error: %+v", err)
120
+			}
121
+			c.Done()
122
+		}()
123
+	}
124
+
125
+	if iop.Stdout != nil {
126
+		copyFunc(c.Stdout(), iop.Stdout)
127
+	}
128
+	if iop.Stderr != nil {
129
+		copyFunc(c.Stderr(), iop.Stderr)
130
+	}
131
+
132
+	if stdin := c.Stdin(); stdin != nil {
133
+		if iop.Stdin != nil {
134
+			go func() {
135
+				pools.Copy(iop.Stdin, stdin)
136
+				if err := iop.Stdin.Close(); err != nil {
137
+					logrus.Errorf("failed to close stdin: %+v", err)
138
+				}
139
+			}()
140
+		}
141
+	}
142
+}
... ...
@@ -89,9 +89,9 @@ func (daemon *Daemon) load(id string) (*container.Container, error) {
89 89
 func (daemon *Daemon) Register(c *container.Container) error {
90 90
 	// Attach to stdout and stderr
91 91
 	if c.Config.OpenStdin {
92
-		c.NewInputPipes()
92
+		c.StreamConfig.NewInputPipes()
93 93
 	} else {
94
-		c.NewNopInputPipe()
94
+		c.StreamConfig.NewNopInputPipe()
95 95
 	}
96 96
 
97 97
 	daemon.containers.Add(c.ID, c)
... ...
@@ -188,9 +188,9 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
188 188
 	}
189 189
 
190 190
 	if ec.OpenStdin {
191
-		ec.NewInputPipes()
191
+		ec.StreamConfig.NewInputPipes()
192 192
 	} else {
193
-		ec.NewNopInputPipe()
193
+		ec.StreamConfig.NewNopInputPipe()
194 194
 	}
195 195
 
196 196
 	p := libcontainerd.Process{
... ...
@@ -5,9 +5,9 @@ import (
5 5
 	"sync"
6 6
 
7 7
 	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/docker/container/stream"
8 9
 	"github.com/docker/docker/libcontainerd"
9 10
 	"github.com/docker/docker/pkg/stringid"
10
-	"github.com/docker/docker/runconfig"
11 11
 )
12 12
 
13 13
 // Config holds the configurations for execs. The Daemon keeps
... ...
@@ -15,28 +15,28 @@ import (
15 15
 // examined both during and after completion.
16 16
 type Config struct {
17 17
 	sync.Mutex
18
-	*runconfig.StreamConfig
19
-	ID          string
20
-	Running     bool
21
-	ExitCode    *int
22
-	OpenStdin   bool
23
-	OpenStderr  bool
24
-	OpenStdout  bool
25
-	CanRemove   bool
26
-	ContainerID string
27
-	DetachKeys  []byte
28
-	Entrypoint  string
29
-	Args        []string
30
-	Tty         bool
31
-	Privileged  bool
32
-	User        string
18
+	StreamConfig *stream.Config
19
+	ID           string
20
+	Running      bool
21
+	ExitCode     *int
22
+	OpenStdin    bool
23
+	OpenStderr   bool
24
+	OpenStdout   bool
25
+	CanRemove    bool
26
+	ContainerID  string
27
+	DetachKeys   []byte
28
+	Entrypoint   string
29
+	Args         []string
30
+	Tty          bool
31
+	Privileged   bool
32
+	User         string
33 33
 }
34 34
 
35 35
 // NewConfig initializes the a new exec configuration
36 36
 func NewConfig() *Config {
37 37
 	return &Config{
38 38
 		ID:           stringid.GenerateNonCryptoID(),
39
-		StreamConfig: runconfig.NewStreamConfig(),
39
+		StreamConfig: stream.NewConfig(),
40 40
 	}
41 41
 }
42 42
 
... ...
@@ -44,7 +44,7 @@ func NewConfig() *Config {
44 44
 func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
45 45
 	c.StreamConfig.CopyToPipe(iop)
46 46
 
47
-	if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
47
+	if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
48 48
 		if iop.Stdin != nil {
49 49
 			if err := iop.Stdin.Close(); err != nil {
50 50
 				logrus.Errorf("error closing exec stdin: %+v", err)
... ...
@@ -55,6 +55,11 @@ func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
55 55
 	return nil
56 56
 }
57 57
 
58
+// CloseStreams closes the stdio streams for the exec
59
+func (c *Config) CloseStreams() error {
60
+	return c.StreamConfig.CloseStreams()
61
+}
62
+
58 63
 // Store keeps track of the exec configurations.
59 64
 type Store struct {
60 65
 	commands map[string]*Config
... ...
@@ -28,7 +28,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
28 28
 	case libcontainerd.StateExit:
29 29
 		c.Lock()
30 30
 		defer c.Unlock()
31
-		c.Wait()
31
+		c.StreamConfig.Wait()
32 32
 		c.Reset(false)
33 33
 		c.SetStopped(platformConstructExitStatus(e))
34 34
 		attributes := map[string]string{
... ...
@@ -63,7 +63,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
63 63
 			defer execConfig.Unlock()
64 64
 			execConfig.ExitCode = &ec
65 65
 			execConfig.Running = false
66
-			execConfig.Wait()
66
+			execConfig.StreamConfig.Wait()
67 67
 			if err := execConfig.CloseStreams(); err != nil {
68 68
 				logrus.Errorf("%s: %s", c.ID, err)
69 69
 			}
70 70
deleted file mode 100644
... ...
@@ -1,143 +0,0 @@
1
-package runconfig
2
-
3
-import (
4
-	"fmt"
5
-	"io"
6
-	"io/ioutil"
7
-	"strings"
8
-	"sync"
9
-
10
-	"github.com/Sirupsen/logrus"
11
-	"github.com/docker/docker/libcontainerd"
12
-	"github.com/docker/docker/pkg/broadcaster"
13
-	"github.com/docker/docker/pkg/ioutils"
14
-	"github.com/docker/docker/pkg/pools"
15
-)
16
-
17
-// StreamConfig holds information about I/O streams managed together.
18
-//
19
-// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
20
-// to the standard input of the streamConfig's active process.
21
-// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
22
-// which can be used to retrieve the standard output (and error) generated
23
-// by the container's active process. The output (and error) are actually
24
-// copied and delivered to all StdoutPipe and StderrPipe consumers, using
25
-// a kind of "broadcaster".
26
-type StreamConfig struct {
27
-	sync.WaitGroup
28
-	stdout    *broadcaster.Unbuffered
29
-	stderr    *broadcaster.Unbuffered
30
-	stdin     io.ReadCloser
31
-	stdinPipe io.WriteCloser
32
-}
33
-
34
-// NewStreamConfig creates a stream config and initializes
35
-// the standard err and standard out to new unbuffered broadcasters.
36
-func NewStreamConfig() *StreamConfig {
37
-	return &StreamConfig{
38
-		stderr: new(broadcaster.Unbuffered),
39
-		stdout: new(broadcaster.Unbuffered),
40
-	}
41
-}
42
-
43
-// Stdout returns the standard output in the configuration.
44
-func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered {
45
-	return streamConfig.stdout
46
-}
47
-
48
-// Stderr returns the standard error in the configuration.
49
-func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered {
50
-	return streamConfig.stderr
51
-}
52
-
53
-// Stdin returns the standard input in the configuration.
54
-func (streamConfig *StreamConfig) Stdin() io.ReadCloser {
55
-	return streamConfig.stdin
56
-}
57
-
58
-// StdinPipe returns an input writer pipe as an io.WriteCloser.
59
-func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
60
-	return streamConfig.stdinPipe
61
-}
62
-
63
-// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
64
-// It adds this new out pipe to the Stdout broadcaster.
65
-func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
66
-	bytesPipe := ioutils.NewBytesPipe()
67
-	streamConfig.stdout.Add(bytesPipe)
68
-	return bytesPipe
69
-}
70
-
71
-// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
72
-// It adds this new err pipe to the Stderr broadcaster.
73
-func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
74
-	bytesPipe := ioutils.NewBytesPipe()
75
-	streamConfig.stderr.Add(bytesPipe)
76
-	return bytesPipe
77
-}
78
-
79
-// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
80
-func (streamConfig *StreamConfig) NewInputPipes() {
81
-	streamConfig.stdin, streamConfig.stdinPipe = io.Pipe()
82
-}
83
-
84
-// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
85
-func (streamConfig *StreamConfig) NewNopInputPipe() {
86
-	streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
87
-}
88
-
89
-// CloseStreams ensures that the configured streams are properly closed.
90
-func (streamConfig *StreamConfig) CloseStreams() error {
91
-	var errors []string
92
-
93
-	if streamConfig.stdin != nil {
94
-		if err := streamConfig.stdin.Close(); err != nil {
95
-			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
96
-		}
97
-	}
98
-
99
-	if err := streamConfig.stdout.Clean(); err != nil {
100
-		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
101
-	}
102
-
103
-	if err := streamConfig.stderr.Clean(); err != nil {
104
-		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
105
-	}
106
-
107
-	if len(errors) > 0 {
108
-		return fmt.Errorf(strings.Join(errors, "\n"))
109
-	}
110
-
111
-	return nil
112
-}
113
-
114
-// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
115
-func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) {
116
-	copyFunc := func(w io.Writer, r io.Reader) {
117
-		streamConfig.Add(1)
118
-		go func() {
119
-			if _, err := pools.Copy(w, r); err != nil {
120
-				logrus.Errorf("stream copy error: %+v", err)
121
-			}
122
-			streamConfig.Done()
123
-		}()
124
-	}
125
-
126
-	if iop.Stdout != nil {
127
-		copyFunc(streamConfig.Stdout(), iop.Stdout)
128
-	}
129
-	if iop.Stderr != nil {
130
-		copyFunc(streamConfig.Stderr(), iop.Stderr)
131
-	}
132
-
133
-	if stdin := streamConfig.Stdin(); stdin != nil {
134
-		if iop.Stdin != nil {
135
-			go func() {
136
-				pools.Copy(iop.Stdin, stdin)
137
-				if err := iop.Stdin.Close(); err != nil {
138
-					logrus.Errorf("failed to close stdin: %+v", err)
139
-				}
140
-			}()
141
-		}
142
-	}
143
-}