Browse code

Merge pull request #31143 from aaronlehmann/vendor-swarmkit-caa9c95

[1.13] Vendor swarmkit 30a4278

Victor Vieux authored on 2017/02/19 17:46:21
Showing 4 changed files
... ...
@@ -101,7 +101,7 @@ github.com/docker/containerd aa8187dbd3b7ad67d8e5e3a15115d3eef43a7ed1
101 101
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
102 102
 
103 103
 # cluster
104
-github.com/docker/swarmkit c7df892262aa0bec0a3e52ea76219b7b364ded38
104
+github.com/docker/swarmkit 30a4278953316a0abd88d35c8d6600ff5add2733
105 105
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
106 106
 github.com/gogo/protobuf v0.3
107 107
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
... ...
@@ -156,40 +156,53 @@ func reconcilePortConfigs(s *api.Service) []*api.PortConfig {
156 156
 		return s.Spec.Endpoint.Ports
157 157
 	}
158 158
 
159
-	allocatedPorts := make(map[api.PortConfig]*api.PortConfig)
159
+	portStates := allocatedPorts{}
160 160
 	for _, portState := range s.Endpoint.Ports {
161
-		if portState.PublishMode != api.PublishModeIngress {
162
-			continue
161
+		if portState.PublishMode == api.PublishModeIngress {
162
+			portStates.addState(portState)
163 163
 		}
164
-
165
-		allocatedPorts[getPortConfigKey(portState)] = portState
166 164
 	}
167 165
 
168 166
 	var portConfigs []*api.PortConfig
167
+
168
+	// Process the portConfig with portConfig.PublishMode != api.PublishModeIngress
169
+	// and PublishedPort != 0 (high priority)
169 170
 	for _, portConfig := range s.Spec.Endpoint.Ports {
170
-		// If the PublishMode is not Ingress simply pick up
171
-		// the port config.
172 171
 		if portConfig.PublishMode != api.PublishModeIngress {
172
+			// If the PublishMode is not Ingress simply pick up the port config.
173 173
 			portConfigs = append(portConfigs, portConfig)
174
-			continue
175
-		}
174
+		} else if portConfig.PublishedPort != 0 {
175
+			// Otherwise we only process PublishedPort != 0 in this round
176 176
 
177
-		portState, ok := allocatedPorts[getPortConfigKey(portConfig)]
178
-
179
-		// If the portConfig is exactly the same as portState
180
-		// except if SwarmPort is not user-define then prefer
181
-		// portState to ensure sticky allocation of the same
182
-		// port that was allocated before.
183
-		if ok && portConfig.Name == portState.Name &&
184
-			portConfig.TargetPort == portState.TargetPort &&
185
-			portConfig.Protocol == portState.Protocol &&
186
-			portConfig.PublishedPort == 0 {
187
-			portConfigs = append(portConfigs, portState)
188
-			continue
177
+			// Remove record from portState
178
+			portStates.delState(portConfig)
179
+
180
+			// For PublishedPort != 0 prefer the portConfig
181
+			portConfigs = append(portConfigs, portConfig)
189 182
 		}
183
+	}
184
+
185
+	// Iterate portConfigs with PublishedPort == 0 (low priority)
186
+	for _, portConfig := range s.Spec.Endpoint.Ports {
187
+		// Ignore ports which are not PublishModeIngress (already processed)
188
+		// And we only process PublishedPort == 0 in this round
189
+		// So the following:
190
+		//  `portConfig.PublishMode == api.PublishModeIngress && portConfig.PublishedPort == 0`
191
+		if portConfig.PublishMode == api.PublishModeIngress && portConfig.PublishedPort == 0 {
192
+			// If the portConfig is exactly the same as portState
193
+			// except if SwarmPort is not user-defined then prefer
194
+			// portState to ensure sticky allocation of the same
195
+			// port that was allocated before.
196
+
197
+			// Remove record from portState
198
+			if portState := portStates.delState(portConfig); portState != nil {
199
+				portConfigs = append(portConfigs, portState)
200
+				continue
201
+			}
190 202
 
191
-		// For all other cases prefer the portConfig
192
-		portConfigs = append(portConfigs, portConfig)
203
+			// For all other cases prefer the portConfig
204
+			portConfigs = append(portConfigs, portConfig)
205
+		}
193 206
 	}
194 207
 
195 208
 	return portConfigs
... ...
@@ -306,40 +319,31 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
306 306
 		return false
307 307
 	}
308 308
 
