Browse code

Fix race condition between swarm and libnetwork

This commit in conjunction with a libnetwork side commit,
cleans up the libnetwork SetClusterProvider logic interaction.
The previous code was inducing libnetwork to spawn several go
routines that were racing between each other during the agent
init and close.

A test got added to verify that back to back swarm init and leave
are properly processed and not raise crashes

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2017/05/01 06:51:43
Showing 7 changed files
... ...
@@ -300,6 +300,11 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) {
300 300
 	if err != nil {
301 301
 		logrus.Fatalf("Error creating cluster component: %v", err)
302 302
 	}
303
+	d.SetCluster(c)
304
+	err = c.Start()
305
+	if err != nil {
306
+		logrus.Fatalf("Error starting cluster component: %v", err)
307
+	}
303 308
 
304 309
 	// Restart all autostart containers which has a swarm endpoint
305 310
 	// and is not yet running now that we have successfully
... ...
@@ -316,7 +321,6 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) {
316 316
 
317 317
 	cli.d = d
318 318
 
319
-	d.SetCluster(c)
320 319
 	initRouter(api, d, c)
321 320
 
322 321
 	cli.setupConfigReloadTrap()
... ...
@@ -2,12 +2,14 @@ package daemon
2 2
 
3 3
 import (
4 4
 	apitypes "github.com/docker/docker/api/types"
5
+	lncluster "github.com/docker/libnetwork/cluster"
5 6
 )
6 7
 
7 8
 // Cluster is the interface for github.com/docker/docker/daemon/cluster.(*Cluster).
8 9
 type Cluster interface {
9 10
 	ClusterStatus
10 11
 	NetworkManager
12
+	SendClusterEvent(event lncluster.ConfigEventType)
11 13
 }
12 14
 
13 15
 // ClusterStatus interface provides information about the Swarm status of the Cluster
... ...
@@ -51,6 +51,7 @@ import (
51 51
 	types "github.com/docker/docker/api/types/swarm"
52 52
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
53 53
 	"github.com/docker/docker/pkg/signal"
54
+	lncluster "github.com/docker/libnetwork/cluster"
54 55
 	swarmapi "github.com/docker/swarmkit/api"
55 56
 	swarmnode "github.com/docker/swarmkit/node"
56 57
 	"github.com/pkg/errors"
... ...
@@ -115,7 +116,7 @@ type Cluster struct {
115 115
 	root         string
116 116
 	runtimeRoot  string
117 117
 	config       Config
118
-	configEvent  chan struct{} // todo: make this array and goroutine safe
118
+	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
119 119
 	attachers    map[string]*attacher
120 120
 }
121 121
 
... ...
@@ -147,22 +148,30 @@ func New(config Config) (*Cluster, error) {
147 147
 	c := &Cluster{
148 148
 		root:        root,
149 149
 		config:      config,
150
-		configEvent: make(chan struct{}, 10),
150
+		configEvent: make(chan lncluster.ConfigEventType, 10),
151 151
 		runtimeRoot: config.RuntimeRoot,
152 152
 		attachers:   make(map[string]*attacher),
153 153
 	}
154
+	return c, nil
155
+}
156
+
157
+// Start the Cluster instance
158
+// TODO The split between New and Start can be join again when the SendClusterEvent
159
+// method is no longer required
160
+func (c *Cluster) Start() error {
161
+	root := filepath.Join(c.config.Root, swarmDirName)
154 162
 
155 163
 	nodeConfig, err := loadPersistentState(root)
156 164
 	if err != nil {
157 165
 		if os.IsNotExist(err) {
158
-			return c, nil
166
+			return nil
159 167
 		}
160
-		return nil, err
168
+		return err
161 169
 	}
162 170
 
163 171
 	nr, err := c.newNodeRunner(*nodeConfig)
164 172
 	if err != nil {
165
-		return nil, err
173
+		return err
166 174
 	}
167 175
 	c.nr = nr
168 176
 
... ...
@@ -172,10 +181,10 @@ func New(config Config) (*Cluster, error) {
172 172
 	case err := <-nr.Ready():
173 173
 		if err != nil {
174 174
 			logrus.WithError(err).Error("swarm component could not be started")
175
-			return c, nil
175
+			return nil
176 176
 		}
177 177
 	}
178
-	return c, nil
178
+	return nil
179 179
 }
180 180
 
181 181
 func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
... ...
@@ -308,7 +317,7 @@ func (c *Cluster) getRemoteAddressList() []string {
308 308
 // ListenClusterEvents returns a channel that receives messages on cluster
309 309
 // participation changes.
310 310
 // todo: make cancelable and accessible to multiple callers
311
-func (c *Cluster) ListenClusterEvents() <-chan struct{} {
311
+func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
312 312
 	return c.configEvent
313 313
 }
314 314
 
... ...
@@ -413,3 +422,13 @@ func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeSta
413 413
 
414 414
 	return fn(ctx, state)
415 415
 }
416
+
417
+// SendClusterEvent allows to send cluster events on the configEvent channel
418
+// TODO This method should not be exposed.
419
+// Currently it is used to notify the network controller that the keys are
420
+// available
421
+func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
422
+	c.mu.RLock()
423
+	defer c.mu.RUnlock()
424
+	c.configEvent <- event
425
+}
... ...
@@ -11,6 +11,7 @@ import (
11 11
 	"github.com/Sirupsen/logrus"
12 12
 	types "github.com/docker/docker/api/types/swarm"
13 13
 	"github.com/docker/docker/daemon/cluster/executor/container"
14
+	lncluster "github.com/docker/libnetwork/cluster"
14 15
 	swarmapi "github.com/docker/swarmkit/api"
15 16
 	swarmnode "github.com/docker/swarmkit/node"
16 17
 	"github.com/pkg/errors"
... ...
@@ -162,7 +163,7 @@ func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmn
162 162
 		}
163 163
 		n.grpcConn = conn
164 164
 		n.mu.Unlock()
165
-		n.cluster.configEvent <- struct{}{}
165
+		n.cluster.SendClusterEvent(lncluster.EventSocketChange)
166 166
 	}
