Browse code

Implement docker attach with standalone client lib.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/12/06 12:22:00
Showing 8 changed files
... ...
@@ -1,10 +1,8 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"encoding/json"
5 4
 	"fmt"
6 5
 	"io"
7
-	"net/url"
8 6
 
9 7
 	"github.com/Sirupsen/logrus"
10 8
 	"github.com/docker/docker/api/types"
... ...
@@ -25,18 +23,11 @@ func (cli *DockerCli) CmdAttach(args ...string) error {
25 25
 
26 26
 	cmd.ParseFlags(args, true)
27 27
 
28
-	serverResp, err := cli.call("GET", "/containers/"+cmd.Arg(0)+"/json", nil, nil)
28
+	c, err := cli.client.ContainerInspect(cmd.Arg(0))
29 29
 	if err != nil {
30 30
 		return err
31 31
 	}
32 32
 
33
-	defer serverResp.body.Close()
34
-
35
-	var c types.ContainerJSON
36
-	if err := json.NewDecoder(serverResp.body).Decode(&c); err != nil {
37
-		return err
38
-	}
39
-
40 33
 	if !c.State.Running {
41 34
 		return fmt.Errorf("You cannot attach to a stopped container, start it first")
42 35
 	}
... ...
@@ -55,28 +46,35 @@ func (cli *DockerCli) CmdAttach(args ...string) error {
55 55
 		}
56 56
 	}
57 57
 
58
-	var in io.ReadCloser
58
+	options := types.ContainerAttachOptions{
59
+		ContainerID: cmd.Arg(0),
60
+		Stream:      true,
61
+		Stdin:       !*noStdin && c.Config.OpenStdin,
62
+		Stdout:      true,
63
+		Stderr:      true,
64
+	}
59 65
 
