Browse code

Move `StreamConfig` out of `runconfig`

`StreamConfig` carries with it a dep on libcontainerd, which is used by
other projects, but libcontainerd doesn't compile on all platforms, so
move it to `github.com/docker/docker/container/stream`

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2016/11/15 05:15:09
Showing 8 changed files
... ...
@@ -19,6 +19,7 @@ import (
19 19
 	containertypes "github.com/docker/docker/api/types/container"
20 20
 	mounttypes "github.com/docker/docker/api/types/mount"
21 21
 	networktypes "github.com/docker/docker/api/types/network"
22
+	"github.com/docker/docker/container/stream"
22 23
 	"github.com/docker/docker/daemon/exec"
23 24
 	"github.com/docker/docker/daemon/logger"
24 25
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
... ...
@@ -65,7 +66,7 @@ func (DetachError) Error() string {
65 65
 // CommonContainer holds the fields for a container which are
66 66
 // applicable across all platforms supported by the daemon.
67 67
 type CommonContainer struct {
68
-	*runconfig.StreamConfig
68
+	StreamConfig *stream.Config
69 69
 	// embed for Container to support states directly.
70 70
 	*State          `json:"State"` // Needed for remote api version <= 1.11
71 71
 	Root            string         `json:"-"` // Path to the "home" of the container, including metadata.
... ...
@@ -109,7 +110,7 @@ func NewBaseContainer(id, root string) *Container {
109 109
 			ExecCommands:  exec.NewStore(),
110 110
 			Root:          root,
111 111
 			MountPoints:   make(map[string]*volume.MountPoint),
112
-			StreamConfig:  runconfig.NewStreamConfig(),
112
+			StreamConfig:  stream.NewConfig(),
113 113
 			attachContext: &attachContext{},
114 114
 		},
115 115
 	}
... ...
@@ -377,7 +378,7 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr
377 377
 
378 378
 // AttachStreams connects streams to a TTY.
379 379
 // Used by exec too. Should this move somewhere else?
380
-func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
380
+func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
381 381
 	var (
382 382
 		cStdout, cStderr io.ReadCloser
383 383
 		cStdin           io.WriteCloser
... ...
@@ -1064,6 +1065,26 @@ func (container *Container) startLogging() error {
1064 1064
 	return nil
1065 1065
 }
1066 1066
 
1067
+// StdinPipe gets the stdin stream of the container
1068
+func (container *Container) StdinPipe() io.WriteCloser {
1069
+	return container.StreamConfig.StdinPipe()
1070
+}
1071
+
1072
+// StdoutPipe gets the stdout stream of the container
1073
+func (container *Container) StdoutPipe() io.ReadCloser {
1074
+	return container.StreamConfig.StdoutPipe()
1075
+}
1076
+
1077
+// StderrPipe gets the stderr stream of the container
1078
+func (container *Container) StderrPipe() io.ReadCloser {
1079
+	return container.StreamConfig.StderrPipe()
1080
+}
1081
+
1082
+// CloseStreams closes the container's stdio streams
1083
+func (container *Container) CloseStreams() error {
1084
+	return container.StreamConfig.CloseStreams()
1085
+}
1086
+
1067 1087
 // InitializeStdio is called by libcontainerd to connect the stdio.
1068 1088
 func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
1069 1089
 	if err := container.startLogging(); err != nil {
... ...
@@ -1073,7 +1094,7 @@ func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
1073 1073
 
1074 1074
 	container.StreamConfig.CopyToPipe(iop)
1075 1075
 
1076
-	if container.Stdin() == nil && !container.Config.Tty {
1076
+	if container.StreamConfig.Stdin() == nil && !container.Config.Tty {
1077 1077
 		if iop.Stdin != nil {
1078 1078
 			if err := iop.Stdin.Close(); err != nil {
1079 1079
 				logrus.Warnf("error closing stdin: %+v", err)
... ...
@@ -23,7 +23,7 @@ func (container *Container) Reset(lock bool) {
23 23
 
24 24
 	// Re-create a brand new stdin pipe once the container exited
25 25
 	if container.Config.OpenStdin {
26
-		container.NewInputPipes()
26
+		container.StreamConfig.NewInputPipes()
27 27
 	}
28 28
 
29 29
 	if container.LogDriver != nil {
30 30
new file mode 100644
... ...
@@ -0,0 +1,143 @@
0
+package stream
1
+
2
+import (
3
+	"fmt"
4
+	"io"
5
+	"io/ioutil"
6
+	"strings"
7
+	"sync"
8
+
9
+	"github.com/Sirupsen/logrus"
10
+	"github.com/docker/docker/libcontainerd"
11
+	"github.com/docker/docker/pkg/broadcaster"
12
+	"github.com/docker/docker/pkg/ioutils"
13
+	"github.com/docker/docker/pkg/pools"
14
+)
15
+
16
+// Config holds information about I/O streams managed together.
17
+//
18
+// config.StdinPipe returns a WriteCloser which can be used to feed data
19
+// to the standard input of the streamConfig's active process.
20
+// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
21
+// which can be used to retrieve the standard output (and error) generated
22
+// by the container's active process. The output (and error) are actually
23
+// copied and delivered to all StdoutPipe and StderrPipe consumers, using
24
+// a kind of "broadcaster".
25
+type Config struct {
26
+	sync.WaitGroup
27
+	stdout    *broadcaster.Unbuffered
28
+	stderr    *broadcaster.Unbuffered
29
+	stdin     io.ReadCloser
30
+	stdinPipe io.WriteCloser
31
+}
32
+
33
+// NewConfig creates a stream config and initializes
34
+// the standard err and standard out to new unbuffered broadcasters.
35
+func NewConfig() *Config {
36
+	return &Config{
37
+		stderr: new(broadcaster.Unbuffered),
38
+		stdout: new(broadcaster.Unbuffered),
39
+	}
40
+}
41
+
42
+// Stdout returns the standard output in the configuration.
43
+func (c *Config) Stdout() *broadcaster.Unbuffered {
44
+	return c.stdout
45
+}
46
+
47
+// Stderr returns the standard error in the configuration.
48
+func (c *Config) Stderr() *broadcaster.Unbuffered {
49
+	return c.stderr
50
+}
51
+
52
+// Stdin returns the standard input in the configuration.
53
+func (c *Config) Stdin() io.ReadCloser {
54
+	return c.stdin
55
+}
56
+
57
+// StdinPipe returns an input writer pipe as an io.WriteCloser.
58
+func (c *Config) StdinPipe() io.WriteCloser {
59
+	return c.stdinPipe
60
+}
61
+
62
+// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
63
+// It adds this new out pipe to the Stdout broadcaster.
64
+func (c *Config) StdoutPipe() io.ReadCloser {
65
+	bytesPipe := ioutils.NewBytesPipe()
66
+	c.stdout.Add(bytesPipe)
67
+	return bytesPipe
68
+}
69
+
70
+// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
71
+// It adds this new err pipe to the Stderr broadcaster.
72
+func (c *Config) StderrPipe() io.ReadCloser {
73
+	bytesPipe := ioutils.NewBytesPipe()
74
+	c.stderr.Add(bytesPipe)
75
+	return bytesPipe
76
+}
77
+
78
+// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
79
+func (c *Config) NewInputPipes() {
80
+	c.stdin, c.stdinPipe = io.Pipe()
81
+}
82
+
83
+// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
84
+func (c *Config) NewNopInputPipe() {
85
+	c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
86
+}
87
+
88
+// CloseStreams ensures that the configured streams are properly closed.
89
+func (c *Config) CloseStreams() error {
90
+	var errors []string
91
+
92
+	if c.stdin != nil {
93
+		if err := c.stdin.Close(); err != nil {
94
+			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
95
+		}
96
+	}
97
+
98
+	if err := c.stdout.Clean(); err != nil {
99
+		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
100
+	}
101
+
102
+	if err := c.stderr.Clean(); err != nil {
103
+		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
104
+	}
105
+
106
+	if len(errors) > 0 {
107
+		return fmt.Errorf(strings.Join(errors, "\n"))
108
+	}
109
+
110
+	return nil
111
+}
112
+
113
+// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
114
+func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) {
115
+	copyFunc := func(w io.Writer, r io.Reader) {
116
+		c.Add(1)
117
+		go func() {
118
+			if _, err := pools.Copy(w, r); err != nil {
119
+				logrus.Errorf("stream copy error: %+v", err)
120
+			}
121
+			c.Done()
122
+		}()
123
+	}
124
+
125
+	if iop.Stdout != nil {
126
+		copyFunc(c.Stdout(), iop.Stdout)
127
+	}
128
+	if iop.Stderr != nil {
129
+		copyFunc(c.Stderr(), iop.Stderr)
130
+	}
131
+
132
+	if stdin := c.Stdin(); stdin != nil {
133
+		if iop.Stdin != nil {
134
+			go func() {
135
+				pools.Copy(iop.Stdin, stdin)
136
+				if err := iop.Stdin.Close(); err != nil {
137
+					logrus.Errorf("failed to close stdin: %+v", err)
138
+				}
139
+			}()
140
+		}
141
+	}
142
+}
... ...
@@ -91,9 +91,9 @@ func (daemon *Daemon) load(id string) (*container.Container, error) {
91 91
 func (daemon *Daemon) Register(c *container.Container) error {
92 92
 	// Attach to stdout and stderr
93 93
 	if c.Config.OpenStdin {
94
-		c.NewInputPipes()
94
+		c.StreamConfig.NewInputPipes()
95 95
 	} else {
96
-		c.NewNopInputPipe()
96
+		c.StreamConfig.NewNopInputPipe()
97 97
 	}
98 98
 
99 99
 	daemon.containers.Add(c.ID, c)
... ...
@@ -195,9 +195,9 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
195 195
 	}
196 196
 
197 197
 	if ec.OpenStdin {
198
-		ec.NewInputPipes()
198
+		ec.StreamConfig.NewInputPipes()
199 199
 	} else {
200
-		ec.NewNopInputPipe()
200
+		ec.StreamConfig.NewNopInputPipe()
201 201
 	}
202 202
 
203 203
 	p := libcontainerd.Process{
... ...
@@ -5,9 +5,9 @@ import (
5 5
 	"sync"
6 6
 
7 7
 	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/docker/container/stream"
8 9
 	"github.com/docker/docker/libcontainerd"
9 10
 	"github.com/docker/docker/pkg/stringid"
10
-	"github.com/docker/docker/runconfig"
11 11
 )
12 12
 
13 13
 // Config holds the configurations for execs. The Daemon keeps
... ...
@@ -15,30 +15,30 @@ import (
15 15
 // examined both during and after completion.
16 16
 type Config struct {
17 17
 	sync.Mutex
18
-	*runconfig.StreamConfig
19
-	ID          string
20
-	Running     bool
21
-	ExitCode    *int
22
-	OpenStdin   bool
23
-	OpenStderr  bool
24
-	OpenStdout  bool
25
-	CanRemove   bool
26
-	ContainerID string
27
-	DetachKeys  []byte
28
-	Entrypoint  string
29
-	Args        []string
30
-	Tty         bool
31
-	Privileged  bool
32
-	User        string
33
-	Env         []string
34
-	Pid         int
18
+	StreamConfig *stream.Config
19
+	ID           string
20
+	Running      bool
21
+	ExitCode     *int
22
+	OpenStdin    bool
23
+	OpenStderr   bool
24
+	OpenStdout   bool
25
+	CanRemove    bool
26
+	ContainerID  string
27
+	DetachKeys   []byte
28
+	Entrypoint   string
29
+	Args         []string
30
+	Tty          bool
31
+	Privileged   bool
32
+	User         string
33
+	Env          []string
34
+	Pid          int
35 35
 }
36 36
 
37 37
 // NewConfig initializes the a new exec configuration
38 38
 func NewConfig() *Config {
39 39
 	return &Config{
40 40
 		ID:           stringid.GenerateNonCryptoID(),
41
-		StreamConfig: runconfig.NewStreamConfig(),
41
+		StreamConfig: stream.NewConfig(),
42 42
 	}
43 43
 }
44 44
 
... ...
@@ -46,7 +46,7 @@ func NewConfig() *Config {
46 46
 func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
47 47
 	c.StreamConfig.CopyToPipe(iop)
48 48
 
49
-	if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
49
+	if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
50 50
 		if iop.Stdin != nil {
51 51
 			if err := iop.Stdin.Close(); err != nil {
52 52
 				logrus.Errorf("error closing exec stdin: %+v", err)
... ...
@@ -57,6 +57,11 @@ func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
57 57
 	return nil
58 58
 }
59 59
 
60
+// CloseStreams closes the stdio streams for the exec
61
+func (c *Config) CloseStreams() error {
62
+	return c.StreamConfig.CloseStreams()
63
+}
64
+
60 65
 // Store keeps track of the exec configurations.
61 66
 type Store struct {
62 67
 	commands map[string]*Config
... ...
@@ -39,7 +39,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
39 39
 		}
40 40
 
41 41
 		c.Lock()
42
-		c.Wait()
42
+		c.StreamConfig.Wait()
43 43
 		c.Reset(false)
44 44
 
45 45
 		restart, wait, err := c.RestartManager().ShouldRestart(e.ExitCode, false, time.Since(c.StartedAt))
... ...
@@ -88,7 +88,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
88 88
 			defer execConfig.Unlock()
89 89
 			execConfig.ExitCode = &ec
90 90
 			execConfig.Running = false
91
-			execConfig.Wait()
91
+			execConfig.StreamConfig.Wait()
92 92
 			if err := execConfig.CloseStreams(); err != nil {
93 93
 				logrus.Errorf("%s: %s", c.ID, err)
94 94
 			}
95 95
deleted file mode 100644
... ...
@@ -1,143 +0,0 @@
1
-package runconfig
2
-
3
-import (
4
-	"fmt"
5
-	"io"
6
-	"io/ioutil"
7
-	"strings"
8
-	"sync"
9
-
10
-	"github.com/Sirupsen/logrus"
11
-	"github.com/docker/docker/libcontainerd"
12
-	"github.com/docker/docker/pkg/broadcaster"
13
-	"github.com/docker/docker/pkg/ioutils"
14
-	"github.com/docker/docker/pkg/pools"
15
-)
16
-
17
-// StreamConfig holds information about I/O streams managed together.
18
-//
19
-// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
20
-// to the standard input of the streamConfig's active process.
21
-// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
22
-// which can be used to retrieve the standard output (and error) generated
23
-// by the container's active process. The output (and error) are actually
24
-// copied and delivered to all StdoutPipe and StderrPipe consumers, using
25
-// a kind of "broadcaster".
26
-type StreamConfig struct {
27
-	sync.WaitGroup
28
-	stdout    *broadcaster.Unbuffered
29
-	stderr    *broadcaster.Unbuffered
30
-	stdin     io.ReadCloser
31
-	stdinPipe io.WriteCloser
32
-}
33
-
34
-// NewStreamConfig creates a stream config and initializes
35
-// the standard err and standard out to new unbuffered broadcasters.
36
-func NewStreamConfig() *StreamConfig {
37
-	return &StreamConfig{
38
-		stderr: new(broadcaster.Unbuffered),
39
-		stdout: new(broadcaster.Unbuffered),
40
-	}
41
-}
42
-
43
-// Stdout returns the standard output in the configuration.
44
-func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered {
45
-	return streamConfig.stdout
46
-}
47
-
48
-// Stderr returns the standard error in the configuration.
49
-func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered {
50
-	return streamConfig.stderr
51
-}
52
-
53
-// Stdin returns the standard input in the configuration.
54
-func (streamConfig *StreamConfig) Stdin() io.ReadCloser {
55
-	return streamConfig.stdin
56
-}
57
-
58
-// StdinPipe returns an input writer pipe as an io.WriteCloser.
59
-func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
60
-	return streamConfig.stdinPipe
61
-}
62
-
63
-// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
64
-// It adds this new out pipe to the Stdout broadcaster.
65
-func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
66
-	bytesPipe := ioutils.NewBytesPipe()
67
-	streamConfig.stdout.Add(bytesPipe)
68
-	return bytesPipe
69
-}
70
-
71
-// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
72
-// It adds this new err pipe to the Stderr broadcaster.
73
-func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
74
-	bytesPipe := ioutils.NewBytesPipe()
75
-	streamConfig.stderr.Add(bytesPipe)
76
-	return bytesPipe
77
-}
78
-
79
-// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
80
-func (streamConfig *StreamConfig) NewInputPipes() {
81
-	streamConfig.stdin, streamConfig.stdinPipe = io.Pipe()
82
-}
83
-
84
-// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
85
-func (streamConfig *StreamConfig) NewNopInputPipe() {
86
-	streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
87
-}
88
-
89
-// CloseStreams ensures that the configured streams are properly closed.
90
-func (streamConfig *StreamConfig) CloseStreams() error {
91
-	var errors []string
92
-
93
-	if streamConfig.stdin != nil {
94
-		if err := streamConfig.stdin.Close(); err != nil {
95
-			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
96
-		}
97
-	}
98
-
99
-	if err := streamConfig.stdout.Clean(); err != nil {
100
-		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
101
-	}
102
-
103
-	if err := streamConfig.stderr.Clean(); err != nil {
104
-		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
105
-	}
106
-
107
-	if len(errors) > 0 {
108
-		return fmt.Errorf(strings.Join(errors, "\n"))
109
-	}
110
-
111
-	return nil
112
-}
113
-
114
-// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
115
-func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) {
116
-	copyFunc := func(w io.Writer, r io.Reader) {
117
-		streamConfig.Add(1)
118
-		go func() {
119
-			if _, err := pools.Copy(w, r); err != nil {
120
-				logrus.Errorf("stream copy error: %+v", err)
121
-			}
122
-			streamConfig.Done()
123
-		}()
124
-	}
125
-
126
-	if iop.Stdout != nil {
127
-		copyFunc(streamConfig.Stdout(), iop.Stdout)
128
-	}
129
-	if iop.Stderr != nil {
130
-		copyFunc(streamConfig.Stderr(), iop.Stderr)
131
-	}
132
-
133
-	if stdin := streamConfig.Stdin(); stdin != nil {
134
-		if iop.Stdin != nil {
135
-			go func() {
136
-				pools.Copy(iop.Stdin, stdin)
137
-				if err := iop.Stdin.Close(); err != nil {
138
-					logrus.Errorf("failed to close stdin: %+v", err)
139
-				}
140
-			}()
141
-		}
142
-	}
143
-}