Browse code

Move container.WaitStop, AttachWithLogs and WsAttachWithLogs to daemon service in api server

Signed-off-by: Antonio Murdaca <me@runcom.ninja>

Antonio Murdaca authored on 2015/05/06 05:25:05
Showing 4 changed files
... ...
@@ -1058,14 +1058,11 @@ func (s *Server) postContainersWait(version version.Version, w http.ResponseWrit
1058 1058
 		return fmt.Errorf("Missing parameter")
1059 1059
 	}
1060 1060
 
1061
-	name := vars["name"]
1062
-	cont, err := s.daemon.Get(name)
1061
+	status, err := s.daemon.ContainerWait(vars["name"], -1*time.Second)
1063 1062
 	if err != nil {
1064 1063
 		return err
1065 1064
 	}
1066 1065
 
1067
-	status, _ := cont.WaitStop(-1 * time.Second)
1068
-
1069 1066
 	return writeJSON(w, http.StatusOK, &types.ContainerWaitResponse{
1070 1067
 		StatusCode: status,
1071 1068
 	})
... ...
@@ -1099,50 +1096,33 @@ func (s *Server) postContainersAttach(version version.Version, w http.ResponseWr
1099 1099
 		return fmt.Errorf("Missing parameter")
1100 1100
 	}
1101 1101
 
1102
-	cont, err := s.daemon.Get(vars["name"])
1103
-	if err != nil {
1104
-		return err
1105
-	}
1106
-
1107 1102
 	inStream, outStream, err := hijackServer(w)
1108 1103
 	if err != nil {
1109 1104
 		return err
1110 1105
 	}
1111 1106
 	defer closeStreams(inStream, outStream)
1112 1107
 
