Browse code

Merge pull request #9878 from ncdc/pick-terminal-resizing

Merged by openshift-bot

OpenShift Bot authored on 2016/07/24 15:51:10
Showing 45 changed files
... ...
@@ -219,7 +219,11 @@ func (h *binaryInstantiateHandler) handle(r io.Reader) (runtime.Object, error) {
219 219
 	if err != nil {
220 220
 		return nil, errors.NewInternalError(fmt.Errorf("unable to connect to server: %v", err))
221 221
 	}
222
-	if err := exec.Stream(kubeletremotecommand.SupportedStreamingProtocols, r, nil, nil, false); err != nil {
222
+	streamOptions := remotecommand.StreamOptions{
223
+		SupportedProtocols: kubeletremotecommand.SupportedStreamingProtocols,
224
+		Stdin:              r,
225
+	}
226
+	if err := exec.Stream(streamOptions); err != nil {
223 227
 		return nil, errors.NewInternalError(err)
224 228
 	}
225 229
 	return latest, nil
... ...
@@ -104,11 +104,13 @@ func NewCmdDebug(fullName string, f *clientcmd.Factory, in io.Reader, out, errou
104 104
 	options := &DebugOptions{
105 105
 		Timeout: 15 * time.Minute,
106 106
 		Attach: kcmd.AttachOptions{
107
-			In:    in,
108
-			Out:   out,
109
-			Err:   errout,
110
-			TTY:   true,
111
-			Stdin: true,
107
+			StreamOptions: kcmd.StreamOptions{
108
+				In:    in,
109
+				Out:   out,
110
+				Err:   errout,
111
+				TTY:   true,
112
+				Stdin: true,
113
+			},
112 114
 
113 115
 			Attach: &kcmd.DefaultRemoteAttach{},
114 116
 		},
... ...
@@ -59,12 +59,14 @@ func NewCmdRsh(name string, parent string, f *clientcmd.Factory, in io.Reader, o
59 59
 		ForceTTY:   false,
60 60
 		DisableTTY: false,
61 61
 		ExecOptions: &kubecmd.ExecOptions{
62
-			In:  in,
63
-			Out: out,
64
-			Err: err,
65
-
66
-			TTY:   true,
67
-			Stdin: true,
62
+			StreamOptions: kubecmd.StreamOptions{
63
+				In:  in,
64
+				Out: out,
65
+				Err: err,
66
+
67
+				TTY:   true,
68
+				Stdin: true,
69
+			},
68 70
 
69 71
 			Executor: &kubecmd.DefaultRemoteExecutor{},
70 72
 		},
... ...
@@ -28,17 +28,19 @@ var _ executor = &remoteExecutor{}
28 28
 func (e *remoteExecutor) Execute(command []string, in io.Reader, out, errOut io.Writer) error {
29 29
 	glog.V(3).Infof("Remote executor running command: %s", strings.Join(command, " "))
30 30
 	execOptions := &kubecmd.ExecOptions{
31
-		In:            in,
32
-		Out:           out,
33
-		Err:           errOut,
34
-		Stdin:         in != nil,
35
-		Executor:      &kubecmd.DefaultRemoteExecutor{},
36
-		Client:        e.Client,
37
-		Config:        e.Config,
38
-		PodName:       e.PodName,
39
-		ContainerName: e.ContainerName,
40
-		Namespace:     e.Namespace,
41
-		Command:       command,
31
+		StreamOptions: kubecmd.StreamOptions{
32
+			Namespace:     e.Namespace,
33
+			PodName:       e.PodName,
34
+			ContainerName: e.ContainerName,
35
+			In:            in,
36
+			Out:           out,
37
+			Err:           errOut,
38
+			Stdin:         in != nil,
39
+		},
40
+		Executor: &kubecmd.DefaultRemoteExecutor{},
41
+		Client:   e.Client,
42
+		Config:   e.Config,
43
+		Command:  command,
42 44
 	}
43 45
 	err := execOptions.Validate()
44 46
 	if err != nil {
... ...
@@ -2751,6 +2751,8 @@ const (
2751 2751
 	StreamTypeData = "data"
2752 2752
 	// Value for streamType header for error stream
2753 2753
 	StreamTypeError = "error"
2754
+	// Value for streamType header for terminal resize stream
2755
+	StreamTypeResize = "resize"
2754 2756
 
2755 2757
 	// Name of header that specifies the port being forwarded
2756 2758
 	PortHeader = "port"
... ...
@@ -29,15 +29,28 @@ import (
29 29
 	"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
30 30
 	"k8s.io/kubernetes/pkg/util/httpstream"
31 31
 	"k8s.io/kubernetes/pkg/util/httpstream/spdy"
32
+	"k8s.io/kubernetes/pkg/util/term"
32 33
 )
33 34
 
35
+// StreamOptions holds information pertaining to the current streaming session: supported stream
36
+// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
37
+// support terminal resizing.
38
+type StreamOptions struct {
39
+	SupportedProtocols []string
40
+	Stdin              io.Reader
41
+	Stdout             io.Writer
42
+	Stderr             io.Writer
43
+	Tty                bool
44
+	TerminalSizeQueue  term.TerminalSizeQueue
45
+}
46
+
34 47
 // Executor is an interface for transporting shell-style streams.
35 48
 type Executor interface {
36 49
 	// Stream initiates the transport of the standard shell streams. It will transport any
37 50
 	// non-nil stream to a remote system, and return an error if a problem occurs. If tty
38 51
 	// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
39 52
 	// stdout stream).
40
-	Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
53
+	Stream(options StreamOptions) error
41 54
 }
42 55
 
43 56
 // StreamExecutor supports the ability to dial an httpstream connection and the ability to
... ...
@@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin
129 129
 	return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
130 130
 }
131 131
 
132
+type streamCreator interface {
133
+	CreateStream(headers http.Header) (httpstream.Stream, error)
134
+}
135
+
132 136
 type streamProtocolHandler interface {
133
-	stream(httpstream.Connection) error
137
+	stream(conn streamCreator) error
134 138
 }
135 139
 
136 140
 // Stream opens a protocol streamer to the server and streams until a client closes
137 141
 // the connection or the server disconnects.
138
-func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
139
-	conn, protocol, err := e.Dial(supportedProtocols...)
142
+func (e *streamExecutor) Stream(options StreamOptions) error {
143
+	conn, protocol, err := e.Dial(options.SupportedProtocols...)
140 144
 	if err != nil {
141 145
 		return err
142 146
 	}
... ...
@@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st
145 145
 	var streamer streamProtocolHandler
146 146
 
147 147
 	switch protocol {
148
+	case remotecommand.StreamProtocolV3Name:
149
+		streamer = newStreamProtocolV3(options)
148 150
 	case remotecommand.StreamProtocolV2Name:
149
-		streamer = &streamProtocolV2{
150
-			stdin:  stdin,
151
-			stdout: stdout,
152
-			stderr: stderr,
153
-			tty:    tty,
154
-		}
151
+		streamer = newStreamProtocolV2(options)
155 152
 	case "":
156 153
 		glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
157 154
 		fallthrough
158 155
 	case remotecommand.StreamProtocolV1Name:
159
-		streamer = &streamProtocolV1{
160
-			stdin:  stdin,
161
-			stdout: stdout,
162
-			stderr: stderr,
163
-			tty:    tty,
164
-		}
156
+		streamer = newStreamProtocolV1(options)
165 157
 	}
166 158
 
167 159
 	return streamer.stream(conn)
... ...
@@ -36,6 +36,7 @@ import (
36 36
 	"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
37 37
 	"k8s.io/kubernetes/pkg/types"
38 38
 	"k8s.io/kubernetes/pkg/util/httpstream"
39
+	"k8s.io/kubernetes/pkg/util/term"
39 40
 )
40 41
 
41 42
 type fakeExecutor struct {
... ...
@@ -52,11 +53,11 @@ type fakeExecutor struct {
52 52
 	exec          bool
53 53
 }
54 54
 
55
-func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
55
+func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
56 56
 	return ex.run(name, uid, container, cmd, in, out, err, tty)
57 57
 }
58 58
 
59
-func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
59
+func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
60 60
 	return ex.run(name, uid, container, nil, in, out, err, tty)
61 61
 }
62 62
 
... ...
@@ -253,7 +254,13 @@ func TestStream(t *testing.T) {
253 253
 				t.Errorf("%s: unexpected error: %v", name, err)
254 254
 				continue
255 255
 			}
256
-			err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty)
256
+			err = e.Stream(StreamOptions{
257
+				SupportedProtocols: testCase.ClientProtocols,
258
+				Stdin:              streamIn,
259
+				Stdout:             streamOut,
260
+				Stderr:             streamErr,
261
+				Tty:                testCase.Tty,
262
+			})
257 263
 			hasErr := err != nil
258 264
 
259 265
 			if len(testCase.Error) > 0 {
... ...
@@ -277,13 +284,13 @@ func TestStream(t *testing.T) {
277 277
 
278 278
 			if len(testCase.Stdout) > 0 {
279 279
 				if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() {
280
-					t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a)
280
+					t.Errorf("%s: expected stdout data %q, got %q", name, e, a)
281 281
 				}
282 282
 			}
283 283
 
284 284
 			if testCase.Stderr != "" {
285 285
 				if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() {
286
-					t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a)
286
+					t.Errorf("%s: expected stderr data %q, got %q", name, e, a)
287 287
 				}
288 288
 			}
289 289
 
... ...
@@ -28,19 +28,27 @@ import (
28 28
 )
29 29
 
30 30
 // streamProtocolV1 implements the first version of the streaming exec & attach
31
-// protocol. This version has some bugs, such as not being able to detecte when
31
+// protocol. This version has some bugs, such as not being able to detect when
32 32
 // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
33 33
 // http://issues.k8s.io/13395 for more details.
34 34
 type streamProtocolV1 struct {
35
-	stdin  io.Reader
36
-	stdout io.Writer
37
-	stderr io.Writer
38
-	tty    bool
35
+	StreamOptions
36
+
37
+	errorStream  httpstream.Stream
38
+	remoteStdin  httpstream.Stream
39
+	remoteStdout httpstream.Stream
40
+	remoteStderr httpstream.Stream
39 41
 }
40 42
 
41 43
 var _ streamProtocolHandler = &streamProtocolV1{}
42 44
 
43
-func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
45
+func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
46
+	return &streamProtocolV1{
47
+		StreamOptions: options,
48
+	}
49
+}
50
+
51
+func (p *streamProtocolV1) stream(conn streamCreator) error {
44 52
 	doneChan := make(chan struct{}, 2)
45 53
 	errorChan := make(chan error)
46 54
 
... ...
@@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
55 55
 		}
56 56
 	}
57 57
 
58
-	var (
59
-		err                                                  error
60
-		errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
61
-	)
62
-
63 58
 	// set up all the streams first
59
+	var err error
64 60
 	headers := http.Header{}
65 61
 	headers.Set(api.StreamType, api.StreamTypeError)
66
-	errorStream, err = conn.CreateStream(headers)
62
+	p.errorStream, err = conn.CreateStream(headers)
67 63
 	if err != nil {
68 64
 		return err
69 65
 	}
70
-	defer errorStream.Reset()
66
+	defer p.errorStream.Reset()
71 67
 
72 68
 	// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
73 69
 	// goroutines until it's received all of the streams. If the client creates the stdin stream and
... ...
@@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
76 76
 	// getting processed because the server hasn't started its copying, and it won't do that until it
77 77
 	// gets all the streams. By creating all the streams first, we ensure that the server is ready to
78 78
 	// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
79
-	if e.stdin != nil {
79
+	if p.Stdin != nil {
80 80
 		headers.Set(api.StreamType, api.StreamTypeStdin)
81
-		remoteStdin, err = conn.CreateStream(headers)
81
+		p.remoteStdin, err = conn.CreateStream(headers)
82 82
 		if err != nil {
83 83
 			return err
84 84
 		}
85
-		defer remoteStdin.Reset()
85
+		defer p.remoteStdin.Reset()
86 86
 	}
87 87
 
88
-	if e.stdout != nil {
88
+	if p.Stdout != nil {
89 89
 		headers.Set(api.StreamType, api.StreamTypeStdout)
90
-		remoteStdout, err = conn.CreateStream(headers)
90
+		p.remoteStdout, err = conn.CreateStream(headers)
91 91
 		if err != nil {
92 92
 			return err
93 93
 		}
94
-		defer remoteStdout.Reset()
94
+		defer p.remoteStdout.Reset()
95 95
 	}
96 96
 
97
-	if e.stderr != nil && !e.tty {
97
+	if p.Stderr != nil && !p.Tty {
98 98
 		headers.Set(api.StreamType, api.StreamTypeStderr)
99
-		remoteStderr, err = conn.CreateStream(headers)
99
+		p.remoteStderr, err = conn.CreateStream(headers)
100 100
 		if err != nil {
101 101
 			return err
102 102
 		}
103
-		defer remoteStderr.Reset()
103
+		defer p.remoteStderr.Reset()
104 104
 	}
105 105
 
106 106
 	// now that all the streams have been created, proceed with reading & copying
107 107
 
108 108
 	// always read from errorStream
109 109
 	go func() {
110
-		message, err := ioutil.ReadAll(errorStream)
110
+		message, err := ioutil.ReadAll(p.errorStream)
111 111
 		if err != nil && err != io.EOF {
112 112
 			errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
113 113
 			return
... ...
@@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
118 118
 		}
119 119
 	}()
120 120
 
121
-	if e.stdin != nil {
121
+	if p.Stdin != nil {
122 122
 		// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
123 123
 		// because stdin is not closed until the process exits. If we try to call
124 124
 		// stdin.Close(), it returns no error but doesn't unblock the copy. It will
125 125
 		// exit when the process exits, instead.
126
-		go cp(api.StreamTypeStdin, remoteStdin, e.stdin)
126
+		go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
127 127
 	}
128 128
 
129 129
 	waitCount := 0
130 130
 	completedStreams := 0
131 131
 
132
-	if e.stdout != nil {
132
+	if p.Stdout != nil {
133 133
 		waitCount++
134
-		go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
134
+		go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
135 135
 	}
136 136
 
137
-	if e.stderr != nil && !e.tty {
137
+	if p.Stderr != nil && !p.Tty {
138 138
 		waitCount++
139
-		go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
139
+		go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
140 140
 	}
141 141
 
142 142
 Loop:
... ...
@@ -24,7 +24,6 @@ import (
24 24
 	"sync"
25 25
 
26 26
 	"k8s.io/kubernetes/pkg/api"
27
-	"k8s.io/kubernetes/pkg/util/httpstream"
28 27
 	"k8s.io/kubernetes/pkg/util/runtime"
29 28
 )
30 29
 
... ...
@@ -33,63 +32,69 @@ import (
33 33
 // version is referred to as version 2, even though it is the first actual
34 34
 // numbered version.
35 35
 type streamProtocolV2 struct {
36
-	stdin  io.Reader
37
-	stdout io.Writer
38
-	stderr io.Writer
39
-	tty    bool
36
+	StreamOptions
37
+
38
+	errorStream  io.Reader
39
+	remoteStdin  io.ReadWriteCloser
40
+	remoteStdout io.Reader
41
+	remoteStderr io.Reader
40 42
 }
41 43
 
42 44
 var _ streamProtocolHandler = &streamProtocolV2{}
43 45
 
44
-func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
45
-	var (
46
-		err                                                  error
47
-		errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
48
-	)
46
+func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
47
+	return &streamProtocolV2{
48
+		StreamOptions: options,
49
+	}
50
+}
49 51
 
52
+func (p *streamProtocolV2) createStreams(conn streamCreator) error {
53
+	var err error
50 54
 	headers := http.Header{}
51 55
 
52
-	// set up all the streams first
53 56
 	// set up error stream
54
-	errorChan := make(chan error)
55 57
 	headers.Set(api.StreamType, api.StreamTypeError)
56
-	errorStream, err = conn.CreateStream(headers)
58
+	p.errorStream, err = conn.CreateStream(headers)
57 59
 	if err != nil {
58 60
 		return err
59 61
 	}
60 62
 
61 63
 	// set up stdin stream
62
-	if e.stdin != nil {
64
+	if p.Stdin != nil {
63 65
 		headers.Set(api.StreamType, api.StreamTypeStdin)
64
-		remoteStdin, err = conn.CreateStream(headers)
66
+		p.remoteStdin, err = conn.CreateStream(headers)
65 67
 		if err != nil {
66 68
 			return err
67 69
 		}
68 70
 	}
69 71
 
70 72
 	// set up stdout stream
71
-	if e.stdout != nil {
73
+	if p.Stdout != nil {
72 74
 		headers.Set(api.StreamType, api.StreamTypeStdout)
73
-		remoteStdout, err = conn.CreateStream(headers)
75
+		p.remoteStdout, err = conn.CreateStream(headers)
74 76
 		if err != nil {
75 77
 			return err
76 78
 		}
77 79
 	}
78 80
 
79 81
 	// set up stderr stream
80
-	if e.stderr != nil && !e.tty {
82
+	if p.Stderr != nil && !p.Tty {
81 83
 		headers.Set(api.StreamType, api.StreamTypeStderr)
82
-		remoteStderr, err = conn.CreateStream(headers)
84
+		p.remoteStderr, err = conn.CreateStream(headers)
83 85
 		if err != nil {
84 86
 			return err
85 87
 		}
86 88
 	}
89
+	return nil
90
+}
87 91
 
88
-	// now that all the streams have been created, proceed with reading & copying
92
+func (p *streamProtocolV2) setupErrorStreamReading() chan error {
93
+	errorChan := make(chan error)
89 94
 
90
-	// always read from errorStream
91 95
 	go func() {
92
-		message, err := ioutil.ReadAll(errorStream)
96
+		defer runtime.HandleCrash()
97
+
98
+		message, err := ioutil.ReadAll(p.errorStream)
93 99
 		switch {
94 100
 		case err != nil && err != io.EOF:
95 101
 			errorChan <- fmt.Errorf("error reading from error stream: %s", err)
... ...
@@ -101,18 +106,23 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
101 101
 		close(errorChan)
102 102
 	}()
103 103
 
104
-	var wg sync.WaitGroup
105
-	var once sync.Once
104
+	return errorChan
105
+}
106
+
107
+func (p *streamProtocolV2) copyStdin() {
108
+	if p.Stdin != nil {
109
+		var once sync.Once
106 110
 
107
-	if e.stdin != nil {
108 111
 		// copy from client's stdin to container's stdin
109 112
 		go func() {
110
-			// if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
111
-			// we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise
113
+			defer runtime.HandleCrash()
114
+
115
+			// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
116
+			// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
112 117
 			// the executed command will remain running.
113
-			defer once.Do(func() { remoteStdin.Close() })
118
+			defer once.Do(func() { p.remoteStdin.Close() })
114 119
 
115
-			if _, err := io.Copy(remoteStdin, e.stdin); err != nil {
120
+			if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
116 121
 				runtime.HandleError(err)
117 122
 			}
118 123
 		}()
... ...
@@ -121,6 +131,9 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
121 121
 		// be able to exit interactive sessions cleanly and not leak goroutines or
122 122
 		// hang the client's terminal.
123 123
 		//
124
+		// TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
125
+		// required by engine-api.
126
+		//
124 127
 		// go-dockerclient's current hijack implementation
125 128
 		// (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
126 129
 		// waits for all three streams (stdin/stdout/stderr) to finish copying
... ...
@@ -129,35 +142,65 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
129 129
 		// When that happens, we must Close() on our side of remoteStdin, to
130 130
 		// allow the copy in hijack to complete, and hijack to return.
131 131
 		go func() {
132
-			defer once.Do(func() { remoteStdin.Close() })
132
+			defer runtime.HandleCrash()
133
+			defer once.Do(func() { p.remoteStdin.Close() })
134
+
133 135
 			// this "copy" doesn't actually read anything - it's just here to wait for
134 136
 			// the server to close remoteStdin.
135
-			if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil {
137
+			if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil {
136 138
 				runtime.HandleError(err)
137 139
 			}
138 140
 		}()
139 141
 	}
142
+}
140 143
 
141
-	if e.stdout != nil {
142
-		wg.Add(1)
143
-		go func() {
144
-			defer wg.Done()
145
-			if _, err := io.Copy(e.stdout, remoteStdout); err != nil {
146
-				runtime.HandleError(err)
147
-			}
148
-		}()
144
+func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
145
+	if p.Stdout == nil {
146
+		return
149 147
 	}
150 148
 
151
-	if e.stderr != nil && !e.tty {
152
-		wg.Add(1)
153
-		go func() {
154
-			defer wg.Done()
155
-			if _, err := io.Copy(e.stderr, remoteStderr); err != nil {
156
-				runtime.HandleError(err)
157
-			}
158
-		}()
149
+	wg.Add(1)
150
+	go func() {
151
+		defer runtime.HandleCrash()
152
+		defer wg.Done()
153
+
154
+		if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
155
+			runtime.HandleError(err)
156
+		}
157
+	}()
158
+}
159
+
160
+func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
161
+	if p.Stderr == nil || p.Tty {
162
+		return
163
+	}
164
+
165
+	wg.Add(1)
166
+	go func() {
167
+		defer runtime.HandleCrash()
168
+		defer wg.Done()
169
+
170
+		if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
171
+			runtime.HandleError(err)
172
+		}
173
+	}()
174
+}
175
+
176
+func (p *streamProtocolV2) stream(conn streamCreator) error {
177
+	if err := p.createStreams(conn); err != nil {
178
+		return err
159 179
 	}
160 180
 
181
+	// now that all the streams have been created, proceed with reading & copying
182
+
183
+	errorChan := p.setupErrorStreamReading()
184
+
185
+	p.copyStdin()
186
+
187
+	var wg sync.WaitGroup
188
+	p.copyStdout(&wg)
189
+	p.copyStderr(&wg)
190
+
161 191
 	// we're waiting for stdout/stderr to finish copying
162 192
 	wg.Wait()
163 193
 
164 194
new file mode 100644
... ...
@@ -0,0 +1,228 @@
0
+/*
1
+Copyright 2016 The Kubernetes Authors.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package remotecommand
17
+
18
+import (
19
+	"errors"
20
+	"io"
21
+	"net/http"
22
+	"strings"
23
+	"testing"
24
+	"time"
25
+
26
+	"k8s.io/kubernetes/pkg/api"
27
+	"k8s.io/kubernetes/pkg/util/httpstream"
28
+	"k8s.io/kubernetes/pkg/util/wait"
29
+)
30
+
31
+type fakeReader struct {
32
+	err error
33
+}
34
+
35
+func (r *fakeReader) Read([]byte) (int, error) { return 0, r.err }
36
+
37
+type fakeWriter struct{}
38
+
39
+func (*fakeWriter) Write([]byte) (int, error) { return 0, nil }
40
+
41
+type fakeStreamCreator struct {
42
+	created map[string]bool
43
+	errors  map[string]error
44
+}
45
+
46
+var _ streamCreator = &fakeStreamCreator{}
47
+
48
+func (f *fakeStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
49
+	streamType := headers.Get(api.StreamType)
50
+	f.created[streamType] = true
51
+	return nil, f.errors[streamType]
52
+}
53
+
54
+func TestV2CreateStreams(t *testing.T) {
55
+	tests := []struct {
56
+		name        string
57
+		stdin       bool
58
+		stdinError  error
59
+		stdout      bool
60
+		stdoutError error
61
+		stderr      bool
62
+		stderrError error
63
+		errorError  error
64
+		tty         bool
65
+		expectError bool
66
+	}{
67
+		{
68
+			name:        "stdin error",
69
+			stdin:       true,
70
+			stdinError:  errors.New("stdin error"),
71
+			expectError: true,
72
+		},
73
+		{
74
+			name:        "stdout error",
75
+			stdout:      true,
76
+			stdoutError: errors.New("stdout error"),
77
+			expectError: true,
78
+		},
79
+		{
80
+			name:        "stderr error",
81
+			stderr:      true,
82
+			stderrError: errors.New("stderr error"),
83
+			expectError: true,
84
+		},
85
+		{
86
+			name:        "error stream error",
87
+			stdin:       true,
88
+			stdout:      true,
89
+			stderr:      true,
90
+			errorError:  errors.New("error stream error"),
91
+			expectError: true,
92
+		},
93
+		{
94
+			name:        "no errors",
95
+			stdin:       true,
96
+			stdout:      true,
97
+			stderr:      true,
98
+			expectError: false,
99
+		},
100
+		{
101
+			name:        "no errors, stderr & tty set, don't expect stderr",
102
+			stdin:       true,
103
+			stdout:      true,
104
+			stderr:      true,
105
+			tty:         true,
106
+			expectError: false,
107
+		},
108
+	}
109
+	for _, test := range tests {
110
+		conn := &fakeStreamCreator{
111
+			created: make(map[string]bool),
112
+			errors: map[string]error{
113
+				api.StreamTypeStdin:  test.stdinError,
114
+				api.StreamTypeStdout: test.stdoutError,
115
+				api.StreamTypeStderr: test.stderrError,
116
+				api.StreamTypeError:  test.errorError,
117
+			},
118
+		}
119
+
120
+		opts := StreamOptions{Tty: test.tty}
121
+		if test.stdin {
122
+			opts.Stdin = &fakeReader{}
123
+		}
124
+		if test.stdout {
125
+			opts.Stdout = &fakeWriter{}
126
+		}
127
+		if test.stderr {
128
+			opts.Stderr = &fakeWriter{}
129
+		}
130
+
131
+		h := newStreamProtocolV2(opts).(*streamProtocolV2)
132
+		err := h.createStreams(conn)
133
+
134
+		if test.expectError {
135
+			if err == nil {
136
+				t.Errorf("%s: expected error", test.name)
137
+				continue
138
+			}
139
+			if e, a := test.stdinError, err; test.stdinError != nil && e != a {
140
+				t.Errorf("%s: expected %v, got %v", test.name, e, a)
141
+			}
142
+			if e, a := test.stdoutError, err; test.stdoutError != nil && e != a {
143
+				t.Errorf("%s: expected %v, got %v", test.name, e, a)
144
+			}
145
+			if e, a := test.stderrError, err; test.stderrError != nil && e != a {
146
+				t.Errorf("%s: expected %v, got %v", test.name, e, a)
147
+			}
148
+			if e, a := test.errorError, err; test.errorError != nil && e != a {
149
+				t.Errorf("%s: expected %v, got %v", test.name, e, a)
150
+			}
151
+			continue
152
+		}
153
+
154
+		if !test.expectError && err != nil {
155
+			t.Errorf("%s: unexpected error: %v", test.name, err)
156
+			continue
157
+		}
158
+
159
+		if test.stdin && !conn.created[api.StreamTypeStdin] {
160
+			t.Errorf("%s: expected stdin stream", test.name)
161
+		}
162
+		if test.stdout && !conn.created[api.StreamTypeStdout] {
163
+			t.Errorf("%s: expected stdout stream", test.name)
164
+		}
165
+		if test.stderr {
166
+			if test.tty && conn.created[api.StreamTypeStderr] {
167
+				t.Errorf("%s: unexpected stderr stream because tty is set", test.name)
168
+			} else if !test.tty && !conn.created[api.StreamTypeStderr] {
169
+				t.Errorf("%s: expected stderr stream", test.name)
170
+			}
171
+		}
172
+		if !conn.created[api.StreamTypeError] {
173
+			t.Errorf("%s: expected error stream", test.name)
174
+		}
175
+
176
+	}
177
+}
178
+
179
+func TestV2ErrorStreamReading(t *testing.T) {
180
+	tests := []struct {
181
+		name          string
182
+		stream        io.Reader
183
+		expectedError error
184
+	}{
185
+		{
186
+			name:          "error reading from stream",
187
+			stream:        &fakeReader{errors.New("foo")},
188
+			expectedError: errors.New("error reading from error stream: foo"),
189
+		},
190
+		{
191
+			name:          "stream returns an error",
192
+			stream:        strings.NewReader("some error"),
193
+			expectedError: errors.New("error executing remote command: some error"),
194
+		},
195
+	}
196
+
197
+	for _, test := range tests {
198
+		h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
199
+		h.errorStream = test.stream
200
+
201
+		ch := h.setupErrorStreamReading()
202
+		if ch == nil {
203
+			t.Fatalf("%s: unexpected nil channel", test.name)
204
+		}
205
+
206
+		var err error
207
+		select {
208
+		case err = <-ch:
209
+		case <-time.After(wait.ForeverTestTimeout):
210
+			t.Fatalf("%s: timed out", test.name)
211
+		}
212
+
213
+		if test.expectedError != nil {
214
+			if err == nil {
215
+				t.Errorf("%s: expected an error", test.name)
216
+			} else if e, a := test.expectedError, err; e.Error() != a.Error() {
217
+				t.Errorf("%s: expected %q, got %q", test.name, e, a)
218
+			}
219
+			continue
220
+		}
221
+
222
+		if test.expectedError == nil && err != nil {
223
+			t.Errorf("%s: unexpected error: %v", test.name, err)
224
+			continue
225
+		}
226
+	}
227
+}
0 228
new file mode 100644
... ...
@@ -0,0 +1,108 @@
0
+/*
1
+Copyright 2016 The Kubernetes Authors.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package remotecommand
17
+
18
+import (
19
+	"encoding/json"
20
+	"io"
21
+	"net/http"
22
+	"sync"
23
+
24
+	"k8s.io/kubernetes/pkg/api"
25
+	"k8s.io/kubernetes/pkg/util/runtime"
26
+)
27
+
28
+// streamProtocolV3 implements version 3 of the streaming protocol for attach
29
+// and exec. This version adds support for resizing the container's terminal.
30
+type streamProtocolV3 struct {
31
+	*streamProtocolV2
32
+
33
+	resizeStream io.Writer
34
+}
35
+
36
+var _ streamProtocolHandler = &streamProtocolV3{}
37
+
38
+func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
39
+	return &streamProtocolV3{
40
+		streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
41
+	}
42
+}
43
+
44
+func (p *streamProtocolV3) createStreams(conn streamCreator) error {
45
+	// set up the streams from v2
46
+	if err := p.streamProtocolV2.createStreams(conn); err != nil {
47
+		return err
48
+	}
49
+
50
+	// set up resize stream
51
+	if p.Tty {
52
+		headers := http.Header{}
53
+		headers.Set(api.StreamType, api.StreamTypeResize)
54
+		var err error
55
+		p.resizeStream, err = conn.CreateStream(headers)
56
+		if err != nil {
57
+			return err
58
+		}
59
+	}
60
+
61
+	return nil
62
+}
63
+
64
+func (p *streamProtocolV3) handleResizes() {
65
+	if p.resizeStream == nil {
66
+		return
67
+	}
68
+
69
+	go func() {
70
+		defer runtime.HandleCrash()
71
+
72
+		encoder := json.NewEncoder(p.resizeStream)
73
+		for {
74
+			size := p.TerminalSizeQueue.Next()
75
+			if size == nil {
76
+				return
77
+			}
78
+			if err := encoder.Encode(&size); err != nil {
79
+				runtime.HandleError(err)
80
+			}
81
+		}
82
+	}()
83
+}
84
+
85
+func (p *streamProtocolV3) stream(conn streamCreator) error {
86
+	if err := p.createStreams(conn); err != nil {
87
+		return err
88
+	}
89
+
90
+	// now that all the streams have been created, proceed with reading & copying
91
+
92
+	errorChan := p.setupErrorStreamReading()
93
+
94
+	p.handleResizes()
95
+
96
+	p.copyStdin()
97
+
98
+	var wg sync.WaitGroup
99
+	p.copyStdout(&wg)
100
+	p.copyStderr(&wg)
101
+
102
+	// we're waiting for stdout/stderr to finish copying
103
+	wg.Wait()
104
+
105
+	// waits for errorStream to finish reading with an error or nil
106
+	return <-errorChan
107
+}
... ...
@@ -31,7 +31,6 @@ import (
31 31
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
32 32
 	remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
33 33
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
34
-	"k8s.io/kubernetes/pkg/util/interrupt"
35 34
 	"k8s.io/kubernetes/pkg/util/term"
36 35
 )
37 36
 
... ...
@@ -49,9 +48,11 @@ kubectl attach 123456-7890 -c ruby-container -i -t`
49 49
 
50 50
 func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *cobra.Command {
51 51
 	options := &AttachOptions{
52
-		In:  cmdIn,
53
-		Out: cmdOut,
54
-		Err: cmdErr,
52
+		StreamOptions: StreamOptions{
53
+			In:  cmdIn,
54
+			Out: cmdOut,
55
+			Err: cmdErr,
56
+		},
55 57
 
56 58
 		Attach: &DefaultRemoteAttach{},
57 59
 	}
... ...
@@ -75,35 +76,32 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer)
75 75
 
76 76
 // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
77 77
 type RemoteAttach interface {
78
-	Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
78
+	Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
79 79
 }
80 80
 
81 81
 // DefaultRemoteAttach is the standard implementation of attaching
82 82
 type DefaultRemoteAttach struct{}
83 83
 
84
-func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
84
+func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
85 85
 	exec, err := remotecommand.NewExecutor(config, method, url)
86 86
 	if err != nil {
87 87
 		return err
88 88
 	}
89
-	return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty)
89
+	return exec.Stream(remotecommand.StreamOptions{
90
+		SupportedProtocols: remotecommandserver.SupportedStreamingProtocols,
91
+		Stdin:              stdin,
92
+		Stdout:             stdout,
93
+		Stderr:             stderr,
94
+		Tty:                tty,
95
+		TerminalSizeQueue:  terminalSizeQueue,
96
+	})
90 97
 }
91 98
 
92 99
 // AttachOptions declare the arguments accepted by the Exec command
93 100
 type AttachOptions struct {
94
-	Namespace     string
95
-	PodName       string
96
-	ContainerName string
97
-	Stdin         bool
98
-	TTY           bool
99
-	CommandName   string
100
-
101
-	// InterruptParent, if set, is used to handle interrupts while attached
102
-	InterruptParent *interrupt.Handler
101
+	StreamOptions
103 102
 
104
-	In  io.Reader
105
-	Out io.Writer
106
-	Err io.Writer
103
+	CommandName string
107 104
 
108 105
 	Pod *api.Pod
109 106
 
... ...
@@ -177,29 +175,48 @@ func (p *AttachOptions) Run() error {
177 177
 	}
178 178
 	pod := p.Pod
179 179
 
180
-	// ensure we can recover the terminal while attached
181
-	t := term.TTY{Parent: p.InterruptParent}
182
-
183 180
 	// check for TTY
184
-	tty := p.TTY
185 181
 	containerToAttach := p.GetContainer(pod)
186
-	if tty && !containerToAttach.TTY {
187
-		tty = false
188
-		fmt.Fprintf(p.Err, "Unable to use a TTY - container %s did not allocate one\n", containerToAttach.Name)
189
-	}
190
-	if p.Stdin {
191
-		t.In = p.In
192
-		if tty && !t.IsTerminal() {
193
-			tty = false
194
-			fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file")
182
+	if p.TTY && !containerToAttach.TTY {
183
+		p.TTY = false
184
+		if p.Err != nil {
185
+			fmt.Fprintf(p.Err, "Unable to use a TTY - container %s did not allocate one\n", containerToAttach.Name)
186
+		}
187
+	} else if !p.TTY && containerToAttach.TTY {
188
+		// the container was launched with a TTY, so we have to force a TTY here, otherwise you'll get
189
+		// an error "Unrecognized input header"
190
+		p.TTY = true
191
+	}
192
+
193
+	// ensure we can recover the terminal while attached
194
+	t := p.setupTTY()
195
+
196
+	// save p.Err so we can print the command prompt message below
197
+	stderr := p.Err
198
+
199
+	var sizeQueue term.TerminalSizeQueue
200
+	if t.Raw {
201
+		if size := t.GetSize(); size != nil {
202
+			// fake resizing +1 and then back to normal so that attach-detach-reattach will result in the
203
+			// screen being redrawn
204
+			sizePlusOne := *size
205
+			sizePlusOne.Width++
206
+			sizePlusOne.Height++
207
+
208
+			// this call spawns a goroutine to monitor/update the terminal size
209
+			sizeQueue = t.MonitorSize(&sizePlusOne, size)
195 210
 		}
211
+
212
+		// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
213
+		// true
214
+		p.Err = nil
196 215
 	}
197
-	t.Raw = tty
198 216
 
199 217
 	fn := func() error {
200
-		if tty {
201
-			fmt.Fprintln(p.Out, "\nHit enter for command prompt")
218
+		if stderr != nil {
219
+			fmt.Fprintln(stderr, "If you don't see a command prompt, try pressing enter.")
202 220
 		}
221
+
203 222
 		// TODO: consider abstracting into a client invocation or client helper
204 223
 		req := p.Client.RESTClient.Post().
205 224
 			Resource("pods").
... ...
@@ -208,20 +225,20 @@ func (p *AttachOptions) Run() error {
208 208
 			SubResource("attach")
209 209
 		req.VersionedParams(&api.PodAttachOptions{
210 210
 			Container: containerToAttach.Name,
211
-			Stdin:     p.In != nil,
211
+			Stdin:     p.Stdin,
212 212
 			Stdout:    p.Out != nil,
213 213
 			Stderr:    p.Err != nil,
214
-			TTY:       tty,
214
+			TTY:       t.Raw,
215 215
 		}, api.ParameterCodec)
216 216
 
217
-		return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty)
217
+		return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, t.Raw, sizeQueue)
218 218
 	}
219 219
 
220 220
 	if err := t.Safe(fn); err != nil {
221 221
 		return err
222 222
 	}
223 223
 
224
-	if p.Stdin && tty && pod.Spec.RestartPolicy == api.RestartPolicyAlways {
224
+	if p.Stdin && t.Raw && pod.Spec.RestartPolicy == api.RestartPolicyAlways {
225 225
 		fmt.Fprintf(p.Out, "Session ended, resume using '%s %s -c %s -i -t' command when the pod is running\n", p.CommandName, pod.Name, containerToAttach.Name)
226 226
 	}
227 227
 	return nil
... ...
@@ -32,6 +32,7 @@ import (
32 32
 	"k8s.io/kubernetes/pkg/api/unversioned"
33 33
 	"k8s.io/kubernetes/pkg/client/restclient"
34 34
 	"k8s.io/kubernetes/pkg/client/unversioned/fake"
35
+	"k8s.io/kubernetes/pkg/util/term"
35 36
 )
36 37
 
37 38
 type fakeRemoteAttach struct {
... ...
@@ -40,7 +41,7 @@ type fakeRemoteAttach struct {
40 40
 	attachErr error
41 41
 }
42 42
 
43
-func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
43
+func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
44 44
 	f.method = method
45 45
 	f.url = url
46 46
 	return f.attachErr
... ...
@@ -73,7 +74,7 @@ func TestPodAndContainerAttach(t *testing.T) {
73 73
 			name:        "no container, no flags",
74 74
 		},
75 75
 		{
76
-			p:                 &AttachOptions{ContainerName: "bar"},
76
+			p:                 &AttachOptions{StreamOptions: StreamOptions{ContainerName: "bar"}},
77 77
 			args:              []string{"foo"},
78 78
 			expectedPod:       "foo",
79 79
 			expectedContainer: "bar",
... ...
@@ -159,11 +160,13 @@ func TestAttach(t *testing.T) {
159 159
 			ex.attachErr = fmt.Errorf("attach error")
160 160
 		}
161 161
 		params := &AttachOptions{
162
-			ContainerName: "bar",
163
-			In:            bufIn,
164
-			Out:           bufOut,
165
-			Err:           bufErr,
166
-			Attach:        ex,
162
+			StreamOptions: StreamOptions{
163
+				ContainerName: "bar",
164
+				In:            bufIn,
165
+				Out:           bufOut,
166
+				Err:           bufErr,
167
+			},
168
+			Attach: ex,
167 169
 		}
168 170
 		cmd := &cobra.Command{}
169 171
 		if err := params.Complete(f, cmd, []string{"foo"}); err != nil {
... ...
@@ -233,13 +236,15 @@ func TestAttachWarnings(t *testing.T) {
233 233
 		bufIn := bytes.NewBuffer([]byte{})
234 234
 		ex := &fakeRemoteAttach{}
235 235
 		params := &AttachOptions{
236
-			ContainerName: test.container,
237
-			In:            bufIn,
238
-			Out:           bufOut,
239
-			Err:           bufErr,
240
-			Stdin:         test.stdin,
241
-			TTY:           test.tty,
242
-			Attach:        ex,
236
+			StreamOptions: StreamOptions{
237
+				ContainerName: test.container,
238
+				In:            bufIn,
239
+				Out:           bufOut,
240
+				Err:           bufErr,
241
+				Stdin:         test.stdin,
242
+				TTY:           test.tty,
243
+			},
244
+			Attach: ex,
243 245
 		}
244 246
 		cmd := &cobra.Command{}
245 247
 		if err := params.Complete(f, cmd, []string{"foo"}); err != nil {
... ...
@@ -20,11 +20,8 @@ import (
20 20
 	"fmt"
21 21
 	"io"
22 22
 	"net/url"
23
-	"os"
24
-	"os/signal"
25
-	"syscall"
26 23
 
27
-	"github.com/docker/docker/pkg/term"
24
+	dockerterm "github.com/docker/docker/pkg/term"
28 25
 	"github.com/golang/glog"
29 26
 	"github.com/spf13/cobra"
30 27
 	"k8s.io/kubernetes/pkg/api"
... ...
@@ -34,6 +31,8 @@ import (
34 34
 	"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
35 35
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
36 36
 	remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
37
+	"k8s.io/kubernetes/pkg/util/interrupt"
38
+	"k8s.io/kubernetes/pkg/util/term"
37 39
 )
38 40
 
39 41
 const (
... ...
@@ -50,9 +49,11 @@ kubectl exec 123456-7890 -c ruby-container -i -t -- bash -il`
50 50
 
51 51
 func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *cobra.Command {
52 52
 	options := &ExecOptions{
53
-		In:  cmdIn,
54
-		Out: cmdOut,
55
-		Err: cmdErr,
53
+		StreamOptions: StreamOptions{
54
+			In:  cmdIn,
55
+			Out: cmdOut,
56
+			Err: cmdErr,
57
+		},
56 58
 
57 59
 		Executor: &DefaultRemoteExecutor{},
58 60
 	}
... ...
@@ -78,32 +79,49 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *
78 78
 
79 79
 // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
80 80
 type RemoteExecutor interface {
81
-	Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
81
+	Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
82 82
 }
83 83
 
84 84
 // DefaultRemoteExecutor is the standard implementation of remote command execution
85 85
 type DefaultRemoteExecutor struct{}
86 86
 
87
-func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
87
+func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
88 88
 	exec, err := remotecommand.NewExecutor(config, method, url)
89 89
 	if err != nil {
90 90
 		return err
91 91
 	}
92
-	return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty)
92
+	return exec.Stream(remotecommand.StreamOptions{
93
+		SupportedProtocols: remotecommandserver.SupportedStreamingProtocols,
94
+		Stdin:              stdin,
95
+		Stdout:             stdout,
96
+		Stderr:             stderr,
97
+		Tty:                tty,
98
+		TerminalSizeQueue:  terminalSizeQueue,
99
+	})
93 100
 }
94 101
 
95
-// ExecOptions declare the arguments accepted by the Exec command
96
-type ExecOptions struct {
102
+type StreamOptions struct {
97 103
 	Namespace     string
98 104
 	PodName       string
99 105
 	ContainerName string
100 106
 	Stdin         bool
101 107
 	TTY           bool
102
-	Command       []string
108
+	// InterruptParent, if set, is used to handle interrupts while attached
109
+	InterruptParent *interrupt.Handler
110
+	In              io.Reader
111
+	Out             io.Writer
112
+	Err             io.Writer
113
+
114
+	// for testing
115
+	overrideStreams func() (io.ReadCloser, io.Writer, io.Writer)
116
+	isTerminalIn    func(t term.TTY) bool
117
+}
118
+
119
+// ExecOptions declare the arguments accepted by the Exec command
120
+type ExecOptions struct {
121
+	StreamOptions
103 122
 
104
-	In  io.Reader
105
-	Out io.Writer
106
-	Err io.Writer
123
+	Command []string
107 124
 
108 125
 	Executor RemoteExecutor
109 126
 	Client   *client.Client
... ...
@@ -168,6 +186,58 @@ func (p *ExecOptions) Validate() error {
168 168
 	return nil
169 169
 }
170 170
 
171
+func (o *StreamOptions) setupTTY() term.TTY {
172
+	t := term.TTY{
173
+		Parent: o.InterruptParent,
174
+		Out:    o.Out,
175
+	}
176
+
177
+	if !o.Stdin {
178
+		// need to nil out o.In to make sure we don't create a stream for stdin
179
+		o.In = nil
180
+		o.TTY = false
181
+		return t
182
+	}
183
+
184
+	t.In = o.In
185
+	if !o.TTY {
186
+		return t
187
+	}
188
+
189
+	if o.isTerminalIn == nil {
190
+		o.isTerminalIn = func(tty term.TTY) bool {
191
+			return tty.IsTerminalIn()
192
+		}
193
+	}
194
+	if !o.isTerminalIn(t) {
195
+		o.TTY = false
196
+
197
+		if o.Err != nil {
198
+			fmt.Fprintln(o.Err, "Unable to use a TTY - input is not a terminal or the right kind of file")
199
+		}
200
+
201
+		return t
202
+	}
203
+
204
+	// if we get to here, the user wants to attach stdin, wants a TTY, and o.In is a terminal, so we
205
+	// can safely set t.Raw to true
206
+	t.Raw = true
207
+
208
+	if o.overrideStreams == nil {
209
+		// use dockerterm.StdStreams() to get the right I/O handles on Windows
210
+		o.overrideStreams = dockerterm.StdStreams
211
+	}
212
+	stdin, stdout, _ := o.overrideStreams()
213
+	o.In = stdin
214
+	t.In = stdin
215
+	if o.Out != nil {
216
+		o.Out = stdout
217
+		t.Out = stdout
218
+	}
219
+
220
+	return t
221
+}
222
+
171 223
 // Run executes a validated remote execution against a pod.
172 224
 func (p *ExecOptions) Run() error {
173 225
 	pod, err := p.Client.Pods(p.Namespace).Get(p.PodName)
... ...
@@ -181,91 +251,74 @@ func (p *ExecOptions) Run() error {
181 181
 		containerName = pod.Spec.Containers[0].Name
182 182
 	}
183 183
 
184
-	// TODO: refactor with terminal helpers from the edit utility once that is merged
185
-	var stdin io.Reader
186
-	tty := p.TTY
187
-	if p.Stdin {
188
-		stdin = p.In
189
-		if tty {
190
-			if file, ok := stdin.(*os.File); ok {
191
-				inFd := file.Fd()
192
-				if term.IsTerminal(inFd) {
193
-					oldState, err := term.SetRawTerminal(inFd)
194
-					if err != nil {
195
-						glog.Fatal(err)
196
-					}
197
-					// this handles a clean exit, where the command finished
198
-					defer term.RestoreTerminal(inFd, oldState)
199
-
200
-					// SIGINT is handled by term.SetRawTerminal (it runs a goroutine that listens
201
-					// for SIGINT and restores the terminal before exiting)
202
-
203
-					// this handles SIGTERM
204
-					sigChan := make(chan os.Signal, 1)
205
-					signal.Notify(sigChan, syscall.SIGTERM)
206
-					go func() {
207
-						<-sigChan
208
-						term.RestoreTerminal(inFd, oldState)
209
-						os.Exit(0)
210
-					}()
211
-				} else {
212
-					fmt.Fprintln(p.Err, "STDIN is not a terminal")
213
-				}
214
-			} else {
215
-				tty = false
216
-				fmt.Fprintln(p.Err, "Unable to use a TTY - input is not the right kind of file")
217
-			}
218
-		}
219
-	}
184
+	// ensure we can recover the terminal while attached
185
+	t := p.setupTTY()
220 186
 
221
-	// TODO: consider abstracting into a client invocation or client helper
222
-	req := p.Client.RESTClient.Post().
223
-		Resource("pods").
224
-		Name(pod.Name).
225
-		Namespace(pod.Namespace).
226
-		SubResource("exec").
227
-		Param("container", containerName)
228
-
229
-	req.VersionedParams(&api.PodExecOptions{
230
-		Container: containerName,
231
-		Command:   p.Command,
232
-		Stdin:     stdin != nil,
233
-		Stdout:    p.Out != nil,
234
-		Stderr:    p.Err != nil,
235
-		TTY:       tty,
236
-	}, api.ParameterCodec)
237
-
238
-	postErr := p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
239
-
240
-	// if we don't have an error, return.  If we did get an error, try a GET because v3.0.0 shipped with exec running as a GET.
241
-	if postErr == nil {
242
-		return nil
187
+	var sizeQueue term.TerminalSizeQueue
188
+	if t.Raw {
189
+		// this call spawns a goroutine to monitor/update the terminal size
190
+		sizeQueue = t.MonitorSize(t.GetSize())
191
+
192
+		// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
193
+		// true
194
+		p.Err = nil
243 195
 	}
244
-	// only try the get if the error is either a forbidden or method not supported, otherwise trying with a GET probably won't help
245
-	if !apierrors.IsForbidden(postErr) && !apierrors.IsMethodNotSupported(postErr) {
196
+
197
+	fn := func() error {
198
+		// TODO: consider abstracting into a client invocation or client helper
199
+		req := p.Client.RESTClient.Post().
200
+			Resource("pods").
201
+			Name(pod.Name).
202
+			Namespace(pod.Namespace).
203
+			SubResource("exec").
204
+			Param("container", containerName)
205
+		req.VersionedParams(&api.PodExecOptions{
206
+			Container: containerName,
207
+			Command:   p.Command,
208
+			Stdin:     p.Stdin,
209
+			Stdout:    p.Out != nil,
210
+			Stderr:    p.Err != nil,
211
+			TTY:       t.Raw,
212
+		}, api.ParameterCodec)
213
+
214
+		postErr := p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.Err, t.Raw, sizeQueue)
215
+
216
+		// if we don't have an error, return.  If we did get an error, try a GET because v3.0.0 shipped with exec running as a GET.
217
+		if postErr == nil {
218
+			return nil
219
+		}
220
+		// only try the get if the error is either a forbidden or method not supported, otherwise trying with a GET probably won't help
221
+		if !apierrors.IsForbidden(postErr) && !apierrors.IsMethodNotSupported(postErr) {
222
+			return postErr
223
+		}
224
+
225
+		getReq := p.Client.RESTClient.Get().
226
+			Resource("pods").
227
+			Name(pod.Name).
228
+			Namespace(pod.Namespace).
229
+			SubResource("exec").
230
+			Param("container", containerName)
231
+		getReq.VersionedParams(&api.PodExecOptions{
232
+			Container: containerName,
233
+			Command:   p.Command,
234
+			Stdin:     p.Stdin,
235
+			Stdout:    p.Out != nil,
236
+			Stderr:    p.Err != nil,
237
+			TTY:       t.Raw,
238
+		}, api.ParameterCodec)
239
+
240
+		getErr := p.Executor.Execute("GET", getReq.URL(), p.Config, p.In, p.Out, p.Err, t.Raw, sizeQueue)
241
+		if getErr == nil {
242
+			return nil
243
+		}
244
+
245
+		// if we got a getErr, return the postErr because it's more likely to be correct.  GET is legacy
246 246
 		return postErr
247 247
 	}
248 248
 
249
-	getReq := p.Client.RESTClient.Get().
250
-		Resource("pods").
251
-		Name(pod.Name).
252
-		Namespace(pod.Namespace).
253
-		SubResource("exec").
254
-		Param("container", containerName)
255
-	getReq.VersionedParams(&api.PodExecOptions{
256
-		Container: containerName,
257
-		Command:   p.Command,
258
-		Stdin:     stdin != nil,
259
-		Stdout:    p.Out != nil,
260
-		Stderr:    p.Err != nil,
261
-		TTY:       tty,
262
-	}, api.ParameterCodec)
263
-
264
-	getErr := p.Executor.Execute("GET", getReq.URL(), p.Config, stdin, p.Out, p.Err, tty)
265
-	if getErr == nil {
266
-		return nil
249
+	if err := t.Safe(fn); err != nil {
250
+		return err
267 251
 	}
268 252
 
269
-	// if we got a getErr, return the postErr because it's more likely to be correct.  GET is legacy
270
-	return postErr
253
+	return nil
271 254
 }
... ...
@@ -20,9 +20,11 @@ import (
20 20
 	"bytes"
21 21
 	"fmt"
22 22
 	"io"
23
+	"io/ioutil"
23 24
 	"net/http"
24 25
 	"net/url"
25 26
 	"reflect"
27
+	"strings"
26 28
 	"testing"
27 29
 
28 30
 	"github.com/spf13/cobra"
... ...
@@ -32,6 +34,7 @@ import (
32 32
 	"k8s.io/kubernetes/pkg/api/unversioned"
33 33
 	"k8s.io/kubernetes/pkg/client/restclient"
34 34
 	"k8s.io/kubernetes/pkg/client/unversioned/fake"
35
+	"k8s.io/kubernetes/pkg/util/term"
35 36
 )
36 37
 
37 38
 type fakeRemoteExecutor struct {
... ...
@@ -40,7 +43,7 @@ type fakeRemoteExecutor struct {
40 40
 	execErr error
41 41
 }
42 42
 
43
-func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
43
+func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
44 44
 	f.method = method
45 45
 	f.url = url
46 46
 	return f.execErr
... ...
@@ -64,19 +67,19 @@ func TestPodAndContainer(t *testing.T) {
64 64
 			name:          "empty",
65 65
 		},
66 66
 		{
67
-			p:             &ExecOptions{PodName: "foo"},
67
+			p:             &ExecOptions{StreamOptions: StreamOptions{PodName: "foo"}},
68 68
 			argsLenAtDash: -1,
69 69
 			expectError:   true,
70 70
 			name:          "no cmd",
71 71
 		},
72 72
 		{
73
-			p:             &ExecOptions{PodName: "foo", ContainerName: "bar"},
73
+			p:             &ExecOptions{StreamOptions: StreamOptions{PodName: "foo", ContainerName: "bar"}},
74 74
 			argsLenAtDash: -1,
75 75
 			expectError:   true,
76 76
 			name:          "no cmd, w/ container",
77 77
 		},
78 78
 		{
79
-			p:             &ExecOptions{PodName: "foo"},
79
+			p:             &ExecOptions{StreamOptions: StreamOptions{PodName: "foo"}},
80 80
 			args:          []string{"cmd"},
81 81
 			argsLenAtDash: -1,
82 82
 			expectedPod:   "foo",
... ...
@@ -114,7 +117,7 @@ func TestPodAndContainer(t *testing.T) {
114 114
 			name:          "cmd, cmd is behind dash",
115 115
 		},
116 116
 		{
117
-			p:                 &ExecOptions{ContainerName: "bar"},
117
+			p:                 &ExecOptions{StreamOptions: StreamOptions{ContainerName: "bar"}},
118 118
 			args:              []string{"foo", "cmd"},
119 119
 			argsLenAtDash:     -1,
120 120
 			expectedPod:       "foo",
... ...
@@ -205,12 +208,14 @@ func TestExec(t *testing.T) {
205 205
 			ex.execErr = fmt.Errorf("exec error")
206 206
 		}
207 207
 		params := &ExecOptions{
208
-			PodName:       "foo",
209
-			ContainerName: "bar",
210
-			In:            bufIn,
211
-			Out:           bufOut,
212
-			Err:           bufErr,
213
-			Executor:      ex,
208
+			StreamOptions: StreamOptions{
209
+				PodName:       "foo",
210
+				ContainerName: "bar",
211
+				In:            bufIn,
212
+				Out:           bufOut,
213
+				Err:           bufErr,
214
+			},
215
+			Executor: ex,
214 216
 		}
215 217
 		cmd := &cobra.Command{}
216 218
 		args := []string{"test", "command"}
... ...
@@ -256,3 +261,124 @@ func execPod() *api.Pod {
256 256
 		},
257 257
 	}
258 258
 }
259
+
260
+func TestSetupTTY(t *testing.T) {
261
+	stderr := &bytes.Buffer{}
262
+
263
+	// test 1 - don't attach stdin
264
+	o := &StreamOptions{
265
+		// InterruptParent: ,
266
+		Stdin: false,
267
+		In:    &bytes.Buffer{},
268
+		Out:   &bytes.Buffer{},
269
+		Err:   stderr,
270
+		TTY:   true,
271
+	}
272
+
273
+	tty := o.setupTTY()
274
+
275
+	if o.In != nil {
276
+		t.Errorf("don't attach stdin: o.In should be nil")
277
+	}
278
+	if tty.In != nil {
279
+		t.Errorf("don't attach stdin: tty.In should be nil")
280
+	}
281
+	if o.TTY {
282
+		t.Errorf("don't attach stdin: o.TTY should be false")
283
+	}
284
+	if tty.Raw {
285
+		t.Errorf("don't attach stdin: tty.Raw should be false")
286
+	}
287
+	if len(stderr.String()) > 0 {
288
+		t.Errorf("don't attach stdin: stderr wasn't empty: %s", stderr.String())
289
+	}
290
+
291
+	// tests from here on attach stdin
292
+	// test 2 - don't request a TTY
293
+	o.Stdin = true
294
+	o.In = &bytes.Buffer{}
295
+	o.TTY = false
296
+
297
+	tty = o.setupTTY()
298
+
299
+	if o.In == nil {
300
+		t.Errorf("attach stdin, no TTY: o.In should not be nil")
301
+	}
302
+	if tty.In != o.In {
303
+		t.Errorf("attach stdin, no TTY: tty.In should equal o.In")
304
+	}
305
+	if o.TTY {
306
+		t.Errorf("attach stdin, no TTY: o.TTY should be false")
307
+	}
308
+	if tty.Raw {
309
+		t.Errorf("attach stdin, no TTY: tty.Raw should be false")
310
+	}
311
+	if len(stderr.String()) > 0 {
312
+		t.Errorf("attach stdin, no TTY: stderr wasn't empty: %s", stderr.String())
313
+	}
314
+
315
+	// test 3 - request a TTY, but stdin is not a terminal
316
+	o.Stdin = true
317
+	o.In = &bytes.Buffer{}
318
+	o.Err = stderr
319
+	o.TTY = true
320
+
321
+	tty = o.setupTTY()
322
+
323
+	if o.In == nil {
324
+		t.Errorf("attach stdin, TTY, not a terminal: o.In should not be nil")
325
+	}
326
+	if tty.In != o.In {
327
+		t.Errorf("attach stdin, TTY, not a terminal: tty.In should equal o.In")
328
+	}
329
+	if o.TTY {
330
+		t.Errorf("attach stdin, TTY, not a terminal: o.TTY should be false")
331
+	}
332
+	if tty.Raw {
333
+		t.Errorf("attach stdin, TTY, not a terminal: tty.Raw should be false")
334
+	}
335
+	if !strings.Contains(stderr.String(), "input is not a terminal") {
336
+		t.Errorf("attach stdin, TTY, not a terminal: expected 'input is not a terminal' to stderr")
337
+	}
338
+
339
+	// test 4 - request a TTY, stdin is a terminal
340
+	o.Stdin = true
341
+	o.In = &bytes.Buffer{}
342
+	stderr.Reset()
343
+	o.TTY = true
344
+
345
+	overrideStdin := ioutil.NopCloser(&bytes.Buffer{})
346
+	overrideStdout := &bytes.Buffer{}
347
+	overrideStderr := &bytes.Buffer{}
348
+	o.overrideStreams = func() (io.ReadCloser, io.Writer, io.Writer) {
349
+		return overrideStdin, overrideStdout, overrideStderr
350
+	}
351
+
352
+	o.isTerminalIn = func(tty term.TTY) bool {
353
+		return true
354
+	}
355
+
356
+	tty = o.setupTTY()
357
+
358
+	if o.In != overrideStdin {
359
+		t.Errorf("attach stdin, TTY, is a terminal: o.In should equal overrideStdin")
360
+	}
361
+	if tty.In != o.In {
362
+		t.Errorf("attach stdin, TTY, is a terminal: tty.In should equal o.In")
363
+	}
364
+	if !o.TTY {
365
+		t.Errorf("attach stdin, TTY, is a terminal: o.TTY should be true")
366
+	}
367
+	if !tty.Raw {
368
+		t.Errorf("attach stdin, TTY, is a terminal: tty.Raw should be true")
369
+	}
370
+	if len(stderr.String()) > 0 {
371
+		t.Errorf("attach stdin, TTY, is a terminal: stderr wasn't empty: %s", stderr.String())
372
+	}
373
+	if o.Out != overrideStdout {
374
+		t.Errorf("attach stdin, TTY, is a terminal: o.Out should equal overrideStdout")
375
+	}
376
+	if tty.Out != o.Out {
377
+		t.Errorf("attach stdin, TTY, is a terminal: tty.Out should equal o.Out")
378
+	}
379
+}
... ...
@@ -160,6 +160,7 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po
160 160
 		Namespace(namespace).
161 161
 		Name(pod.Name).
162 162
 		SubResource("portforward")
163
+
163 164
 	getErr := fw.ForwardPorts("GET", getReq.URL(), config, args, stopCh)
164 165
 	if getErr == nil {
165 166
 		return nil
... ...
@@ -237,11 +237,13 @@ func Run(f *cmdutil.Factory, opts *RunOptions, cmdIn io.Reader, cmdOut, cmdErr i
237 237
 
238 238
 	if attach {
239 239
 		opts := &AttachOptions{
240
-			In:    cmdIn,
241
-			Out:   cmdOut,
242
-			Err:   cmdErr,
243
-			Stdin: interactive,
244
-			TTY:   tty,
240
+			StreamOptions: StreamOptions{
241
+				In:    cmdIn,
242
+				Out:   cmdOut,
243
+				Err:   cmdErr,
244
+				Stdin: interactive,
245
+				TTY:   tty,
246
+			},
245 247
 
246 248
 			CommandName: cmd.Parent().CommandPath() + " attach",
247 249
 
248 250
new file mode 100644
... ...
@@ -0,0 +1,46 @@
0
+/*
1
+Copyright 2015 The Kubernetes Authors.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package container
17
+
18
+import (
19
+	"k8s.io/kubernetes/pkg/util/runtime"
20
+	"k8s.io/kubernetes/pkg/util/term"
21
+)
22
+
23
+// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
24
+// term.Size received from the channel. The resize channel must be closed elsewhere to stop the
25
+// goroutine.
26
+func HandleResizing(resize <-chan term.Size, resizeFunc func(size term.Size)) {
27
+	if resize == nil {
28
+		return
29
+	}
30
+
31
+	go func() {
32
+		defer runtime.HandleCrash()
33
+
34
+		for {
35
+			size, ok := <-resize
36
+			if !ok {
37
+				return
38
+			}
39
+			if size.Height < 1 || size.Width < 1 {
40
+				continue
41
+			}
42
+			resizeFunc(size)
43
+		}
44
+	}()
45
+}
... ...
@@ -27,6 +27,7 @@ import (
27 27
 	"k8s.io/kubernetes/pkg/api"
28 28
 	"k8s.io/kubernetes/pkg/types"
29 29
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
30
+	"k8s.io/kubernetes/pkg/util/term"
30 31
 	"k8s.io/kubernetes/pkg/volume"
31 32
 )
32 33
 
... ...
@@ -126,7 +127,7 @@ type Runtime interface {
126 126
 }
127 127
 
128 128
 type ContainerAttacher interface {
129
-	AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error)
129
+	AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error)
130 130
 }
131 131
 
132 132
 // CommandRunner encapsulates the command runner interfaces for testability.
... ...
@@ -134,7 +135,7 @@ type ContainerCommandRunner interface {
134 134
 	// Runs the command in the container of the specified pod using nsenter.
135 135
 	// Attaches the processes stdin, stdout, and stderr. Optionally uses a
136 136
 	// tty.
137
-	ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error
137
+	ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
138 138
 	// Forward the specified port from the specified pod to the stream.
139 139
 	PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
140 140
 }
... ...
@@ -27,6 +27,7 @@ import (
27 27
 	. "k8s.io/kubernetes/pkg/kubelet/container"
28 28
 	"k8s.io/kubernetes/pkg/types"
29 29
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
30
+	"k8s.io/kubernetes/pkg/util/term"
30 31
 	"k8s.io/kubernetes/pkg/volume"
31 32
 )
32 33
 
... ...
@@ -273,7 +274,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
273 273
 	return &status, f.Err
274 274
 }
275 275
 
276
-func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
276
+func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
277 277
 	f.Lock()
278 278
 	defer f.Unlock()
279 279
 
... ...
@@ -281,7 +282,7 @@ func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, std
281 281
 	return f.Err
282 282
 }
283 283
 
284
-func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
284
+func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
285 285
 	f.Lock()
286 286
 	defer f.Unlock()
287 287
 
... ...
@@ -24,6 +24,7 @@ import (
24 24
 	. "k8s.io/kubernetes/pkg/kubelet/container"
25 25
 	"k8s.io/kubernetes/pkg/types"
26 26
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
27
+	"k8s.io/kubernetes/pkg/util/term"
27 28
 	"k8s.io/kubernetes/pkg/volume"
28 29
 )
29 30
 
... ...
@@ -88,12 +89,12 @@ func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus,
88 88
 	return args.Get(0).(*PodStatus), args.Error(1)
89 89
 }
90 90
 
91
-func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
91
+func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
92 92
 	args := r.Called(containerID, cmd, stdin, stdout, stderr, tty)
93 93
 	return args.Error(0)
94 94
 }
95 95
 
96
-func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
96
+func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
97 97
 	args := r.Called(containerID, stdin, stdout, stderr, tty)
98 98
 	return args.Error(0)
99 99
 }
... ...
@@ -77,6 +77,8 @@ type DockerInterface interface {
77 77
 	StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
78 78
 	InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
79 79
 	AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
80
+	ResizeContainerTTY(id string, height, width int) error
81
+	ResizeExecTTY(id string, height, width int) error
80 82
 }
81 83
 
82 84
 // KubeletContainerName encapsulates a pod name and a Kubernetes container name.
... ...
@@ -62,6 +62,7 @@ import (
62 62
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
63 63
 	"k8s.io/kubernetes/pkg/util/sets"
64 64
 	utilstrings "k8s.io/kubernetes/pkg/util/strings"
65
+	"k8s.io/kubernetes/pkg/util/term"
65 66
 )
66 67
 
67 68
 const (
... ...
@@ -1060,7 +1061,7 @@ func (d *dockerExitError) ExitStatus() int {
1060 1060
 }
1061 1061
 
1062 1062
 // ExecInContainer runs the command inside the container identified by containerID.
1063
-func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
1063
+func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
1064 1064
 	if dm.execHandler == nil {
1065 1065
 		return errors.New("unable to exec without an exec handler")
1066 1066
 	}
... ...
@@ -1073,10 +1074,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID,
1073 1073
 		return fmt.Errorf("container not running (%s)", container.ID)
1074 1074
 	}
1075 1075
 
1076
-	return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty)
1076
+	return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize)
1077 1077
 }
1078 1078
 
1079
-func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
1079
+func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
1080
+	// Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
1081
+	// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
1082
+	kubecontainer.HandleResizing(resize, func(size term.Size) {
1083
+		dm.client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width))
1084
+	})
1085
+
1080 1086
 	// TODO(random-liu): Do we really use the *Logs* field here?
1081 1087
 	opts := dockertypes.ContainerAttachOptions{
1082 1088
 		Stream: true,
... ...
@@ -26,18 +26,19 @@ import (
26 26
 	dockertypes "github.com/docker/engine-api/types"
27 27
 	"github.com/golang/glog"
28 28
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
29
+	"k8s.io/kubernetes/pkg/util/term"
29 30
 )
30 31
 
31 32
 // ExecHandler knows how to execute a command in a running Docker container.
32 33
 type ExecHandler interface {
33
-	ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error
34
+	ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
34 35
 }
35 36
 
36 37
 // NsenterExecHandler executes commands in Docker containers using nsenter.
37 38
 type NsenterExecHandler struct{}
38 39
 
39 40
 // TODO should we support nsenter in a container, running with elevated privs and --pid=host?
40
-func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
41
+func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
41 42
 	nsenter, err := exec.LookPath("nsenter")
42 43
 	if err != nil {
43 44
 		return fmt.Errorf("exec unavailable - unable to locate nsenter")
... ...
@@ -61,6 +62,10 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
61 61
 		// make sure to close the stdout stream
62 62
 		defer stdout.Close()
63 63
 
64
+		kubecontainer.HandleResizing(resize, func(size term.Size) {
65
+			term.SetSize(p.Fd(), size)
66
+		})
67
+
64 68
 		if stdin != nil {
65 69
 			go io.Copy(p, stdin)
66 70
 		}
... ...
@@ -98,7 +103,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
98 98
 // NativeExecHandler executes commands in Docker containers using Docker's exec API.
99 99
 type NativeExecHandler struct{}
100 100
 
101
-func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
101
+func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
102 102
 	createOpts := dockertypes.ExecConfig{
103 103
 		Cmd:          cmd,
104 104
 		AttachStdin:  stdin != nil,
... ...
@@ -110,6 +115,13 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc
110 110
 	if err != nil {
111 111
 		return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
112 112
 	}
113
+
114
+	// Have to start this before the call to client.StartExec because client.StartExec is a blocking
115
+	// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
116
+	kubecontainer.HandleResizing(resize, func(size term.Size) {
117
+		client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width))
118
+	})
119
+
113 120
 	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
114 121
 	streamOpts := StreamOptions{
115 122
 		InputStream:  stdin,
... ...
@@ -121,6 +133,7 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc
121 121
 	if err != nil {
122 122
 		return err
123 123
 	}
124
+
124 125
 	ticker := time.NewTicker(2 * time.Second)
125 126
 	defer ticker.Stop()
126 127
 	count := 0
... ...
@@ -500,6 +500,20 @@ func (f *FakeDockerClient) updateContainerStatus(id, status string) {
500 500
 	}
501 501
 }
502 502
 
503
+func (f *FakeDockerClient) ResizeExecTTY(id string, height, width int) error {
504
+	f.Lock()
505
+	defer f.Unlock()
506
+	f.called = append(f.called, "resize_exec")
507
+	return nil
508
+}
509
+
510
+func (f *FakeDockerClient) ResizeContainerTTY(id string, height, width int) error {
511
+	f.Lock()
512
+	defer f.Unlock()
513
+	f.called = append(f.called, "resize_container")
514
+	return nil
515
+}
516
+
503 517
 // FakeDockerPuller is a stub implementation of DockerPuller.
504 518
 type FakeDockerPuller struct {
505 519
 	sync.Mutex
... ...
@@ -213,3 +213,21 @@ func (in instrumentedDockerInterface) ImageHistory(id string) ([]dockertypes.Ima
213 213
 	recordError(operation, err)
214 214
 	return out, err
215 215
 }
216
+
217
+func (in instrumentedDockerInterface) ResizeExecTTY(id string, height, width int) error {
218
+	const operation = "resize_exec"
219
+	defer recordOperation(operation, time.Now())
220
+
221
+	err := in.client.ResizeExecTTY(id, height, width)
222
+	recordError(operation, err)
223
+	return err
224
+}
225
+
226
+func (in instrumentedDockerInterface) ResizeContainerTTY(id string, height, width int) error {
227
+	const operation = "resize_container"
228
+	defer recordOperation(operation, time.Now())
229
+
230
+	err := in.client.ResizeContainerTTY(id, height, width)
231
+	recordError(operation, err)
232
+	return err
233
+}
... ...
@@ -454,6 +454,24 @@ func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.Contain
454 454
 	return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
455 455
 }
456 456
 
457
+func (d *kubeDockerClient) ResizeExecTTY(id string, height, width int) error {
458
+	ctx, cancel := d.getCancelableContext()
459
+	defer cancel()
460
+	return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{
461
+		Height: height,
462
+		Width:  width,
463
+	})
464
+}
465
+
466
+func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width int) error {
467
+	ctx, cancel := d.getCancelableContext()
468
+	defer cancel()
469
+	return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{
470
+		Height: height,
471
+		Width:  width,
472
+	})
473
+}
474
+
457 475
 // redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will
458 476
 // only be redirected to stdout.
459 477
 func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
... ...
@@ -88,6 +88,7 @@ import (
88 88
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
89 89
 	"k8s.io/kubernetes/pkg/util/selinux"
90 90
 	"k8s.io/kubernetes/pkg/util/sets"
91
+	"k8s.io/kubernetes/pkg/util/term"
91 92
 	utilvalidation "k8s.io/kubernetes/pkg/util/validation"
92 93
 	"k8s.io/kubernetes/pkg/util/validation/field"
93 94
 	"k8s.io/kubernetes/pkg/util/wait"
... ...
@@ -3793,7 +3794,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
3793 3793
 
3794 3794
 	var buffer bytes.Buffer
3795 3795
 	output := ioutils.WriteCloserWrapper(&buffer)
3796
-	err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false)
3796
+	err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
3797 3797
 	if err != nil {
3798 3798
 		return nil, err
3799 3799
 	}
... ...
@@ -3803,7 +3804,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
3803 3803
 
3804 3804
 // ExecInContainer executes a command in a container, connecting the supplied
3805 3805
 // stdin/stdout/stderr to the command's IO streams.
3806
-func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
3806
+func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
3807 3807
 	podUID = kl.podManager.TranslatePodUID(podUID)
3808 3808
 
3809 3809
 	container, err := kl.findContainer(podFullName, podUID, containerName)
... ...
@@ -3813,12 +3814,12 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain
3813 3813
 	if container == nil {
3814 3814
 		return fmt.Errorf("container not found (%q)", containerName)
3815 3815
 	}
3816
-	return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty)
3816
+	return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
3817 3817
 }
3818 3818
 
3819 3819
 // AttachContainer uses the container runtime to attach the given streams to
3820 3820
 // the given container.
3821
-func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
3821
+func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
3822 3822
 	podUID = kl.podManager.TranslatePodUID(podUID)
3823 3823
 
3824 3824
 	container, err := kl.findContainer(podFullName, podUID, containerName)
... ...
@@ -3828,7 +3829,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain
3828 3828
 	if container == nil {
3829 3829
 		return fmt.Errorf("container not found (%q)", containerName)
3830 3830
 	}
3831
-	return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty)
3831
+	return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
3832 3832
 }
3833 3833
 
3834 3834
 // PortForward connects to the pod's port and copies data between the port
... ...
@@ -73,6 +73,7 @@ import (
73 73
 	"k8s.io/kubernetes/pkg/util/rand"
74 74
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
75 75
 	"k8s.io/kubernetes/pkg/util/sets"
76
+	"k8s.io/kubernetes/pkg/util/term"
76 77
 	"k8s.io/kubernetes/pkg/util/wait"
77 78
 	"k8s.io/kubernetes/pkg/version"
78 79
 	"k8s.io/kubernetes/pkg/volume"
... ...
@@ -1078,7 +1079,7 @@ type fakeContainerCommandRunner struct {
1078 1078
 	Stream io.ReadWriteCloser
1079 1079
 }
1080 1080
 
1081
-func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
1081
+func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
1082 1082
 	f.Cmd = cmd
1083 1083
 	f.ID = id
1084 1084
 	f.Stdin = in
... ...
@@ -2095,6 +2096,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
2095 2095
 		nil,
2096 2096
 		nil,
2097 2097
 		false,
2098
+		nil,
2098 2099
 	)
2099 2100
 	if err == nil {
2100 2101
 		t.Fatal("unexpected non-error")
... ...
@@ -2139,6 +2141,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
2139 2139
 		nil,
2140 2140
 		nil,
2141 2141
 		false,
2142
+		nil,
2142 2143
 	)
2143 2144
 	if err == nil {
2144 2145
 		t.Fatal("unexpected non-error")
... ...
@@ -2199,6 +2202,7 @@ func TestExecInContainer(t *testing.T) {
2199 2199
 		stdout,
2200 2200
 		stderr,
2201 2201
 		tty,
2202
+		nil,
2202 2203
 	)
2203 2204
 	if err != nil {
2204 2205
 		t.Fatalf("unexpected error: %s", err)
... ...
@@ -60,7 +60,7 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod
60 60
 			msg    string
61 61
 		)
62 62
 		output := ioutils.WriteCloserWrapper(&buffer)
63
-		err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false)
63
+		err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil)
64 64
 		if err != nil {
65 65
 			msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - %q", handler.Exec.Command, container.Name, format.Pod(pod), buffer.String())
66 66
 			glog.V(1).Infof(msg)
... ...
@@ -28,6 +28,7 @@ import (
28 28
 	"k8s.io/kubernetes/pkg/api"
29 29
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
30 30
 	"k8s.io/kubernetes/pkg/util/intstr"
31
+	"k8s.io/kubernetes/pkg/util/term"
31 32
 )
32 33
 
33 34
 func TestResolvePortInt(t *testing.T) {
... ...
@@ -80,7 +81,7 @@ type fakeContainerCommandRunner struct {
80 80
 	ID  kubecontainer.ContainerID
81 81
 }
82 82
 
83
-func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
83
+func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
84 84
 	f.Cmd = cmd
85 85
 	f.ID = id
86 86
 	return nil
... ...
@@ -228,7 +228,7 @@ func (p *prober) newExecInContainer(container api.Container, containerID kubecon
228 228
 	return execInContainer{func() ([]byte, error) {
229 229
 		var buffer bytes.Buffer
230 230
 		output := ioutils.WriteCloserWrapper(&buffer)
231
-		err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false)
231
+		err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil)
232 232
 		if err != nil {
233 233
 			return nil, err
234 234
 		}
... ...
@@ -60,6 +60,7 @@ import (
60 60
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
61 61
 	"k8s.io/kubernetes/pkg/util/selinux"
62 62
 	utilstrings "k8s.io/kubernetes/pkg/util/strings"
63
+	"k8s.io/kubernetes/pkg/util/term"
63 64
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
64 65
 )
65 66
 
... ...
@@ -2007,14 +2008,14 @@ func newRktExitError(e error) error {
2007 2007
 	return e
2008 2008
 }
2009 2009
 
2010
-func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
2010
+func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
2011 2011
 	return fmt.Errorf("unimplemented")
2012 2012
 }
2013 2013
 
2014 2014
 // Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is
2015 2015
 // the rkt UUID, and appName is the container name.
2016 2016
 // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
2017
-func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
2017
+func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
2018 2018
 	glog.V(4).Infof("Rkt execing in container.")
2019 2019
 
2020 2020
 	id, err := parseContainerID(containerID)
... ...
@@ -2035,6 +2036,10 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
2035 2035
 		// make sure to close the stdout stream
2036 2036
 		defer stdout.Close()
2037 2037
 
2038
+		kubecontainer.HandleResizing(resize, func(size term.Size) {
2039
+			term.SetSize(p.Fd(), size)
2040
+		})
2041
+
2038 2042
 		if stdin != nil {
2039 2043
 			go io.Copy(p, stdin)
2040 2044
 		}
... ...
@@ -25,13 +25,14 @@ import (
25 25
 
26 26
 	"k8s.io/kubernetes/pkg/types"
27 27
 	"k8s.io/kubernetes/pkg/util/runtime"
28
+	"k8s.io/kubernetes/pkg/util/term"
28 29
 )
29 30
 
30 31
 // Attacher knows how to attach to a running container in a pod.
31 32
 type Attacher interface {
32 33
 	// AttachContainer attaches to the running container in the pod, copying data between in/out/err
33 34
 	// and the container's stdin/stdout/stderr.
34
-	AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
35
+	AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
35 36
 }
36 37
 
37 38
 // ServeAttach handles requests to attach to a container. After creating/receiving the required
... ...
@@ -44,7 +45,7 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po
44 44
 	}
45 45
 	defer ctx.conn.Close()
46 46
 
47
-	err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty)
47
+	err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
48 48
 	if err != nil {
49 49
 		msg := fmt.Sprintf("error attaching to container: %v", err)
50 50
 		runtime.HandleError(errors.New(msg))
51 51
new file mode 100644
... ...
@@ -0,0 +1,41 @@
0
+/*
1
+Copyright 2016 The Kubernetes Authors All rights reserved.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package remotecommand
17
+
18
+import "time"
19
+
20
+const (
21
+	DefaultStreamCreationTimeout = 30 * time.Second
22
+
23
+	// The SPDY subprotocol "channel.k8s.io" is used for remote command
24
+	// attachment/execution. This represents the initial unversioned subprotocol,
25
+	// which has the known bugs http://issues.k8s.io/13394 and
26
+	// http://issues.k8s.io/13395.
27
+	StreamProtocolV1Name = "channel.k8s.io"
28
+
29
+	// The SPDY subprotocol "v2.channel.k8s.io" is used for remote command
30
+	// attachment/execution. It is the second version of the subprotocol and
31
+	// resolves the issues present in the first version.
32
+	StreamProtocolV2Name = "v2.channel.k8s.io"
33
+
34
+	// The SPDY subprotocol "v3.channel.k8s.io" is used for remote command
35
+	// attachment/execution. It is the third version of the subprotocol and
36
+	// adds support for resizing container terminals.
37
+	StreamProtocolV3Name = "v3.channel.k8s.io"
38
+)
39
+
40
+var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}
0 41
deleted file mode 100644
... ...
@@ -1,36 +0,0 @@
1
-/*
2
-Copyright 2016 The Kubernetes Authors All rights reserved.
3
-
4
-Licensed under the Apache License, Version 2.0 (the "License");
5
-you may not use this file except in compliance with the License.
6
-You may obtain a copy of the License at
7
-
8
-    http://www.apache.org/licenses/LICENSE-2.0
9
-
10
-Unless required by applicable law or agreed to in writing, software
11
-distributed under the License is distributed on an "AS IS" BASIS,
12
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
-See the License for the specific language governing permissions and
14
-limitations under the License.
15
-*/
16
-
17
-package remotecommand
18
-
19
-import "time"
20
-
21
-const (
22
-	DefaultStreamCreationTimeout = 30 * time.Second
23
-
24
-	// The SPDY subprotocol "channel.k8s.io" is used for remote command
25
-	// attachment/execution. This represents the initial unversioned subprotocol,
26
-	// which has the known bugs http://issues.k8s.io/13394 and
27
-	// http://issues.k8s.io/13395.
28
-	StreamProtocolV1Name = "channel.k8s.io"
29
-
30
-	// The SPDY subprotocol "v2.channel.k8s.io" is used for remote command
31
-	// attachment/execution. It is the second version of the subprotocol and
32
-	// resolves the issues present in the first version.
33
-	StreamProtocolV2Name = "v2.channel.k8s.io"
34
-)
35
-
36
-var SupportedStreamingProtocols = []string{StreamProtocolV2Name, StreamProtocolV1Name}
... ...
@@ -26,13 +26,14 @@ import (
26 26
 	"k8s.io/kubernetes/pkg/api"
27 27
 	"k8s.io/kubernetes/pkg/types"
28 28
 	"k8s.io/kubernetes/pkg/util/runtime"
29
+	"k8s.io/kubernetes/pkg/util/term"
29 30
 )
30 31
 
31 32
 // Executor knows how to execute a command in a container in a pod.
32 33
 type Executor interface {
33 34
 	// ExecInContainer executes a command in a container in the pod, copying data
34 35
 	// between in/out/err and the container's stdin/stdout/stderr.
35
-	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
36
+	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
36 37
 }
37 38
 
38 39
 // ServeExec handles requests to execute a command in a container. After
... ...
@@ -48,7 +49,7 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN
48 48
 
49 49
 	cmd := req.URL.Query()[api.ExecCommandParamm]
50 50
 
51
-	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty)
51
+	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
52 52
 	if err != nil {
53 53
 		msg := fmt.Sprintf("error executing command in container: %v", err)
54 54
 		runtime.HandleError(errors.New(msg))
... ...
@@ -17,6 +17,7 @@ limitations under the License.
17 17
 package remotecommand
18 18
 
19 19
 import (
20
+	"encoding/json"
20 21
 	"errors"
21 22
 	"fmt"
22 23
 	"io"
... ...
@@ -27,6 +28,7 @@ import (
27 27
 	"k8s.io/kubernetes/pkg/util/httpstream"
28 28
 	"k8s.io/kubernetes/pkg/util/httpstream/spdy"
29 29
 	"k8s.io/kubernetes/pkg/util/runtime"
30
+	"k8s.io/kubernetes/pkg/util/term"
30 31
 	"k8s.io/kubernetes/pkg/util/wsstream"
31 32
 
32 33
 	"github.com/golang/glog"
... ...
@@ -87,6 +89,8 @@ type context struct {
87 87
 	stdoutStream io.WriteCloser
88 88
 	stderrStream io.WriteCloser
89 89
 	errorStream  io.WriteCloser
90
+	resizeStream io.ReadCloser
91
+	resizeChan   chan term.Size
90 92
 	tty          bool
91 93
 }
92 94
 
... ...
@@ -118,10 +122,26 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
118 118
 		return nil, false
119 119
 	}
120 120
 
121
+	var ctx *context
122
+	var ok bool
121 123
 	if wsstream.IsWebSocketRequest(req) {
122
-		return createWebSocketStreams(req, w, opts, idleTimeout)
124
+		ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
125
+	} else {
126
+		ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
127
+	}
128
+	if !ok {
129
+		return nil, false
130
+	}
131
+
132
+	if ctx.resizeStream != nil {
133
+		ctx.resizeChan = make(chan term.Size)
134
+		go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
123 135
 	}
124 136
 
137
+	return ctx, true
138
+}
139
+
140
+func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
125 141
 	protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
126 142
 	if err != nil {
127 143
 		w.WriteHeader(http.StatusBadRequest)
... ...
@@ -148,6 +168,8 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
148 148
 
149 149
 	var handler protocolHandler
150 150
 	switch protocol {
151
+	case StreamProtocolV3Name:
152
+		handler = &v3ProtocolHandler{}
151 153
 	case StreamProtocolV2Name:
152 154
 		handler = &v2ProtocolHandler{}
153 155
 	case "":
... ...
@@ -157,6 +179,10 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
157 157
 		handler = &v1ProtocolHandler{}
158 158
 	}
159 159
 
160
+	if opts.tty && handler.supportsTerminalResizing() {
161
+		opts.expectedStreams++
162
+	}
163
+
160 164
 	expired := time.NewTimer(streamCreationTimeout)
161 165
 
162 166
 	ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C)
... ...
@@ -167,6 +193,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
167 167
 
168 168
 	ctx.conn = conn
169 169
 	ctx.tty = opts.tty
170
+
170 171
 	return ctx, true
171 172
 }
172 173
 
... ...
@@ -174,8 +201,61 @@ type protocolHandler interface {
174 174
 	// waitForStreams waits for the expected streams or a timeout, returning a
175 175
 	// remoteCommandContext if all the streams were received, or an error if not.
176 176
 	waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
177
+	// supportsTerminalResizing returns true if the protocol handler supports terminal resizing
178
+	supportsTerminalResizing() bool
179
+}
180
+
181
+// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
182
+type v3ProtocolHandler struct{}
183
+
184
+func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
185
+	ctx := &context{}
186
+	receivedStreams := 0
187
+	replyChan := make(chan struct{})
188
+	stop := make(chan struct{})
189
+	defer close(stop)
190
+WaitForStreams:
191
+	for {
192
+		select {
193
+		case stream := <-streams:
194
+			streamType := stream.Headers().Get(api.StreamType)
195
+			switch streamType {
196
+			case api.StreamTypeError:
197
+				ctx.errorStream = stream
198
+				go waitStreamReply(stream.replySent, replyChan, stop)
199
+			case api.StreamTypeStdin:
200
+				ctx.stdinStream = stream
201
+				go waitStreamReply(stream.replySent, replyChan, stop)
202
+			case api.StreamTypeStdout:
203
+				ctx.stdoutStream = stream
204
+				go waitStreamReply(stream.replySent, replyChan, stop)
205
+			case api.StreamTypeStderr:
206
+				ctx.stderrStream = stream
207
+				go waitStreamReply(stream.replySent, replyChan, stop)
208
+			case api.StreamTypeResize:
209
+				ctx.resizeStream = stream
210
+				go waitStreamReply(stream.replySent, replyChan, stop)
211
+			default:
212
+				runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
213
+			}
214
+		case <-replyChan:
215
+			receivedStreams++
216
+			if receivedStreams == expectedStreams {
217
+				break WaitForStreams
218
+			}
219
+		case <-expired:
220
+			// TODO find a way to return the error to the user. Maybe use a separate
221
+			// stream to report errors?
222
+			return nil, errors.New("timed out waiting for client to create streams")
223
+		}
224
+	}
225
+
226
+	return ctx, nil
177 227
 }
178 228
 
229
+// supportsTerminalResizing returns true because v3ProtocolHandler supports it
230
+func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
231
+
179 232
 // v2ProtocolHandler implements the V2 protocol version for streaming command execution.
180 233
 type v2ProtocolHandler struct{}
181 234
 
... ...
@@ -221,6 +301,9 @@ WaitForStreams:
221 221
 	return ctx, nil
222 222
 }
223 223
 
224
+// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
225
+func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
226
+
224 227
 // v1ProtocolHandler implements the V1 protocol version for streaming command execution.
225 228
 type v1ProtocolHandler struct{}
226 229
 
... ...
@@ -275,3 +358,19 @@ WaitForStreams:
275 275
 
276 276
 	return ctx, nil
277 277
 }
278
+
279
+// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
280
+func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
281
+
282
+func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
283
+	defer runtime.HandleCrash()
284
+
285
+	decoder := json.NewDecoder(stream)
286
+	for {
287
+		size := term.Size{}
288
+		if err := decoder.Decode(&size); err != nil {
289
+			break
290
+		}
291
+		channel <- size
292
+	}
293
+}
... ...
@@ -17,61 +17,82 @@ limitations under the License.
17 17
 package remotecommand
18 18
 
19 19
 import (
20
+	"fmt"
20 21
 	"net/http"
21 22
 	"time"
22 23
 
23 24
 	"k8s.io/kubernetes/pkg/httplog"
25
+	"k8s.io/kubernetes/pkg/util/runtime"
24 26
 	"k8s.io/kubernetes/pkg/util/wsstream"
27
+)
25 28
 
26
-	"github.com/golang/glog"
29
+const (
30
+	stdinChannel = iota
31
+	stdoutChannel
32
+	stderrChannel
33
+	errorChannel
34
+	resizeChannel
27 35
 )
28 36
 
29
-// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
30
-// along with the approximate duplex value. Supported subprotocols are "channel.k8s.io" and
31
-// "base64.channel.k8s.io".
32
-func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
33
-	// open three half-duplex channels
34
-	channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel}
35
-	if !stdin {
36
-		channels[0] = wsstream.IgnoreChannel
37
-	}
38
-	if !stdout {
39
-		channels[1] = wsstream.IgnoreChannel
37
+// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
38
+// along with the approximate duplex value. It also creates the error (3) and resize (4) channels.
39
+func createChannels(opts *options) []wsstream.ChannelType {
40
+	// open the requested channels, and always open the error channel
41
+	channels := make([]wsstream.ChannelType, 5)
42
+	channels[stdinChannel] = readChannel(opts.stdin)
43
+	channels[stdoutChannel] = writeChannel(opts.stdout)
44
+	channels[stderrChannel] = writeChannel(opts.stderr)
45
+	channels[errorChannel] = wsstream.WriteChannel
46
+	channels[resizeChannel] = wsstream.ReadChannel
47
+	return channels
48
+}
49
+
50
+// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel.
51
+func readChannel(real bool) wsstream.ChannelType {
52
+	if real {
53
+		return wsstream.ReadChannel
40 54
 	}
41
-	if !stderr {
42
-		channels[2] = wsstream.IgnoreChannel
55
+	return wsstream.IgnoreChannel
56
+}
57
+
58
+// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel.
59
+func writeChannel(real bool) wsstream.ChannelType {
60
+	if real {
61
+		return wsstream.WriteChannel
43 62
 	}
44
-	return channels
63
+	return wsstream.IgnoreChannel
45 64
 }
46 65
 
47
-// createWebSocketStreams returns a remoteCommandContext containing the websocket connection and
66
+// createWebSocketStreams returns a context containing the websocket connection and
48 67
 // streams needed to perform an exec or an attach.
49 68
 func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) {
50
-	// open the requested channels, and always open the error channel
51
-	channels := append(standardShellChannels(opts.stdin, opts.stdout, opts.stderr), wsstream.WriteChannel)
69
+	channels := createChannels(opts)
52 70
 	conn := wsstream.NewConn(channels...)
53 71
 	conn.SetIdleTimeout(idleTimeout)
54 72
 	streams, err := conn.Open(httplog.Unlogged(w), req)
55 73
 	if err != nil {
56
-		glog.Errorf("Unable to upgrade websocket connection: %v", err)
74
+		runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
57 75
 		return nil, false
58 76
 	}
77
+
59 78
 	// Send an empty message to the lowest writable channel to notify the client the connection is established
60 79
 	// TODO: make generic to SPDY and WebSockets and do it outside of this method?
61 80
 	switch {
62 81
 	case opts.stdout:
63
-		streams[1].Write([]byte{})
82
+		streams[stdoutChannel].Write([]byte{})
64 83
 	case opts.stderr:
65
-		streams[2].Write([]byte{})
84
+		streams[stderrChannel].Write([]byte{})
66 85
 	default:
67
-		streams[3].Write([]byte{})
86
+		streams[errorChannel].Write([]byte{})
68 87
 	}
88
+
69 89
 	return &context{
70 90
 		conn:         conn,
71
-		stdinStream:  streams[0],
72
-		stdoutStream: streams[1],
73
-		stderrStream: streams[2],
74
-		errorStream:  streams[3],
91
+		stdinStream:  streams[stdinChannel],
92
+		stdoutStream: streams[stdoutChannel],
93
+		stderrStream: streams[stderrChannel],
94
+		errorStream:  streams[errorChannel],
75 95
 		tty:          opts.tty,
96
+		resizeStream: streams[resizeChannel],
76 97
 	}, true
77 98
 }
... ...
@@ -58,6 +58,7 @@ import (
58 58
 	"k8s.io/kubernetes/pkg/util/httpstream/spdy"
59 59
 	"k8s.io/kubernetes/pkg/util/limitwriter"
60 60
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
61
+	"k8s.io/kubernetes/pkg/util/term"
61 62
 	"k8s.io/kubernetes/pkg/volume"
62 63
 )
63 64
 
... ...
@@ -164,8 +165,8 @@ type HostInterface interface {
164 164
 	GetRunningPods() ([]*api.Pod, error)
165 165
 	GetPodByName(namespace, name string) (*api.Pod, bool)
166 166
 	RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
167
-	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
168
-	AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
167
+	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
168
+	AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
169 169
 	GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
170 170
 	ServeLogs(w http.ResponseWriter, req *http.Request)
171 171
 	PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
... ...
@@ -48,6 +48,7 @@ import (
48 48
 	"k8s.io/kubernetes/pkg/util/httpstream"
49 49
 	"k8s.io/kubernetes/pkg/util/httpstream/spdy"
50 50
 	"k8s.io/kubernetes/pkg/util/sets"
51
+	"k8s.io/kubernetes/pkg/util/term"
51 52
 	"k8s.io/kubernetes/pkg/volume"
52 53
 )
53 54
 
... ...
@@ -119,11 +120,11 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
119 119
 	return fk.runFunc(podFullName, uid, containerName, cmd)
120 120
 }
121 121
 
122
-func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
122
+func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
123 123
 	return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
124 124
 }
125 125
 
126
-func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
126
+func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
127 127
 	return fk.attachFunc(name, uid, container, in, out, err, tty)
128 128
 }
129 129
 
130 130
new file mode 100644
... ...
@@ -0,0 +1,149 @@
0
+/*
1
+Copyright 2016 The Kubernetes Authors.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package term
17
+
18
+import (
19
+	"fmt"
20
+
21
+	"github.com/docker/docker/pkg/term"
22
+	"k8s.io/kubernetes/pkg/util/runtime"
23
+)
24
+
25
+// Size represents the width and height of a terminal.
26
+type Size struct {
27
+	Width  uint16
28
+	Height uint16
29
+}
30
+
31
+// GetSize returns the current size of the user's terminal. If it isn't a terminal,
32
+// nil is returned.
33
+func (t TTY) GetSize() *Size {
34
+	outFd, isTerminal := term.GetFdInfo(t.Out)
35
+	if !isTerminal {
36
+		return nil
37
+	}
38
+	return GetSize(outFd)
39
+}
40
+
41
+// GetSize returns the current size of the terminal associated with fd.
42
+func GetSize(fd uintptr) *Size {
43
+	winsize, err := term.GetWinsize(fd)
44
+	if err != nil {
45
+		runtime.HandleError(fmt.Errorf("unable to get terminal size: %v", err))
46
+		return nil
47
+	}
48
+
49
+	return &Size{Width: winsize.Width, Height: winsize.Height}
50
+}
51
+
52
+// SetSize sets the terminal size associated with fd.
53
+func SetSize(fd uintptr, size Size) error {
54
+	return term.SetWinsize(fd, &term.Winsize{Height: size.Height, Width: size.Width})
55
+}
56
+
57
+// MonitorSize monitors the terminal's size. It returns a TerminalSizeQueue primed with
58
+// initialSizes, or nil if there's no TTY present.
59
+func (t *TTY) MonitorSize(initialSizes ...*Size) TerminalSizeQueue {
60
+	outFd, isTerminal := term.GetFdInfo(t.Out)
61
+	if !isTerminal {
62
+		return nil
63
+	}
64
+
65
+	t.sizeQueue = &sizeQueue{
66
+		t: *t,
67
+		// make it buffered so we can send the initial terminal sizes without blocking, prior to starting
68
+		// the streaming below
69
+		resizeChan:   make(chan Size, len(initialSizes)),
70
+		stopResizing: make(chan struct{}),
71
+	}
72
+
73
+	t.sizeQueue.monitorSize(outFd, initialSizes...)
74
+
75
+	return t.sizeQueue
76
+}
77
+
78
+// TerminalSizeQueue is capable of returning terminal resize events as they occur.
79
+type TerminalSizeQueue interface {
80
+	// Next returns the new terminal size after the terminal has been resized. It returns nil when
81
+	// monitoring has been stopped.
82
+	Next() *Size
83
+}
84
+
85
+// sizeQueue implements TerminalSizeQueue
86
+type sizeQueue struct {
87
+	t TTY
88
+	// resizeChan receives a Size each time the user's terminal is resized.
89
+	resizeChan   chan Size
90
+	stopResizing chan struct{}
91
+}
92
+
93
+// make sure sizeQueue implements the TerminalSizeQueue interface
94
+var _ TerminalSizeQueue = &sizeQueue{}
95
+
96
+// monitorSize primes resizeChan with initialSizes and then monitors for resize events. With each
97
+// new event, it sends the current terminal size to resizeChan.
98
+func (s *sizeQueue) monitorSize(outFd uintptr, initialSizes ...*Size) {
99
+	// send the initial sizes
100
+	for i := range initialSizes {
101
+		if initialSizes[i] != nil {
102
+			s.resizeChan <- *initialSizes[i]
103
+		}
104
+	}
105
+
106
+	resizeEvents := make(chan Size, 1)
107
+
108
+	monitorResizeEvents(outFd, resizeEvents, s.stopResizing)
109
+
110
+	// listen for resize events in the background
111
+	go func() {
112
+		defer runtime.HandleCrash()
113
+
114
+		for {
115
+			select {
116
+			case size, ok := <-resizeEvents:
117
+				if !ok {
118
+					return
119
+				}
120
+
121
+				select {
122
+				// try to send the size to resizeChan, but don't block
123
+				case s.resizeChan <- size:
124
+					// send successful
125
+				default:
126
+					// unable to send / no-op
127
+				}
128
+			case <-s.stopResizing:
129
+				return
130
+			}
131
+		}
132
+	}()
133
+}
134
+
135
+// Next returns the new terminal size after the terminal has been resized. It returns nil when
136
+// monitoring has been stopped.
137
+func (s *sizeQueue) Next() *Size {
138
+	size, ok := <-s.resizeChan
139
+	if !ok {
140
+		return nil
141
+	}
142
+	return &size
143
+}
144
+
145
+// stop stops the background goroutine that is monitoring for terminal resizes.
146
+func (s *sizeQueue) stop() {
147
+	close(s.stopResizing)
148
+}
0 149
new file mode 100644
... ...
@@ -0,0 +1,60 @@
0
+// +build !windows
1
+
2
+/*
3
+Copyright 2016 The Kubernetes Authors.
4
+
5
+Licensed under the Apache License, Version 2.0 (the "License");
6
+you may not use this file except in compliance with the License.
7
+You may obtain a copy of the License at
8
+
9
+    http://www.apache.org/licenses/LICENSE-2.0
10
+
11
+Unless required by applicable law or agreed to in writing, software
12
+distributed under the License is distributed on an "AS IS" BASIS,
13
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+See the License for the specific language governing permissions and
15
+limitations under the License.
16
+*/
17
+
18
+package term
19
+
20
+import (
21
+	"os"
22
+	"os/signal"
23
+	"syscall"
24
+
25
+	"k8s.io/kubernetes/pkg/util/runtime"
26
+)
27
+
28
+// monitorResizeEvents spawns a goroutine that waits for SIGWINCH signals (these indicate the
29
+// terminal has resized). After receiving a SIGWINCH, this gets the terminal size and tries to send
30
+// it to the resizeEvents channel. The goroutine stops when the stop channel is closed.
31
+func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) {
32
+	go func() {
33
+		defer runtime.HandleCrash()
34
+
35
+		winch := make(chan os.Signal, 1)
36
+		signal.Notify(winch, syscall.SIGWINCH)
37
+		defer signal.Stop(winch)
38
+
39
+		for {
40
+			select {
41
+			case <-winch:
42
+				size := GetSize(fd)
43
+				if size == nil {
44
+					return
45
+				}
46
+
47
+				// try to send size
48
+				select {
49
+				case resizeEvents <- *size:
50
+					// success
51
+				default:
52
+					// not sent
53
+				}
54
+			case <-stop:
55
+				return
56
+			}
57
+		}
58
+	}()
59
+}
0 60
new file mode 100644
... ...
@@ -0,0 +1,61 @@
0
+/*
1
+Copyright 2016 The Kubernetes Authors.
2
+
3
+Licensed under the Apache License, Version 2.0 (the "License");
4
+you may not use this file except in compliance with the License.
5
+You may obtain a copy of the License at
6
+
7
+    http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+Unless required by applicable law or agreed to in writing, software
10
+distributed under the License is distributed on an "AS IS" BASIS,
11
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+See the License for the specific language governing permissions and
13
+limitations under the License.
14
+*/
15
+
16
+package term
17
+
18
+import (
19
+	"time"
20
+
21
+	"k8s.io/kubernetes/pkg/util/runtime"
22
+)
23
+
24
+// monitorResizeEvents spawns a goroutine that periodically gets the terminal size and tries to send
25
+// it to the resizeEvents channel if the size has changed. The goroutine stops when the stop channel
26
+// is closed.
27
+func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) {
28
+	go func() {
29
+		defer runtime.HandleCrash()
30
+
31
+		size := GetSize(fd)
32
+		if size == nil {
33
+			return
34
+		}
35
+		lastSize := *size
36
+
37
+		for {
38
+			// see if we need to stop running
39
+			select {
40
+			case <-stop:
41
+				return
42
+			default:
43
+			}
44
+
45
+			size := GetSize(fd)
46
+			if size == nil {
47
+				return
48
+			}
49
+
50
+			if size.Height != lastSize.Height || size.Width != lastSize.Width {
51
+				lastSize.Height = size.Height
52
+				lastSize.Width = size.Width
53
+				resizeEvents <- *size
54
+			}
55
+
56
+			// sleep to avoid hot looping
57
+			time.Sleep(250 * time.Millisecond)
58
+		}
59
+	}()
60
+}
... ...
@@ -21,17 +21,22 @@ import (
21 21
 	"os"
22 22
 
23 23
 	"github.com/docker/docker/pkg/term"
24
+
24 25
 	"k8s.io/kubernetes/pkg/util/interrupt"
25 26
 )
26 27
 
27 28
 // SafeFunc is a function to be invoked by TTY.
28 29
 type SafeFunc func() error
29 30
 
30
-// TTY helps invoke a function and preserve the state of the terminal, even if the
31
-// process is terminated during execution.
31
+// TTY helps invoke a function and preserve the state of the terminal, even if the process is
32
+// terminated during execution. It also provides support for terminal resizing for remote command
33
+// execution/attachment.
32 34
 type TTY struct {
33
-	// In is a reader to check for a terminal.
35
+	// In is a reader representing stdin. It is a required field.
34 36
 	In io.Reader
37
+	// Out is a writer representing stdout. It must be set to support terminal resizing. It is an
38
+	// optional field.
39
+	Out io.Writer
35 40
 	// Raw is true if the terminal should be set raw.
36 41
 	Raw bool
37 42
 	// TryDev indicates the TTY should try to open /dev/tty if the provided input
... ...
@@ -41,19 +46,30 @@ type TTY struct {
41 41
 	// it will be invoked after the terminal state is restored. If it is not provided,
42 42
 	// a signal received during the TTY will result in os.Exit(0) being invoked.
43 43
 	Parent *interrupt.Handler
44
-}
45 44
 
46
-// fd returns a file descriptor for a given object.
47
-type fd interface {
48
-	Fd() uintptr
45
+	// sizeQueue is set after a call to MonitorSize() and is used to monitor SIGWINCH signals when the
46
+	// user's terminal resizes.
47
+	sizeQueue *sizeQueue
49 48
 }
50 49
 
51
-// IsTerminal returns true if the provided input is a terminal. Does not check /dev/tty
50
+// IsTerminalIn returns true if t.In is a terminal. Does not check /dev/tty
52 51
 // even if TryDev is set.
53
-func (t TTY) IsTerminal() bool {
52
+func (t TTY) IsTerminalIn() bool {
54 53
 	return IsTerminal(t.In)
55 54
 }
56 55
 
56
+// IsTerminalOut returns true if t.Out is a terminal. Does not check /dev/tty
57
+// even if TryDev is set.
58
+func (t TTY) IsTerminalOut() bool {
59
+	return IsTerminal(t.Out)
60
+}
61
+
62
+// IsTerminal returns whether the passed object is a terminal or not
63
+func IsTerminal(i interface{}) bool {
64
+	_, terminal := term.GetFdInfo(i)
65
+	return terminal
66
+}
67
+
57 68
 // Safe invokes the provided function and will attempt to ensure that when the
58 69
 // function returns (or a termination signal is sent) that the terminal state
59 70
 // is reset to the condition it was in prior to the function being invoked. If
... ...
@@ -61,22 +77,16 @@ func (t TTY) IsTerminal() bool {
61 61
 // If the input file descriptor is not a TTY and TryDev is true, the /dev/tty file
62 62
 // will be opened (if available).
63 63
 func (t TTY) Safe(fn SafeFunc) error {
64
-	in := t.In
64
+	inFd, isTerminal := term.GetFdInfo(t.In)
65 65
 
66
-	var hasFd bool
67
-	var inFd uintptr
68
-	if desc, ok := in.(fd); ok && in != nil {
69
-		inFd = desc.Fd()
70
-		hasFd = true
71
-	}
72
-	if t.TryDev && (!hasFd || !term.IsTerminal(inFd)) {
66
+	if !isTerminal && t.TryDev {
73 67
 		if f, err := os.Open("/dev/tty"); err == nil {
74 68
 			defer f.Close()
75 69
 			inFd = f.Fd()
76
-			hasFd = true
70
+			isTerminal = term.IsTerminal(inFd)
77 71
 		}
78 72
 	}
79
-	if !hasFd || !term.IsTerminal(inFd) {
73
+	if !isTerminal {
80 74
 		return fn()
81 75
 	}
82 76
 
... ...
@@ -90,11 +100,11 @@ func (t TTY) Safe(fn SafeFunc) error {
90 90
 	if err != nil {
91 91
 		return err
92 92
 	}
93
-	return interrupt.Chain(t.Parent, func() { term.RestoreTerminal(inFd, state) }).Run(fn)
94
-}
93
+	return interrupt.Chain(t.Parent, func() {
94
+		if t.sizeQueue != nil {
95
+			t.sizeQueue.stop()
96
+		}
95 97
 
96
-// IsTerminal returns whether the passed io.Reader is a terminal or not
97
-func IsTerminal(r io.Reader) bool {
98
-	file, ok := r.(fd)
99
-	return ok && term.IsTerminal(file.Fd())
98
+		term.RestoreTerminal(inFd, state)
99
+	}).Run(fn)
100 100
 }