Merge pull request #30725 from aaronlehmann/topology

Topology-aware scheduling

Sebastiaan van Stijn authored on 2017/03/03 23:01:12
Showing 12 changed files
... ...
@@ -2050,6 +2050,18 @@ definitions:
             type: "array"
             items:
               type: "string"
+          Preferences:
+            description: "Preferences provide a way to make the scheduler aware of factors such as topology. They are provided in order from highest to lowest precedence."
+            type: "array"
+            items:
+              type: "object"
+              properties:
+                Spread:
+                  type: "object"
+                  properties:
+                    SpreadDescriptor:
+                      description: "label descriptor, such as engine.labels.az"
+                      type: "string"
       ForceUpdate:
         description: "A counter that triggers an update even if no relevant parameters have been changed."
         type: "integer"
... ...
@@ -81,7 +81,21 @@ type ResourceRequirements struct {
 
 // Placement represents orchestration parameters.
 type Placement struct {
-	Constraints []string `json:",omitempty"`
+	Constraints []string              `json:",omitempty"`
+	Preferences []PlacementPreference `json:",omitempty"`
+}
+
+// PlacementPreference provides a way to make the scheduler aware of factors
+// such as topology.
+type PlacementPreference struct {
+	Spread *SpreadOver
+}
+
+// SpreadOver is a scheduling preference that instructs the scheduler to spread
+// tasks evenly over groups of nodes identified by labels.
+type SpreadOver struct {
+	// label descriptor, such as engine.labels.az
+	SpreadDescriptor string
 }
 
 // RestartPolicy represents the restart policy.
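
As an editorial aside, not part of the diff: a minimal sketch of how the new types compose with the existing `Constraints` field. It assumes the types above live in the package that the rest of this PR imports as `swarm` (`github.com/docker/docker/api/types/swarm`); the label and constraint values are made up.

```go
package main

import (
	"fmt"

	"github.com/docker/docker/api/types/swarm"
)

func main() {
	// A placement that combines a hard constraint with a soft spread
	// preference over the value of a node label.
	placement := &swarm.Placement{
		Constraints: []string{"node.role == worker"},
		Preferences: []swarm.PlacementPreference{
			{Spread: &swarm.SpreadOver{SpreadDescriptor: "node.labels.datacenter"}},
		},
	}
	fmt.Printf("%+v\n", placement)
}
```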
... ...
@@ -39,9 +39,12 @@ UpdateStatus:
  Message:	{{ .UpdateStatusMessage }}
 {{- end }}
 Placement:
-{{- if .TaskPlacementConstraints -}}
+{{- if .TaskPlacementConstraints }}
  Constraints:	{{ .TaskPlacementConstraints }}
 {{- end }}
+{{- if .TaskPlacementPreferences }}
+ Preferences:   {{ .TaskPlacementPreferences }}
+{{- end }}
 {{- if .HasUpdateConfig }}
 UpdateConfig:
  Parallelism:	{{ .UpdateParallelism }}
... ...
@@ -211,6 +214,19 @@ func (ctx *serviceInspectContext) TaskPlacementConstraints() []string {
 	return nil
 }
 
+func (ctx *serviceInspectContext) TaskPlacementPreferences() []string {
+	if ctx.Service.Spec.TaskTemplate.Placement == nil {
+		return nil
+	}
+	var strings []string
+	for _, pref := range ctx.Service.Spec.TaskTemplate.Placement.Preferences {
+		if pref.Spread != nil {
+			strings = append(strings, "spread="+pref.Spread.SpreadDescriptor)
+		}
+	}
+	return strings
+}
+
 func (ctx *serviceInspectContext) HasUpdateConfig() bool {
 	return ctx.Service.Spec.UpdateConfig != nil
 }
... ...
@@ -37,6 +37,8 @@ func newCreateCommand(dockerCli *command.DockerCli) *cobra.Command {
 	flags.Var(&opts.envFile, flagEnvFile, "Read in a file of environment variables")
 	flags.Var(&opts.mounts, flagMount, "Attach a filesystem mount to the service")
 	flags.Var(&opts.constraints, flagConstraint, "Placement constraints")
+	flags.Var(&opts.placementPrefs, flagPlacementPref, "Add a placement preference")
+	flags.SetAnnotation(flagPlacementPref, "version", []string{"1.27"})
 	flags.Var(&opts.networks, flagNetwork, "Network attachments")
 	flags.Var(&opts.secrets, flagSecret, "Specify secrets to expose to the service")
 	flags.SetAnnotation(flagSecret, "version", []string{"1.25"})
... ...
@@ -1,6 +1,7 @@
 package service
 
 import (
+	"errors"
 	"fmt"
 	"strconv"
 	"strings"
... ...
@@ -117,6 +118,45 @@ func (f *floatValue) Value() float32 {
 	return float32(*f)
 }
 
+// placementPrefOpts holds a list of placement preferences.
+type placementPrefOpts struct {
+	prefs   []swarm.PlacementPreference
+	strings []string
+}
+
+func (opts *placementPrefOpts) String() string {
+	if len(opts.strings) == 0 {
+		return ""
+	}
+	return fmt.Sprintf("%v", opts.strings)
+}
+
+// Set validates the input value and adds it to the internal slices.
+// Note: in the future, strategies other than "spread" may be supported,
+// as well as additional comma-separated options.
+func (opts *placementPrefOpts) Set(value string) error {
+	fields := strings.Split(value, "=")
+	if len(fields) != 2 {
+		return errors.New(`placement preference must be of the format "<strategy>=<arg>"`)
+	}
+	if fields[0] != "spread" {
+		return fmt.Errorf("unsupported placement preference %s (only spread is supported)", fields[0])
+	}
+
+	opts.prefs = append(opts.prefs, swarm.PlacementPreference{
+		Spread: &swarm.SpreadOver{
+			SpreadDescriptor: fields[1],
+		},
+	})
+	opts.strings = append(opts.strings, value)
+	return nil
+}
+
+// Type returns a string name for this Option type
+func (opts *placementPrefOpts) Type() string {
+	return "pref"
+}
+
 type updateOptions struct {
 	parallelism     uint64
 	delay           time.Duration
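
As an editorial aside, a small test-style sketch of how `Set` accepts and rejects values. It is not part of the change, and assumes it would sit alongside the code above in the same `service` package:

```go
package service

import "testing"

// Sketch only: exercises the parsing rules of placementPrefOpts.Set shown above.
func TestPlacementPrefOptsSetSketch(t *testing.T) {
	var prefs placementPrefOpts

	// Valid: "<strategy>=<arg>" with the "spread" strategy is recorded.
	if err := prefs.Set("spread=node.labels.datacenter"); err != nil {
		t.Fatal(err)
	}
	if got := prefs.prefs[0].Spread.SpreadDescriptor; got != "node.labels.datacenter" {
		t.Fatalf("unexpected descriptor: %s", got)
	}

	// Invalid: no "=" separator.
	if err := prefs.Set("spread"); err == nil {
		t.Fatal("expected an error for a value without \"=\"")
	}

	// Invalid: only the "spread" strategy is currently supported.
	if err := prefs.Set("binpack=node.labels.rack"); err == nil {
		t.Fatal("expected an error for an unsupported strategy")
	}
}
```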
... ...
@@ -284,11 +324,12 @@ type serviceOptions struct {
 	replicas Uint64Opt
 	mode     string
 
-	restartPolicy restartPolicyOptions
-	constraints   opts.ListOpts
-	update        updateOptions
-	networks      opts.ListOpts
-	endpoint      endpointOptions
+	restartPolicy  restartPolicyOptions
+	constraints    opts.ListOpts
+	placementPrefs placementPrefOpts
+	update         updateOptions
+	networks       opts.ListOpts
+	endpoint       endpointOptions
 
 	registryAuth bool
 
... ...
@@ -400,6 +441,7 @@ func (opts *serviceOptions) ToService() (swarm.ServiceSpec, error) {
 			RestartPolicy: opts.restartPolicy.ToRestartPolicy(),
 			Placement: &swarm.Placement{
 				Constraints: opts.constraints.GetAll(),
+				Preferences: opts.placementPrefs.prefs,
 			},
 			LogDriver: opts.logDriver.toLogDriver(),
 		},
... ...
@@ -478,6 +520,9 @@ func addServiceFlags(cmd *cobra.Command, opts *serviceOptions) {
 }
 
 const (
+	flagPlacementPref         = "placement-pref"
+	flagPlacementPrefAdd      = "placement-pref-add"
+	flagPlacementPrefRemove   = "placement-pref-rm"
 	flagConstraint            = "constraint"
 	flagConstraintRemove      = "constraint-rm"
 	flagConstraintAdd         = "constraint-add"
... ...
@@ -69,6 +69,10 @@ func newUpdateCommand(dockerCli *command.DockerCli) *cobra.Command {
 	flags.SetAnnotation(flagSecretAdd, "version", []string{"1.25"})
 	flags.Var(&serviceOpts.mounts, flagMountAdd, "Add or update a mount on a service")
 	flags.Var(&serviceOpts.constraints, flagConstraintAdd, "Add or update a placement constraint")
+	flags.Var(&serviceOpts.placementPrefs, flagPlacementPrefAdd, "Add a placement preference")
+	flags.SetAnnotation(flagPlacementPrefAdd, "version", []string{"1.27"})
+	flags.Var(&placementPrefOpts{}, flagPlacementPrefRemove, "Remove a placement preference")
+	flags.SetAnnotation(flagPlacementPrefRemove, "version", []string{"1.27"})
 	flags.Var(&serviceOpts.endpoint.publishPorts, flagPublishAdd, "Add or update a published port")
 	flags.Var(&serviceOpts.groups, flagGroupAdd, "Add an additional supplementary user group to the container")
 	flags.SetAnnotation(flagGroupAdd, "version", []string{"1.25"})
... ...
@@ -260,7 +264,14 @@ func updateService(flags *pflag.FlagSet, spec *swarm.ServiceSpec) error {
 		if task.Placement == nil {
 			task.Placement = &swarm.Placement{}
 		}
-		updatePlacement(flags, task.Placement)
+		updatePlacementConstraints(flags, task.Placement)
+	}
+
+	if anyChanged(flags, flagPlacementPrefAdd, flagPlacementPrefRemove) {
+		if task.Placement == nil {
+			task.Placement = &swarm.Placement{}
+		}
+		updatePlacementPreferences(flags, task.Placement)
 	}
 
 	if err := updateReplicas(flags, &spec.Mode); err != nil {
... ...
@@ -374,7 +385,7 @@ func anyChanged(flags *pflag.FlagSet, fields ...string) bool {
 	return false
 }
 
-func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) {
+func updatePlacementConstraints(flags *pflag.FlagSet, placement *swarm.Placement) {
 	if flags.Changed(flagConstraintAdd) {
 		values := flags.Lookup(flagConstraintAdd).Value.(*opts.ListOpts).GetAll()
 		placement.Constraints = append(placement.Constraints, values...)
... ...
@@ -393,6 +404,35 @@ func updatePlacement(flags *pflag.FlagSet, placement *swarm.Placement) {
 	placement.Constraints = newConstraints
 }
 
+func updatePlacementPreferences(flags *pflag.FlagSet, placement *swarm.Placement) {
+	var newPrefs []swarm.PlacementPreference
+
+	if flags.Changed(flagPlacementPrefRemove) {
+		for _, existing := range placement.Preferences {
+			removed := false
+			for _, removal := range flags.Lookup(flagPlacementPrefRemove).Value.(*placementPrefOpts).prefs {
+				if removal.Spread != nil && existing.Spread != nil && removal.Spread.SpreadDescriptor == existing.Spread.SpreadDescriptor {
+					removed = true
+					break
+				}
+			}
+			if !removed {
+				newPrefs = append(newPrefs, existing)
+			}
+		}
+	} else {
+		newPrefs = placement.Preferences
+	}
+
+	if flags.Changed(flagPlacementPrefAdd) {
+		for _, addition := range flags.Lookup(flagPlacementPrefAdd).Value.(*placementPrefOpts).prefs {
+			newPrefs = append(newPrefs, addition)
+		}
+	}
+
+	placement.Preferences = newPrefs
+}
+
 func updateContainerLabels(flags *pflag.FlagSet, field *map[string]string) {
 	if flags.Changed(flagContainerLabelAdd) {
 		if *field == nil {
... ...
@@ -51,7 +51,7 @@ func TestUpdateLabelsRemoveALabelThatDoesNotExist(t *testing.T) {
 	assert.Equal(t, len(labels), 1)
 }
 
-func TestUpdatePlacement(t *testing.T) {
+func TestUpdatePlacementConstraints(t *testing.T) {
 	flags := newUpdateCommand(nil).Flags()
 	flags.Set("constraint-add", "node=toadd")
 	flags.Set("constraint-rm", "node!=toremove")
... ...
@@ -60,12 +60,38 @@ func TestUpdatePlacement(t *testing.T) {
 		Constraints: []string{"node!=toremove", "container=tokeep"},
 	}
 
-	updatePlacement(flags, placement)
+	updatePlacementConstraints(flags, placement)
 	assert.Equal(t, len(placement.Constraints), 2)
 	assert.Equal(t, placement.Constraints[0], "container=tokeep")
 	assert.Equal(t, placement.Constraints[1], "node=toadd")
 }
 
+func TestUpdatePlacementPrefs(t *testing.T) {
+	flags := newUpdateCommand(nil).Flags()
+	flags.Set("placement-pref-add", "spread=node.labels.dc")
+	flags.Set("placement-pref-rm", "spread=node.labels.rack")
+
+	placement := &swarm.Placement{
+		Preferences: []swarm.PlacementPreference{
+			{
+				Spread: &swarm.SpreadOver{
+					SpreadDescriptor: "node.labels.rack",
+				},
+			},
+			{
+				Spread: &swarm.SpreadOver{
+					SpreadDescriptor: "node.labels.row",
+				},
+			},
+		},
+	}
+
+	updatePlacementPreferences(flags, placement)
+	assert.Equal(t, len(placement.Preferences), 2)
+	assert.Equal(t, placement.Preferences[0].Spread.SpreadDescriptor, "node.labels.row")
+	assert.Equal(t, placement.Preferences[1].Spread.SpreadDescriptor, "node.labels.dc")
+}
+
 func TestUpdateEnvironment(t *testing.T) {
 	flags := newUpdateCommand(nil).Flags()
 	flags.Set("env-add", "toadd=newenv")
... ...
@@ -162,8 +162,21 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
 	spec.Task.Restart = restartPolicy
 
 	if s.TaskTemplate.Placement != nil {
+		var preferences []*swarmapi.PlacementPreference
+		for _, pref := range s.TaskTemplate.Placement.Preferences {
+			if pref.Spread != nil {
+				preferences = append(preferences, &swarmapi.PlacementPreference{
+					Preference: &swarmapi.PlacementPreference_Spread{
+						Spread: &swarmapi.SpreadOver{
+							SpreadDescriptor: pref.Spread.SpreadDescriptor,
+						},
+					},
+				})
+			}
+		}
 		spec.Task.Placement = &swarmapi.Placement{
 			Constraints: s.TaskTemplate.Placement.Constraints,
+			Preferences: preferences,
 		}
 	}
 
... ...
@@ -351,10 +364,21 @@ func restartPolicyToGRPC(p *types.RestartPolicy) (*swarmapi.RestartPolicy, error
 }
 
 func placementFromGRPC(p *swarmapi.Placement) *types.Placement {
-	var r *types.Placement
-	if p != nil {
-		r = &types.Placement{}
-		r.Constraints = p.Constraints
+	if p == nil {
+		return nil
+	}
+	r := &types.Placement{
+		Constraints: p.Constraints,
+	}
+
+	for _, pref := range p.Preferences {
+		if spread := pref.GetSpread(); spread != nil {
+			r.Preferences = append(r.Preferences, types.PlacementPreference{
+				Spread: &types.SpreadOver{
+					SpreadDescriptor: spread.SpreadDescriptor,
+				},
+			})
+		}
 	}
 
 	return r
... ...
@@ -47,6 +47,7 @@ Options:
       --name string                      Service name
       --network list                     Network attachments (default [])
       --no-healthcheck                   Disable any container-specified HEALTHCHECK
+      --placement-pref pref              Add a placement preference
   -p, --publish port                     Publish a port as a node port
       --read-only                        Mount the container's root filesystem as read only
       --replicas uint                    Number of tasks
... ...
@@ -436,6 +437,77 @@ $ docker service create \
   redis:3.0.6
 ```
 
+### Specify service placement preferences (--placement-pref)
+
+You can set up the service to divide tasks evenly over different categories of
+nodes. One example of where this can be useful is to balance tasks over a set
+of datacenters or availability zones. The example below illustrates this:
+
+```bash
+$ docker service create \
+  --replicas 9 \
+  --name redis_2 \
+  --placement-pref 'spread=node.labels.datacenter' \
+  redis:3.0.6
+```
+
+This uses `--placement-pref` with a `spread` strategy (currently the only
+supported strategy) to spread tasks evenly over the values of the `datacenter`
+node label. In this example, we assume that every node has a `datacenter` node
+label attached to it. If there are three different values of this label among
+nodes in the swarm, one third of the tasks will be placed on the nodes
+associated with each value. This is true even if there are more nodes with one
+value than another. For example, consider the following set of nodes:
+
+- Three nodes with `node.labels.datacenter=east`
+- Two nodes with `node.labels.datacenter=south`
+- One node with `node.labels.datacenter=west`
+
+Since we are spreading over the values of the `datacenter` label and the
+service has 9 replicas, 3 replicas will end up in each datacenter. There are
+three nodes associated with the value `east`, so each one will get one of the
+three replicas reserved for this value. There are two nodes with the value
+`south`, and the three replicas for this value will be divided between them,
+with one node receiving two replicas and the other receiving just one. Finally,
+`west` has a single node that will get all three replicas reserved for `west`.
+
+If the nodes in one category (for example, those with
+`node.labels.datacenter=south`) can't handle their fair share of tasks due to
+constraints or resource limitations, the extra tasks will be assigned to other
+nodes instead, if possible.
+
+Both engine labels and node labels are supported by placement preferences. The
+example above uses a node label, because the label is referenced with
+`node.labels.datacenter`. To spread over the values of an engine label, use
+`--placement-pref spread=engine.labels.<labelname>`.
+
+It is possible to add multiple placement preferences to a service. This
+establishes a hierarchy of preferences, so that tasks are first divided over
+one category, and then further divided over additional categories. One example
+of where this may be useful is dividing tasks fairly between datacenters, and
+then splitting the tasks within each datacenter over a choice of racks. To add
+multiple placement preferences, specify the `--placement-pref` flag multiple
+times. The order is significant, and the placement preferences will be applied
+in the order given when making scheduling decisions.
+
+The following example sets up a service with multiple placement preferences.
+Tasks are spread first over the various datacenters, and then over racks
+(as indicated by the respective labels):
+
+```bash
+$ docker service create \
+  --replicas 9 \
+  --name redis_2 \
+  --placement-pref 'spread=node.labels.datacenter' \
+  --placement-pref 'spread=node.labels.rack' \
+  redis:3.0.6
+```
+
+When updating a service with `docker service update`, `--placement-pref-add`
+appends a new placement preference after all existing placement preferences.
+`--placement-pref-rm` removes an existing placement preference that matches the
+argument.
+
 ### Attach a service to an existing network (--network)
 
 You can use overlay networks to connect one or more services within the swarm.
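
As an editorial aside on the docs hunk above, not part of the diff: the two-level `--placement-pref` example (spread over `node.labels.datacenter`, then `node.labels.rack`) corresponds roughly to the following call through the engine's Go client. The `client.NewEnvClient`/`ServiceCreate` usage and the exact `ServiceSpec` field shapes are assumptions about the Go client of that era, so treat this as a sketch rather than a definitive snippet.

```go
package main

import (
	"context"
	"log"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewEnvClient()
	if err != nil {
		log.Fatal(err)
	}

	replicas := uint64(9)
	spec := swarm.ServiceSpec{
		Annotations: swarm.Annotations{Name: "redis_2"},
		TaskTemplate: swarm.TaskSpec{
			ContainerSpec: swarm.ContainerSpec{Image: "redis:3.0.6"},
			Placement: &swarm.Placement{
				// Order matters: spread over datacenters first, then over racks.
				Preferences: []swarm.PlacementPreference{
					{Spread: &swarm.SpreadOver{SpreadDescriptor: "node.labels.datacenter"}},
					{Spread: &swarm.SpreadOver{SpreadDescriptor: "node.labels.rack"}},
				},
			},
		},
		Mode: swarm.ServiceMode{
			Replicated: &swarm.ReplicatedService{Replicas: &replicas},
		},
	}

	if _, err := cli.ServiceCreate(context.Background(), spec, types.ServiceCreateOptions{}); err != nil {
		log.Fatal(err)
	}
}
```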
... ...
@@ -56,6 +56,8 @@ Options:
       --mount-add mount                  Add or update a mount on a service
       --mount-rm list                    Remove a mount by its target path (default [])
       --no-healthcheck                   Disable any container-specified HEALTHCHECK
+      --placement-pref-add pref          Add a placement preference
+      --placement-pref-rm pref           Remove a placement preference
       --publish-add port                 Add or update a published port
       --publish-rm port                  Remove a published port by its target port
       --read-only                        Mount the container's root filesystem as read only
... ...
@@ -365,6 +365,48 @@ func (s *DockerSwarmSuite) TestAPISwarmServiceConstraintLabel(c *check.C) {
 	}
 }
 
+func (s *DockerSwarmSuite) TestAPISwarmServicePlacementPrefs(c *check.C) {
+	const nodeCount = 3
+	var daemons [nodeCount]*daemon.Swarm
+	for i := 0; i < nodeCount; i++ {
+		daemons[i] = s.AddDaemon(c, true, i == 0)
+	}
+	// wait for nodes ready
+	waitAndAssert(c, 5*time.Second, daemons[0].CheckNodeReadyCount, checker.Equals, nodeCount)
+	nodes := daemons[0].ListNodes(c)
+	c.Assert(len(nodes), checker.Equals, nodeCount)
+
+	// add labels to nodes
+	daemons[0].UpdateNode(c, nodes[0].ID, func(n *swarm.Node) {
+		n.Spec.Annotations.Labels = map[string]string{
+			"rack": "a",
+		}
+	})
+	for i := 1; i < nodeCount; i++ {
+		daemons[0].UpdateNode(c, nodes[i].ID, func(n *swarm.Node) {
+			n.Spec.Annotations.Labels = map[string]string{
+				"rack": "b",
+			}
+		})
+	}
+
+	// create service
+	instances := 4
+	prefs := []swarm.PlacementPreference{{Spread: &swarm.SpreadOver{SpreadDescriptor: "node.labels.rack"}}}
+	id := daemons[0].CreateService(c, simpleTestService, setPlacementPrefs(prefs), setInstances(instances))
+	// wait for tasks ready
+	waitAndAssert(c, defaultReconciliationTimeout, daemons[0].CheckServiceRunningTasks(id), checker.Equals, instances)
+	tasks := daemons[0].GetServiceTasks(c, id)
+	// validate that tasks are spread evenly: 2 on the rack "a" node, 1 on each rack "b" node
+	tasksOnNode := make(map[string]int)
+	for _, task := range tasks {
+		tasksOnNode[task.NodeID]++
+	}
+	c.Assert(tasksOnNode[nodes[0].ID], checker.Equals, 2)
+	c.Assert(tasksOnNode[nodes[1].ID], checker.Equals, 1)
+	c.Assert(tasksOnNode[nodes[2].ID], checker.Equals, 1)
+}
+
 func (s *DockerSwarmSuite) TestAPISwarmServicesStateReporting(c *check.C) {
 	testRequires(c, SameHostDaemon)
 	testRequires(c, DaemonIsLinux)
... ...
@@ -604,6 +604,15 @@ func setConstraints(constraints []string) daemon.ServiceConstructor {
 	}
 }
 
+func setPlacementPrefs(prefs []swarm.PlacementPreference) daemon.ServiceConstructor {
+	return func(s *swarm.Service) {
+		if s.Spec.TaskTemplate.Placement == nil {
+			s.Spec.TaskTemplate.Placement = &swarm.Placement{}
+		}
+		s.Spec.TaskTemplate.Placement.Preferences = prefs
+	}
+}
+
 func setGlobalMode(s *swarm.Service) {
 	s.Spec.Mode = swarm.ServiceMode{
 		Global: &swarm.GlobalService{},