Browse code

Fixes issue with stats on start event

In situations where the client is invoked as `docker stats` with no
arguments or flags, if a container which was already created but not
yet started is subsequently started, it will not be added to the
stats list as expected.

Also splits some of the stats helpers into a separate file from the
stats CLI, which is already quite long.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2016/03/01 04:24:51
Showing 3 changed files
... ...
@@ -6,10 +6,12 @@ import (
6 6
 	"io"
7 7
 	"sort"
8 8
 	"strings"
9
+	"sync"
9 10
 	"time"
10 11
 
11 12
 	"golang.org/x/net/context"
12 13
 
14
+	"github.com/Sirupsen/logrus"
13 15
 	Cli "github.com/docker/docker/cli"
14 16
 	"github.com/docker/docker/opts"
15 17
 	"github.com/docker/docker/pkg/jsonlog"
... ...
@@ -115,3 +117,31 @@ func printOutput(event eventtypes.Message, output io.Writer) {
115 115
 	}
116 116
 	fmt.Fprint(output, "\n")
117 117
 }
118
+
119
+type eventHandler struct {
120
+	handlers map[string]func(eventtypes.Message)
121
+	mu       sync.Mutex
122
+	closed   bool
123
+}
124
+
125
+func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
126
+	w.mu.Lock()
127
+	w.handlers[action] = h
128
+	w.mu.Unlock()
129
+}
130
+
131
+// Watch ranges over the passed in event chan and processes the events based on the
132
+// handlers created for a given action.
133
+// To stop watching, close the event chan.
134
+func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
135
+	for e := range c {
136
+		w.mu.Lock()
137
+		h, exists := w.handlers[e.Action]
138
+		w.mu.Unlock()
139
+		if !exists {
140
+			continue
141
+		}
142
+		logrus.Debugf("event handler: received event: %v", e)
143
+		go h(e)
144
+	}
145
+}
... ...
@@ -1,11 +1,9 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"encoding/json"
5 4
 	"fmt"
6 5
 	"io"
7 6
 	"strings"
8
-	"sync"
9 7
 	"text/tabwriter"
10 8
 	"time"
11 9
 
... ...
@@ -15,134 +13,8 @@ import (
15 15
 	"github.com/docker/engine-api/types"
16 16
 	"github.com/docker/engine-api/types/events"
17 17
 	"github.com/docker/engine-api/types/filters"
18
-	"github.com/docker/go-units"
19 18
 )
20 19
 
