Browse code

Add a lockedManagerAction method to Cluster…

… in order to remove duplication.
Each time we update a cluster object, we do some common
operations (lock, verify it's on a manager, get the request context,
and the update). This introduce a method and refactor few
update/remove method that allows to duplicate less code.

Signed-off-by: Vincent Demeester <vincent@sbr.pm>

Vincent Demeester authored on 2017/02/28 19:12:11
Showing 7 changed files
... ...
@@ -386,3 +386,18 @@ func detectLockedError(err error) error {
386 386
 	}
387 387
 	return err
388 388
 }
389
+
390
+func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
391
+	c.mu.RLock()
392
+	defer c.mu.RUnlock()
393
+
394
+	state := c.currentNodeState()
395
+	if !state.IsActiveManager() {
396
+		return c.errNoManager(state)
397
+	}
398
+
399
+	ctx, cancel := c.getRequestContext()
400
+	defer cancel()
401
+
402
+	return fn(ctx, state)
403
+}
... ...
@@ -48,19 +48,16 @@ func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]
48 48
 
49 49
 // GetNetwork returns a cluster network by an ID.
50 50
 func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
51
-	c.mu.RLock()
52
-	defer c.mu.RUnlock()
53
-
54
-	state := c.currentNodeState()
55
-	if !state.IsActiveManager() {
56
-		return apitypes.NetworkResource{}, c.errNoManager(state)
57
-	}
51
+	var network *swarmapi.Network
58 52
 
59
-	ctx, cancel := c.getRequestContext()
60
-	defer cancel()
61
-
62
-	network, err := getNetwork(ctx, state.controlClient, input)
63
-	if err != nil {
53
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
54
+		n, err := getNetwork(ctx, state.controlClient, input)
55
+		if err != nil {
56
+			return err
57
+		}
58
+		network = n
59
+		return nil
60
+	}); err != nil {
64 61
 		return apitypes.NetworkResource{}, err
65 62
 	}
66 63
 	return convert.BasicNetworkFromGRPC(*network), nil
... ...
@@ -224,51 +221,38 @@ func (c *Cluster) DetachNetwork(target string, containerID string) error {
224 224
 
225 225
 // CreateNetwork creates a new cluster managed network.
226 226
 func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
227
-	c.mu.RLock()
228
-	defer c.mu.RUnlock()
229
-
230
-	state := c.currentNodeState()
231
-	if !state.IsActiveManager() {
232
-		return "", c.errNoManager(state)
233
-	}
234
-
235 227
 	if runconfig.IsPreDefinedNetwork(s.Name) {
236 228
 		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
237 229
 		return "", apierrors.NewRequestForbiddenError(err)
238 230
 	}
239 231
 
240
-	ctx, cancel := c.getRequestContext()
241
-	defer cancel()
242
-
243
-	networkSpec := convert.BasicNetworkCreateToGRPC(s)
244
-	r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
245
-	if err != nil {
232
+	var resp *swarmapi.CreateNetworkResponse
233
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
234
+		networkSpec := convert.BasicNetworkCreateToGRPC(s)
235
+		r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
236
+		if err != nil {
237
+			return err
238
+		}
239
+		resp = r
240
+		return nil
241
+	}); err != nil {
246 242
 		return "", err
247 243
 	}
248 244
 
249
-	return r.Network.ID, nil
245
+	return resp.Network.ID, nil
250 246
 }
251 247
 
252 248
 // RemoveNetwork removes a cluster network.
253 249
 func (c *Cluster) RemoveNetwork(input string) error {
254
-	c.mu.RLock()
255
-	defer c.mu.RUnlock()
256
-
257
-	state := c.currentNodeState()
258
-	if !state.IsActiveManager() {
259
-		return c.errNoManager(state)
260
-	}
261
-
262
-	ctx, cancel := c.getRequestContext()
263
-	defer cancel()
250
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
251
+		network, err := getNetwork(ctx, state.controlClient, input)
252
+		if err != nil {
253
+			return err
254
+		}
264 255
 
265
-	network, err := getNetwork(ctx, state.controlClient, input)
266
-	if err != nil {
256
+		_, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID})
267 257
 		return err
