Browse code

fix docker logs error handling

Previously, `docker logs` (and by extension, `docker service logs`) had
an error in the way they returned errors that occurred after a stream
had already been started. Specifically, the errors were added verbatim
to the HTTP response, which caused StdCopy to fail with an esoteric
error. This change updates StdCopy to accept errors from the source
stream and then return those errors when copying to the destination
stream.

Signed-off-by: Drew Erny <drew.erny@docker.com>

Drew Erny authored on 2017/02/10 07:11:29
Showing 4 changed files
... ...
@@ -18,6 +18,7 @@ import (
18 18
 	"github.com/docker/docker/api/types/versions"
19 19
 	"github.com/docker/docker/pkg/ioutils"
20 20
 	"github.com/docker/docker/pkg/signal"
21
+	"github.com/docker/docker/pkg/stdcopy"
21 22
 	"golang.org/x/net/context"
22 23
 	"golang.org/x/net/websocket"
23 24
 )
... ...
@@ -108,9 +109,10 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
108 108
 		select {
109 109
 		case <-chStarted:
110 110
 			// The client may be expecting all of the data we're sending to
111
-			// be multiplexed, so send it through OutStream, which will
112
-			// have been set up to handle that if needed.
113
-			fmt.Fprintf(logsConfig.OutStream, "Error running logs job: %v\n", err)
111
+			// be multiplexed, so mux it through the Systemerr stream, which
112
+			// will cause the client to throw an error when demuxing
113
+			stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr)
114
+			fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err)
114 115
 		default:
115 116
 			return err
116 117
 		}
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"github.com/docker/docker/api/types/backend"
14 14
 	"github.com/docker/docker/api/types/filters"
15 15
 	types "github.com/docker/docker/api/types/swarm"
16
+	"github.com/docker/docker/pkg/stdcopy"
16 17
 	"golang.org/x/net/context"
17 18
 )
18 19
 
... ...
@@ -252,7 +253,8 @@ func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter
252 252
 			// The client may be expecting all of the data we're sending to
253 253
 			// be multiplexed, so send it through OutStream, which will
254 254
 			// have been set up to handle that if needed.
255
-			fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err)
255
+			stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
256
+			fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
256 257
 		default:
257 258
 			return err
258 259
 		}
... ...
@@ -20,6 +20,9 @@ const (
20 20
 	Stdout
21 21
 	// Stderr represents standard error steam type.
22 22
 	Stderr
23
+	// Systemerr represents errors originating from the system that make it
24
+	// into the the multiplexed stream.
25
+	Systemerr
23 26
 
24 27
 	stdWriterPrefixLen = 8
25 28
 	stdWriterFdIndex   = 0
... ...
@@ -115,8 +118,9 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error)
115 115
 			}
116 116
 		}
117 117
 
118
+		stream := StdType(buf[stdWriterFdIndex])
118 119
 		// Check the first byte to know where to write
119
-		switch StdType(buf[stdWriterFdIndex]) {
120
+		switch stream {
120 121
 		case Stdin:
121 122
 			fallthrough
122 123
 		case Stdout:
... ...
@@ -125,6 +129,11 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error)
125 125
 		case Stderr:
126 126
 			// Write on stderr
127 127
 			out = dsterr
128
+		case Systemerr:
129
+			// If we're on Systemerr, we won't write anywhere.
130
+			// NB: if this code changes later, make sure you don't try to write
131
+			// to outstream if Systemerr is the stream
132
+			out = nil
128 133
 		default:
129 134
 			return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
130 135
 		}
... ...
@@ -155,11 +164,18 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error)
155 155
 			}
156 156
 		}
157 157
 
158
+		// we might have an error from the source mixed up in our multiplexed
159
+		// stream. if we do, return it.
160
+		if stream == Systemerr {
161
+			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
162
+		}
163
+
158 164
 		// Write the retrieved frame (without header)
159 165
 		nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
160 166
 		if ew != nil {
161 167
 			return 0, ew
162 168
 		}
169
+
163 170
 		// If the frame has not been fully written: error
164 171
 		if nw != frameSize {
165 172
 			return 0, io.ErrShortWrite
... ...
@@ -246,6 +246,35 @@ func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) {
246 246
 	}
247 247
 }
248 248
 
249
+// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an
250
+// error, when that error is muxed into the Systemerr stream.
251
+func TestStdCopyReturnsErrorFromSystem(t *testing.T) {
252
+	// write in the basic messages, just so there's some fluff in there
253
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
254
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
255
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
256
+	if err != nil {
257
+		t.Fatal(err)
258
+	}
259
+	// add in an error message on the Systemerr stream
260
+	systemErrBytes := []byte(strings.Repeat("S", startingBufLen))
261
+	systemWriter := NewStdWriter(buffer, Systemerr)
262
+	_, err = systemWriter.Write(systemErrBytes)
263
+	if err != nil {
264
+		t.Fatal(err)
265
+	}
266
+
267
+	// now copy and demux. we should expect an error containing the string we
268
+	// wrote out
269
+	_, err = StdCopy(ioutil.Discard, ioutil.Discard, buffer)
270
+	if err == nil {
271
+		t.Fatal("expected error, got none")
272
+	}
273
+	if !strings.Contains(err.Error(), string(systemErrBytes)) {
274
+		t.Fatal("expected error to contain message")
275
+	}
276
+}
277
+
249 278
 func BenchmarkWrite(b *testing.B) {
250 279
 	w := NewStdWriter(ioutil.Discard, Stdout)
251 280
 	data := []byte("Test line for testing stdwriter performance\n")