Browse code

create a new file swarm.go and move swarm part code from cluster.go into swarm.go

Signed-off-by: allencloud <allen.sun@daocloud.io>

allencloud authored on 2017/02/12 03:53:03
Showing 2 changed files
... ...
@@ -44,22 +44,15 @@ import (
44 44
 	"net"
45 45
 	"os"
46 46
 	"path/filepath"
47
-	"strings"
48 47
 	"sync"
49 48
 	"time"
50 49
 
51 50
 	"github.com/Sirupsen/logrus"
52
-	apierrors "github.com/docker/docker/api/errors"
53
-	apitypes "github.com/docker/docker/api/types"
54
-	"github.com/docker/docker/api/types/filters"
55 51
 	"github.com/docker/docker/api/types/network"
56 52
 	types "github.com/docker/docker/api/types/swarm"
57
-	"github.com/docker/docker/daemon/cluster/convert"
58 53
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
59
-	"github.com/docker/docker/opts"
60 54
 	"github.com/docker/docker/pkg/signal"
61 55
 	swarmapi "github.com/docker/swarmkit/api"
62
-	"github.com/docker/swarmkit/manager/encryption"
63 56
 	swarmnode "github.com/docker/swarmkit/node"
64 57
 	"github.com/pkg/errors"
65 58
 	"golang.org/x/net/context"
... ...
@@ -237,408 +230,10 @@ func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
237 237
 	return nr, nil
238 238
 }
239 239
 