268
-	}
269
-
270
-	_, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID})
271
-	return err
258
+	})
272 259
 }
273 260
 
274 261
 func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
... ...
@@ -6,6 +6,7 @@ import (
6 6
 	types "github.com/docker/docker/api/types/swarm"
7 7
 	"github.com/docker/docker/daemon/cluster/convert"
8 8
 	swarmapi "github.com/docker/swarmkit/api"
9
+	"golang.org/x/net/context"
9 10
 )
10 11
 
11 12
 // GetNodes returns a list of all nodes known to a cluster.
... ...
@@ -43,78 +44,61 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro
43 43
 
44 44
 // GetNode returns a node based on an ID.
45 45
 func (c *Cluster) GetNode(input string) (types.Node, error) {
46
-	c.mu.RLock()
47
-	defer c.mu.RUnlock()
48
-
49
-	state := c.currentNodeState()
50
-	if !state.IsActiveManager() {
51
-		return types.Node{}, c.errNoManager(state)
52
-	}
53
-
54
-	ctx, cancel := c.getRequestContext()
55
-	defer cancel()
56
-
57
-	node, err := getNode(ctx, state.controlClient, input)
58
-	if err != nil {
46
+	var node *swarmapi.Node
47
+
48
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
49
+		n, err := getNode(ctx, state.controlClient, input)
50
+		if err != nil {
51
+			return err
52
+		}
53
+		node = n
54
+		return nil
55
+	}); err != nil {
59 56
 		return types.Node{}, err
60 57
 	}
58
+
61 59
 	return convert.NodeFromGRPC(*node), nil
62 60
 }
63 61
 
64 62
 // UpdateNode updates existing nodes properties.
65 63
 func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
66
-	c.mu.RLock()
67
-	defer c.mu.RUnlock()
68
-
69
-	state := c.currentNodeState()
70
-	if !state.IsActiveManager() {
71
-		return c.errNoManager(state)
72
-	}
73
-
74
-	nodeSpec, err := convert.NodeSpecToGRPC(spec)
75
-	if err != nil {
76
-		return apierrors.NewBadRequestError(err)
77
-	}
78
-
79
-	ctx, cancel := c.getRequestContext()
80
-	defer cancel()
81
-
82
-	currentNode, err := getNode(ctx, state.controlClient, input)
83
-	if err != nil {
84
-		return err
85
-	}
86
-
87
-	_, err = state.controlClient.UpdateNode(
88
-		ctx,
89
-		&swarmapi.UpdateNodeRequest{
90
-			NodeID: currentNode.ID,
91
-			Spec:   &nodeSpec,
92
-			NodeVersion: &swarmapi.Version{
93
-				Index: version,
64
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
65
+		nodeSpec, err := convert.NodeSpecToGRPC(spec)
66
+		if err != nil {
67
+			return apierrors.NewBadRequestError(err)
68
+		}
69
+
70
+		ctx, cancel := c.getRequestContext()
71
+		defer cancel()
72
+
73
+		currentNode, err := getNode(ctx, state.controlClient, input)
74
+		if err != nil {
75
+			return err
76
+		}
77
+
78
+		_, err = state.controlClient.UpdateNode(
79
+			ctx,
80
+			&swarmapi.UpdateNodeRequest{
81
+				NodeID: currentNode.ID,
82
+				Spec:   &nodeSpec,
83
+				NodeVersion: &swarmapi.Version{
84
+					Index: version,
85
+				},
94 86
 			},
95
-		},
96
-	)
97
-	return err
87
+		)
88
+		return err
89
+	})
98 90
 }
99 91
 
100 92
 // RemoveNode removes a node from a cluster
