Browse code

Make StdCopy works with huge amount of data

Guillaume J. Charmes authored on 2013/09/12 03:35:09
Showing 6 changed files
... ...
@@ -766,32 +766,43 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
766 766
 	}
767 767
 	name := vars["name"]
768 768
 
769
-	if _, err := srv.ContainerInspect(name); err != nil {
769
+	c, err := srv.ContainerInspect(name)
770
+	if err != nil {
770 771
 		return err
771 772
 	}
772 773
 
773
-	in, out, err := hijackServer(w)
774
+	inStream, outStream, err := hijackServer(w)
774 775
 	if err != nil {
775 776
 		return err
776 777
 	}
777 778
 	defer func() {
778
-		if tcpc, ok := in.(*net.TCPConn); ok {
779
+		if tcpc, ok := inStream.(*net.TCPConn); ok {
779 780
 			tcpc.CloseWrite()
780 781
 		} else {
781
-			in.Close()
782
+			inStream.Close()
782 783
 		}
783 784
 	}()
784 785
 	defer func() {
785
-		if tcpc, ok := out.(*net.TCPConn); ok {
786
+		if tcpc, ok := outStream.(*net.TCPConn); ok {
786 787
 			tcpc.CloseWrite()
787
-		} else if closer, ok := out.(io.Closer); ok {
788
+		} else if closer, ok := outStream.(io.Closer); ok {
788 789
 			closer.Close()
789 790
 		}
790 791
 	}()
791 792
 
792
-	fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
793
-	if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, in, out); err != nil {
794
-		fmt.Fprintf(out, "Error: %s\n", err)
793
+	var errStream io.Writer
794
+
795
+	fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
796
+
797
+	if !c.Config.Tty && version >= 1.4 {
798
+		errStream = utils.NewStdWriter(outStream, utils.Stderr)
799
+		outStream = utils.NewStdWriter(outStream, utils.Stdout)
800
+	} else {
801
+		errStream = outStream
802
+	}
803
+
804
+	if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, inStream, outStream, errStream); err != nil {
805
+		fmt.Fprintf(outStream, "Error: %s\n", err)
795 806
 	}
796 807
 	return nil
797 808
 }
... ...
@@ -834,7 +845,7 @@ func wsContainersAttach(srv *Server, version float64, w http.ResponseWriter, r *
834 834
 	h := websocket.Handler(func(ws *websocket.Conn) {
835 835
 		defer ws.Close()
836 836
 
837
-		if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, ws, ws); err != nil {
837
+		if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, ws, ws, ws); err != nil {
838 838
 			utils.Debugf("Error: %s", err)
839 839
 		}
840 840
 	})
... ...
@@ -951,7 +951,7 @@ func TestPostContainersAttach(t *testing.T) {
951 951
 	})
952 952
 
953 953
 	setTimeout(t, "read/write assertion timed out", 2*time.Second, func() {
954
-		if err := assertPipe("hello\n", string(utils.Stdout)+"hello", stdout, stdinPipe, 15); err != nil {
954
+		if err := assertPipe("hello\n", string([]byte{1, 0, 0, 0, 6, 0, 0, 0})+"hello", stdout, stdinPipe, 15); err != nil {
955 955
 			t.Fatal(err)
956 956
 		}
957 957
 	})
... ...
@@ -1040,7 +1040,7 @@ func TestPostContainersAttachStderr(t *testing.T) {
1040 1040
 	})
1041 1041
 
1042 1042
 	setTimeout(t, "read/write assertion timed out", 2*time.Second, func() {
1043
-		if err := assertPipe("hello\n", string(utils.Stderr)+"hello", stdout, stdinPipe, 15); err != nil {
1043
+		if err := assertPipe("hello\n", string([]byte{2, 0, 0, 0, 6, 0, 0, 0})+"hello", stdout, stdinPipe, 15); err != nil {
1044 1044
 			t.Fatal(err)
1045 1045
 		}
1046 1046
 	})
... ...
@@ -1570,7 +1570,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
1570 1570
 			return err
1571 1571
 		}
1572 1572
 		if status != 0 {
1573
-			return &utils.StatusError{status}
1573
+			return &utils.StatusError{Status: status}
1574 1574
 		}
1575 1575
 	}
1576 1576
 
... ...
@@ -1175,11 +1175,12 @@ func (srv *Server) ContainerResize(name string, h, w int) error {
1175 1175
 	return fmt.Errorf("No such container: %s", name)
1176 1176
 }
