Browse code

Merge pull request #38301 from cyphar/waitgroup-limits

daemon: switch to semaphore-gated WaitGroup for startup tasks

Akihiro Suda authored on 2018/12/22 00:07:55
Showing 3 changed files
... ...
@@ -66,6 +66,7 @@ import (
66 66
 	"github.com/docker/libnetwork/cluster"
67 67
 	nwconfig "github.com/docker/libnetwork/config"
68 68
 	"github.com/pkg/errors"
69
+	"golang.org/x/sync/semaphore"
69 70
 )
70 71
 
71 72
 // ContainersNamespace is the name of the namespace used for users containers
... ...
@@ -197,6 +198,7 @@ func (daemon *Daemon) NewResolveOptionsFunc() resolver.ResolveOptionsFunc {
197 197
 }
198 198
 
199 199
 func (daemon *Daemon) restore() error {
200
+	var mapLock sync.Mutex
200 201
 	containers := make(map[string]*container.Container)
201 202
 
202 203
 	logrus.Info("Loading containers: start.")
... ...
@@ -206,68 +208,99 @@ func (daemon *Daemon) restore() error {
206 206
 		return err
207 207
 	}
208 208
 
209
+	// parallelLimit is the maximum number of parallel startup jobs that we
210
+	// allow (this is the limit used for all startup semaphores). The multiplier
211
+	// (128) was chosen after some fairly significant benchmarking -- don't change
212
+	// it unless you've tested it significantly (this value is adjusted if
213
+	// RLIMIT_NOFILE is small to avoid EMFILE).
214
+	parallelLimit := adjustParallelLimit(len(dir), 128*runtime.NumCPU())
215
+
216
+	// Re-used for all parallel startup jobs.
217
+	var group sync.WaitGroup
218
+	sem := semaphore.NewWeighted(int64(parallelLimit))
219
+
209 220
 	for _, v := range dir {
210
-		id := v.Name()
211
-		container, err := daemon.load(id)
212
-		if err != nil {
213
-			logrus.Errorf("Failed to load container %v: %v", id, err)
214
-			continue
215
-		}
216
-		if !system.IsOSSupported(container.OS) {
217
-			logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
218
-			continue
219
-		}
220
-		// Ignore the container if it does not support the current driver being used by the graph
221
-		currentDriverForContainerOS := daemon.graphDrivers[container.OS]
222
-		if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
223
-			rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
221
+		group.Add(1)
222
+		go func(id string) {
223
+			defer group.Done()
224
+			_ = sem.Acquire(context.Background(), 1)
225
+			defer sem.Release(1)
226
+
227
+			container, err := daemon.load(id)
224 228
 			if err != nil {
225
-				logrus.Errorf("Failed to load container mount %v: %v", id, err)
226
-				continue
229
+				logrus.Errorf("Failed to load container %v: %v", id, err)
230
+				return
227 231
 			}
228
-			container.RWLayer = rwlayer
229
-			logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
232
+			if !system.IsOSSupported(container.OS) {
233
+				logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
234
+				return
235
+			}
236
+			// Ignore the container if it does not support the current driver being used by the graph
237
+			currentDriverForContainerOS := daemon.graphDrivers[container.OS]
238
+			if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
239
+				rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
240
+				if err != nil {
241
+					logrus.Errorf("Failed to load container mount %v: %v", id, err)
242
+					return
243
+				}
244
+				container.RWLayer = rwlayer
245
+				logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
230 246
 
231
-			containers[container.ID] = container
232
-		} else {
233
-			logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
234
-		}
247
+				mapLock.Lock()
248
+				containers[container.ID] = container
249
+				mapLock.Unlock()
250
+			} else {
251
+				logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
252
+			}
253
+		}(v.Name())
235 254
 	}
255
+	group.Wait()
236 256
 
237 257
 	removeContainers := make(map[string]*container.Container)
238 258
 	restartContainers := make(map[*container.Container]chan struct{})
239 259
 	activeSandboxes := make(map[string]interface{})
260
+
240 261
 	for id, c := range containers {
241
-		if err := daemon.registerName(c); err != nil {
242
-			logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
243
-			delete(containers, id)
244
-			continue
245
-		}
246
-		if err := daemon.Register(c); err != nil {
247
-			logrus.Errorf("Failed to register container %s: %s", c.ID, err)
248
-			delete(containers, id)
249
-			continue
250
-		}
262
+		group.Add(1)
263
+		go func(c *container.Container) {
264
+			defer group.Done()
265
+			_ = sem.Acquire(context.Background(), 1)
266
+			defer sem.Release(1)
251 267
 
252
-		// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
253
-		// We should rewrite it to use the daemon defaults.
254
-		// Fixes https://github.com/docker/docker/issues/22536
255
-		if c.HostConfig.LogConfig.Type == "" {
256
-			if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
257
-				logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
258
-				continue
268
+			if err := daemon.registerName(c); err != nil {
269
+				logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
270
+				mapLock.Lock()
271
+				delete(containers, id)
272
+				mapLock.Unlock()
273
+				return
259 274
 			}
260
-		}
275
+			if err := daemon.Register(c); err != nil {
276
+				logrus.Errorf("Failed to register container %s: %s", c.ID, err)
277
+				mapLock.Lock()
278
+				delete(containers, id)
279
+				mapLock.Unlock()
280
+				return
281
+			}
282
+
283
+			// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
284
+			// We should rewrite it to use the daemon defaults.
285
+			// Fixes https://github.com/docker/docker/issues/22536
286
+			if c.HostConfig.LogConfig.Type == "" {
287
+				if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
288
+					logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
289
+				}
290
+			}
291
+		}(c)
261 292
 	}