101 93
 func (c *Cluster) RemoveNode(input string, force bool) error {
102
-	c.mu.RLock()
103
-	defer c.mu.RUnlock()
94
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
95
+		node, err := getNode(ctx, state.controlClient, input)
96
+		if err != nil {
97
+			return err
98
+		}
104 99
 
105
-	state := c.currentNodeState()
106
-	if !state.IsActiveManager() {
107
-		return c.errNoManager(state)
108
-	}
109
-
110
-	ctx, cancel := c.getRequestContext()
111
-	defer cancel()
112
-
113
-	node, err := getNode(ctx, state.controlClient, input)
114
-	if err != nil {
100
+		_, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force})
115 101
 		return err
116
-	}
117
-
118
-	_, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force})
119
-	return err
102
+	})
120 103
 }
... ...
@@ -5,23 +5,21 @@ import (
5 5
 	types "github.com/docker/docker/api/types/swarm"
6 6
 	"github.com/docker/docker/daemon/cluster/convert"
7 7
 	swarmapi "github.com/docker/swarmkit/api"
8
+	"golang.org/x/net/context"
8 9
 )
9 10
 
10 11
 // GetSecret returns a secret from a managed swarm cluster
11 12
 func (c *Cluster) GetSecret(input string) (types.Secret, error) {
12
-	c.mu.RLock()
13
-	defer c.mu.RUnlock()
14
-
15
-	state := c.currentNodeState()
16
-	if !state.IsActiveManager() {
17
-		return types.Secret{}, c.errNoManager(state)
18
-	}
19
-
20
-	ctx, cancel := c.getRequestContext()
21
-	defer cancel()
22
-
23
-	secret, err := getSecret(ctx, state.controlClient, input)
24
-	if err != nil {
13
+	var secret *swarmapi.Secret
14
+
15
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
16
+		s, err := getSecret(ctx, state.controlClient, input)
17
+		if err != nil {
18
+			return err
19
+		}
20
+		secret = s
21
+		return nil
22
+	}); err != nil {
25 23
 		return types.Secret{}, err
26 24
 	}
27 25
 	return convert.SecretFromGRPC(secret), nil
... ...
@@ -61,77 +59,54 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret
61 61
 
62 62
 // CreateSecret creates a new secret in a managed swarm cluster.
63 63
 func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
64
-	c.mu.RLock()
65
-	defer c.mu.RUnlock()
66
-
67
-	state := c.currentNodeState()
68
-	if !state.IsActiveManager() {
69
-		return "", c.errNoManager(state)
70
-	}
71
-
72
-	ctx, cancel := c.getRequestContext()
73
-	defer cancel()
74
-
75
-	secretSpec := convert.SecretSpecToGRPC(s)
76
-
77
-	r, err := state.controlClient.CreateSecret(ctx,
78
-		&swarmapi.CreateSecretRequest{Spec: &secretSpec})
79
-	if err != nil {
64
+	var resp *swarmapi.CreateSecretResponse
65
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
66
+		secretSpec := convert.SecretSpecToGRPC(s)
67
+
68
+		r, err := state.controlClient.CreateSecret(ctx,
69
+			&swarmapi.CreateSecretRequest{Spec: &secretSpec})
70
+		if err != nil {
71
+			return err
72
+		}
73
+		resp = r
74
+		return nil
75
+	}); err != nil {
80 76
 		return "", err
81 77
 	}
82
-
83
-	return r.Secret.ID, nil
78
+	return resp.Secret.ID, nil
84 79
 }
85 80
 
86 81
 // RemoveSecret removes a secret from a managed swarm cluster.
87 82
 func (c *Cluster) RemoveSecret(input string) error {
88
-	c.mu.RLock()
89
-	defer c.mu.RUnlock()
90
-
91
-	state := c.currentNodeState()
92
-	if !state.IsActiveManager() {
93
-		return c.errNoManager(state)
94
-	}
83
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
84
+		secret, err := getSecret(ctx, state.controlClient, input)
85
+		if err != nil {
86
+			return err
87
+		}
95 88
 
96
-	ctx, cancel := c.getRequestContext()
97
-	defer cancel()
89
+		req := &swarmapi.RemoveSecretRequest{
90
+			SecretID: secret.ID,
91
+		}
98 92
 
99
-	secret, err := getSecret(ctx, state.controlClient, input)
100
-	if err != nil {
93
+		_, err = state.controlClient.RemoveSecret(ctx, req)
101 94
 		return err
102
-	}
103
-
104
-	req := &swarmapi.RemoveSecretRequest{
105
-		SecretID: secret.ID,
106
-	}
107
-
108
-	_, err = state.controlClient.RemoveSecret(ctx, req)
109
-	return err
95
+	})
110 96
 }
