Browse code

Switch cluster locking strategy

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>

Tonis Tiigi authored on 2016/11/17 07:17:18
Showing 5 changed files
... ...
@@ -1,16 +1,52 @@
1 1
 package cluster
2 2
 
3
+//
4
+// ## Swarmkit integration
5
+//
6
+// Cluster - static configurable object for accessing everything swarm related.
7
+// Contains methods for connecting and controlling the cluster. Exists always,
8
+// even if swarm mode is not enabled.
9
+//
10
+// NodeRunner - Manager for starting the swarmkit node. Is present only and
11
+// always if swarm mode is enabled. Implements backoff restart loop in case of
12
+// errors.
13
+//
14
+// NodeState - Information about the current node status including access to
15
+// gRPC clients if a manager is active.
16
+//
17
+// ### Locking
18
+//
19
+// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
20
+// can reconfigure cluster(init/join/leave etc). Protects that one
21
+// reconfiguration action has fully completed before another can start.
22
+//
23
+// `cluster.mu` - taken when the actual changes in cluster configurations
24
+// happen. Different from `controlMutex` because in some cases we need to
25
+// access current cluster state even if the long-running reconfiguration is
26
+// going on. For example network stack may ask for the current cluster state in
27
+// the middle of the shutdown. Any time current cluster state is asked you
28
+// should take the read lock of `cluster.mu`. If you are writing an API
29
+// responder that returns synchronously, hold `cluster.mu.RLock()` for the
30
+// duration of the whole handler function. That ensures that node will not be
31
+// shut down until the handler has finished.
32
+//
33
+// NodeRunner implements its internal locks that should not be used outside of
34
+// the struct. Instead, you should just call `nodeRunner.State()` method to get
35
+// the current state of the cluster(still need `cluster.mu.RLock()` to access
36
+// `cluster.nr` reference itself). Most of the changes in NodeRunner happen
37
+// because of an external event(network problem, unexpected swarmkit error) and
38
+// Docker shouldn't take any locks that delay these changes from happening.
39
+//
40
+
3 41
 import (
4 42
 	"crypto/x509"
5 43
 	"encoding/base64"
6 44
 	"encoding/json"
7 45
 	"fmt"
8 46
 	"io"
9
-	"io/ioutil"
10 47
 	"net"
11 48
 	"os"
12 49
 	"path/filepath"
13
-	"runtime"
14 50
 	"strings"
15 51
 	"sync"
16 52
 	"time"
... ...
@@ -25,7 +61,6 @@ import (
25 25
 	types "github.com/docker/docker/api/types/swarm"
26 26
 	"github.com/docker/docker/daemon/cluster/convert"
27 27
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
28
-	"github.com/docker/docker/daemon/cluster/executor/container"
29 28
 	"github.com/docker/docker/daemon/logger"
30 29
 	"github.com/docker/docker/opts"
31 30
 	"github.com/docker/docker/pkg/ioutils"
... ...
@@ -39,7 +74,6 @@ import (
39 39
 	"github.com/docker/swarmkit/protobuf/ptypes"
40 40
 	"github.com/pkg/errors"
41 41
 	"golang.org/x/net/context"
42
-	"google.golang.org/grpc"
43 42
 )
44 43
 
45 44
 const swarmDirName = "swarm"
... ...
@@ -95,19 +129,14 @@ type Config struct {
95 95
 // Cluster provides capabilities to participate in a cluster as a worker or a
96 96
 // manager.
97 97
 type Cluster struct {
98
-	sync.RWMutex
99
-	*node
100
-	root            string
101
-	runtimeRoot     string
102
-	config          Config
103
-	configEvent     chan struct{} // todo: make this array and goroutine safe
104
-	actualLocalAddr string        // after resolution, not persisted
105
-	stop            bool
106
-	err             error
107
-	cancelDelay     func()
108
-	attachers       map[string]*attacher
109
-	locked          bool
110
-	lastNodeConfig  *nodeStartConfig
98
+	mu           sync.RWMutex
99
+	controlMutex sync.RWMutex // protect init/join/leave user operations
100
+	nr           *nodeRunner
101
+	root         string
102
+	runtimeRoot  string
103
+	config       Config
104
+	configEvent  chan struct{} // todo: make this array and goroutine safe
105
+	attachers    map[string]*attacher
111 106
 }
112 107
 
113 108
 // attacher manages the in-memory attachment state of a container
... ...
@@ -122,38 +151,6 @@ type attacher struct {
122 122
 	detachWaitCh     chan struct{}
123 123
 }
124 124
 
125
-type node struct {
126
-	*swarmnode.Node
127
-	done           chan struct{}
128
-	ready          bool
129
-	conn           *grpc.ClientConn
130
-	client         swarmapi.ControlClient
131
-	logs           swarmapi.LogsClient
132
-	reconnectDelay time.Duration
133
-	config         nodeStartConfig
134
-}
135
-
136
-// nodeStartConfig holds configuration needed to start a new node. Exported
137
-// fields of this structure are saved to disk in json. Unexported fields
138
-// contain data that shouldn't be persisted between daemon reloads.
139
-type nodeStartConfig struct {
140
-	// LocalAddr is this machine's local IP or hostname, if specified.
141
-	LocalAddr string
142
-	// RemoteAddr is the address that was given to "swarm join". It is used
143
-	// to find LocalAddr if necessary.
144
-	RemoteAddr string
145
-	// ListenAddr is the address we bind to, including a port.
146
-	ListenAddr string
147
-	// AdvertiseAddr is the address other nodes should connect to,
148
-	// including a port.
149
-	AdvertiseAddr   string
150
-	joinAddr        string
151
-	forceNewCluster bool
152
-	joinToken       string
153
-	lockKey         []byte
154
-	autolock        bool
155
-}
156
-
157 125
 // New creates a new Cluster instance using provided config.
158 126
 func New(config Config) (*Cluster, error) {
159 127
 	root := filepath.Join(config.Root, swarmDirName)
... ...
@@ -174,7 +171,7 @@ func New(config Config) (*Cluster, error) {
174 174
 		attachers:   make(map[string]*attacher),
175 175
 	}
176 176
 
177
-	nodeConfig, err := c.loadState()
177
+	nodeConfig, err := loadPersistentState(root)
178 178
 	if err != nil {
179 179
 		if os.IsNotExist(err) {
180 180
 			return c, nil
... ...
@@ -182,95 +179,30 @@ func New(config Config) (*Cluster, error) {
182 182
 		return nil, err
183 183
 	}
184 184
 
185
-	n, err := c.startNewNode(*nodeConfig)
185
+	nr, err := c.newNodeRunner(*nodeConfig)
186 186
 	if err != nil {
187 187
 		return nil, err
188 188
 	}
189
+	c.nr = nr
189 190
 
190 191
 	select {
191 192
 	case <-time.After(swarmConnectTimeout):
192 193
 		logrus.Error("swarm component could not be started before timeout was reached")
193
-	case <-n.Ready():
194
-	case <-n.done:
195
-		if errors.Cause(c.err) == ErrSwarmLocked {
196
-			return c, nil
197
-		}
198
-		if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
199
-			c.err = ErrSwarmCertificatesExpired
200
-			return c, nil
201
-		}
202
-		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
203
-	}
204
-	go c.reconnectOnFailure(n)
205
-	return c, nil
206
-}
207
-
208
-func (c *Cluster) loadState() (*nodeStartConfig, error) {
209
-	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
210
-	if err != nil {
211
-		return nil, err
212
-	}
213
-	// missing certificate means no actual state to restore from
214
-	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
215
-		if os.IsNotExist(err) {
216
-			c.clearState()
217
-		}
218
-		return nil, err
219
-	}
220
-	var st nodeStartConfig
221
-	if err := json.Unmarshal(dt, &st); err != nil {
222
-		return nil, err
223
-	}
224
-	return &st, nil
225
-}
226
-
227
-func (c *Cluster) saveState(config nodeStartConfig) error {
228
-	dt, err := json.Marshal(config)
229
-	if err != nil {
230
-		return err
231
-	}
232
-	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
233
-}
234
-
235
-func (c *Cluster) reconnectOnFailure(n *node) {
236
-	for {
237
-		<-n.done
238
-		c.Lock()
239
-		if c.stop || c.node != nil {
240
-			c.Unlock()
241
-			return
242
-		}
243
-		n.reconnectDelay *= 2
244
-		if n.reconnectDelay > maxReconnectDelay {
245
-			n.reconnectDelay = maxReconnectDelay
246
-		}
247
-		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
248
-		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
249
-		c.cancelDelay = cancel
250
-		c.Unlock()
251
-		<-delayCtx.Done()
252
-		if delayCtx.Err() != context.DeadlineExceeded {
253
-			return
254
-		}
255
-		c.Lock()
256
-		if c.node != nil {
257
-			c.Unlock()
258
-			return
259
-		}
260
-		var err error
261
-		config := n.config
262
-		config.RemoteAddr = c.getRemoteAddress()
263
-		config.joinAddr = config.RemoteAddr
264
-		n, err = c.startNewNode(config)
194
+	case err := <-nr.Ready():
265 195
 		if err != nil {
266
-			c.err = err
267
-			close(n.done)
196
+			if errors.Cause(err) == ErrSwarmLocked {
197
+				return c, nil
198
+			}
199
+			if err, ok := errors.Cause(c.nr.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
200
+				return c, nil
201
+			}
202
+			return nil, errors.Wrap(err, "swarm component could not be started")
268 203
 		}
269
-		c.Unlock()
270 204
 	}
205
+	return c, nil
271 206
 }
272 207
 
273
-func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
208
+func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
274 209
 	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
275 210
 		return nil, err
276 211
 	}
... ...
@@ -304,128 +236,47 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
304 304
 		}
305 305
 	}
306 306
 