240
-// Init initializes new cluster from user provided request.
241
-func (c *Cluster) Init(req types.InitRequest) (string, error) {
242
-	c.controlMutex.Lock()
243
-	defer c.controlMutex.Unlock()
244
-	c.mu.Lock()
245
-	if c.nr != nil {
246
-		if req.ForceNewCluster {
247
-			if err := c.nr.Stop(); err != nil {
248
-				c.mu.Unlock()
249
-				return "", err
250
-			}
251
-		} else {
252
-			c.mu.Unlock()
253
-			return "", errSwarmExists
254
-		}
255
-	}
256
-	c.mu.Unlock()
257
-
258
-	if err := validateAndSanitizeInitRequest(&req); err != nil {
259
-		return "", apierrors.NewBadRequestError(err)
260
-	}
261
-
262
-	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
263
-	if err != nil {
264
-		return "", err
265
-	}
266
-
267
-	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
268
-	if err != nil {
269
-		return "", err
270
-	}
271
-
272
-	localAddr := listenHost
273
-
274
-	// If the local address is undetermined, the advertise address
275
-	// will be used as local address, if it belongs to this system.
276
-	// If the advertise address is not local, then we try to find
277
-	// a system address to use as local address. If this fails,
278
-	// we give up and ask the user to pass the listen address.
279
-	if net.ParseIP(localAddr).IsUnspecified() {
280
-		advertiseIP := net.ParseIP(advertiseHost)
281
-
282
-		found := false
283
-		for _, systemIP := range listSystemIPs() {
284
-			if systemIP.Equal(advertiseIP) {
285
-				localAddr = advertiseIP.String()
286
-				found = true
287
-				break
288
-			}
289
-		}
290
-
291
-		if !found {
292
-			ip, err := c.resolveSystemAddr()
293
-			if err != nil {
294
-				logrus.Warnf("Could not find a local address: %v", err)
295
-				return "", errMustSpecifyListenAddr
296
-			}
297
-			localAddr = ip.String()
298
-		}
299
-	}
300
-
301
-	if !req.ForceNewCluster {
302
-		clearPersistentState(c.root)
303
-	}
304
-
305
-	nr, err := c.newNodeRunner(nodeStartConfig{
306
-		forceNewCluster: req.ForceNewCluster,
307
-		autolock:        req.AutoLockManagers,
308
-		LocalAddr:       localAddr,
309
-		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
310
-		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
311
-		availability:    req.Availability,
312
-	})
313
-	if err != nil {
314
-		return "", err
315
-	}
316
-	c.mu.Lock()
317
-	c.nr = nr
318
-	c.mu.Unlock()
319
-
320
-	if err := <-nr.Ready(); err != nil {
321
-		if !req.ForceNewCluster { // if failure on first attempt don't keep state
322
-			if err := clearPersistentState(c.root); err != nil {
323
-				return "", err
324
-			}
325
-		}
326
-		if err != nil {
327
-			c.mu.Lock()
328
-			c.nr = nil
329
-			c.mu.Unlock()
330
-		}
331
-		return "", err
332
-	}
333
-	state := nr.State()
334
-	if state.swarmNode == nil { // should never happen but protect from panic
335
-		return "", errors.New("invalid cluster state for spec initialization")
336
-	}
337
-	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
338
-		return "", err
339
-	}
340
-	return state.NodeID(), nil
341
-}
342
-
343
-// Join makes current Cluster part of an existing swarm cluster.
344
-func (c *Cluster) Join(req types.JoinRequest) error {
345
-	c.controlMutex.Lock()
346
-	defer c.controlMutex.Unlock()
347
-	c.mu.Lock()
348
-	if c.nr != nil {
349
-		c.mu.Unlock()
350
-		return errSwarmExists
351
-	}
352
-	c.mu.Unlock()
353
-
354
-	if err := validateAndSanitizeJoinRequest(&req); err != nil {
355
-		return apierrors.NewBadRequestError(err)
356
-	}
357
-
358
-	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
359
-	if err != nil {
360
-		return err
361
-	}
362
-
363
-	var advertiseAddr string
364
-	if req.AdvertiseAddr != "" {
365
-		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
366
-		// For joining, we don't need to provide an advertise address,
367
-		// since the remote side can detect it.
368
-		if err == nil {
369
-			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
370
-		}
371
-	}
372
-
373
-	clearPersistentState(c.root)
374
-
375
-	nr, err := c.newNodeRunner(nodeStartConfig{
376
-		RemoteAddr:    req.RemoteAddrs[0],
377
-		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
378
-		AdvertiseAddr: advertiseAddr,
379
-		joinAddr:      req.RemoteAddrs[0],
380
-		joinToken:     req.JoinToken,
381
-		availability:  req.Availability,
382
-	})
383
-	if err != nil {
384
-		return err
385
-	}
386
-
387
-	c.mu.Lock()
388
-	c.nr = nr
389
-	c.mu.Unlock()
390
-
391
-	select {
392
-	case <-time.After(swarmConnectTimeout):
393
-		return errSwarmJoinTimeoutReached
394
-	case err := <-nr.Ready():
395
-		if err != nil {
396
-			c.mu.Lock()
397
-			c.nr = nil
398
-			c.mu.Unlock()
399
-		}
400
-		return err
401
-	}
402
-}
403
-
404
-// GetUnlockKey returns the unlock key for the swarm.
405
-func (c *Cluster) GetUnlockKey() (string, error) {
406
-	c.mu.RLock()
407
-	defer c.mu.RUnlock()
408
-
409
-	state := c.currentNodeState()
410
-	if !state.IsActiveManager() {
411
-		return "", c.errNoManager(state)
412
-	}
413
-
414
-	ctx, cancel := c.getRequestContext()
415
-	defer cancel()
416
-
417
-	client := swarmapi.NewCAClient(state.grpcConn)
418
-
419
-	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
420
-	if err != nil {
421
-		return "", err
422
-	}
423
-
424
-	if len(r.UnlockKey) == 0 {
425
-		// no key
426
-		return "", nil
427
-	}
428
-
429
-	return encryption.HumanReadableKey(r.UnlockKey), nil
430
-}
431
-
432
-// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
433
-func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
434
-	c.controlMutex.Lock()
435
-	defer c.controlMutex.Unlock()
436
-
437
-	c.mu.RLock()
438
-	state := c.currentNodeState()
439
-
440
-	if !state.IsActiveManager() {
441
-		// when manager is not active,
442
-		// unless it is locked, otherwise return error.
443
-		if err := c.errNoManager(state); err != errSwarmLocked {
444
-			c.mu.RUnlock()
445
-			return err
446
-		}
447
-	} else {
448
-		// when manager is active, return an error of "not locked"
449
-		c.mu.RUnlock()
450
-		return errors.New("swarm is not locked")
451
-	}
452
-
453
-	// only when swarm is locked, code running reaches here
454
-	nr := c.nr
455
-	c.mu.RUnlock()
456
-
457
-	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
458
-	if err != nil {
459
-		return err
460
-	}
461
-
462
-	config := nr.config
463
-	config.lockKey = key
464
-	if err := nr.Stop(); err != nil {
465
-		return err
466
-	}
467
-	nr, err = c.newNodeRunner(config)
468
-	if err != nil {
469
-		return err
470
-	}
471
-
472
-	c.mu.Lock()
473
-	c.nr = nr
474
-	c.mu.Unlock()
475
-
476
-	if err := <-nr.Ready(); err != nil {
477
-		if errors.Cause(err) == errSwarmLocked {
478
-			return errors.New("swarm could not be unlocked: invalid key provided")
479
-		}
480
-		return fmt.Errorf("swarm component could not be started: %v", err)
481
-	}
482
-	return nil
483
-}
484
-
485
-// Leave shuts down Cluster and removes current state.
486
-func (c *Cluster) Leave(force bool) error {
487
-	c.controlMutex.Lock()
488
-	defer c.controlMutex.Unlock()
489
-
490
-	c.mu.Lock()
491
-	nr := c.nr
492
-	if nr == nil {
493
-		c.mu.Unlock()
494
-		return errNoSwarm
495
-	}
496
-
497
-	state := c.currentNodeState()
498
-
499
-	if errors.Cause(state.err) == errSwarmLocked && !force {
500
-		// leave a locked swarm without --force is not allowed
501
-		c.mu.Unlock()
502
-		return errors.New("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message.")
503
-	}
504
-
505
-	if state.IsManager() && !force {
506
-		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
507
-		if state.IsActiveManager() {
508
-			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
509
-			if err == nil {
510
-				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
511
-					if isLastManager(reachable, unreachable) {
512
-						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
513
-						c.mu.Unlock()
514
-						return errors.New(msg)
515
-					}
516
-					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
517
-				}
518
-			}
519
-		} else {
520
-			msg += "Doing so may lose the consensus of your cluster. "
521
-		}
522
-
523
-		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
524
-		c.mu.Unlock()
525
-		return errors.New(msg)
526
-	}
527
-	// release readers in here
528
-	if err := nr.Stop(); err != nil {
529
-		logrus.Errorf("failed to shut down cluster node: %v", err)
530
-		signal.DumpStacks("")
531
-		c.mu.Unlock()
532
-		return err
533
-	}
534
-	c.nr = nil
535
-	c.mu.Unlock()
536
-	if nodeID := state.NodeID(); nodeID != "" {
537
-		nodeContainers, err := c.listContainerForNode(nodeID)
538
-		if err != nil {
539
-			return err
540
-		}
541
-		for _, id := range nodeContainers {
542
-			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
543
-				logrus.Errorf("error removing %v: %v", id, err)
544
-			}
545
-		}
546
-	}
547
-
548
-	c.configEvent <- struct{}{}
549
-	// todo: cleanup optional?
550
-	if err := clearPersistentState(c.root); err != nil {
551
-		return err
552
-	}
553
-	c.config.Backend.DaemonLeavesCluster()
554
-	return nil
555
-}
556
-
557
-func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
558
-	var ids []string
559
-	filters := filters.NewArgs()
560
-	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
561
-	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
562
-		Filters: filters,
563
-	})
564
-	if err != nil {
565
-		return []string{}, err
566
-	}
567
-	for _, c := range containers {
568
-		ids = append(ids, c.ID)
569
-	}
570
-	return ids, nil
571
-}
572
-
573 240
 func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