111 97
 
112 98
 // UpdateSecret updates a secret in a managed swarm cluster.
113 99
 // Note: this is not exposed to the CLI but is available from the API only
114 100
 func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) error {
115
-	c.mu.RLock()
116
-	defer c.mu.RUnlock()
117
-
118
-	state := c.currentNodeState()
119
-	if !state.IsActiveManager() {
120
-		return c.errNoManager(state)
121
-	}
122
-
123
-	ctx, cancel := c.getRequestContext()
124
-	defer cancel()
125
-
126
-	secretSpec := convert.SecretSpecToGRPC(spec)
127
-
128
-	_, err := state.controlClient.UpdateSecret(ctx,
129
-		&swarmapi.UpdateSecretRequest{
130
-			SecretID: id,
131
-			SecretVersion: &swarmapi.Version{
132
-				Index: version,
133
-			},
134
-			Spec: &secretSpec,
135
-		})
136
-	return err
101
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
102
+		secretSpec := convert.SecretSpecToGRPC(spec)
103
+
104
+		_, err := state.controlClient.UpdateSecret(ctx,
105
+			&swarmapi.UpdateSecretRequest{
106
+				SecretID: id,
107
+				SecretVersion: &swarmapi.Version{
108
+					Index: version,
109
+				},
110
+				Spec: &secretSpec,
111
+			})
112
+		return err
113
+	})
137 114
 }
... ...
@@ -59,19 +59,15 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
59 59
 
60 60
 // GetService returns a service based on an ID or name.
61 61
 func (c *Cluster) GetService(input string) (types.Service, error) {
62
-	c.mu.RLock()
63
-	defer c.mu.RUnlock()
64
-
65
-	state := c.currentNodeState()
66
-	if !state.IsActiveManager() {
67
-		return types.Service{}, c.errNoManager(state)
68
-	}
69
-
70
-	ctx, cancel := c.getRequestContext()
71
-	defer cancel()
72
-
73
-	service, err := getService(ctx, state.controlClient, input)
74
-	if err != nil {
62
+	var service *swarmapi.Service
63
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
64
+		s, err := getService(ctx, state.controlClient, input)
65
+		if err != nil {
66
+			return err
67
+		}
68
+		service = s
69
+		return nil
70
+	}); err != nil {
75 71
 		return types.Service{}, err
76 72
 	}
77 73
 	return convert.ServiceFromGRPC(*service), nil