21
-type containerStats struct {
22
-	Name             string
23
-	CPUPercentage    float64
24
-	Memory           float64
25
-	MemoryLimit      float64
26
-	MemoryPercentage float64
27
-	NetworkRx        float64
28
-	NetworkTx        float64
29
-	BlockRead        float64
30
-	BlockWrite       float64
31
-	mu               sync.RWMutex
32
-	err              error
33
-}
34
-
35
-type stats struct {
36
-	mu sync.Mutex
37
-	cs []*containerStats
38
-}
39
-
40
-func (s *stats) isKnownContainer(cid string) bool {
41
-	for _, c := range s.cs {
42
-		if c.Name == cid {
43
-			return true
44
-		}
45
-	}
46
-	return false
47
-}
48
-
49
-func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
50
-	responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
51
-	if err != nil {
52
-		s.mu.Lock()
53
-		s.err = err
54
-		s.mu.Unlock()
55
-		return
56
-	}
57
-	defer responseBody.Close()
58
-
59
-	var (
60
-		previousCPU    uint64
61
-		previousSystem uint64
62
-		dec            = json.NewDecoder(responseBody)
63
-		u              = make(chan error, 1)
64
-	)
65
-	go func() {
66
-		for {
67
-			var v *types.StatsJSON
68
-			if err := dec.Decode(&v); err != nil {
69
-				u <- err
70
-				return
71
-			}
72
-
73
-			var memPercent = 0.0
74
-			var cpuPercent = 0.0
75
-
76
-			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
77
-			// got any data from cgroup
78
-			if v.MemoryStats.Limit != 0 {
79
-				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
80
-			}
81
-
82
-			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
83
-			previousSystem = v.PreCPUStats.SystemUsage
84
-			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
85
-			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
86
-			s.mu.Lock()
87
-			s.CPUPercentage = cpuPercent
88
-			s.Memory = float64(v.MemoryStats.Usage)
89
-			s.MemoryLimit = float64(v.MemoryStats.Limit)
90
-			s.MemoryPercentage = memPercent
91
-			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
92
-			s.BlockRead = float64(blkRead)
93
-			s.BlockWrite = float64(blkWrite)
94
-			s.mu.Unlock()
95
-			u <- nil
96
-			if !streamStats {
97
-				return
98
-			}
99
-		}
100
-	}()
101
-	for {
102
-		select {
103
-		case <-time.After(2 * time.Second):
104
-			// zero out the values if we have not received an update within
105
-			// the specified duration.
106
-			s.mu.Lock()
107
-			s.CPUPercentage = 0
108
-			s.Memory = 0
109
-			s.MemoryPercentage = 0
110
-			s.MemoryLimit = 0
111
-			s.NetworkRx = 0
112
-			s.NetworkTx = 0
113
-			s.BlockRead = 0
114
-			s.BlockWrite = 0
115
-			s.mu.Unlock()
116
-		case err := <-u:
117
-			if err != nil {
118
-				s.mu.Lock()
119
-				s.err = err
120
-				s.mu.Unlock()
121
-				return
122
-			}
123
-		}
124
-		if !streamStats {
125
-			return
126
-		}
127
-	}
128
-}
129
-
130
-func (s *containerStats) Display(w io.Writer) error {
131
-	s.mu.RLock()
132
-	defer s.mu.RUnlock()
133
-	if s.err != nil {
134
-		return s.err
135
-	}
136
-	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
137
-		s.Name,
138
-		s.CPUPercentage,
139
-		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
140
-		s.MemoryPercentage,
141
-		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
142
-		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
143
-	return nil
144
-}
145
-
146 20
 // CmdStats displays a live stream of resource usage statistics for one or more containers.
147 21
 //
148 22
 // This shows real-time information on CPU usage, memory usage, and network I/O.
