Browse code

Merge pull request #29927 from aaronlehmann/vendor-swarmkit-2e956c4

Revendor swarmkit to 2e956c4

Alexander Morozov authored on 2017/01/07 10:09:21
Showing 16 changed files
... ...
@@ -102,7 +102,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
102 102
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
103 103
 
104 104
 # cluster
105
-github.com/docker/swarmkit 9e4bd71a1690cd27400714fcd98c329b752b5c4c
105
+github.com/docker/swarmkit 2e956c40c02ad527c90ec85bdae25a0acac1bd87
106 106
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
107 107
 github.com/gogo/protobuf v0.3
108 108
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
... ...
@@ -37,6 +37,8 @@ type Agent struct {
37 37
 	started   chan struct{}
38 38
 	startOnce sync.Once // start only once
39 39
 	ready     chan struct{}
40
+	leaving   chan struct{}
41
+	leaveOnce sync.Once
40 42
 	stopped   chan struct{} // requests shutdown
41 43
 	stopOnce  sync.Once     // only allow stop to be called once
42 44
 	closed    chan struct{} // only closed in run
... ...
@@ -53,6 +55,7 @@ func New(config *Config) (*Agent, error) {
53 53
 		config:   config,
54 54
 		sessionq: make(chan sessionOperation),
55 55
 		started:  make(chan struct{}),
56
+		leaving:  make(chan struct{}),
56 57
 		stopped:  make(chan struct{}),
57 58
 		closed:   make(chan struct{}),
58 59
 		ready:    make(chan struct{}),
... ...
@@ -78,6 +81,37 @@ func (a *Agent) Start(ctx context.Context) error {
78 78
 	return err
79 79
 }
80 80
 
81
+// Leave instructs the agent to leave the cluster. This method will shutdown
82
+// assignment processing and remove all assignments from the node.
83
+// Leave blocks until worker has finished closing all task managers or agent
84
+// is closed.
85
+func (a *Agent) Leave(ctx context.Context) error {
86
+	select {
87
+	case <-a.started:
88
+	default:
89
+		return errAgentNotStarted
90
+	}
91
+
92
+	a.leaveOnce.Do(func() {
93
+		close(a.leaving)
94
+	})
95
+
96
+	// agent could be closed while Leave is in progress
97
+	var err error
98
+	ch := make(chan struct{})
99
+	go func() {
100
+		err = a.worker.Wait(ctx)
101
+		close(ch)
102
+	}()
103
+
104
+	select {
105
+	case <-ch:
106
+		return err
107
+	case <-a.closed:
108
+		return ErrClosed
109
+	}
110
+}
111
+
81 112
 // Stop shuts down the agent, blocking until full shutdown. If the agent is not
82 113
 // started, Stop will block until the agent has fully shutdown.
83 114
 func (a *Agent) Stop(ctx context.Context) error {
... ...
@@ -151,6 +185,7 @@ func (a *Agent) run(ctx context.Context) {
151 151
 		registered    = session.registered
152 152
 		ready         = a.ready // first session ready
153 153
 		sessionq      chan sessionOperation
154
+		leaving       = a.leaving
154 155
 		subscriptions = map[string]context.CancelFunc{}
155 156
 	)
156 157
 
... ...
@@ -171,7 +206,21 @@ func (a *Agent) run(ctx context.Context) {
171 171
 		select {
172 172
 		case operation := <-sessionq:
173 173
 			operation.response <- operation.fn(session)
174
+		case <-leaving:
175
+			leaving = nil
176
+
177
+			// TODO(stevvooe): Signal to the manager that the node is leaving.
178
+
179
+			// when leaving we remove all assignments.
180
+			if err := a.worker.Assign(ctx, nil); err != nil {
181
+				log.G(ctx).WithError(err).Error("failed removing all assignments")
182
+			}
174 183
 		case msg := <-session.assignments:
184
+			// if we have left, accept no more assignments
185
+			if leaving == nil {
186
+				continue
187
+			}
188
+
175 189
 			switch msg.Type {
176 190
 			case api.AssignmentsMessage_COMPLETE:
177 191
 				// Need to assign secrets before tasks, because tasks might depend on new secrets
... ...
@@ -115,7 +115,7 @@ func (sr *statusReporter) run(ctx context.Context) {
115 115
 			}
116 116
 
117 117
 			if err != nil {
118
-				log.G(ctx).WithError(err).Error("failed reporting status to agent")
118
+				log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")
119 119
 
120 120
 				// place it back in the map, if not there, allowing us to pick
121 121
 				// the value if a new one came in when we were sending the last
... ...
@@ -1,6 +1,7 @@
1 1
 package agent
2 2
 
3 3
 import (
4
+	"sync"
4 5
 	"time"
5 6
 
6 7
 	"github.com/docker/swarmkit/agent/exec"
... ...
@@ -19,8 +20,10 @@ type taskManager struct {
19 19
 
20 20
 	updateq chan *api.Task
21 21
 
22
-	shutdown chan struct{}
23
-	closed   chan struct{}
22
+	shutdown     chan struct{}
23
+	shutdownOnce sync.Once
24
+	closed       chan struct{}
25
+	closeOnce    sync.Once
24 26
 }
25 27
 
26 28
 func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
... ...
@@ -48,20 +51,15 @@ func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
48 48
 	}
49 49
 }
50 50
 
51
-// Close shuts down the task manager, blocking until it is stopped.
51
+// Close shuts down the task manager, blocking until it is closed.
52 52
 func (tm *taskManager) Close() error {
53
-	select {
54
-	case <-tm.closed:
55
-		return nil
56
-	case <-tm.shutdown:
57
-	default:
53
+	tm.shutdownOnce.Do(func() {
58 54
 		close(tm.shutdown)
59
-	}
55
+	})
60 56
 
61
-	select {
62
-	case <-tm.closed:
63
-		return nil
64
-	}
57
+	<-tm.closed
58
+
59
+	return nil
65 60
 }
66 61
 
67 62
 func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
... ...
@@ -106,7 +104,8 @@ func (tm *taskManager) run(ctx context.Context) {
106 106
 			// always check for shutdown before running.
107 107
 			select {
108 108
 			case <-tm.shutdown:
109
-				continue // ignore run request and handle shutdown
109
+				shutdown = tm.shutdown // a little questionable
110
+				continue               // ignore run request and handle shutdown
110 111
 			case <-tm.closed:
111 112
 				continue
112 113
 			default:
... ...
@@ -143,7 +142,7 @@ func (tm *taskManager) run(ctx context.Context) {
143 143
 					}
144 144
 
145 145
 					if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
146
-						log.G(ctx).WithError(err).Error("failed reporting status to agent")
146
+						log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
147 147
 					}
148 148
 				}
149 149
 
... ...
@@ -230,25 +229,19 @@ func (tm *taskManager) run(ctx context.Context) {
230 230
 				continue       // wait until operation actually exits.
231 231
 			}
232 232
 
233
-			// TODO(stevvooe): This should be left for the repear.
234
-
235
-			// make an attempt at removing. this is best effort. any errors will be
236
-			// retried by the reaper later.
237
-			if err := tm.ctlr.Remove(ctx); err != nil {
238
-				log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
239
-			}
240
-
241
-			if err := tm.ctlr.Close(); err != nil {
242
-				log.G(ctx).WithError(err).Error("error closing controller")
243
-			}
244 233
 			// disable everything, and prepare for closing.
245 234
 			statusq = nil
246 235
 			errs = nil
247 236
 			shutdown = nil
248
-			close(tm.closed)
237
+			tm.closeOnce.Do(func() {
238
+				close(tm.closed)
239
+			})
249 240
 		case <-tm.closed:
250 241
 			return
251 242
 		case <-ctx.Done():
243
+			tm.closeOnce.Do(func() {
244
+				close(tm.closed)
245
+			})
252 246
 			return
253 247
 		}
254 248
 	}
... ...
@@ -40,6 +40,9 @@ type Worker interface {
40 40
 
41 41
 	// Subscribe to log messages matching the subscription.
42 42
 	Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error
43
+
44
+	// Wait blocks until all task managers have closed
45
+	Wait(ctx context.Context) error
43 46
 }
44 47
 
45 48
 // statusReporterKey protects removal map from panic.
... ...
@@ -57,6 +60,9 @@ type worker struct {
57 57
 
58 58
 	taskManagers map[string]*taskManager
59 59
 	mu           sync.RWMutex
60
+
61
+	closed  bool
62
+	closers sync.WaitGroup // keeps track of active closers
60 63
 }
61 64
 
62 65
 func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
... ...
@@ -106,6 +112,10 @@ func (w *worker) Init(ctx context.Context) error {
106 106
 
107 107
 // Close performs worker cleanup when no longer needed.
108 108
 func (w *worker) Close() {
109
+	w.mu.Lock()
110
+	w.closed = true
111
+	w.mu.Unlock()
112
+
109 113
 	w.taskevents.Close()
110 114
 }
111 115
 
... ...
@@ -118,6 +128,10 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
118 118
 	w.mu.Lock()
119 119
 	defer w.mu.Unlock()
120 120
 
121
+	if w.closed {
122
+		return ErrClosed
123
+	}
124
+
121 125
 	log.G(ctx).WithFields(logrus.Fields{
122 126
 		"len(assignments)": len(assignments),
123 127
 	}).Debug("(*worker).Assign")
... ...
@@ -140,6 +154,10 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange
140 140
 	w.mu.Lock()
141 141
 	defer w.mu.Unlock()
142 142
 
143
+	if w.closed {
144
+		return ErrClosed
145
+	}
146
+
143 147
 	log.G(ctx).WithFields(logrus.Fields{
144 148
 		"len(assignments)": len(assignments),
145 149
 	}).Debug("(*worker).Update")
... ...
@@ -222,10 +240,22 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig
222 222
 	}
223 223
 
224 224
 	closeManager := func(tm *taskManager) {
225
-		// when a task is no longer assigned, we shutdown the task manager for
226
-		// it and leave cleanup to the sweeper.
227
-		if err := tm.Close(); err != nil {
228
-			log.G(ctx).WithError(err).Error("error closing task manager")
225
+		go func(tm *taskManager) {
226
+			defer w.closers.Done()
227
+			// when a task is no longer assigned, we shutdown the task manager
228
+			if err := tm.Close(); err != nil {
229
+				log.G(ctx).WithError(err).Error("error closing task manager")
230
+			}
231
+		}(tm)
232
+
233
+		// make an attempt at removing. this is best effort. any errors will be
234
+		// retried by the reaper later.
235
+		if err := tm.ctlr.Remove(ctx); err != nil {
236
+			log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
237
+		}
238
+
239
+		if err := tm.ctlr.Close(); err != nil {
240
+			log.G(ctx).WithError(err).Error("error closing controller")
229 241
 		}
230 242
 	}
231 243
 
... ...
@@ -359,6 +389,8 @@ func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (
359 359
 		return nil, err
360 360
 	}
361 361
 	w.taskManagers[task.ID] = tm
362
+	// keep track of active tasks
363
+	w.closers.Add(1)
362 364
 	return tm, nil
363 365
 }
364 366
 
... ...
@@ -484,3 +516,18 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
484 484
 		}
485 485
 	}
486 486
 }
487
+
488
+func (w *worker) Wait(ctx context.Context) error {
489
+	ch := make(chan struct{})
490
+	go func() {
491
+		w.closers.Wait()
492
+		close(ch)
493
+	}()
494
+
495
+	select {
496
+	case <-ch:
497
+		return nil
498
+	case <-ctx.Done():
499
+		return ctx.Err()
500
+	}
501
+}
... ...
@@ -69,7 +69,7 @@ const (
69 69
 	MinNodeCertExpiration = 1 * time.Hour
70 70
 )
71 71
 
72
-// A recoverableErr is an non-fatal error encountered signing a certificate,
72
+// A recoverableErr is a non-fatal error encountered signing a certificate,
73 73
 // which means that the certificate issuance may be retried at a later time.
74 74
 type recoverableErr struct {
75 75
 	err error
... ...
@@ -459,13 +459,26 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
459 459
 			if err != nil {
460 460
 				// We failed to read the expiration, let's stick with the starting default
461 461
 				log.Errorf("failed to read the expiration of the TLS certificate in: %s", s.KeyReader().Target())
462
-				updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}
462
+
463
+				select {
464
+				case updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}:
465
+				case <-ctx.Done():
466
+					log.Info("shutting down certificate renewal routine")
467
+					return
468
+				}
463 469
 			} else {
464 470
 				// If we have an expired certificate, we let's stick with the starting default in
465 471
 				// the hope that this is a temporary clock skew.
466 472
 				if validUntil.Before(time.Now()) {
467 473
 					log.WithError(err).Errorf("failed to create a new client TLS config")
468
-					updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")}
474
+
475
+					select {
476
+					case updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")}:
477
+					case <-ctx.Done():
478
+						log.Info("shutting down certificate renewal routine")
479
+						return
480
+					}
481
+
469 482
 				} else {
470 483
 					// Random retry time between 50% and 80% of the total time to expiration
471 484
 					retry = calculateRandomExpiry(validFrom, validUntil)
... ...
@@ -478,19 +491,27 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
478 478
 
479 479
 			select {
480 480
 			case <-time.After(retry):
481
-				log.Infof("renewing certificate")
481
+				log.Info("renewing certificate")
482 482
 			case <-renew:
483
-				log.Infof("forced certificate renewal")
483
+				log.Info("forced certificate renewal")
484 484
 			case <-ctx.Done():
485
-				log.Infof("shuting down certificate renewal routine")
485
+				log.Info("shutting down certificate renewal routine")
486 486
 				return
487 487
 			}
488 488
 
489
-			// ignore errors - it will just try again laster
489
+			// ignore errors - it will just try again later
490
+			var certUpdate CertificateUpdate
490 491
 			if err := RenewTLSConfigNow(ctx, s, remotes); err != nil {
491
-				updates <- CertificateUpdate{Err: err}
492
+				certUpdate.Err = err
492 493
 			} else {
493
-				updates <- CertificateUpdate{Role: s.ClientTLSCreds.Role()}
494
+				certUpdate.Role = s.ClientTLSCreds.Role()
495
+			}
496
+
497
+			select {
498
+			case updates <- certUpdate:
499
+			case <-ctx.Done():
500
+				log.Info("shutting down certificate renewal routine")
501
+				return
494 502
 			}
495 503
 		}
496 504
 	}()