... ...
@@ -79,187 +75,165 @@ func (c *Cluster) GetService(input string) (types.Service, error) {
79 79
 
80 80
 // CreateService creates a new service in a managed swarm cluster.
81 81
 func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
82
-	c.mu.RLock()
83
-	defer c.mu.RUnlock()
84
-
85
-	state := c.currentNodeState()
86
-	if !state.IsActiveManager() {
87
-		return nil, c.errNoManager(state)
88
-	}
89
-
90
-	ctx, cancel := c.getRequestContext()
91
-	defer cancel()
92
-
93
-	err := c.populateNetworkID(ctx, state.controlClient, &s)
94
-	if err != nil {
95
-		return nil, err
96
-	}
82
+	var resp *apitypes.ServiceCreateResponse
83
+	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
84
+		err := c.populateNetworkID(ctx, state.controlClient, &s)
85
+		if err != nil {
86
+			return err
87
+		}
97 88
 
98
-	serviceSpec, err := convert.ServiceSpecToGRPC(s)
99
-	if err != nil {
100
-		return nil, apierrors.NewBadRequestError(err)
101
-	}
89
+		serviceSpec, err := convert.ServiceSpecToGRPC(s)
90
+		if err != nil {
91
+			return apierrors.NewBadRequestError(err)
92
+		}
102 93
 
103
-	ctnr := serviceSpec.Task.GetContainer()
104
-	if ctnr == nil {
105
-		return nil, errors.New("service does not use container tasks")
106
-	}
94
+		ctnr := serviceSpec.Task.GetContainer()
95
+		if ctnr == nil {
96
+			return errors.New("service does not use container tasks")
97
+		}
107 98
 
108
-	if encodedAuth != "" {
109
-		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
110
-	}
99
+		if encodedAuth != "" {
100
+			ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
101
+		}
111 102
 
112
-	// retrieve auth config from encoded auth
113
-	authConfig := &apitypes.AuthConfig{}
114
-	if encodedAuth != "" {
115
-		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
116
-			logrus.Warnf("invalid authconfig: %v", err)
103
+		// retrieve auth config from encoded auth
104
+		authConfig := &apitypes.AuthConfig{}
105
+		if encodedAuth != "" {
106
+			if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
107
+				logrus.Warnf("invalid authconfig: %v", err)
108
+			}
117 109
 		}
118
-	}
119 110
 
120
-	resp := &apitypes.ServiceCreateResponse{}
111
+		resp = &apitypes.ServiceCreateResponse{}
112
+
113
+		// pin image by digest
114
+		if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
115
+			digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
116
+			if err != nil {
117
+				logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
118
+				resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
119
+			} else if ctnr.Image != digestImage {
120
+				logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
121
+				ctnr.Image = digestImage
122
+			} else {
123
+				logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
124
+			}
125
+		}
121 126
 
122
-	// pin image by digest
123
-	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
124
-		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
127
+		r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
125 128
 		if err != nil {
126
-			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
127
-			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
128
-		} else if ctnr.Image != digestImage {
129
-			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
130
-			ctnr.Image = digestImage
131
-		} else {
132
-			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
129
+			return err
133 130
 		}
134
-	}
135
-
136
-	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
137
-	if err != nil {
138
-		return nil, err
139
-	}
140 131
 
141
-	resp.ID = r.Service.ID
142
-	return resp, nil
132
+		resp.ID = r.Service.ID
133
+		return nil
134
+	})
135
+	return resp, err
143 136
 }
144 137
 
145 138
 // UpdateService updates existing service to match new properties.
146 139
 func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
147
-	c.mu.RLock()
148
-	defer c.mu.RUnlock()
149
-
150
-	state := c.currentNodeState()
151
-	if !state.IsActiveManager() {
152
-		return nil, c.errNoManager(state)
153
-	}
140
+	var resp *apitypes.ServiceUpdateResponse
154 141
 
155
-	ctx, cancel := c.getRequestContext()
156
-	defer cancel()
142
+	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
157 143
 
158
-	err := c.populateNetworkID(ctx, state.controlClient, &spec)
159
-	if err != nil {
160
-		return nil, err
161
-	}
144
+		err := c.populateNetworkID(ctx, state.controlClient, &spec)
145
+		if err != nil {
146
+			return err
147
+		}
162 148
 
163
-	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
164
-	if err != nil {
165
-		return nil, apierrors.NewBadRequestError(err)
166
-	}
149
+		serviceSpec, err := convert.ServiceSpecToGRPC(spec)
150
+		if err != nil {
151
+			return apierrors.NewBadRequestError(err)
152
+		}
167 153
 
168
-	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
169
-	if err != nil {
170
-		return nil, err
171
-	}
154
+		currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
155
+		if err != nil {
156
+			return err
157
+		}
172 158
 
173
-	newCtnr := serviceSpec.Task.GetContainer()
174
-	if newCtnr == nil {
175
-		return nil, errors.New("service does not use container tasks")
176
-	}
159
+		newCtnr := serviceSpec.Task.GetContainer()
160
+		if newCtnr == nil {
161
+			return errors.New("service does not use container tasks")
162
+		}
177 163
 
