Browse code

add support for swarmkit generic runtime

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

Evan Hazlett authored on 2017/03/02 05:52:55
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,19 @@
0
+package swarm
1
+
2
+// RuntimeType is the type of runtime used for the TaskSpec
3
+type RuntimeType string
4
+
5
+// RuntimeURL is the proto type url
6
+type RuntimeURL string
7
+
8
+const (
9
+	// RuntimeContainer is the container based runtime
10
+	RuntimeContainer RuntimeType = "container"
11
+	// RuntimePlugin is the plugin based runtime
12
+	RuntimePlugin RuntimeType = "plugin"
13
+
14
+	// RuntimeURLContainer is the proto url for the container type
15
+	RuntimeURLContainer RuntimeURL = "types.docker.com/RuntimeContainer"
16
+	// RuntimeURLPlugin is the proto url for the plugin type
17
+	RuntimeURLPlugin RuntimeURL = "types.docker.com/RuntimePlugin"
18
+)
... ...
@@ -65,6 +65,9 @@ type TaskSpec struct {
65 65
 	// ForceUpdate is a counter that triggers an update even if no relevant
66 66
 	// parameters have been changed.
67 67
 	ForceUpdate uint64
68
+
69
+	Runtime     RuntimeType `json:",omitempty"`
70
+	RuntimeData []byte      `json:",omitempty"`
68 71
 }
69 72
 
70 73
 // Resources represents resources (CPU/Memory).
71 74
new file mode 100644
... ...
@@ -0,0 +1,79 @@
0
+package plugin
1
+
2
+import (
3
+	"github.com/Sirupsen/logrus"
4
+	"github.com/docker/swarmkit/api"
5
+	"golang.org/x/net/context"
6
+)
7
+
8
+// Controller is the controller for the plugin backend
9
+type Controller struct{}
10
+
11
+// NewController returns a new cluster plugin controller
12
+func NewController() (*Controller, error) {
13
+	return &Controller{}, nil
14
+}
15
+
16
+// Update is the update phase from swarmkit
17
+func (p *Controller) Update(ctx context.Context, t *api.Task) error {
18
+	logrus.WithFields(logrus.Fields{
19
+		"controller": "plugin",
20
+	}).Debug("Update")
21
+	return nil
22
+}
23
+
24
+// Prepare is the prepare phase from swarmkit
25
+func (p *Controller) Prepare(ctx context.Context) error {
26
+	logrus.WithFields(logrus.Fields{
27
+		"controller": "plugin",
28
+	}).Debug("Prepare")
29
+	return nil
30
+}
31
+
32
+// Start is the start phase from swarmkit
33
+func (p *Controller) Start(ctx context.Context) error {
34
+	logrus.WithFields(logrus.Fields{
35
+		"controller": "plugin",
36
+	}).Debug("Start")
37
+	return nil
38
+}
39
+
40
+// Wait causes the task to wait until returned
41
+func (p *Controller) Wait(ctx context.Context) error {
42
+	logrus.WithFields(logrus.Fields{
43
+		"controller": "plugin",
44
+	}).Debug("Wait")
45
+	return nil
46
+}
47
+
48
+// Shutdown is the shutdown phase from swarmkit
49
+func (p *Controller) Shutdown(ctx context.Context) error {
50
+	logrus.WithFields(logrus.Fields{
51
+		"controller": "plugin",
52
+	}).Debug("Shutdown")
53
+	return nil
54
+}
55
+
56
+// Terminate is the terminate phase from swarmkit
57
+func (p *Controller) Terminate(ctx context.Context) error {
58
+	logrus.WithFields(logrus.Fields{
59
+		"controller": "plugin",
60
+	}).Debug("Terminate")
61
+	return nil
62
+}
63
+
64
+// Remove is the remove phase from swarmkit
65
+func (p *Controller) Remove(ctx context.Context) error {
66
+	logrus.WithFields(logrus.Fields{
67
+		"controller": "plugin",
68
+	}).Debug("Remove")
69
+	return nil
70
+}
71
+
72
+// Close is the close phase from swarmkit
73
+func (p *Controller) Close() error {
74
+	logrus.WithFields(logrus.Fields{
75
+		"controller": "plugin",
76
+	}).Debug("Close")
77
+	return nil
78
+}
... ...
@@ -11,11 +11,19 @@ import (
11 11
 )
