Browse code

Update Swarmkit to pick up fixes to heartbeat period and stalled tasks

Signed-off-by: Adam Williams <awilliams@mirantis.com>
(cherry picked from commit cbd2f726bffc45a01d8737ae9a48b099691a09a4)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Adam Williams authored on 2021/02/23 03:32:08
Showing 3 changed files
... ...
@@ -142,7 +142,7 @@ github.com/gogo/googleapis                          01e0f9cca9b92166042241267ee2
142 142
 github.com/cilium/ebpf                              1c8d4c9ef7759622653a1d319284a44652333b28
143 143
 
144 144
 # cluster
145
-github.com/docker/swarmkit                          d6592ddefd8a5319aadff74c558b816b1a0b2590
145
+github.com/docker/swarmkit                          17d8d4e4d8bdec33d386e6362d3537fa9493ba00
146 146
 github.com/gogo/protobuf                            5628607bb4c51c3157aacc3a50f0ab707582b805 # v1.3.1
147 147
 github.com/golang/protobuf                          84668698ea25b64748563aa20726db66a6b8d299 # v1.3.5
148 148
 github.com/cloudflare/cfssl                         5d63dbd981b5c408effbb58c442d54761ff94fbd # 1.3.2
... ...
@@ -1049,7 +1049,16 @@ func (m *Manager) becomeLeader(ctx context.Context) {
1049 1049
 
1050 1050
 	go func(d *dispatcher.Dispatcher) {
1051 1051
 		// Initialize the dispatcher.
1052
-		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
1052
+		var cluster *api.Cluster
1053
+		s.View(func(tx store.ReadTx) {
1054
+			cluster = store.GetCluster(tx, clusterID)
1055
+		})
1056
+		var defaultConfig = dispatcher.DefaultConfig()
1057
+		heartbeatPeriod, err := gogotypes.DurationFromProto(cluster.Spec.Dispatcher.HeartbeatPeriod)
1058
+		if err == nil {
1059
+			defaultConfig.HeartbeatPeriod = heartbeatPeriod
1060
+		}
1061
+		d.Init(m.raftNode, defaultConfig, drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
1053 1062
 		if err := d.Run(ctx); err != nil {
1054 1063
 			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
1055 1064
 		}
... ...
@@ -721,15 +721,32 @@ func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*ap
721 721
 
722 722
 		newT := *t
723 723
 		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
724
-		if explanation != "" {
725
-			newT.Status.Err = "no suitable node (" + explanation + ")"
724
+		sv := service.SpecVersion
725
+		tv := newT.SpecVersion
726
+		if sv != nil && tv != nil && sv.Index > tv.Index {
727
+			log.G(ctx).WithField("task.id", t.ID).Debug(
728
+				"task belongs to old revision of service",
729
+			)
730
+			if t.Status.State == api.TaskStatePending && t.DesiredState >= api.TaskStateShutdown {
731
+				log.G(ctx).WithField("task.id", t.ID).Debug(
732
+					"task is desired shutdown, scheduler will go ahead and do so",
733
+				)
734
+				newT.Status.State = api.TaskStateShutdown
735
+				newT.Status.Err = ""
736
+			}
726 737
 		} else {
727
-			newT.Status.Err = "no suitable node"
738
+			if explanation != "" {
739
+				newT.Status.Err = "no suitable node (" + explanation + ")"
740
+			} else {
741
+				newT.Status.Err = "no suitable node"
742
+			}
743
+
744
+			// re-enqueue a task that should still be attempted
745
+			s.enqueue(&newT)
728 746
 		}
747
+
729 748
 		s.allTasks[t.ID] = &newT
730 749
 		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}
731
-
732
-		s.enqueue(&newT)
733 750
 	}
734 751
 }
735 752