178
-	if encodedAuth != "" {
179
-		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
180
-	} else {
181
-		// this is needed because if the encodedAuth isn't being updated then we
182
-		// shouldn't lose it, and continue to use the one that was already present
183
-		var ctnr *swarmapi.ContainerSpec
184
-		switch registryAuthFrom {
185
-		case apitypes.RegistryAuthFromSpec, "":
186
-			ctnr = currentService.Spec.Task.GetContainer()
187
-		case apitypes.RegistryAuthFromPreviousSpec:
188
-			if currentService.PreviousSpec == nil {
189
-				return nil, errors.New("service does not have a previous spec")
164
+		if encodedAuth != "" {
165
+			newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
166
+		} else {
167
+			// this is needed because if the encodedAuth isn't being updated then we
168
+			// shouldn't lose it, and continue to use the one that was already present
169
+			var ctnr *swarmapi.ContainerSpec
170
+			switch registryAuthFrom {
171
+			case apitypes.RegistryAuthFromSpec, "":
172
+				ctnr = currentService.Spec.Task.GetContainer()
173
+			case apitypes.RegistryAuthFromPreviousSpec:
174
+				if currentService.PreviousSpec == nil {
175
+					return errors.New("service does not have a previous spec")
176
+				}
177
+				ctnr = currentService.PreviousSpec.Task.GetContainer()
178
+			default:
179
+				return errors.New("unsupported registryAuthFrom value")
180
+			}
181
+			if ctnr == nil {
182
+				return errors.New("service does not use container tasks")
183
+			}
184
+			newCtnr.PullOptions = ctnr.PullOptions
185
+			// update encodedAuth so it can be used to pin image by digest
186
+			if ctnr.PullOptions != nil {
187
+				encodedAuth = ctnr.PullOptions.RegistryAuth
190 188
 			}
191
-			ctnr = currentService.PreviousSpec.Task.GetContainer()
192
-		default:
193
-			return nil, errors.New("unsupported registryAuthFrom value")
194
-		}
195
-		if ctnr == nil {
196
-			return nil, errors.New("service does not use container tasks")
197
-		}
198
-		newCtnr.PullOptions = ctnr.PullOptions
199
-		// update encodedAuth so it can be used to pin image by digest
200
-		if ctnr.PullOptions != nil {
201
-			encodedAuth = ctnr.PullOptions.RegistryAuth
202 189
 		}
203
-	}
204 190
 
205
-	// retrieve auth config from encoded auth
206
-	authConfig := &apitypes.AuthConfig{}
207
-	if encodedAuth != "" {
208
-		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
209
-			logrus.Warnf("invalid authconfig: %v", err)
191
+		// retrieve auth config from encoded auth
192
+		authConfig := &apitypes.AuthConfig{}
193
+		if encodedAuth != "" {
194
+			if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
195
+				logrus.Warnf("invalid authconfig: %v", err)
196
+			}
210 197
 		}
211
-	}
212 198
 
213
-	resp := &apitypes.ServiceUpdateResponse{}
214
-
215
-	// pin image by digest
216
-	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
217
-		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
218
-		if err != nil {
219
-			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
220
-			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
221
-		} else if newCtnr.Image != digestImage {
222
-			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
223
-			newCtnr.Image = digestImage
224
-		} else {
225
-			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
199
+		resp := &apitypes.ServiceUpdateResponse{}
200
+
201
+		// pin image by digest
202
+		if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
203
+			digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
204
+			if err != nil {
205
+				logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
206
+				resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
207
+			} else if newCtnr.Image != digestImage {
208
+				logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
209
+				newCtnr.Image = digestImage
210
+			} else {
211
+				logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
212
+			}
226 213
 		}
227
-	}
228 214
 