12 12
 
13 13
 // ServiceFromGRPC converts a grpc Service to a Service.
14
-func ServiceFromGRPC(s swarmapi.Service) types.Service {
14
+func ServiceFromGRPC(s swarmapi.Service) (types.Service, error) {
15
+	curSpec, err := serviceSpecFromGRPC(&s.Spec)
16
+	if err != nil {
17
+		return types.Service{}, err
18
+	}
19
+	prevSpec, err := serviceSpecFromGRPC(s.PreviousSpec)
20
+	if err != nil {
21
+		return types.Service{}, err
22
+	}
15 23
 	service := types.Service{
16 24
 		ID:           s.ID,
17
-		Spec:         *serviceSpecFromGRPC(&s.Spec),
18
-		PreviousSpec: serviceSpecFromGRPC(s.PreviousSpec),
25
+		Spec:         *curSpec,
26
+		PreviousSpec: prevSpec,
19 27
 
20 28
 		Endpoint: endpointFromGRPC(s.Endpoint),
21 29
 	}
... ...
@@ -56,12 +64,12 @@ func ServiceFromGRPC(s swarmapi.Service) types.Service {
56 56
 		service.UpdateStatus.Message = s.UpdateStatus.Message
57 57
 	}
58 58
 
59
-	return service
59
+	return service, nil
60 60
 }
61 61
 
62
-func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
62
+func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error) {
63 63
 	if spec == nil {
64
-		return nil
64
+		return nil, nil
65 65
 	}
66 66
 
67 67
 	serviceNetworks := make([]types.NetworkAttachmentConfig, 0, len(spec.Networks))
... ...
@@ -69,9 +77,29 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
69 69
 		serviceNetworks = append(serviceNetworks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases})
70 70
 	}
71 71
 
72
+	taskTemplate := taskSpecFromGRPC(spec.Task)
73
+
74
+	switch t := spec.Task.Runtime.(type) {
75
+	case *swarmapi.TaskSpec_Container:
76
+		containerConfig := t.Container
77
+		taskTemplate.ContainerSpec = containerSpecFromGRPC(containerConfig)
78
+		taskTemplate.Runtime = types.RuntimeContainer
79
+	case *swarmapi.TaskSpec_Generic:
80
+		switch t.Generic.Payload.TypeUrl {
81
+		case string(types.RuntimeURLPlugin):
82
+			taskTemplate.Runtime = types.RuntimePlugin
83
+		default:
84
+			return nil, fmt.Errorf("unknown task runtime type: %s", t.Generic.Payload.TypeUrl)
85
+		}
86
+
87
+		taskTemplate.RuntimeData = t.Generic.Payload.Value
88
+	default:
89
+		return nil, fmt.Errorf("error creating service; unsupported runtime %T", t)
90
+	}
91
+
72 92
 	convertedSpec := &types.ServiceSpec{
73 93
 		Annotations:  annotationsFromGRPC(spec.Annotations),
74
-		TaskTemplate: taskSpecFromGRPC(spec.Task),
94
+		TaskTemplate: taskTemplate,
75 95
 		Networks:     serviceNetworks,
76 96
 		EndpointSpec: endpointSpecFromGRPC(spec.Endpoint),
77 97
 	}
... ...
@@ -90,7 +118,7 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
90 90
 		}
91 91
 	}
92 92
 
93
-	return convertedSpec
93
+	return convertedSpec, nil
94 94
 }
95 95
 
96 96
 // ServiceSpecToGRPC converts a ServiceSpec to a grpc ServiceSpec.
... ...
@@ -124,11 +152,26 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
124 124
 		Networks: serviceNetworks,
125 125
 	}
126 126
 