1177 1177
 
1178
-func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, in io.ReadCloser, out io.Writer) error {
1178
+func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, inStream io.ReadCloser, outStream, errStream io.Writer) error {
1179 1179
 	container := srv.runtime.Get(name)
1180 1180
 	if container == nil {
1181 1181
 		return fmt.Errorf("No such container: %s", name)
1182 1182
 	}
1183
+
1183 1184
 	//logs
1184 1185
 	if logs {
1185 1186
 		cLog, err := container.ReadLog("json")
... ...
@@ -1190,7 +1191,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
1190 1190
 				cLog, err := container.ReadLog("stdout")
1191 1191
 				if err != nil {
1192 1192
 					utils.Debugf("Error reading logs (stdout): %s", err)
1193
-				} else if _, err := io.Copy(out, cLog); err != nil {
1193
+				} else if _, err := io.Copy(outStream, cLog); err != nil {
1194 1194
 					utils.Debugf("Error streaming logs (stdout): %s", err)
1195 1195
 				}
1196 1196
 			}
... ...
@@ -1198,7 +1199,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
1198 1198
 				cLog, err := container.ReadLog("stderr")
1199 1199
 				if err != nil {
1200 1200
 					utils.Debugf("Error reading logs (stderr): %s", err)
1201
-				} else if _, err := io.Copy(out, cLog); err != nil {
1201
+				} else if _, err := io.Copy(errStream, cLog); err != nil {
1202 1202
 					utils.Debugf("Error streaming logs (stderr): %s", err)
1203 1203
 				}
1204 1204
 			}
... ...
@@ -1215,7 +1216,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
1215 1215
 					break
1216 1216
 				}
1217 1217
 				if (l.Stream == "stdout" && stdout) || (l.Stream == "stderr" && stderr) {
1218
-					fmt.Fprintf(out, "%s", l.Log)
1218
+					fmt.Fprintf(outStream, "%s", l.Log)
1219 1219
 				}
1220 1220
 			}
1221 1221
 		}
... ...
@@ -1238,24 +1239,16 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
1238 1238
 			go func() {
1239 1239
 				defer w.Close()
1240 1240
 				defer utils.Debugf("Closing buffered stdin pipe")
1241
-				io.Copy(w, in)
1241
+				io.Copy(w, inStream)
1242 1242
 			}()
1243 1243
 			cStdin = r
1244
-			cStdinCloser = in
1244
+			cStdinCloser = inStream
1245 1245
 		}
1246 1246
 		if stdout {
1247
-			if container.Config.Tty {
1248
-				cStdout = out
1249
-			} else {
1250
-				cStdout = utils.NewStdWriter(out, utils.Stdout)
1251
-			}
1247
+			cStdout = outStream
1252 1248
 		}
1253 1249
 		if stderr {
1254
-			if container.Config.Tty {
1255
-				cStderr = out
1256
-			} else {
1257
-				cStderr = utils.NewStdWriter(out, utils.Stderr)
1258
-			}
1250
+			cStderr = errStream
1259 1251
 		}
1260 1252
 
1261 1253
 		<-container.Attach(cStdin, cStdinCloser, cStdout, cStderr)