167 167
 }
168 168
 
... ...
@@ -175,7 +176,7 @@ func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node,
175 175
 		close(ready)
176 176
 	case <-ctx.Done():
177 177
 	}
178
-	n.cluster.configEvent <- struct{}{}
178
+	n.cluster.SendClusterEvent(lncluster.EventNodeReady)
179 179
 }
180 180
 
181 181
 func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
... ...
@@ -217,6 +218,7 @@ func (n *nodeRunner) Stop() error {
217 217
 	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
218 218
 		return err
219 219
 	}
220
+	n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
220 221
 	<-n.done
221 222
 	return nil
222 223
 }
... ...
@@ -388,7 +388,6 @@ func (c *Cluster) Leave(force bool) error {
388 388
 		}
389 389
 	}
390 390
 
391
-	c.configEvent <- struct{}{}
392 391
 	// todo: cleanup optional?
393 392
 	if err := clearPersistentState(c.root); err != nil {
394 393
 		return err
... ...
@@ -16,6 +16,7 @@ import (
16 16
 	"github.com/docker/docker/pkg/plugingetter"
17 17
 	"github.com/docker/docker/runconfig"
18 18
 	"github.com/docker/libnetwork"
19
+	lncluster "github.com/docker/libnetwork/cluster"
19 20
 	"github.com/docker/libnetwork/driverapi"
20 21
 	"github.com/docker/libnetwork/ipamapi"
21 22
 	networktypes "github.com/docker/libnetwork/types"
... ...
@@ -207,7 +208,6 @@ func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip
207 207
 
208 208
 func (daemon *Daemon) releaseIngress(id string) {
209 209
 	controller := daemon.netController
210
-
211 210
 	if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
212 211
 		logrus.Errorf("Failed to delete ingress sandbox: %v", err)
213 212
 	}
... ...
@@ -233,13 +233,17 @@ func (daemon *Daemon) releaseIngress(id string) {
233 233
 		logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err)
234 234
 		return
235 235
 	}
236
-
237 236
 	return
238 237
 }
239 238
 
240 239
 // SetNetworkBootstrapKeys sets the bootstrap keys.
241 240
 func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error {
242
-	return daemon.netController.SetKeys(keys)
241
+	err := daemon.netController.SetKeys(keys)
242
+	if err == nil {
243
+		// Upon successful key setting dispatch the keys available event
244
+		daemon.cluster.SendClusterEvent(lncluster.EventNetworkKeysAvailable)
245
+	}
246
+	return err
243 247
 }
244 248
 
245 249
 // UpdateAttachment notifies the attacher about the attachment config.
... ...
@@ -1980,3 +1980,24 @@ func (s *DockerSwarmSuite) TestSwarmInitUnspecifiedDataPathAddr(c *check.C) {
1980 1980
 	c.Assert(err, checker.NotNil)
1981 1981
 	c.Assert(out, checker.Contains, "data path address must be a non-zero IP")
1982 1982
 }
1983
+
1984
+func (s *DockerSwarmSuite) TestSwarmJoinLeave(c *check.C) {
1985
+	d := s.AddDaemon(c, true, true)
1986
+
1987
+	out, err := d.Cmd("swarm", "join-token", "-q", "worker")
1988
+	c.Assert(err, checker.IsNil)
1989
+	c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
1990
+
1991
+	token := strings.TrimSpace(out)
1992
+
1993
+	// Verify that back to back join/leave does not cause panics
1994
+	d1 := s.AddDaemon(c, false, false)
1995
+	for i := 0; i < 10; i++ {
1996
+		out, err = d1.Cmd("swarm", "join", "--token", token, d.ListenAddr)
1997
+		c.Assert(err, checker.IsNil)
1998
+		c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
1999
+
2000
+		_, err = d1.Cmd("swarm", "leave")
2001
+		c.Assert(err, checker.IsNil)
2002
+	}
2003
+}