229
-	_, err = state.controlClient.UpdateService(
230
-		ctx,
231
-		&swarmapi.UpdateServiceRequest{
232
-			ServiceID: currentService.ID,
233
-			Spec:      &serviceSpec,
234
-			ServiceVersion: &swarmapi.Version{
235
-				Index: version,
215
+		_, err = state.controlClient.UpdateService(
216
+			ctx,
217
+			&swarmapi.UpdateServiceRequest{
218
+				ServiceID: currentService.ID,
219
+				Spec:      &serviceSpec,
220
+				ServiceVersion: &swarmapi.Version{
221
+					Index: version,
222
+				},
236 223
 			},
237
-		},
238
-	)
239
-
224
+		)
225
+		return err
226
+	})
240 227
 	return resp, err
241 228
 }
242 229
 
243 230
 // RemoveService removes a service from a managed swarm cluster.
244 231
 func (c *Cluster) RemoveService(input string) error {
245
-	c.mu.RLock()
246
-	defer c.mu.RUnlock()
247
-
248
-	state := c.currentNodeState()
249
-	if !state.IsActiveManager() {
250
-		return c.errNoManager(state)
251
-	}
252
-
253
-	ctx, cancel := c.getRequestContext()
254
-	defer cancel()
232
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
233
+		service, err := getService(ctx, state.controlClient, input)
234
+		if err != nil {
235
+			return err
236
+		}
255 237
 
256
-	service, err := getService(ctx, state.controlClient, input)
257
-	if err != nil {
238
+		_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
258 239
 		return err
259
-	}
260
-
261
-	_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
262
-	return err
240
+	})
263 241
 }
264 242
 
265 243
 // ServiceLogs collects service logs and writes them back to `config.OutStream`
... ...
@@ -187,95 +187,75 @@ func (c *Cluster) Join(req types.JoinRequest) error {
187 187
 
188 188
 // Inspect retrieves the configuration properties of a managed swarm cluster.
189 189
 func (c *Cluster) Inspect() (types.Swarm, error) {
190
-	c.mu.RLock()
191
-	defer c.mu.RUnlock()
192
-
193
-	state := c.currentNodeState()
194
-	if !state.IsActiveManager() {
195
-		return types.Swarm{}, c.errNoManager(state)
196
-	}
197
-
198
-	ctx, cancel := c.getRequestContext()
199
-	defer cancel()
200
-
201
-	swarm, err := getSwarm(ctx, state.controlClient)
202
-	if err != nil {
190
+	var swarm *swarmapi.Cluster
191
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
192
+		s, err := getSwarm(ctx, state.controlClient)
193
+		if err != nil {
194
+			return err
195
+		}
196
+		swarm = s
197
+		return nil
198
+	}); err != nil {
203 199
 		return types.Swarm{}, err
204 200
 	}
205
-
206 201
 	return convert.SwarmFromGRPC(*swarm), nil
207 202
 }
208 203
 
209 204
 // Update updates configuration of a managed swarm cluster.
210 205
 func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
211
-	c.mu.RLock()
212
-	defer c.mu.RUnlock()
213
-
214
-	state := c.currentNodeState()
215
-	if !state.IsActiveManager() {
216
-		return c.errNoManager(state)
217
-	}
218
-
219
-	ctx, cancel := c.getRequestContext()
220
-	defer cancel()
221
-
222
-	swarm, err := getSwarm(ctx, state.controlClient)
223
-	if err != nil {
224
-		return err
225
-	}
206
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
207
+		swarm, err := getSwarm(ctx, state.controlClient)
208
+		if err != nil {
209
+			return err
210
+		}
226 211
 
227
-	// In update, client should provide the complete spec of the swarm, including
228
-	// Name and Labels. If a field is specified with 0 or nil, then the default value
229
-	// will be used to swarmkit.
230
-	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
231
-	if err != nil {
232
-		return apierrors.NewBadRequestError(err)
233
-	}
212
+		// In update, client should provide the complete spec of the swarm, including
213
+		// Name and Labels. If a field is specified with 0 or nil, then the default value
214
+		// will be used to swarmkit.
215
+		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
216
+		if err != nil {
217
+			return apierrors.NewBadRequestError(err)
218
+		}
234 219
 