309
-	allocatedPorts := make(map[api.PortConfig]*api.PortConfig)
309
+	portStates := allocatedPorts{}
310 310
 	for _, portState := range s.Endpoint.Ports {
311
-		if portState.PublishMode != api.PublishModeIngress {
312
-			continue
311
+		if portState.PublishMode == api.PublishModeIngress {
312
+			portStates.addState(portState)
313 313
 		}
314
-
315
-		allocatedPorts[getPortConfigKey(portState)] = portState
316 314
 	}
317 315
 
316
+	// Iterate portConfigs with PublishedPort != 0 (high priority)
318 317
 	for _, portConfig := range s.Spec.Endpoint.Ports {
319 318
 		// Ignore ports which are not PublishModeIngress
320 319
 		if portConfig.PublishMode != api.PublishModeIngress {
321 320
 			continue
322 321
 		}
323
-
324
-		portState, ok := allocatedPorts[getPortConfigKey(portConfig)]
325
-
326
-		// If name, port, protocol values don't match then we
327
-		// are not allocated.
328
-		if !ok {
322
+		if portConfig.PublishedPort != 0 && portStates.delState(portConfig) == nil {
329 323
 			return false
330 324
 		}
325
+	}
331 326
 
332
-		// If SwarmPort was user defined but the port state
333
-		// SwarmPort doesn't match we are not allocated.
334
-		if portConfig.PublishedPort != portState.PublishedPort &&
335
-			portConfig.PublishedPort != 0 {
336
-			return false
327
+	// Iterate portConfigs with PublishedPort == 0 (low priority)
328
+	for _, portConfig := range s.Spec.Endpoint.Ports {
329
+		// Ignore ports which are not PublishModeIngress
330
+		if portConfig.PublishMode != api.PublishModeIngress {
331
+			continue
337 332
 		}
338
-
339
-		// If SwarmPort was not defined by user and port state
340
-		// is not initialized with a valid SwarmPort value then
341
-		// we are not allocated.
342
-		if portConfig.PublishedPort == 0 && portState.PublishedPort == 0 {
333
+		if portConfig.PublishedPort == 0 && portStates.delState(portConfig) == nil {
343 334
 			return false
344 335
 		}
345 336
 	}
... ...
@@ -1,11 +1,14 @@
1 1
 package constraintenforcer
2 2
 
3 3
 import (
4
+	"time"
5
+
4 6
 	"github.com/docker/swarmkit/api"
5 7
 	"github.com/docker/swarmkit/log"
6 8
 	"github.com/docker/swarmkit/manager/constraint"
7 9
 	"github.com/docker/swarmkit/manager/state"
8 10
 	"github.com/docker/swarmkit/manager/state/store"
11
+	"github.com/docker/swarmkit/protobuf/ptypes"
9 12
 )
10 13
 
11 14
 // ConstraintEnforcer watches for updates to nodes and shuts down tasks that no
... ...
@@ -43,7 +46,7 @@ func (ce *ConstraintEnforcer) Run() {
43 43
 		log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
44 44
 	} else {
45 45
 		for _, node := range nodes {
46
-			ce.shutdownNoncompliantTasks(node)
46
+			ce.rejectNoncompliantTasks(node)
47 47
 		}
48 48
 	}
49 49
 
... ...
@@ -51,14 +54,14 @@ func (ce *ConstraintEnforcer) Run() {
51 51
 		select {
52 52
 		case event := <-watcher:
53 53
 			node := event.(state.EventUpdateNode).Node
54
-			ce.shutdownNoncompliantTasks(node)
54
+			ce.rejectNoncompliantTasks(node)
55 55
 		case <-ce.stopChan:
56 56
 			return
57 57
 		}
58 58
 	}
59 59
 }
60 60
 
61
-func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
61
+func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
62 62
 	// If the availability is "drain", the orchestrator will
63 63
 	// shut down all tasks.
64 64
 	// If the availability is "pause", we shouldn't touch
... ...
@@ -134,7 +137,16 @@ func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
134 134
 						return nil
135 135
 					}
136 136
 
137
-					t.DesiredState = api.TaskStateShutdown
137
+					// We set the observed state to
138
+					// REJECTED, rather than the desired
139
+					// state. Desired state is owned by the
140
+					// orchestrator, and setting it directly
141
+					// will bypass actions such as
142
+					// restarting the task on another node
143
+					// (if applicable).
144
+					t.Status.State = api.TaskStateRejected
145
+					t.Status.Message = "assigned node no longer meets constraints"
146
+					t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
138 147
 					return store.UpdateTask(tx, t)
139 148
 				})
140 149
 				if err != nil {
... ...
@@ -460,6 +460,17 @@ func (g *Orchestrator) restartTask(ctx context.Context, taskID string, serviceID
460 460
 		if service == nil {
461 461
 			return nil
462 462
 		}
463
+
464
+		node, nodeExists := g.nodes[t.NodeID]
465
+		serviceEntry, serviceExists := g.globalServices[t.ServiceID]
466
+		if !nodeExists || !serviceExists {
467
+			return nil
468
+		}
469
+		if !constraint.NodeMatches(serviceEntry.constraints, node) {
470
+			t.DesiredState = api.TaskStateShutdown
471
+			return store.UpdateTask(tx, t)
472
+		}
473
+
463 474
 		return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
464 475
 	})
465 476
 	if err != nil {