307
-	var control string
308
-	if runtime.GOOS == "windows" {
309
-		control = `\\.\pipe\` + controlSocket
310
-	} else {
311
-		control = filepath.Join(c.runtimeRoot, controlSocket)
312
-	}
313
-
314
-	c.node = nil
315
-	c.cancelDelay = nil
316
-	c.stop = false
317
-	n, err := swarmnode.New(&swarmnode.Config{
318
-		Hostname:           c.config.Name,
319
-		ForceNewCluster:    conf.forceNewCluster,
320
-		ListenControlAPI:   control,
321
-		ListenRemoteAPI:    conf.ListenAddr,
322
-		AdvertiseRemoteAPI: conf.AdvertiseAddr,
323
-		JoinAddr:           conf.joinAddr,
324
-		StateDir:           c.root,
325
-		JoinToken:          conf.joinToken,
326
-		Executor:           container.NewExecutor(c.config.Backend),
327
-		HeartbeatTick:      1,
328
-		ElectionTick:       3,
329
-		UnlockKey:          conf.lockKey,
330
-		AutoLockManagers:   conf.autolock,
331
-	})
307
+	nr := &nodeRunner{cluster: c}
308
+	nr.actualLocalAddr = actualLocalAddr
332 309
 
333
-	if err != nil {
310
+	if err := nr.Start(conf); err != nil {
334 311
 		return nil, err
335 312
 	}
336
-	ctx := context.Background()
337
-	if err := n.Start(ctx); err != nil {
338
-		return nil, err
339
-	}
340
-	node := &node{
341
-		Node:           n,
342
-		done:           make(chan struct{}),
343
-		reconnectDelay: initialReconnectDelay,
344
-		config:         conf,
345
-	}
346
-	c.node = node
347
-	c.actualLocalAddr = actualLocalAddr // not saved
348
-	c.saveState(conf)
349 313
 
350 314
 	c.config.Backend.SetClusterProvider(c)
351
-	go func() {
352
-		err := detectLockedError(n.Err(ctx))
353
-		if err != nil {
354
-			logrus.Errorf("cluster exited with error: %v", err)
355
-		}
356
-		c.Lock()
357
-		c.node = nil
358
-		c.err = err
359
-		if errors.Cause(err) == ErrSwarmLocked {
360
-			c.locked = true
361
-			confClone := conf
362
-			c.lastNodeConfig = &confClone
363
-		}
364
-		c.Unlock()
365
-		close(node.done)
366
-	}()
367
-
368
-	go func() {
369
-		select {
370
-		case <-n.Ready():
371
-			c.Lock()
372
-			node.ready = true
373
-			c.err = nil
374
-			c.Unlock()
375
-		case <-ctx.Done():
376
-		}
377
-		c.configEvent <- struct{}{}
378
-	}()
379
-
380
-	go func() {
381
-		for conn := range n.ListenControlSocket(ctx) {
382
-			c.Lock()
383
-			if node.conn != conn {
384
-				if conn == nil {
385
-					node.client = nil
386
-					node.logs = nil
387
-				} else {
388
-					node.client = swarmapi.NewControlClient(conn)
389
-					node.logs = swarmapi.NewLogsClient(conn)
390
-				}
391
-			}
392
-			node.conn = conn
393
-			c.Unlock()
394
-			c.configEvent <- struct{}{}
395
-		}
396
-	}()
397 315
 
398
-	return node, nil
316
+	return nr, nil
399 317
 }
400 318
 
401 319
 // Init initializes new cluster from user provided request.
402 320
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
403
-	c.Lock()
404
-	if c.swarmExists() {
405
-		if !req.ForceNewCluster {
406
-			c.Unlock()
321
+	c.controlMutex.Lock()
322
+	defer c.controlMutex.Unlock()
323
+	c.mu.Lock()
324
+	if c.nr != nil {
325
+		if req.ForceNewCluster {
326
+			if err := c.nr.Stop(); err != nil {
327
+				c.mu.Unlock()
328
+				return "", err
329
+			}
330
+		} else {
331
+			c.mu.Unlock()
407 332
 			return "", ErrSwarmExists
408 333
 		}
409
-		if err := c.stopNode(); err != nil {
410
-			c.Unlock()
411
-			return "", err
412
-		}
413 334
 	}
335
+	c.mu.Unlock()
414 336
 
415 337
 	if err := validateAndSanitizeInitRequest(&req); err != nil {
416
-		c.Unlock()
417 338
 		return "", err
418 339
 	}
419 340
 
420 341
 	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
421 342
 	if err != nil {
422
-		c.Unlock()
423 343
 		return "", err
424 344
 	}
425 345
 
426 346
 	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
427 347
 	if err != nil {
428
-		c.Unlock()
429 348
 		return "", err
430 349
 	}
431 350
 
... ...
@@ -451,7 +302,6 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
451 451
 		if !found {
452 452
 			ip, err := c.resolveSystemAddr()
453 453
 			if err != nil {
454
-				c.Unlock()
455 454
 				logrus.Warnf("Could not find a local address: %v", err)
456 455
 				return "", errMustSpecifyListenAddr
457 456
 			}
... ...
@@ -459,8 +309,11 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
459 459
 		}
460 460
 	}
461 461
 
462
-	// todo: check current state existing
463
-	n, err := c.startNewNode(nodeStartConfig{
462
+	if !req.ForceNewCluster {
463
+		clearPersistentState(c.root)
464
+	}
465
+
466
+	nr, err := c.newNodeRunner(nodeStartConfig{
464 467
 		forceNewCluster: req.ForceNewCluster,
465 468
 		autolock:        req.AutoLockManagers,
466 469
 		LocalAddr:       localAddr,
... ...
@@ -468,45 +321,52 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
468 468
 		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
469 469
 	})
470 470
 	if err != nil {
471
-		c.Unlock()
472 471
 		return "", err
473 472
 	}
474
-	c.Unlock()
473
+	c.mu.Lock()
474
+	c.nr = nr
475
+	c.mu.Unlock()
475 476
 
476
-	select {
477
-	case <-n.Ready():
478
-		if err := initClusterSpec(n, req.Spec); err != nil {
479
-			return "", err
480
-		}
481
-		go c.reconnectOnFailure(n)
482
-		return n.NodeID(), nil
483
-	case <-n.done:
484
-		c.RLock()
485
-		defer c.RUnlock()
477
+	if err := <-nr.Ready(); err != nil {
486 478
 		if !req.ForceNewCluster { // if failure on first attempt don't keep state
487
-			if err := c.clearState(); err != nil {
479
+			if err := clearPersistentState(c.root); err != nil {
488 480
 				return "", err
489 481
 			}
490 482
 		}
491
-		return "", c.err
483
+		if err != nil {
484
+			c.mu.Lock()
485
+			c.nr = nil
486
+			c.mu.Unlock()
487
+		}
488
+		return "", err
492 489
 	}
490
+	state := nr.State()
491
+	if state.swarmNode == nil { // should never happen but protect from panic
492
+		return "", errors.New("invalid cluster state for spec initialization")
493
+	}
494
+	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
495
+		return "", err
496
+	}
497
+	return state.NodeID(), nil
493 498
 }
494 499
 
495 500
 // Join makes current Cluster part of an existing swarm cluster.
496 501
 func (c *Cluster) Join(req types.JoinRequest) error {
497
-	c.Lock()
498
-	if c.swarmExists() {
499
-		c.Unlock()
502
+	c.controlMutex.Lock()
503
+	defer c.controlMutex.Unlock()
504
+	c.mu.Lock()
505
+	if c.nr != nil {
506
+		c.mu.Unlock()
500 507
 		return ErrSwarmExists
501 508
 	}
509
+	c.mu.Unlock()
510
+
502 511
 	if err := validateAndSanitizeJoinRequest(&req); err != nil {
503
-		c.Unlock()
504 512
 		return err
505 513
 	}
506 514
 
507 515
 	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
508 516
 	if err != nil {
509
-		c.Unlock()
510 517
 		return err
511 518
 	}
512 519
 
... ...
@@ -520,8 +380,9 @@ func (c *Cluster) Join(req types.JoinRequest) error {
520 520
 		}
521 521
 	}
522 522
 
523
-	// todo: check current state existing
524
-	n, err := c.startNewNode(nodeStartConfig{
523
+	clearPersistentState(c.root)
524
+
525
+	nr, err := c.newNodeRunner(nodeStartConfig{
525 526
 		RemoteAddr:    req.RemoteAddrs[0],
526 527
 		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
527 528
 		AdvertiseAddr: advertiseAddr,
... ...
@@ -529,46 +390,40 @@ func (c *Cluster) Join(req types.JoinRequest) error {
529 529
 		joinToken:     req.JoinToken,
530 530
 	})
531 531
 	if err != nil {
532
-		c.Unlock()
533 532
 		return err
534 533
 	}
535
-	c.Unlock()
534
+
535
+	c.mu.Lock()
536
+	c.nr = nr
537
+	c.mu.Unlock()
536 538
 
537 539
 	select {
538 540
 	case <-time.After(swarmConnectTimeout):
539
-		// attempt to connect will continue in background, but reconnect only if it didn't fail
540
-		go func() {
541
-			select {
542
-			case <-n.Ready():
543
-				c.reconnectOnFailure(n)
544
-			case <-n.done:
545
-				logrus.Errorf("failed to join the cluster: %+v", c.err)
546
-			}
547
-		}()
548 541
 		return ErrSwarmJoinTimeoutReached
549
-	case <-n.Ready():
550
-		go c.reconnectOnFailure(n)
551
-		return nil
552
-	case <-n.done:
553
-		c.RLock()
554
-		defer c.RUnlock()
555
-		return c.err
542
+	case err := <-nr.Ready():
543
+		if err != nil {
544
+			c.mu.Lock()
545
+			c.nr = nil
546
+			c.mu.Unlock()
547
+		}
548
+		return err
556 549
 	}
557 550
 }
558 551
 
559 552
 // GetUnlockKey returns the unlock key for the swarm.
560 553
 func (c *Cluster) GetUnlockKey() (string, error) {
561
-	c.RLock()
562
-	defer c.RUnlock()
554
+	c.mu.RLock()
555
+	defer c.mu.RUnlock()
563 556
 
564
-	if !c.isActiveManager() {
565
-		return "", c.errNoManager()
557
+	state := c.currentNodeState()
558
+	if !state.IsActiveManager() {
559
+		return "", c.errNoManager(state)
566 560
 	}
567 561
 
568 562
 	ctx, cancel := c.getRequestContext()
569 563
 	defer cancel()
570 564
 
571
-	client := swarmapi.NewCAClient(c.conn)
565
+	client := swarmapi.NewCAClient(state.grpcConn)
572 566
 
573 567
 	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
574 568
 	if err != nil {
... ...
@@ -585,141 +440,104 @@ func (c *Cluster) GetUnlockKey() (string, error) {
585 585
 
586 586
 // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
587 587
 func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
588
-	c.RLock()
589
-	if !c.isActiveManager() {
590
-		if err := c.errNoManager(); err != ErrSwarmLocked {
591
-			c.RUnlock()
592
-			return err
593
-		}
594
-	}
595
-
596
-	if c.node != nil || c.locked != true {
597
-		c.RUnlock()
588
+	c.controlMutex.Lock()
589
+	defer c.controlMutex.Unlock()
590
+
591
+	c.mu.RLock()
592
+	state := c.currentNodeState()
593
+	nr := c.nr
594
+	c.mu.RUnlock()
595
+	if nr == nil || errors.Cause(state.err) != ErrSwarmLocked {
598 596
 		return errors.New("swarm is not locked")
599 597
 	}
600
-	c.RUnlock()
601
-
602 598
 	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
603 599
 	if err != nil {
604 600
 		return err
605 601
 	}
606 602
 
607
-	c.Lock()
608
-	config := *c.lastNodeConfig
603
+	config := nr.config
609 604
 	config.lockKey = key
610
-	n, err := c.startNewNode(config)
605
+	if err := nr.Stop(); err != nil {
606
+		return err
607
+	}
608
+	nr, err = c.newNodeRunner(config)
611 609
 	if err != nil {
612
-		c.Unlock()
613 610
 		return err
614 611
 	}
615
-	c.Unlock()
616
-	select {
617
-	case <-n.Ready():
618
-	case <-n.done:
619
-		if errors.Cause(c.err) == ErrSwarmLocked {
612
+
613
+	c.mu.Lock()
614
+	c.nr = nr
615
+	c.mu.Unlock()
616
+
617
+	if err := <-nr.Ready(); err != nil {
618
+		if errors.Cause(err) == ErrSwarmLocked {
620 619
 			return errors.New("swarm could not be unlocked: invalid key provided")
621 620
 		}
622
-		return fmt.Errorf("swarm component could not be started: %v", c.err)
621
+		return fmt.Errorf("swarm component could not be started: %v", err)
623 622
 	}
624
-	go c.reconnectOnFailure(n)
625 623
 	return nil
626 624
 }
627 625
 
628
-// stopNode is a helper that stops the active c.node and waits until it has
629
-// shut down. Call while keeping the cluster lock.
630
-func (c *Cluster) stopNode() error {
631
-	if c.node == nil {
632
-		return nil
633
-	}
634
-	c.stop = true
635
-	if c.cancelDelay != nil {
636
-		c.cancelDelay()
637
-		c.cancelDelay = nil
638
-	}
639
-	node := c.node
640
-	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
641
-	defer cancel()
642
-	// TODO: can't hold lock on stop because it calls back to network
643
-	c.Unlock()
644
-	defer c.Lock()
645
-	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
646
-		return err
647
-	}
648
-	<-node.done
649
-	return nil
650
-}
651
-
652
-func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
653
-	return reachable-2 <= unreachable
654
-}
655
-
656
-func isLastManager(reachable, unreachable int) bool {
657
-	return reachable == 1 && unreachable == 0
658
-}
659
-
660 626
 // Leave shuts down Cluster and removes current state.
661 627
 func (c *Cluster) Leave(force bool) error {
662
-	c.Lock()
663
-	node := c.node
664
-	if node == nil {
665
-		if c.locked {
666
-			c.locked = false
667
-			c.lastNodeConfig = nil
668
-			c.Unlock()
669
-		} else if c.err == ErrSwarmCertificatesExpired {
670
-			c.err = nil
671
-			c.Unlock()
672
-		} else {
673
-			c.Unlock()
674
-			return ErrNoSwarm
675
-		}
676
-	} else {
677
-		if node.Manager() != nil && !force {
678
-			msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
679
-			if c.isActiveManager() {
680
-				active, reachable, unreachable, err := c.managerStats()
681
-				if err == nil {
682
-					if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
683
-						if isLastManager(reachable, unreachable) {
684
-							msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
685
-							c.Unlock()
686
-							return fmt.Errorf(msg)
687
-						}
688
-						msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
628
+	c.controlMutex.Lock()
629
+	defer c.controlMutex.Unlock()
630
+
631
+	c.mu.Lock()
632
+	nr := c.nr
633
+	if nr == nil {
634
+		c.mu.Unlock()
635
+		return ErrNoSwarm
636
+	}
637
+	state := c.currentNodeState()
638
+	if state.IsManager() && !force {
639
+		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
640
+		if state.IsActiveManager() {
641
+			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
642
+			if err == nil {
643
+				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
644
+					if isLastManager(reachable, unreachable) {
645
+						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
646
+						c.mu.Unlock()
647
+						return fmt.Errorf(msg)
689 648
 					}
649
+					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
690 650
 				}
691
-			} else {
692
-				msg += "Doing so may lose the consensus of your cluster. "
693 651
 			}
694
-
695
-			msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
696
-			c.Unlock()
697
-			return fmt.Errorf(msg)
652
+		} else {
653
+			msg += "Doing so may lose the consensus of your cluster. "
698 654
 		}
699
-		if err := c.stopNode(); err != nil {
700
-			logrus.Errorf("failed to shut down cluster node: %v", err)
701
-			signal.DumpStacks("")
702
-			c.Unlock()
655
+
656
+		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
657
+		c.mu.Unlock()
658
+		return fmt.Errorf(msg)
659
+	}
660
+	// release readers in here
661
+	if err := nr.Stop(); err != nil {
662
+		logrus.Errorf("failed to shut down cluster node: %v", err)
663
+		signal.DumpStacks("")
664
+		c.mu.Unlock()
665
+		return err
666
+	}
667
+	c.nr = nil
668
+	c.mu.Unlock()
669
+	if nodeID := state.NodeID(); nodeID != "" {
670
+		nodeContainers, err := c.listContainerForNode(nodeID)
671
+		if err != nil {
703 672
 			return err
704 673
 		}
705
-		c.Unlock()
706
-		if nodeID := node.NodeID(); nodeID != "" {
707
-			nodeContainers, err := c.listContainerForNode(nodeID)
708
-			if err != nil {
709
-				return err
710
-			}
711
-			for _, id := range nodeContainers {
712
-				if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
713
-					logrus.Errorf("error removing %v: %v", id, err)
714
-				}
674
+		for _, id := range nodeContainers {
675
+			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
676
+				logrus.Errorf("error removing %v: %v", id, err)
715 677
 			}
716 678
 		}
717 679
 	}
718 680
 	c.configEvent <- struct{}{}
719 681
 	// todo: cleanup optional?
720
-	if err := c.clearState(); err != nil {
682
+	if err := clearPersistentState(c.root); err != nil {
721 683
 		return err
722 684
 	}
685
+	c.config.Backend.SetClusterProvider(nil)
723 686
 	return nil
724 687
 }
725 688
 
... ...
@@ -739,35 +557,24 @@ func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
739 739
 	return ids, nil
740 740
 }
741 741
 
742
-func (c *Cluster) clearState() error {
743
-	// todo: backup this data instead of removing?
744
-	if err := os.RemoveAll(c.root); err != nil {
745
-		return err
746
-	}
747
-	if err := os.MkdirAll(c.root, 0700); err != nil {
748
-		return err
749
-	}
750
-	c.config.Backend.SetClusterProvider(nil)
751
-	return nil
752
-}
753
-
754 742
 func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
755 743
 	return context.WithTimeout(context.Background(), swarmRequestTimeout)
756 744
 }
757 745
 
758 746
 // Inspect retrieves the configuration properties of a managed swarm cluster.
759 747
 func (c *Cluster) Inspect() (types.Swarm, error) {
760
-	c.RLock()
761
-	defer c.RUnlock()
748
+	c.mu.RLock()
749
+	defer c.mu.RUnlock()
762 750
 
763
-	if !c.isActiveManager() {
764
-		return types.Swarm{}, c.errNoManager()
751
+	state := c.currentNodeState()
752
+	if !state.IsActiveManager() {
753
+		return types.Swarm{}, c.errNoManager(state)
765 754
 	}
766 755
 
767 756
 	ctx, cancel := c.getRequestContext()
768 757
 	defer cancel()
769 758
 
770
-	swarm, err := getSwarm(ctx, c.client)
759
+	swarm, err := getSwarm(ctx, state.controlClient)
771 760
 	if err != nil {
772 761
 		return types.Swarm{}, err
773 762
 	}
... ...
@@ -777,17 +584,18 @@ func (c *Cluster) Inspect() (types.Swarm, error) {
777 777
 
778 778
 // Update updates configuration of a managed swarm cluster.
779 779
 func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
780
-	c.RLock()
781
-	defer c.RUnlock()
780
+	c.mu.RLock()
781
+	defer c.mu.RUnlock()
782 782
 
783
-	if !c.isActiveManager() {
784
-		return c.errNoManager()
783
+	state := c.currentNodeState()
784
+	if !state.IsActiveManager() {
785
+		return c.errNoManager(state)
785 786
 	}
786 787
 
787 788
 	ctx, cancel := c.getRequestContext()
788 789
 	defer cancel()
789 790
 
790
-	swarm, err := getSwarm(ctx, c.client)
791
+	swarm, err := getSwarm(ctx, state.controlClient)
791 792
 	if err != nil {
792 793
 		return err
793 794
 	}
... ...
@@ -800,7 +608,7 @@ func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlag
800 800
 		return err
801 801
 	}
802 802
 
803
-	_, err = c.client.UpdateCluster(
803
+	_, err = state.controlClient.UpdateCluster(
804 804
 		ctx,
805 805
 		&swarmapi.UpdateClusterRequest{
806 806
 			ClusterID: swarm.ID,
... ...
@@ -820,61 +628,62 @@ func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlag
820 820
 
821 821
 // IsManager returns true if Cluster is participating as a manager.
822 822
 func (c *Cluster) IsManager() bool {
823
-	c.RLock()
824
-	defer c.RUnlock()
825
-	return c.isActiveManager()
823
+	c.mu.RLock()
824
+	defer c.mu.RUnlock()
825
+	return c.currentNodeState().IsActiveManager()
826 826
 }
827 827
 
828 828
 // IsAgent returns true if Cluster is participating as a worker/agent.
829 829
 func (c *Cluster) IsAgent() bool {
830
-	c.RLock()
831
-	defer c.RUnlock()
832
-	return c.node != nil && c.ready
830
+	c.mu.RLock()
831
+	defer c.mu.RUnlock()
832
+	return c.currentNodeState().status == types.LocalNodeStateActive
833 833
 }
834 834
 
835 835
 // GetLocalAddress returns the local address.
836 836
 func (c *Cluster) GetLocalAddress() string {
837
-	c.RLock()
838
-	defer c.RUnlock()
839
-	return c.actualLocalAddr
837
+	c.mu.RLock()
838
+	defer c.mu.RUnlock()
839
+	return c.currentNodeState().actualLocalAddr
840 840
 }
841 841
 
842 842
 // GetListenAddress returns the listen address.
843 843
 func (c *Cluster) GetListenAddress() string {
844
-	c.RLock()
845
-	defer c.RUnlock()
846
-	if c.node != nil {
847
-		return c.node.config.ListenAddr
844
+	c.mu.RLock()
845
+	defer c.mu.RUnlock()
846
+	if c.nr != nil {
847
+		return c.nr.config.ListenAddr
848 848
 	}
849 849
 	return ""
850 850
 }
851 851
 
852 852
 // GetAdvertiseAddress returns the remotely reachable address of this node.
853 853
 func (c *Cluster) GetAdvertiseAddress() string {
854
-	c.RLock()
855
-	defer c.RUnlock()
856
-	if c.node != nil && c.node.config.AdvertiseAddr != "" {
857
-		advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
854
+	c.mu.RLock()
855
+	defer c.mu.RUnlock()
856
+	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
857
+		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
858 858
 		return advertiseHost
859 859
 	}
860
-	return c.actualLocalAddr
860
+	return c.currentNodeState().actualLocalAddr
861 861
 }
862 862
 
863 863
 // GetRemoteAddress returns a known advertise address of a remote manager if
864 864
 // available.
865 865
 // todo: change to array/connect with info
866 866
 func (c *Cluster) GetRemoteAddress() string {
867
-	c.RLock()
868
-	defer c.RUnlock()
867
+	c.mu.RLock()
868
+	defer c.mu.RUnlock()
869 869
 	return c.getRemoteAddress()
870 870
 }
871 871
 
872 872
 func (c *Cluster) getRemoteAddress() string {
873
-	if c.node == nil {
873
+	state := c.currentNodeState()
874
+	if state.swarmNode == nil {
874 875
 		return ""
875 876
 	}
876
-	nodeID := c.node.NodeID()
877
-	for _, r := range c.node.Remotes() {
877
+	nodeID := state.swarmNode.NodeID()
878
+	for _, r := range state.swarmNode.Remotes() {
878 879
 		if r.NodeID != nodeID {
879 880
 			return r.Addr
880 881
 		}
... ...
@@ -894,36 +703,19 @@ func (c *Cluster) Info() types.Info {
894 894
 	info := types.Info{
895 895
 		NodeAddr: c.GetAdvertiseAddress(),
896 896
 	}
897
+	c.mu.RLock()
898
+	defer c.mu.RUnlock()
897 899
 
898
-	c.RLock()
899
-	defer c.RUnlock()
900
-
901
-	if c.node == nil {
902
-		info.LocalNodeState = types.LocalNodeStateInactive
903
-		if c.cancelDelay != nil {
904
-			info.LocalNodeState = types.LocalNodeStateError
905
-		}
906
-		if c.locked {
907
-			info.LocalNodeState = types.LocalNodeStateLocked
908
-		} else if c.err == ErrSwarmCertificatesExpired {
909
-			info.LocalNodeState = types.LocalNodeStateError
910
-		}
911
-	} else {
912
-		info.LocalNodeState = types.LocalNodeStatePending
913
-		if c.ready == true {
914
-			info.LocalNodeState = types.LocalNodeStateActive
915
-		} else if c.locked {
916
-			info.LocalNodeState = types.LocalNodeStateLocked
917
-		}
918
-	}
919
-	if c.err != nil {
920
-		info.Error = c.err.Error()
900
+	state := c.currentNodeState()
901
+	info.LocalNodeState = state.status
902
+	if state.err != nil {
903
+		info.Error = state.err.Error()
921 904
 	}
922 905
 
923 906
 	ctx, cancel := c.getRequestContext()
924 907
 	defer cancel()
925 908
 
926
-	if c.isActiveManager() {
909
+	if state.IsActiveManager() {
927 910
 		info.ControlAvailable = true
928 911
 		swarm, err := c.Inspect()
929 912
 		if err != nil {
... ...
@@ -933,7 +725,7 @@ func (c *Cluster) Info() types.Info {
933 933
 		// Strip JoinTokens
934 934
 		info.Cluster = swarm.ClusterInfo
935 935
 
936
-		if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
936
+		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
937 937
 			info.Nodes = len(r.Nodes)
938 938
 			for _, n := range r.Nodes {
939 939
 				if n.ManagerStatus != nil {
... ...
@@ -943,39 +735,34 @@ func (c *Cluster) Info() types.Info {
943 943
 		}
944 944
 	}
945 945
 
946
-	if c.node != nil {
947
-		for _, r := range c.node.Remotes() {
946
+	if state.swarmNode != nil {
947
+		for _, r := range state.swarmNode.Remotes() {
948 948
 			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
949 949
 		}
950
-		info.NodeID = c.node.NodeID()
950
+		info.NodeID = state.swarmNode.NodeID()
951 951
 	}
952 952
 
953 953
 	return info
954 954
 }
955 955
 
956
-// isActiveManager should not be called without a read lock
957
-func (c *Cluster) isActiveManager() bool {
958
-	return c.node != nil && c.conn != nil
959
-}
960
-
961
-// swarmExists should not be called without a read lock
962
-func (c *Cluster) swarmExists() bool {
963
-	return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
956
+// currentNodeState should not be called without a read lock
957
+func (c *Cluster) currentNodeState() nodeState {
958
+	return c.nr.State()
964 959
 }
965 960
 
966 961
 // errNoManager returns error describing why manager commands can't be used.
967 962
 // Call with read lock.
968
-func (c *Cluster) errNoManager() error {
969
-	if c.node == nil {
970
-		if c.locked {
963
+func (c *Cluster) errNoManager(st nodeState) error {
964
+	if st.swarmNode == nil {
965
+		if errors.Cause(st.err) == ErrSwarmLocked {
971 966
 			return ErrSwarmLocked
972 967
 		}
973
-		if c.err == ErrSwarmCertificatesExpired {
968
+		if st.err == ErrSwarmCertificatesExpired {
974 969
 			return ErrSwarmCertificatesExpired
975 970
 		}
976 971
 		return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
977 972
 	}
978
-	if c.node.Manager() != nil {
973
+	if st.swarmNode.Manager() != nil {
979 974
 		return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
980 975
 	}
981 976
 	return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
... ...
@@ -983,11 +770,12 @@ func (c *Cluster) errNoManager() error {
983 983
 
984 984
 // GetServices returns all services of a managed swarm cluster.
985 985
 func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
986
-	c.RLock()
987
-	defer c.RUnlock()
986
+	c.mu.RLock()
987
+	defer c.mu.RUnlock()
988 988
 
989
-	if !c.isActiveManager() {
990
-		return nil, c.errNoManager()
989
+	state := c.currentNodeState()
990
+	if !state.IsActiveManager() {
991
+		return nil, c.errNoManager(state)
991 992
 	}
992 993
 
993 994
 	filters, err := newListServicesFilters(options.Filters)
... ...
@@ -997,7 +785,7 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
997 997
 	ctx, cancel := c.getRequestContext()
998 998
 	defer cancel()
999 999
 
1000
-	r, err := c.client.ListServices(
1000
+	r, err := state.controlClient.ListServices(
1001 1001
 		ctx,
1002 1002
 		&swarmapi.ListServicesRequest{Filters: filters})
1003 1003
 	if err != nil {
... ...
@@ -1059,17 +847,18 @@ func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authC
1059 1059
 
1060 1060
 // CreateService creates a new service in a managed swarm cluster.
1061 1061
 func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
1062
-	c.RLock()
1063
-	defer c.RUnlock()
1062
+	c.mu.RLock()
1063
+	defer c.mu.RUnlock()
1064 1064
 
1065
-	if !c.isActiveManager() {
1066
-		return nil, c.errNoManager()
1065
+	state := c.currentNodeState()
1066
+	if !state.IsActiveManager() {
1067
+		return nil, c.errNoManager(state)
1067 1068
 	}
1068 1069
 
1069 1070
 	ctx, cancel := c.getRequestContext()
1070 1071
 	defer cancel()
1071 1072
 
1072
-	err := c.populateNetworkID(ctx, c.client, &s)
1073
+	err := c.populateNetworkID(ctx, state.controlClient, &s)
1073 1074
 	if err != nil {
1074 1075
 		return nil, err
1075 1076
 	}
... ...
@@ -1110,7 +899,7 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity
1110 1110
 		}
1111 1111
 	}
1112 1112
 
1113
-	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
1113
+	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
1114 1114
 	if err != nil {
1115 1115
 		return nil, err
1116 1116
 	}
... ...
@@ -1121,17 +910,18 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity
1121 1121
 
1122 1122
 // GetService returns a service based on an ID or name.
1123 1123
 func (c *Cluster) GetService(input string) (types.Service, error) {
1124
-	c.RLock()
1125
-	defer c.RUnlock()
1124
+	c.mu.RLock()
1125
+	defer c.mu.RUnlock()
1126 1126
 
1127
-	if !c.isActiveManager() {
1128
-		return types.Service{}, c.errNoManager()
1127
+	state := c.currentNodeState()
1128
+	if !state.IsActiveManager() {
1129
+		return types.Service{}, c.errNoManager(state)
1129 1130
 	}
1130 1131
 
1131 1132
 	ctx, cancel := c.getRequestContext()
1132 1133
 	defer cancel()
1133 1134
 
1134
-	service, err := getService(ctx, c.client, input)
1135
+	service, err := getService(ctx, state.controlClient, input)
1135 1136
 	if err != nil {
1136 1137
 		return types.Service{}, err
1137 1138
 	}
... ...
@@ -1140,17 +930,18 @@ func (c *Cluster) GetService(input string) (types.Service, error) {
1140 1140
 
1141 1141
 // UpdateService updates existing service to match new properties.
1142 1142
 func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
1143
-	c.RLock()
1144
-	defer c.RUnlock()
1143
+	c.mu.RLock()
1144
+	defer c.mu.RUnlock()
1145 1145
 
1146
-	if !c.isActiveManager() {
1147
-		return nil, c.errNoManager()
1146
+	state := c.currentNodeState()
1147
+	if !state.IsActiveManager() {
1148
+		return nil, c.errNoManager(state)
1148 1149
 	}
1149 1150
 
1150 1151
 	ctx, cancel := c.getRequestContext()
1151 1152
 	defer cancel()
1152 1153
 
1153
-	err := c.populateNetworkID(ctx, c.client, &spec)
1154
+	err := c.populateNetworkID(ctx, state.controlClient, &spec)
1154 1155
 	if err != nil {
1155 1156
 		return nil, err
1156 1157
 	}
... ...
@@ -1160,7 +951,7 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ
1160 1160
 		return nil, err
1161 1161
 	}
1162 1162
 
1163
-	currentService, err := getService(ctx, c.client, serviceIDOrName)
1163
+	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
1164 1164
 	if err != nil {
1165 1165
 		return nil, err
1166 1166
 	}
... ...
@@ -1219,7 +1010,7 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ
1219 1219
 		}
1220 1220
 	}
1221 1221
 
1222
-	_, err = c.client.UpdateService(
1222
+	_, err = state.controlClient.UpdateService(
1223 1223
 		ctx,
1224 1224
 		&swarmapi.UpdateServiceRequest{
1225 1225
 			ServiceID: currentService.ID,
... ...
@@ -1235,22 +1026,23 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ
1235 1235
 
1236 1236
 // RemoveService removes a service from a managed swarm cluster.
1237 1237
 func (c *Cluster) RemoveService(input string) error {
1238
-	c.RLock()
1239
-	defer c.RUnlock()
1238
+	c.mu.RLock()
1239
+	defer c.mu.RUnlock()
1240 1240
 
1241
-	if !c.isActiveManager() {
1242
-		return c.errNoManager()
1241
+	state := c.currentNodeState()
1242
+	if !state.IsActiveManager() {
1243
+		return c.errNoManager(state)
1243 1244
 	}
1244 1245
 
1245 1246
 	ctx, cancel := c.getRequestContext()
1246 1247
 	defer cancel()
1247 1248
 
1248
-	service, err := getService(ctx, c.client, input)
1249
+	service, err := getService(ctx, state.controlClient, input)
1249 1250
 	if err != nil {
1250 1251
 		return err
1251 1252
 	}
1252 1253
 
1253
-	if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
1254
+	if _, err := state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
1254 1255
 		return err
1255 1256
 	}
1256 1257
 	return nil
... ...
@@ -1258,19 +1050,20 @@ func (c *Cluster) RemoveService(input string) error {
1258 1258
 
1259 1259
 // ServiceLogs collects service logs and writes them back to `config.OutStream`
1260 1260
 func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
1261
-	c.RLock()
1262
-	if !c.isActiveManager() {
1263
-		c.RUnlock()
1264
-		return c.errNoManager()
1261
+	c.mu.RLock()
1262
+	state := c.currentNodeState()
1263
+	if !state.IsActiveManager() {
1264
+		c.mu.RUnlock()
1265
+		return c.errNoManager(state)
1265 1266
 	}
1266 1267
 
1267
-	service, err := getService(ctx, c.client, input)
1268
+	service, err := getService(ctx, state.controlClient, input)
1268 1269
 	if err != nil {
1269
-		c.RUnlock()
1270
+		c.mu.RUnlock()
1270 1271
 		return err
1271 1272
 	}
1272 1273
 
1273
-	stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
1274
+	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
1274 1275
 		Selector: &swarmapi.LogSelector{
1275 1276
 			ServiceIDs: []string{service.ID},
1276 1277
 		},
... ...
@@ -1279,7 +1072,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
1279 1279
 		},
1280 1280
 	})
1281 1281
 	if err != nil {
1282
-		c.RUnlock()
1282
+		c.mu.RUnlock()
1283 1283
 		return err
1284 1284
 	}
1285 1285
 
... ...
@@ -1292,7 +1085,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
1292 1292
 	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
1293 1293
 
1294 1294
 	// Release the lock before starting the stream.
1295
-	c.RUnlock()
1295
+	c.mu.RUnlock()
1296 1296
 	for {
1297 1297
 		// Check the context before doing anything.
1298 1298
 		select {
... ...
@@ -1340,11 +1133,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
1340 1340
 
1341 1341
 // GetNodes returns a list of all nodes known to a cluster.
1342 1342
 func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
1343
-	c.RLock()
1344
-	defer c.RUnlock()
1343
+	c.mu.RLock()
1344
+	defer c.mu.RUnlock()
1345 1345
 
1346
-	if !c.isActiveManager() {
1347
-		return nil, c.errNoManager()
1346
+	state := c.currentNodeState()
1347
+	if !state.IsActiveManager() {
1348
+		return nil, c.errNoManager(state)
1348 1349
 	}
1349 1350
 
1350 1351
 	filters, err := newListNodesFilters(options.Filters)
... ...
@@ -1355,7 +1149,7 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro
1355 1355
 	ctx, cancel := c.getRequestContext()
1356 1356
 	defer cancel()
1357 1357
 
1358
-	r, err := c.client.ListNodes(
1358
+	r, err := state.controlClient.ListNodes(
1359 1359
 		ctx,
1360 1360
 		&swarmapi.ListNodesRequest{Filters: filters})
1361 1361
 	if err != nil {
... ...
@@ -1372,17 +1166,18 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro
1372 1372
 
1373 1373
 // GetNode returns a node based on an ID or name.
1374 1374
 func (c *Cluster) GetNode(input string) (types.Node, error) {
1375
-	c.RLock()
1376
-	defer c.RUnlock()
1375
+	c.mu.RLock()
1376
+	defer c.mu.RUnlock()
1377 1377
 
1378
-	if !c.isActiveManager() {
1379
-		return types.Node{}, c.errNoManager()
1378
+	state := c.currentNodeState()
1379
+	if !state.IsActiveManager() {
1380
+		return types.Node{}, c.errNoManager(state)
1380 1381
 	}
1381 1382
 
1382 1383
 	ctx, cancel := c.getRequestContext()
1383 1384
 	defer cancel()
1384 1385
 
1385
-	node, err := getNode(ctx, c.client, input)
1386
+	node, err := getNode(ctx, state.controlClient, input)
1386 1387
 	if err != nil {
1387 1388
 		return types.Node{}, err
1388 1389
 	}
... ...
@@ -1391,11 +1186,12 @@ func (c *Cluster) GetNode(input string) (types.Node, error) {
1391 1391
 
1392 1392
 // UpdateNode updates existing nodes properties.
1393 1393
 func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
1394
-	c.RLock()
1395
-	defer c.RUnlock()
1394
+	c.mu.RLock()
1395
+	defer c.mu.RUnlock()
1396 1396
 
1397
-	if !c.isActiveManager() {
1398
-		return c.errNoManager()
1397
+	state := c.currentNodeState()
1398
+	if !state.IsActiveManager() {
1399
+		return c.errNoManager(state)
1399 1400
 	}
1400 1401
 
1401 1402
 	nodeSpec, err := convert.NodeSpecToGRPC(spec)
... ...
@@ -1406,12 +1202,12 @@ func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec)
1406 1406
 	ctx, cancel := c.getRequestContext()
1407 1407
 	defer cancel()
1408 1408
 
1409
-	currentNode, err := getNode(ctx, c.client, input)
1409
+	currentNode, err := getNode(ctx, state.controlClient, input)
1410 1410
 	if err != nil {
1411 1411
 		return err
1412 1412
 	}
1413 1413
 
1414
-	_, err = c.client.UpdateNode(
1414
+	_, err = state.controlClient.UpdateNode(
1415 1415
 		ctx,
1416 1416
 		&swarmapi.UpdateNodeRequest{
1417 1417
 			NodeID: currentNode.ID,
... ...
@@ -1426,22 +1222,23 @@ func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec)
1426 1426
 
1427 1427
 // RemoveNode removes a node from a cluster
1428 1428
 func (c *Cluster) RemoveNode(input string, force bool) error {
1429
-	c.RLock()
1430
-	defer c.RUnlock()
1429
+	c.mu.RLock()
1430
+	defer c.mu.RUnlock()
1431 1431
 
1432
-	if !c.isActiveManager() {
1433
-		return c.errNoManager()
1432
+	state := c.currentNodeState()
1433
+	if !state.IsActiveManager() {
1434
+		return c.errNoManager(state)
1434 1435
 	}
1435 1436
 
1436 1437
 	ctx, cancel := c.getRequestContext()
1437 1438
 	defer cancel()
1438 1439
 
1439
-	node, err := getNode(ctx, c.client, input)
1440
+	node, err := getNode(ctx, state.controlClient, input)
1440 1441
 	if err != nil {
1441 1442
 		return err
1442 1443
 	}
1443 1444
 
1444
-	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
1445
+	if _, err := state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
1445 1446
 		return err
1446 1447
 	}
1447 1448
 	return nil
... ...
@@ -1449,11 +1246,12 @@ func (c *Cluster) RemoveNode(input string, force bool) error {
1449 1449
 
1450 1450
 // GetTasks returns a list of tasks matching the filter options.
1451 1451
 func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
1452
-	c.RLock()
1453
-	defer c.RUnlock()
1452
+	c.mu.RLock()
1453
+	defer c.mu.RUnlock()
1454 1454
 
1455
-	if !c.isActiveManager() {
1456
-		return nil, c.errNoManager()
1455
+	state := c.currentNodeState()
1456
+	if !state.IsActiveManager() {
1457
+		return nil, c.errNoManager(state)
1457 1458
 	}
1458 1459
 
1459 1460
 	byName := func(filter filters.Args) error {
... ...
@@ -1490,7 +1288,7 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
1490 1490
 	ctx, cancel := c.getRequestContext()
1491 1491
 	defer cancel()
1492 1492
 
1493
-	r, err := c.client.ListTasks(
1493
+	r, err := state.controlClient.ListTasks(
1494 1494
 		ctx,
1495 1495
 		&swarmapi.ListTasksRequest{Filters: filters})
1496 1496
 	if err != nil {
... ...
@@ -1509,17 +1307,18 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
1509 1509
 
1510 1510
 // GetTask returns a task by an ID.
1511 1511
 func (c *Cluster) GetTask(input string) (types.Task, error) {
1512
-	c.RLock()
1513
-	defer c.RUnlock()
1512
+	c.mu.RLock()
1513
+	defer c.mu.RUnlock()
1514 1514
 
1515
-	if !c.isActiveManager() {
1516
-		return types.Task{}, c.errNoManager()
1515
+	state := c.currentNodeState()
1516
+	if !state.IsActiveManager() {
1517
+		return types.Task{}, c.errNoManager(state)
1517 1518
 	}
1518 1519
 
1519 1520
 	ctx, cancel := c.getRequestContext()
1520 1521
 	defer cancel()
1521 1522
 
1522
-	task, err := getTask(ctx, c.client, input)
1523
+	task, err := getTask(ctx, state.controlClient, input)
1523 1524
 	if err != nil {
1524 1525
 		return types.Task{}, err
1525 1526
 	}
... ...
@@ -1528,17 +1327,18 @@ func (c *Cluster) GetTask(input string) (types.Task, error) {
1528 1528
 
1529 1529
 // GetNetwork returns a cluster network by an ID.
1530 1530
 func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
1531
-	c.RLock()
1532
-	defer c.RUnlock()
1531
+	c.mu.RLock()
1532
+	defer c.mu.RUnlock()
1533 1533
 
1534
-	if !c.isActiveManager() {
1535
-		return apitypes.NetworkResource{}, c.errNoManager()
1534
+	state := c.currentNodeState()
1535
+	if !state.IsActiveManager() {
1536
+		return apitypes.NetworkResource{}, c.errNoManager(state)
1536 1537
 	}
1537 1538
 
1538 1539
 	ctx, cancel := c.getRequestContext()
1539 1540
 	defer cancel()
1540 1541
 
1541
-	network, err := getNetwork(ctx, c.client, input)
1542
+	network, err := getNetwork(ctx, state.controlClient, input)
1542 1543
 	if err != nil {
1543 1544
 		return apitypes.NetworkResource{}, err
1544 1545
 	}
... ...
@@ -1547,17 +1347,18 @@ func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
1547 1547
 
1548 1548
 // GetNetworks returns all current cluster managed networks.
1549 1549
 func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
1550
-	c.RLock()
1551
-	defer c.RUnlock()
1550
+	c.mu.RLock()
1551
+	defer c.mu.RUnlock()
1552 1552
 
1553
-	if !c.isActiveManager() {
1554
-		return nil, c.errNoManager()
1553
+	state := c.currentNodeState()
1554
+	if !state.IsActiveManager() {
1555
+		return nil, c.errNoManager(state)
1555 1556
 	}
1556 1557
 
1557 1558
 	ctx, cancel := c.getRequestContext()
1558 1559
 	defer cancel()
1559 1560
 
1560
-	r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
1561
+	r, err := state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
1561 1562
 	if err != nil {
1562 1563
 		return nil, err
1563 1564
 	}
... ...
@@ -1579,9 +1380,9 @@ func attacherKey(target, containerID string) string {
1579 1579
 // waiter who is trying to start or attach the container to the
1580 1580
 // network.
1581 1581
 func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
1582
-	c.RLock()
1582
+	c.mu.RLock()
1583 1583
 	attacher, ok := c.attachers[attacherKey(target, containerID)]
1584
-	c.RUnlock()
1584
+	c.mu.RUnlock()
1585 1585
 	if !ok || attacher == nil {
1586 1586
 		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
1587 1587
 	}
... ...
@@ -1594,19 +1395,19 @@ func (c *Cluster) UpdateAttachment(target, containerID string, config *network.N
1594 1594
 // WaitForDetachment waits for the container to stop or detach from
1595 1595
 // the network.
1596 1596
 func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
1597
-	c.RLock()
1597
+	c.mu.RLock()
1598 1598
 	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
1599 1599
 	if !ok {
1600 1600
 		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
1601 1601
 	}
1602
-	if c.node == nil || c.node.Agent() == nil {
1603
-		c.RUnlock()
1602
+	state := c.currentNodeState()
1603
+	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
1604
+		c.mu.RUnlock()
1604 1605
 		return fmt.Errorf("invalid cluster node while waiting for detachment")
1605 1606
 	}
1606 1607
 
1607
-	agent := c.node.Agent()
1608
-	c.RUnlock()
1609
-
1608
+	c.mu.RUnlock()
1609
+	agent := state.swarmNode.Agent()
1610 1610
 	if ok && attacher != nil &&
1611 1611
 		attacher.detachWaitCh != nil &&
1612 1612
 		attacher.attachCompleteCh != nil {
... ...
@@ -1633,17 +1434,18 @@ func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID,
1633 1633
 // AttachNetwork generates an attachment request towards the manager.
1634 1634
 func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
1635 1635
 	aKey := attacherKey(target, containerID)
1636
-	c.Lock()
1637
-	if c.node == nil || c.node.Agent() == nil {
1638
-		c.Unlock()
1636
+	c.mu.Lock()
1637
+	state := c.currentNodeState()
1638
+	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
1639
+		c.mu.Unlock()
1639 1640
 		return nil, fmt.Errorf("invalid cluster node while attaching to network")
1640 1641
 	}
1641 1642
 	if attacher, ok := c.attachers[aKey]; ok {
1642
-		c.Unlock()
1643
+		c.mu.Unlock()
1643 1644
 		return attacher.config, nil
1644 1645
 	}
1645 1646
 
1646
-	agent := c.node.Agent()
1647
+	agent := state.swarmNode.Agent()
1647 1648
 	attachWaitCh := make(chan *network.NetworkingConfig)
1648 1649
 	detachWaitCh := make(chan struct{})
1649 1650
 	attachCompleteCh := make(chan struct{})
... ...
@@ -1652,23 +1454,23 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s
1652 1652
 		attachCompleteCh: attachCompleteCh,
1653 1653
 		detachWaitCh:     detachWaitCh,
1654 1654
 	}
1655
-	c.Unlock()
1655
+	c.mu.Unlock()
1656 1656
 
1657 1657
 	ctx, cancel := c.getRequestContext()
1658 1658
 	defer cancel()
1659 1659
 
1660 1660
 	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
1661 1661
 	if err != nil {
1662
-		c.Lock()
1662
+		c.mu.Lock()
1663 1663
 		delete(c.attachers, aKey)
1664
-		c.Unlock()
1664
+		c.mu.Unlock()
1665 1665
 		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
1666 1666
 	}
1667 1667
 
1668
-	c.Lock()
1668
+	c.mu.Lock()
1669 1669
 	c.attachers[aKey].taskID = taskID
1670 1670
 	close(attachCompleteCh)
1671
-	c.Unlock()
1671
+	c.mu.Unlock()
1672 1672
 
1673 1673
 	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
1674 1674
 
... ...
@@ -1679,9 +1481,9 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s
1679 1679
 		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
1680 1680
 	}
1681 1681
 
1682
-	c.Lock()
1682
+	c.mu.Lock()
1683 1683
 	c.attachers[aKey].config = config
1684
-	c.Unlock()
1684
+	c.mu.Unlock()
1685 1685
 	return config, nil
1686 1686
 }
1687 1687
 
... ...
@@ -1690,10 +1492,10 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s
1690 1690
 func (c *Cluster) DetachNetwork(target string, containerID string) error {
1691 1691
 	aKey := attacherKey(target, containerID)
1692 1692
 
1693
-	c.Lock()
1693
+	c.mu.Lock()
1694 1694
 	attacher, ok := c.attachers[aKey]
1695 1695
 	delete(c.attachers, aKey)
1696
-	c.Unlock()
1696
+	c.mu.Unlock()
1697 1697
 
1698 1698
 	if !ok {
1699 1699
 		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
... ...
@@ -1705,11 +1507,12 @@ func (c *Cluster) DetachNetwork(target string, containerID string) error {
1705 1705
 
1706 1706
 // CreateNetwork creates a new cluster managed network.
1707 1707
 func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
1708
-	c.RLock()
1709
-	defer c.RUnlock()
1708
+	c.mu.RLock()
1709
+	defer c.mu.RUnlock()
1710 1710
 
1711
-	if !c.isActiveManager() {
1712
-		return "", c.errNoManager()
1711
+	state := c.currentNodeState()
1712
+	if !state.IsActiveManager() {
1713
+		return "", c.errNoManager(state)
1713 1714
 	}
1714 1715
 
1715 1716
 	if runconfig.IsPreDefinedNetwork(s.Name) {
... ...
@@ -1721,7 +1524,7 @@ func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error)
1721 1721
 	defer cancel()
1722 1722
 
1723 1723
 	networkSpec := convert.BasicNetworkCreateToGRPC(s)
1724
-	r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
1724
+	r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
1725 1725
 	if err != nil {
1726 1726
 		return "", err
1727 1727
 	}
... ...
@@ -1731,22 +1534,23 @@ func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error)
1731 1731
 
1732 1732
 // RemoveNetwork removes a cluster network.
1733 1733
 func (c *Cluster) RemoveNetwork(input string) error {
1734
-	c.RLock()
1735
-	defer c.RUnlock()
1734
+	c.mu.RLock()
1735
+	defer c.mu.RUnlock()
1736 1736
 
1737
-	if !c.isActiveManager() {
1738
-		return c.errNoManager()
1737
+	state := c.currentNodeState()
1738
+	if !state.IsActiveManager() {
1739
+		return c.errNoManager(state)
1739 1740
 	}
1740 1741
 
1741 1742
 	ctx, cancel := c.getRequestContext()
1742 1743
 	defer cancel()
1743 1744
 
1744
-	network, err := getNetwork(ctx, c.client, input)
1745
+	network, err := getNetwork(ctx, state.controlClient, input)
1745 1746
 	if err != nil {
1746 1747
 		return err
1747 1748
 	}
1748 1749
 
1749
-	if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
1750
+	if _, err := state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
1750 1751
 		return err
1751 1752
 	}
1752 1753
 	return nil
... ...
@@ -1776,15 +1580,19 @@ func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.Control
1776 1776
 
1777 1777
 // Cleanup stops active swarm node. This is run before daemon shutdown.
1778 1778
 func (c *Cluster) Cleanup() {
1779
-	c.Lock()
1780
-	node := c.node
1779
+	c.controlMutex.Lock()
1780
+	defer c.controlMutex.Unlock()
1781
+
1782
+	c.mu.Lock()
1783
+	node := c.nr
1781 1784
 	if node == nil {
1782
-		c.Unlock()
1785
+		c.mu.Unlock()
1783 1786
 		return
1784 1787
 	}
1785
-	defer c.Unlock()
1786
-	if c.isActiveManager() {
1787
-		active, reachable, unreachable, err := c.managerStats()
1788
+	defer c.mu.Unlock()
1789
+	state := c.currentNodeState()
1790
+	if state.IsActiveManager() {
1791
+		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
1788 1792
 		if err == nil {
1789 1793
 			singlenode := active && isLastManager(reachable, unreachable)
1790 1794
 			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
... ...
@@ -1792,13 +1600,17 @@ func (c *Cluster) Cleanup() {
1792 1792
 			}
1793 1793
 		}
1794 1794
 	}
1795
-	c.stopNode()
1795
+	if err := node.Stop(); err != nil {
1796
+		logrus.Errorf("failed to shut down cluster node: %v", err)
1797
+		signal.DumpStacks("")
1798
+	}
1799
+	c.nr = nil
1796 1800
 }
1797 1801
 
1798
-func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
1802
+func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
1799 1803
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1800 1804
 	defer cancel()
1801
-	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
1805
+	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
1802 1806
 	if err != nil {
1803 1807
 		return false, 0, 0, err
1804 1808
 	}
... ...
@@ -1806,7 +1618,7 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int,
1806 1806
 		if n.ManagerStatus != nil {
1807 1807
 			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
1808 1808
 				reachable++
1809
-				if n.ID == c.node.NodeID() {
1809
+				if n.ID == currentNodeID {
1810 1810
 					current = true
1811 1811
 				}
1812 1812
 			}
... ...
@@ -1857,7 +1669,7 @@ func validateAddr(addr string) (string, error) {
1857 1857
 	return strings.TrimPrefix(newaddr, "tcp://"), nil
1858 1858
 }
1859 1859
 
1860
-func initClusterSpec(node *node, spec types.Spec) error {
1860
+func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
1861 1861
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
1862 1862
 	for conn := range node.ListenControlSocket(ctx) {
1863 1863
 		if ctx.Err() != nil {
1864 1864
new file mode 100644
... ...
@@ -0,0 +1,296 @@
0
+package cluster
1
+
2
+import (
3
+	"path/filepath"
4
+	"runtime"
5
+	"strings"
6
+	"sync"
7
+	"time"
8
+
9
+	"github.com/Sirupsen/logrus"
10
+	types "github.com/docker/docker/api/types/swarm"
11
+	"github.com/docker/docker/daemon/cluster/executor/container"
12
+	swarmapi "github.com/docker/swarmkit/api"
13
+	swarmnode "github.com/docker/swarmkit/node"
14
+	"github.com/pkg/errors"
15
+	"golang.org/x/net/context"
16
+	"google.golang.org/grpc"
17
+)
18
+
19
// nodeRunner implements a manager for continuously running swarmkit node, restarting them with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu             sync.RWMutex  // guards all fields, including the embedded nodeState
	done           chan struct{} // closed when swarmNode exits
	ready          chan struct{} // closed when swarmNode becomes active
	reconnectDelay time.Duration // current restart backoff; doubled on each retry, capped at maxReconnectDelay
	config         nodeStartConfig // config of the current run; reused (with refreshed addresses) on restart

	repeatedRun     bool     // true once the node has exited at least once
	cancelReconnect func()   // cancels a pending delayed restart; set by enableReconnectWatcher, cleared by Stop
	stopping        bool     // set by Stop; suppresses further automatic restarts
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}
33
+
34
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	// joinAddr is the address of an existing manager to join; passed to
	// swarmkit as JoinAddr (see nodeRunner.start).
	joinAddr string
	// forceNewCluster is passed to swarmkit as ForceNewCluster.
	forceNewCluster bool
	// joinToken authenticates the join request; passed as JoinToken.
	joinToken string
	// lockKey is passed to swarmkit as UnlockKey.
	lockKey []byte
	// autolock is passed to swarmkit as AutoLockManagers.
	autolock bool
}
54
+
55
// Ready returns a buffered channel that reports the outcome of the current
// start attempt: it is closed without a value once the node becomes active,
// or delivers the recorded error if the node exits before becoming ready.
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	// Snapshot the channels of the current run; start() replaces them on restart.
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		// Wait until the node either becomes active or exits.
		select {
		case <-ready:
		case <-done:
		}
		// If it never became ready, report the exit error set by handleNodeExit.
		select {
		case <-ready:
		default:
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}
76
+
77
+func (n *nodeRunner) Start(conf nodeStartConfig) error {
78
+	n.mu.Lock()
79
+	defer n.mu.Unlock()
80
+
81
+	n.reconnectDelay = initialReconnectDelay
82
+
83
+	return n.start(conf)
84
+}
85
+
86
+func (n *nodeRunner) start(conf nodeStartConfig) error {
87
+	var control string
88
+	if runtime.GOOS == "windows" {
89
+		control = `\\.\pipe\` + controlSocket
90
+	} else {
91
+		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
92
+	}
93
+
94
+	node, err := swarmnode.New(&swarmnode.Config{
95
+		Hostname:           n.cluster.config.Name,
96
+		ForceNewCluster:    conf.forceNewCluster,
97
+		ListenControlAPI:   control,
98
+		ListenRemoteAPI:    conf.ListenAddr,
99
+		AdvertiseRemoteAPI: conf.AdvertiseAddr,
100
+		JoinAddr:           conf.joinAddr,
101
+		StateDir:           n.cluster.root,
102
+		JoinToken:          conf.joinToken,
103
+		Executor:           container.NewExecutor(n.cluster.config.Backend),
104
+		HeartbeatTick:      1,
105
+		ElectionTick:       3,
106
+		UnlockKey:          conf.lockKey,
107
+		AutoLockManagers:   conf.autolock,
108
+	})
109
+	if err != nil {
110
+		return err
111
+	}
112
+	if err := node.Start(context.Background()); err != nil {
113
+		return err
114
+	}
115
+
116
+	n.done = make(chan struct{})
117
+	n.ready = make(chan struct{})
118
+	n.swarmNode = node
119
+	n.config = conf
120
+	savePersistentState(n.cluster.root, conf)
121
+
122
+	ctx, cancel := context.WithCancel(context.Background())
123
+
124
+	go func() {
125
+		n.handleNodeExit(node)
126
+		cancel()
127
+	}()
128
+
129
+	go n.handleReadyEvent(ctx, node, n.ready)
130
+	go n.handleControlSocketChange(ctx, node)
131
+
132
+	return nil
133
+}
134
+
135
+func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
136
+	for conn := range node.ListenControlSocket(ctx) {
137
+		n.mu.Lock()
138
+		if n.grpcConn != conn {
139
+			if conn == nil {
140
+				n.controlClient = nil
141
+				n.logsClient = nil
142
+			} else {
143
+				n.controlClient = swarmapi.NewControlClient(conn)
144
+				n.logsClient = swarmapi.NewLogsClient(conn)
145
+			}
146
+		}
147
+		n.grpcConn = conn
148
+		n.mu.Unlock()
149
+		n.cluster.configEvent <- struct{}{}
150
+	}
151
+}
152
+
153
// handleReadyEvent waits for the swarmkit node to become active, clears any
// previously recorded error, closes the ready channel and notifies the
// cluster of the configuration change. If ctx is canceled first (the node
// exited), ready is left open and only the notification is sent.
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.configEvent <- struct{}{}
}
164
+
165
// handleNodeExit blocks until the swarmkit node exits, records the exit
// error, clears the node reference and closes the done channel. A delayed
// restart is scheduled if the node had become ready before exiting, or if
// this was already a retry (repeatedRun).
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	// detectLockedError presumably maps swarmkit's locked-state error to
	// ErrSwarmLocked — defined elsewhere in this package; verify there.
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		logrus.Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
	select {
	case <-n.ready:
		// The node was active at some point: always attempt a restart.
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			// Never became ready, but this was already a retry: keep retrying.
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}
185
+
186
// Stop stops the current swarm node if it is running. It cancels any pending
// delayed restart and sets the stopping flag so no new restart is scheduled,
// then waits (bounded by the 15s context handed to swarmkit) for the node to
// shut down and for handleNodeExit to close the done channel.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// "context canceled" is an expected side effect of shutting down;
	// anything else is a real failure.
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		n.mu.Unlock()
		return err
	}
	// Release the lock before waiting so handleNodeExit can acquire it.
	n.mu.Unlock()
	<-n.done
	return nil
}
208
+
209
+func (n *nodeRunner) State() nodeState {
210
+	if n == nil {
211
+		return nodeState{status: types.LocalNodeStateInactive}
212
+	}
213
+	n.mu.RLock()
214
+	defer n.mu.RUnlock()
215
+
216
+	ns := n.nodeState
217
+
218
+	if ns.err != nil || n.cancelReconnect != nil {
219
+		if errors.Cause(ns.err) == ErrSwarmLocked {
220
+			ns.status = types.LocalNodeStateLocked
221
+		} else {
222
+			ns.status = types.LocalNodeStateError
223
+		}
224
+	} else {
225
+		select {
226
+		case <-n.ready:
227
+			ns.status = types.LocalNodeStateActive
228
+		default:
229
+			ns.status = types.LocalNodeStatePending
230
+		}
231
+	}
232
+
233
+	return ns
234
+}
235
+
236
// enableReconnectWatcher schedules a restart of the node after the current
// backoff delay, doubling the delay (capped at maxReconnectDelay) for the
// next attempt. Must be called with n.mu held; the restart itself runs in a
// separate goroutine that re-acquires the lock.
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	// Stop uses cancelReconnect to abort the pending restart.
	n.cancelReconnect = cancel

	config := n.config
	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			// The delay was canceled (by Stop), not elapsed: no restart.
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}
		// Refresh the join address in case the previously used manager is gone.
		config.RemoteAddr = n.cluster.getRemoteAddress()
		config.joinAddr = config.RemoteAddr
		// NOTE(review): cancelReconnect is not cleared here after the restart,
		// and State() treats a non-nil cancelReconnect as an error state —
		// confirm this is intended.
		if err := n.start(config); err != nil {
			n.err = err
		}
	}()
}
266
+
267
// nodeState represents information about the current state of the cluster and
// provides access to the grpc clients.
type nodeState struct {
	swarmNode       *swarmnode.Node       // nil when the node is not running
	grpcConn        *grpc.ClientConn      // current control-socket connection, nil when disconnected
	controlClient   swarmapi.ControlClient // set while the control socket is connected (manager only)
	logsClient      swarmapi.LogsClient    // set together with controlClient
	status          types.LocalNodeState
	actualLocalAddr string // presumably the resolved local address — set elsewhere; verify at the writer
	err             error  // last startup/exit error recorded by nodeRunner
}
278
+
279
// IsActiveManager returns true if node is a manager ready to accept control requests. It is safe to access the client properties if this returns true.
func (ns nodeState) IsActiveManager() bool {
	// The control client only exists while a manager control socket is
	// connected (see nodeRunner.handleControlSocketChange).
	return ns.controlClient != nil
}
283
+
284
+// IsManager returns true if node is a manager.
285
+func (ns nodeState) IsManager() bool {
286
+	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
287
+}
288
+
289
+// NodeID returns node's ID or empty string if node is inactive.
290
+func (ns nodeState) NodeID() string {
291
+	if ns.swarmNode != nil {
292
+		return ns.swarmNode.NodeID()
293
+	}
294
+	return ""
295
+}
... ...
@@ -9,17 +9,18 @@ import (
9 9
 
10 10
 // GetSecret returns a secret from a managed swarm cluster
11 11
 func (c *Cluster) GetSecret(id string) (types.Secret, error) {
12
-	c.RLock()
13
-	defer c.RUnlock()
12
+	c.mu.RLock()
13
+	defer c.mu.RUnlock()
14 14
 
15
-	if !c.isActiveManager() {
16
-		return types.Secret{}, c.errNoManager()
15
+	state := c.currentNodeState()
16
+	if !state.IsActiveManager() {
17
+		return types.Secret{}, c.errNoManager(state)
17 18
 	}
18 19
 
19 20
 	ctx, cancel := c.getRequestContext()
20 21
 	defer cancel()
21 22
 
22
-	r, err := c.node.client.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id})
23
+	r, err := state.controlClient.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id})
23 24
 	if err != nil {
24 25
 		return types.Secret{}, err
25 26
 	}
... ...
@@ -29,11 +30,12 @@ func (c *Cluster) GetSecret(id string) (types.Secret, error) {
29 29
 
30 30
 // GetSecrets returns all secrets of a managed swarm cluster.
31 31
 func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret, error) {
32
-	c.RLock()
33
-	defer c.RUnlock()
32
+	c.mu.RLock()
33
+	defer c.mu.RUnlock()
34 34
 
35
-	if !c.isActiveManager() {
36
-		return nil, c.errNoManager()
35
+	state := c.currentNodeState()
36
+	if !state.IsActiveManager() {
37
+		return nil, c.errNoManager(state)
37 38
 	}
38 39
 
39 40
 	filters, err := newListSecretsFilters(options.Filters)
... ...
@@ -43,7 +45,7 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret
43 43
 	ctx, cancel := c.getRequestContext()
44 44
 	defer cancel()
45 45
 
46
-	r, err := c.node.client.ListSecrets(ctx,
46
+	r, err := state.controlClient.ListSecrets(ctx,
47 47
 		&swarmapi.ListSecretsRequest{Filters: filters})
48 48
 	if err != nil {
49 49
 		return nil, err
... ...
@@ -60,11 +62,12 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret
60 60
 
61 61
 // CreateSecret creates a new secret in a managed swarm cluster.
62 62
 func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
63
-	c.RLock()
64
-	defer c.RUnlock()
63
+	c.mu.RLock()
64
+	defer c.mu.RUnlock()
65 65
 
66
-	if !c.isActiveManager() {
67
-		return "", c.errNoManager()
66
+	state := c.currentNodeState()
67
+	if !state.IsActiveManager() {
68
+		return "", c.errNoManager(state)
68 69
 	}
69 70
 
70 71
 	ctx, cancel := c.getRequestContext()
... ...
@@ -72,7 +75,7 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
72 72
 
73 73
 	secretSpec := convert.SecretSpecToGRPC(s)
74 74
 
75
-	r, err := c.node.client.CreateSecret(ctx,
75
+	r, err := state.controlClient.CreateSecret(ctx,
76 76
 		&swarmapi.CreateSecretRequest{Spec: &secretSpec})
77 77
 	if err != nil {
78 78
 		return "", err
... ...
@@ -83,11 +86,12 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
83 83
 
84 84
 // RemoveSecret removes a secret from a managed swarm cluster.
85 85
 func (c *Cluster) RemoveSecret(id string) error {
86
-	c.RLock()
87
-	defer c.RUnlock()
86
+	c.mu.RLock()
87
+	defer c.mu.RUnlock()
88 88
 
89
-	if !c.isActiveManager() {
90
-		return c.errNoManager()
89
+	state := c.currentNodeState()
90
+	if !state.IsActiveManager() {
91
+		return c.errNoManager(state)
91 92
 	}
92 93
 
93 94
 	ctx, cancel := c.getRequestContext()
... ...
@@ -97,7 +101,7 @@ func (c *Cluster) RemoveSecret(id string) error {
97 97
 		SecretID: id,
98 98
 	}
99 99
 
100
-	if _, err := c.node.client.RemoveSecret(ctx, req); err != nil {
100
+	if _, err := state.controlClient.RemoveSecret(ctx, req); err != nil {
101 101
 		return err
102 102
 	}
103 103
 	return nil
... ...
@@ -106,11 +110,12 @@ func (c *Cluster) RemoveSecret(id string) error {
106 106
 // UpdateSecret updates a secret in a managed swarm cluster.
107 107
 // Note: this is not exposed to the CLI but is available from the API only
108 108
 func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) error {
109
-	c.RLock()
110
-	defer c.RUnlock()
109
+	c.mu.RLock()
110
+	defer c.mu.RUnlock()
111 111
 
112
-	if !c.isActiveManager() {
113
-		return c.errNoManager()
112
+	state := c.currentNodeState()
113
+	if !state.IsActiveManager() {
114
+		return c.errNoManager(state)
114 115
 	}
115 116
 
116 117
 	ctx, cancel := c.getRequestContext()
... ...
@@ -118,7 +123,7 @@ func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec)
118 118
 
119 119
 	secretSpec := convert.SecretSpecToGRPC(spec)
120 120
 
121
-	if _, err := c.client.UpdateSecret(ctx,
121
+	if _, err := state.controlClient.UpdateSecret(ctx,
122 122
 		&swarmapi.UpdateSecretRequest{
123 123
 			SecretID: id,
124 124
 			SecretVersion: &swarmapi.Version{
125 125
new file mode 100644
... ...
@@ -0,0 +1,56 @@
0
+package cluster
1
+
2
+import (
3
+	"encoding/json"
4
+	"io/ioutil"
5
+	"os"
6
+	"path/filepath"
7
+
8
+	"github.com/docker/docker/pkg/ioutils"
9
+)
10
+
11
+func loadPersistentState(root string) (*nodeStartConfig, error) {
12
+	dt, err := ioutil.ReadFile(filepath.Join(root, stateFile))
13
+	if err != nil {
14
+		return nil, err
15
+	}
16
+	// missing certificate means no actual state to restore from
17
+	if _, err := os.Stat(filepath.Join(root, "certificates/swarm-node.crt")); err != nil {
18
+		if os.IsNotExist(err) {
19
+			clearPersistentState(root)
20
+		}
21
+		return nil, err
22
+	}
23
+	var st nodeStartConfig
24
+	if err := json.Unmarshal(dt, &st); err != nil {
25
+		return nil, err
26
+	}
27
+	return &st, nil
28
+}
29
+
30
+func savePersistentState(root string, config nodeStartConfig) error {
31
+	dt, err := json.Marshal(config)
32
+	if err != nil {
33
+		return err
34
+	}
35
+	return ioutils.AtomicWriteFile(filepath.Join(root, stateFile), dt, 0600)
36
+}
37
+
38
// clearPersistentState wipes all swarm state under root and recreates the
// (empty) directory so a fresh node can be started.
func clearPersistentState(root string) error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(root); err != nil {
		return err
	}
	return os.MkdirAll(root, 0700)
}
48
+
49
// removingManagerCausesLossOfQuorum reports whether removing one reachable
// manager from a cluster with the given reachable/unreachable manager counts
// would leave it without (or at the bare edge of) raft quorum.
func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
	return unreachable >= reachable-2
}
52
+
53
// isLastManager reports whether this is the sole manager in the cluster:
// exactly one reachable manager and no unreachable ones.
func isLastManager(reachable, unreachable int) bool {
	if unreachable != 0 {
		return false
	}
	return reachable == 1
}
... ...
@@ -75,6 +75,8 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) {
75 75
 	d1 := s.AddDaemon(c, false, false)
76 76
 	c.Assert(d1.Init(swarm.InitRequest{}), checker.IsNil)
77 77
 
78
+	// todo: error message differs depending if some components of token are valid
79
+
78 80
 	d2 := s.AddDaemon(c, false, false)
79 81
 	err := d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}})
80 82
 	c.Assert(err, checker.NotNil)
... ...
@@ -85,7 +87,7 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) {
85 85
 
86 86
 	err = d2.Join(swarm.JoinRequest{JoinToken: "foobaz", RemoteAddrs: []string{d1.listenAddr}})
87 87
 	c.Assert(err, checker.NotNil)
88
-	c.Assert(err.Error(), checker.Contains, "join token is necessary")
88
+	c.Assert(err.Error(), checker.Contains, "invalid join token")
89 89
 	info, err = d2.info()
90 90
 	c.Assert(err, checker.IsNil)
91 91
 	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)