Browse code

Merge pull request #13966 from mountkin/fix-stats-goroutine-leak

fix the goroutine leak in the stats API if the container is not running

David Calavera authored on 2015/06/24 02:06:35
Showing 4 changed files
... ...
@@ -582,7 +582,18 @@ func (s *Server) getContainersStats(version version.Version, w http.ResponseWrit
582 582
 		out = ioutils.NewWriteFlusher(w)
583 583
 	}
584 584
 
585
-	return s.daemon.ContainerStats(vars["name"], stream, out)
585
+	var closeNotifier <-chan bool
586
+	if notifier, ok := w.(http.CloseNotifier); ok {
587
+		closeNotifier = notifier.CloseNotify()
588
+	}
589
+
590
+	config := &daemon.ContainerStatsConfig{
591
+		Stream:    stream,
592
+		OutStream: out,
593
+		Stop:      closeNotifier,
594
+	}
595
+
596
+	return s.daemon.ContainerStats(vars["name"], config)
586 597
 }
587 598
 
588 599
 func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
... ...
@@ -8,12 +8,22 @@ import (
8 8
 	"github.com/docker/docker/daemon/execdriver"
9 9
 )
10 10
 
11
-func (daemon *Daemon) ContainerStats(name string, stream bool, out io.Writer) error {
11
+type ContainerStatsConfig struct {
12
+	Stream    bool
13
+	OutStream io.Writer
14
+	Stop      <-chan bool
15
+}
16
+
17
+func (daemon *Daemon) ContainerStats(name string, config *ContainerStatsConfig) error {
12 18
 	updates, err := daemon.SubscribeToContainerStats(name)
13 19
 	if err != nil {
14 20
 		return err
15 21
 	}
16 22
 
23
+	if config.Stream {
24
+		config.OutStream.Write(nil)
25
+	}
26
+
17 27
 	var preCpuStats types.CpuStats
18 28
 	getStat := func(v interface{}) *types.Stats {
19 29
 		update := v.(*execdriver.ResourceStats)
... ...
@@ -26,26 +36,34 @@ func (daemon *Daemon) ContainerStats(name string, stream bool, out io.Writer) er
26 26
 		return ss
27 27
 	}
28 28
 
29
-	enc := json.NewEncoder(out)
29
+	enc := json.NewEncoder(config.OutStream)
30 30
 
31
-	if !stream {
32
-		// prime the cpu stats so they aren't 0 in the final output
33
-		s := getStat(<-updates)
31
+	defer daemon.UnsubscribeToContainerStats(name, updates)
34 32
 
35
-		// now pull stats again with the cpu stats primed
36
-		s = getStat(<-updates)
37
-		err := enc.Encode(s)
38
-		daemon.UnsubscribeToContainerStats(name, updates)
39
-		return err
40
-	}
33
+	noStreamFirstFrame := true
34
+	for {
35
+		select {
36
+		case v, ok := <-updates:
37
+			if !ok {
38
+				return nil
39
+			}
40
+
41
+			s := getStat(v)
42
+			if !config.Stream && noStreamFirstFrame {
43
+				// prime the cpu stats so they aren't 0 in the final output
44
+				noStreamFirstFrame = false
45
+				continue
46
+			}
47
+
48
+			if err := enc.Encode(s); err != nil {
49
+				return err
50
+			}
41 51
 
42
-	for v := range updates {
43
-		s := getStat(v)
44
-		if err := enc.Encode(s); err != nil {
45
-			// TODO: handle the specific broken pipe
46
-			daemon.UnsubscribeToContainerStats(name, updates)
47
-			return err
52
+			if !config.Stream {
53
+				return nil
54
+			}
55
+		case <-config.Stop:
56
+			return nil
48 57
 		}
49 58
 	}
50
-	return nil
51 59
 }
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"encoding/json"
5 5
 	"fmt"
6 6
 	"strings"
7
+	"time"
7 8
 
8 9
 	"github.com/docker/docker/api/types"
9 10
 	"github.com/go-check/check"
... ...
@@ -33,3 +34,38 @@ func (s *DockerSuite) TestCliStatsNoStreamGetCpu(c *check.C) {
33 33
 		c.Fatalf("docker stats with no-stream get cpu usage failed: was %v", cpuPercent)
34 34
 	}
35 35
 }
36
+
37
+func (s *DockerSuite) TestStoppedContainerStatsGoroutines(c *check.C) {
38
+	out, _ := dockerCmd(c, "run", "-d", "busybox", "/bin/sh", "-c", "echo 1")
39
+	id := strings.TrimSpace(out)
40
+
41
+	getGoRoutines := func() int {
42
+		_, body, err := sockRequestRaw("GET", fmt.Sprintf("/info"), nil, "")
43
+		c.Assert(err, check.IsNil)
44
+		info := types.Info{}
45
+		err = json.NewDecoder(body).Decode(&info)
46
+		c.Assert(err, check.IsNil)
47
+		body.Close()
48
+		return info.NGoroutines
49
+	}
50
+
51
+	// When the HTTP connection is closed, the number of goroutines should not increase.
52
+	routines := getGoRoutines()
53
+	_, body, err := sockRequestRaw("GET", fmt.Sprintf("/containers/%s/stats", id), nil, "")
54
+	c.Assert(err, check.IsNil)
55
+	body.Close()
56
+
57
+	t := time.After(30 * time.Second)
58
+	for {
59
+		select {
60
+		case <-t:
61
+			c.Assert(getGoRoutines() <= routines, check.Equals, true)
62
+			return
63
+		default:
64
+			if n := getGoRoutines(); n <= routines {
65
+				return
66
+			}
67
+			time.Sleep(200 * time.Millisecond)
68
+		}
69
+	}
70
+}
... ...
@@ -366,8 +366,8 @@ func sockRequestRaw(method, endpoint string, data io.Reader, ct string) (*http.R
366 366
 		return nil, nil, fmt.Errorf("could not perform request: %v", err)
367 367
 	}
368 368
 	body := ioutils.NewReadCloserWrapper(resp.Body, func() error {
369
-		defer client.Close()
370
-		return resp.Body.Close()
369
+		defer resp.Body.Close()
370
+		return client.Close()
371 371
 	})
372 372
 
373 373
 	return resp, body, nil