Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
| ... | ... |
@@ -101,7 +101,7 @@ github.com/docker/containerd aa8187dbd3b7ad67d8e5e3a15115d3eef43a7ed1 |
| 101 | 101 |
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 |
| 102 | 102 |
|
| 103 | 103 |
# cluster |
| 104 |
-github.com/docker/swarmkit c7df892262aa0bec0a3e52ea76219b7b364ded38 |
|
| 104 |
+github.com/docker/swarmkit 30a4278953316a0abd88d35c8d6600ff5add2733 |
|
| 105 | 105 |
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 |
| 106 | 106 |
github.com/gogo/protobuf v0.3 |
| 107 | 107 |
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a |
| ... | ... |
@@ -156,40 +156,53 @@ func reconcilePortConfigs(s *api.Service) []*api.PortConfig {
|
| 156 | 156 |
return s.Spec.Endpoint.Ports |
| 157 | 157 |
} |
| 158 | 158 |
|
| 159 |
- allocatedPorts := make(map[api.PortConfig]*api.PortConfig) |
|
| 159 |
+ portStates := allocatedPorts{}
|
|
| 160 | 160 |
for _, portState := range s.Endpoint.Ports {
|
| 161 |
- if portState.PublishMode != api.PublishModeIngress {
|
|
| 162 |
- continue |
|
| 161 |
+ if portState.PublishMode == api.PublishModeIngress {
|
|
| 162 |
+ portStates.addState(portState) |
|
| 163 | 163 |
} |
| 164 |
- |
|
| 165 |
- allocatedPorts[getPortConfigKey(portState)] = portState |
|
| 166 | 164 |
} |
| 167 | 165 |
|
| 168 | 166 |
var portConfigs []*api.PortConfig |
| 167 |
+ |
|
| 168 |
+ // Process the portConfig with portConfig.PublishMode != api.PublishModeIngress |
|
| 169 |
+ // and PublishedPort != 0 (high priority) |
|
| 169 | 170 |
for _, portConfig := range s.Spec.Endpoint.Ports {
|
| 170 |
- // If the PublishMode is not Ingress simply pick up |
|
| 171 |
- // the port config. |
|
| 172 | 171 |
if portConfig.PublishMode != api.PublishModeIngress {
|
| 172 |
+ // If the PublishMode is not Ingress simply pick up the port config. |
|
| 173 | 173 |
portConfigs = append(portConfigs, portConfig) |
| 174 |
- continue |
|
| 175 |
- } |
|
| 174 |
+ } else if portConfig.PublishedPort != 0 {
|
|
| 175 |
+ // Otherwise we only process PublishedPort != 0 in this round |
|
| 176 | 176 |
|
| 177 |
- portState, ok := allocatedPorts[getPortConfigKey(portConfig)] |
|
| 178 |
- |
|
| 179 |
- // If the portConfig is exactly the same as portState |
|
| 180 |
- // except if SwarmPort is not user-define then prefer |
|
| 181 |
- // portState to ensure sticky allocation of the same |
|
| 182 |
- // port that was allocated before. |
|
| 183 |
- if ok && portConfig.Name == portState.Name && |
|
| 184 |
- portConfig.TargetPort == portState.TargetPort && |
|
| 185 |
- portConfig.Protocol == portState.Protocol && |
|
| 186 |
- portConfig.PublishedPort == 0 {
|
|
| 187 |
- portConfigs = append(portConfigs, portState) |
|
| 188 |
- continue |
|
| 177 |
+ // Remove record from portState |
|
| 178 |
+ portStates.delState(portConfig) |
|
| 179 |
+ |
|
| 180 |
+ // For PublishedPort != 0 prefer the portConfig |
|
| 181 |
+ portConfigs = append(portConfigs, portConfig) |
|
| 189 | 182 |
} |
| 183 |
+ } |
|
| 184 |
+ |
|
| 185 |
+ // Iterate portConfigs with PublishedPort == 0 (low priority) |
|
| 186 |
+ for _, portConfig := range s.Spec.Endpoint.Ports {
|
|
| 187 |
+ // Ignore ports which are not PublishModeIngress (already processed) |
|
| 188 |
+ // And we only process PublishedPort == 0 in this round |
|
| 189 |
+ // So the following: |
|
| 190 |
+ // `portConfig.PublishMode == api.PublishModeIngress && portConfig.PublishedPort == 0` |
|
| 191 |
+ if portConfig.PublishMode == api.PublishModeIngress && portConfig.PublishedPort == 0 {
|
|
| 192 |
+ // If the portConfig is exactly the same as portState |
|
| 193 |
+ // except if SwarmPort is not user-define then prefer |
|
| 194 |
+ // portState to ensure sticky allocation of the same |
|
| 195 |
+ // port that was allocated before. |
|
| 196 |
+ |
|
| 197 |
+ // Remove record from portState |
|
| 198 |
+ if portState := portStates.delState(portConfig); portState != nil {
|
|
| 199 |
+ portConfigs = append(portConfigs, portState) |
|
| 200 |
+ continue |
|
| 201 |
+ } |
|
| 190 | 202 |
|
| 191 |
- // For all other cases prefer the portConfig |
|
| 192 |
- portConfigs = append(portConfigs, portConfig) |
|
| 203 |
+ // For all other cases prefer the portConfig |
|
| 204 |
+ portConfigs = append(portConfigs, portConfig) |
|
| 205 |
+ } |
|
| 193 | 206 |
} |
| 194 | 207 |
|
| 195 | 208 |
return portConfigs |
| ... | ... |
@@ -306,40 +319,31 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
|
| 306 | 306 |
return false |
| 307 | 307 |
} |
| 308 | 308 |
|
| 309 |
- allocatedPorts := make(map[api.PortConfig]*api.PortConfig) |
|
| 309 |
+ portStates := allocatedPorts{}
|
|
| 310 | 310 |
for _, portState := range s.Endpoint.Ports {
|
| 311 |
- if portState.PublishMode != api.PublishModeIngress {
|
|
| 312 |
- continue |
|
| 311 |
+ if portState.PublishMode == api.PublishModeIngress {
|
|
| 312 |
+ portStates.addState(portState) |
|
| 313 | 313 |
} |
| 314 |
- |
|
| 315 |
- allocatedPorts[getPortConfigKey(portState)] = portState |
|
| 316 | 314 |
} |
| 317 | 315 |
|
| 316 |
+ // Iterate portConfigs with PublishedPort != 0 (high priority) |
|
| 318 | 317 |
for _, portConfig := range s.Spec.Endpoint.Ports {
|
| 319 | 318 |
// Ignore ports which are not PublishModeIngress |
| 320 | 319 |
if portConfig.PublishMode != api.PublishModeIngress {
|
| 321 | 320 |
continue |
| 322 | 321 |
} |
| 323 |
- |
|
| 324 |
- portState, ok := allocatedPorts[getPortConfigKey(portConfig)] |
|
| 325 |
- |
|
| 326 |
- // If name, port, protocol values don't match then we |
|
| 327 |
- // are not allocated. |
|
| 328 |
- if !ok {
|
|
| 322 |
+ if portConfig.PublishedPort != 0 && portStates.delState(portConfig) == nil {
|
|
| 329 | 323 |
return false |
| 330 | 324 |
} |
| 325 |
+ } |
|
| 331 | 326 |
|
| 332 |
- // If SwarmPort was user defined but the port state |
|
| 333 |
- // SwarmPort doesn't match we are not allocated. |
|
| 334 |
- if portConfig.PublishedPort != portState.PublishedPort && |
|
| 335 |
- portConfig.PublishedPort != 0 {
|
|
| 336 |
- return false |
|
| 327 |
+ // Iterate portConfigs with PublishedPort == 0 (low priority) |
|
| 328 |
+ for _, portConfig := range s.Spec.Endpoint.Ports {
|
|
| 329 |
+ // Ignore ports which are not PublishModeIngress |
|
| 330 |
+ if portConfig.PublishMode != api.PublishModeIngress {
|
|
| 331 |
+ continue |
|
| 337 | 332 |
} |
| 338 |
- |
|
| 339 |
- // If SwarmPort was not defined by user and port state |
|
| 340 |
- // is not initialized with a valid SwarmPort value then |
|
| 341 |
- // we are not allocated. |
|
| 342 |
- if portConfig.PublishedPort == 0 && portState.PublishedPort == 0 {
|
|
| 333 |
+ if portConfig.PublishedPort == 0 && portStates.delState(portConfig) == nil {
|
|
| 343 | 334 |
return false |
| 344 | 335 |
} |
| 345 | 336 |
} |
| ... | ... |
@@ -1,11 +1,14 @@ |
| 1 | 1 |
package constraintenforcer |
| 2 | 2 |
|
| 3 | 3 |
import ( |
| 4 |
+ "time" |
|
| 5 |
+ |
|
| 4 | 6 |
"github.com/docker/swarmkit/api" |
| 5 | 7 |
"github.com/docker/swarmkit/log" |
| 6 | 8 |
"github.com/docker/swarmkit/manager/constraint" |
| 7 | 9 |
"github.com/docker/swarmkit/manager/state" |
| 8 | 10 |
"github.com/docker/swarmkit/manager/state/store" |
| 11 |
+ "github.com/docker/swarmkit/protobuf/ptypes" |
|
| 9 | 12 |
) |
| 10 | 13 |
|
| 11 | 14 |
// ConstraintEnforcer watches for updates to nodes and shuts down tasks that no |
| ... | ... |
@@ -43,7 +46,7 @@ func (ce *ConstraintEnforcer) Run() {
|
| 43 | 43 |
log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
|
| 44 | 44 |
} else {
|
| 45 | 45 |
for _, node := range nodes {
|
| 46 |
- ce.shutdownNoncompliantTasks(node) |
|
| 46 |
+ ce.rejectNoncompliantTasks(node) |
|
| 47 | 47 |
} |
| 48 | 48 |
} |
| 49 | 49 |
|
| ... | ... |
@@ -51,14 +54,14 @@ func (ce *ConstraintEnforcer) Run() {
|
| 51 | 51 |
select {
|
| 52 | 52 |
case event := <-watcher: |
| 53 | 53 |
node := event.(state.EventUpdateNode).Node |
| 54 |
- ce.shutdownNoncompliantTasks(node) |
|
| 54 |
+ ce.rejectNoncompliantTasks(node) |
|
| 55 | 55 |
case <-ce.stopChan: |
| 56 | 56 |
return |
| 57 | 57 |
} |
| 58 | 58 |
} |
| 59 | 59 |
} |
| 60 | 60 |
|
| 61 |
-func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
|
|
| 61 |
+func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
|
|
| 62 | 62 |
// If the availability is "drain", the orchestrator will |
| 63 | 63 |
// shut down all tasks. |
| 64 | 64 |
// If the availability is "pause", we shouldn't touch |
| ... | ... |
@@ -134,7 +137,16 @@ func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
|
| 134 | 134 |
return nil |
| 135 | 135 |
} |
| 136 | 136 |
|
| 137 |
- t.DesiredState = api.TaskStateShutdown |
|
| 137 |
+ // We set the observed state to |
|
| 138 |
+ // REJECTED, rather than the desired |
|
| 139 |
+ // state. Desired state is owned by the |
|
| 140 |
+ // orchestrator, and setting it directly |
|
| 141 |
+ // will bypass actions such as |
|
| 142 |
+ // restarting the task on another node |
|
| 143 |
+ // (if applicable). |
|
| 144 |
+ t.Status.State = api.TaskStateRejected |
|
| 145 |
+ t.Status.Message = "assigned node no longer meets constraints" |
|
| 146 |
+ t.Status.Timestamp = ptypes.MustTimestampProto(time.Now()) |
|
| 138 | 147 |
return store.UpdateTask(tx, t) |
| 139 | 148 |
}) |
| 140 | 149 |
if err != nil {
|
| ... | ... |
@@ -460,6 +460,17 @@ func (g *Orchestrator) restartTask(ctx context.Context, taskID string, serviceID |
| 460 | 460 |
if service == nil {
|
| 461 | 461 |
return nil |
| 462 | 462 |
} |
| 463 |
+ |
|
| 464 |
+ node, nodeExists := g.nodes[t.NodeID] |
|
| 465 |
+ serviceEntry, serviceExists := g.globalServices[t.ServiceID] |
|
| 466 |
+ if !nodeExists || !serviceExists {
|
|
| 467 |
+ return nil |
|
| 468 |
+ } |
|
| 469 |
+ if !constraint.NodeMatches(serviceEntry.constraints, node) {
|
|
| 470 |
+ t.DesiredState = api.TaskStateShutdown |
|
| 471 |
+ return store.UpdateTask(tx, t) |
|
| 472 |
+ } |
|
| 473 |
+ |
|
| 463 | 474 |
return g.restarts.Restart(ctx, tx, g.cluster, service, *t) |
| 464 | 475 |
}) |
| 465 | 476 |
if err != nil {
|