Browse code

Fix client-side race in `docker stats`

Subscribe to events and monitor for new containers before the initial
listing of currently running containers.

This fixes a race where a new container could be created after the initial
list call but before the client was subscribed to events, causing that
container to never appear in the output of `docker stats`.

Signed-off-by: Arnaud Porterie <arnaud.porterie@docker.com>

Arnaud Porterie authored on 2016/02/28 11:30:31
Showing 1 changed file
... ...
@@ -4,7 +4,6 @@ import (
4 4
 	"encoding/json"
5 5
 	"fmt"
6 6
 	"io"
7
-	"sort"
8 7
 	"strings"
9 8
 	"sync"
10 9
 	"text/tabwriter"
... ...
@@ -38,6 +37,15 @@ type stats struct {
38 38
 	cs []*containerStats
39 39
 }
40 40
 
41
+func (s *stats) isKnownContainer(cid string) bool {
42
+	for _, c := range s.cs {
43
+		if c.Name == cid {
44
+			return true
45
+		}
46
+	}
47
+	return false
48
+}
49
+
41 50
 func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
42 51
 	responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
43 52
 	if err != nil {
... ...
@@ -150,123 +158,151 @@ func (cli *DockerCli) CmdStats(args ...string) error {
150 150
 	names := cmd.Args()
151 151
 	showAll := len(names) == 0
152 152
 
153
-	if showAll {
153
+	// The containerChan is the central synchronization piece for this function,
154
+	// and all messages to either add or remove an element to the list of
155
+	// monitored containers go through this.
156
+	//
157
+	//   - When watching all containers, a goroutine subscribes to the events
158
+	//     API endpoint and messages this channel accordingly.
159
+	//   - When watching a particular subset of containers, we feed the
160
+	//     requested list of containers to this channel.
161
+	//   - For both codepaths, a goroutine is responsible for watching this
162
+	//     channel and subscribing to the stats API for containers.
163
+	type containerEvent struct {
164
+		id    string
165
+		event string
166
+		err   error
167
+	}
168
+	containerChan := make(chan containerEvent)
169
+
170
+	// monitorContainerEvents watches for container creation and removal (only
171
+	// used when calling `docker stats` without arguments).
172
+	monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
173
+		f := filters.NewArgs()
174
+		f.Add("type", "container")
175
+		options := types.EventsOptions{
176
+			Filters: f,
177
+		}
178
+		resBody, err := cli.client.Events(context.Background(), options)
179
+		// Whether we successfully subscribed to events or not, we can now
180
+		// unblock the main goroutine.
181
+		close(started)
182
+		if err != nil {
183
+			c <- containerEvent{err: err}
184
+			return
185
+		}
186
+		defer resBody.Close()
187
+		decodeEvents(resBody, func(event events.Message, err error) error {
188
+			if err != nil {
189
+				c <- containerEvent{"", "", err}
190
+			} else {
191
+				c <- containerEvent{event.ID[:12], event.Action, err}
192
+			}
193
+			return nil
194
+		})
195
+	}
196
+
197
+	// getContainerList simulates creation event for all previously existing
198
+	// containers (only used when calling `docker stats` without arguments).
199
+	getContainerList := func(c chan<- containerEvent) {
154 200
 		options := types.ContainerListOptions{
155 201
 			All: *all,
156 202
 		}
157 203
 		cs, err := cli.client.ContainerList(options)
158 204
 		if err != nil {
159
-			return err
205
+			containerChan <- containerEvent{"", "", err}
160 206
 		}
161 207
 		for _, c := range cs {
162
-			names = append(names, c.ID[:12])
208
+			containerChan <- containerEvent{c.ID[:12], "create", nil}
163 209
 		}
164 210
 	}
165
-	if len(names) == 0 && !showAll {
166
-		return fmt.Errorf("No containers found")
167
-	}
168
-	sort.Strings(names)
169 211
 
170
-	var (
171
-		cStats = stats{}
172
-		w      = tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
173
-	)
174
-	printHeader := func() {
175
-		if !*noStream {
176
-			fmt.Fprint(cli.out, "\033[2J")
177
-			fmt.Fprint(cli.out, "\033[H")
178
-		}
179
-		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
180
-	}
181
-	for _, n := range names {
182
-		s := &containerStats{Name: n}
183
-		// no need to lock here since only the main goroutine is running here
184
-		cStats.cs = append(cStats.cs, s)
185
-		go s.Collect(cli, !*noStream)
186
-	}
212
+	// Monitor the containerChan and start collection for each container.
213
+	cStats := stats{}
187 214
 	closeChan := make(chan error)
188
-	if showAll {
189
-		type watch struct {
190
-			cid   string
191
-			event string
192
-			err   error
193
-		}
194
-		getNewContainers := func(c chan<- watch) {
195
-			f := filters.NewArgs()
196
-			f.Add("type", "container")
197
-			options := types.EventsOptions{
198
-				Filters: f,
199
-			}
200
-			resBody, err := cli.client.Events(context.Background(), options)
201
-			if err != nil {
202
-				c <- watch{err: err}
215
+	go func(stopChan chan<- error, c <-chan containerEvent) {
216
+		for {
217
+			event := <-c
218
+			if event.err != nil {
219
+				stopChan <- event.err
203 220
 				return
204 221
 			}
205
-			defer resBody.Close()
206
-
207
-			decodeEvents(resBody, func(event events.Message, err error) error {
208
-				if err != nil {
209
-					c <- watch{err: err}
210
-					return nil
211
-				}
212
-
213
-				c <- watch{event.ID[:12], event.Action, nil}
214
-				return nil
215
-			})
216
-		}
217
-		go func(stopChan chan<- error) {
218
-			cChan := make(chan watch)
219
-			go getNewContainers(cChan)
220
-			for {
221
-				c := <-cChan
222
-				if c.err != nil {
223
-					stopChan <- c.err
224
-					return
225
-				}
226
-				switch c.event {
227
-				case "create":
228
-					s := &containerStats{Name: c.cid}
229
-					cStats.mu.Lock()
222
+			switch event.event {
223
+			case "create":
224
+				cStats.mu.Lock()
225
+				if !cStats.isKnownContainer(event.id) {
226
+					s := &containerStats{Name: event.id}
230 227
 					cStats.cs = append(cStats.cs, s)
231
-					cStats.mu.Unlock()
232 228
 					go s.Collect(cli, !*noStream)
233
-				case "stop":
234
-				case "die":
235
-					if !*all {
236
-						var remove int
237
-						// cStats cannot be O(1) with a map cause ranging over it would cause
238
-						// containers in stats to move up and down in the list...:(
239
-						cStats.mu.Lock()
240
-						for i, s := range cStats.cs {
241
-							if s.Name == c.cid {
242
-								remove = i
243
-								break
244
-							}
229
+				}
230
+				cStats.mu.Unlock()
231
+			case "stop":
232
+			case "die":
233
+				if !*all {
234
+					var remove int
235
+					// cStats cannot be O(1) with a map cause ranging over it would cause
236
+					// containers in stats to move up and down in the list...:(
237
+					cStats.mu.Lock()
238
+					for i, s := range cStats.cs {
239
+						if s.Name == event.id {
240
+							remove = i
241
+							break
245 242
 						}
246
-						cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
247
-						cStats.mu.Unlock()
248 243
 					}
244
+					cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
245
+					cStats.mu.Unlock()
249 246
 				}
250 247
 			}
251
-		}(closeChan)
248
+		}
249
+	}(closeChan, containerChan)
250
+
251
+	if showAll {
252
+		// If no names were specified, start a long running goroutine which
253
+		// monitors container events. We make sure we're subscribed before
254
+		// retrieving the list of running containers to avoid a race where we
255
+		// would "miss" a creation.
256
+		started := make(chan struct{})
257
+		go monitorContainerEvents(started, containerChan)
258
+		<-started
259
+
260
+		// Start a short-lived goroutine to retrieve the initial list of
261
+		// containers.
262
+		go getContainerList(containerChan)
252 263
 	} else {
264
+		// Artificially send creation events for the containers we were asked to
265
+		// monitor (same code path as we use when monitoring all containers).
266
+		for _, name := range names {
267
+			containerChan <- containerEvent{name, "create", nil}
268
+		}
269
+
270
+		// We don't expect any asynchronous errors: closeChan can be closed.
253 271
 		close(closeChan)
254
-	}
255
-	// do a quick pause so that any failed connections for containers that do not exist are able to be
256
-	// evicted before we display the initial or default values.
257
-	time.Sleep(1500 * time.Millisecond)
258
-	var errs []string
259
-	cStats.mu.Lock()
260
-	for _, c := range cStats.cs {
261
-		c.mu.Lock()
262
-		if c.err != nil {
263
-			errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
272
+
273
+		// Do a quick pause to detect any error with the provided list of
274
+		// container names.
275
+		time.Sleep(1500 * time.Millisecond)
276
+		var errs []string
277
+		cStats.mu.Lock()
278
+		for _, c := range cStats.cs {
279
+			c.mu.Lock()
280
+			if c.err != nil {
281
+				errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
282
+			}
283
+			c.mu.Unlock()
284
+		}
285
+		cStats.mu.Unlock()
286
+		if len(errs) > 0 {
287
+			return fmt.Errorf("%s", strings.Join(errs, ", "))
264 288
 		}
265
-		c.mu.Unlock()
266 289
 	}
267
-	cStats.mu.Unlock()
268
-	if len(errs) > 0 {
269
-		return fmt.Errorf("%s", strings.Join(errs, ", "))
290
+
291
+	w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
292
+	printHeader := func() {
293
+		if !*noStream {
294
+			fmt.Fprint(cli.out, "\033[2J")
295
+			fmt.Fprint(cli.out, "\033[H")
296
+		}
297
+		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
270 298
 	}
271 299
 	for range time.Tick(500 * time.Millisecond) {
272 300
 		printHeader()