1113
-	var errStream io.Writer
1114
-
1115 1108
 	if _, ok := r.Header["Upgrade"]; ok {
1116 1109
 		fmt.Fprintf(outStream, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
1117 1110
 	} else {
1118 1111
 		fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
1119 1112
 	}
1120 1113
 
1121
-	if !cont.Config.Tty && version.GreaterThanOrEqualTo("1.6") {
1122
-		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
1123
-		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
1124
-	} else {
1125
-		errStream = outStream
1126
-	}
1127
-	logs := boolValue(r, "logs")
1128
-	stream := boolValue(r, "stream")
1129
-
1130
-	var stdin io.ReadCloser
1131
-	var stdout, stderr io.Writer
1132
-
1133
-	if boolValue(r, "stdin") {
1134
-		stdin = inStream
1135
-	}
1136
-	if boolValue(r, "stdout") {
1137
-		stdout = outStream
1138
-	}
1139
-	if boolValue(r, "stderr") {
1140
-		stderr = errStream
1114
+	attachWithLogsConfig := &daemon.ContainerAttachWithLogsConfig{
1115
+		InStream:  inStream,
1116
+		OutStream: outStream,
1117
+		UseStdin:  boolValue(r, "stdin"),
1118
+		UseStdout: boolValue(r, "stdout"),
1119
+		UseStderr: boolValue(r, "stderr"),
1120
+		Logs:      boolValue(r, "logs"),
1121
+		Stream:    boolValue(r, "stream"),
1122
+		Multiplex: version.GreaterThanOrEqualTo("1.6"),
1141 1123
 	}
1142 1124
 
1143
-	if err := cont.AttachWithLogs(stdin, stdout, stderr, logs, stream); err != nil {
1125
+	if err := s.daemon.ContainerAttachWithLogs(vars["name"], attachWithLogsConfig); err != nil {
1144 1126
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
1145 1127
 	}
1128
+
1146 1129
 	return nil
1147 1130
 }
1148 1131
 
... ...
@@ -1153,17 +1133,19 @@ func (s *Server) wsContainersAttach(version version.Version, w http.ResponseWrit
1153 1153
 	if vars == nil {
1154 1154
 		return fmt.Errorf("Missing parameter")
1155 1155
 	}
1156
-	cont, err := s.daemon.Get(vars["name"])
1157
-	if err != nil {
1158
-		return err
1159
-	}
1160 1156
 
1161 1157
 	h := websocket.Handler(func(ws *websocket.Conn) {
1162 1158
 		defer ws.Close()
1163
-		logs := r.Form.Get("logs") != ""
1164
-		stream := r.Form.Get("stream") != ""
1165 1159
 
1166
-		if err := cont.AttachWithLogs(ws, ws, ws, logs, stream); err != nil {
1160
+		wsAttachWithLogsConfig := &daemon.ContainerWsAttachWithLogsConfig{
1161
+			InStream:  ws,
1162
+			OutStream: ws,
1163
+			ErrStream: ws,
1164
+			Logs:      boolValue(r, "logs"),
1165
+			Stream:    boolValue(r, "stream"),
1166
+		}
1167
+
1168
+		if err := s.daemon.ContainerWsAttachWithLogs(vars["name"], wsAttachWithLogsConfig); err != nil {
1167 1169
 			logrus.Errorf("Error attaching websocket: %s", err)
1168 1170
 		}
1169 1171
 	})
... ...
@@ -1,229 +1,61 @@
1 1
 package daemon
2 2
 
3 3
 import (
4
-	"encoding/json"
5 4
 	"io"
6
-	"os"
7
-	"sync"
8
-	"time"
9 5
 
10
-	"github.com/Sirupsen/logrus"
11
-	"github.com/docker/docker/pkg/jsonlog"
12
-	"github.com/docker/docker/pkg/promise"
6
+	"github.com/docker/docker/pkg/stdcopy"
13 7
 )
14 8
 
15
-func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
16
-	if logs {
17
-		cLog, err := c.ReadLog("json")
18
-		if err != nil && os.IsNotExist(err) {
19
-			// Legacy logs
20
-			logrus.Debugf("Old logs format")
21
-			if stdout != nil {
22
-				cLog, err := c.ReadLog("stdout")
23
-				if err != nil {
24
-					logrus.Errorf("Error reading logs (stdout): %s", err)
25
-				} else if _, err := io.Copy(stdout, cLog); err != nil {
26
-					logrus.Errorf("Error streaming logs (stdout): %s", err)
27
-				}
28
-			}
29
-			if stderr != nil {
30
-				cLog, err := c.ReadLog("stderr")
31
-				if err != nil {
32
-					logrus.Errorf("Error reading logs (stderr): %s", err)
33
-				} else if _, err := io.Copy(stderr, cLog); err != nil {
34
-					logrus.Errorf("Error streaming logs (stderr): %s", err)
35
-				}
36
-			}
37
-		} else if err != nil {
38
-			logrus.Errorf("Error reading logs (json): %s", err)
39
-		} else {
40
-			dec := json.NewDecoder(cLog)
41
-			for {
42
-				l := &jsonlog.JSONLog{}
43
-
44
-				if err := dec.Decode(l); err == io.EOF {
45
-					break
46
-				} else if err != nil {
47
-					logrus.Errorf("Error streaming logs: %s", err)
48
-					break
49
-				}
50
-				if l.Stream == "stdout" && stdout != nil {
51
-					io.WriteString(stdout, l.Log)
52
-				}
53
-				if l.Stream == "stderr" && stderr != nil {
54
-					io.WriteString(stderr, l.Log)
55
-				}
56
-			}
57
-		}
58
-	}
59
-
60
-	//stream
61
-	if stream {
62
-		var stdinPipe io.ReadCloser
63
-		if stdin != nil {
64
-			r, w := io.Pipe()
65
-			go func() {
66
-				defer w.Close()
67
-				defer logrus.Debugf("Closing buffered stdin pipe")
68
-				io.Copy(w, stdin)
69
-			}()
70
-			stdinPipe = r
71
-		}
72
-		<-c.Attach(stdinPipe, stdout, stderr)
73
-		// If we are in stdinonce mode, wait for the process to end
74
-		// otherwise, simply return
75
-		if c.Config.StdinOnce && !c.Config.Tty {
76
-			c.WaitStop(-1 * time.Second)
77
-		}
78
-	}
79
-	return nil
9
+type ContainerAttachWithLogsConfig struct {
10
+	InStream                       io.ReadCloser
11
+	OutStream                      io.Writer
12
+	UseStdin, UseStdout, UseStderr bool
13
+	Logs, Stream                   bool
14
+	Multiplex                      bool
80 15
 }
81 16
 
82
-func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
83
-	return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr)
84
-}
85
-
86
-func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
87
-	var (
88
-		cStdout, cStderr io.ReadCloser
89
-		cStdin           io.WriteCloser
90
-		wg               sync.WaitGroup
91
-		errors           = make(chan error, 3)
92
-	)
93
-
94
-	if stdin != nil && openStdin {
95
-		cStdin = streamConfig.StdinPipe()
96
-		wg.Add(1)
17
+func (daemon *Daemon) ContainerAttachWithLogs(name string, c *ContainerAttachWithLogsConfig) error {
18
+	container, err := daemon.Get(name)
19
+	if err != nil {
20
+		return err
97 21
 	}
98 22
 
99
-	if stdout != nil {
100
-		cStdout = streamConfig.StdoutPipe()
101
-		wg.Add(1)
102
-	}
23
+	var errStream io.Writer
103 24
 
104
-	if stderr != nil {
105
-		cStderr = streamConfig.StderrPipe()
106
-		wg.Add(1)
25
+	if !container.Config.Tty && c.Multiplex {
26
+		errStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stderr)
27
+		c.OutStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stdout)
28
+	} else {
29
+		errStream = c.OutStream
107 30
 	}
108 31
 
109
-	// Connect stdin of container to the http conn.
110
-	go func() {
111
-		if stdin == nil || !openStdin {
112
-			return
113
-		}
114
-		logrus.Debugf("attach: stdin: begin")
115
-		defer func() {
116
-			if stdinOnce && !tty {
117
-				cStdin.Close()
118
-			} else {
119
-				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
120
-				if cStdout != nil {
121
-					cStdout.Close()
122
-				}
123
-				if cStderr != nil {
124
-					cStderr.Close()
125
-				}
126
-			}
127
-			wg.Done()
128
-			logrus.Debugf("attach: stdin: end")
129
-		}()
32
+	var stdin io.ReadCloser
33
+	var stdout, stderr io.Writer
130 34
 
131
-		var err error
132
-		if tty {
133
-			_, err = copyEscapable(cStdin, stdin)
134
-		} else {
135
-			_, err = io.Copy(cStdin, stdin)
136
-
137
-		}
138
-		if err == io.ErrClosedPipe {
139
-			err = nil
140
-		}
141
-		if err != nil {
142
-			logrus.Errorf("attach: stdin: %s", err)
143
-			errors <- err
144
-			return
145
-		}
146
-	}()
147
-
148
-	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
149
-		if stream == nil {
150
-			return
151
-		}
152
-		defer func() {
153
-			// Make sure stdin gets closed
154
-			if stdin != nil {
155
-				stdin.Close()
156
-			}
157
-			streamPipe.Close()
158
-			wg.Done()
159
-			logrus.Debugf("attach: %s: end", name)
160
-		}()
161
-
162
-		logrus.Debugf("attach: %s: begin", name)
163
-		_, err := io.Copy(stream, streamPipe)
164
-		if err == io.ErrClosedPipe {
165
-			err = nil
166
-		}
167
-		if err != nil {
168
-			logrus.Errorf("attach: %s: %v", name, err)
169
-			errors <- err
170
-		}
35
+	if c.UseStdin {
36
+		stdin = c.InStream
37
+	}
38
+	if c.UseStdout {
39
+		stdout = c.OutStream
40
+	}
41
+	if c.UseStderr {
42
+		stderr = errStream
171 43
 	}
172 44
 
173
-	go attachStream("stdout", stdout, cStdout)
174
-	go attachStream("stderr", stderr, cStderr)
45
+	return container.AttachWithLogs(stdin, stdout, stderr, c.Logs, c.Stream)
46
+}
175 47
 
176
-	return promise.Go(func() error {
177
-		wg.Wait()
178
-		close(errors)
179
-		for err := range errors {
180
-			if err != nil {
181
-				return err
182
-			}
183
-		}
184
-		return nil
185
-	})
48
+type ContainerWsAttachWithLogsConfig struct {
49
+	InStream             io.ReadCloser
50
+	OutStream, ErrStream io.Writer
51
+	Logs, Stream         bool
186 52
 }
187 53
 
188
-// Code c/c from io.Copy() modified to handle escape sequence
189
-func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) {
190
-	buf := make([]byte, 32*1024)
191
-	for {
192
-		nr, er := src.Read(buf)
193
-		if nr > 0 {
194
-			// ---- Docker addition
195
-			// char 16 is C-p
196
-			if nr == 1 && buf[0] == 16 {
197
-				nr, er = src.Read(buf)
198
-				// char 17 is C-q
199
-				if nr == 1 && buf[0] == 17 {
200
-					if err := src.Close(); err != nil {
201
-						return 0, err
202
-					}
203
-					return 0, nil
204
-				}
205
-			}
206
-			// ---- End of docker
207
-			nw, ew := dst.Write(buf[0:nr])
208
-			if nw > 0 {
209
-				written += int64(nw)
210
-			}
211
-			if ew != nil {
212
-				err = ew
213
-				break
214
-			}
215
-			if nr != nw {
216
-				err = io.ErrShortWrite
217
-				break
218
-			}
219
-		}
220
-		if er == io.EOF {
221
-			break
222
-		}
223
-		if er != nil {
224
-			err = er
225
-			break
226
-		}
54
+func (daemon *Daemon) ContainerWsAttachWithLogs(name string, c *ContainerWsAttachWithLogsConfig) error {
55
+	container, err := daemon.Get(name)
56
+	if err != nil {
57
+		return err
227 58
 	}
228
-	return written, err
59
+
60
+	return container.AttachWithLogs(c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream)
229 61
 }
... ...
@@ -11,6 +11,7 @@ import (
11 11
 	"path"
12 12
 	"path/filepath"
13 13
 	"strings"
14
+	"sync"
14 15
 	"syscall"
15 16
 	"time"
16 17
 
... ...
@@ -34,6 +35,7 @@ import (
34 34
 	"github.com/docker/docker/pkg/directory"
35 35
 	"github.com/docker/docker/pkg/etchosts"
36 36
 	"github.com/docker/docker/pkg/ioutils"
37
+	"github.com/docker/docker/pkg/jsonlog"
37 38
 	"github.com/docker/docker/pkg/promise"
38 39
 	"github.com/docker/docker/pkg/resolvconf"
39 40
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -1636,3 +1638,219 @@ func (container *Container) monitorExec(execConfig *execConfig, callback execdri
1636 1636
 
1637 1637
 	return err
1638 1638
 }
1639
+
1640
+func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
1641
+	return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr)
1642
+}
1643
+
1644
+func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
1645
+	if logs {
1646
+		cLog, err := c.ReadLog("json")
1647
+		if err != nil && os.IsNotExist(err) {
1648
+			// Legacy logs
1649
+			logrus.Debugf("Old logs format")
1650
+			if stdout != nil {
1651
+				cLog, err := c.ReadLog("stdout")
1652
+				if err != nil {
1653
+					logrus.Errorf("Error reading logs (stdout): %s", err)
1654
+				} else if _, err := io.Copy(stdout, cLog); err != nil {
1655
+					logrus.Errorf("Error streaming logs (stdout): %s", err)
1656
+				}
1657
+			}
1658
+			if stderr != nil {
1659
+				cLog, err := c.ReadLog("stderr")
1660
+				if err != nil {
1661
+					logrus.Errorf("Error reading logs (stderr): %s", err)
1662
+				} else if _, err := io.Copy(stderr, cLog); err != nil {
1663
+					logrus.Errorf("Error streaming logs (stderr): %s", err)
1664
+				}
1665
+			}
1666
+		} else if err != nil {
1667
+			logrus.Errorf("Error reading logs (json): %s", err)
1668
+		} else {
1669
+			dec := json.NewDecoder(cLog)
1670
+			for {
1671
+				l := &jsonlog.JSONLog{}
1672
+
1673
+				if err := dec.Decode(l); err == io.EOF {
1674
+					break
1675
+				} else if err != nil {
1676
+					logrus.Errorf("Error streaming logs: %s", err)
1677
+					break
1678
+				}
1679
+				if l.Stream == "stdout" && stdout != nil {
1680
+					io.WriteString(stdout, l.Log)
1681
+				}
1682
+				if l.Stream == "stderr" && stderr != nil {
1683
+					io.WriteString(stderr, l.Log)
1684
+				}
1685
+			}
1686
+		}
1687
+	}
1688
+
1689
+	//stream
1690
+	if stream {
1691
+		var stdinPipe io.ReadCloser
1692
+		if stdin != nil {
1693
+			r, w := io.Pipe()
1694
+			go func() {
1695
+				defer w.Close()
1696
+				defer logrus.Debugf("Closing buffered stdin pipe")
1697
+				io.Copy(w, stdin)
1698
+			}()
1699
+			stdinPipe = r
1700
+		}
1701
+		<-c.Attach(stdinPipe, stdout, stderr)
1702
+		// If we are in stdinonce mode, wait for the process to end
1703
+		// otherwise, simply return
1704
+		if c.Config.StdinOnce && !c.Config.Tty {
1705
+			c.WaitStop(-1 * time.Second)
1706
+		}
1707
+	}
1708
+	return nil
1709
+}
1710
+
1711
+func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
1712
+	var (
1713
+		cStdout, cStderr io.ReadCloser
1714
+		cStdin           io.WriteCloser
1715
+		wg               sync.WaitGroup
1716
+		errors           = make(chan error, 3)
1717
+	)
1718
+
1719
+	if stdin != nil && openStdin {
1720
+		cStdin = streamConfig.StdinPipe()
1721
+		wg.Add(1)
1722
+	}
1723
+
1724
+	if stdout != nil {
1725
+		cStdout = streamConfig.StdoutPipe()
1726
+		wg.Add(1)
1727
+	}
1728
+
1729
+	if stderr != nil {
1730
+		cStderr = streamConfig.StderrPipe()
1731
+		wg.Add(1)
1732
+	}
1733
+
1734
+	// Connect stdin of container to the http conn.
1735
+	go func() {
1736
+		if stdin == nil || !openStdin {
1737
+			return
1738
+		}
1739
+		logrus.Debugf("attach: stdin: begin")
1740
+		defer func() {
1741
+			if stdinOnce && !tty {
1742
+				cStdin.Close()
1743
+			} else {
1744
+				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
1745
+				if cStdout != nil {
1746
+					cStdout.Close()
1747
+				}
1748
+				if cStderr != nil {
1749
+					cStderr.Close()
1750
+				}
1751
+			}
1752
+			wg.Done()
1753
+			logrus.Debugf("attach: stdin: end")
1754
+		}()
1755
+
1756
+		var err error
1757
+		if tty {
1758
+			_, err = copyEscapable(cStdin, stdin)
1759
+		} else {
1760
+			_, err = io.Copy(cStdin, stdin)
1761
+
1762
+		}
1763
+		if err == io.ErrClosedPipe {
1764
+			err = nil
1765
+		}
1766
+		if err != nil {
1767
+			logrus.Errorf("attach: stdin: %s", err)
1768
+			errors <- err
1769
+			return
1770
+		}
1771
+	}()
1772
+
1773
+	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
1774
+		if stream == nil {
1775
+			return
1776
+		}
1777
+		defer func() {
1778
+			// Make sure stdin gets closed
1779
+			if stdin != nil {
1780
+				stdin.Close()
1781
+			}
1782
+			streamPipe.Close()
1783
+			wg.Done()
1784
+			logrus.Debugf("attach: %s: end", name)
1785
+		}()
1786
+
1787
+		logrus.Debugf("attach: %s: begin", name)
1788
+		_, err := io.Copy(stream, streamPipe)
1789
+		if err == io.ErrClosedPipe {
1790
+			err = nil
1791
+		}
1792
+		if err != nil {
1793
+			logrus.Errorf("attach: %s: %v", name, err)
1794
+			errors <- err
1795
+		}
1796
+	}
1797
+
1798
+	go attachStream("stdout", stdout, cStdout)
1799
+	go attachStream("stderr", stderr, cStderr)
1800
+
1801
+	return promise.Go(func() error {
1802
+		wg.Wait()
1803
+		close(errors)
1804
+		for err := range errors {
1805
+			if err != nil {
1806
+				return err
1807
+			}
1808
+		}
1809
+		return nil
1810
+	})
1811
+}
1812
+
1813
+// Code c/c from io.Copy() modified to handle escape sequence
1814
+func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) {
1815
+	buf := make([]byte, 32*1024)
1816
+	for {
1817
+		nr, er := src.Read(buf)
1818
+		if nr > 0 {
1819
+			// ---- Docker addition
1820
+			// char 16 is C-p
1821
+			if nr == 1 && buf[0] == 16 {
1822
+				nr, er = src.Read(buf)
1823
+				// char 17 is C-q
1824
+				if nr == 1 && buf[0] == 17 {
1825
+					if err := src.Close(); err != nil {
1826
+						return 0, err
1827
+					}
1828
+					return 0, nil
1829
+				}
1830
+			}
1831
+			// ---- End of docker
1832
+			nw, ew := dst.Write(buf[0:nr])
1833
+			if nw > 0 {
1834
+				written += int64(nw)
1835
+			}
1836
+			if ew != nil {
1837
+				err = ew
1838
+				break
1839
+			}
1840
+			if nr != nw {
1841
+				err = io.ErrShortWrite
1842
+				break
1843
+			}
1844
+		}
1845
+		if er == io.EOF {
1846
+			break
1847
+		}
1848
+		if er != nil {
1849
+			err = er
1850
+			break
1851
+		}
1852
+	}
1853
+	return written, err
1854
+}
1639 1855
new file mode 100644
... ...
@@ -0,0 +1,12 @@
0
+package daemon
1
+
2
+import "time"
3
+
4
+func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, error) {
5
+	container, err := daemon.Get(name)
6
+	if err != nil {
7
+		return -1, err
8
+	}
9
+
10
+	return container.WaitStop(timeout)
11
+}