Browse code

Vendor swarmkit f93948c

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>

Aaron Lehmann authored on 2017/03/11 04:08:44
Showing 3 changed files
... ...
@@ -101,7 +101,7 @@ github.com/docker/containerd 977c511eda0925a723debdc94d09459af49d082a
101 101
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
102 102
 
103 103
 # cluster
104
-github.com/docker/swarmkit 1f3e4e67eeac60456460a270179711d0808129f9
104
+github.com/docker/swarmkit f93948cb430facd540e3c65c606384a89b3ac40f
105 105
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
106 106
 github.com/gogo/protobuf v0.3
107 107
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
... ...
@@ -26,7 +26,11 @@ const (
26 26
 	allocatedStatusMessage = "pending task scheduling"
27 27
 )
28 28
 
29
-var errNoChanges = errors.New("task unchanged")
29
+var (
30
+	errNoChanges = errors.New("task unchanged")
31
+
32
+	retryInterval = 5 * time.Minute
33
+)
30 34
 
31 35
 func newIngressNetwork() *api.Network {
32 36
 	return &api.Network{
... ...
@@ -57,19 +61,28 @@ type networkContext struct {
57 57
 	// the actual network allocation.
58 58
 	nwkAllocator *networkallocator.NetworkAllocator
59 59
 
60
-	// A table of unallocated tasks which will be revisited if any thing
60
+	// A set of tasks which are ready to be allocated as a batch. This is
61
+	// distinct from "unallocatedTasks" which are tasks that failed to
62
+	// allocate on the first try, being held for a future retry.
63
+	pendingTasks map[string]*api.Task
64
+
65
+	// A set of unallocated tasks which will be revisited if any thing
61 66
 	// changes in system state that might help task allocation.
62 67
 	unallocatedTasks map[string]*api.Task
63 68
 
64
-	// A table of unallocated services which will be revisited if
69
+	// A set of unallocated services which will be revisited if
65 70
 	// any thing changes in system state that might help service
66 71
 	// allocation.
67 72
 	unallocatedServices map[string]*api.Service
68 73
 
69
-	// A table of unallocated networks which will be revisited if
74
+	// A set of unallocated networks which will be revisited if
70 75
 	// any thing changes in system state that might help network
71 76
 	// allocation.
72 77
 	unallocatedNetworks map[string]*api.Network
78
+
79
+	// lastRetry is the last timestamp when unallocated
80
+	// tasks/services/networks were retried.
81
+	lastRetry time.Time
73 82
 }
74 83
 
75 84
 func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
... ...
@@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
80 80
 
81 81
 	nc := &networkContext{
82 82
 		nwkAllocator:        na,
83
+		pendingTasks:        make(map[string]*api.Task),
83 84
 		unallocatedTasks:    make(map[string]*api.Task),
84 85
 		unallocatedServices: make(map[string]*api.Service),
85 86
 		unallocatedNetworks: make(map[string]*api.Network),
86 87
 		ingressNetwork:      newIngressNetwork(),
88
+		lastRetry:           time.Now(),
87 89
 	}
88 90
 	a.netCtx = nc
89 91
 	defer func() {
... ...
@@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
266 266
 	}
267 267
 
268 268
 	for _, t := range tasks {
269
-		if taskDead(t) {
269
+		if t.Status.State > api.TaskStateRunning {
270 270
 			continue
271 271
 		}
272 272
 
... ...
@@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
351 351
 		if err := nc.nwkAllocator.Deallocate(n); err != nil {
352 352
 			log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
353 353
 		}
354
+
355
+		delete(nc.unallocatedNetworks, n.ID)
354 356
 	case state.EventCreateService:
355 357
 		s := v.Service.Copy()
356 358
 
... ...
@@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
387 387
 			return a.commitAllocatedService(ctx, batch, s)
388 388
 		}); err != nil {
389 389
 			log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
390
+			nc.unallocatedServices[s.ID] = s
391
+		} else {
392
+			delete(nc.unallocatedServices, s.ID)
390 393
 		}
391 394
 	case state.EventDeleteService:
392 395
 		s := v.Service.Copy()
... ...
@@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
403 403
 	case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
404 404
 		a.doTaskAlloc(ctx, ev)
405 405
 	case state.EventCommit:
406
-		a.procUnallocatedNetworks(ctx)
407
-		a.procUnallocatedServices(ctx)
408
-		a.procUnallocatedTasksNetwork(ctx)
409
-		return
406
+		a.procTasksNetwork(ctx, false)
407
+
408
+		if time.Since(nc.lastRetry) > retryInterval {
409
+			a.procUnallocatedNetworks(ctx)
410
+			a.procUnallocatedServices(ctx)
411
+			a.procTasksNetwork(ctx, true)
412
+			nc.lastRetry = time.Now()
413
+		}
414
+
415
+		// Any left over tasks are moved to the unallocated set
416
+		for _, t := range nc.pendingTasks {
417
+			nc.unallocatedTasks[t.ID] = t
418
+		}
419
+		nc.pendingTasks = make(map[string]*api.Task)
410 420
 	}
411 421
 }