60
-	v := url.Values{}
61
-	v.Set("stream", "1")
62
-	if !*noStdin && c.Config.OpenStdin {
63
-		v.Set("stdin", "1")
66
+	var in io.ReadCloser
67
+	if options.Stdin {
64 68
 		in = cli.in
65 69
 	}
66 70
 
67
-	v.Set("stdout", "1")
68
-	v.Set("stderr", "1")
69
-
70 71
 	if *proxy && !c.Config.Tty {
71
-		sigc := cli.forwardAllSignals(cmd.Arg(0))
72
+		sigc := cli.forwardAllSignals(options.ContainerID)
72 73
 		defer signal.StopCatch(sigc)
73 74
 	}
74 75
 
75
-	if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?"+v.Encode(), c.Config.Tty, in, cli.out, cli.err, nil, nil); err != nil {
76
+	resp, err := cli.client.ContainerAttach(options)
77
+	if err != nil {
78
+		return err
79
+	}
80
+	defer resp.Close()
81
+
82
+	if err := cli.holdHijackedConnection(c.Config.Tty, in, cli.out, cli.err, resp); err != nil {
76 83
 		return err
77 84
 	}
78 85
 
79
-	_, status, err := getExitCode(cli, cmd.Arg(0))
86
+	_, status, err := getExitCode(cli, options.ContainerID)
80 87
 	if err != nil {
81 88
 		return err
82 89
 	}
... ...
@@ -15,6 +15,7 @@ import (
15 15
 
16 16
 // apiClient is an interface that clients that talk with a docker server must implement.
17 17
 type apiClient interface {
18
+	ContainerAttach(options types.ContainerAttachOptions) (*types.HijackedResponse, error)
18 19
 	ContainerCommit(options types.ContainerCommitOptions) (types.ContainerCommitResponse, error)
19 20
 	ContainerCreate(config *runconfig.ContainerConfigWrapper, containerName string) (types.ContainerCreateResponse, error)
20 21
 	ContainerDiff(containerID string) ([]types.ContainerChange, error)
... ...
@@ -15,11 +15,79 @@ import (
15 15
 
16 16
 	"github.com/Sirupsen/logrus"
17 17
 	"github.com/docker/docker/api"
18
+	"github.com/docker/docker/api/types"
18 19
 	"github.com/docker/docker/dockerversion"
19 20
 	"github.com/docker/docker/pkg/stdcopy"
20 21
 	"github.com/docker/docker/pkg/term"
21 22
 )
22 23
 
24
+func (cli *DockerCli) holdHijackedConnection(setRawTerminal bool, inputStream io.ReadCloser, outputStream, errorStream io.Writer, resp *types.HijackedResponse) error {
25
+	var (
26
+		err      error
27
+		oldState *term.State
28
+	)
29
+	if inputStream != nil && setRawTerminal && cli.isTerminalIn && os.Getenv("NORAW") == "" {
30
+		oldState, err = term.SetRawTerminal(cli.inFd)
31
+		if err != nil {
32
+			return err
33
+		}
34
+		defer term.RestoreTerminal(cli.inFd, oldState)
35
+	}
36
+
37
+	receiveStdout := make(chan error, 1)
38
+	if outputStream != nil || errorStream != nil {
39
+		go func() {
40
+			defer func() {
41
+				if inputStream != nil {
42
+					if setRawTerminal && cli.isTerminalIn {
43
+						term.RestoreTerminal(cli.inFd, oldState)
44
+					}
45
+					inputStream.Close()
46
+				}
47
+			}()
48
+
49
+			// When TTY is ON, use regular copy
50
+			if setRawTerminal && outputStream != nil {
51
+				_, err = io.Copy(outputStream, resp.Reader)
52
+			} else {
53
+				_, err = stdcopy.StdCopy(outputStream, errorStream, resp.Reader)
54
+			}
55
+			logrus.Debugf("[hijack] End of stdout")
56
+			receiveStdout <- err
57
+		}()
58
+	}
59
+
60
+	stdinDone := make(chan struct{})
61
+	go func() {
62
+		if inputStream != nil {
63
+			io.Copy(resp.Conn, inputStream)
64
+			logrus.Debugf("[hijack] End of stdin")
65
+		}
66
+
67
+		if err := resp.CloseWrite(); err != nil {
68
+			logrus.Debugf("Couldn't send EOF: %s", err)
69
+		}
70
+		close(stdinDone)
71
+	}()
72
+
73
+	select {
74
+	case err := <-receiveStdout:
75
+		if err != nil {
76
+			logrus.Debugf("Error receiveStdout: %s", err)
77
+			return err
78
+		}
79
+	case <-stdinDone:
80
+		if outputStream != nil || errorStream != nil {
81
+			if err := <-receiveStdout; err != nil {
82
+				logrus.Debugf("Error receiveStdout: %s", err)
83
+				return err
84
+			}
85
+		}
86
+	}
87
+
88
+	return nil
89
+}
90
+
23 91
 type tlsClientCon struct {
24 92
 	*tls.Conn
25 93
 	rawConn net.Conn
... ...
@@ -190,7 +258,6 @@ func (cli *DockerCli) hijackWithContentType(method, path, contentType string, se
190 190
 	}
191 191
 
192 192
 	var oldState *term.State
193
-
194 193
 	if in != nil && setRawTerminal && cli.isTerminalIn && os.Getenv("NORAW") == "" {
195 194
 		oldState, err = term.SetRawTerminal(cli.inFd)
196 195
 		if err != nil {
... ...
@@ -24,6 +24,8 @@ type Client struct {
24 24
 	BasePath string
25 25
 	// scheme holds the scheme of the client i.e. https.
26 26
 	Scheme string
27
+	// tlsConfig holds the tls configuration to use in hijacked requests.
28
+	tlsConfig *tls.Config
27 29
 	// httpClient holds the client transport instance. Exported to keep the old code running.
28 30
 	HTTPClient *http.Client
29 31
 	// version of the server to talk to.
... ...
@@ -78,9 +80,11 @@ func NewClientWithVersion(host string, version version.Version, tlsOptions *tlsc
78 78
 	sockets.ConfigureTCPTransport(transport, proto, addr)
79 79
 
80 80
 	return &Client{
81
+		Proto:             proto,
81 82
 		Addr:              addr,
82 83
 		BasePath:          basePath,
83 84
 		Scheme:            scheme,
85
+		tlsConfig:         tlsConfig,
84 86
 		HTTPClient:        &http.Client{Transport: transport},
85 87
 		version:           version,
86 88
 		customHTTPHeaders: httpHeaders,
87 89
new file mode 100644
... ...
@@ -0,0 +1,30 @@
0
+package lib
1
+
2
+import (
3
+	"net/url"
4
+
5
+	"github.com/docker/docker/api/types"
6
+)
7
+
8
+// ContainerAttach attaches a connection to a container in the server.
9
+// It returns a types.HijackedConnection with the hijacked connection
10
+// and the a reader to get output. It's up to the called to close
11
+// the hijacked connection by calling types.HijackedResponse.Close.
12
+func (cli *Client) ContainerAttach(options types.ContainerAttachOptions) (*types.HijackedResponse, error) {
13
+	query := url.Values{}
14
+	if options.Stream {
15
+		query.Set("stream", "1")
16
+	}
17
+	if options.Stdin {
18
+		query.Set("stdin", "1")
19
+	}
20
+	if options.Stdout {
21
+		query.Set("stdout", "1")
22
+	}
23
+	if options.Stderr {
24
+		query.Set("stderr", "1")
25
+	}
26
+
27
+	headers := map[string][]string{"Content-Type": {"text/plain"}}
28
+	return cli.postHijacked("/containers/"+options.ContainerID+"/attach", query, nil, headers)
29
+}
0 30
new file mode 100644
... ...
@@ -0,0 +1,165 @@
0
+package lib
1
+
2
+import (
3
+	"crypto/tls"
4
+	"errors"
5
+	"fmt"
6
+	"io"
7
+	"net"
8
+	"net/http/httputil"
9
+	"net/url"
10
+	"strings"
11
+	"time"
12
+
13
+	"github.com/docker/docker/api/types"
14
+)
15
+
16
+// tlsClientCon holds tls information and a dialed connection.
17
+type tlsClientCon struct {
18
+	*tls.Conn
19
+	rawConn net.Conn
20
+}
21
+
22
+func (c *tlsClientCon) CloseWrite() error {
23
+	// Go standard tls.Conn doesn't provide the CloseWrite() method so we do it
24
+	// on its underlying connection.
25
+	if conn, ok := c.rawConn.(types.CloseWriter); ok {
26
+		return conn.CloseWrite()
27
+	}
28
+	return nil
29
+}
30
+
31
+// postHijacked sends a POST request and hijacks the connection.
32
+func (cli *Client) postHijacked(path string, query url.Values, body io.Reader, headers map[string][]string) (*types.HijackedResponse, error) {
33
+	bodyEncoded, err := encodeData(body)
34
+	if err != nil {
35
+		return nil, err
36
+	}
37
+
38
+	req, err := cli.newRequest("POST", path, query, bodyEncoded, headers)
39
+	if err != nil {
40
+		return nil, err
41
+	}
42
+	req.Host = cli.Addr
43
+
44
+	req.Header.Set("Connection", "Upgrade")
45
+	req.Header.Set("Upgrade", "tcp")
46
+
47
+	conn, err := dial(cli.Proto, cli.Addr, cli.tlsConfig)
48
+	if err != nil {
49
+		if strings.Contains(err.Error(), "connection refused") {
50
+			return nil, fmt.Errorf("Cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
51
+		}
52
+		return nil, err
53
+	}
54
+
55
+	// When we set up a TCP connection for hijack, there could be long periods
56
+	// of inactivity (a long running command with no output) that in certain
57
+	// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
58
+	// state. Setting TCP KeepAlive on the socket connection will prohibit
59
+	// ECONNTIMEOUT unless the socket connection truly is broken
60
+	if tcpConn, ok := conn.(*net.TCPConn); ok {
61
+		tcpConn.SetKeepAlive(true)
62
+		tcpConn.SetKeepAlivePeriod(30 * time.Second)
63
+	}
64
+
65
+	clientconn := httputil.NewClientConn(conn, nil)
66
+	defer clientconn.Close()
67
+
68
+	// Server hijacks the connection, error 'connection closed' expected
69
+	clientconn.Do(req)
70
+
71
+	rwc, br := clientconn.Hijack()
72
+
73
+	return &types.HijackedResponse{rwc, br}, nil
74
+}
75
+
76
+func tlsDial(network, addr string, config *tls.Config) (net.Conn, error) {
77
+	return tlsDialWithDialer(new(net.Dialer), network, addr, config)
78
+}
79
+
80
+// We need to copy Go's implementation of tls.Dial (pkg/cryptor/tls/tls.go) in
81
+// order to return our custom tlsClientCon struct which holds both the tls.Conn
82
+// object _and_ its underlying raw connection. The rationale for this is that
83
+// we need to be able to close the write end of the connection when attaching,
84
+// which tls.Conn does not provide.
85
+func tlsDialWithDialer(dialer *net.Dialer, network, addr string, config *tls.Config) (net.Conn, error) {
86
+	// We want the Timeout and Deadline values from dialer to cover the
87
+	// whole process: TCP connection and TLS handshake. This means that we
88
+	// also need to start our own timers now.
89
+	timeout := dialer.Timeout
90
+
91
+	if !dialer.Deadline.IsZero() {
92
+		deadlineTimeout := dialer.Deadline.Sub(time.Now())
93
+		if timeout == 0 || deadlineTimeout < timeout {
94
+			timeout = deadlineTimeout
95
+		}
96
+	}
97
+
98
+	var errChannel chan error
99
+
100
+	if timeout != 0 {
101
+		errChannel = make(chan error, 2)
102
+		time.AfterFunc(timeout, func() {
103
+			errChannel <- errors.New("")
104
+		})
105
+	}
106
+
107
+	rawConn, err := dialer.Dial(network, addr)
108
+	if err != nil {
109
+		return nil, err
110
+	}
111
+	// When we set up a TCP connection for hijack, there could be long periods
112
+	// of inactivity (a long running command with no output) that in certain
113
+	// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
114
+	// state. Setting TCP KeepAlive on the socket connection will prohibit
115
+	// ECONNTIMEOUT unless the socket connection truly is broken
116
+	if tcpConn, ok := rawConn.(*net.TCPConn); ok {
117
+		tcpConn.SetKeepAlive(true)
118
+		tcpConn.SetKeepAlivePeriod(30 * time.Second)
119
+	}
120
+
121
+	colonPos := strings.LastIndex(addr, ":")
122
+	if colonPos == -1 {
123
+		colonPos = len(addr)
124
+	}
125
+	hostname := addr[:colonPos]
126
+
127
+	// If no ServerName is set, infer the ServerName
128
+	// from the hostname we're connecting to.
129
+	if config.ServerName == "" {
130
+		// Make a copy to avoid polluting argument or default.
131
+		c := *config
132
+		c.ServerName = hostname
133
+		config = &c
134
+	}
135
+
136
+	conn := tls.Client(rawConn, config)
137
+
138
+	if timeout == 0 {
139
+		err = conn.Handshake()
140
+	} else {
141
+		go func() {
142
+			errChannel <- conn.Handshake()
143
+		}()
144
+
145
+		err = <-errChannel
146
+	}
147
+
148
+	if err != nil {
149
+		rawConn.Close()
150
+		return nil, err
151
+	}
152
+
153
+	// This is Docker difference with standard's crypto/tls package: returned a
154
+	// wrapper which holds both the TLS and raw connections.
155
+	return &tlsClientCon{conn, rawConn}, nil
156
+}
157
+
158
+func dial(proto, addr string, tlsConfig *tls.Config) (net.Conn, error) {
159
+	if tlsConfig != nil && proto != "unix" {
160
+		// Notice this isn't Go standard's tls.Dial function
161
+		return tlsDial(proto, addr, tlsConfig)
162
+	}
163
+	return net.Dial(proto, addr)
164
+}
... ...
@@ -71,38 +71,21 @@ func (cli *Client) sendRequest(method, path string, query url.Values, body inter
71 71
 	return cli.sendClientRequest(method, path, query, params, headers)
72 72
 }
73 73
 
74
-func (cli *Client) sendClientRequest(method, path string, query url.Values, in io.Reader, headers map[string][]string) (*serverResponse, error) {
74
+func (cli *Client) sendClientRequest(method, path string, query url.Values, body io.Reader, headers map[string][]string) (*serverResponse, error) {
75 75
 	serverResp := &serverResponse{
76 76
 		body:       nil,
77 77
 		statusCode: -1,
78 78
 	}
79 79
 
80 80
 	expectedPayload := (method == "POST" || method == "PUT")
81
-	if expectedPayload && in == nil {
82
-		in = bytes.NewReader([]byte{})
83
-	}
84
-
85
-	apiPath := cli.getAPIPath(path, query)
86
-	req, err := http.NewRequest(method, apiPath, in)
87
-	if err != nil {
88
-		return serverResp, err
89
-	}
90
-
91
-	// Add CLI Config's HTTP Headers BEFORE we set the Docker headers
92
-	// then the user can't change OUR headers
93
-	for k, v := range cli.customHTTPHeaders {
94
-		req.Header.Set(k, v)
81
+	if expectedPayload && body == nil {
82
+		body = bytes.NewReader([]byte{})
95 83
 	}
96 84
 
85
+	req, err := cli.newRequest(method, path, query, body, headers)
97 86
 	req.URL.Host = cli.Addr
98 87
 	req.URL.Scheme = cli.Scheme
99 88
 
100
-	if headers != nil {
101
-		for k, v := range headers {
102
-			req.Header[k] = v
103
-		}
104
-	}
105
-
106 89
 	if expectedPayload && req.Header.Get("Content-Type") == "" {
107 90
 		req.Header.Set("Content-Type", "text/plain")
108 91
 	}
... ...
@@ -143,6 +126,28 @@ func (cli *Client) sendClientRequest(method, path string, query url.Values, in i
143 143
 	return serverResp, nil
144 144
 }
145 145
 
146
+func (cli *Client) newRequest(method, path string, query url.Values, body io.Reader, headers map[string][]string) (*http.Request, error) {
147
+	apiPath := cli.getAPIPath(path, query)
148
+	req, err := http.NewRequest(method, apiPath, body)
149
+	if err != nil {
150
+		return nil, err
151
+	}
152
+
153
+	// Add CLI Config's HTTP Headers BEFORE we set the Docker headers
154
+	// then the user can't change OUR headers
155
+	for k, v := range cli.customHTTPHeaders {
156
+		req.Header.Set(k, v)
157
+	}
158
+
159
+	if headers != nil {
160
+		for k, v := range headers {
161
+			req.Header[k] = v
162
+		}
163
+	}
164
+
165
+	return req, nil
166
+}
167
+
146 168
 func encodeData(data interface{}) (*bytes.Buffer, error) {
147 169
 	params := bytes.NewBuffer(nil)
148 170
 	if data != nil {
... ...
@@ -1,14 +1,25 @@
1 1
 package types
2 2
 
3 3
 import (
4
+	"bufio"
4 5
 	"io"
6
+	"net"
5 7
 
6 8
 	"github.com/docker/docker/cliconfig"
7 9
 	"github.com/docker/docker/pkg/parsers/filters"
8 10
 	"github.com/docker/docker/pkg/ulimit"
9 11
 )
10 12
 
11
-// ContainerCommitOptions hods parameters to commit changes into a container.
13
+// ContainerAttachOptions holds parameters to attach to a container.
14
+type ContainerAttachOptions struct {
15
+	ContainerID string
16
+	Stream      bool
17
+	Stdin       bool
18
+	Stdout      bool
19
+	Stderr      bool
20
+}
21
+
22
+// ContainerCommitOptions holds parameters to commit changes into a container.
12 23
 type ContainerCommitOptions struct {
13 24
 	ContainerID    string
14 25
 	RepositoryName string
... ...
@@ -67,6 +78,31 @@ type EventsOptions struct {
67 67
 	Filters filters.Args
68 68
 }
69 69
 
70
+// HijackedResponse holds connection information for a hijacked request.
71
+type HijackedResponse struct {
72
+	Conn   net.Conn
73
+	Reader *bufio.Reader
74
+}
75
+
76
+// Close closes the hijacked connection and reader.
77
+func (h *HijackedResponse) Close() {
78
+	h.Conn.Close()
79
+}
80
+
81
+// CloseWriter is an interface that implement structs
82
+// that close input streams to prevent from writing.
83
+type CloseWriter interface {
84
+	CloseWrite() error
85
+}
86
+
87
+// CloseWrite closes a readWriter for writing.
88
+func (h *HijackedResponse) CloseWrite() error {
89
+	if conn, ok := h.Conn.(CloseWriter); ok {
90
+		return conn.CloseWrite()
91
+	}
92
+	return nil
93
+}
94
+
70 95
 // ImageBuildOptions holds the information
71 96
 // necessary to build images.
72 97
 type ImageBuildOptions struct {