By adding a (*WriteFlusher).Close, we limit the Write calls to possibly
deallocated http response buffers to the lifetime of an http request.
Typically, this is seen as a very confusing panic, the cause is usually a
situation where an http.ResponseWriter is held after request completion. We
avoid the panic by disallowing further writes to the response writer after the
request is completed.
Signed-off-by: Stephen J Day <stephen.day@docker.com>
| ... | ... |
@@ -63,7 +63,9 @@ func (s *router) getContainersStats(ctx context.Context, w http.ResponseWriter, |
| 63 | 63 |
w.Header().Set("Content-Type", "application/json")
|
| 64 | 64 |
out = w |
| 65 | 65 |
} else {
|
| 66 |
- out = ioutils.NewWriteFlusher(w) |
|
| 66 |
+ wf := ioutils.NewWriteFlusher(w) |
|
| 67 |
+ out = wf |
|
| 68 |
+ defer wf.Close() |
|
| 67 | 69 |
} |
| 68 | 70 |
|
| 69 | 71 |
var closeNotifier <-chan bool |
| ... | ... |
@@ -116,11 +118,16 @@ func (s *router) getContainersLogs(ctx context.Context, w http.ResponseWriter, r |
| 116 | 116 |
return derr.ErrorCodeNoSuchContainer.WithArgs(containerName) |
| 117 | 117 |
} |
| 118 | 118 |
|
| 119 |
- outStream := ioutils.NewWriteFlusher(w) |
|
| 120 | 119 |
// write an empty chunk of data (this is to ensure that the |
| 121 | 120 |
// HTTP Response is sent immediately, even if the container has |
| 122 | 121 |
// not yet produced any data) |
| 123 |
- outStream.Write(nil) |
|
| 122 |
+ w.WriteHeader(http.StatusOK) |
|
| 123 |
+ if flusher, ok := w.(http.Flusher); ok {
|
|
| 124 |
+ flusher.Flush() |
|
| 125 |
+ } |
|
| 126 |
+ |
|
| 127 |
+ output := ioutils.NewWriteFlusher(w) |
|
| 128 |
+ defer output.Close() |
|
| 124 | 129 |
|
| 125 | 130 |
logsConfig := &daemon.ContainerLogsConfig{
|
| 126 | 131 |
Follow: httputils.BoolValue(r, "follow"), |
| ... | ... |
@@ -129,7 +136,7 @@ func (s *router) getContainersLogs(ctx context.Context, w http.ResponseWriter, r |
| 129 | 129 |
Tail: r.Form.Get("tail"),
|
| 130 | 130 |
UseStdout: stdout, |
| 131 | 131 |
UseStderr: stderr, |
| 132 |
- OutStream: outStream, |
|
| 132 |
+ OutStream: output, |
|
| 133 | 133 |
Stop: closeNotifier, |
| 134 | 134 |
} |
| 135 | 135 |
|
| ... | ... |
@@ -105,6 +105,7 @@ func (s *router) postImagesCreate(ctx context.Context, w http.ResponseWriter, r |
| 105 | 105 |
err error |
| 106 | 106 |
output = ioutils.NewWriteFlusher(w) |
| 107 | 107 |
) |
| 108 |
+ defer output.Close() |
|
| 108 | 109 |
|
| 109 | 110 |
w.Header().Set("Content-Type", "application/json")
|
| 110 | 111 |
|
| ... | ... |
@@ -184,6 +185,7 @@ func (s *router) postImagesPush(ctx context.Context, w http.ResponseWriter, r *h |
| 184 | 184 |
|
| 185 | 185 |
name := vars["name"] |
| 186 | 186 |
output := ioutils.NewWriteFlusher(w) |
| 187 |
+ defer output.Close() |
|
| 187 | 188 |
imagePushConfig := &graph.ImagePushConfig{
|
| 188 | 189 |
MetaHeaders: metaHeaders, |
| 189 | 190 |
AuthConfig: authConfig, |
| ... | ... |
@@ -211,6 +213,7 @@ func (s *router) getImagesGet(ctx context.Context, w http.ResponseWriter, r *htt |
| 211 | 211 |
w.Header().Set("Content-Type", "application/x-tar")
|
| 212 | 212 |
|
| 213 | 213 |
output := ioutils.NewWriteFlusher(w) |
| 214 |
+ defer output.Close() |
|
| 214 | 215 |
var names []string |
| 215 | 216 |
if name, ok := vars["name"]; ok {
|
| 216 | 217 |
names = []string{name}
|
| ... | ... |
@@ -283,6 +286,7 @@ func (s *router) postBuild(ctx context.Context, w http.ResponseWriter, r *http.R |
| 283 | 283 |
|
| 284 | 284 |
version := httputils.VersionFromContext(ctx) |
| 285 | 285 |
output := ioutils.NewWriteFlusher(w) |
| 286 |
+ defer output.Close() |
|
| 286 | 287 |
sf := streamformatter.NewJSONStreamFormatter() |
| 287 | 288 |
errf := func(err error) error {
|
| 288 | 289 |
// Do not write the error in the http output if it's still empty. |
| ... | ... |
@@ -52,16 +52,6 @@ func (s *router) getInfo(ctx context.Context, w http.ResponseWriter, r *http.Req |
| 52 | 52 |
return httputils.WriteJSON(w, http.StatusOK, info) |
| 53 | 53 |
} |
| 54 | 54 |
|
| 55 |
-func buildOutputEncoder(w http.ResponseWriter) *json.Encoder {
|
|
| 56 |
- w.Header().Set("Content-Type", "application/json")
|
|
| 57 |
- outStream := ioutils.NewWriteFlusher(w) |
|
| 58 |
- // Write an empty chunk of data. |
|
| 59 |
- // This is to ensure that the HTTP status code is sent immediately, |
|
| 60 |
- // so that it will not block the receiver. |
|
| 61 |
- outStream.Write(nil) |
|
| 62 |
- return json.NewEncoder(outStream) |
|
| 63 |
-} |
|
| 64 |
- |
|
| 65 | 55 |
func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
| 66 | 56 |
if err := httputils.ParseForm(r); err != nil {
|
| 67 | 57 |
return err |
| ... | ... |
@@ -87,7 +77,19 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R |
| 87 | 87 |
return err |
| 88 | 88 |
} |
| 89 | 89 |
|
| 90 |
- enc := buildOutputEncoder(w) |
|
| 90 |
+ w.Header().Set("Content-Type", "application/json")
|
|
| 91 |
+ |
|
| 92 |
+ // This is to ensure that the HTTP status code is sent immediately, |
|
| 93 |
+ // so that it will not block the receiver. |
|
| 94 |
+ w.WriteHeader(http.StatusOK) |
|
| 95 |
+ if flusher, ok := w.(http.Flusher); ok {
|
|
| 96 |
+ flusher.Flush() |
|
| 97 |
+ } |
|
| 98 |
+ |
|
| 99 |
+ output := ioutils.NewWriteFlusher(w) |
|
| 100 |
+ defer output.Close() |
|
| 101 |
+ |
|
| 102 |
+ enc := json.NewEncoder(output) |
|
| 91 | 103 |
|
| 92 | 104 |
current, l, cancel := s.daemon.SubscribeToEvents() |
| 93 | 105 |
defer cancel() |
| ... | ... |
@@ -1,32 +1,54 @@ |
| 1 | 1 |
package ioutils |
| 2 | 2 |
|
| 3 | 3 |
import ( |
| 4 |
+ "errors" |
|
| 4 | 5 |
"io" |
| 5 | 6 |
"net/http" |
| 6 | 7 |
"sync" |
| 7 | 8 |
) |
| 8 | 9 |
|
| 9 |
-// WriteFlusher wraps the Write and Flush operation. |
|
| 10 |
+// WriteFlusher wraps the Write and Flush operation ensuring that every write |
|
| 11 |
+// is a flush. In addition, the Close method can be called to intercept |
|
| 12 |
+// Read/Write calls if the targets lifecycle has already ended. |
|
| 10 | 13 |
type WriteFlusher struct {
|
| 11 |
- sync.Mutex |
|
| 14 |
+ mu sync.Mutex |
|
| 12 | 15 |
w io.Writer |
| 13 | 16 |
flusher http.Flusher |
| 14 | 17 |
flushed bool |
| 18 |
+ closed error |
|
| 19 |
+ |
|
| 20 |
+ // TODO(stevvooe): Use channel for closed instead, remove mutex. Using a |
|
| 21 |
+ // channel will allow one to properly order the operations. |
|
| 15 | 22 |
} |
| 16 | 23 |
|
| 24 |
+var errWriteFlusherClosed = errors.New("writeflusher: closed")
|
|
| 25 |
+ |
|
| 17 | 26 |
func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
|
| 18 |
- wf.Lock() |
|
| 19 |
- defer wf.Unlock() |
|
| 27 |
+ wf.mu.Lock() |
|
| 28 |
+ defer wf.mu.Unlock() |
|
| 29 |
+ if wf.closed != nil {
|
|
| 30 |
+ return 0, wf.closed |
|
| 31 |
+ } |
|
| 32 |
+ |
|
| 20 | 33 |
n, err = wf.w.Write(b) |
| 21 |
- wf.flushed = true |
|
| 22 |
- wf.flusher.Flush() |
|
| 34 |
+ wf.flush() // every write is a flush. |
|
| 23 | 35 |
return n, err |
| 24 | 36 |
} |
| 25 | 37 |
|
| 26 | 38 |
// Flush the stream immediately. |
| 27 | 39 |
func (wf *WriteFlusher) Flush() {
|
| 28 |
- wf.Lock() |
|
| 29 |
- defer wf.Unlock() |
|
| 40 |
+ wf.mu.Lock() |
|
| 41 |
+ defer wf.mu.Unlock() |
|
| 42 |
+ |
|
| 43 |
+ wf.flush() |
|
| 44 |
+} |
|
| 45 |
+ |
|
| 46 |
+// flush the stream immediately without taking a lock. Used internally. |
|
| 47 |
+func (wf *WriteFlusher) flush() {
|
|
| 48 |
+ if wf.closed != nil {
|
|
| 49 |
+ return |
|
| 50 |
+ } |
|
| 51 |
+ |
|
| 30 | 52 |
wf.flushed = true |
| 31 | 53 |
wf.flusher.Flush() |
| 32 | 54 |
} |
| ... | ... |
@@ -34,11 +56,30 @@ func (wf *WriteFlusher) Flush() {
|
| 34 | 34 |
// Flushed returns the state of flushed. |
| 35 | 35 |
// If it's flushed, return true, or else it return false. |
| 36 | 36 |
func (wf *WriteFlusher) Flushed() bool {
|
| 37 |
- wf.Lock() |
|
| 38 |
- defer wf.Unlock() |
|
| 37 |
+ // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to |
|
| 38 |
+ // be used to detect whether or a response code has been issued or not. |
|
| 39 |
+ // Another hook should be used instead. |
|
| 40 |
+ wf.mu.Lock() |
|
| 41 |
+ defer wf.mu.Unlock() |
|
| 42 |
+ |
|
| 39 | 43 |
return wf.flushed |
| 40 | 44 |
} |
| 41 | 45 |
|
| 46 |
+// Close closes the write flusher, disallowing any further writes to the |
|
| 47 |
+// target. After the flusher is closed, all calls to write or flush will |
|
| 48 |
+// result in an error. |
|
| 49 |
+func (wf *WriteFlusher) Close() error {
|
|
| 50 |
+ wf.mu.Lock() |
|
| 51 |
+ defer wf.mu.Unlock() |
|
| 52 |
+ |
|
| 53 |
+ if wf.closed != nil {
|
|
| 54 |
+ return wf.closed |
|
| 55 |
+ } |
|
| 56 |
+ |
|
| 57 |
+ wf.closed = errWriteFlusherClosed |
|
| 58 |
+ return nil |
|
| 59 |
+} |
|
| 60 |
+ |
|
| 42 | 61 |
// NewWriteFlusher returns a new WriteFlusher. |
| 43 | 62 |
func NewWriteFlusher(w io.Writer) *WriteFlusher {
|
| 44 | 63 |
var flusher http.Flusher |