... ...
@@ -42,7 +42,7 @@ func GetLogger(ctx context.Context) *logrus.Entry {
42 42
 }
43 43
 
44 44
 // WithModule adds the module to the context, appending it with a slash if a
45
-// module already exists. A module is just an roughly correlated defined by the
45
+// module already exists. A module is just a roughly correlated defined by the
46 46
 // call tree for a given context.
47 47
 //
48 48
 // As an example, we might have a "node" module already part of a context. If
... ...
@@ -73,7 +73,7 @@ func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) {
73 73
 	}, nil
74 74
 }
75 75
 
76
-// getPortConfigkey returns a map key for doing set operations with
76
+// getPortConfigKey returns a map key for doing set operations with
77 77
 // ports. The key consists of name, protocol and target port which
78 78
 // uniquely identifies a port within a single Endpoint.
79 79
 func getPortConfigKey(p *api.PortConfig) api.PortConfig {
... ...
@@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev
173 173
 	}))
174 174
 
175 175
 	// Grab current subscriptions.
176
-	subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions))
176
+	var subscriptions []*subscription
177 177
 	for _, s := range lb.registeredSubscriptions {
178 178
 		if s.Contains(nodeID) {
179 179
 			subscriptions = append(subscriptions, s)
... ...
@@ -45,7 +45,7 @@ const (
45 45
 	defaultTaskHistoryRetentionLimit = 5
46 46
 )
47 47
 
48
-// RemoteAddrs provides an listening address and an optional advertise address
48
+// RemoteAddrs provides a listening address and an optional advertise address
49 49
 // for serving the remote API.
50 50
 type RemoteAddrs struct {
51 51
 	// Address to bind
... ...
@@ -102,7 +102,7 @@ func (r *Orchestrator) Stop() {
102 102
 
103 103
 func (r *Orchestrator) tick(ctx context.Context) {
104 104
 	// tickTasks must be called first, so we respond to task-level changes
105
-	// before performing service reconcillation.
105
+	// before performing service reconciliation.
106 106
 	r.tickTasks(ctx)
107 107
 	r.tickServices(ctx)
108 108
 }
... ...
@@ -181,7 +181,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
181 181
 func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]orchestrator.Slot, deadSlots map[uint64]orchestrator.Slot, count int) {
182 182
 	slot := uint64(0)
183 183
 	for i := 0; i < count; i++ {
184
-		// Find an slot number that is missing a running task
184
+		// Find a slot number that is missing a running task
185 185
 		for {
186 186
 			slot++
187 187
 			if _, ok := runningSlots[slot]; !ok {
... ...
@@ -15,7 +15,7 @@ import (
15 15
 
16 16
 // This file provides task-level orchestration. It observes changes to task
17 17
 // and node state and kills/recreates tasks if necessary. This is distinct from
18
-// service-level reconcillation, which observes changes to services and creates
18
+// service-level reconciliation, which observes changes to services and creates
19 19
 // and/or kills tasks to match the service definition.
20 20
 
21 21
 func invalidNode(n *api.Node) bool {
... ...
@@ -380,8 +380,8 @@ func (n *Node) Run(ctx context.Context) error {
380 380
 		cancel()
381 381
 		n.stop(ctx)
382 382
 		if nodeRemoved {
383
-			// Move WAL and snapshot out of the way, since
384
-			// they are no longer usable.
383
+			// Delete WAL and snapshots, since they are no longer
384
+			// usable.
385 385
 			if err := n.raftLogger.Clear(ctx); err != nil {
386 386
 				log.G(ctx).WithError(err).Error("failed to move wal after node removal")
387 387
 			}
... ...
@@ -405,7 +405,7 @@ func (n *Node) Run(ctx context.Context) error {
405 405
 
406 406
 			// Save entries to storage
407 407
 			if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
408
-				log.G(ctx).WithError(err).Error("failed to save entries to storage")
408
+				return errors.Wrap(err, "failed to save entries to storage")
409 409
 			}
410 410
 
411 411
 			if len(rd.Messages) != 0 {
... ...
@@ -710,11 +710,20 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
710 710
 	defer n.membershipLock.Unlock()
711 711
 
712 712
 	if !n.IsMember() {
713
-		return nil, ErrNoRaftMember
713
+		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
714 714
 	}
715 715
 
716 716
 	if !n.isLeader() {
717
-		return nil, ErrLostLeadership
717
+		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
718
+	}
719
+
720
+	// A single manager must not be able to join the raft cluster twice. If
721
+	// it did, that would cause the quorum to be computed incorrectly. This
722
+	// could happen if the WAL was deleted from an active manager.
723
+	for _, m := range n.cluster.Members() {
724
+		if m.NodeID == nodeInfo.NodeID {
725
+			return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
726
+		}
718 727
 	}
719 728
 
720 729
 	// Find a unique ID for the joining member.
... ...
@@ -734,7 +743,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
734 734
 
735 735
 	requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
736 736
 	if err != nil {
737
-		return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr)
737
+		return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
738 738
 	}
739 739
 
740 740
 	requestIP := net.ParseIP(requestHost)
... ...
@@ -990,6 +999,11 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
990 990
 	defer n.stopMu.RUnlock()
991 991
 
992 992
 	if n.IsMember() {
993
+		if msg.Message.To != n.Config.ID {
994
+			n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
995
+			return &api.ProcessRaftMessageResponse{}, nil
996
+		}
997
+
993 998
 		if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
994 999
 			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
995 1000
 		}
... ...
@@ -295,7 +295,7 @@ func (n *Node) run(ctx context.Context) (err error) {
295 295
 	var wg sync.WaitGroup
296 296
 	wg.Add(2)
297 297
 	go func() {
298
-		managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
298
+		managerErr = n.superviseManager(ctx, securityConfig, managerReady) // store err and loop
299 299
 		wg.Done()
300 300
 		cancel()
301 301
 	}()
... ...
@@ -330,6 +330,14 @@ func (n *Node) Stop(ctx context.Context) error {
330 330
 	default:
331 331
 		return errNodeNotStarted
332 332
 	}
333
+	// ask agent to clean up assignments
334
+	n.Lock()
335
+	if n.agent != nil {
336
+		if err := n.agent.Leave(ctx); err != nil {
337
+			log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
338
+		}
339
+	}
340
+	n.Unlock()
333 341
 
334 342
 	n.stopOnce.Do(func() {
335 343
 		close(n.stopped)
... ...
@@ -616,9 +624,7 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
616 616
 		n.roleCond.Wait()
617 617
 		select {
618 618
 		case <-ctx.Done():
619
-			if ctx.Err() != nil {
620
-				return ctx.Err()
621
-			}
619
+			return ctx.Err()
622 620
 		default:
623 621
 		}
624 622
 	}
... ...
@@ -627,100 +633,117 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
627 627
 }
628 628
 
629 629
 func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
630
-	for {
631
-		if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
632
-			return err
630
+	remoteAddr, _ := n.remotes.Select(n.NodeID())
631
+	m, err := manager.New(&manager.Config{
632
+		ForceNewCluster: n.config.ForceNewCluster,
633
+		RemoteAPI: manager.RemoteAddrs{
634
+			ListenAddr:    n.config.ListenRemoteAPI,
635
+			AdvertiseAddr: n.config.AdvertiseRemoteAPI,
636
+		},
637
+		ControlAPI:       n.config.ListenControlAPI,
638
+		SecurityConfig:   securityConfig,
639
+		ExternalCAs:      n.config.ExternalCAs,
640
+		JoinRaft:         remoteAddr.Addr,
641
+		StateDir:         n.config.StateDir,
642
+		HeartbeatTick:    n.config.HeartbeatTick,
643
+		ElectionTick:     n.config.ElectionTick,
644
+		AutoLockManagers: n.config.AutoLockManagers,
645
+		UnlockKey:        n.unlockKey,
646
+		Availability:     n.config.Availability,
647
+	})
648
+	if err != nil {
649
+		return err
650
+	}
651
+	done := make(chan struct{})
652
+	var runErr error
653
+	go func() {
654
+		if err := m.Run(context.Background()); err != nil && err != raft.ErrMemberRemoved {
655
+			runErr = err
633 656
 		}
657
+		close(done)
658
+	}()
634 659
 
635
-		remoteAddr, _ := n.remotes.Select(n.NodeID())
636
-		m, err := manager.New(&manager.Config{
637
-			ForceNewCluster: n.config.ForceNewCluster,
638
-			RemoteAPI: manager.RemoteAddrs{
639
-				ListenAddr:    n.config.ListenRemoteAPI,
640
-				AdvertiseAddr: n.config.AdvertiseRemoteAPI,
641
-			},
642
-			ControlAPI:       n.config.ListenControlAPI,
643
-			SecurityConfig:   securityConfig,
644
-			ExternalCAs:      n.config.ExternalCAs,
645
-			JoinRaft:         remoteAddr.Addr,
646
-			StateDir:         n.config.StateDir,
647
-			HeartbeatTick:    n.config.HeartbeatTick,
648
-			ElectionTick:     n.config.ElectionTick,
649
-			AutoLockManagers: n.config.AutoLockManagers,
650
-			UnlockKey:        n.unlockKey,
651
-			Availability:     n.config.Availability,
652
-		})
653
-		if err != nil {
654
-			return err
655
-		}
656
-		done := make(chan struct{})
657
-		var runErr error
658
-		go func() {
659
-			runErr = m.Run(context.Background())
660
-			close(done)
661
-		}()
660
+	workerRole := make(chan struct{})
661
+	waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
662
+	defer waitRoleCancel()
663
+	go func() {
664
+		n.waitRole(waitRoleCtx, ca.WorkerRole)
665
+		close(workerRole)
666
+	}()
662 667
 
668
+	defer func() {
663 669
 		n.Lock()
664
-		n.manager = m
670
+		n.manager = nil
665 671
 		n.Unlock()
672
+		m.Stop(ctx)
673
+		<-done
674
+		n.setControlSocket(nil)
675
+	}()
666 676
 
667
-		connCtx, connCancel := context.WithCancel(ctx)
668
-		go n.initManagerConnection(connCtx, ready)
669
-
670
-		// this happens only on initial start
671
-		if ready != nil {
672
-			go func(ready chan struct{}) {
673
-				select {
674
-				case <-ready:
675
-					addr, err := n.RemoteAPIAddr()
676
-					if err != nil {
677
-						log.G(ctx).WithError(err).Errorf("get remote api addr")
678
-					} else {
679
-						n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
680
-					}
681
-				case <-connCtx.Done():
682
-				}
683
-			}(ready)
684
-			ready = nil
685
-		}
677
+	n.Lock()
678
+	n.manager = m
679
+	n.Unlock()
686 680
 
687
-		roleChanged := make(chan error)
688
-		waitCtx, waitCancel := context.WithCancel(ctx)
689
-		go func() {
690
-			err := n.waitRole(waitCtx, ca.WorkerRole)
691
-			roleChanged <- err
692
-		}()
681
+	connCtx, connCancel := context.WithCancel(ctx)
682
+	defer connCancel()
693 683
 
694
-		select {
695
-		case <-done:
696
-			// Fail out if m.Run() returns error, otherwise wait for
697
-			// role change.
698
-			if runErr != nil && runErr != raft.ErrMemberRemoved {
699
-				err = runErr
700
-			} else {
701
-				err = <-roleChanged
702
-			}
703
-		case err = <-roleChanged:
704
-		}
684
+	go n.initManagerConnection(connCtx, ready)
705 685
 
706
-		n.Lock()
707
-		n.manager = nil
708
-		n.Unlock()
686
+	// this happens only on initial start
687
+	if ready != nil {
688
+		go func(ready chan struct{}) {
689
+			select {
690
+			case <-ready:
691
+				addr, err := n.RemoteAPIAddr()
692
+				if err != nil {
693
+					log.G(ctx).WithError(err).Errorf("get remote api addr")
694
+				} else {
695
+					n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
696
+				}
697
+			case <-connCtx.Done():
698
+			}
699
+		}(ready)
700
+	}
709 701
 
702
+	// Wait for the manager to stop or for the role to change.
703
+	// If the manager stopped before the role change, wait up to 16 seconds for
704
+	// the new role, then just restart the manager; we might have missed that event.
705
+	// We need to wait for the role to prevent the manager from starting again
706
+	// with the wrong certificate.
707
+	select {
708
+	case <-done:
709
+		timer := time.NewTimer(16 * time.Second)
710
+		defer timer.Stop()
711
+		select {
712
+		case <-timer.C:
713
+			log.G(ctx).Warn("failed to get worker role after manager stop, restart manager")
714
+		case <-workerRole:
715
+		case <-ctx.Done():
716
+			return ctx.Err()
717
+		}
718
+		return runErr
719
+	case <-workerRole:
720
+		log.G(ctx).Info("role changed to worker, wait for manager to stop")
710 721
 		select {
711 722
 		case <-done:
723
+			return runErr
712 724
 		case <-ctx.Done():
713
-			err = ctx.Err()
714
-			m.Stop(context.Background())
715
-			<-done
725
+			return ctx.Err()
716 726
 		}
717
-		connCancel()
718
-		n.setControlSocket(nil)
719
-		waitCancel()
727
+	case <-ctx.Done():
728
+		return ctx.Err()
729
+	}
730
+}
720 731
 
721
-		if err != nil {
732
+func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
733
+	for {
734
+		if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
722 735
 			return err
723 736
 		}
737
+		if err := n.runManager(ctx, securityConfig, ready); err != nil {
738
+			return errors.Wrap(err, "manager stopped")
739
+		}
740
+		ready = nil
724 741
 	}
725 742
 }
726 743