127
-	containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
128
-	if err != nil {
129
-		return swarmapi.ServiceSpec{}, err
127
+	switch s.TaskTemplate.Runtime {
128
+	case types.RuntimeContainer, "": // if empty runtime default to container
129
+		containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
130
+		if err != nil {
131
+			return swarmapi.ServiceSpec{}, err
132
+		}
133
+		spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
134
+	case types.RuntimePlugin:
135
+		spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
136
+			Generic: &swarmapi.GenericRuntimeSpec{
137
+				Kind: string(types.RuntimePlugin),
138
+				Payload: &gogotypes.Any{
139
+					TypeUrl: string(types.RuntimeURLPlugin),
140
+					Value:   s.TaskTemplate.RuntimeData,
141
+				},
142
+			},
143
+		}
144
+	default:
145
+		return swarmapi.ServiceSpec{}, fmt.Errorf("error creating service; unsupported runtime %q", s.TaskTemplate.Runtime)
130 146
 	}
131
-	spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
132 147
 
133 148
 	restartPolicy, err := restartPolicyToGRPC(s.TaskTemplate.RestartPolicy)
134 149
 	if err != nil {
... ...
@@ -1,18 +1,23 @@
1 1
 package container
2 2
 
3 3
 import (
4
+	"fmt"
4 5
 	"sort"
5 6
 	"strings"
6 7
 
8
+	"github.com/Sirupsen/logrus"
7 9
 	"github.com/docker/docker/api/types"
8 10
 	"github.com/docker/docker/api/types/filters"
9 11
 	"github.com/docker/docker/api/types/network"
12
+	swarmtypes "github.com/docker/docker/api/types/swarm"
13
+	"github.com/docker/docker/daemon/cluster/controllers/plugin"
10 14
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
11 15
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
12 16
 	networktypes "github.com/docker/libnetwork/types"
13 17
 	"github.com/docker/swarmkit/agent/exec"
14 18
 	"github.com/docker/swarmkit/agent/secrets"
15 19
 	"github.com/docker/swarmkit/api"
20
+	"github.com/docker/swarmkit/api/naming"
16 21
 	"golang.org/x/net/context"
17 22
 )
18 23
 
... ...
@@ -156,9 +161,35 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
156 156
 		return newNetworkAttacherController(e.backend, t, e.secrets)
157 157
 	}
158 158
 
159
-	ctlr, err := newController(e.backend, t, secrets.Restrict(e.secrets, t))
160
-	if err != nil {
161
-		return nil, err
159
+	var ctlr exec.Controller
160
+	switch r := t.Spec.GetRuntime().(type) {
161
+	case *api.TaskSpec_Generic:
162
+		logrus.WithFields(logrus.Fields{
163
+			"kind":       r.Generic.Kind,
164
+			"runtimeUrl": r.Generic.Payload.TypeUrl,
165
+		}).Debug("custom runtime requested")
166
+		runtimeKind, err := naming.Runtime(t.Spec)
167
+		if err != nil {
168
+			return ctlr, err
169
+		}
170
+		switch runtimeKind {
171
+		case string(swarmtypes.RuntimePlugin):
172
+			c, err := plugin.NewController()
173
+			if err != nil {
174
+				return ctlr, err
175
+			}
176
+			ctlr = c
177
+		default:
178
+			return ctlr, fmt.Errorf("unsupported runtime type: %q", r.Generic.Kind)
179
+		}
180
+	case *api.TaskSpec_Container:
181
+		c, err := newController(e.backend, t, secrets.Restrict(e.secrets, t))
182
+		if err != nil {
183
+			return nil, err
184
+		}
185
+		ctlr = c
186
+	default:
187
+		return nil, fmt.Errorf("unsupported runtime: %q", r)
162 188
 	}
163 189
 
164 190
 	return ctlr, nil
... ...
@@ -80,7 +80,11 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
80 80
 				continue
81 81
 			}
82 82
 		}
83
-		services = append(services, convert.ServiceFromGRPC(*service))
83
+		svcs, err := convert.ServiceFromGRPC(*service)
84
+		if err != nil {
85
+			return nil, err
86
+		}
87
+		services = append(services, svcs)
84 88
 	}