... ...
@@ -157,27 +29,11 @@ func (cli *DockerCli) CmdStats(args ...string) error {
157 157
 
158 158
 	names := cmd.Args()
159 159
 	showAll := len(names) == 0
160
-
161
-	// The containerChan is the central synchronization piece for this function,
162
-	// and all messages to either add or remove an element to the list of
163
-	// monitored containers go through this.
164
-	//
165
-	//   - When watching all containers, a goroutine subscribes to the events
166
-	//     API endpoint and messages this channel accordingly.
167
-	//   - When watching a particular subset of containers, we feed the
168
-	//     requested list of containers to this channel.
169
-	//   - For both codepaths, a goroutine is responsible for watching this
170
-	//     channel and subscribing to the stats API for containers.
171
-	type containerEvent struct {
172
-		id    string
173
-		event string
174
-		err   error
175
-	}
176
-	containerChan := make(chan containerEvent)
160
+	closeChan := make(chan error)
177 161
 
178 162
 	// monitorContainerEvents watches for container creation and removal (only
179 163
 	// used when calling `docker stats` without arguments).
180
-	monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
164
+	monitorContainerEvents := func(started chan<- struct{}, c chan events.Message) {
181 165
 		f := filters.NewArgs()
182 166
 		f.Add("type", "container")
183 167
 		options := types.EventsOptions{
... ...
@@ -188,91 +44,82 @@ func (cli *DockerCli) CmdStats(args ...string) error {
188 188
 		// unblock the main goroutine.
189 189
 		close(started)
190 190
 		if err != nil {
191
-			c <- containerEvent{err: err}
191
+			closeChan <- err
192 192
 			return
193 193
 		}
194 194
 		defer resBody.Close()
195
+
195 196
 		decodeEvents(resBody, func(event events.Message, err error) error {
196 197
 			if err != nil {
197
-				c <- containerEvent{"", "", err}
198
-			} else {
199
-				c <- containerEvent{event.ID[:12], event.Action, err}
198
+				closeChan <- err
199
+				return nil
200 200
 			}
201
+			c <- event
201 202
 			return nil
202 203
 		})
203 204
 	}
204 205
 
206
+	cStats := stats{}
205 207
 	// getContainerList simulates creation event for all previously existing
206 208
 	// containers (only used when calling `docker stats` without arguments).
207
-	getContainerList := func(c chan<- containerEvent) {
209
+	getContainerList := func() {
208 210
 		options := types.ContainerListOptions{
209 211
 			All: *all,
210 212
 		}
211 213
 		cs, err := cli.client.ContainerList(options)
212 214
 		if err != nil {
213
-			containerChan <- containerEvent{"", "", err}
215
+			closeChan <- err
214 216
 		}
215
-		for _, c := range cs {
216
-			containerChan <- containerEvent{c.ID[:12], "create", nil}
217
+		for _, container := range cs {
218
+			s := &containerStats{Name: container.ID[:12]}
219
+			cStats.add(s)
220
+			go s.Collect(cli.client, !*noStream)
217 221
 		}
218 222
 	}
219 223
 
220
-	// Monitor the containerChan and start collection for each container.
221
-	cStats := stats{}
222
-	closeChan := make(chan error)
223
-	go func(stopChan chan<- error, c <-chan containerEvent) {
224
-		for {
225
-			event := <-c
226
-			if event.err != nil {
227
-				stopChan <- event.err
228
-				return
229
-			}
230
-			switch event.event {
231
-			case "create":
232
-				cStats.mu.Lock()
233
-				if !cStats.isKnownContainer(event.id) {
234
-					s := &containerStats{Name: event.id}
235
-					cStats.cs = append(cStats.cs, s)
236
-					go s.Collect(cli, !*noStream)
237
-				}
238
-				cStats.mu.Unlock()
239
-			case "stop":
240
-			case "die":
241
-				if !*all {
242
-					var remove int
243
-					// cStats cannot be O(1) with a map cause ranging over it would cause
244
-					// containers in stats to move up and down in the list...:(
245
-					cStats.mu.Lock()
246
-					for i, s := range cStats.cs {
247
-						if s.Name == event.id {
248
-							remove = i
249
-							break
250
-						}
251
-					}
252
-					cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
253
-					cStats.mu.Unlock()
254
-				}
255
-			}
256
-		}
257
-	}(closeChan, containerChan)
258
-
259 224
 	if showAll {
260 225
 		// If no names were specified, start a long running goroutine which
261 226
 		// monitors container events. We make sure we're subscribed before
262 227
 		// retrieving the list of running containers to avoid a race where we
263 228
 		// would "miss" a creation.
264 229
 		started := make(chan struct{})
265
-		go monitorContainerEvents(started, containerChan)
230
+		eh := eventHandler{handlers: make(map[string]func(events.Message))}
231
+		eh.Handle("create", func(e events.Message) {
232
+			if *all {
233
+				s := &containerStats{Name: e.ID[:12]}
234
+				cStats.add(s)
235
+				go s.Collect(cli.client, !*noStream)
236
+			}
237
+		})
238
+
239
+		eh.Handle("start", func(e events.Message) {
240
+			s := &containerStats{Name: e.ID[:12]}
241
+			cStats.add(s)
242
+			go s.Collect(cli.client, !*noStream)
243
+		})
244
+
245
+		eh.Handle("die", func(e events.Message) {
246
+			if !*all {
247
+				cStats.remove(e.ID[:12])
248
+			}
249
+		})
250
+
251
+		eventChan := make(chan events.Message)
252
+		go eh.Watch(eventChan)
253
+		go monitorContainerEvents(started, eventChan)
254
+		defer close(eventChan)
266 255
 		<-started
267 256
 
268 257
 		// Start a short-lived goroutine to retrieve the initial list of
269 258
 		// containers.
270
-		go getContainerList(containerChan)
259
+		go getContainerList()
271 260
 	} else {
272 261
 		// Artificially send creation events for the containers we were asked to
273 262
 		// monitor (same code path than we use when monitoring all containers).
274 263
 		for _, name := range names {
275
-			containerChan <- containerEvent{name, "create", nil}
264
+			s := &containerStats{Name: name}
265
+			cStats.add(s)
266
+			go s.Collect(cli.client, !*noStream)
276 267
 		}
277 268
 
278 269
 		// We don't expect any asynchronous errors: closeChan can be closed.
... ...
@@ -304,6 +151,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
304 304
 		}
305 305
 		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
306 306
 	}
307
+
307 308
 	for range time.Tick(500 * time.Millisecond) {
308 309
 		printHeader()
309 310
 		toRemove := []int{}
... ...
@@ -343,40 +191,3 @@ func (cli *DockerCli) CmdStats(args ...string) error {
343 343
 	}
344 344
 	return nil
345 345
 }
346
-
347
-func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
348
-	var (
349
-		cpuPercent = 0.0
350
-		// calculate the change for the cpu usage of the container in between readings
351
-		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
352
-		// calculate the change for the entire system between readings
353
-		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
354
-	)
355
-
356
-	if systemDelta > 0.0 && cpuDelta > 0.0 {
357
-		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
358
-	}
359
-	return cpuPercent
360
-}
361
-
362
-func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
363
-	for _, bioEntry := range blkio.IoServiceBytesRecursive {
364
-		switch strings.ToLower(bioEntry.Op) {
365
-		case "read":
366
-			blkRead = blkRead + bioEntry.Value
367
-		case "write":
368
-			blkWrite = blkWrite + bioEntry.Value
369
-		}
370
-	}
371
-	return
372
-}
373
-
374
-func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
375
-	var rx, tx float64
376
-
377
-	for _, v := range network {
378
-		rx += float64(v.RxBytes)
379
-		tx += float64(v.TxBytes)
380
-	}
381
-	return rx, tx
382
-}
383 346
new file mode 100644
... ...
@@ -0,0 +1,193 @@
0
+package client
1
+
2
+import (
3
+	"encoding/json"
4
+	"fmt"
5
+	"io"
6
+	"strings"
7
+	"sync"
8
+	"time"
9
+
10
+	"github.com/docker/engine-api/client"
11
+	"github.com/docker/engine-api/types"
12
+	"github.com/docker/go-units"
13
+	"golang.org/x/net/context"
14
+)
15
+
16
+type containerStats struct {
17
+	Name             string
18
+	CPUPercentage    float64
19
+	Memory           float64
20
+	MemoryLimit      float64
21
+	MemoryPercentage float64
22
+	NetworkRx        float64
23
+	NetworkTx        float64
24
+	BlockRead        float64
25
+	BlockWrite       float64
26
+	mu               sync.RWMutex
27
+	err              error
28
+}
29
+
30
+type stats struct {
31
+	mu sync.Mutex
32
+	cs []*containerStats
33
+}
34
+
35
+func (s *stats) add(cs *containerStats) {
36
+	s.mu.Lock()
37
+	if _, exists := s.isKnownContainer(cs.Name); !exists {
38
+		s.cs = append(s.cs, cs)
39
+	}
40
+	s.mu.Unlock()
41
+}
42
+
43
+func (s *stats) remove(id string) {
44
+	s.mu.Lock()
45
+	if i, exists := s.isKnownContainer(id); exists {
46
+		s.cs = append(s.cs[:i], s.cs[i+1:]...)
47
+	}
48
+	s.mu.Unlock()
49
+}
50
+
51
+func (s *stats) isKnownContainer(cid string) (int, bool) {
52
+	for i, c := range s.cs {
53
+		if c.Name == cid {
54
+			return i, true
55
+		}
56
+	}
57
+	return -1, false
58
+}
59
+
60
+func (s *containerStats) Collect(cli client.APIClient, streamStats bool) {
61
+	responseBody, err := cli.ContainerStats(context.Background(), s.Name, streamStats)
62
+	if err != nil {
63
+		s.mu.Lock()
64
+		s.err = err
65
+		s.mu.Unlock()
66
+		return
67
+	}
68
+	defer responseBody.Close()
69
+
70
+	var (
71
+		previousCPU    uint64
72
+		previousSystem uint64
73
+		dec            = json.NewDecoder(responseBody)
74
+		u              = make(chan error, 1)
75
+	)
76
+	go func() {
77
+		for {
78
+			var v *types.StatsJSON
79
+			if err := dec.Decode(&v); err != nil {
80
+				u <- err
81
+				return
82
+			}
83
+
84
+			var memPercent = 0.0
85
+			var cpuPercent = 0.0
86
+
87
+			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
88
+			// got any data from cgroup
89
+			if v.MemoryStats.Limit != 0 {
90
+				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
91
+			}
92
+
93
+			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
94
+			previousSystem = v.PreCPUStats.SystemUsage
95
+			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
96
+			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
97
+			s.mu.Lock()
98
+			s.CPUPercentage = cpuPercent
99
+			s.Memory = float64(v.MemoryStats.Usage)
100
+			s.MemoryLimit = float64(v.MemoryStats.Limit)
101
+			s.MemoryPercentage = memPercent
102
+			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
103
+			s.BlockRead = float64(blkRead)
104
+			s.BlockWrite = float64(blkWrite)
105
+			s.mu.Unlock()
106
+			u <- nil
107
+			if !streamStats {
108
+				return
109
+			}
110
+		}
111
+	}()
112
+	for {
113
+		select {
114
+		case <-time.After(2 * time.Second):
115
+			// zero out the values if we have not received an update within
116
+			// the specified duration.
117
+			s.mu.Lock()
118
+			s.CPUPercentage = 0
119
+			s.Memory = 0
120
+			s.MemoryPercentage = 0
121
+			s.MemoryLimit = 0
122
+			s.NetworkRx = 0
123
+			s.NetworkTx = 0
124
+			s.BlockRead = 0
125
+			s.BlockWrite = 0
126
+			s.mu.Unlock()
127
+		case err := <-u:
128
+			if err != nil {
129
+				s.mu.Lock()
130
+				s.err = err
131
+				s.mu.Unlock()
132
+				return
133
+			}
134
+		}
135
+		if !streamStats {
136
+			return
137
+		}
138
+	}
139
+}
140
+
141
+func (s *containerStats) Display(w io.Writer) error {
142
+	s.mu.RLock()
143
+	defer s.mu.RUnlock()
144
+	if s.err != nil {
145
+		return s.err
146
+	}
147
+	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
148
+		s.Name,
149
+		s.CPUPercentage,
150
+		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
151
+		s.MemoryPercentage,
152
+		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
153
+		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
154
+	return nil
155
+}
156
+
157
+func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
158
+	var (
159
+		cpuPercent = 0.0
160
+		// calculate the change for the cpu usage of the container in between readings
161
+		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
162
+		// calculate the change for the entire system between readings
163
+		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
164
+	)
165
+
166
+	if systemDelta > 0.0 && cpuDelta > 0.0 {
167
+		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
168
+	}
169
+	return cpuPercent
170
+}
171
+
172
+func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
173
+	for _, bioEntry := range blkio.IoServiceBytesRecursive {
174
+		switch strings.ToLower(bioEntry.Op) {
175
+		case "read":
176
+			blkRead = blkRead + bioEntry.Value
177
+		case "write":
178
+			blkWrite = blkWrite + bioEntry.Value
179
+		}
180
+	}
181
+	return
182
+}
183
+
184
+func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
185
+	var rx, tx float64
186
+
187
+	for _, v := range network {
188
+		rx += float64(v.RxBytes)
189
+		tx += float64(v.TxBytes)
190
+	}
191
+	return rx, tx
192
+}