Vendor swarmkit

Signed-off-by: Anusha Ragunathan <anusha.ragunathan@docker.com>

Anusha Ragunathan authored on 2017/01/20 10:18:22
Showing 26 changed files
... ...
@@ -103,7 +103,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
103 103
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
104 104
 
105 105
 # cluster
106
-github.com/docker/swarmkit 62d835f478b2e4fd2768deb88fb3b32e334faaee
106
+github.com/docker/swarmkit 98620dd1ddfcc03d8f4b0d2910ecf6b52918a731
107 107
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
108 108
 github.com/gogo/protobuf v0.3
109 109
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
... ...
@@ -148,7 +148,7 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
148 148
 // Update updates the set of tasks and secret for the worker.
149 149
 // Tasks in the added set will be added to the worker, and tasks in the removed set
150 150
 // will be removed from the worker
151
-// Serets in the added set will be added to the worker, and secrets in the removed set
151
+// Secrets in the added set will be added to the worker, and secrets in the removed set
152 152
 // will be removed from the worker.
153 153
 func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
154 154
 	w.mu.Lock()
... ...
@@ -140,7 +140,12 @@ type ServiceSpec struct {
140 140
 	//	*ServiceSpec_Global
141 141
 	Mode isServiceSpec_Mode `protobuf_oneof:"mode"`
142 142
 	// UpdateConfig controls the rate and policy of updates.
143
-	Update   *UpdateConfig              `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"`
143
+	Update *UpdateConfig `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"`
144
+	// ServiceSpec.Networks has been deprecated and is replaced by
145
+	// Networks field in Task (TaskSpec.Networks).
146
+	// This field (ServiceSpec.Networks) is kept for compatibility.
147
+	// In case TaskSpec.Networks does not exist, ServiceSpec.Networks
148
+	// is still honored if it exists.
144 149
 	Networks []*NetworkAttachmentConfig `protobuf:"bytes,7,rep,name=networks" json:"networks,omitempty"`
145 150
 	// Service endpoint specifies the user provided configuration
146 151
 	// to properly discover and load balance a service.
... ...
@@ -72,6 +72,11 @@ message ServiceSpec {
72 72
 	// UpdateConfig controls the rate and policy of updates.
73 73
 	UpdateConfig update = 6;
74 74
 
75
+	// ServiceSpec.Networks has been deprecated and is replaced by
76
+	// Networks field in Task (TaskSpec.Networks).
77
+	// This field (ServiceSpec.Networks) is kept for compatibility.
78
+	// In case TaskSpec.Networks does not exist, ServiceSpec.Networks
79
+	// is still honored if it exists.
75 80
 	repeated NetworkAttachmentConfig networks = 7 [deprecated=true];
76 81
 
77 82
 	// Service endpoint specifies the user provided configuration
... ...
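
The deprecation note above implies a precedence rule worth spelling out. A minimal sketch of that fallback, assuming the usual swarmkit api types (the helper itself is hypothetical, not part of this change):

    // networksFor returns the networks a task should attach to: prefer the
    // task-level field, fall back to the deprecated service-level one.
    func networksFor(spec *api.ServiceSpec) []*api.NetworkAttachmentConfig {
        if len(spec.Task.Networks) != 0 {
            return spec.Task.Networks
        }
        return spec.Networks // deprecated, still honored for compatibility
    }
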
@@ -19,7 +19,7 @@ import (
19 19
 type localRequestKeyType struct{}
20 20
 
21 21
 // LocalRequestKey is a context key to mark a request that originating on the
22
-// local node. The assocated value is a RemoteNodeInfo structure describing the
22
+// local node. The associated value is a RemoteNodeInfo structure describing the
23 23
 // local node.
24 24
 var LocalRequestKey = localRequestKeyType{}
25 25
 
... ...
@@ -103,26 +103,9 @@ func makeExternalSignRequest(ctx context.Context, client *http.Client, url strin
103 103
 	if err != nil {
104 104
 		return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")}
105 105
 	}
106
-
107
-	doneReading := make(chan struct{})
108
-	bodyClosed := make(chan struct{})
109
-	go func() {
110
-		select {
111
-		case <-ctx.Done():
112
-		case <-doneReading:
113
-		}
114
-		resp.Body.Close()
115
-		close(bodyClosed)
116
-	}()
106
+	defer resp.Body.Close()
117 107
 
118 108
 	body, err := ioutil.ReadAll(resp.Body)
119
-	close(doneReading)
120
-	<-bodyClosed
121
-	select {
122
-	case <-ctx.Done():
123
-		return nil, ctx.Err()
124
-	default:
125
-	}
126 109
 	if err != nil {
127 110
 		return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
128 111
 	}
... ...
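
The hunk above drops a hand-rolled cancellation dance: a goroutine raced ctx.Done() against the body read so that a cancelled context could not leave the response body open. Once the request itself is issued through a context-aware helper, a plain defer is enough. A minimal sketch of the simplified shape, assuming ctxhttp carries the cancellation (the surrounding swarmkit code may differ in detail):

    // postCSR is a hypothetical reduction of makeExternalSignRequest: the
    // context cancels the in-flight request, so Close can simply be deferred.
    func postCSR(ctx context.Context, client *http.Client, url string, csrJSON []byte) ([]byte, error) {
        resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON))
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        return ioutil.ReadAll(resp.Body)
    }
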
@@ -353,7 +353,7 @@ func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr [
353 353
 	}, nil
354 354
 }
355 355
 
356
-// GetRootCACertificate returns the certificate of the Root CA. It is used as a convinience for distributing
356
+// GetRootCACertificate returns the certificate of the Root CA. It is used as a convenience for distributing
357 357
 // the root of trust for the swarm. Clients should be using the CA hash to verify if they weren't target to
358 358
 // a MiTM. If they fail to do so, node bootstrap works with TOFU semantics.
359 359
 func (s *Server) GetRootCACertificate(ctx context.Context, request *api.GetRootCACertificateRequest) (*api.GetRootCACertificateResponse, error) {
... ...
@@ -96,9 +96,9 @@ func (c *Conn) Close(success bool) error {
96 96
 	}
97 97
 
98 98
 	if success {
99
-		c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
100
-	} else {
101 99
 		c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
100
+	} else {
101
+		c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
102 102
 	}
103 103
 
104 104
 	return c.ClientConn.Close()
... ...
@@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
29 29
 	return context.WithValue(ctx, loggerKey{}, logger)
30 30
 }
31 31
 
32
+// WithFields returns a new context with the given fields added to its logger.
33
+func WithFields(ctx context.Context, fields logrus.Fields) context.Context {
34
+	logger := ctx.Value(loggerKey{})
35
+
36
+	if logger == nil {
37
+		logger = L
38
+	}
39
+	return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields))
40
+}
41
+
42
+// WithField is a convenience wrapper around WithFields.
43
+func WithField(ctx context.Context, key, value string) context.Context {
44
+	return WithFields(ctx, logrus.Fields{key: value})
45
+}
46
+
32 47
 // GetLogger retrieves the current logger from the context. If no logger is
33 48
 // available, the default logger is returned.
34 49
 func GetLogger(ctx context.Context) *logrus.Entry {
... ...
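
The new helpers let callers accumulate structured fields on a context instead of rebuilding a logrus entry at every call site. A hedged usage sketch (field names illustrative, assuming the swarmkit log package and logrus are imported):

    func logExample(ctx context.Context, nodeID string) {
        ctx = log.WithField(ctx, "method", "(*Dispatcher).processUpdates")
        ctx = log.WithFields(ctx, logrus.Fields{"node.id": nodeID})
        log.G(ctx).Debug("dispatching") // the entry carries both fields
    }
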
@@ -3,6 +3,7 @@ package allocator
3 3
 import (
4 4
 	"sync"
5 5
 
6
+	"github.com/docker/docker/pkg/plugingetter"
6 7
 	"github.com/docker/go-events"
7 8
 	"github.com/docker/swarmkit/manager/state"
8 9
 	"github.com/docker/swarmkit/manager/state/store"
... ...
@@ -27,6 +28,9 @@ type Allocator struct {
27 27
 	stopChan chan struct{}
28 28
 	// doneChan is closed when the allocator is finished running.
29 29
 	doneChan chan struct{}
30
+
31
+	// pluginGetter provides access to docker's plugin inventory.
32
+	pluginGetter plugingetter.PluginGetter
30 33
 }
31 34
 
32 35
 // taskBallot controls how the voting for task allocation is
... ...
@@ -67,14 +71,15 @@ type allocActor struct {
67 67
 
68 68
 // New returns a new instance of Allocator for use during allocation
69 69
 // stage of the manager.
70
-func New(store *store.MemoryStore) (*Allocator, error) {
70
+func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, error) {
71 71
 	a := &Allocator{
72 72
 		store: store,
73 73
 		taskBallot: &taskBallot{
74 74
 			votes: make(map[string][]string),
75 75
 		},
76
-		stopChan: make(chan struct{}),
77
-		doneChan: make(chan struct{}),
76
+		stopChan:     make(chan struct{}),
77
+		doneChan:     make(chan struct{}),
78
+		pluginGetter: pg,
78 79
 	}
79 80
 
80 81
 	return a, nil
... ...
@@ -73,7 +73,7 @@ type networkContext struct {
73 73
 }
74 74
 
75 75
 func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
76
-	na, err := networkallocator.New()
76
+	na, err := networkallocator.New(a.pluginGetter)
77 77
 	if err != nil {
78 78
 		return err
79 79
 	}
... ...
@@ -4,7 +4,7 @@ import (
4 4
 	"fmt"
5 5
 	"net"
6 6
 
7
-	"github.com/docker/docker/pkg/plugins"
7
+	"github.com/docker/docker/pkg/plugingetter"
8 8
 	"github.com/docker/libnetwork/datastore"
9 9
 	"github.com/docker/libnetwork/driverapi"
10 10
 	"github.com/docker/libnetwork/drvregistry"
... ...
@@ -49,7 +49,7 @@ type NetworkAllocator struct {
49 49
 	nodes map[string]struct{}
50 50
 }
51 51
 
52
-// Local in-memory state related to netwok that need to be tracked by NetworkAllocator
52
+// Local in-memory state related to network that need to be tracked by NetworkAllocator
53 53
 type network struct {
54 54
 	// A local cache of the store object.
55 55
 	nw *api.Network
... ...
@@ -69,7 +69,7 @@ type initializer struct {
69 69
 }
70 70
 
71 71
 // New returns a new NetworkAllocator handle
72
-func New() (*NetworkAllocator, error) {
72
+func New(pg plugingetter.PluginGetter) (*NetworkAllocator, error) {
73 73
 	na := &NetworkAllocator{
74 74
 		networks: make(map[string]*network),
75 75
 		services: make(map[string]struct{}),
... ...
@@ -79,7 +79,7 @@ func New() (*NetworkAllocator, error) {
79 79
 
80 80
 	// There are no driver configurations and notification
81 81
 	// functions as of now.
82
-	reg, err := drvregistry.New(nil, nil, nil, nil, nil)
82
+	reg, err := drvregistry.New(nil, nil, nil, nil, pg)
83 83
 	if err != nil {
84 84
 		return nil, err
85 85
 	}
... ...
@@ -133,7 +133,7 @@ func (na *NetworkAllocator) getNetwork(id string) *network {
133 133
 }
134 134
 
135 135
 // Deallocate frees all the general and driver specific resources
136
-// whichs were assigned to the passed network.
136
+// which were assigned to the passed network.
137 137
 func (na *NetworkAllocator) Deallocate(n *api.Network) error {
138 138
 	localNet := na.getNetwork(n.ID)
139 139
 	if localNet == nil {
... ...
@@ -657,7 +657,11 @@ func (na *NetworkAllocator) resolveDriver(n *api.Network) (driverapi.Driver, str
657 657
 }
658 658
 
659 659
 func (na *NetworkAllocator) loadDriver(name string) error {
660
-	_, err := plugins.Get(name, driverapi.NetworkPluginEndpointType)
660
+	pg := na.drvRegistry.GetPluginGetter()
661
+	if pg == nil {
662
+		return fmt.Errorf("plugin store is uninitialized")
663
+	}
664
+	_, err := pg.Get(name, driverapi.NetworkPluginEndpointType, plugingetter.Lookup)
661 665
 	return err
662 666
 }
663 667
 
... ...
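
Driver resolution above now goes through docker's plugin getter rather than the legacy plugins package; plugingetter.Lookup queries the plugin by name and capability without taking a reference on it. The lookup in isolation (the wrapper is hypothetical; the call shape mirrors the hunk):

    // lookupNetworkDriver resolves a network driver plugin by name.
    func lookupNetworkDriver(pg plugingetter.PluginGetter, name string) error {
        if pg == nil {
            return fmt.Errorf("plugin store is uninitialized")
        }
        _, err := pg.Get(name, driverapi.NetworkPluginEndpointType, plugingetter.Lookup)
        return err
    }
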
@@ -17,12 +17,12 @@ const (
17 17
 	dynamicPortEnd = 32767
18 18
 
19 19
 	// The start of master port range which will hold all the
20
-	// allocation state of ports allocated so far regerdless of
20
+	// allocation state of ports allocated so far regardless of
21 21
 	// whether it was user defined or not.
22 22
 	masterPortStart = 1
23 23
 
24 24
 	// The end of master port range which will hold all the
25
-	// allocation state of ports allocated so far regerdless of
25
+	// allocation state of ports allocated so far regardless of
26 26
 	// whether it was user defined or not.
27 27
 	masterPortEnd = 65535
28 28
 )
... ...
@@ -65,7 +65,7 @@ func (ps allocatedPorts) addState(p *api.PortConfig) {
65 65
 // Note multiple dynamically allocated ports might exists. In this case,
66 66
 // we will remove only at a time so both allocated ports are tracked.
67 67
 //
68
-// Note becasue of the potential co-existence of user-defined and dynamically
68
+// Note because of the potential co-existence of user-defined and dynamically
69 69
 // allocated ports, delState has to be called for user-defined port first.
70 70
 // dynamically allocated ports should be removed later.
71 71
 func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig {
... ...
@@ -277,7 +277,7 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
277 277
 	}
278 278
 
279 279
 	// If service has allocated endpoint while has no user-defined endpoint,
280
-	// we assume allocated endpoints are redudant, and they need deallocated.
280
+	// we assume allocated endpoints are redundant, and they need deallocated.
281 281
 	// If service has no allocated endpoint while has user-defined endpoint,
282 282
 	// we assume it is not allocated.
283 283
 	if (s.Endpoint != nil && s.Spec.Endpoint == nil) ||
... ...
@@ -502,7 +502,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
502 502
 		}
503 503
 
504 504
 		if !reflect.DeepEqual(requestSpecNetworks, specNetworks) {
505
-			return errNetworkUpdateNotSupported
505
+			return grpc.Errorf(codes.Unimplemented, errNetworkUpdateNotSupported.Error())
506 506
 		}
507 507
 
508 508
 		// Check to see if all the secrets being added exist as objects
... ...
@@ -516,11 +516,11 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
516 516
 		// with service mode change (comparing current config with previous config).
517 517
 		// proper way to change service mode is to delete and re-add.
518 518
 		if reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) {
519
-			return errModeChangeNotAllowed
519
+			return grpc.Errorf(codes.Unimplemented, errModeChangeNotAllowed.Error())
520 520
 		}
521 521
 
522 522
 		if service.Spec.Annotations.Name != request.Spec.Annotations.Name {
523
-			return errRenameNotSupported
523
+			return grpc.Errorf(codes.Unimplemented, errRenameNotSupported.Error())
524 524
 		}
525 525
 
526 526
 		service.Meta.Version = *request.ServiceVersion
... ...
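
Wrapping the sentinel errors in grpc.Errorf attaches a status code, so remote callers can branch on codes.Unimplemented instead of string-matching error messages. A hedged client-side sketch using the grpc-go API of that era:

    func updateService(ctx context.Context, client api.ControlClient, req *api.UpdateServiceRequest) error {
        _, err := client.UpdateService(ctx, req)
        if grpc.Code(err) == codes.Unimplemented {
            // the server rejected the update as unsupported (rename, mode
            // change, network change); surface that instead of retrying
        }
        return err
    }
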
@@ -106,6 +106,7 @@ type nodeUpdate struct {
106 106
 // Dispatcher is responsible for dispatching tasks and tracking agent health.
107 107
 type Dispatcher struct {
108 108
 	mu                   sync.Mutex
109
+	wg                   sync.WaitGroup
109 110
 	nodes                *nodeStore
110 111
 	store                *store.MemoryStore
111 112
 	mgrQueue             *watch.Queue
... ...
@@ -216,6 +217,9 @@ func (d *Dispatcher) Run(ctx context.Context) error {
216 216
 
217 217
 	defer cancel()
218 218
 	d.ctx, d.cancel = context.WithCancel(ctx)
219
+	ctx = d.ctx
220
+	d.wg.Add(1)
221
+	defer d.wg.Done()
219 222
 	d.mu.Unlock()
220 223
 
221 224
 	publishManagers := func(peers []*api.Peer) {
... ...
@@ -240,10 +244,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
240 240
 		case ev := <-peerWatcher:
241 241
 			publishManagers(ev.([]*api.Peer))
242 242
 		case <-d.processUpdatesTrigger:
243
-			d.processUpdates()
243
+			d.processUpdates(ctx)
244 244
 			batchTimer.Reset(maxBatchInterval)
245 245
 		case <-batchTimer.C:
246
-			d.processUpdates()
246
+			d.processUpdates(ctx)
247 247
 			batchTimer.Reset(maxBatchInterval)
248 248
 		case v := <-configWatcher:
249 249
 			cluster := v.(state.EventUpdateCluster)
... ...
@@ -260,7 +264,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
260 260
 			d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
261 261
 			d.mu.Unlock()
262 262
 			d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys)
263
-		case <-d.ctx.Done():
263
+		case <-ctx.Done():
264 264
 			return nil
265 265
 		}
266 266
 	}
... ...
@@ -287,17 +291,20 @@ func (d *Dispatcher) Stop() error {
287 287
 	d.mgrQueue.Close()
288 288
 	d.keyMgrQueue.Close()
289 289
 
290
+	d.wg.Wait()
291
+
290 292
 	return nil
291 293
 }
292 294
 
293
-func (d *Dispatcher) isRunningLocked() error {
295
+func (d *Dispatcher) isRunningLocked() (context.Context, error) {
294 296
 	d.mu.Lock()
295 297
 	if !d.isRunning() {
296 298
 		d.mu.Unlock()
297
-		return grpc.Errorf(codes.Aborted, "dispatcher is stopped")
299
+		return nil, grpc.Errorf(codes.Aborted, "dispatcher is stopped")
298 300
 	}
301
+	ctx := d.ctx
299 302
 	d.mu.Unlock()
300
-	return nil
303
+	return ctx, nil
301 304
 }
302 305
 
303 306
 func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
... ...
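
These dispatcher hunks share one theme: Run registers itself on a WaitGroup that Stop now waits on, and isRunningLocked hands callers the run context, so later selects use a ctx snapshot taken under the mutex instead of re-reading d.ctx racily. The resulting call pattern, condensed (error paths elided):

    dctx, err := d.isRunningLocked() // ctx captured under d.mu
    if err != nil {
        return err
    }
    select {
    case d.processUpdatesTrigger <- struct{}{}:
    case <-dctx.Done(): // dispatcher stopped; no data race on d.ctx
        return dctx.Err()
    }
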
@@ -377,7 +384,7 @@ func (d *Dispatcher) isRunning() bool {
377 377
 // markNodeReady updates the description of a node, updates its address, and sets status to READY
378 378
 // this is used during registration when a new node description is provided
379 379
 // and during node updates when the node description changes
380
-func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error {
380
+func (d *Dispatcher) markNodeReady(ctx context.Context, nodeID string, description *api.NodeDescription, addr string) error {
381 381
 	d.nodeUpdatesLock.Lock()
382 382
 	d.nodeUpdates[nodeID] = nodeUpdate{
383 383
 		status: &api.NodeStatus{
... ...
@@ -396,8 +403,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti
396 396
 	if numUpdates >= maxBatchItems {
397 397
 		select {
398 398
 		case d.processUpdatesTrigger <- struct{}{}:
399
-		case <-d.ctx.Done():
400
-			return d.ctx.Err()
399
+		case <-ctx.Done():
400
+			return ctx.Err()
401 401
 		}
402 402
 
403 403
 	}
... ...
@@ -405,8 +412,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti
405 405
 	// Wait until the node update batch happens before unblocking register.
406 406
 	d.processUpdatesLock.Lock()
407 407
 	select {
408
-	case <-d.ctx.Done():
409
-		return d.ctx.Err()
408
+	case <-ctx.Done():
409
+		return ctx.Err()
410 410
 	default:
411 411
 	}
412 412
 	d.processUpdatesCond.Wait()
... ...
@@ -431,7 +438,8 @@ func nodeIPFromContext(ctx context.Context) (string, error) {
431 431
 // register is used for registration of node with particular dispatcher.
432 432
 func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
433 433
 	// prevent register until we're ready to accept it
434
-	if err := d.isRunningLocked(); err != nil {
434
+	dctx, err := d.isRunningLocked()
435
+	if err != nil {
435 436
 		return "", err
436 437
 	}
437 438
 
... ...
@@ -453,7 +461,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
453 453
 		log.G(ctx).Debugf(err.Error())
454 454
 	}
455 455
 
456
-	if err := d.markNodeReady(nodeID, description, addr); err != nil {
456
+	if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
457 457
 		return "", err
458 458
 	}
459 459
 
... ...
@@ -496,7 +504,8 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
496 496
 	}
497 497
 	log := log.G(ctx).WithFields(fields)
498 498
 
499
-	if err := d.isRunningLocked(); err != nil {
499
+	dctx, err := d.isRunningLocked()
500
+	if err != nil {
500 501
 		return nil, err
501 502
 	}
502 503
 
... ...
@@ -542,13 +551,13 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
542 542
 	if numUpdates >= maxBatchItems {
543 543
 		select {
544 544
 		case d.processUpdatesTrigger <- struct{}{}:
545
-		case <-d.ctx.Done():
545
+		case <-dctx.Done():
546 546
 		}
547 547
 	}
548 548
 	return nil, nil
549 549
 }
550 550
 
551
-func (d *Dispatcher) processUpdates() {
551
+func (d *Dispatcher) processUpdates(ctx context.Context) {
552 552
 	var (
553 553
 		taskUpdates map[string]*api.TaskStatus
554 554
 		nodeUpdates map[string]nodeUpdate
... ...
@@ -571,7 +580,7 @@ func (d *Dispatcher) processUpdates() {
571 571
 		return
572 572
 	}
573 573
 
574
-	log := log.G(d.ctx).WithFields(logrus.Fields{
574
+	log := log.G(ctx).WithFields(logrus.Fields{
575 575
 		"method": "(*Dispatcher).processUpdates",
576 576
 	})
577 577
 
... ...
@@ -661,7 +670,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
661 661
 	}
662 662
 	nodeID := nodeInfo.NodeID
663 663
 
664
-	if err := d.isRunningLocked(); err != nil {
664
+	dctx, err := d.isRunningLocked()
665
+	if err != nil {
665 666
 		return err
666 667
 	}
667 668
 
... ...
@@ -763,8 +773,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
763 763
 				break batchingLoop
764 764
 			case <-stream.Context().Done():
765 765
 				return stream.Context().Err()
766
-			case <-d.ctx.Done():
767
-				return d.ctx.Err()
766
+			case <-dctx.Done():
767
+				return dctx.Err()
768 768
 			}
769 769
 		}
770 770
 
... ...
@@ -783,7 +793,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
783 783
 	}
784 784
 	nodeID := nodeInfo.NodeID
785 785
 
786
-	if err := d.isRunningLocked(); err != nil {
786
+	dctx, err := d.isRunningLocked()
787
+	if err != nil {
787 788
 		return err
788 789
 	}
789 790
 
... ...
@@ -1075,8 +1086,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
1075 1075
 				break batchingLoop
1076 1076
 			case <-stream.Context().Done():
1077 1077
 				return stream.Context().Err()
1078
-			case <-d.ctx.Done():
1079
-				return d.ctx.Err()
1078
+			case <-dctx.Done():
1079
+				return dctx.Err()
1080 1080
 			}
1081 1081
 		}
1082 1082
 
... ...
@@ -1197,16 +1208,14 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
1197 1197
 
1198 1198
 // markNodeNotReady sets the node state to some state other than READY
1199 1199
 func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
1200
-	if err := d.isRunningLocked(); err != nil {
1200
+	dctx, err := d.isRunningLocked()
1201
+	if err != nil {
1201 1202
 		return err
1202 1203
 	}
1203 1204
 
1204 1205
 	// Node is down. Add it to down nodes so that we can keep
1205 1206
 	// track of tasks assigned to the node.
1206
-	var (
1207
-		node *api.Node
1208
-		err  error
1209
-	)
1207
+	var node *api.Node
1210 1208
 	d.store.View(func(readTx store.ReadTx) {
1211 1209
 		node = store.GetNode(readTx, id)
1212 1210
 		if node == nil {
... ...
@@ -1219,7 +1228,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
1219 1219
 
1220 1220
 	expireFunc := func() {
1221 1221
 		if err := d.moveTasksToOrphaned(id); err != nil {
1222
-			log.G(context.TODO()).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
1222
+			log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
1223 1223
 		}
1224 1224
 
1225 1225
 		d.downNodes.Delete(id)
... ...
@@ -1243,7 +1252,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
1243 1243
 	if numUpdates >= maxBatchItems {
1244 1244
 		select {
1245 1245
 		case d.processUpdatesTrigger <- struct{}{}:
1246
-		case <-d.ctx.Done():
1246
+		case <-dctx.Done():
1247 1247
 		}
1248 1248
 	}
1249 1249
 
... ...
@@ -1291,7 +1300,8 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
1291 1291
 	}
1292 1292
 	nodeID := nodeInfo.NodeID
1293 1293
 
1294
-	if err := d.isRunningLocked(); err != nil {
1294
+	dctx, err := d.isRunningLocked()
1295
+	if err != nil {
1295 1296
 		return err
1296 1297
 	}
1297 1298
 
... ...
@@ -1310,7 +1320,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
1310 1310
 			log.G(ctx).Debugf(err.Error())
1311 1311
 		}
1312 1312
 		// update the node description
1313
-		if err := d.markNodeReady(nodeID, r.Description, addr); err != nil {
1313
+		if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil {
1314 1314
 			return err
1315 1315
 		}
1316 1316
 	}
... ...
@@ -1401,7 +1411,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
1401 1401
 			return stream.Context().Err()
1402 1402
 		case <-node.Disconnect:
1403 1403
 			disconnect = true
1404
-		case <-d.ctx.Done():
1404
+		case <-dctx.Done():
1405 1405
 			disconnect = true
1406 1406
 		case ev := <-keyMgrUpdates:
1407 1407
 			netKeys = ev.([]*api.EncryptionKey)
... ...
@@ -14,6 +14,7 @@ import (
14 14
 	"time"
15 15
 
16 16
 	"github.com/Sirupsen/logrus"
17
+	"github.com/docker/docker/pkg/plugingetter"
17 18
 	"github.com/docker/go-events"
18 19
 	"github.com/docker/swarmkit/api"
19 20
 	"github.com/docker/swarmkit/ca"
... ...
@@ -105,6 +106,9 @@ type Config struct {
105 105
 
106 106
 	// Availability allows a user to control the current scheduling status of a node
107 107
 	Availability api.NodeSpec_Availability
108
+
109
+	// PluginGetter provides access to docker's plugin inventory.
110
+	PluginGetter plugingetter.PluginGetter
108 111
 }
109 112
 
110 113
 // Manager is the cluster manager for Swarm.
... ...
@@ -478,7 +482,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
478 478
 	// starting up.
479 479
 	<-m.started
480 480
 
481
-	// the mutex stops us from trying to stop while we're alrady stopping, or
481
+	// the mutex stops us from trying to stop while we're already stopping, or
482 482
 	// from returning before we've finished stopping.
483 483
 	m.mu.Lock()
484 484
 	defer m.mu.Unlock()
... ...
@@ -833,7 +837,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
833 833
 	// shutdown underlying manager processes when leadership is
834 834
 	// lost.
835 835
 
836
-	m.allocator, err = allocator.New(s)
836
+	m.allocator, err = allocator.New(s, m.config.PluginGetter)
837 837
 	if err != nil {
838 838
 		log.G(ctx).WithError(err).Error("failed to create allocator")
839 839
 		// TODO(stevvooe): It doesn't seem correct here to fail
... ...
@@ -406,7 +406,11 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update
406 406
 	}
407 407
 
408 408
 	if delayStartCh != nil {
409
-		<-delayStartCh
409
+		select {
410
+		case <-delayStartCh:
411
+		case <-u.stopChan:
412
+			return nil
413
+		}
410 414
 	}
411 415
 
412 416
 	// Wait for the new task to come up.
... ...
@@ -456,7 +460,11 @@ func (u *Updater) useExistingTask(ctx context.Context, slot orchestrator.Slot, e
456 456
 		}
457 457
 
458 458
 		if delayStartCh != nil {
459
-			<-delayStartCh
459
+			select {
460
+			case <-delayStartCh:
461
+			case <-u.stopChan:
462
+				return nil
463
+			}
460 464
 		}
461 465
 	}
462 466
 
... ...
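
Both updater hunks replace a bare channel receive with a select against u.stopChan, so Stop can interrupt an update that is waiting out its configured start delay. The pattern, factored into a hypothetical helper:

    // waitOrStop blocks until ch fires, but gives up immediately when stop
    // fires first; callers treat false as "shutting down, bail out cleanly".
    func waitOrStop(ch, stop <-chan struct{}) bool {
        select {
        case <-ch:
            return true
        case <-stop:
            return false
        }
    }
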
@@ -39,7 +39,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
39 39
 	return nodeInfo
40 40
 }
41 41
 
42
-// addTask removes a task from nodeInfo if it's tracked there, and returns true
42
+// removeTask removes a task from nodeInfo if it's tracked there, and returns true
43 43
 // if nodeInfo was modified.
44 44
 func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
45 45
 	oldTask, ok := nodeInfo.Tasks[t.ID]
... ...
@@ -532,7 +532,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
532 532
 	}
533 533
 
534 534
 	nodes := s.nodeSet.findBestNodes(len(taskGroup), s.pipeline.Process, nodeLess)
535
-	if len(nodes) == 0 {
535
+	nodeCount := len(nodes)
536
+	if nodeCount == 0 {
536 537
 		s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
537 538
 		return
538 539
 	}
... ...
@@ -540,7 +541,7 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
540 540
 	failedConstraints := make(map[int]bool) // key is index in nodes slice
541 541
 	nodeIter := 0
542 542
 	for taskID, t := range taskGroup {
543
-		n := &nodes[nodeIter%len(nodes)]
543
+		n := &nodes[nodeIter%nodeCount]
544 544
 
545 545
 		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", n.ID)
546 546
 		newT := *t
... ...
@@ -555,16 +556,16 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
555 555
 		nodeInfo, err := s.nodeSet.nodeInfo(n.ID)
556 556
 		if err == nil && nodeInfo.addTask(&newT) {
557 557
 			s.nodeSet.updateNode(nodeInfo)
558
-			nodes[nodeIter%len(nodes)] = nodeInfo
558
+			nodes[nodeIter%nodeCount] = nodeInfo
559 559
 		}
560 560
 
561 561
 		schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
562 562
 		delete(taskGroup, taskID)
563 563
 
564
-		if nodeIter+1 < len(nodes) {
564
+		if nodeIter+1 < nodeCount {
565 565
 			// First pass fills the nodes until they have the same
566 566
 			// number of tasks from this service.
567
-			nextNode := nodes[(nodeIter+1)%len(nodes)]
567
+			nextNode := nodes[(nodeIter+1)%nodeCount]
568 568
 			if nodeLess(&nextNode, &nodeInfo) {
569 569
 				nodeIter++
570 570
 			}
... ...
@@ -575,10 +576,10 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
575 575
 		}
576 576
 
577 577
 		origNodeIter := nodeIter
578
-		for failedConstraints[nodeIter%len(nodes)] || !s.pipeline.Process(&nodes[nodeIter%len(nodes)]) {
579
-			failedConstraints[nodeIter%len(nodes)] = true
578
+		for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
579
+			failedConstraints[nodeIter%nodeCount] = true
580 580
 			nodeIter++
581
-			if nodeIter-origNodeIter == len(nodes) {
581
+			if nodeIter-origNodeIter == nodeCount {
582 582
 				// None of the nodes meet the constraints anymore.
583 583
 				s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
584 584
 				return
... ...
@@ -2,16 +2,12 @@ package membership
2 2
 
3 3
 import (
4 4
 	"errors"
5
-	"fmt"
6 5
 	"sync"
7 6
 
8
-	"google.golang.org/grpc"
9
-
10 7
 	"github.com/coreos/etcd/raft/raftpb"
11 8
 	"github.com/docker/swarmkit/api"
12 9
 	"github.com/docker/swarmkit/watch"
13 10
 	"github.com/gogo/protobuf/proto"
14
-	"golang.org/x/net/context"
15 11
 )
16 12
 
17 13
 var (
... ...
@@ -25,26 +21,19 @@ var (
25 25
 	ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
26 26
 	// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
27 27
 	ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
28
+	// ErrMemberRemoved is thrown when a node was removed from the cluster
29
+	ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
28 30
 )
29 31
 
30
-// deferredConn used to store removed members connection for some time.
31
-// We need this in case if removed node is redirector or endpoint of ControlAPI call.
32
-type deferredConn struct {
33
-	tick int
34
-	conn *grpc.ClientConn
35
-}
36
-
37 32
 // Cluster represents a set of active
38 33
 // raft Members
39 34
 type Cluster struct {
40
-	mu           sync.RWMutex
41
-	members      map[uint64]*Member
42
-	deferedConns map[*deferredConn]struct{}
35
+	mu      sync.RWMutex
36
+	members map[uint64]*Member
43 37
 
44 38
 	// removed contains the list of removed Members,
45 39
 	// those ids cannot be reused
46
-	removed        map[uint64]bool
47
-	heartbeatTicks int
40
+	removed map[uint64]bool
48 41
 
49 42
 	PeersBroadcast *watch.Queue
50 43
 }
... ...
@@ -52,74 +41,19 @@ type Cluster struct {
52 52
 // Member represents a raft Cluster Member
53 53
 type Member struct {
54 54
 	*api.RaftMember
55
-
56
-	Conn         *grpc.ClientConn
57
-	tick         int
58
-	active       bool
59
-	lastSeenHost string
60
-}
61
-
62
-// HealthCheck sends a health check RPC to the member and returns the response.
63
-func (member *Member) HealthCheck(ctx context.Context) error {
64
-	healthClient := api.NewHealthClient(member.Conn)
65
-	resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
66
-	if err != nil {
67
-		return err
68
-	}
69
-	if resp.Status != api.HealthCheckResponse_SERVING {
70
-		return fmt.Errorf("health check returned status %s", resp.Status.String())
71
-	}
72
-	return nil
73 55
 }
74 56
 
75 57
 // NewCluster creates a new Cluster neighbors list for a raft Member.
76
-// Member marked as inactive if there was no call ReportActive for heartbeatInterval.
77
-func NewCluster(heartbeatTicks int) *Cluster {
58
+func NewCluster() *Cluster {
78 59
 	// TODO(abronan): generate Cluster ID for federation
79 60
 
80 61
 	return &Cluster{
81 62
 		members:        make(map[uint64]*Member),
82 63
 		removed:        make(map[uint64]bool),
83
-		deferedConns:   make(map[*deferredConn]struct{}),
84
-		heartbeatTicks: heartbeatTicks,
85 64
 		PeersBroadcast: watch.NewQueue(),
86 65
 	}
87 66
 }
88 67
 
89
-func (c *Cluster) handleInactive() {
90
-	for _, m := range c.members {
91
-		if !m.active {
92
-			continue
93
-		}
94
-		m.tick++
95
-		if m.tick > c.heartbeatTicks {
96
-			m.active = false
97
-			if m.Conn != nil {
98
-				m.Conn.Close()
99
-			}
100
-		}
101
-	}
102
-}
103
-
104
-func (c *Cluster) handleDeferredConns() {
105
-	for dc := range c.deferedConns {
106
-		dc.tick++
107
-		if dc.tick > c.heartbeatTicks {
108
-			dc.conn.Close()
109
-			delete(c.deferedConns, dc)
110
-		}
111
-	}
112
-}
113
-
114
-// Tick increases ticks for all members. After heartbeatTicks node marked as
115
-// inactive.
116
-func (c *Cluster) Tick() {
117
-	c.mu.Lock()
118
-	defer c.mu.Unlock()
119
-	c.handleInactive()
120
-	c.handleDeferredConns()
121
-}
122
-
123 68
 // Members returns the list of raft Members in the Cluster.
124 69
 func (c *Cluster) Members() map[uint64]*Member {
125 70
 	members := make(map[uint64]*Member)
... ...
@@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
168 168
 	if c.removed[member.RaftID] {
169 169
 		return ErrIDRemoved
170 170
 	}
171
-	member.active = true
172
-	member.tick = 0
173 171
 
174 172
 	c.members[member.RaftID] = member
175 173
 
... ...
@@ -187,55 +119,47 @@ func (c *Cluster) RemoveMember(id uint64) error {
187 187
 	return c.clearMember(id)
188 188
 }
189 189
 
190
-// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
191
-// to the removed list.
192
-func (c *Cluster) ClearMember(id uint64) error {
190
+// UpdateMember updates member address.
191
+func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
193 192
 	c.mu.Lock()
194 193
 	defer c.mu.Unlock()
195 194
 
196
-	return c.clearMember(id)
197
-}
198
-
199
-func (c *Cluster) clearMember(id uint64) error {
200
-	m, ok := c.members[id]
201
-	if ok {
202
-		if m.Conn != nil {
203
-			// defer connection close to after heartbeatTicks
204
-			dConn := &deferredConn{conn: m.Conn}
205
-			c.deferedConns[dConn] = struct{}{}
206
-		}
207
-		delete(c.members, id)
195
+	if c.removed[id] {
196
+		return ErrIDRemoved
208 197
 	}
209
-	c.broadcastUpdate()
210
-	return nil
211
-}
212
-
213
-// ReplaceMemberConnection replaces the member's GRPC connection.
214
-func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
215
-	c.mu.Lock()
216
-	defer c.mu.Unlock()
217 198
 
218 199
 	oldMember, ok := c.members[id]
219 200
 	if !ok {
220 201
 		return ErrIDNotFound
221 202
 	}
222 203
 
223
-	if !force && oldConn.Conn != oldMember.Conn {
224
-		// The connection was already replaced. Don't do it again.
225
-		newConn.Conn.Close()
226
-		return nil
204
+	if oldMember.NodeID != m.NodeID {
205
+		// Should never happen; this is a sanity check
206
+		return errors.New("node ID mismatch on node update")
227 207
 	}
228 208
 
229
-	if oldMember.Conn != nil {
230
-		oldMember.Conn.Close()
209
+	if oldMember.Addr == m.Addr {
210
+		// nothing to do
211
+		return nil
231 212
 	}
213
+	oldMember.RaftMember = m
214
+	return nil
215
+}
216
+
217
+// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
218
+// to the removed list.
219
+func (c *Cluster) ClearMember(id uint64) error {
220
+	c.mu.Lock()
221
+	defer c.mu.Unlock()
232 222
 
233
-	newMember := *oldMember
234
-	newMember.RaftMember = oldMember.RaftMember.Copy()
235
-	newMember.RaftMember.Addr = newAddr
236
-	newMember.Conn = newConn.Conn
237
-	c.members[id] = &newMember
223
+	return c.clearMember(id)
224
+}
238 225
 
226
+func (c *Cluster) clearMember(id uint64) error {
227
+	if _, ok := c.members[id]; ok {
228
+		delete(c.members, id)
229
+		c.broadcastUpdate()
230
+	}
239 231
 	return nil
240 232
 }
241 233
 
... ...
@@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
249 249
 // Clear resets the list of active Members and removed Members.
250 250
 func (c *Cluster) Clear() {
251 251
 	c.mu.Lock()
252
-	for _, member := range c.members {
253
-		if member.Conn != nil {
254
-			member.Conn.Close()
255
-		}
256
-	}
257
-
258
-	for dc := range c.deferedConns {
259
-		dc.conn.Close()
260
-	}
261 252
 
262 253
 	c.members = make(map[uint64]*Member)
263 254
 	c.removed = make(map[uint64]bool)
264
-	c.deferedConns = make(map[*deferredConn]struct{})
265 255
 	c.mu.Unlock()
266 256
 }
267 257
 
268
-// ReportActive reports that member is active (called ProcessRaftMessage),
269
-func (c *Cluster) ReportActive(id uint64, sourceHost string) {
270
-	c.mu.Lock()
271
-	defer c.mu.Unlock()
272
-	m, ok := c.members[id]
273
-	if !ok {
274
-		return
275
-	}
276
-	m.tick = 0
277
-	m.active = true
278
-	if sourceHost != "" {
279
-		m.lastSeenHost = sourceHost
280
-	}
281
-}
282
-
283
-// Active returns true if node is active.
284
-func (c *Cluster) Active(id uint64) bool {
285
-	c.mu.RLock()
286
-	defer c.mu.RUnlock()
287
-	m, ok := c.members[id]
288
-	if !ok {
289
-		return false
290
-	}
291
-	return m.active
292
-}
293
-
294
-// LastSeenHost returns the last observed source address that the specified
295
-// member connected from.
296
-func (c *Cluster) LastSeenHost(id uint64) string {
297
-	c.mu.RLock()
298
-	defer c.mu.RUnlock()
299
-	m, ok := c.members[id]
300
-	if ok {
301
-		return m.lastSeenHost
302
-	}
303
-	return ""
304
-}
305
-
306 258
 // ValidateConfigurationChange takes a proposed ConfChange and
307 259
 // ensures that it is valid.
308 260
 func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
... ...
@@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
334 334
 	}
335 335
 	return nil
336 336
 }
337
-
338
-// CanRemoveMember checks if removing a Member would not result in a loss
339
-// of quorum, this check is needed before submitting a configuration change
340
-// that might block or harm the Cluster on Member recovery
341
-func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
342
-	members := c.Members()
343
-	nreachable := 0 // reachable managers after removal
344
-
345
-	for _, m := range members {
346
-		if m.RaftID == id {
347
-			continue
348
-		}
349
-
350
-		// Local node from where the remove is issued
351
-		if m.RaftID == from {
352
-			nreachable++
353
-			continue
354
-		}
355
-
356
-		if c.Active(m.RaftID) {
357
-			nreachable++
358
-		}
359
-	}
360
-
361
-	nquorum := (len(members)-1)/2 + 1
362
-	if nreachable < nquorum {
363
-		return false
364
-	}
365
-
366
-	return true
367
-}
... ...
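
Taken together, the membership hunks strip Cluster down to pure bookkeeping: connection handling, health checks, activity ticks, and the quorum logic all migrate to the new transport package (and to Node, below), while address changes flow through the new UpdateMember. A hedged sketch of the slimmed-down surface, with m assumed to be an *api.RaftMember:

    c := membership.NewCluster()            // no heartbeatTicks argument anymore
    _ = c.AddMember(&membership.Member{RaftMember: m})
    _ = c.UpdateMember(m.RaftID, m)         // no-op when the address is unchanged
    // Liveness questions (Active, HealthCheck, LastSeenHost) are no longer
    // Cluster's job; ask the transport instead.
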
@@ -27,6 +27,7 @@ import (
27 27
 	"github.com/docker/swarmkit/manager/raftselector"
28 28
 	"github.com/docker/swarmkit/manager/state/raft/membership"
29 29
 	"github.com/docker/swarmkit/manager/state/raft/storage"
30
+	"github.com/docker/swarmkit/manager/state/raft/transport"
30 31
 	"github.com/docker/swarmkit/manager/state/store"
31 32
 	"github.com/docker/swarmkit/watch"
32 33
 	"github.com/gogo/protobuf/proto"
... ...
@@ -51,8 +52,6 @@ var (
51 51
 	ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
52 52
 	// ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum
53 53
 	ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
54
-	// ErrMemberRemoved is thrown when a node was removed from the cluster
55
-	ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
56 54
 	// ErrNoClusterLeader is thrown when the cluster has no elected leader
57 55
 	ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
58 56
 	// ErrMemberUnknown is sent in response to a message from an
... ...
@@ -88,8 +87,9 @@ type EncryptionKeyRotator interface {
88 88
 // Node represents the Raft Node useful
89 89
 // configuration.
90 90
 type Node struct {
91
-	raftNode raft.Node
92
-	cluster  *membership.Cluster
91
+	raftNode  raft.Node
92
+	cluster   *membership.Cluster
93
+	transport *transport.Transport
93 94
 
94 95
 	raftStore           *raft.MemoryStorage
95 96
 	memoryStore         *store.MemoryStore
... ...
@@ -100,6 +100,7 @@ type Node struct {
100 100
 	campaignWhenAble    bool
101 101
 	signalledLeadership uint32
102 102
 	isMember            uint32
103
+	bootstrapMembers    []*api.RaftMember
103 104
 
104 105
 	// waitProp waits for all the proposals to be terminated before
105 106
 	// shutting down the node.
... ...
@@ -113,9 +114,11 @@ type Node struct {
113 113
 	ticker clock.Ticker
114 114
 	doneCh chan struct{}
115 115
 	// RemovedFromRaft notifies about node deletion from raft cluster
116
-	RemovedFromRaft     chan struct{}
117
-	removeRaftFunc      func()
118
-	cancelFunc          func()
116
+	RemovedFromRaft chan struct{}
117
+	cancelFunc      func()
118
+	// removeRaftCh notifies about node deletion from raft cluster
119
+	removeRaftCh        chan struct{}
120
+	removeRaftOnce      sync.Once
119 121
 	leadershipBroadcast *watch.Queue
120 122
 
121 123
 	// used to coordinate shutdown
... ...
@@ -131,7 +134,6 @@ type Node struct {
131 131
 	// to stop.
132 132
 	stopped chan struct{}
133 133
 
134
-	lastSendToMember    map[uint64]chan struct{}
135 134
 	raftLogger          *storage.EncryptedRaftLogger
136 135
 	keyRotator          EncryptionKeyRotator
137 136
 	rotationQueued      bool
... ...
@@ -189,7 +191,7 @@ func NewNode(opts NodeOptions) *Node {
189 189
 	raftStore := raft.NewMemoryStorage()
190 190
 
191 191
 	n := &Node{
192
-		cluster:   membership.NewCluster(2 * cfg.ElectionTick),
192
+		cluster:   membership.NewCluster(),
193 193
 		raftStore: raftStore,
194 194
 		opts:      opts,
195 195
 		Config: &raft.Config{
... ...
@@ -204,7 +206,6 @@ func NewNode(opts NodeOptions) *Node {
204 204
 		RemovedFromRaft:     make(chan struct{}),
205 205
 		stopped:             make(chan struct{}),
206 206
 		leadershipBroadcast: watch.NewQueue(),
207
-		lastSendToMember:    make(map[uint64]chan struct{}),
208 207
 		keyRotator:          opts.KeyRotator,
209 208
 	}
210 209
 	n.memoryStore = store.NewMemoryStore(n)
... ...
@@ -218,16 +219,6 @@ func NewNode(opts NodeOptions) *Node {
218 218
 	n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
219 219
 	n.wait = newWait()
220 220
 
221
-	n.removeRaftFunc = func(n *Node) func() {
222
-		var removeRaftOnce sync.Once
223
-		return func() {
224
-			removeRaftOnce.Do(func() {
225
-				atomic.StoreUint32(&n.isMember, 0)
226
-				close(n.RemovedFromRaft)
227
-			})
228
-		}
229
-	}(n)
230
-
231 221
 	n.cancelFunc = func(n *Node) func() {
232 222
 		var cancelOnce sync.Once
233 223
 		return func() {
... ...
@@ -240,6 +231,34 @@ func NewNode(opts NodeOptions) *Node {
240 240
 	return n
241 241
 }
242 242
 
243
+// IsIDRemoved reports if member with id was removed from cluster.
244
+// Part of transport.Raft interface.
245
+func (n *Node) IsIDRemoved(id uint64) bool {
246
+	return n.cluster.IsIDRemoved(id)
247
+}
248
+
249
+// NodeRemoved signals that node was removed from cluster and should stop.
250
+// Part of transport.Raft interface.
251
+func (n *Node) NodeRemoved() {
252
+	n.removeRaftOnce.Do(func() {
253
+		atomic.StoreUint32(&n.isMember, 0)
254
+		close(n.RemovedFromRaft)
255
+	})
256
+}
257
+
258
+// ReportSnapshot reports snapshot status to underlying raft node.
259
+// Part of transport.Raft interface.
260
+func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
261
+	n.raftNode.ReportSnapshot(id, status)
262
+}
263
+
264
+// ReportUnreachable reports to underlying raft node that member with id is
265
+// unreachable.
266
+// Part of transport.Raft interface.
267
+func (n *Node) ReportUnreachable(id uint64) {
268
+	n.raftNode.ReportUnreachable(id)
269
+}
270
+
243 271
 // WithContext returns context which is cancelled when parent context cancelled
244 272
 // or node is stopped.
245 273
 func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
... ...
@@ -255,13 +274,29 @@ func (n *Node) WithContext(ctx context.Context) (context.Context, context.Cancel
255 255
 	return ctx, cancel
256 256
 }
257 257
 
258
+func (n *Node) initTransport() {
259
+	transportConfig := &transport.Config{
260
+		HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval,
261
+		SendTimeout:       n.opts.SendTimeout,
262
+		Credentials:       n.opts.TLSCredentials,
263
+		Raft:              n,
264
+	}
265
+	n.transport = transport.New(transportConfig)
266
+}
267
+
258 268
 // JoinAndStart joins and starts the raft server
259 269
 func (n *Node) JoinAndStart(ctx context.Context) (err error) {
260 270
 	ctx, cancel := n.WithContext(ctx)
261 271
 	defer func() {
262 272
 		cancel()
263 273
 		if err != nil {
274
+			n.stopMu.Lock()
275
+			// to shutdown transport
276
+			close(n.stopped)
277
+			n.stopMu.Unlock()
264 278
 			n.done()
279
+		} else {
280
+			atomic.StoreUint32(&n.isMember, 1)
265 281
 		}
266 282
 	}()
267 283
 
... ...
@@ -281,58 +316,59 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
281 281
 	n.snapshotMeta = snapshot.Metadata
282 282
 	n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
283 283
 
284
-	if loadAndStartErr == storage.ErrNoWAL {
284
+	// restore from snapshot
285
+	if loadAndStartErr == nil {
285 286
 		if n.opts.JoinAddr != "" {
286
-			c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second)
287
-			if err != nil {
288
-				return err
289
-			}
290
-			client := api.NewRaftMembershipClient(c.Conn)
291
-			defer func() {
292
-				_ = c.Conn.Close()
293
-			}()
294
-
295
-			joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
296
-			defer joinCancel()
297
-			resp, err := client.Join(joinCtx, &api.JoinRequest{
298
-				Addr: n.opts.Addr,
299
-			})
300
-			if err != nil {
301
-				return err
302
-			}
287
+			log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
288
+		}
289
+		n.campaignWhenAble = true
290
+		n.initTransport()
291
+		n.raftNode = raft.RestartNode(n.Config)
292
+		return nil
293
+	}
303 294
 
304
-			n.Config.ID = resp.RaftID
295
+	// first member of cluster
296
+	if n.opts.JoinAddr == "" {
297
+		// First member in the cluster, self-assign ID
298
+		n.Config.ID = uint64(rand.Int63()) + 1
299
+		peer, err := n.newRaftLogs(n.opts.ID)
300
+		if err != nil {
301
+			return err
302
+		}
303
+		n.campaignWhenAble = true
304
+		n.initTransport()
305
+		n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
306
+		return nil
307
+	}
305 308
 
306
-			if _, err := n.newRaftLogs(n.opts.ID); err != nil {
307
-				return err
308
-			}
309
+	// join to existing cluster
309 310
 
310
-			n.raftNode = raft.StartNode(n.Config, []raft.Peer{})
311
+	conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
312
+	if err != nil {
313
+		return err
314
+	}
315
+	defer conn.Close()
316
+	client := api.NewRaftMembershipClient(conn)
311 317
 
312
-			if err := n.registerNodes(resp.Members); err != nil {
313
-				n.raftLogger.Close(ctx)
314
-				return err
315
-			}
316
-		} else {
317
-			// First member in the cluster, self-assign ID
318
-			n.Config.ID = uint64(rand.Int63()) + 1
319
-			peer, err := n.newRaftLogs(n.opts.ID)
320
-			if err != nil {
321
-				return err
322
-			}
323
-			n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
324
-			n.campaignWhenAble = true
325
-		}
326
-		atomic.StoreUint32(&n.isMember, 1)
327
-		return nil
318
+	joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
319
+	defer joinCancel()
320
+	resp, err := client.Join(joinCtx, &api.JoinRequest{
321
+		Addr: n.opts.Addr,
322
+	})
323
+	if err != nil {
324
+		return err
328 325
 	}
329 326
 
330
-	if n.opts.JoinAddr != "" {
331
-		log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
327
+	n.Config.ID = resp.RaftID
328
+
329
+	if _, err := n.newRaftLogs(n.opts.ID); err != nil {
330
+		return err
332 331
 	}
333
-	n.campaignWhenAble = true
334
-	n.raftNode = raft.RestartNode(n.Config)
335
-	atomic.StoreUint32(&n.isMember, 1)
332
+	n.bootstrapMembers = resp.Members
333
+
334
+	n.initTransport()
335
+	n.raftNode = raft.StartNode(n.Config, nil)
336
+
336 337
 	return nil
337 338
 }
338 339
 
... ...
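
The rewritten JoinAndStart untangles the nested branches into three linear paths, each calling initTransport before the raft node starts; registering the peers returned by Join now happens later, in Run, via bootstrapMembers. Condensed, with error handling elided and haveWAL standing in for the loadAndStartErr check:

    switch {
    case haveWAL: // existing raft state on disk
        n.campaignWhenAble = true
        n.initTransport()
        n.raftNode = raft.RestartNode(n.Config)
    case n.opts.JoinAddr == "": // first member: self-assign an ID
        n.Config.ID = uint64(rand.Int63()) + 1
        n.campaignWhenAble = true
        n.initTransport()
        n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
    default: // join an existing cluster over gRPC
        resp, _ := client.Join(joinCtx, &api.JoinRequest{Addr: n.opts.Addr})
        n.Config.ID = resp.RaftID
        n.bootstrapMembers = resp.Members // registered later, in Run
        n.initTransport()
        n.raftNode = raft.StartNode(n.Config, nil)
    }
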
@@ -372,6 +408,9 @@ func (n *Node) done() {
372 372
 	n.leadershipBroadcast.Close()
373 373
 	n.cluster.PeersBroadcast.Close()
374 374
 	n.memoryStore.Close()
375
+	if n.transport != nil {
376
+		n.transport.Stop()
377
+	}
375 378
 
376 379
 	close(n.doneCh)
377 380
 }
... ...
@@ -391,6 +430,12 @@ func (n *Node) Run(ctx context.Context) error {
391 391
 	ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
392 392
 	ctx, cancel := context.WithCancel(ctx)
393 393
 
394
+	for _, node := range n.bootstrapMembers {
395
+		if err := n.registerNode(node); err != nil {
396
+			log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
397
+		}
398
+	}
399
+
394 400
 	defer func() {
395 401
 		cancel()
396 402
 		n.stop(ctx)
... ...
@@ -414,7 +459,6 @@ func (n *Node) Run(ctx context.Context) error {
414 414
 		select {
415 415
 		case <-n.ticker.C():
416 416
 			n.raftNode.Tick()
417
-			n.cluster.Tick()
418 417
 		case rd := <-n.raftNode.Ready():
419 418
 			raftConfig := n.getCurrentRaftConfig()
420 419
 
... ...
@@ -423,10 +467,10 @@ func (n *Node) Run(ctx context.Context) error {
423 423
 				return errors.Wrap(err, "failed to save entries to storage")
424 424
 			}
425 425
 
426
-			if len(rd.Messages) != 0 {
426
+			for _, msg := range rd.Messages {
427 427
 				// Send raft messages to peers
428
-				if err := n.send(ctx, rd.Messages); err != nil {
429
-					log.G(ctx).WithError(err).Error("failed to send message to members")
428
+				if err := n.transport.Send(msg); err != nil {
429
+					log.G(ctx).WithError(err).Error("failed to send message to member")
430 430
 				}
431 431
 			}
432 432
 
... ...
@@ -435,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error {
435 435
 			// saveToStorage.
436 436
 			if !raft.IsEmptySnap(rd.Snapshot) {
437 437
 				// Load the snapshot data into the store
438
-				if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
439
-					log.G(ctx).WithError(err).Error("failed to restore from snapshot")
438
+				if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
439
+					log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
440 440
 				}
441 441
 				n.appliedIndex = rd.Snapshot.Metadata.Index
442 442
 				n.snapshotMeta = rd.Snapshot.Metadata
... ...
@@ -555,6 +599,40 @@ func (n *Node) Run(ctx context.Context) error {
555 555
 	}
556 556
 }
557 557
 
558
+func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
559
+	snapCluster, err := n.clusterSnapshot(data)
560
+	if err != nil {
561
+		return err
562
+	}
563
+
564
+	oldMembers := n.cluster.Members()
565
+
566
+	for _, member := range snapCluster.Members {
567
+		delete(oldMembers, member.RaftID)
568
+	}
569
+
570
+	for _, removedMember := range snapCluster.Removed {
571
+		n.cluster.RemoveMember(removedMember)
572
+		if err := n.transport.RemovePeer(removedMember); err != nil {
573
+			log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", removedMember)
574
+		}
575
+		delete(oldMembers, removedMember)
576
+	}
577
+
578
+	for id, member := range oldMembers {
579
+		n.cluster.ClearMember(id)
580
+		if err := n.transport.RemovePeer(member.RaftID); err != nil {
581
+			log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID)
582
+		}
583
+	}
584
+	for _, node := range snapCluster.Members {
585
+		if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil {
586
+			log.G(ctx).WithError(err).Error("failed to register node from snapshot")
587
+		}
588
+	}
589
+	return nil
590
+}
591
+
558 592
 func (n *Node) needsSnapshot(ctx context.Context) bool {
559 593
 	if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
560 594
 		keys := n.keyRotator.GetKeys()
... ...
@@ -798,22 +876,27 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
798 798
 // checkHealth tries to contact an aspiring member through its advertised address
799 799
 // and checks if its raft server is running.
800 800
 func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
801
-	conn, err := n.ConnectToMember(addr, timeout)
801
+	conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
802 802
 	if err != nil {
803 803
 		return err
804 804
 	}
805 805
 
806
+	defer conn.Close()
807
+
806 808
 	if timeout != 0 {
807 809
 		tctx, cancel := context.WithTimeout(ctx, timeout)
808 810
 		defer cancel()
809 811
 		ctx = tctx
810 812
 	}
811 813
 
812
-	defer conn.Conn.Close()
813
-
814
-	if err := conn.HealthCheck(ctx); err != nil {
814
+	healthClient := api.NewHealthClient(conn)
815
+	resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
816
+	if err != nil {
815 817
 		return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
816 818
 	}
819
+	if resp.Status != api.HealthCheckResponse_SERVING {
820
+		return fmt.Errorf("health check returned status %s", resp.Status.String())
821
+	}
817 822
 
818 823
 	return nil
819 824
 }
... ...
@@ -841,11 +924,15 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID
841 841
 	return n.configure(ctx, cc)
842 842
 }
843 843
 
844
-// updateMember submits a configuration change to change a member's address.
845
-func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
844
+// updateNodeBlocking runs synchronous job to update node address in whole cluster.
845
+func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
846
+	m := n.cluster.GetMember(id)
847
+	if m == nil {
848
+		return errors.Errorf("member %x is not found for update", id)
849
+	}
846 850
 	node := api.RaftMember{
847
-		RaftID: raftID,
848
-		NodeID: nodeID,
851
+		RaftID: m.RaftID,
852
+		NodeID: m.NodeID,
849 853
 		Addr:   addr,
850 854
 	}
851 855
 
... ...
@@ -856,7 +943,7 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod
856 856
 
857 857
 	cc := raftpb.ConfChange{
858 858
 		Type:    raftpb.ConfChangeUpdateNode,
859
-		NodeID:  raftID,
859
+		NodeID:  id,
860 860
 		Context: meta,
861 861
 	}
862 862
 
... ...
@@ -864,6 +951,18 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod
864 864
 	return n.configure(ctx, cc)
865 865
 }
866 866
 
867
+// UpdateNode submits a configuration change to change a member's address.
868
+func (n *Node) UpdateNode(id uint64, addr string) {
869
+	ctx, cancel := n.WithContext(context.Background())
870
+	defer cancel()
871
+	// spawn updating info in raft in background to unblock transport
872
+	go func() {
873
+		if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
874
+			log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
875
+		}
876
+	}()
877
+}
878
+
867 879
 // Leave asks to a member of the raft to remove
868 880
 // us from the raft cluster. This method is called
869 881
 // from a member who is willing to leave its raft
... ...
@@ -897,7 +996,31 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
897 897
 // CanRemoveMember checks if a member can be removed from
898 898
 // the context of the current node.
899 899
 func (n *Node) CanRemoveMember(id uint64) bool {
900
-	return n.cluster.CanRemoveMember(n.Config.ID, id)
900
+	members := n.cluster.Members()
901
+	nreachable := 0 // reachable managers after removal
902
+
903
+	for _, m := range members {
904
+		if m.RaftID == id {
905
+			continue
906
+		}
907
+
908
+		// Local node from where the remove is issued
909
+		if m.RaftID == n.Config.ID {
910
+			nreachable++
911
+			continue
912
+		}
913
+
914
+		if n.transport.Active(m.RaftID) {
915
+			nreachable++
916
+		}
917
+	}
918
+
919
+	nquorum := (len(members)-1)/2 + 1
920
+	if nreachable < nquorum {
921
+		return false
922
+	}
923
+
924
+	return true
901 925
 }
902 926
 
903 927
 func (n *Node) removeMember(ctx context.Context, id uint64) error {
... ...
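
CanRemoveMember now computes quorum locally against the transport's reachability view instead of delegating to the membership package. The arithmetic deserves a worked case:

    // With len(members) managers, removal is allowed only if the survivors
    // still form a quorum of the post-removal cluster:
    nquorum := (len(members)-1)/2 + 1
    // 3 managers: removing one needs nquorum=2, i.e. both survivors reachable.
    // 5 managers: removing one needs nquorum=3 of the remaining 4 reachable.
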
@@ -915,7 +1038,7 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
915 915
 
916 916
 	n.membershipLock.Lock()
917 917
 	defer n.membershipLock.Unlock()
918
-	if n.cluster.CanRemoveMember(n.Config.ID, id) {
918
+	if n.CanRemoveMember(id) {
919 919
 		cc := raftpb.ConfChange{
920 920
 			ID:      id,
921 921
 			Type:    raftpb.ConfChangeRemoveNode,
... ...
@@ -956,6 +1079,34 @@ func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaf
956 956
 	return log.G(ctx).WithFields(fields)
957 957
 }
958 958
 
959
+func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
960
+	// too early
961
+	if !n.IsMember() {
962
+		return nil
963
+	}
964
+	p, ok := peer.FromContext(ctx)
965
+	if !ok {
966
+		return nil
967
+	}
968
+	oldAddr, err := n.transport.PeerAddr(id)
969
+	if err != nil {
970
+		return err
971
+	}
972
+	newHost, _, err := net.SplitHostPort(p.Addr.String())
973
+	if err != nil {
974
+		return err
975
+	}
976
+	_, officialPort, err := net.SplitHostPort(oldAddr)
977
+	if err != nil {
978
+		return err
979
+	}
980
+	newAddr := net.JoinHostPort(newHost, officialPort)
981
+	if err := n.transport.UpdatePeerAddr(id, newAddr); err != nil {
982
+		return err
983
+	}
984
+	return nil
985
+}
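
reportNewAddress keeps the port the peer originally advertised and only adopts the host observed on the incoming gRPC connection, because the source port of that connection is ephemeral. A small standard-library sketch of the recombination (the addresses and the fmt/net imports are illustrative):

	observed := "192.0.2.7:49152" // host:port seen on the incoming connection
	official := "10.0.0.5:2377"   // address the peer was registered with

	newHost, _, _ := net.SplitHostPort(observed)
	_, officialPort, _ := net.SplitHostPort(official)
	fmt.Println(net.JoinHostPort(newHost, officialPort)) // 192.0.2.7:2377
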
986
+
959 987
 // ProcessRaftMessage calls 'Step' which advances the
960 988
 // raft state machine with the provided message on the
961 989
 // receiving node
... ...
@@ -969,32 +1120,25 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
969 969
 	// a node in the remove set
970 970
 	if n.cluster.IsIDRemoved(msg.Message.From) {
971 971
 		n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
972
-		return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error())
972
+		return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
973 973
 	}
974 974
 
975
-	var sourceHost string
976
-	peer, ok := peer.FromContext(ctx)
977
-	if ok {
978
-		sourceHost, _, _ = net.SplitHostPort(peer.Addr.String())
979
-	}
980
-
981
-	n.cluster.ReportActive(msg.Message.From, sourceHost)
982
-
983 975
 	ctx, cancel := n.WithContext(ctx)
984 976
 	defer cancel()
985 977
 
978
+	if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
979
+		log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
980
+	}
981
+
986 982
 	// Reject vote requests from unreachable peers
987 983
 	if msg.Message.Type == raftpb.MsgVote {
988 984
 		member := n.cluster.GetMember(msg.Message.From)
989
-		if member == nil || member.Conn == nil {
985
+		if member == nil {
990 986
 			n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
991 987
 			return &api.ProcessRaftMessageResponse{}, nil
992 988
 		}
993 989
 
994
-		healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
995
-		defer cancel()
996
-
997
-		if err := member.HealthCheck(healthCtx); err != nil {
990
+		if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
998 991
 			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
999 992
 			return &api.ProcessRaftMessageResponse{}, nil
1000 993
 		}
... ...
@@ -1064,17 +1208,11 @@ func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
1064 1064
 	if leader == n.Config.ID {
1065 1065
 		return nil, raftselector.ErrIsLeader
1066 1066
 	}
1067
-	l := n.cluster.GetMember(leader)
1068
-	if l == nil {
1069
-		return nil, errors.New("no leader found")
1070
-	}
1071
-	if !n.cluster.Active(leader) {
1072
-		return nil, errors.New("leader marked as inactive")
1073
-	}
1074
-	if l.Conn == nil {
1075
-		return nil, errors.New("no connection to leader in member list")
1067
+	conn, err := n.transport.PeerConn(leader)
1068
+	if err != nil {
1069
+		return nil, errors.Wrap(err, "failed to get connection to leader")
1076 1070
 	}
1077
-	return l.Conn, nil
1071
+	return conn, nil
1078 1072
 }
1079 1073
 
1080 1074
 // LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader
... ...
@@ -1122,8 +1260,12 @@ func (n *Node) registerNode(node *api.RaftMember) error {
1122 1122
 		// and are adding ourself now with the remotely-reachable
1123 1123
 		// address.
1124 1124
 		if existingMember.Addr != node.Addr {
1125
+			if node.RaftID != n.Config.ID {
1126
+				if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
1127
+					return err
1128
+				}
1129
+			}
1125 1130
 			member.RaftMember = node
1126
-			member.Conn = existingMember.Conn
1127 1131
 			n.cluster.AddMember(member)
1128 1132
 		}
1129 1133
 
... ...
@@ -1132,11 +1274,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
1132 1132
 
1133 1133
 	// Avoid opening a connection to the local node
1134 1134
 	if node.RaftID != n.Config.ID {
1135
-		// We don't want to impose a timeout on the grpc connection. It
1136
-		// should keep retrying as long as necessary, in case the peer
1137
-		// is temporarily unavailable.
1138
-		var err error
1139
-		if member, err = n.ConnectToMember(node.Addr, 0); err != nil {
1135
+		if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
1140 1136
 			return err
1141 1137
 		}
1142 1138
 	}
... ...
@@ -1144,8 +1282,8 @@ func (n *Node) registerNode(node *api.RaftMember) error {
1144 1144
 	member.RaftMember = node
1145 1145
 	err := n.cluster.AddMember(member)
1146 1146
 	if err != nil {
1147
-		if member.Conn != nil {
1148
-			_ = member.Conn.Close()
1147
+		if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
1148
+			return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
1149 1149
 		}
1150 1150
 		return err
1151 1151
 	}
... ...
@@ -1153,17 +1291,6 @@ func (n *Node) registerNode(node *api.RaftMember) error {
1153 1153
 	return nil
1154 1154
 }
1155 1155
 
1156
-// registerNodes registers a set of nodes in the cluster
1157
-func (n *Node) registerNodes(nodes []*api.RaftMember) error {
1158
-	for _, node := range nodes {
1159
-		if err := n.registerNode(node); err != nil {
1160
-			return err
1161
-		}
1162
-	}
1163
-
1164
-	return nil
1165
-}
1166
-
1167 1156
 // ProposeValue calls Propose on the raft and waits
1168 1157
 // on the commit log action before returning a result
1169 1158
 func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
... ...
@@ -1209,7 +1336,7 @@ func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
1209 1209
 		leader := false
1210 1210
 
1211 1211
 		if member.RaftID != n.Config.ID {
1212
-			if !n.cluster.Active(member.RaftID) {
1212
+			if !n.transport.Active(member.RaftID) {
1213 1213
 				reachability = api.RaftMemberStatus_UNREACHABLE
1214 1214
 			}
1215 1215
 		}
... ...
@@ -1294,183 +1421,6 @@ func (n *Node) saveToStorage(
1294 1294
 	return nil
1295 1295
 }
1296 1296
 
1297
-// Sends a series of messages to members in the raft
1298
-func (n *Node) send(ctx context.Context, messages []raftpb.Message) error {
1299
-	members := n.cluster.Members()
1300
-
1301
-	n.stopMu.RLock()
1302
-	defer n.stopMu.RUnlock()
1303
-
1304
-	for _, m := range messages {
1305
-		// Process locally
1306
-		if m.To == n.Config.ID {
1307
-			if err := n.raftNode.Step(ctx, m); err != nil {
1308
-				return err
1309
-			}
1310
-			continue
1311
-		}
1312
-
1313
-		if m.Type == raftpb.MsgProp {
1314
-			// We don't forward proposals to the leader. Our
1315
-			// current architecture depends on only the leader
1316
-			// making proposals, so in-flight proposals can be
1317
-			// guaranteed not to conflict.
1318
-			continue
1319
-		}
1320
-
1321
-		ch := make(chan struct{})
1322
-
1323
-		n.asyncTasks.Add(1)
1324
-		go n.sendToMember(ctx, members, m, n.lastSendToMember[m.To], ch)
1325
-
1326
-		n.lastSendToMember[m.To] = ch
1327
-	}
1328
-
1329
-	return nil
1330
-}
1331
-
1332
-func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.Member, m raftpb.Message, lastSend <-chan struct{}, thisSend chan<- struct{}) {
1333
-	defer n.asyncTasks.Done()
1334
-	defer close(thisSend)
1335
-
1336
-	if lastSend != nil {
1337
-		waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
1338
-		defer waitCancel()
1339
-
1340
-		select {
1341
-		case <-lastSend:
1342
-		case <-waitCtx.Done():
1343
-			return
1344
-		}
1345
-
1346
-		select {
1347
-		case <-waitCtx.Done():
1348
-			return
1349
-		default:
1350
-		}
1351
-	}
1352
-
1353
-	ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
1354
-	defer cancel()
1355
-
1356
-	if n.cluster.IsIDRemoved(m.To) {
1357
-		// Should not send to removed members
1358
-		return
1359
-	}
1360
-
1361
-	var conn *membership.Member
1362
-	if toMember, ok := members[m.To]; ok {
1363
-		conn = toMember
1364
-	} else {
1365
-		// If we are being asked to send to a member that's not in
1366
-		// our member list, that could indicate that the current leader
1367
-		// was added while we were offline. Try to resolve its address.
1368
-		log.G(ctx).Warningf("sending message to an unrecognized member ID %x", m.To)
1369
-
1370
-		// Choose a random member
1371
-		var (
1372
-			queryMember *membership.Member
1373
-			id          uint64
1374
-		)
1375
-		for id, queryMember = range members {
1376
-			if id != n.Config.ID {
1377
-				break
1378
-			}
1379
-		}
1380
-
1381
-		if queryMember == nil || queryMember.RaftID == n.Config.ID {
1382
-			log.G(ctx).Error("could not find cluster member to query for leader address")
1383
-			return
1384
-		}
1385
-
1386
-		resp, err := api.NewRaftClient(queryMember.Conn).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: m.To})
1387
-		if err != nil {
1388
-			log.G(ctx).WithError(err).Errorf("could not resolve address of member ID %x", m.To)
1389
-			return
1390
-		}
1391
-		conn, err = n.ConnectToMember(resp.Addr, n.opts.SendTimeout)
1392
-		if err != nil {
1393
-			log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, resp.Addr)
1394
-			return
1395
-		}
1396
-		// The temporary connection is only used for this message.
1397
-		// Eventually, we should catch up and add a long-lived
1398
-		// connection to the member list.
1399
-		defer conn.Conn.Close()
1400
-	}
1401
-
1402
-	_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
1403
-	if err != nil {
1404
-		if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
1405
-			n.removeRaftFunc()
1406
-		}
1407
-		if m.Type == raftpb.MsgSnap {
1408
-			n.raftNode.ReportSnapshot(m.To, raft.SnapshotFailure)
1409
-		}
1410
-		if !n.IsMember() {
1411
-			// node is removed from cluster or stopped
1412
-			return
1413
-		}
1414
-		n.raftNode.ReportUnreachable(m.To)
1415
-
1416
-		lastSeenHost := n.cluster.LastSeenHost(m.To)
1417
-		if lastSeenHost != "" {
1418
-			// Check if address has changed
1419
-			officialHost, officialPort, _ := net.SplitHostPort(conn.Addr)
1420
-			if officialHost != lastSeenHost {
1421
-				reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort)
1422
-				log.G(ctx).Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr)
1423
-				if err := n.handleAddressChange(ctx, conn, reconnectAddr); err != nil {
1424
-					log.G(ctx).WithError(err).Error("failed to hande address change")
1425
-				}
1426
-				return
1427
-			}
1428
-		}
1429
-
1430
-		// Bounce the connection
1431
-		newConn, err := n.ConnectToMember(conn.Addr, 0)
1432
-		if err != nil {
1433
-			log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, conn.Addr)
1434
-			return
1435
-		}
1436
-		err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn, conn.Addr, false)
1437
-		if err != nil {
1438
-			log.G(ctx).WithError(err).Error("failed to replace connection to raft member")
1439
-			newConn.Conn.Close()
1440
-		}
1441
-	} else if m.Type == raftpb.MsgSnap {
1442
-		n.raftNode.ReportSnapshot(m.To, raft.SnapshotFinish)
1443
-	}
1444
-}
1445
-
1446
-func (n *Node) handleAddressChange(ctx context.Context, member *membership.Member, reconnectAddr string) error {
1447
-	newConn, err := n.ConnectToMember(reconnectAddr, 0)
1448
-	if err != nil {
1449
-		return errors.Wrapf(err, "could connect to member ID %x at observed address %s", member.RaftID, reconnectAddr)
1450
-	}
1451
-
1452
-	healthCtx, cancelHealth := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
1453
-	defer cancelHealth()
1454
-
1455
-	if err := newConn.HealthCheck(healthCtx); err != nil {
1456
-		return errors.Wrapf(err, "%x failed health check at observed address %s", member.RaftID, reconnectAddr)
1457
-	}
1458
-
1459
-	if err := n.cluster.ReplaceMemberConnection(member.RaftID, member, newConn, reconnectAddr, false); err != nil {
1460
-		newConn.Conn.Close()
1461
-		return errors.Wrap(err, "failed to replace connection to raft member")
1462
-	}
1463
-
1464
-	// If we're the leader, write the address change to raft
1465
-	updateCtx, cancelUpdate := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
1466
-	defer cancelUpdate()
1467
-	if err := n.updateMember(updateCtx, reconnectAddr, member.RaftID, member.NodeID); err != nil {
1468
-		return errors.Wrap(err, "failed to update member address in raft")
1469
-	}
1470
-
1471
-	return nil
1472
-}
1473
-
1474 1297
 // processInternalRaftRequest sends a message to nodes participating
1475 1298
 // in the raft to apply a log entry and then waits for it to be applied
1476 1299
 // on the server. It will block until the update is performed, there is
... ...
@@ -1681,32 +1631,13 @@ func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error
1681 1681
 		return err
1682 1682
 	}
1683 1683
 
1684
-	oldMember := n.cluster.GetMember(newMember.RaftID)
1685
-
1686
-	if oldMember == nil {
1687
-		return ErrMemberUnknown
1688
-	}
1689
-	if oldMember.NodeID != newMember.NodeID {
1690
-		// Should never happen; this is a sanity check
1691
-		log.G(ctx).Errorf("node ID mismatch on node update (old: %x, new: %x)", oldMember.NodeID, newMember.NodeID)
1692
-		return errors.New("node ID mismatch match on node update")
1693
-	}
1694
-
1695
-	if oldMember.Addr == newMember.Addr || oldMember.Conn == nil {
1696
-		// nothing to do
1684
+	if newMember.RaftID == n.Config.ID {
1697 1685
 		return nil
1698 1686
 	}
1699
-
1700
-	newConn, err := n.ConnectToMember(newMember.Addr, 0)
1701
-	if err != nil {
1702
-		return errors.Errorf("could connect to member ID %x at %s: %v", newMember.RaftID, newMember.Addr, err)
1703
-	}
1704
-	if err := n.cluster.ReplaceMemberConnection(newMember.RaftID, oldMember, newConn, newMember.Addr, true); err != nil {
1705
-		newConn.Conn.Close()
1687
+	if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil {
1706 1688
 		return err
1707 1689
 	}
1708
-
1709
-	return nil
1690
+	return n.cluster.UpdateMember(newMember.RaftID, newMember)
1710 1691
 }
1711 1692
 
1712 1693
 // applyRemoveNode is called when we receive a ConfChange
... ...
@@ -1724,11 +1655,11 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
1724 1724
 	}
1725 1725
 
1726 1726
 	if cc.NodeID == n.Config.ID {
1727
+
1727 1728
 		// wait the commit ack to be sent before closing connection
1728 1729
 		n.asyncTasks.Wait()
1729 1730
 
1730
-		n.removeRaftFunc()
1731
-
1731
+		n.NodeRemoved()
1732 1732
 		// if there are only 2 nodes in the cluster, and leader is leaving
1733 1733
 		// before closing the connection, leader has to ensure that follower gets
1734 1734
 		// noticed about this raft conf change commit. Otherwise, follower would
... ...
@@ -1738,24 +1669,15 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
1738 1738
 		// while n.asyncTasks.Wait() could be helpful in this case
1739 1739
 		// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
1740 1740
 		// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
1741
+	} else {
1742
+		if err := n.transport.RemovePeer(cc.NodeID); err != nil {
1743
+			return err
1744
+		}
1741 1745
 	}
1742 1746
 
1743 1747
 	return n.cluster.RemoveMember(cc.NodeID)
1744 1748
 }
1745 1749
 
1746
-// ConnectToMember returns a member object with an initialized
1747
-// connection to communicate with other raft members
1748
-func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) {
1749
-	conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
1750
-	if err != nil {
1751
-		return nil, err
1752
-	}
1753
-
1754
-	return &membership.Member{
1755
-		Conn: conn,
1756
-	}, nil
1757
-}
1758
-
1759 1750
 // SubscribeLeadership returns channel to which events about leadership change
1760 1751
 // will be sent in form of raft.LeadershipState. Also cancel func is returned -
1761 1752
 // it should be called when listener is no longer interested in events.
... ...
@@ -60,10 +60,26 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
60 60
 	n.Config.ID = raftNode.RaftID
61 61
 
62 62
 	if snapshot != nil {
63
-		// Load the snapshot data into the store
64
-		if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
63
+		snapCluster, err := n.clusterSnapshot(snapshot.Data)
64
+		if err != nil {
65 65
 			return err
66 66
 		}
67
+		var bootstrapMembers []*api.RaftMember
68
+		if forceNewCluster {
69
+			for _, m := range snapCluster.Members {
70
+				if m.RaftID != n.Config.ID {
71
+					n.cluster.RemoveMember(m.RaftID)
72
+					continue
73
+				}
74
+				bootstrapMembers = append(bootstrapMembers, m)
75
+			}
76
+		} else {
77
+			bootstrapMembers = snapCluster.Members
78
+		}
79
+		n.bootstrapMembers = bootstrapMembers
80
+		for _, removedMember := range snapCluster.Removed {
81
+			n.cluster.RemoveMember(removedMember)
82
+		}
67 83
 	}
68 84
 
69 85
 	ents, st := waldata.Entries, waldata.HardState
... ...
@@ -215,40 +231,18 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
215 215
 	<-viewStarted
216 216
 }
217 217
 
218
-func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
218
+func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
219 219
 	var snapshot api.Snapshot
220 220
 	if err := snapshot.Unmarshal(data); err != nil {
221
-		return err
221
+		return snapshot.Membership, err
222 222
 	}
223 223
 	if snapshot.Version != api.Snapshot_V0 {
224
-		return fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
224
+		return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
225 225
 	}
226 226
 
227 227
 	if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
228
-		return err
229
-	}
230
-
231
-	oldMembers := n.cluster.Members()
232
-
233
-	for _, member := range snapshot.Membership.Members {
234
-		if forceNewCluster && member.RaftID != n.Config.ID {
235
-			n.cluster.RemoveMember(member.RaftID)
236
-		} else {
237
-			if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil {
238
-				return err
239
-			}
240
-		}
241
-		delete(oldMembers, member.RaftID)
242
-	}
243
-
244
-	for _, removedMember := range snapshot.Membership.Removed {
245
-		n.cluster.RemoveMember(removedMember)
246
-		delete(oldMembers, removedMember)
247
-	}
248
-
249
-	for member := range oldMembers {
250
-		n.cluster.ClearMember(member)
228
+		return snapshot.Membership, err
251 229
 	}
252 230
 
253
-	return nil
231
+	return snapshot.Membership, nil
254 232
 }
255 233
new file mode 100644
... ...
@@ -0,0 +1,299 @@
0
+package transport
1
+
2
+import (
3
+	"fmt"
4
+	"sync"
5
+	"time"
6
+
7
+	"golang.org/x/net/context"
8
+
9
+	"google.golang.org/grpc"
10
+	"google.golang.org/grpc/codes"
11
+
12
+	"github.com/coreos/etcd/raft"
13
+	"github.com/coreos/etcd/raft/raftpb"
14
+	"github.com/docker/swarmkit/api"
15
+	"github.com/docker/swarmkit/log"
16
+	"github.com/docker/swarmkit/manager/state/raft/membership"
17
+	"github.com/pkg/errors"
18
+)
19
+
20
+type peer struct {
21
+	id uint64
22
+
23
+	tr *Transport
24
+
25
+	msgc chan raftpb.Message
26
+
27
+	ctx    context.Context
28
+	cancel context.CancelFunc
29
+	done   chan struct{}
30
+
31
+	mu      sync.Mutex
32
+	cc      *grpc.ClientConn
33
+	addr    string
34
+	newAddr string
35
+
36
+	active       bool
37
+	becameActive time.Time
38
+}
39
+
40
+func newPeer(id uint64, addr string, tr *Transport) (*peer, error) {
41
+	cc, err := tr.dial(addr)
42
+	if err != nil {
43
+		return nil, errors.Wrapf(err, "failed to create conn for %x with addr %s", id, addr)
44
+	}
45
+	ctx, cancel := context.WithCancel(tr.ctx)
46
+	ctx = log.WithField(ctx, "peer_id", fmt.Sprintf("%x", id))
47
+	p := &peer{
48
+		id:     id,
49
+		addr:   addr,
50
+		cc:     cc,
51
+		tr:     tr,
52
+		ctx:    ctx,
53
+		cancel: cancel,
54
+		msgc:   make(chan raftpb.Message, 4096),
55
+		done:   make(chan struct{}),
56
+	}
57
+	go p.run(ctx)
58
+	return p, nil
59
+}
60
+
61
+func (p *peer) send(m raftpb.Message) (err error) {
62
+	p.mu.Lock()
63
+	defer func() {
64
+		if err != nil {
65
+			p.active = false
66
+			p.becameActive = time.Time{}
67
+		}
68
+		p.mu.Unlock()
69
+	}()
70
+	select {
71
+	case <-p.ctx.Done():
72
+		return p.ctx.Err()
73
+	default:
74
+	}
75
+	select {
76
+	case p.msgc <- m:
77
+	case <-p.ctx.Done():
78
+		return p.ctx.Err()
79
+	default:
80
+		p.tr.config.ReportUnreachable(p.id)
81
+		return errors.Errorf("peer is unreachable")
82
+	}
83
+	return nil
84
+}
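
Note the shape of send: the first select observes cancellation with priority, and the second either enqueues or, through the default case, fails fast when the 4096-slot buffer is full, so a slow peer can never block the raft loop. The same idiom in isolation (a sketch, assuming the context, raftpb, and errors imports used above):

	func trySend(ctx context.Context, msgc chan<- raftpb.Message, m raftpb.Message) error {
		select {
		case <-ctx.Done(): // a stopped peer is rejected first
			return ctx.Err()
		default:
		}
		select {
		case msgc <- m: // queued without blocking
			return nil
		case <-ctx.Done():
			return ctx.Err()
		default: // queue full: fail fast instead of stalling the caller
			return errors.New("message queue is full")
		}
	}
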
85
+
86
+func (p *peer) update(addr string) error {
87
+	p.mu.Lock()
88
+	defer p.mu.Unlock()
89
+	if p.addr == addr {
90
+		return nil
91
+	}
92
+	cc, err := p.tr.dial(addr)
93
+	if err != nil {
94
+		return err
95
+	}
96
+
97
+	p.cc.Close()
98
+	p.cc = cc
99
+	p.addr = addr
100
+	return nil
101
+}
102
+
103
+func (p *peer) updateAddr(addr string) error {
104
+	p.mu.Lock()
105
+	defer p.mu.Unlock()
106
+	if p.addr == addr {
107
+		return nil
108
+	}
109
+	log.G(p.ctx).Debugf("peer %x updated to address %s, it will be used if the old address fails", p.id, addr)
110
+	p.newAddr = addr
111
+	return nil
112
+}
113
+
114
+func (p *peer) conn() *grpc.ClientConn {
115
+	p.mu.Lock()
116
+	defer p.mu.Unlock()
117
+	return p.cc
118
+}
119
+
120
+func (p *peer) address() string {
121
+	p.mu.Lock()
122
+	defer p.mu.Unlock()
123
+	return p.addr
124
+}
125
+
126
+func (p *peer) resolveAddr(ctx context.Context, id uint64) (string, error) {
127
+	resp, err := api.NewRaftClient(p.conn()).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: id})
128
+	if err != nil {
129
+		return "", errors.Wrap(err, "failed to resolve address")
130
+	}
131
+	return resp.Addr, nil
132
+}
133
+
134
+func (p *peer) reportSnapshot(failure bool) {
135
+	if failure {
136
+		p.tr.config.ReportSnapshot(p.id, raft.SnapshotFailure)
137
+		return
138
+	}
139
+	p.tr.config.ReportSnapshot(p.id, raft.SnapshotFinish)
140
+}
141
+
142
+func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
143
+	ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
144
+	defer cancel()
145
+	_, err := api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
146
+	if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
147
+		p.tr.config.NodeRemoved()
148
+	}
149
+	if m.Type == raftpb.MsgSnap {
150
+		p.reportSnapshot(err != nil)
151
+	}
156
+	if err != nil {
157
+		p.tr.config.ReportUnreachable(m.To)
158
+		return err
159
+	}
160
+	return nil
161
+}
162
+
163
+func healthCheckConn(ctx context.Context, cc *grpc.ClientConn) error {
164
+	resp, err := api.NewHealthClient(cc).Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
165
+	if err != nil {
166
+		return errors.Wrap(err, "failed to check health")
167
+	}
168
+	if resp.Status != api.HealthCheckResponse_SERVING {
169
+		return errors.Errorf("health check returned status %s", resp.Status)
170
+	}
171
+	return nil
172
+}
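
healthCheckConn queries swarmkit's Health service under the service name "Raft" and treats any status other than SERVING as a failure. A hedged usage sketch, assuming cc is a *grpc.ClientConn dialed with the transport's credentials:

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := healthCheckConn(ctx, cc); err != nil {
		log.G(ctx).WithError(err).Warn("raft peer failed health check")
	}
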
173
+
174
+func (p *peer) healthCheck(ctx context.Context) error {
175
+	ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
176
+	defer cancel()
177
+	return healthCheckConn(ctx, p.conn())
178
+}
179
+
180
+func (p *peer) setActive() {
181
+	p.mu.Lock()
182
+	if !p.active {
183
+		p.active = true
184
+		p.becameActive = time.Now()
185
+	}
186
+	p.mu.Unlock()
187
+}
188
+
189
+func (p *peer) setInactive() {
190
+	p.mu.Lock()
191
+	p.active = false
192
+	p.becameActive = time.Time{}
193
+	p.mu.Unlock()
194
+}
195
+
196
+func (p *peer) activeTime() time.Time {
197
+	p.mu.Lock()
198
+	defer p.mu.Unlock()
199
+	return p.becameActive
200
+}
201
+
202
+func (p *peer) drain() error {
203
+	ctx, cancel := context.WithTimeout(context.Background(), 16*time.Second)
204
+	defer cancel()
205
+	for {
206
+		select {
207
+		case m, ok := <-p.msgc:
208
+			if !ok {
209
+				// all messages processed
210
+				return nil
211
+			}
212
+			if err := p.sendProcessMessage(ctx, m); err != nil {
213
+				return errors.Wrap(err, "send drain message")
214
+			}
215
+		case <-ctx.Done():
216
+			return ctx.Err()
217
+		}
218
+	}
219
+}
220
+
221
+func (p *peer) handleAddressChange(ctx context.Context) error {
222
+	p.mu.Lock()
223
+	newAddr := p.newAddr
224
+	p.newAddr = ""
225
+	p.mu.Unlock()
226
+	if newAddr == "" {
227
+		return nil
228
+	}
229
+	cc, err := p.tr.dial(newAddr)
230
+	if err != nil {
231
+		return err
232
+	}
233
+	ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
234
+	defer cancel()
235
+	if err := healthCheckConn(ctx, cc); err != nil {
236
+		cc.Close()
237
+		return err
238
+	}
239
+	// there is a possibility of a race if the host changes its address too
240
+	// quickly, but that is unlikely and things should eventually settle
241
+	p.mu.Lock()
242
+	p.cc.Close()
243
+	p.cc = cc
244
+	p.addr = newAddr
245
+	p.tr.config.UpdateNode(p.id, p.addr)
246
+	p.mu.Unlock()
247
+	return nil
248
+}
249
+
250
+func (p *peer) run(ctx context.Context) {
251
+	defer func() {
252
+		p.mu.Lock()
253
+		p.active = false
254
+		p.becameActive = time.Time{}
255
+		// at this point we can be sure that nobody will write to msgc
256
+		if p.msgc != nil {
257
+			close(p.msgc)
258
+		}
259
+		p.mu.Unlock()
260
+		if err := p.drain(); err != nil {
261
+			log.G(ctx).WithError(err).Error("failed to drain message queue")
262
+		}
263
+		close(p.done)
264
+	}()
265
+	if err := p.healthCheck(ctx); err == nil {
266
+		p.setActive()
267
+	}
268
+	for {
269
+		select {
270
+		case <-ctx.Done():
271
+			return
272
+		default:
273
+		}
274
+
275
+		select {
276
+		case m := <-p.msgc:
277
+			// we do not propagate the context here, because this operation
279
+			// must finish or time out on its own for raft to work correctly
279
+			err := p.sendProcessMessage(context.Background(), m)
280
+			if err != nil {
281
+				log.G(ctx).WithError(err).Debugf("failed to send message %s", m.Type)
282
+				p.setInactive()
283
+				if err := p.handleAddressChange(ctx); err != nil {
284
+					log.G(ctx).WithError(err).Error("failed to change address after failure")
285
+				}
286
+				continue
287
+			}
288
+			p.setActive()
289
+		case <-ctx.Done():
290
+			return
291
+		}
292
+	}
293
+}
294
+
295
+func (p *peer) stop() {
296
+	p.cancel()
297
+	<-p.done
298
+}
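
stop is the synchronous half of a standard goroutine handshake: cancel unblocks run, whose deferred cleanup drains msgc (bounded at 16 seconds) and closes done last, so stop returns only after the peer has fully shut down. The pattern in miniature (a generic sketch, not the peer code itself):

	done := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer close(done) // signal completion last, after all cleanup
		<-ctx.Done()      // work loop elided
	}()
	cancel()
	<-done // returns only once the goroutine has finished its cleanup
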
0 299
new file mode 100644
... ...
@@ -0,0 +1,382 @@
0
+// Package transport provides a grpc transport layer for raft.
1
+// All methods are non-blocking.
2
+package transport
3
+
4
+import (
5
+	"sync"
6
+	"time"
7
+
8
+	"golang.org/x/net/context"
9
+
10
+	"google.golang.org/grpc"
11
+	"google.golang.org/grpc/credentials"
12
+
13
+	"github.com/coreos/etcd/raft"
14
+	"github.com/coreos/etcd/raft/raftpb"
15
+	"github.com/docker/swarmkit/log"
16
+	"github.com/pkg/errors"
17
+)
18
+
19
+// ErrIsNotFound indicates that a peer was never added to the transport.
20
+var ErrIsNotFound = errors.New("peer not found")
21
+
22
+// Raft is the interface that represents the Raft API for the transport package.
23
+type Raft interface {
24
+	ReportUnreachable(id uint64)
25
+	ReportSnapshot(id uint64, status raft.SnapshotStatus)
26
+	IsIDRemoved(id uint64) bool
27
+	UpdateNode(id uint64, addr string)
28
+
29
+	NodeRemoved()
30
+}
31
+
32
+// Config for Transport
33
+type Config struct {
34
+	HeartbeatInterval time.Duration
35
+	SendTimeout       time.Duration
36
+	Credentials       credentials.TransportCredentials
37
+	RaftID            string
38
+
39
+	Raft
40
+}
41
+
42
+// Transport is a structure that manages remote raft peers and sends messages
43
+// to them.
44
+type Transport struct {
45
+	config *Config
46
+
47
+	unknownc chan raftpb.Message
48
+
49
+	mu      sync.Mutex
50
+	peers   map[uint64]*peer
51
+	stopped bool
52
+
53
+	ctx    context.Context
54
+	cancel context.CancelFunc
55
+	done   chan struct{}
56
+
57
+	deferredConns map[*grpc.ClientConn]*time.Timer
58
+}
59
+
60
+// New returns a new Transport with the specified Config.
61
+func New(cfg *Config) *Transport {
62
+	ctx, cancel := context.WithCancel(context.Background())
63
+	if cfg.RaftID != "" {
64
+		ctx = log.WithField(ctx, "raft_id", cfg.RaftID)
65
+	}
66
+	t := &Transport{
67
+		peers:    make(map[uint64]*peer),
68
+		config:   cfg,
69
+		unknownc: make(chan raftpb.Message),
70
+		done:     make(chan struct{}),
71
+		ctx:      ctx,
72
+		cancel:   cancel,
73
+
74
+		deferredConns: make(map[*grpc.ClientConn]*time.Timer),
75
+	}
76
+	go t.run(ctx)
77
+	return t
78
+}
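
New needs only a Config whose embedded Raft interface routes reachability and snapshot reports back to the consumer; in swarmkit that consumer is the raft Node itself. A minimal construction sketch with a hypothetical no-op stub (noopRaft is not part of this package):

	type noopRaft struct{}

	func (noopRaft) ReportUnreachable(id uint64)                          {}
	func (noopRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
	func (noopRaft) IsIDRemoved(id uint64) bool                           { return false }
	func (noopRaft) UpdateNode(id uint64, addr string)                    {}
	func (noopRaft) NodeRemoved()                                         {}

	func example() {
		tr := New(&Config{
			HeartbeatInterval: time.Second,
			SendTimeout:       2 * time.Second, // nil Credentials falls back to WithInsecure
			Raft:              noopRaft{},
		})
		defer tr.Stop()
		_ = tr.AddPeer(2, "10.0.0.2:2377") // hypothetical peer id and address
	}
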
79
+
80
+func (t *Transport) run(ctx context.Context) {
81
+	defer func() {
82
+		log.G(ctx).Debug("stop transport")
83
+		t.mu.Lock()
84
+		defer t.mu.Unlock()
85
+		t.stopped = true
86
+		for _, p := range t.peers {
87
+			p.stop()
88
+			p.cc.Close()
89
+		}
90
+		for cc, timer := range t.deferredConns {
91
+			timer.Stop()
92
+			cc.Close()
93
+		}
94
+		t.deferredConns = nil
95
+		close(t.done)
96
+	}()
97
+	for {
98
+		select {
99
+		case <-ctx.Done():
100
+			return
101
+		default:
102
+		}
103
+
104
+		select {
105
+		case m := <-t.unknownc:
106
+			if err := t.sendUnknownMessage(ctx, m); err != nil {
107
+				log.G(ctx).WithError(err).Warnf("ignored message %s to unknown peer %x", m.Type, m.To)
108
+			}
109
+		case <-ctx.Done():
110
+			return
111
+		}
112
+	}
113
+}
114
+
115
+// Stop stops the transport and waits until it has finished.
116
+func (t *Transport) Stop() {
117
+	t.cancel()
118
+	<-t.done
119
+}
120
+
121
+// Send sends a raft message to its remote peer.
122
+func (t *Transport) Send(m raftpb.Message) error {
123
+	t.mu.Lock()
124
+	defer t.mu.Unlock()
125
+	if t.stopped {
126
+		return errors.New("transport stopped")
127
+	}
128
+	if t.config.IsIDRemoved(m.To) {
129
+		return errors.Errorf("refusing to send message %s to removed member %x", m.Type, m.To)
130
+	}
131
+	p, ok := t.peers[m.To]
132
+	if !ok {
133
+		log.G(t.ctx).Warningf("sending message %s to an unrecognized member ID %x", m.Type, m.To)
134
+		select {
135
+		// we need to process messages to unknown peers in a separate
136
+		// goroutine so as not to block the sender
137
+		case t.unknownc <- m:
138
+		case <-t.ctx.Done():
139
+			return t.ctx.Err()
140
+		default:
141
+			return errors.New("unknown messages queue is full")
142
+		}
143
+		return nil
144
+	}
145
+	if err := p.send(m); err != nil {
146
+		return errors.Wrapf(err, "failed to send message %x to %x", m.Type, m.To)
147
+	}
148
+	return nil
149
+}
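
On the caller side, the raft event loop hands every outgoing message from a Ready to Send: known peers get the message queued, and unknown recipients are resolved in the background via unknownc. A hedged sketch of that loop (rd, tr, selfID, and ctx are assumed from the surrounding raft code):

	for _, m := range rd.Messages {
		if m.To == selfID {
			continue // local messages are stepped into raft directly
		}
		if err := tr.Send(m); err != nil {
			log.G(ctx).WithError(err).Debugf("failed to send message to member %x", m.To)
		}
	}
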
150
+
151
+// AddPeer adds a new peer with the given id and address addr to the Transport.
152
+// If a peer with that id already exists, it returns an error if the address
153
+// differs (UpdatePeer should be used) and nil otherwise.
154
+func (t *Transport) AddPeer(id uint64, addr string) error {
155
+	t.mu.Lock()
156
+	defer t.mu.Unlock()
157
+	if t.stopped {
158
+		return errors.New("transport stopped")
159
+	}
160
+	if ep, ok := t.peers[id]; ok {
161
+		if ep.address() == addr {
162
+			return nil
163
+		}
164
+		return errors.Errorf("peer %x already added with addr %s", id, ep.addr)
165
+	}
166
+	log.G(t.ctx).Debugf("transport: add peer %x with address %s", id, addr)
167
+	p, err := newPeer(id, addr, t)
168
+	if err != nil {
169
+		return errors.Wrapf(err, "failed to create peer %x with addr %s", id, addr)
170
+	}
171
+	t.peers[id] = p
172
+	return nil
173
+}
174
+
175
+// RemovePeer removes a peer from the Transport and waits for it to stop.
176
+func (t *Transport) RemovePeer(id uint64) error {
177
+	t.mu.Lock()
178
+	defer t.mu.Unlock()
179
+
180
+	if t.stopped {
181
+		return errors.New("transport stopped")
182
+	}
183
+	p, ok := t.peers[id]
184
+	if !ok {
185
+		return ErrIsNotFound
186
+	}
187
+	delete(t.peers, id)
188
+	cc := p.conn()
189
+	p.stop()
190
+	timer := time.AfterFunc(8*time.Second, func() {
191
+		t.mu.Lock()
192
+		if !t.stopped {
193
+			delete(t.deferredConns, cc)
194
+			cc.Close()
195
+		}
196
+		t.mu.Unlock()
197
+	})
198
+	// store connection and timer for cleaning up on stop
199
+	t.deferredConns[cc] = timer
200
+
201
+	return nil
202
+}
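
Rather than closing the removed peer's connection immediately, RemovePeer parks it in deferredConns for eight seconds so in-flight RPCs (such as the commit ack for the removal itself) can complete, and Stop sweeps whatever is still parked. The delayed-close idiom in isolation (a sketch; the names are illustrative):

	// closeLater closes cc after delay, unless stop fires first, in which
	// case the pending timer is canceled and the conn is closed right away.
	func closeLater(cc *grpc.ClientConn, delay time.Duration, stop <-chan struct{}) {
		timer := time.AfterFunc(delay, func() { cc.Close() })
		go func() {
			<-stop
			if timer.Stop() { // true means the deferred close never ran
				cc.Close()
			}
		}()
	}
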
203
+
204
+// UpdatePeer updates a peer with a new address. It replaces the connection immediately.
205
+func (t *Transport) UpdatePeer(id uint64, addr string) error {
206
+	t.mu.Lock()
207
+	defer t.mu.Unlock()
208
+
209
+	if t.stopped {
210
+		return errors.New("transport stopped")
211
+	}
212
+	p, ok := t.peers[id]
213
+	if !ok {
214
+		return ErrIsNotFound
215
+	}
216
+	if err := p.update(addr); err != nil {
217
+		return err
218
+	}
219
+	log.G(t.ctx).Debugf("peer %x updated to address %s", id, addr)
220
+	return nil
221
+}
222
+
223
+// UpdatePeerAddr updates a peer with a new address, but delays connection
224
+// creation. The new address won't be used until the first failure on the old one.
225
+func (t *Transport) UpdatePeerAddr(id uint64, addr string) error {
226
+	t.mu.Lock()
227
+	defer t.mu.Unlock()
228
+
229
+	if t.stopped {
230
+		return errors.New("transport stopped")
231
+	}
232
+	p, ok := t.peers[id]
233
+	if !ok {
234
+		return ErrIsNotFound
235
+	}
236
+	if err := p.updateAddr(addr); err != nil {
237
+		return err
238
+	}
239
+	return nil
240
+}
241
+
242
+// PeerConn returns the raw grpc connection to a peer.
243
+func (t *Transport) PeerConn(id uint64) (*grpc.ClientConn, error) {
244
+	t.mu.Lock()
245
+	defer t.mu.Unlock()
246
+	p, ok := t.peers[id]
247
+	if !ok {
248
+		return nil, ErrIsNotFound
249
+	}
250
+	p.mu.Lock()
251
+	active := p.active
252
+	p.mu.Unlock()
253
+	if !active {
254
+		return nil, errors.New("peer is inactive")
255
+	}
256
+	return p.conn(), nil
257
+}
258
+
259
+// PeerAddr returns the address of the peer with the given id.
260
+func (t *Transport) PeerAddr(id uint64) (string, error) {
261
+	t.mu.Lock()
262
+	defer t.mu.Unlock()
263
+	p, ok := t.peers[id]
264
+	if !ok {
265
+		return "", ErrIsNotFound
266
+	}
267
+	return p.address(), nil
268
+}
269
+
270
+// HealthCheck checks the health of a particular peer.
271
+func (t *Transport) HealthCheck(ctx context.Context, id uint64) error {
272
+	t.mu.Lock()
273
+	p, ok := t.peers[id]
274
+	t.mu.Unlock()
275
+	if !ok {
276
+		return ErrIsNotFound
277
+	}
278
+	ctx, cancel := t.withContext(ctx)
279
+	defer cancel()
280
+	return p.healthCheck(ctx)
281
+}
282
+
283
+// Active returns true if the peer was recently active and false otherwise.
284
+func (t *Transport) Active(id uint64) bool {
285
+	t.mu.Lock()
286
+	defer t.mu.Unlock()
287
+	p, ok := t.peers[id]
288
+	if !ok {
289
+		return false
290
+	}
291
+	p.mu.Lock()
292
+	active := p.active
293
+	p.mu.Unlock()
294
+	return active
295
+}
296
+
297
+func (t *Transport) longestActive() (*peer, error) {
298
+	var longest *peer
299
+	var longestTime time.Time
300
+	t.mu.Lock()
301
+	defer t.mu.Unlock()
302
+	for _, p := range t.peers {
303
+		becameActive := p.activeTime()
304
+		if becameActive.IsZero() {
305
+			continue
306
+		}
307
+		// track the earliest activation; longestTime must be seeded by the
308
+		// first candidate, otherwise comparing against the zero time below
309
+		// would never replace the initial pick
310
+		if longest == nil || becameActive.Before(longestTime) {
311
+			longest = p
312
+			longestTime = becameActive
313
+		}
315
+	}
316
+	if longest == nil {
317
+		return nil, errors.New("failed to find longest active peer")
318
+	}
319
+	return longest, nil
320
+}
321
+
322
+func (t *Transport) dial(addr string) (*grpc.ClientConn, error) {
323
+	grpcOptions := []grpc.DialOption{
324
+		grpc.WithBackoffMaxDelay(8 * time.Second),
325
+	}
326
+	if t.config.Credentials != nil {
327
+		grpcOptions = append(grpcOptions, grpc.WithTransportCredentials(t.config.Credentials))
328
+	} else {
329
+		grpcOptions = append(grpcOptions, grpc.WithInsecure())
330
+	}
331
+
332
+	if t.config.SendTimeout > 0 {
333
+		grpcOptions = append(grpcOptions, grpc.WithTimeout(t.config.SendTimeout))
334
+	}
335
+
336
+	cc, err := grpc.Dial(addr, grpcOptions...)
337
+	if err != nil {
338
+		return nil, err
339
+	}
340
+
341
+	return cc, nil
342
+}
343
+
344
+func (t *Transport) withContext(ctx context.Context) (context.Context, context.CancelFunc) {
345
+	ctx, cancel := context.WithCancel(ctx)
346
+
347
+	go func() {
348
+		select {
349
+		case <-ctx.Done():
350
+		case <-t.ctx.Done():
351
+			cancel()
352
+		}
353
+	}()
354
+	return ctx, cancel
355
+}
356
+
357
+func (t *Transport) resolvePeer(ctx context.Context, id uint64) (*peer, error) {
358
+	longestActive, err := t.longestActive()
359
+	if err != nil {
360
+		return nil, err
361
+	}
362
+	ctx, cancel := context.WithTimeout(ctx, t.config.SendTimeout)
363
+	defer cancel()
364
+	addr, err := longestActive.resolveAddr(ctx, id)
365
+	if err != nil {
366
+		return nil, err
367
+	}
368
+	return newPeer(id, addr, t)
369
+}
370
+
371
+func (t *Transport) sendUnknownMessage(ctx context.Context, m raftpb.Message) error {
372
+	p, err := t.resolvePeer(ctx, m.To)
373
+	if err != nil {
374
+		return errors.Wrapf(err, "failed to resolve peer")
375
+	}
376
+	defer p.cancel()
377
+	if err := p.sendProcessMessage(ctx, m); err != nil {
378
+		return errors.Wrapf(err, "failed to send message")
379
+	}
380
+	return nil
381
+}
... ...
@@ -14,6 +14,7 @@ import (
14 14
 
15 15
 	"github.com/Sirupsen/logrus"
16 16
 	"github.com/boltdb/bolt"
17
+	"github.com/docker/docker/pkg/plugingetter"
17 18
 	"github.com/docker/swarmkit/agent"
18 19
 	"github.com/docker/swarmkit/agent/exec"
19 20
 	"github.com/docker/swarmkit/api"
... ...
@@ -98,6 +99,9 @@ type Config struct {
98 98
 
99 99
 	// Availability allows a user to control the current scheduling status of a node
100 100
 	Availability api.NodeSpec_Availability
101
+
102
+	// PluginGetter provides access to docker's plugin inventory.
103
+	PluginGetter plugingetter.PluginGetter
101 104
 }
102 105
 
103 106
 // Node implements the primary node functionality for a member of a swarm
... ...
@@ -683,6 +687,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
683 683
 		AutoLockManagers: n.config.AutoLockManagers,
684 684
 		UnlockKey:        n.unlockKey,
685 685
 		Availability:     n.config.Availability,
686
+		PluginGetter:     n.config.PluginGetter,
686 687
 	})
687 688
 	if err != nil {
688 689
 		return err
... ...
@@ -91,9 +91,9 @@ func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) {
91 91
 
92 92
 	// https://github.com/LK4D4/sample
93 93
 	//
94
-	// The first link applies exponential distribution weight choice reservior
94
+	// The first link applies exponential distribution weight choice reservoir
95 95
 	// sampling. This may be relevant if we view the master selection as a
96
-	// distributed reservior sampling problem.
96
+	// distributed reservoir sampling problem.
97 97
 
98 98
 	// bias to zero-weighted remotes have same probability. otherwise, we
99 99
 	// always select first entry when all are zero.