1262 1254
new file mode 100644
... ...
@@ -0,0 +1,179 @@
0
+package utils
1
+
2
+import (
3
+	"encoding/binary"
4
+	"errors"
5
+	"io"
6
+	"unsafe"
7
+)
8
+
9
+func CheckBigEndian() bool {
10
+	var x uint32 = 0x01020304
11
+
12
+	if *(*byte)(unsafe.Pointer(&x)) == 0x01 {
13
+		return true
14
+	}
15
+	return false
16
+}
17
+
18
+const (
19
+	StdWriterPrefixLen = 8
20
+	StdWriterFdIndex   = 0
21
+	StdWriterSizeIndex = 4
22
+)
23
+
24
+type StdType [StdWriterPrefixLen]byte
25
+
26
+var (
27
+	Stdin  StdType = StdType{0: 0}
28
+	Stdout StdType = StdType{0: 1}
29
+	Stderr StdType = StdType{0: 2}
30
+)
31
+
32
+type StdWriter struct {
33
+	io.Writer
34
+	prefix    StdType
35
+	sizeBuf   []byte
36
+	byteOrder binary.ByteOrder
37
+}
38
+
39
+func (w *StdWriter) Write(buf []byte) (n int, err error) {
40
+	if w == nil || w.Writer == nil {
41
+		return 0, errors.New("Writer not instanciated")
42
+	}
43
+	w.byteOrder.PutUint32(w.prefix[4:], uint32(len(buf)))
44
+	buf = append(w.prefix[:], buf...)
45
+
46
+	n, err = w.Writer.Write(buf)
47
+	return n - StdWriterPrefixLen, err
48
+}
49
+
50
+// NewStdWriter instanciate a new Writer based on the given type `t`.
51
+// the utils package contains the valid parametres for `t`:
52
+func NewStdWriter(w io.Writer, t StdType) *StdWriter {
53
+	if len(t) != StdWriterPrefixLen {
54
+		return nil
55
+	}
56
+
57
+	var bo binary.ByteOrder
58
+
59
+	if CheckBigEndian() {
60
+		bo = binary.BigEndian
61
+	} else {
62
+		bo = binary.LittleEndian
63
+	}
64
+	return &StdWriter{
65
+		Writer:    w,
66
+		prefix:    t,
67
+		sizeBuf:   make([]byte, 4),
68
+		byteOrder: bo,
69
+	}
70
+}
71
+
72
+var ErrInvalidStdHeader = errors.New("Unrecognized input header")
73
+
74
+// StdCopy is a modified version of io.Copy.
75
+//
76
+// StdCopy copies from src to dstout or dsterr until either EOF is reached
77
+// on src or an error occurs.  It returns the number of bytes
78
+// copied and the first error encountered while copying, if any.
79
+//
80
+// A successful Copy returns err == nil, not err == EOF.
81
+// Because Copy is defined to read from src until EOF, it does
82
+// not treat an EOF from Read as an error to be reported.
83
+//
84
+// The source needs to be writter via StdWriter, dstout or dsterr is selected
85
+// based on the prefix added by StdWriter
86
+func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
87
+	var (
88
+		buf       = make([]byte, 32*1024+StdWriterPrefixLen+1)
89
+		bufLen    = len(buf)
90
+		nr, nw    int
91
+		er, ew    error
92
+		out       io.Writer
93
+		byteOrder binary.ByteOrder
94
+		frameSize int
95
+	)
96
+
97
+	// Check the machine's endianness
98
+	if CheckBigEndian() {
99
+		byteOrder = binary.BigEndian
100
+	} else {
101
+		byteOrder = binary.LittleEndian
102
+	}
103
+
104
+	for {
105
+		// Make sure we have at least a full header
106
+		for nr < StdWriterPrefixLen {
107
+			var nr2 int
108
+			nr2, er = src.Read(buf[nr:])
109
+			if er == io.EOF {
110
+				return written, nil
111
+			}
112
+			if er != nil {
113
+				return 0, er
114
+			}
115
+			nr += nr2
116
+		}
117
+
118
+		// Check the first byte to know where to write
119
+		switch buf[StdWriterFdIndex] {
120
+		case 0:
121
+			fallthrough
122
+		case 1:
123
+			// Write on stdout
124
+			out = dstout
125
+		case 2:
126
+			// Write on stderr
127
+			out = dsterr
128
+		default:
129
+			Debugf("Error selecting output fd: (%d)", buf[StdWriterFdIndex])
130
+			return 0, ErrInvalidStdHeader
131
+		}
132
+
133
+		// Retrieve the size of the frame
134
+		frameSize = int(byteOrder.Uint32(buf[StdWriterSizeIndex : StdWriterSizeIndex+4]))
135
+
136
+		// Check if the buffer is big enough to read the frame.
137
+		// Extend it if necessary.
138
+		if frameSize+StdWriterPrefixLen > bufLen {
139
+			Debugf("Extending buffer cap.")
140
+			buf = append(buf, make([]byte, frameSize-len(buf)+1)...)
141
+			bufLen = len(buf)
142
+		}
143
+
144
+		// While the amount of bytes read is less than the size of the frame + header, we keep reading
145
+		for nr < frameSize+StdWriterPrefixLen {
146
+			var nr2 int
147
+			nr2, er = src.Read(buf[nr:])
148
+			if er == io.EOF {
149
+				return written, nil
150
+			}
151
+			if er != nil {
152
+				Debugf("Error reading frame: %s", er)
153
+				return 0, er
154
+			}
155
+			nr += nr2
156
+		}
157
+
158
+		// Write the retrieved frame (without header)
159
+		nw, ew = out.Write(buf[StdWriterPrefixLen : frameSize+StdWriterPrefixLen])
160
+		if nw > 0 {
161
+			written += int64(nw)
162
+		}
163
+		if ew != nil {
164
+			Debugf("Error writing frame: %s", ew)
165
+			return 0, ew
166
+		}
167
+		// If the frame has not been fully written: error
168
+		if nw != frameSize {
169
+			Debugf("Error Short Write: (%d on %d)", nw, frameSize)
170
+			return 0, io.ErrShortWrite
171
+		}
172
+
173
+		// Move the rest of the buffer to the beginning
174
+		copy(buf, buf[frameSize+StdWriterPrefixLen:])
175
+		// Move the index
176
+		nr -= frameSize + StdWriterPrefixLen
177
+	}
178
+}
... ...
@@ -1021,90 +1021,3 @@ type StatusError struct {
1021 1021
 func (e *StatusError) Error() string {
1022 1022
 	return fmt.Sprintf("Status: %d", e.Status)
1023 1023
 }
1024
-
1025
-type StdType []byte
1026
-
1027
-const StdWriterPrefixLen = 8
1028
-
1029
-var (
1030
-	Stdin  StdType = StdType("\001 stdin\002")
1031
-	Stdout StdType = StdType("\001stdout\002")
1032
-	Stderr StdType = StdType("\001stderr\002")
1033
-)
1034
-
1035
-type StdWriter struct {
1036
-	io.Writer
1037
-	prefix []byte
1038
-}
1039
-
1040
-func (w *StdWriter) Write(buf []byte) (n int, err error) {
1041
-	n, err = w.Writer.Write(append(w.prefix, buf...))
1042
-	if n >= len(buf)+StdWriterPrefixLen {
1043
-		n -= StdWriterPrefixLen
1044
-	}
1045
-	return n, err
1046
-}
1047
-
1048
-// NewStdWriter instanciate a new Writer based on the given type `t`.
1049
-// the utils package contains the valid parametres for `t`:
1050
-func NewStdWriter(w io.Writer, t StdType) *StdWriter {
1051
-	if len(t) != StdWriterPrefixLen {
1052
-		return nil
1053
-	}
1054
-	return &StdWriter{
1055
-		Writer: w,
1056
-		prefix: []byte(t),
1057
-	}
1058
-}
1059
-
1060
-// StdCopy is a modified version of io.Copy.
1061
-//
1062
-// StdCopy copies from src to dstout or dsterr until either EOF is reached
1063
-// on src or an error occurs.  It returns the number of bytes
1064
-// copied and the first error encountered while copying, if any.
1065
-//
1066
-// A successful Copy returns err == nil, not err == EOF.
1067
-// Because Copy is defined to read from src until EOF, it does
1068
-// not treat an EOF from Read as an error to be reported.
1069
-//
1070
-// The source needs to be writter via StdWriter, dstout or dsterr is selected
1071
-// based on the prefix added by StdWriter
1072
-func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
1073
-	var (
1074
-		buf = make([]byte, 32*1024)
1075
-		nw  int
1076
-		ew  error
1077
-	)
1078
-
1079
-	for {
1080
-		nr, er := src.Read(buf)
1081
-		if nr > 0 {
1082
-			if bytes.Compare(buf[:StdWriterPrefixLen], Stdout) == 0 {
1083
-				nw, ew = dstout.Write(buf[StdWriterPrefixLen:nr])
1084
-			} else if bytes.Compare(buf[:StdWriterPrefixLen], Stderr) == 0 {
1085
-				nw, ew = dsterr.Write(buf[StdWriterPrefixLen:nr])
1086
-			} else if bytes.Compare(buf[:StdWriterPrefixLen], Stdin) == 0 {
1087
-				nw, ew = dstout.Write(buf[StdWriterPrefixLen:nr])
1088
-			}
1089
-			if nw > 0 {
1090
-				written += int64(nw)
1091
-			}
1092
-			if ew != nil {
1093
-				err = ew
1094
-				break
1095
-			}
1096
-			if nr-StdWriterPrefixLen != nw {
1097
-				err = io.ErrShortWrite
1098
-				break
1099
-			}
1100
-		}
1101
-		if er == io.EOF {
1102
-			break
1103
-		}
1104
-		if er != nil {
1105
-			err = er
1106
-			break
1107
-		}
1108
-	}
1109
-	return written, err
1110
-}