293
+	group.Wait()
262 294
 
263
-	var (
264
-		wg      sync.WaitGroup
265
-		mapLock sync.Mutex
266
-	)
267 295
 	for _, c := range containers {
268
-		wg.Add(1)
296
+		group.Add(1)
269 297
 		go func(c *container.Container) {
270
-			defer wg.Done()
298
+			defer group.Done()
299
+			_ = sem.Acquire(context.Background(), 1)
300
+			defer sem.Release(1)
301
+
271 302
 			daemon.backportMountSpec(c)
272 303
 			if err := daemon.checkpointAndSave(c); err != nil {
273 304
 				logrus.WithError(err).WithField("container", c.ID).Error("error saving backported mountspec to disk")
... ...
@@ -414,7 +447,8 @@ func (daemon *Daemon) restore() error {
414 414
 			c.Unlock()
415 415
 		}(c)
416 416
 	}
417
-	wg.Wait()
417
+	group.Wait()
418
+
418 419
 	daemon.netController, err = daemon.initNetworkController(daemon.configStore, activeSandboxes)
419 420
 	if err != nil {
420 421
 		return fmt.Errorf("Error initializing network controller: %v", err)
... ...
@@ -422,18 +456,24 @@ func (daemon *Daemon) restore() error {
422 422
 
423 423
 	// Now that all the containers are registered, register the links
424 424
 	for _, c := range containers {
425
-		if err := daemon.registerLinks(c, c.HostConfig); err != nil {
426
-			logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
427
-		}
425
+		group.Add(1)
426
+		go func(c *container.Container) {
427
+			_ = sem.Acquire(context.Background(), 1)
428
+
429
+			if err := daemon.registerLinks(c, c.HostConfig); err != nil {
430
+				logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
431
+			}
432
+
433
+			sem.Release(1)
434
+			group.Done()
435
+		}(c)
428 436
 	}
437
+	group.Wait()
429 438
 
430
-	group := sync.WaitGroup{}
431 439
 	for c, notifier := range restartContainers {
432 440
 		group.Add(1)
433
-
434 441
 		go func(c *container.Container, chNotify chan struct{}) {
435
-			defer group.Done()
436
-
442
+			_ = sem.Acquire(context.Background(), 1)
437 443
 			logrus.Debugf("Starting container %s", c.ID)
438 444
 
439 445
 			// ignore errors here as this is a best effort to wait for children to be
... ...
@@ -455,22 +495,27 @@ func (daemon *Daemon) restore() error {
455 455
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
456 456
 			}
457 457
 			close(chNotify)
458
-		}(c, notifier)
459 458
 
459
+			sem.Release(1)
460
+			group.Done()
461
+		}(c, notifier)
460 462
 	}
461 463
 	group.Wait()
462 464
 
463
-	removeGroup := sync.WaitGroup{}
464 465
 	for id := range removeContainers {
465
-		removeGroup.Add(1)
466
+		group.Add(1)
466 467
 		go func(cid string) {
468
+			_ = sem.Acquire(context.Background(), 1)
469
+
467 470
 			if err := daemon.ContainerRm(cid, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
468 471
 				logrus.Errorf("Failed to remove container %s: %s", cid, err)
469 472
 			}
470
-			removeGroup.Done()
473
+
474
+			sem.Release(1)
475
+			group.Done()
471 476
 		}(id)
472 477
 	}
473
-	removeGroup.Wait()
478
+	group.Wait()
474 479
 
475 480
 	// any containers that were started above would already have had this done,
476 481
 	// however we need to now prepare the mountpoints for the rest of the containers as well.
... ...
@@ -491,13 +536,16 @@ func (daemon *Daemon) restore() error {
491 491
 
492 492
 		group.Add(1)
493 493
 		go func(c *container.Container) {
494
-			defer group.Done()
494
+			_ = sem.Acquire(context.Background(), 1)
495
+
495 496
 			if err := daemon.prepareMountPoints(c); err != nil {
496 497
 				logrus.Error(err)
497 498
 			}
499
+
500
+			sem.Release(1)
501
+			group.Done()
498 502
 		}(c)
499 503
 	}
500
-
501 504
 	group.Wait()
502 505
 
503 506
 	logrus.Info("Loading containers: done.")
... ...
@@ -508,7 +556,18 @@ func (daemon *Daemon) restore() error {
508 508
 // RestartSwarmContainers restarts any autostart container which has a
509 509
 // swarm endpoint.
510 510
 func (daemon *Daemon) RestartSwarmContainers() {
511
-	group := sync.WaitGroup{}
511
+	ctx := context.Background()
512
+
513
+	// parallelLimit is the maximum number of parallel startup jobs that we
514
+	// allow (this is the limit used for all startup semaphores). The multiplier
515
+	// (128) was chosen after some fairly significant benchmarking -- don't change
516
+	// it unless you've tested it significantly (this value is adjusted if
517
+	// RLIMIT_NOFILE is small to avoid EMFILE).
518
+	parallelLimit := adjustParallelLimit(len(daemon.List()), 128*runtime.NumCPU())
519
+
520
+	var group sync.WaitGroup
521
+	sem := semaphore.NewWeighted(int64(parallelLimit))
522
+
512 523
 	for _, c := range daemon.List() {
513 524
 		if !c.IsRunning() && !c.IsPaused() {
514 525
 			// Autostart all the containers which has a
... ...
@@ -517,14 +576,21 @@ func (daemon *Daemon) RestartSwarmContainers() {
517 517
 			if daemon.configStore.AutoRestart && c.ShouldRestart() && c.NetworkSettings.HasSwarmEndpoint && c.HasBeenStartedBefore {
518 518
 				group.Add(1)
519 519
 				go func(c *container.Container) {
520
-					defer group.Done()
520
+					if err := sem.Acquire(ctx, 1); err != nil {
521
+						// ctx is done.
522
+						group.Done()
523
+						return
524
+					}
525
+
521 526
 					if err := daemon.containerStart(c, "", "", true); err != nil {
522 527
 						logrus.Error(err)
523 528
 					}
529
+
530
+					sem.Release(1)
531
+					group.Done()
524 532
 				}(c)
525 533
 			}
526 534
 		}
527
-
528 535
 	}
529 536
 	group.Wait()
530 537
 }
... ...
@@ -257,6 +257,41 @@ func getBlkioThrottleDevices(devs []*blkiodev.ThrottleDevice) ([]specs.LinuxThro
257 257
 	return throttleDevices, nil
258 258
 }
259 259
 
260
+// adjustParallelLimit takes a number of objects and a proposed limit and
261
+// figures out if it's reasonable (and adjusts it accordingly). This is only
262
+// used for daemon startup, which does a lot of parallel loading of containers
263
+// (and if we exceed RLIMIT_NOFILE then we're in trouble).
264
+func adjustParallelLimit(n int, limit int) int {
265
+	// Rule-of-thumb overhead factor (how many files will each goroutine open
266
+	// simultaneously). Yes, this is ugly but to be frank this whole thing is
267
+	// ugly.
268
+	const overhead = 2
269
+
270
+	// On Linux, we need to ensure that parallelStartupJobs doesn't cause us to
271
+	// exceed RLIMIT_NOFILE. If parallelStartupJobs is too large, we reduce it
272
+	// and give a warning (since in theory the user should increase their
273
+	// ulimits to the largest possible value for dockerd).
274
+	var rlim unix.Rlimit
275
+	if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rlim); err != nil {
276
+		logrus.Warnf("Couldn't find dockerd's RLIMIT_NOFILE to double-check startup parallelism factor: %v", err)
277
+		return limit
278
+	}
279
+	softRlimit := int(rlim.Cur)
280
+
281
+	// Much fewer containers than RLIMIT_NOFILE. No need to adjust anything.
282
+	if softRlimit > overhead*n {
283
+		return limit
284
+	}
285
+
286
+	// RLIMIT_NOFILE big enough, no need to adjust anything.
287
+	if softRlimit > overhead*limit {
288
+		return limit
289
+	}
290
+
291
+	logrus.Warnf("Found dockerd's open file ulimit (%v) is far too small -- consider increasing it significantly (at least %v)", softRlimit, overhead*limit)
292
+	return softRlimit / overhead
293
+}
294
+
260 295
 func checkKernel() error {
261 296
 	// Check for unsupported kernel versions
262 297
 	// FIXME: it would be cleaner to not test for specific versions, but rather
... ...
@@ -40,6 +40,11 @@ const (
40 40
 	windowsMaxCPUPercent = 100
41 41
 )
42 42
 
// adjustParallelLimit is a pass-through on Windows: there is no
// RLIMIT_NOFILE analogue to guard against, so the proposed limit is
// always acceptable as-is.
func adjustParallelLimit(numJobs int, proposed int) int {
	return proposed
}
47
+
43 48
 // Windows has no concept of an execution state directory. So use config.Root here.
44 49
 func getPluginExecRoot(root string) string {
45 50
 	return filepath.Join(root, "plugins")