Browse code

daemon: switch to semaphore-gated WaitGroup for startup tasks

Many startup tasks have to run for each container, and thus using a
WaitGroup (which doesn't have a limit to the number of parallel tasks)
can result in Docker exceeding the NOFILE limit quite trivially. A more
optimal solution is to have a parallelism limit by using a semaphore.

In addition, several startup tasks were not parallelised previously
which resulted in very long startup times. According to my testing, 20K
dead containers resulted in ~6 minute startup times (during which time
Docker is completely unusable).

This patch fixes both issues, and the parallel startup limit factor chosen
(128 * NumCPU) is based on my own significant testing of the 20K
container case. This patch (on my machines) reduces the startup time
from 6 minutes to less than a minute (ideally this could be further
reduced by removing the need to scan all dead containers on startup --
but that's beyond the scope of this patchset).

In order to avoid the NOFILE limit problem, we also detect this
on-startup and if NOFILE < 2*128*NumCPU we will reduce the parallelism
factor to avoid hitting NOFILE limits (but also emit a warning since
this is almost certainly a mis-configuration).

Signed-off-by: Aleksa Sarai <asarai@suse.de>

Aleksa Sarai authored on 2018/12/05 01:44:45
Showing 3 changed files
... ...
@@ -67,6 +67,7 @@ import (
67 67
 	"github.com/docker/libnetwork/cluster"
68 68
 	nwconfig "github.com/docker/libnetwork/config"
69 69
 	"github.com/pkg/errors"
70
+	"golang.org/x/sync/semaphore"
70 71
 )
71 72
 
72 73
 // ContainersNamespace is the name of the namespace used for users containers
... ...
@@ -198,6 +199,7 @@ func (daemon *Daemon) NewResolveOptionsFunc() resolver.ResolveOptionsFunc {
198 198
 }
199 199
 
200 200
 func (daemon *Daemon) restore() error {
201
+	var mapLock sync.Mutex
201 202
 	containers := make(map[string]*container.Container)
202 203
 
203 204
 	logrus.Info("Loading containers: start.")
... ...
@@ -207,68 +209,99 @@ func (daemon *Daemon) restore() error {
207 207
 		return err
208 208
 	}
209 209
 
210
+	// parallelLimit is the maximum number of parallel startup jobs that we
211
+	// allow (this is the limit used for all startup semaphores). The multiplier
212
+	// (128) was chosen after some fairly significant benchmarking -- don't change
213
+	// it unless you've tested it significantly (this value is adjusted if
214
+	// RLIMIT_NOFILE is small to avoid EMFILE).
215
+	parallelLimit := adjustParallelLimit(len(dir), 128*runtime.NumCPU())
216
+
217
+	// Re-used for all parallel startup jobs.
218
+	var group sync.WaitGroup
219
+	sem := semaphore.NewWeighted(int64(parallelLimit))
220
+
210 221
 	for _, v := range dir {
211
-		id := v.Name()
212
-		container, err := daemon.load(id)
213
-		if err != nil {
214
-			logrus.Errorf("Failed to load container %v: %v", id, err)
215
-			continue
216
-		}
217
-		if !system.IsOSSupported(container.OS) {
218
-			logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
219
-			continue
220
-		}
221
-		// Ignore the container if it does not support the current driver being used by the graph
222
-		currentDriverForContainerOS := daemon.graphDrivers[container.OS]
223
-		if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
224
-			rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
222
+		group.Add(1)
223
+		go func(id string) {
224
+			defer group.Done()
225
+			_ = sem.Acquire(context.Background(), 1)
226
+			defer sem.Release(1)
227
+
228
+			container, err := daemon.load(id)
225 229
 			if err != nil {
226
-				logrus.Errorf("Failed to load container mount %v: %v", id, err)
227
-				continue
230
+				logrus.Errorf("Failed to load container %v: %v", id, err)
231
+				return
228 232
 			}
229
-			container.RWLayer = rwlayer
230
-			logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
233
+			if !system.IsOSSupported(container.OS) {
234
+				logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
235
+				return
236
+			}
237
+			// Ignore the container if it does not support the current driver being used by the graph
238
+			currentDriverForContainerOS := daemon.graphDrivers[container.OS]
239
+			if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
240
+				rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
241
+				if err != nil {
242
+					logrus.Errorf("Failed to load container mount %v: %v", id, err)
243
+					return
244
+				}
245
+				container.RWLayer = rwlayer
246
+				logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
231 247
 
232
-			containers[container.ID] = container
233
-		} else {
234
-			logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
235
-		}
248
+				mapLock.Lock()
249
+				containers[container.ID] = container
250
+				mapLock.Unlock()
251
+			} else {
252
+				logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
253
+			}
254
+		}(v.Name())
236 255
 	}
256
+	group.Wait()
237 257
 
238 258
 	removeContainers := make(map[string]*container.Container)
239 259
 	restartContainers := make(map[*container.Container]chan struct{})
240 260
 	activeSandboxes := make(map[string]interface{})
261
+
241 262
 	for id, c := range containers {
242
-		if err := daemon.registerName(c); err != nil {
243
-			logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
244
-			delete(containers, id)
245
-			continue
246
-		}
247
-		if err := daemon.Register(c); err != nil {
248
-			logrus.Errorf("Failed to register container %s: %s", c.ID, err)
249
-			delete(containers, id)
250
-			continue
251
-		}
263
+		group.Add(1)
264
+		go func(c *container.Container) {
265
+			defer group.Done()
266
+			_ = sem.Acquire(context.Background(), 1)
267
+			defer sem.Release(1)
252 268
 
253
-		// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
254
-		// We should rewrite it to use the daemon defaults.
255
-		// Fixes https://github.com/docker/docker/issues/22536
256
-		if c.HostConfig.LogConfig.Type == "" {
257
-			if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
258
-				logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
259
-				continue
269
+			if err := daemon.registerName(c); err != nil {
270
+				logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
271
+				mapLock.Lock()
272
+				delete(containers, id)
273
+				mapLock.Unlock()
274
+				return
260 275
 			}
261
-		}
276
+			if err := daemon.Register(c); err != nil {
277
+				logrus.Errorf("Failed to register container %s: %s", c.ID, err)
278
+				mapLock.Lock()
279
+				delete(containers, id)
280
+				mapLock.Unlock()
281
+				return
282
+			}
283
+
284
+			// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
285
+			// We should rewrite it to use the daemon defaults.
286
+			// Fixes https://github.com/docker/docker/issues/22536
287
+			if c.HostConfig.LogConfig.Type == "" {
288
+				if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
289
+					logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
290
+				}
291
+			}
292
+		}(c)
262 293
 	}