574 241
 	return context.WithTimeout(context.Background(), swarmRequestTimeout)
575 242
 }
576 243
 
577
-// Inspect retrieves the configuration properties of a managed swarm cluster.
578
-func (c *Cluster) Inspect() (types.Swarm, error) {
579
-	c.mu.RLock()
580
-	defer c.mu.RUnlock()
581
-
582
-	state := c.currentNodeState()
583
-	if !state.IsActiveManager() {
584
-		return types.Swarm{}, c.errNoManager(state)
585
-	}
586
-
587
-	ctx, cancel := c.getRequestContext()
588
-	defer cancel()
589
-
590
-	swarm, err := getSwarm(ctx, state.controlClient)
591
-	if err != nil {
592
-		return types.Swarm{}, err
593
-	}
594
-
595
-	return convert.SwarmFromGRPC(*swarm), nil
596
-}
597
-
598
-// Update updates configuration of a managed swarm cluster.
599
-func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
600
-	c.mu.RLock()
601
-	defer c.mu.RUnlock()
602
-
603
-	state := c.currentNodeState()
604
-	if !state.IsActiveManager() {
605
-		return c.errNoManager(state)
606
-	}
607
-
608
-	ctx, cancel := c.getRequestContext()
609
-	defer cancel()
610
-
611
-	swarm, err := getSwarm(ctx, state.controlClient)
612
-	if err != nil {
613
-		return err
614
-	}
615
-
616
-	// In update, client should provide the complete spec of the swarm, including
617
-	// Name and Labels. If a field is specified with 0 or nil, then the default value
618
-	// will be used to swarmkit.
619
-	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
620
-	if err != nil {
621
-		return apierrors.NewBadRequestError(err)
622
-	}
623
-
624
-	_, err = state.controlClient.UpdateCluster(
625
-		ctx,
626
-		&swarmapi.UpdateClusterRequest{
627
-			ClusterID: swarm.ID,
628
-			Spec:      &clusterSpec,
629
-			ClusterVersion: &swarmapi.Version{
630
-				Index: version,
631
-			},
632
-			Rotation: swarmapi.KeyRotation{
633
-				WorkerJoinToken:  flags.RotateWorkerToken,
634
-				ManagerJoinToken: flags.RotateManagerToken,
635
-				ManagerUnlockKey: flags.RotateManagerUnlockKey,
636
-			},
637
-		},
638
-	)
639
-	return err
640
-}
641
-
642 244
 // IsManager returns true if Cluster is participating as a manager.
