Browse code

Wait for discovery on container start error

This gives discovery a chance to initialize, particularly if the K/V
store being used is in a container.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit 2dce79e05ab8f8fd22ca7b2f73121b2d7723f7cf)

Brian Goff authored on 2016/05/04 23:13:23
Showing 3 changed files
... ...
@@ -367,6 +367,9 @@ func (daemon *Daemon) restore() error {
367 367
 					}
368 368
 				}
369 369
 			}
370
+
371
+			// Make sure networks are available before starting
372
+			daemon.waitForNetworks(c)
370 373
 			if err := daemon.containerStart(c); err != nil {
371 374
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
372 375
 			}
... ...
@@ -410,6 +413,33 @@ func (daemon *Daemon) restore() error {
410 410
 	return nil
411 411
 }
412 412
 
413
+// waitForNetworks is used during daemon initialization when starting up containers
414
+// It ensures that all of a container's networks are available before the daemon tries to start the container.
415
+// In practice it just makes sure the discovery service is available for containers which use a network that require discovery.
416
+func (daemon *Daemon) waitForNetworks(c *container.Container) {
417
+	if daemon.discoveryWatcher == nil {
418
+		return
419
+	}
420
+	// Make sure if the container has a network that requires discovery that the discovery service is available before starting
421
+	for netName := range c.NetworkSettings.Networks {
422
+		// If we get `ErrNoSuchNetwork` here, it can assumed that it is due to discovery not being ready
423
+		// Most likely this is because the K/V store used for discovery is in a container and needs to be started
424
+		if _, err := daemon.netController.NetworkByName(netName); err != nil {
425
+			if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
426
+				continue
427
+			}
428
+			// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
429
+			// FIXME: why is this slow???
430
+			logrus.Debugf("Container %s waiting for network to be ready", c.Name)
431
+			select {
432
+			case <-daemon.discoveryWatcher.ReadyCh():
433
+			case <-time.After(60 * time.Second):
434
+			}
435
+			return
436
+		}
437
+	}
438
+}
439
+
413 440
 func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error {
414 441
 	if img != nil && img.Config != nil {
415 442
 		if err := merge(config, img.Config); err != nil {
... ...
@@ -381,6 +381,12 @@ func TestDaemonDiscoveryReload(t *testing.T) {
381 381
 		&discovery.Entry{Host: "127.0.0.1", Port: "3333"},
382 382
 	}
383 383
 
384
+	select {
385
+	case <-time.After(10 * time.Second):
386
+		t.Fatal("timeout waiting for discovery")
387
+	case <-daemon.discoveryWatcher.ReadyCh():
388
+	}
389
+
384 390
 	stopCh := make(chan struct{})
385 391
 	defer close(stopCh)
386 392
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
... ...
@@ -414,6 +420,13 @@ func TestDaemonDiscoveryReload(t *testing.T) {
414 414
 	if err := daemon.Reload(newConfig); err != nil {
415 415
 		t.Fatal(err)
416 416
 	}
417
+
418
+	select {
419
+	case <-time.After(10 * time.Second):
420
+		t.Fatal("timeout waiting for discovery")
421
+	case <-daemon.discoveryWatcher.ReadyCh():
422
+	}
423
+
417 424
 	ch, errCh = daemon.discoveryWatcher.Watch(stopCh)
418 425
 
419 426
 	select {
... ...
@@ -450,6 +463,13 @@ func TestDaemonDiscoveryReloadFromEmptyDiscovery(t *testing.T) {
450 450
 	if err := daemon.Reload(newConfig); err != nil {
451 451
 		t.Fatal(err)
452 452
 	}
453
+
454
+	select {
455
+	case <-time.After(10 * time.Second):
456
+		t.Fatal("timeout waiting for discovery")
457
+	case <-daemon.discoveryWatcher.ReadyCh():
458
+	}
459
+
453 460
 	stopCh := make(chan struct{})
454 461
 	defer close(stopCh)
455 462
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
... ...
@@ -488,6 +508,12 @@ func TestDaemonDiscoveryReloadOnlyClusterAdvertise(t *testing.T) {
488 488
 	if err := daemon.Reload(newConfig); err != nil {
489 489
 		t.Fatal(err)
490 490
 	}
491
+
492
+	select {
493
+	case <-daemon.discoveryWatcher.ReadyCh():
494
+	case <-time.After(10 * time.Second):
495
+		t.Fatal("Timeout waiting for discovery")
496
+	}
491 497
 	stopCh := make(chan struct{})
492 498
 	defer close(stopCh)
493 499
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
... ...
@@ -27,18 +27,24 @@ type discoveryReloader interface {
27 27
 	discovery.Watcher
28 28
 	Stop()
29 29
 	Reload(backend, address string, clusterOpts map[string]string) error
30
+	ReadyCh() <-chan struct{}
30 31
 }
31 32
 
32 33
 type daemonDiscoveryReloader struct {
33 34
 	backend discovery.Backend
34 35
 	ticker  *time.Ticker
35 36
 	term    chan bool
37
+	readyCh chan struct{}
36 38
 }
37 39
 
38 40
 func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
39 41
 	return d.backend.Watch(stopCh)
40 42
 }
41 43
 
44
+func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
45
+	return d.readyCh
46
+}
47
+
42 48
 func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
43 49
 	var (
44 50
 		heartbeat = defaultDiscoveryHeartbeat
... ...
@@ -87,38 +93,64 @@ func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[stri
87 87
 		backend: backend,
88 88
 		ticker:  time.NewTicker(heartbeat),
89 89
 		term:    make(chan bool),
90
+		readyCh: make(chan struct{}),
90 91
 	}
91 92
 	// We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
92 93
 	// but we never actually Watch() for nodes appearing and disappearing for the moment.
93
-	reloader.advertise(advertiseAddress)
94
+	go reloader.advertiseHeartbeat(advertiseAddress)
94 95
 	return reloader, nil
95 96
 }
96 97
 
97
-func (d *daemonDiscoveryReloader) advertise(address string) {
98
-	d.registerAddr(address)
99
-	go d.advertiseHeartbeat(address)
100
-}
101
-
102
-func (d *daemonDiscoveryReloader) registerAddr(addr string) {
103
-	if err := d.backend.Register(addr); err != nil {
104
-		log.Warnf("Registering as %q in discovery failed: %v", addr, err)
105
-	}
106
-}
107
-
108 98
 // advertiseHeartbeat registers the current node against the discovery backend using the specified
109 99
 // address. The function never returns, as registration against the backend comes with a TTL and
110 100
 // requires regular heartbeats.
111 101
 func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
102
+	var ready bool
103
+	if err := d.initHeartbeat(address); err == nil {
104
+		ready = true
105
+		close(d.readyCh)
106
+	}
107
+
112 108
 	for {
113 109
 		select {
114 110
 		case <-d.ticker.C:
115
-			d.registerAddr(address)
111
+			if err := d.backend.Register(address); err != nil {
112
+				log.Warnf("Registering as %q in discovery failed: %v", address, err)
113
+			} else {
114
+				if !ready {
115
+					close(d.readyCh)
116
+					ready = true
117
+				}
118
+			}
116 119
 		case <-d.term:
117 120
 			return
118 121
 		}
119 122
 	}
120 123
 }
121 124
 
125
+// initHeartbeat is used to do the first heartbeat. It uses a tight loop until
126
+// either the timeout period is reached or the heartbeat is successful and returns.
127
+func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
128
+	// Setup a short ticker until the first heartbeat has succeeded
129
+	t := time.NewTicker(500 * time.Millisecond)
130
+	defer t.Stop()
131
+	// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
132
+	timeout := time.After(60 * time.Second)
133
+
134
+	for {
135
+		select {
136
+		case <-timeout:
137
+			return errors.New("timeout waiting for initial discovery")
138
+		case <-d.term:
139
+			return errors.New("terminated")
140
+		case <-t.C:
141
+			if err := d.backend.Register(address); err == nil {
142
+				return nil
143
+			}
144
+		}
145
+	}
146
+}
147
+
122 148
 // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
123 149
 func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
124 150
 	d.Stop()
... ...
@@ -130,8 +162,9 @@ func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string
130 130
 
131 131
 	d.backend = backend
132 132
 	d.ticker = time.NewTicker(heartbeat)
133
+	d.readyCh = make(chan struct{})
133 134
 
134
-	d.advertise(advertiseAddress)
135
+	go d.advertiseHeartbeat(advertiseAddress)
135 136
 	return nil
136 137
 }
137 138