85 89
 
86 90
 	return services, nil
... ...
@@ -99,7 +103,11 @@ func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service,
99 99
 	}); err != nil {
100 100
 		return types.Service{}, err
101 101
 	}
102
-	return convert.ServiceFromGRPC(*service), nil
102
+	svc, err := convert.ServiceFromGRPC(*service)
103
+	if err != nil {
104
+		return types.Service{}, err
105
+	}
106
+	return svc, nil
103 107
 }
104 108
 
105 109
 // CreateService creates a new service in a managed swarm cluster.
... ...
@@ -116,58 +124,65 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity
116 116
 			return apierrors.NewBadRequestError(err)
117 117
 		}
118 118
 
119
-		ctnr := serviceSpec.Task.GetContainer()
120
-		if ctnr == nil {
121
-			return errors.New("service does not use container tasks")
122
-		}
119
+		resp = &apitypes.ServiceCreateResponse{}
123 120
 
124
-		if encodedAuth != "" {
125
-			ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
126
-		}
121
+		switch serviceSpec.Task.Runtime.(type) {
122
+		// handle other runtimes here
123
+		case *swarmapi.TaskSpec_Container:
124
+			ctnr := serviceSpec.Task.GetContainer()
125
+			if ctnr == nil {
126
+				return errors.New("service does not use container tasks")
127
+			}
128
+			if encodedAuth != "" {
129
+				ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
130
+			}
127 131
 
128
-		// retrieve auth config from encoded auth
129
-		authConfig := &apitypes.AuthConfig{}
130
-		if encodedAuth != "" {
131
-			if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
132
-				logrus.Warnf("invalid authconfig: %v", err)
132
+			// retrieve auth config from encoded auth
133
+			authConfig := &apitypes.AuthConfig{}
134
+			if encodedAuth != "" {
135
+				if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
136
+					logrus.Warnf("invalid authconfig: %v", err)
137
+				}
133 138
 			}
134
-		}
135 139
 
136
-		resp = &apitypes.ServiceCreateResponse{}
140
+			// pin image by digest
141
+			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
142
+				digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
143
+				if err != nil {
144
+					logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
145
+					// warning in the client response should be concise
146
+					resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
137 147
 
138
-		// pin image by digest
139
-		if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
140
-			digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
141
-			if err != nil {
142
-				logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
143
-				// warning in the client response should be concise
144
-				resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
145
-			} else if ctnr.Image != digestImage {
146
-				logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
147
-				ctnr.Image = digestImage
148
-			} else {
149
-				logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
148
+				} else if ctnr.Image != digestImage {
149
+					logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
150
+					ctnr.Image = digestImage
151
+
152
+				} else {
153
+					logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
154
+
155
+				}
156
+
157
+				// Replace the context with a fresh one.
158
+				// If we timed out while communicating with the
159
+				// registry, then "ctx" will already be expired, which
160
+				// would cause UpdateService below to fail. Reusing
161
+				// "ctx" could make it impossible to create a service
162
+				// if the registry is slow or unresponsive.
163
+				var cancel func()
164
+				ctx, cancel = c.getRequestContext()
165
+				defer cancel()
150 166
 			}
151 167
 
152
-			// Replace the context with a fresh one.
153
-			// If we timed out while communicating with the
154
-			// registry, then "ctx" will already be expired, which
155
-			// would cause UpdateService below to fail. Reusing
156
-			// "ctx" could make it impossible to create a service
157
-			// if the registry is slow or unresponsive.
158
-			var cancel func()
159
-			ctx, cancel = c.getRequestContext()
160
-			defer cancel()
161
-		}
168
+			r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
169
+			if err != nil {
170
+				return err
171
+			}
162 172
 
163
-		r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
164
-		if err != nil {
165
-			return err
173
+			resp.ID = r.Service.ID
166 174
 		}
167
-
168
-		resp.ID = r.Service.ID
169 175
 		return nil
170 176
 	})
177
+
171 178
 	return resp, err
172 179
 }
173 180