294
+	group.Wait()
263 295
 
264
-	var (
265
-		wg      sync.WaitGroup
266
-		mapLock sync.Mutex
267
-	)
268 296
 	for _, c := range containers {
269
-		wg.Add(1)
297
+		group.Add(1)
270 298
 		go func(c *container.Container) {
271
-			defer wg.Done()
299
+			defer group.Done()
300
+			_ = sem.Acquire(context.Background(), 1)
301
+			defer sem.Release(1)
302
+
272 303
 			daemon.backportMountSpec(c)
273 304
 			if err := daemon.checkpointAndSave(c); err != nil {
274 305
 				logrus.WithError(err).WithField("container", c.ID).Error("error saving backported mountspec to disk")
... ...
@@ -415,7 +448,8 @@ func (daemon *Daemon) restore() error {
415 415
 			c.Unlock()
416 416
 		}(c)
417 417
 	}
418
-	wg.Wait()
418
+	group.Wait()
419
+
419 420
 	daemon.netController, err = daemon.initNetworkController(daemon.configStore, activeSandboxes)
420 421
 	if err != nil {
421 422
 		return fmt.Errorf("Error initializing network controller: %v", err)
... ...
@@ -423,18 +457,24 @@ func (daemon *Daemon) restore() error {
423 423
 
424 424
 	// Now that all the containers are registered, register the links
425 425
 	for _, c := range containers {
426
-		if err := daemon.registerLinks(c, c.HostConfig); err != nil {
427
-			logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
428
-		}
426
+		group.Add(1)
427
+		go func(c *container.Container) {
428
+			_ = sem.Acquire(context.Background(), 1)
429
+
430
+			if err := daemon.registerLinks(c, c.HostConfig); err != nil {
431
+				logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
432
+			}
433
+
434
+			sem.Release(1)
435
+			group.Done()
436
+		}(c)
429 437
 	}
438
+	group.Wait()
430 439
 
431
-	group := sync.WaitGroup{}
432 440
 	for c, notifier := range restartContainers {
433 441
 		group.Add(1)
434
-
435 442
 		go func(c *container.Container, chNotify chan struct{}) {
436
-			defer group.Done()
437
-
443
+			_ = sem.Acquire(context.Background(), 1)
438 444
 			logrus.Debugf("Starting container %s", c.ID)
439 445
 
440 446
 			// ignore errors here as this is a best effort to wait for children to be
... ...
@@ -456,22 +496,27 @@ func (daemon *Daemon) restore() error {
456 456
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
457 457
 			}
458 458
 			close(chNotify)
459
-		}(c, notifier)
460 459
 
460
+			sem.Release(1)
461
+			group.Done()
462
+		}(c, notifier)
461 463
 	}
462 464
 	group.Wait()
463 465
 
464
-	removeGroup := sync.WaitGroup{}
465 466
 	for id := range removeContainers {
466
-		removeGroup.Add(1)
467
+		group.Add(1)
467 468
 		go func(cid string) {
469
+			_ = sem.Acquire(context.Background(), 1)
470
+
468 471
 			if err := daemon.ContainerRm(cid, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
469 472
 				logrus.Errorf("Failed to remove container %s: %s", cid, err)
470 473
 			}
471
-			removeGroup.Done()
474
+
475
+			sem.Release(1)
476
+			group.Done()
472 477
 		}(id)
473 478
 	}
474
-	removeGroup.Wait()
479
+	group.Wait()
475 480
 
476 481
 	// any containers that were started above would already have had this done,
477 482
 	// however we need to now prepare the mountpoints for the rest of the containers as well.
... ...
@@ -492,13 +537,16 @@ func (daemon *Daemon) restore() error {
492 492
 
493 493
 		group.Add(1)
494 494
 		go func(c *container.Container) {
495
-			defer group.Done()
495
+			_ = sem.Acquire(context.Background(), 1)
496
+
496 497
 			if err := daemon.prepareMountPoints(c); err != nil {
497 498
 				logrus.Error(err)
498 499
 			}
500
+
501
+			sem.Release(1)
502
+			group.Done()
499 503
 		}(c)
500 504
 	}
501
-
502 505
 	group.Wait()
503 506
 
504 507
 	logrus.Info("Loading containers: done.")
... ...
@@ -509,7 +557,18 @@ func (daemon *Daemon) restore() error {
509 509
 // RestartSwarmContainers restarts any autostart container which has a
510 510
 // swarm endpoint.
511 511
 func (daemon *Daemon) RestartSwarmContainers() {
512
-	group := sync.WaitGroup{}
512
+	ctx := context.Background()
513
+
514
+	// parallelLimit is the maximum number of parallel startup jobs that we
515
+	// allow (this is the limit used for all startup semaphores). The multiplier
516
+	// (128) was chosen after some fairly significant benchmarking -- don't change
517
+	// it unless you've tested it significantly (this value is adjusted if
518
+	// RLIMIT_NOFILE is small to avoid EMFILE).
519
+	parallelLimit := adjustParallelLimit(len(daemon.List()), 128*runtime.NumCPU())
520
+
521
+	var group sync.WaitGroup
522
+	sem := semaphore.NewWeighted(int64(parallelLimit))
523
+
513 524
 	for _, c := range daemon.List() {
514 525
 		if !c.IsRunning() && !c.IsPaused() {
515 526
 			// Autostart all the containers which has a
... ...
@@ -518,14 +577,21 @@ func (daemon *Daemon) RestartSwarmContainers() {
518 518
 			if daemon.configStore.AutoRestart && c.ShouldRestart() && c.NetworkSettings.HasSwarmEndpoint && c.HasBeenStartedBefore {
519 519
 				group.Add(1)
520 520
 				go func(c *container.Container) {
521
-					defer group.Done()
521
+					if err := sem.Acquire(ctx, 1); err != nil {
522
+						// ctx is done.
523
+						group.Done()
524
+						return
525
+					}
526
+
522 527
 					if err := daemon.containerStart(c, "", "", true); err != nil {
523 528
 						logrus.Error(err)
524 529
 					}
530
+
531
+					sem.Release(1)
532
+					group.Done()
525 533
 				}(c)
526 534
 			}
527 535
 		}
528
-
529 536
 	}
530 537
 	group.Wait()
531 538
 }
... ...
@@ -257,6 +257,41 @@ func getBlkioThrottleDevices(devs []*blkiodev.ThrottleDevice) ([]specs.LinuxThro
257 257
 	return throttleDevices, nil
258 258
 }
259 259
 
260
+// adjustParallelLimit takes a number of objects and a proposed limit and
261
+// figures out if it's reasonable (and adjusts it accordingly). This is only
262
+// used for daemon startup, which does a lot of parallel loading of containers
263
+// (and if we exceed RLIMIT_NOFILE then we're in trouble).
264
+func adjustParallelLimit(n int, limit int) int {
265
+	// Rule-of-thumb overhead factor (how many files will each goroutine open
266
+	// simultaneously). Yes, this is ugly but to be frank this whole thing is
267
+	// ugly.
268
+	const overhead = 2
269
+
270
+	// On Linux, we need to ensure that parallelStartupJobs doesn't cause us to
271
+	// exceed RLIMIT_NOFILE. If parallelStartupJobs is too large, we reduce it
272
+	// and give a warning (since in theory the user should increase their
273
+	// ulimits to the largest possible value for dockerd).
274
+	var rlim unix.Rlimit
275
+	if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rlim); err != nil {
276
+		logrus.Warnf("Couldn't find dockerd's RLIMIT_NOFILE to double-check startup parallelism factor: %v", err)
277
+		return limit
278
+	}
279
+	softRlimit := int(rlim.Cur)
280
+
281
+	// Much fewer containers than RLIMIT_NOFILE. No need to adjust anything.
282
+	if softRlimit > overhead*n {
283
+		return limit
284
+	}
285
+
286
+	// RLIMIT_NOFILE big enough, no need to adjust anything.
287
+	if softRlimit > overhead*limit {
288
+		return limit
289
+	}
290
+
291
+	logrus.Warnf("Found dockerd's open file ulimit (%v) is far too small -- consider increasing it significantly (at least %v)", softRlimit, overhead*limit)
292
+	return softRlimit / overhead
293
+}
294
+
260 295
 func checkKernel() error {
261 296
 	// Check for unsupported kernel versions
262 297
 	// FIXME: it would be cleaner to not test for specific versions, but rather
... ...
@@ -40,6 +40,11 @@ const (
40 40
 	windowsMaxCPUPercent = 100
41 41
 )
42 42
 
43
+// Windows doesn't really have rlimits.
44
+func adjustParallelLimit(n int, limit int) int {
45
+	return limit
46
+}
47
+
43 48
 // Windows has no concept of an execution state directory. So use config.Root here.
44 49
 func getPluginExecRoot(root string) string {
45 50
 	return filepath.Join(root, "plugins")