235
-	_, err = state.controlClient.UpdateCluster(
236
-		ctx,
237
-		&swarmapi.UpdateClusterRequest{
238
-			ClusterID: swarm.ID,
239
-			Spec:      &clusterSpec,
240
-			ClusterVersion: &swarmapi.Version{
241
-				Index: version,
242
-			},
243
-			Rotation: swarmapi.KeyRotation{
244
-				WorkerJoinToken:  flags.RotateWorkerToken,
245
-				ManagerJoinToken: flags.RotateManagerToken,
246
-				ManagerUnlockKey: flags.RotateManagerUnlockKey,
220
+		_, err = state.controlClient.UpdateCluster(
221
+			ctx,
222
+			&swarmapi.UpdateClusterRequest{
223
+				ClusterID: swarm.ID,
224
+				Spec:      &clusterSpec,
225
+				ClusterVersion: &swarmapi.Version{
226
+					Index: version,
227
+				},
228
+				Rotation: swarmapi.KeyRotation{
229
+					WorkerJoinToken:  flags.RotateWorkerToken,
230
+					ManagerJoinToken: flags.RotateManagerToken,
231
+					ManagerUnlockKey: flags.RotateManagerUnlockKey,
232
+				},
247 233
 			},
248
-		},
249
-	)
250
-	return err
234
+		)
235
+		return err
236
+	})
251 237
 }
252 238
 
253 239
 // GetUnlockKey returns the unlock key for the swarm.
254 240
 func (c *Cluster) GetUnlockKey() (string, error) {
255
-	c.mu.RLock()
256
-	defer c.mu.RUnlock()
241
+	var resp *swarmapi.GetUnlockKeyResponse
242
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
243
+		client := swarmapi.NewCAClient(state.grpcConn)
257 244
 
258
-	state := c.currentNodeState()
259
-	if !state.IsActiveManager() {
260
-		return "", c.errNoManager(state)
261
-	}
262
-
263
-	ctx, cancel := c.getRequestContext()
264
-	defer cancel()
265
-
266
-	client := swarmapi.NewCAClient(state.grpcConn)
267
-
268
-	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
269
-	if err != nil {
245
+		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
246
+		if err != nil {
247
+			return err
248
+		}
249
+		resp = r
250
+		return nil
251
+	}); err != nil {
270 252
 		return "", err
271 253
 	}
272
-
273
-	if len(r.UnlockKey) == 0 {
254
+	if len(resp.UnlockKey) == 0 {
274 255
 		// no key
275 256
 		return "", nil
276 257
 	}
277
-
278
-	return encryption.HumanReadableKey(r.UnlockKey), nil
258
+	return encryption.HumanReadableKey(resp.UnlockKey), nil
279 259
 }
280 260
 
281 261
 // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
... ...
@@ -6,6 +6,7 @@ import (
6 6
 	types "github.com/docker/docker/api/types/swarm"
7 7
 	"github.com/docker/docker/daemon/cluster/convert"
8 8
 	swarmapi "github.com/docker/swarmkit/api"
9
+	"golang.org/x/net/context"
9 10
 )
10 11
 
11 12
 // GetTasks returns a list of tasks matching the filter options.
... ...
@@ -71,19 +72,15 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
71 71
 
72 72
 // GetTask returns a task by an ID.
73 73
 func (c *Cluster) GetTask(input string) (types.Task, error) {
74
-	c.mu.RLock()
75
-	defer c.mu.RUnlock()
76
-
77
-	state := c.currentNodeState()
78
-	if !state.IsActiveManager() {
79
-		return types.Task{}, c.errNoManager(state)
80
-	}
81
-
82
-	ctx, cancel := c.getRequestContext()
83
-	defer cancel()
84
-
85
-	task, err := getTask(ctx, state.controlClient, input)
86
-	if err != nil {
74
+	var task *swarmapi.Task
75
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
76
+		t, err := getTask(ctx, state.controlClient, input)
77
+		if err != nil {
78
+			return err
79
+		}
80
+		task = t
81
+		return nil
82
+	}); err != nil {
87 83
 		return types.Task{}, err
88 84
 	}
89 85
 	return convert.TaskFromGRPC(*task), nil