643 245
 func (c *Cluster) IsManager() bool {
644 246
 	c.mu.RLock()
... ...
@@ -711,55 +306,6 @@ func (c *Cluster) ListenClusterEvents() <-chan struct{} {
711 711
 	return c.configEvent
712 712
 }
713 713
 
714
-// Info returns information about the current cluster state.
715
-func (c *Cluster) Info() types.Info {
716
-	info := types.Info{
717
-		NodeAddr: c.GetAdvertiseAddress(),
718
-	}
719
-	c.mu.RLock()
720
-	defer c.mu.RUnlock()
721
-
722
-	state := c.currentNodeState()
723
-	info.LocalNodeState = state.status
724
-	if state.err != nil {
725
-		info.Error = state.err.Error()
726
-	}
727
-
728
-	ctx, cancel := c.getRequestContext()
729
-	defer cancel()
730
-
731
-	if state.IsActiveManager() {
732
-		info.ControlAvailable = true
733
-		swarm, err := c.Inspect()
734
-		if err != nil {
735
-			info.Error = err.Error()
736
-		}
737
-
738
-		// Strip JoinTokens
739
-		info.Cluster = swarm.ClusterInfo
740
-
741
-		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
742
-			info.Error = err.Error()
743
-		} else {
744
-			info.Nodes = len(r.Nodes)
745
-			for _, n := range r.Nodes {
746
-				if n.ManagerStatus != nil {
747
-					info.Managers = info.Managers + 1
748
-				}
749
-			}
750
-		}
751
-	}
752
-
753
-	if state.swarmNode != nil {
754
-		for _, r := range state.swarmNode.Remotes() {
755
-			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
756
-		}
757
-		info.NodeID = state.swarmNode.NodeID()
758
-	}
759
-
760
-	return info
761
-}
762
-
763 714
 // currentNodeState should not be called without a read lock