412 422
 
... ...
@@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
456 456
 	}
457 457
 }
458 458
 
459
-// taskRunning checks whether a task is either actively running, or in the
460
-// process of starting up.
461
-func taskRunning(t *api.Task) bool {
462
-	return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
463
-}
464
-
465
-// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
466
-func taskDead(t *api.Task) bool {
467
-	return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
468
-}
469
-
470 459
 // taskReadyForNetworkVote checks if the task is ready for a network
471 460
 // vote to move it to PENDING state.
472 461
 func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
... ...
@@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
569 569
 
570 570
 	nc := a.netCtx
571 571
 
572
-	// If the task has stopped running or it's being deleted then
573
-	// we should free the network resources associated with the
574
-	// task right away.
575
-	if taskDead(t) || isDelete {
572
+	// If the task has stopped running then we should free the network
573
+	// resources associated with the task right away.
574
+	if t.Status.State > api.TaskStateRunning || isDelete {
576 575
 		if nc.nwkAllocator.IsTaskAllocated(t) {
577 576
 			if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
578 577
 				log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
579 578
 			}
580 579
 		}
581 580
 
582
-		// Cleanup any task references that might exist in unallocatedTasks
581
+		// Cleanup any task references that might exist
582
+		delete(nc.pendingTasks, t.ID)
583 583
 		delete(nc.unallocatedTasks, t.ID)
584 584
 		return
585 585
 	}
... ...
@@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
587 587
 	// If we are already in allocated state, there is
588 588
 	// absolutely nothing else to do.
589 589
 	if t.Status.State >= api.TaskStatePending {
590
+		delete(nc.pendingTasks, t.ID)
590 591
 		delete(nc.unallocatedTasks, t.ID)
591 592
 		return
592 593
 	}
... ...
@@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
605 605
 			// available in store. But we still need to
606 606
 			// cleanup network resources associated with
607 607
 			// the task.
608
-			if taskRunning(t) && !isDelete {
608
+			if t.Status.State <= api.TaskStateRunning && !isDelete {
609 609
 				log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
610 610
 				return
611 611
 			}
... ...
@@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
616 616
 	// based on service spec.
617 617
 	a.taskCreateNetworkAttachments(t, s)
618 618
 
619
-	nc.unallocatedTasks[t.ID] = t
619
+	nc.pendingTasks[t.ID] = t
620 620
 }
621 621
 
622 622
 func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
... ...
@@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
948 948
 	}
949 949
 }
950 950
 
951
-func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
951
+func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
952 952
 	nc := a.netCtx
953
-	allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
953
+	quiet := false
954
+	toAllocate := nc.pendingTasks
955
+	if onRetry {
956
+		toAllocate = nc.unallocatedTasks
957
+		quiet = true
958
+	}
959
+	allocatedTasks := make([]*api.Task, 0, len(toAllocate))
954 960
 
955
-	for _, t := range nc.unallocatedTasks {
961
+	for _, t := range toAllocate {
956 962
 		if err := a.allocateTask(ctx, t); err == nil {
957 963
 			allocatedTasks = append(allocatedTasks, t)
958 964
 		} else if err != errNoChanges {
959
-			log.G(ctx).WithError(err).Error("task allocation failure")
965
+			if quiet {
966
+				log.G(ctx).WithError(err).Debug("task allocation failure")
967
+			} else {
968
+				log.G(ctx).WithError(err).Error("task allocation failure")
969
+			}
960 970
 		}
961 971
 	}
962 972
 
... ...
@@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
978 978
 	})
979 979
 
980 980
 	if err != nil {
981
-		log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
981
+		log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
982 982
 	}
983 983
 
984 984
 	for _, t := range allocatedTasks[:committed] {
985
-		delete(nc.unallocatedTasks, t.ID)
985
+		delete(toAllocate, t.ID)
986 986
 	}
987 987
 }
988 988
 
... ...
@@ -167,8 +167,10 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe
167 167
 			return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
168 168
 		}
169 169
 
170
-		if len(tasks) != 0 {
171
-			return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
170
+		for _, t := range tasks {
171
+			if t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
172
+				return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, t.ID)
173
+			}
172 174
 		}
173 175
 
174 176
 		nw := store.GetNetwork(tx, request.NetworkID)