764 715
 func (c *Cluster) currentNodeState() nodeState {
765 716
 	return c.nr.State()
... ...
@@ -835,99 +381,6 @@ func managerStats(client swarmapi.ControlClient, currentNodeID string) (current
835 835
 	return
836 836
 }
837 837
 
838
-func validateAndSanitizeInitRequest(req *types.InitRequest) error {
839
-	var err error
840
-	req.ListenAddr, err = validateAddr(req.ListenAddr)
841
-	if err != nil {
842
-		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
843
-	}
844
-
845
-	if req.Spec.Annotations.Name == "" {
846
-		req.Spec.Annotations.Name = "default"
847
-	} else if req.Spec.Annotations.Name != "default" {
848
-		return errors.New(`swarm spec must be named "default"`)
849
-	}
850
-
851
-	return nil
852
-}
853
-
854
-func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
855
-	var err error
856
-	req.ListenAddr, err = validateAddr(req.ListenAddr)
857
-	if err != nil {
858
-		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
859
-	}
860
-	if len(req.RemoteAddrs) == 0 {
861
-		return errors.New("at least 1 RemoteAddr is required to join")
862
-	}
863
-	for i := range req.RemoteAddrs {
864
-		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
865
-		if err != nil {
866
-			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
867
-		}
868
-	}
869
-	return nil
870
-}
871
-
872
-func validateAddr(addr string) (string, error) {
873
-	if addr == "" {
874
-		return addr, errors.New("invalid empty address")
875
-	}
876
-	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
877
-	if err != nil {
878
-		return addr, nil
879
-	}
880
-	return strings.TrimPrefix(newaddr, "tcp://"), nil
881
-}
882
-
883
-func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
884
-	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
885
-	for conn := range node.ListenControlSocket(ctx) {
886
-		if ctx.Err() != nil {
887
-			return ctx.Err()
888
-		}
889
-		if conn != nil {
890
-			client := swarmapi.NewControlClient(conn)
891
-			var cluster *swarmapi.Cluster
892
-			for i := 0; ; i++ {
893
-				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
894
-				if err != nil {
895
-					return fmt.Errorf("error on listing clusters: %v", err)
896
-				}
897
-				if len(lcr.Clusters) == 0 {
898
-					if i < 10 {
899
-						time.Sleep(200 * time.Millisecond)
900
-						continue
901
-					}
902
-					return errors.New("empty list of clusters was returned")
903
-				}
904
-				cluster = lcr.Clusters[0]
905
-				break
906
-			}
907
-			// In init, we take the initial default values from swarmkit, and merge
908
-			// any non nil or 0 value from spec to GRPC spec. This will leave the
909
-			// default value alone.
910
-			// Note that this is different from Update(), as in Update() we expect
911
-			// user to specify the complete spec of the cluster (as they already know
912
-			// the existing one and knows which field to update)
913
-			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
914
-			if err != nil {
915
-				return fmt.Errorf("error updating cluster settings: %v", err)
916
-			}
917
-			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
918
-				ClusterID:      cluster.ID,
919
-				ClusterVersion: &cluster.Meta.Version,
920
-				Spec:           &clusterSpec,
921
-			})
922
-			if err != nil {
923
-				return fmt.Errorf("error updating cluster settings: %v", err)
924
-			}
925
-			return nil
926
-		}
927
-	}
928
-	return ctx.Err()
929
-}
930
-
931 838
 func detectLockedError(err error) error {
932 839
 	if err == swarmnode.ErrInvalidUnlockKey {
933 840
 		return errors.WithStack(errSwarmLocked)
934 841
new file mode 100644
... ...
@@ -0,0 +1,562 @@
0
+package cluster
1
+
2
+import (
3
+	"fmt"
4
+	"net"
5
+	"strings"
6
+	"time"
7
+
8
+	"github.com/Sirupsen/logrus"
9
+	apierrors "github.com/docker/docker/api/errors"
10
+	apitypes "github.com/docker/docker/api/types"
11
+	"github.com/docker/docker/api/types/filters"
12
+	types "github.com/docker/docker/api/types/swarm"
13
+	"github.com/docker/docker/daemon/cluster/convert"
14
+	"github.com/docker/docker/opts"
15
+	"github.com/docker/docker/pkg/signal"
16
+	swarmapi "github.com/docker/swarmkit/api"
17
+	"github.com/docker/swarmkit/manager/encryption"
18
+	swarmnode "github.com/docker/swarmkit/node"
19
+	"github.com/pkg/errors"
20
+	"golang.org/x/net/context"
21
+)
22
+
23
+// Init initializes new cluster from user provided request.
24
+func (c *Cluster) Init(req types.InitRequest) (string, error) {
25
+	c.controlMutex.Lock()
26
+	defer c.controlMutex.Unlock()
27
+	c.mu.Lock()
28
+	if c.nr != nil {
29
+		if req.ForceNewCluster {
30
+			if err := c.nr.Stop(); err != nil {
31
+				c.mu.Unlock()
32
+				return "", err
33
+			}
34
+		} else {
35
+			c.mu.Unlock()
36
+			return "", errSwarmExists
37
+		}
38
+	}
39
+	c.mu.Unlock()
40
+
41
+	if err := validateAndSanitizeInitRequest(&req); err != nil {
42
+		return "", apierrors.NewBadRequestError(err)
43
+	}
44
+
45
+	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
46
+	if err != nil {
47
+		return "", err
48
+	}
49
+
50
+	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
51
+	if err != nil {
52
+		return "", err
53
+	}
54
+
55
+	localAddr := listenHost
56
+
57
+	// If the local address is undetermined, the advertise address
58
+	// will be used as local address, if it belongs to this system.
59
+	// If the advertise address is not local, then we try to find
60
+	// a system address to use as local address. If this fails,
61
+	// we give up and ask the user to pass the listen address.
62
+	if net.ParseIP(localAddr).IsUnspecified() {
63
+		advertiseIP := net.ParseIP(advertiseHost)
64
+
65
+		found := false
66
+		for _, systemIP := range listSystemIPs() {
67
+			if systemIP.Equal(advertiseIP) {
68
+				localAddr = advertiseIP.String()
69
+				found = true
70
+				break
71
+			}
72
+		}
73
+
74
+		if !found {
75
+			ip, err := c.resolveSystemAddr()
76
+			if err != nil {
77
+				logrus.Warnf("Could not find a local address: %v", err)
78
+				return "", errMustSpecifyListenAddr
79
+			}
80
+			localAddr = ip.String()
81
+		}
82
+	}
83
+
84
+	if !req.ForceNewCluster {
85
+		clearPersistentState(c.root)
86
+	}
87
+
88
+	nr, err := c.newNodeRunner(nodeStartConfig{
89
+		forceNewCluster: req.ForceNewCluster,
90
+		autolock:        req.AutoLockManagers,
91
+		LocalAddr:       localAddr,
92
+		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
93
+		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
94
+		availability:    req.Availability,
95
+	})
96
+	if err != nil {
97
+		return "", err
98
+	}
99
+	c.mu.Lock()
100
+	c.nr = nr
101
+	c.mu.Unlock()
102
+
103
+	if err := <-nr.Ready(); err != nil {
104
+		if !req.ForceNewCluster { // if failure on first attempt don't keep state
105
+			if err := clearPersistentState(c.root); err != nil {
106
+				return "", err
107
+			}
108
+		}
109
+		if err != nil {
110
+			c.mu.Lock()
111
+			c.nr = nil
112
+			c.mu.Unlock()
113
+		}
114
+		return "", err
115
+	}
116
+	state := nr.State()
117
+	if state.swarmNode == nil { // should never happen but protect from panic
118
+		return "", errors.New("invalid cluster state for spec initialization")
119
+	}
120
+	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
121
+		return "", err
122
+	}
123
+	return state.NodeID(), nil
124
+}
125
+
126
+// Join makes current Cluster part of an existing swarm cluster.
127
+func (c *Cluster) Join(req types.JoinRequest) error {
128
+	c.controlMutex.Lock()
129
+	defer c.controlMutex.Unlock()
130
+	c.mu.Lock()
131
+	if c.nr != nil {
132
+		c.mu.Unlock()
133
+		return errSwarmExists
134
+	}
135
+	c.mu.Unlock()
136
+
137
+	if err := validateAndSanitizeJoinRequest(&req); err != nil {
138
+		return apierrors.NewBadRequestError(err)
139
+	}
140
+
141
+	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
142
+	if err != nil {
143
+		return err
144
+	}
145
+
146
+	var advertiseAddr string
147
+	if req.AdvertiseAddr != "" {
148
+		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
149
+		// For joining, we don't need to provide an advertise address,
150
+		// since the remote side can detect it.
151
+		if err == nil {
152
+			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
153
+		}
154
+	}
155
+
156
+	clearPersistentState(c.root)
157
+
158
+	nr, err := c.newNodeRunner(nodeStartConfig{
159
+		RemoteAddr:    req.RemoteAddrs[0],
160
+		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
161
+		AdvertiseAddr: advertiseAddr,
162
+		joinAddr:      req.RemoteAddrs[0],
163
+		joinToken:     req.JoinToken,
164
+		availability:  req.Availability,
165
+	})
166
+	if err != nil {
167
+		return err
168
+	}
169
+
170
+	c.mu.Lock()
171
+	c.nr = nr
172
+	c.mu.Unlock()
173
+
174
+	select {
175
+	case <-time.After(swarmConnectTimeout):
176
+		return errSwarmJoinTimeoutReached
177
+	case err := <-nr.Ready():
178
+		if err != nil {
179
+			c.mu.Lock()
180
+			c.nr = nil
181
+			c.mu.Unlock()
182
+		}
183
+		return err
184
+	}
185
+}
186
+
187
+// Inspect retrieves the configuration properties of a managed swarm cluster.
188
+func (c *Cluster) Inspect() (types.Swarm, error) {
189
+	c.mu.RLock()
190
+	defer c.mu.RUnlock()
191
+
192
+	state := c.currentNodeState()
193
+	if !state.IsActiveManager() {
194
+		return types.Swarm{}, c.errNoManager(state)
195
+	}
196
+
197
+	ctx, cancel := c.getRequestContext()
198
+	defer cancel()
199
+
200
+	swarm, err := getSwarm(ctx, state.controlClient)
201
+	if err != nil {
202
+		return types.Swarm{}, err
203
+	}
204
+
205
+	return convert.SwarmFromGRPC(*swarm), nil
206
+}
207
+
208
+// Update updates configuration of a managed swarm cluster.
209
+func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
210
+	c.mu.RLock()
211
+	defer c.mu.RUnlock()
212
+
213
+	state := c.currentNodeState()
214
+	if !state.IsActiveManager() {
215
+		return c.errNoManager(state)
216
+	}
217
+
218
+	ctx, cancel := c.getRequestContext()
219
+	defer cancel()
220
+
221
+	swarm, err := getSwarm(ctx, state.controlClient)
222
+	if err != nil {
223
+		return err
224
+	}
225
+
226
+	// In update, client should provide the complete spec of the swarm, including
227
+	// Name and Labels. If a field is specified with 0 or nil, then the default value
228
+	// will be used to swarmkit.
229
+	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
230
+	if err != nil {
231
+		return apierrors.NewBadRequestError(err)
232
+	}
233
+
234
+	_, err = state.controlClient.UpdateCluster(
235
+		ctx,
236
+		&swarmapi.UpdateClusterRequest{
237
+			ClusterID: swarm.ID,
238
+			Spec:      &clusterSpec,
239
+			ClusterVersion: &swarmapi.Version{
240
+				Index: version,
241
+			},
242
+			Rotation: swarmapi.KeyRotation{
243
+				WorkerJoinToken:  flags.RotateWorkerToken,
244
+				ManagerJoinToken: flags.RotateManagerToken,
245
+				ManagerUnlockKey: flags.RotateManagerUnlockKey,
246
+			},
247
+		},
248
+	)
249
+	return err
250
+}
251
+
252
+// GetUnlockKey returns the unlock key for the swarm.
253
+func (c *Cluster) GetUnlockKey() (string, error) {
254
+	c.mu.RLock()
255
+	defer c.mu.RUnlock()
256
+
257
+	state := c.currentNodeState()
258
+	if !state.IsActiveManager() {
259
+		return "", c.errNoManager(state)
260
+	}
261
+
262
+	ctx, cancel := c.getRequestContext()
263
+	defer cancel()
264
+
265
+	client := swarmapi.NewCAClient(state.grpcConn)
266
+
267
+	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
268
+	if err != nil {
269
+		return "", err
270
+	}
271
+
272
+	if len(r.UnlockKey) == 0 {
273
+		// no key
274
+		return "", nil
275
+	}
276
+
277
+	return encryption.HumanReadableKey(r.UnlockKey), nil
278
+}
279
+
280
+// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
281
+func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
282
+	c.controlMutex.Lock()
283
+	defer c.controlMutex.Unlock()
284
+
285
+	c.mu.RLock()
286
+	state := c.currentNodeState()
287
+
288
+	if !state.IsActiveManager() {
289
+		// when manager is not active,
290
+		// unless it is locked, otherwise return error.
291
+		if err := c.errNoManager(state); err != errSwarmLocked {
292
+			c.mu.RUnlock()
293
+			return err
294
+		}
295
+	} else {
296
+		// when manager is active, return an error of "not locked"
297
+		c.mu.RUnlock()
298
+		return errors.New("swarm is not locked")
299
+	}
300
+
301
+	// only when swarm is locked, code running reaches here
302
+	nr := c.nr
303
+	c.mu.RUnlock()
304
+
305
+	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
306
+	if err != nil {
307
+		return err
308
+	}
309
+
310
+	config := nr.config
311
+	config.lockKey = key
312
+	if err := nr.Stop(); err != nil {
313
+		return err
314
+	}
315
+	nr, err = c.newNodeRunner(config)
316
+	if err != nil {
317
+		return err
318
+	}
319
+
320
+	c.mu.Lock()
321
+	c.nr = nr
322
+	c.mu.Unlock()
323
+
324
+	if err := <-nr.Ready(); err != nil {
325
+		if errors.Cause(err) == errSwarmLocked {
326
+			return errors.New("swarm could not be unlocked: invalid key provided")
327
+		}
328
+		return fmt.Errorf("swarm component could not be started: %v", err)
329
+	}
330
+	return nil
331
+}
332
+
333
+// Leave shuts down Cluster and removes current state.
334
+func (c *Cluster) Leave(force bool) error {
335
+	c.controlMutex.Lock()
336
+	defer c.controlMutex.Unlock()
337
+
338
+	c.mu.Lock()
339
+	nr := c.nr
340
+	if nr == nil {
341
+		c.mu.Unlock()
342
+		return errNoSwarm
343
+	}
344
+
345
+	state := c.currentNodeState()
346
+
347
+	if errors.Cause(state.err) == errSwarmLocked && !force {
348
+		// leave a locked swarm without --force is not allowed
349
+		c.mu.Unlock()
350
+		return errors.New("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message.")
351
+	}
352
+
353
+	if state.IsManager() && !force {
354
+		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
355
+		if state.IsActiveManager() {
356
+			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
357
+			if err == nil {
358
+				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
359
+					if isLastManager(reachable, unreachable) {
360
+						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
361
+						c.mu.Unlock()
362
+						return errors.New(msg)
363
+					}
364
+					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
365
+				}
366
+			}
367
+		} else {
368
+			msg += "Doing so may lose the consensus of your cluster. "
369
+		}
370
+
371
+		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
372
+		c.mu.Unlock()
373
+		return errors.New(msg)
374
+	}
375
+	// release readers in here
376
+	if err := nr.Stop(); err != nil {
377
+		logrus.Errorf("failed to shut down cluster node: %v", err)
378
+		signal.DumpStacks("")
379
+		c.mu.Unlock()
380
+		return err
381
+	}
382
+	c.nr = nil
383
+	c.mu.Unlock()
384
+	if nodeID := state.NodeID(); nodeID != "" {
385
+		nodeContainers, err := c.listContainerForNode(nodeID)
386
+		if err != nil {
387
+			return err
388
+		}
389
+		for _, id := range nodeContainers {
390
+			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
391
+				logrus.Errorf("error removing %v: %v", id, err)
392
+			}
393
+		}
394
+	}
395
+
396
+	c.configEvent <- struct{}{}
397
+	// todo: cleanup optional?
398
+	if err := clearPersistentState(c.root); err != nil {
399
+		return err
400
+	}
401
+	c.config.Backend.DaemonLeavesCluster()
402
+	return nil
403
+}
404
+
405
+// Info returns information about the current cluster state.
406
+func (c *Cluster) Info() types.Info {
407
+	info := types.Info{
408
+		NodeAddr: c.GetAdvertiseAddress(),
409
+	}
410
+	c.mu.RLock()
411
+	defer c.mu.RUnlock()
412
+
413
+	state := c.currentNodeState()
414
+	info.LocalNodeState = state.status
415
+	if state.err != nil {
416
+		info.Error = state.err.Error()
417
+	}
418
+
419
+	ctx, cancel := c.getRequestContext()
420
+	defer cancel()
421
+
422
+	if state.IsActiveManager() {
423
+		info.ControlAvailable = true
424
+		swarm, err := c.Inspect()
425
+		if err != nil {
426
+			info.Error = err.Error()
427
+		}
428
+
429
+		// Strip JoinTokens
430
+		info.Cluster = swarm.ClusterInfo
431
+
432
+		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
433
+			info.Error = err.Error()
434
+		} else {
435
+			info.Nodes = len(r.Nodes)
436
+			for _, n := range r.Nodes {
437
+				if n.ManagerStatus != nil {
438
+					info.Managers = info.Managers + 1
439
+				}
440
+			}
441
+		}
442
+	}
443
+
444
+	if state.swarmNode != nil {
445
+		for _, r := range state.swarmNode.Remotes() {
446
+			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
447
+		}
448
+		info.NodeID = state.swarmNode.NodeID()
449
+	}
450
+
451
+	return info
452
+}
453
+
454
+func validateAndSanitizeInitRequest(req *types.InitRequest) error {
455
+	var err error
456
+	req.ListenAddr, err = validateAddr(req.ListenAddr)
457
+	if err != nil {
458
+		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
459
+	}
460
+
461
+	if req.Spec.Annotations.Name == "" {
462
+		req.Spec.Annotations.Name = "default"
463
+	} else if req.Spec.Annotations.Name != "default" {
464
+		return errors.New(`swarm spec must be named "default"`)
465
+	}
466
+
467
+	return nil
468
+}
469
+
470
+func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
471
+	var err error
472
+	req.ListenAddr, err = validateAddr(req.ListenAddr)
473
+	if err != nil {
474
+		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
475
+	}
476
+	if len(req.RemoteAddrs) == 0 {
477
+		return errors.New("at least 1 RemoteAddr is required to join")
478
+	}
479
+	for i := range req.RemoteAddrs {
480
+		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
481
+		if err != nil {
482
+			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
483
+		}
484
+	}
485
+	return nil
486
+}
487
+
488
+func validateAddr(addr string) (string, error) {
489
+	if addr == "" {
490
+		return addr, errors.New("invalid empty address")
491
+	}
492
+	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
493
+	if err != nil {
494
+		return addr, nil
495
+	}
496
+	return strings.TrimPrefix(newaddr, "tcp://"), nil
497
+}
498
+
499
+func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
500
+	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
501
+	for conn := range node.ListenControlSocket(ctx) {
502
+		if ctx.Err() != nil {
503
+			return ctx.Err()
504
+		}
505
+		if conn != nil {
506
+			client := swarmapi.NewControlClient(conn)
507
+			var cluster *swarmapi.Cluster
508
+			for i := 0; ; i++ {
509
+				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
510
+				if err != nil {
511
+					return fmt.Errorf("error on listing clusters: %v", err)
512
+				}
513
+				if len(lcr.Clusters) == 0 {
514
+					if i < 10 {
515
+						time.Sleep(200 * time.Millisecond)
516
+						continue
517
+					}
518
+					return errors.New("empty list of clusters was returned")
519
+				}
520
+				cluster = lcr.Clusters[0]
521
+				break
522
+			}
523
+			// In init, we take the initial default values from swarmkit, and merge
524
+			// any non nil or 0 value from spec to GRPC spec. This will leave the
525
+			// default value alone.
526
+			// Note that this is different from Update(), as in Update() we expect
527
+			// user to specify the complete spec of the cluster (as they already know
528
+			// the existing one and knows which field to update)
529
+			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
530
+			if err != nil {
531
+				return fmt.Errorf("error updating cluster settings: %v", err)
532
+			}
533
+			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
534
+				ClusterID:      cluster.ID,
535
+				ClusterVersion: &cluster.Meta.Version,
536
+				Spec:           &clusterSpec,
537
+			})
538
+			if err != nil {
539
+				return fmt.Errorf("error updating cluster settings: %v", err)
540
+			}
541
+			return nil
542
+		}
543
+	}
544
+	return ctx.Err()
545
+}
546
+
547
+func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
548
+	var ids []string
549
+	filters := filters.NewArgs()
550
+	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
551
+	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
552
+		Filters: filters,
553
+	})
554
+	if err != nil {
555
+		return []string{}, err
556
+	}
557
+	for _, c := range containers {
558
+		ids = append(ids, c.ID)
559
+	}
560
+	return ids, nil
561
+}