Browse code

Support rolling deployment hooks

Introduce hook support for the rolling deployment strategy.

Dan Mace authored on 2015/05/15 05:33:38
Showing 15 changed files
... ...
@@ -161,7 +161,39 @@
161 161
           "replicas": 2
162 162
         },
163 163
         "strategy": {
164
-          "type": "Rolling"
164
+          "type": "Rolling",
165
+          "rollingParams": {
166
+            "pre": {
167
+              "failurePolicy": "Abort",
168
+              "execNewPod": {
169
+                "containerName": "ruby-helloworld",
170
+                "command": [
171
+                  "/bin/true"
172
+                ],
173
+                "env": [
174
+                  {
175
+                    "name": "CUSTOM_VAR1",
176
+                    "value": "custom_value1"
177
+                  }
178
+                ]
179
+              }
180
+            },
181
+            "post": {
182
+              "failurePolicy": "Ignore",
183
+              "execNewPod": {
184
+                "containerName": "ruby-helloworld",
185
+                "command": [
186
+                  "/bin/false"
187
+                ],
188
+                "env": [
189
+                  {
190
+                    "name": "CUSTOM_VAR2",
191
+                    "value": "custom_value2"
192
+                  }
193
+                ]
194
+              }
195
+            }
196
+          }
165 197
         }
166 198
       },
167 199
       "triggers": [
... ...
@@ -252,7 +284,39 @@
252 252
           "replicas": 1
253 253
         },
254 254
         "strategy": {
255
-          "type": "Recreate"
255
+          "type": "Recreate",
256
+          "recreateParams": {
257
+            "pre": {
258
+              "failurePolicy": "Abort",
259
+              "execNewPod": {
260
+                "containerName": "ruby-helloworld-database",
261
+                "command": [
262
+                  "/bin/true"
263
+                ],
264
+                "env": [
265
+                  {
266
+                    "name": "CUSTOM_VAR1",
267
+                    "value": "custom_value1"
268
+                  }
269
+                ]
270
+              }
271
+            },
272
+            "post": {
273
+              "failurePolicy": "Ignore",
274
+              "execNewPod": {
275
+                "containerName": "ruby-helloworld-database",
276
+                "command": [
277
+                  "/bin/false"
278
+                ],
279
+                "env": [
280
+                  {
281
+                    "name": "CUSTOM_VAR2",
282
+                    "value": "custom_value2"
283
+                  }
284
+                ]
285
+              }
286
+            }
287
+          }
256 288
         }
257 289
       },
258 290
       "triggers": [
... ...
@@ -181,6 +181,17 @@ func printStrategy(strategy deployapi.DeploymentStrategy, w *tabwriter.Writer) {
181 181
 				printHook("Post-deployment", post, w)
182 182
 			}
183 183
 		}
184
+	case deployapi.DeploymentStrategyTypeRolling:
185
+		if strategy.RollingParams != nil {
186
+			pre := strategy.RollingParams.Pre
187
+			post := strategy.RollingParams.Post
188
+			if pre != nil {
189
+				printHook("Pre-deployment", pre, w)
190
+			}
191
+			if post != nil {
192
+				printHook("Post-deployment", post, w)
193
+			}
194
+		}
184 195
 	case deployapi.DeploymentStrategyTypeCustom:
185 196
 		fmt.Fprintf(w, "\t  Image:\t%s\n", strategy.CustomParams.Image)
186 197
 
... ...
@@ -24,6 +24,12 @@ func OkCustomStrategy() deployapi.DeploymentStrategy {
24 24
 	return deployapi.DeploymentStrategy{
25 25
 		Type:         deployapi.DeploymentStrategyTypeCustom,
26 26
 		CustomParams: OkCustomParams(),
27
+		Resources: kapi.ResourceRequirements{
28
+			Limits: kapi.ResourceList{
29
+				kapi.ResourceName(kapi.ResourceCPU):    resource.MustParse("10"),
30
+				kapi.ResourceName(kapi.ResourceMemory): resource.MustParse("10G"),
31
+			},
32
+		},
27 33
 	}
28 34
 }
29 35
 
... ...
@@ -40,6 +46,27 @@ func OkCustomParams() *deployapi.CustomDeploymentStrategyParams {
40 40
 	}
41 41
 }
42 42
 
43
+func OkRollingStrategy() deployapi.DeploymentStrategy {
44
+	mkintp := func(i int) *int64 {
45
+		v := int64(i)
46
+		return &v
47
+	}
48
+	return deployapi.DeploymentStrategy{
49
+		Type: deployapi.DeploymentStrategyTypeRolling,
50
+		RollingParams: &deployapi.RollingDeploymentStrategyParams{
51
+			UpdatePeriodSeconds: mkintp(1),
52
+			IntervalSeconds:     mkintp(1),
53
+			TimeoutSeconds:      mkintp(20),
54
+		},
55
+		Resources: kapi.ResourceRequirements{
56
+			Limits: kapi.ResourceList{
57
+				kapi.ResourceName(kapi.ResourceCPU):    resource.MustParse("10"),
58
+				kapi.ResourceName(kapi.ResourceMemory): resource.MustParse("10G"),
59
+			},
60
+		},
61
+	}
62
+}
63
+
43 64
 func OkControllerTemplate() kapi.ReplicationControllerSpec {
44 65
 	return kapi.ReplicationControllerSpec{
45 66
 		Replicas: 1,
... ...
@@ -139,6 +139,13 @@ type RollingDeploymentStrategyParams struct {
139 139
 	// TimeoutSeconds is the time to wait for updates before giving up. If the
140 140
 	// value is nil, a default will be used.
141 141
 	TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty" description:"the time to wait for updates before giving up"`
142
+	// Pre is a lifecycle hook which is executed before the deployment process
143
+	// begins. All LifecycleHookFailurePolicy values are supported.
144
+	Pre *LifecycleHook `json:"pre,omitempty" description:"a hook executed before the strategy starts the deployment"`
145
+	// Post is a lifecycle hook which is executed after the strategy has
146
+	// finished all deployment logic. The LifecycleHookFailurePolicyAbort policy
147
+	// is NOT supported.
148
+	Post *LifecycleHook `json:"post,omitempty" description:"a hook executed after the strategy finishes the deployment"`
142 149
 }
143 150
 
144 151
 // DeploymentList is a collection of deployments.
... ...
@@ -115,6 +115,13 @@ type RollingDeploymentStrategyParams struct {
115 115
 	// TimeoutSeconds is the time to wait for updates before giving up. If the
116 116
 	// value is nil, a default will be used.
117 117
 	TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty" description:"the time to wait for updates before giving up"`
118
+	// Pre is a lifecycle hook which is executed before the deployment process
119
+	// begins. All LifecycleHookFailurePolicy values are supported.
120
+	Pre *LifecycleHook `json:"pre,omitempty" description:"a hook executed before the strategy starts the deployment"`
121
+	// Post is a lifecycle hook which is executed after the strategy has
122
+	// finished all deployment logic. The LifecycleHookFailurePolicyAbort policy
123
+	// is NOT supported.
124
+	Post *LifecycleHook `json:"post,omitempty" description:"a hook executed after the strategy finishes the deployment"`
118 125
 }
119 126
 
120 127
 // These constants represent keys used for correlating objects related to deployments.
... ...
@@ -140,6 +140,13 @@ type RollingDeploymentStrategyParams struct {
140 140
 	// TimeoutSeconds is the time to wait for updates before giving up. If the
141 141
 	// value is nil, a default will be used.
142 142
 	TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty" description:"the time to wait for updates before giving up"`
143
+	// Pre is a lifecycle hook which is executed before the deployment process
144
+	// begins. All LifecycleHookFailurePolicy values are supported.
145
+	Pre *LifecycleHook `json:"pre,omitempty" description:"a hook executed before the strategy starts the deployment"`
146
+	// Post is a lifecycle hook which is executed after the strategy has
147
+	// finished all deployment logic. The LifecycleHookFailurePolicyAbort policy
148
+	// is NOT supported.
149
+	Post *LifecycleHook `json:"post,omitempty" description:"a hook executed after the strategy finishes the deployment"`
143 150
 }
144 151
 
145 152
 // A DeploymentList is a collection of deployments.
... ...
@@ -115,6 +115,13 @@ type RollingDeploymentStrategyParams struct {
115 115
 	// TimeoutSeconds is the time to wait for updates before giving up. If the
116 116
 	// value is nil, a default will be used.
117 117
 	TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty" description:"the time to wait for updates before giving up"`
118
+	// Pre is a lifecycle hook which is executed before the deployment process
119
+	// begins. All LifecycleHookFailurePolicy values are supported.
120
+	Pre *LifecycleHook `json:"pre,omitempty" description:"a hook executed before the strategy starts the deployment"`
121
+	// Post is a lifecycle hook which is executed after the strategy has
122
+	// finished all deployment logic. The LifecycleHookFailurePolicyAbort policy
123
+	// is NOT supported.
124
+	Post *LifecycleHook `json:"post,omitempty" description:"a hook executed after the strategy finishes the deployment"`
118 125
 }
119 126
 
120 127
 // These constants represent keys used for correlating objects related to deployments.
... ...
@@ -199,6 +199,13 @@ func validateRollingParams(params *deployapi.RollingDeploymentStrategyParams) fi
199 199
 		errs = append(errs, fielderrors.NewFieldInvalid("timeoutSeconds", *params.TimeoutSeconds, "must be >0"))
200 200
 	}
201 201
 
202
+	if params.Pre != nil {
203
+		errs = append(errs, validateLifecycleHook(params.Pre).Prefix("pre")...)
204
+	}
205
+	if params.Post != nil {
206
+		errs = append(errs, validateLifecycleHook(params.Post).Prefix("post")...)
207
+	}
208
+
202 209
 	return errs
203 210
 }
204 211
 
... ...
@@ -385,6 +385,30 @@ func TestValidateDeploymentConfigMissingFields(t *testing.T) {
385 385
 			fielderrors.ValidationErrorTypeInvalid,
386 386
 			"template.strategy.rollingParams.timeoutSeconds",
387 387
 		},
388
+		"missing template.strategy.rollingParams.pre.failurePolicy": {
389
+			api.DeploymentConfig{
390
+				ObjectMeta: kapi.ObjectMeta{Name: "foo", Namespace: "bar"},
391
+				Template: api.DeploymentTemplate{
392
+					Strategy: api.DeploymentStrategy{
393
+						Type: api.DeploymentStrategyTypeRolling,
394
+						RollingParams: &api.RollingDeploymentStrategyParams{
395
+							IntervalSeconds:     mkintp(1),
396
+							UpdatePeriodSeconds: mkintp(1),
397
+							TimeoutSeconds:      mkintp(20),
398
+							Pre: &api.LifecycleHook{
399
+								ExecNewPod: &api.ExecNewPodHook{
400
+									Command:       []string{"cmd"},
401
+									ContainerName: "container",
402
+								},
403
+							},
404
+						},
405
+					},
406
+					ControllerTemplate: test.OkControllerTemplate(),
407
+				},
408
+			},
409
+			fielderrors.ValidationErrorTypeRequired,
410
+			"template.strategy.rollingParams.pre.failurePolicy",
411
+		},
388 412
 	}
389 413
 
390 414
 	for k, v := range errorCases {
... ...
@@ -11,7 +11,6 @@ import (
11 11
 	kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
12 12
 	kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
13 13
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
14
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
15 14
 
16 15
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
17 16
 	stratsupport "github.com/openshift/origin/pkg/deploy/strategy/support"
... ...
@@ -47,8 +46,8 @@ func NewRecreateDeploymentStrategy(client kclient.Interface, codec runtime.Codec
47 47
 				CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
48 48
 					return client.Pods(namespace).Create(pod)
49 49
 				},
50
-				WatchPodFunc: func(namespace, name string) (watch.Interface, error) {
51
-					return newPodWatch(client, namespace, name, 5*time.Second), nil
50
+				PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
51
+					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion)
52 52
 				},
53 53
 			},
54 54
 		},
... ...
@@ -66,28 +65,12 @@ func (s *RecreateDeploymentStrategy) Deploy(deployment *kapi.ReplicationControll
66 66
 		return fmt.Errorf("couldn't decode DeploymentConfig from Deployment %s: %v", deployment.Name, err)
67 67
 	}
68 68
 
69
+	params := deploymentConfig.Template.Strategy.RecreateParams
69 70
 	// Execute any pre-hook.
70
-	if deploymentConfig.Template.Strategy.RecreateParams != nil {
71
-		preHook := deploymentConfig.Template.Strategy.RecreateParams.Pre
72
-		if preHook != nil {
73
-		preHookLoop:
74
-			for {
75
-				err := s.hookExecutor.Execute(preHook, deployment)
76
-				if err == nil {
77
-					glog.Info("Pre hook finished successfully")
78
-					break
79
-				}
80
-				switch preHook.FailurePolicy {
81
-				case deployapi.LifecycleHookFailurePolicyAbort:
82
-					return fmt.Errorf("Pre hook failed, aborting: %s", err)
83
-				case deployapi.LifecycleHookFailurePolicyIgnore:
84
-					glog.V(2).Infof("Pre hook failed, ignoring: %s", err)
85
-					break preHookLoop
86
-				case deployapi.LifecycleHookFailurePolicyRetry:
87
-					glog.V(2).Infof("Pre hook failed, retrying: %s", err)
88
-					time.Sleep(s.retryPeriod)
89
-				}
90
-			}
71
+	if params != nil && params.Pre != nil {
72
+		err := s.hookExecutor.Execute(params.Pre, deployment, s.retryPeriod)
73
+		if err != nil {
74
+			return fmt.Errorf("Pre hook failed: %s", err)
91 75
 		}
92 76
 	}
93 77
 
... ...
@@ -121,27 +104,15 @@ func (s *RecreateDeploymentStrategy) Deploy(deployment *kapi.ReplicationControll
121 121
 		}
122 122
 	}
123 123
 
124
-	// Execute any post-hook.
125
-	if deploymentConfig.Template.Strategy.RecreateParams != nil {
126
-		postHook := deploymentConfig.Template.Strategy.RecreateParams.Post
127
-		if postHook != nil {
128
-		postHookLoop:
129
-			for {
130
-				err := s.hookExecutor.Execute(postHook, deployment)
131
-				if err == nil {
132
-					glog.V(4).Info("Post hook finished successfully")
133
-					break
134
-				}
135
-				switch postHook.FailurePolicy {
136
-				case deployapi.LifecycleHookFailurePolicyIgnore, deployapi.LifecycleHookFailurePolicyAbort:
137
-					// Abort isn't supported here, so treat it like ignore.
138
-					glog.V(2).Infof("Post hook failed, ignoring: %s", err)
139
-					break postHookLoop
140
-				case deployapi.LifecycleHookFailurePolicyRetry:
141
-					glog.V(2).Infof("Post hook failed, retrying: %s", err)
142
-					time.Sleep(s.retryPeriod)
143
-				}
144
-			}
124
+	// Execute any post-hook. Errors are logged and ignored.
125
+	if params != nil && params.Post != nil {
126
+		// TODO: handle this in defaulting/conversion/validation?
127
+		if params.Post.FailurePolicy == deployapi.LifecycleHookFailurePolicyAbort {
128
+			params.Post.FailurePolicy = deployapi.LifecycleHookFailurePolicyIgnore
129
+		}
130
+		err := s.hookExecutor.Execute(params.Post, deployment, s.retryPeriod)
131
+		if err != nil {
132
+			glog.Infof("Post hook failed: %s", err)
145 133
 		}
146 134
 	}
147 135
 
... ...
@@ -206,65 +177,14 @@ func (r *realReplicationControllerClient) updateReplicationController(namespace
206 206
 
207 207
 // hookExecutor knows how to execute a deployment lifecycle hook.
208 208
 type hookExecutor interface {
209
-	Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error
209
+	Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error
210 210
 }
211 211
 
212 212
 // hookExecutorImpl is a pluggable hookExecutor.
213 213
 type hookExecutorImpl struct {
214
-	executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error
215
-}
216
-
217
-func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
218
-	return i.executeFunc(hook, deployment)
219
-}
220
-
221
-// podWatch provides watch semantics for a pod backed by a poller, since
222
-// events aren't generated for pod status updates.
223
-type podWatch struct {
224
-	result chan watch.Event
225
-	stop   chan bool
226
-}
227
-
228
-// newPodWatch makes a new podWatch.
229
-func newPodWatch(client kclient.Interface, namespace, name string, period time.Duration) *podWatch {
230
-	pods := make(chan watch.Event)
231
-	stop := make(chan bool)
232
-	tick := time.NewTicker(period)
233
-	go func() {
234
-		for {
235
-			select {
236
-			case <-stop:
237
-				return
238
-			case <-tick.C:
239
-				pod, err := client.Pods(namespace).Get(name)
240
-				if err != nil {
241
-					pods <- watch.Event{
242
-						Type: watch.Error,
243
-						Object: &kapi.Status{
244
-							Status:  "Failure",
245
-							Message: fmt.Sprintf("couldn't get pod %s/%s: %s", namespace, name, err),
246
-						},
247
-					}
248
-					continue
249
-				}
250
-				pods <- watch.Event{
251
-					Type:   watch.Modified,
252
-					Object: pod,
253
-				}
254
-			}
255
-		}
256
-	}()
257
-
258
-	return &podWatch{
259
-		result: pods,
260
-		stop:   stop,
261
-	}
262
-}
263
-
264
-func (w *podWatch) Stop() {
265
-	w.stop <- true
214
+	executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error
266 215
 }
267 216
 
268
-func (w *podWatch) ResultChan() <-chan watch.Event {
269
-	return w.result
217
+func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error {
218
+	return i.executeFunc(hook, deployment, retryPeriod)
270 219
 }
... ...
@@ -30,11 +30,6 @@ func TestRecreate_initialDeployment(t *testing.T) {
30 30
 				return ctrl, nil
31 31
 			},
32 32
 		},
33
-		hookExecutor: &hookExecutorImpl{
34
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
35
-				return nil
36
-			},
37
-		},
38 33
 	}
39 34
 
40 35
 	// Deployment replicas should follow the config as there's no explicit
... ...
@@ -105,11 +100,6 @@ func TestRecreate_secondDeploymentWithSuccessfulRetries(t *testing.T) {
105 105
 				return ctrl, nil
106 106
 			},
107 107
 		},
108
-		hookExecutor: &hookExecutorImpl{
109
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
110
-				return nil
111
-			},
112
-		},
113 108
 	}
114 109
 
115 110
 	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
... ...
@@ -153,11 +143,6 @@ func TestRecreate_secondDeploymentScaleUpRetries(t *testing.T) {
153 153
 				return nil, fmt.Errorf("update failure")
154 154
 			},
155 155
 		},
156
-		hookExecutor: &hookExecutorImpl{
157
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
158
-				return nil
159
-			},
160
-		},
161 156
 	}
162 157
 
163 158
 	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
... ...
@@ -205,11 +190,6 @@ func TestRecreate_secondDeploymentScaleDownRetries(t *testing.T) {
205 205
 				}
206 206
 			},
207 207
 		},
208
-		hookExecutor: &hookExecutorImpl{
209
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
210
-				return nil
211
-			},
212
-		},
213 208
 	}
214 209
 
215 210
 	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
... ...
@@ -243,7 +223,7 @@ func TestRecreate_deploymentPreHookSuccess(t *testing.T) {
243 243
 			},
244 244
 		},
245 245
 		hookExecutor: &hookExecutorImpl{
246
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
246
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
247 247
 				return nil
248 248
 			},
249 249
 		},
... ...
@@ -264,7 +244,7 @@ func TestRecreate_deploymentPreHookSuccess(t *testing.T) {
264 264
 	}
265 265
 }
266 266
 
267
-func TestRecreate_deploymentPreHookFailAbort(t *testing.T) {
267
+func TestRecreate_deploymentPreHookFail(t *testing.T) {
268 268
 	config := deploytest.OkDeploymentConfig(1)
269 269
 	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyAbort, "")
270 270
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
... ...
@@ -284,7 +264,7 @@ func TestRecreate_deploymentPreHookFailAbort(t *testing.T) {
284 284
 			},
285 285
 		},
286 286
 		hookExecutor: &hookExecutorImpl{
287
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
287
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
288 288
 				return fmt.Errorf("hook execution failure")
289 289
 			},
290 290
 		},
... ...
@@ -292,95 +272,7 @@ func TestRecreate_deploymentPreHookFailAbort(t *testing.T) {
292 292
 
293 293
 	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
294 294
 	if err == nil {
295
-		t.Fatalf("expected a deploy error")
296
-	}
297
-	t.Logf("got expected error: %s", err)
298
-}
299
-
300
-func TestRecreate_deploymentPreHookFailureIgnored(t *testing.T) {
301
-	var updatedController *kapi.ReplicationController
302
-	config := deploytest.OkDeploymentConfig(1)
303
-	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyIgnore, "")
304
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
305
-
306
-	strategy := &RecreateDeploymentStrategy{
307
-		codec:        api.Codec,
308
-		retryTimeout: 1 * time.Second,
309
-		retryPeriod:  1 * time.Millisecond,
310
-		client: &testControllerClient{
311
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
312
-				return deployment, nil
313
-			},
314
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
315
-				updatedController = ctrl
316
-				return ctrl, nil
317
-			},
318
-		},
319
-		hookExecutor: &hookExecutorImpl{
320
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
321
-				return fmt.Errorf("hook execution failure")
322
-			},
323
-		},
324
-	}
325
-
326
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
327
-
328
-	if err != nil {
329
-		t.Fatalf("unexpected deploy error: %#v", err)
330
-	}
331
-
332
-	if updatedController == nil {
333
-		t.Fatalf("expected a ReplicationController")
334
-	}
335
-
336
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
337
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
338
-	}
339
-}
340
-
341
-func TestRecreate_deploymentPreHookFailureRetried(t *testing.T) {
342
-	var updatedController *kapi.ReplicationController
343
-	config := deploytest.OkDeploymentConfig(1)
344
-	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyRetry, "")
345
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
346
-
347
-	errorCount := 2
348
-	strategy := &RecreateDeploymentStrategy{
349
-		codec:        api.Codec,
350
-		retryTimeout: 1 * time.Second,
351
-		retryPeriod:  1 * time.Millisecond,
352
-		client: &testControllerClient{
353
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
354
-				return deployment, nil
355
-			},
356
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
357
-				updatedController = ctrl
358
-				return ctrl, nil
359
-			},
360
-		},
361
-		hookExecutor: &hookExecutorImpl{
362
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
363
-				if errorCount == 0 {
364
-					return nil
365
-				}
366
-				errorCount--
367
-				return fmt.Errorf("hook execution failure")
368
-			},
369
-		},
370
-	}
371
-
372
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
373
-
374
-	if err != nil {
375
-		t.Fatalf("unexpected deploy error: %#v", err)
376
-	}
377
-
378
-	if updatedController == nil {
379
-		t.Fatalf("expected a ReplicationController")
380
-	}
381
-
382
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
383
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
295
+		t.Fatalf("expected deploy error: %v", err)
384 296
 	}
385 297
 }
386 298
 
... ...
@@ -404,7 +296,7 @@ func TestRecreate_deploymentPostHookSuccess(t *testing.T) {
404 404
 			},
405 405
 		},
406 406
 		hookExecutor: &hookExecutorImpl{
407
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
407
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
408 408
 				return nil
409 409
 			},
410 410
 		},
... ...
@@ -425,48 +317,7 @@ func TestRecreate_deploymentPostHookSuccess(t *testing.T) {
425 425
 	}
426 426
 }
427 427
 
428
-func TestRecreate_deploymentPostHookAbortUnsupported(t *testing.T) {
429
-	var updatedController *kapi.ReplicationController
430
-	config := deploytest.OkDeploymentConfig(1)
431
-	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyAbort)
432
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
433
-
434
-	strategy := &RecreateDeploymentStrategy{
435
-		codec:        api.Codec,
436
-		retryTimeout: 1 * time.Second,
437
-		retryPeriod:  1 * time.Millisecond,
438
-		client: &testControllerClient{
439
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
440
-				return deployment, nil
441
-			},
442
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
443
-				updatedController = ctrl
444
-				return ctrl, nil
445
-			},
446
-		},
447
-		hookExecutor: &hookExecutorImpl{
448
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
449
-				return fmt.Errorf("hook execution failure")
450
-			},
451
-		},
452
-	}
453
-
454
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
455
-
456
-	if err != nil {
457
-		t.Fatalf("unexpected deploy error: %#v", err)
458
-	}
459
-
460
-	if updatedController == nil {
461
-		t.Fatalf("expected a ReplicationController")
462
-	}
463
-
464
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
465
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
466
-	}
467
-}
468
-
469
-func TestRecreate_deploymentPostHookFailIgnore(t *testing.T) {
428
+func TestRecreate_deploymentPostHookFailureIgnored(t *testing.T) {
470 429
 	var updatedController *kapi.ReplicationController
471 430
 	config := deploytest.OkDeploymentConfig(1)
472 431
 	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyIgnore)
... ...
@@ -486,53 +337,7 @@ func TestRecreate_deploymentPostHookFailIgnore(t *testing.T) {
486 486
 			},
487 487
 		},
488 488
 		hookExecutor: &hookExecutorImpl{
489
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
490
-				return fmt.Errorf("hook execution failure")
491
-			},
492
-		},
493
-	}
494
-
495
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
496
-
497
-	if err != nil {
498
-		t.Fatalf("unexpected deploy error: %#v", err)
499
-	}
500
-
501
-	if updatedController == nil {
502
-		t.Fatalf("expected a ReplicationController")
503
-	}
504
-
505
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
506
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
507
-	}
508
-}
509
-
510
-func TestRecreate_deploymentPostHookFailureRetried(t *testing.T) {
511
-	var updatedController *kapi.ReplicationController
512
-	config := deploytest.OkDeploymentConfig(1)
513
-	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyRetry)
514
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
515
-
516
-	errorCount := 2
517
-	strategy := &RecreateDeploymentStrategy{
518
-		codec:        api.Codec,
519
-		retryTimeout: 1 * time.Second,
520
-		retryPeriod:  1 * time.Millisecond,
521
-		client: &testControllerClient{
522
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
523
-				return deployment, nil
524
-			},
525
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
526
-				updatedController = ctrl
527
-				return ctrl, nil
528
-			},
529
-		},
530
-		hookExecutor: &hookExecutorImpl{
531
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
532
-				if errorCount == 0 {
533
-					return nil
534
-				}
535
-				errorCount--
489
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
536 490
 				return fmt.Errorf("hook execution failure")
537 491
 			},
538 492
 		},
... ...
@@ -13,7 +13,9 @@ import (
13 13
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
14 14
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
15 15
 
16
+	deployapi "github.com/openshift/origin/pkg/deploy/api"
16 17
 	"github.com/openshift/origin/pkg/deploy/strategy"
18
+	stratsupport "github.com/openshift/origin/pkg/deploy/strategy/support"
17 19
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
18 20
 )
19 21
 
... ...
@@ -44,6 +46,8 @@ type RollingDeploymentStrategy struct {
44 44
 	rollingUpdate func(config *kubectl.RollingUpdaterConfig) error
45 45
 	// codec is used to access the encoded config on a deployment.
46 46
 	codec runtime.Codec
47
+	// hookExecutor can execute a lifecycle hook.
48
+	hookExecutor hookExecutor
47 49
 }
48 50
 
49 51
 // NewRollingDeploymentStrategy makes a new RollingDeploymentStrategy.
... ...
@@ -80,6 +84,16 @@ func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, co
80 80
 			updater := kubectl.NewRollingUpdater(namespace, updaterClient)
81 81
 			return updater.Update(config)
82 82
 		},
83
+		hookExecutor: &stratsupport.HookExecutor{
84
+			PodClient: &stratsupport.HookExecutorPodClientImpl{
85
+				CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
86
+					return client.Pods(namespace).Create(pod)
87
+				},
88
+				PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
89
+					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion)
90
+				},
91
+			},
92
+		},
83 93
 	}
84 94
 }
85 95
 
... ...
@@ -89,6 +103,10 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
89 89
 		return fmt.Errorf("couldn't decode DeploymentConfig from Deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
90 90
 	}
91 91
 
92
+	params := config.Template.Strategy.RollingParams
93
+	// TODO: Consider exposing this via the API.
94
+	hookRetryPeriod := 1 * time.Second
95
+
92 96
 	// Find the latest deployment (if any).
93 97
 	latest, err := s.findLatestDeployment(oldDeployments)
94 98
 	if err != nil {
... ...
@@ -97,8 +115,51 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
97 97
 
98 98
 	// If there's no prior deployment, delegate to another strategy since the
99 99
 	// rolling updater only supports transitioning between two deployments.
100
+	//
101
+	// Hook support is duplicated here for now. When the rolling updater can
102
+	// handle initial deployments, all of this code can go away.
100 103
 	if latest == nil {
101
-		return s.initialStrategy.Deploy(deployment, oldDeployments)
104
+		// Execute any pre-hook.
105
+		if params.Pre != nil {
106
+			err := s.hookExecutor.Execute(params.Pre, deployment, hookRetryPeriod)
107
+			if err != nil {
108
+				return fmt.Errorf("Pre hook failed: %s", err)
109
+			}
110
+			glog.Infof("Pre hook finished")
111
+		}
112
+
113
+		// Execute the delegate strategy.
114
+		err := s.initialStrategy.Deploy(deployment, oldDeployments)
115
+		if err != nil {
116
+			return err
117
+		}
118
+
119
+		// Execute any post-hook. Errors are logged and ignored.
120
+		if params.Post != nil {
121
+			// TODO: handle this in defaulting/conversion/validation?
122
+			if params.Post.FailurePolicy == deployapi.LifecycleHookFailurePolicyAbort {
123
+				params.Post.FailurePolicy = deployapi.LifecycleHookFailurePolicyIgnore
124
+			}
125
+			err := s.hookExecutor.Execute(params.Post, deployment, hookRetryPeriod)
126
+			if err != nil {
127
+				glog.Errorf("Post hook failed: %s", err)
128
+			} else {
129
+				glog.Infof("Post hook finished")
130
+			}
131
+		}
132
+
133
+		// All done.
134
+		return nil
135
+	}
136
+
137
+	// Prepare for a rolling update.
138
+	// Execute any pre-hook.
139
+	if params.Pre != nil {
140
+		err := s.hookExecutor.Execute(params.Pre, deployment, hookRetryPeriod)
141
+		if err != nil {
142
+			return fmt.Errorf("Pre hook failed: %s", err)
143
+		}
144
+		glog.Infof("Pre hook finished")
102 145
 	}
103 146
 
104 147
 	// HACK: Assign the source ID annotation that the rolling updater expects,
... ...
@@ -106,10 +167,14 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
106 106
 	//
107 107
 	// Related upstream issue:
108 108
 	// https://github.com/GoogleCloudPlatform/kubernetes/pull/7183
109
+	deployment, err = s.client.GetReplicationController(deployment.Namespace, deployment.Name)
110
+	if err != nil {
111
+		return fmt.Errorf("couldn't look up deployment %s: %s", deployutil.LabelForDeployment(deployment))
112
+	}
109 113
 	if _, hasSourceId := deployment.Annotations[sourceIdAnnotation]; !hasSourceId {
110 114
 		deployment.Annotations[sourceIdAnnotation] = fmt.Sprintf("%s:%s", latest.Name, latest.ObjectMeta.UID)
111 115
 		if updated, err := s.client.UpdateReplicationController(deployment.Namespace, deployment); err != nil {
112
-			return fmt.Errorf("couldn't assign source annotation to Deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
116
+			return fmt.Errorf("couldn't assign source annotation to deployment %s: %v", deployutil.LabelForDeployment(deployment), err)
113 117
 		} else {
114 118
 			deployment = updated
115 119
 		}
... ...
@@ -126,7 +191,6 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
126 126
 
127 127
 	glog.Infof("OldRc: %s, replicas=%d", latest.Name, latest.Spec.Replicas)
128 128
 	// Perform a rolling update.
129
-	params := config.Template.Strategy.RollingParams
130 129
 	rollingConfig := &kubectl.RollingUpdaterConfig{
131 130
 		Out:           &rollingUpdaterWriter{},
132 131
 		OldRc:         latest,
... ...
@@ -145,7 +209,26 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
145 145
 		*params.IntervalSeconds,
146 146
 		*params.TimeoutSeconds,
147 147
 	)
148
-	return s.rollingUpdate(rollingConfig)
148
+	err = s.rollingUpdate(rollingConfig)
149
+	if err != nil {
150
+		return err
151
+	}
152
+
153
+	// Execute any post-hook. Errors are logged and ignored.
154
+	if params.Post != nil {
155
+		// TODO: handle this in defaulting/conversion/validation?
156
+		if params.Post.FailurePolicy == deployapi.LifecycleHookFailurePolicyAbort {
157
+			params.Post.FailurePolicy = deployapi.LifecycleHookFailurePolicyIgnore
158
+		}
159
+		err := s.hookExecutor.Execute(params.Post, deployment, hookRetryPeriod)
160
+		if err != nil {
161
+			glog.Errorf("Post hook failed: %s", err)
162
+		} else {
163
+			glog.Info("Post hook finished")
164
+		}
165
+	}
166
+
167
+	return nil
149 168
 }
150 169
 
151 170
 // findLatestDeployment retrieves deployments identified by oldDeployments and
... ...
@@ -210,3 +293,17 @@ func (w *rollingUpdaterWriter) Write(p []byte) (n int, err error) {
210 210
 	glog.Info(fmt.Sprintf("RollingUpdater: %s", p))
211 211
 	return len(p), nil
212 212
 }
213
+
214
+// hookExecutor knows how to execute a deployment lifecycle hook.
215
+type hookExecutor interface {
216
+	Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error
217
+}
218
+
219
+// hookExecutorImpl is a pluggable hookExecutor.
220
+type hookExecutorImpl struct {
221
+	executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error
222
+}
223
+
224
+func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error {
225
+	return i.executeFunc(hook, deployment, retryPeriod)
226
+}
... ...
@@ -38,7 +38,9 @@ func TestRolling_deployInitial(t *testing.T) {
38 38
 		},
39 39
 	}
40 40
 
41
-	deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
41
+	config := deploytest.OkDeploymentConfig(1)
42
+	config.Template.Strategy = deploytest.OkRollingStrategy()
43
+	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
42 44
 	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
43 45
 	if err != nil {
44 46
 		t.Fatalf("unexpected error: %v", err)
... ...
@@ -49,28 +51,25 @@ func TestRolling_deployInitial(t *testing.T) {
49 49
 }
50 50
 
51 51
 func TestRolling_deployRolling(t *testing.T) {
52
-	latest, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
52
+	latestConfig := deploytest.OkDeploymentConfig(1)
53
+	latestConfig.Template.Strategy = deploytest.OkRollingStrategy()
54
+	latest, _ := deployutil.MakeDeployment(latestConfig, kapi.Codec)
53 55
 	config := deploytest.OkDeploymentConfig(2)
54
-	config.Template.Strategy = deployapi.DeploymentStrategy{
55
-		Type: deployapi.DeploymentStrategyTypeRolling,
56
-		RollingParams: &deployapi.RollingDeploymentStrategyParams{
57
-			IntervalSeconds:     mkintp(1),
58
-			UpdatePeriodSeconds: mkintp(2),
59
-			TimeoutSeconds:      mkintp(3),
60
-		},
61
-	}
56
+	config.Template.Strategy = deploytest.OkRollingStrategy()
62 57
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
63 58
 
59
+	deployments := map[string]*kapi.ReplicationController{
60
+		latest.Name:     latest,
61
+		deployment.Name: deployment,
62
+	}
63
+
64 64
 	var rollingConfig *kubectl.RollingUpdaterConfig
65 65
 	deploymentUpdated := false
66 66
 	strategy := &RollingDeploymentStrategy{
67 67
 		codec: api.Codec,
68 68
 		client: &rollingUpdaterClient{
69 69
 			GetReplicationControllerFn: func(namespace, name string) (*kapi.ReplicationController, error) {
70
-				if name != latest.Name {
71
-					t.Fatalf("unexpected call to GetReplicationController for %s", name)
72
-				}
73
-				return latest, nil
70
+				return deployments[name], nil
74 71
 			},
75 72
 			UpdateReplicationControllerFn: func(namespace string, rc *kapi.ReplicationController) (*kapi.ReplicationController, error) {
76 73
 				if rc.Name != deployment.Name {
... ...
@@ -112,11 +111,11 @@ func TestRolling_deployRolling(t *testing.T) {
112 112
 		t.Errorf("expected Interval %d, got %d", e, a)
113 113
 	}
114 114
 
115
-	if e, a := 2*time.Second, rollingConfig.UpdatePeriod; e != a {
115
+	if e, a := 1*time.Second, rollingConfig.UpdatePeriod; e != a {
116 116
 		t.Errorf("expected UpdatePeriod %d, got %d", e, a)
117 117
 	}
118 118
 
119
-	if e, a := 3*time.Second, rollingConfig.Timeout; e != a {
119
+	if e, a := 20*time.Second, rollingConfig.Timeout; e != a {
120 120
 		t.Errorf("expected Timeout %d, got %d", e, a)
121 121
 	}
122 122
 
... ...
@@ -138,7 +137,9 @@ func TestRolling_deployRolling(t *testing.T) {
138 138
 func TestRolling_findLatestDeployment(t *testing.T) {
139 139
 	deployments := map[string]*kapi.ReplicationController{}
140 140
 	for i := 1; i <= 10; i++ {
141
-		deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(i), kapi.Codec)
141
+		config := deploytest.OkDeploymentConfig(i)
142
+		config.Template.Strategy = deploytest.OkRollingStrategy()
143
+		deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
142 144
 		deployments[deployment.Name] = deployment
143 145
 	}
144 146
 
... ...
@@ -201,6 +202,137 @@ func TestRolling_findLatestDeployment(t *testing.T) {
201 201
 	}
202 202
 }
203 203
 
204
+func TestRolling_deployRollingHooks(t *testing.T) {
205
+	config := deploytest.OkDeploymentConfig(1)
206
+	config.Template.Strategy = deploytest.OkRollingStrategy()
207
+	latest, _ := deployutil.MakeDeployment(config, kapi.Codec)
208
+
209
+	var hookError error
210
+
211
+	deployments := map[string]*kapi.ReplicationController{latest.Name: latest}
212
+
213
+	strategy := &RollingDeploymentStrategy{
214
+		codec: api.Codec,
215
+		client: &rollingUpdaterClient{
216
+			GetReplicationControllerFn: func(namespace, name string) (*kapi.ReplicationController, error) {
217
+				return deployments[name], nil
218
+			},
219
+			UpdateReplicationControllerFn: func(namespace string, rc *kapi.ReplicationController) (*kapi.ReplicationController, error) {
220
+				return rc, nil
221
+			},
222
+		},
223
+		initialStrategy: &testStrategy{
224
+			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
225
+				t.Fatalf("unexpected call to initial strategy")
226
+				return nil
227
+			},
228
+		},
229
+		rollingUpdate: func(config *kubectl.RollingUpdaterConfig) error {
230
+			return nil
231
+		},
232
+		hookExecutor: &hookExecutorImpl{
233
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
234
+				return hookError
235
+			},
236
+		},
237
+	}
238
+
239
+	cases := []struct {
240
+		params               *deployapi.RollingDeploymentStrategyParams
241
+		hookShouldFail       bool
242
+		deploymentShouldFail bool
243
+	}{
244
+		{rollingParams(deployapi.LifecycleHookFailurePolicyAbort, ""), true, true},
245
+		{rollingParams(deployapi.LifecycleHookFailurePolicyAbort, ""), false, false},
246
+		{rollingParams("", deployapi.LifecycleHookFailurePolicyAbort), true, false},
247
+		{rollingParams("", deployapi.LifecycleHookFailurePolicyAbort), false, false},
248
+	}
249
+
250
+	for _, tc := range cases {
251
+		config := deploytest.OkDeploymentConfig(2)
252
+		config.Template.Strategy.RollingParams = tc.params
253
+		deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
254
+		deployments[deployment.Name] = deployment
255
+		hookError = nil
256
+		if tc.hookShouldFail {
257
+			hookError = fmt.Errorf("hook failure")
258
+		}
259
+		err := strategy.Deploy(deployment, []*kapi.ReplicationController{latest})
260
+		if err != nil && tc.deploymentShouldFail {
261
+			t.Logf("got expected error: %v", err)
262
+		}
263
+		if err == nil && tc.deploymentShouldFail {
264
+			t.Errorf("expected an error for case: %v", tc)
265
+		}
266
+		if err != nil && !tc.deploymentShouldFail {
267
+			t.Errorf("unexpected error for case: %v: %v", tc, err)
268
+		}
269
+	}
270
+}
271
+
272
+// TestRolling_deployInitialHooks can go away once the rolling strategy
273
+// supports initial deployments.
274
+func TestRolling_deployInitialHooks(t *testing.T) {
275
+	var hookError error
276
+
277
+	strategy := &RollingDeploymentStrategy{
278
+		codec: api.Codec,
279
+		client: &rollingUpdaterClient{
280
+			GetReplicationControllerFn: func(namespace, name string) (*kapi.ReplicationController, error) {
281
+				t.Fatalf("unexpected call to GetReplicationController")
282
+				return nil, nil
283
+			},
284
+			UpdateReplicationControllerFn: func(namespace string, rc *kapi.ReplicationController) (*kapi.ReplicationController, error) {
285
+				return rc, nil
286
+			},
287
+		},
288
+		initialStrategy: &testStrategy{
289
+			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
290
+				return nil
291
+			},
292
+		},
293
+		rollingUpdate: func(config *kubectl.RollingUpdaterConfig) error {
294
+			return nil
295
+		},
296
+		hookExecutor: &hookExecutorImpl{
297
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, _ time.Duration) error {
298
+				return hookError
299
+			},
300
+		},
301
+	}
302
+
303
+	cases := []struct {
304
+		params               *deployapi.RollingDeploymentStrategyParams
305
+		hookShouldFail       bool
306
+		deploymentShouldFail bool
307
+	}{
308
+		{rollingParams(deployapi.LifecycleHookFailurePolicyAbort, ""), true, true},
309
+		{rollingParams(deployapi.LifecycleHookFailurePolicyAbort, ""), false, false},
310
+		{rollingParams("", deployapi.LifecycleHookFailurePolicyAbort), true, false},
311
+		{rollingParams("", deployapi.LifecycleHookFailurePolicyAbort), false, false},
312
+	}
313
+
314
+	for _, tc := range cases {
315
+		config := deploytest.OkDeploymentConfig(2)
316
+		config.Template.Strategy.RollingParams = tc.params
317
+		deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
318
+		hookError = nil
319
+		if tc.hookShouldFail {
320
+			hookError = fmt.Errorf("hook failure")
321
+		}
322
+		err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
323
+		if err != nil && tc.deploymentShouldFail {
324
+			t.Logf("got expected error: %v", err)
325
+		}
326
+		if err == nil && tc.deploymentShouldFail {
327
+			t.Errorf("expected an error for case: %v", tc)
328
+		}
329
+		if err != nil && !tc.deploymentShouldFail {
330
+			t.Errorf("unexpected error for case: %v: %v", tc, err)
331
+		}
332
+	}
333
+}
334
+
204 335
 type testStrategy struct {
205 336
 	deployFn func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error
206 337
 }
... ...
@@ -213,3 +345,28 @@ func mkintp(i int) *int64 {
213 213
 	v := int64(i)
214 214
 	return &v
215 215
 }
216
+
217
+func rollingParams(preFailurePolicy, postFailurePolicy deployapi.LifecycleHookFailurePolicy) *deployapi.RollingDeploymentStrategyParams {
218
+	var pre *deployapi.LifecycleHook
219
+	var post *deployapi.LifecycleHook
220
+
221
+	if len(preFailurePolicy) > 0 {
222
+		pre = &deployapi.LifecycleHook{
223
+			FailurePolicy: preFailurePolicy,
224
+			ExecNewPod:    &deployapi.ExecNewPodHook{},
225
+		}
226
+	}
227
+	if len(postFailurePolicy) > 0 {
228
+		post = &deployapi.LifecycleHook{
229
+			FailurePolicy: postFailurePolicy,
230
+			ExecNewPod:    &deployapi.ExecNewPodHook{},
231
+		}
232
+	}
233
+	return &deployapi.RollingDeploymentStrategyParams{
234
+		UpdatePeriodSeconds: mkintp(1),
235
+		IntervalSeconds:     mkintp(1),
236
+		TimeoutSeconds:      mkintp(20),
237
+		Pre:                 pre,
238
+		Post:                post,
239
+	}
240
+}
... ...
@@ -2,12 +2,17 @@ package support
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"reflect"
5
+	"time"
6 6
 
7 7
 	"github.com/golang/glog"
8 8
 
9 9
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
10 10
 	kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
11
+	kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
12
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
13
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
14
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
15
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
11 16
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
12 17
 
13 18
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
... ...
@@ -21,11 +26,29 @@ type HookExecutor struct {
21 21
 }
22 22
 
23 23
 // Execute executes hook in the context of deployment.
24
-func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController) error {
25
-	if hook.ExecNewPod != nil {
26
-		return e.executeExecNewPod(hook.ExecNewPod, deployment)
24
+func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, retryPeriod time.Duration) error {
25
+	for {
26
+		var err error
27
+		switch {
28
+		case hook.ExecNewPod != nil:
29
+			err = e.executeExecNewPod(hook.ExecNewPod, deployment)
30
+		}
31
+
32
+		if err == nil {
33
+			return nil
34
+		}
35
+
36
+		switch hook.FailurePolicy {
37
+		case deployapi.LifecycleHookFailurePolicyAbort:
38
+			return fmt.Errorf("Hook failed, aborting: %s", err)
39
+		case deployapi.LifecycleHookFailurePolicyIgnore:
40
+			glog.Infof("Hook failed, ignoring: %s", err)
41
+			return nil
42
+		case deployapi.LifecycleHookFailurePolicyRetry:
43
+			glog.Infof("Hook failed, retrying: %s", err)
44
+			time.Sleep(retryPeriod)
45
+		}
27 46
 	}
28
-	return nil
29 47
 }
30 48
 
31 49
 // executeExecNewPod executes a ExecNewPod hook by creating a new pod based on
... ...
@@ -44,13 +67,6 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.ExecNewPodHook, deploym
44 44
 		return err
45 45
 	}
46 46
 
47
-	// Set up a watch for the pod
48
-	podWatch, err := e.PodClient.WatchPod(deployment.Namespace, podSpec.Name)
49
-	if err != nil {
50
-		return fmt.Errorf("couldn't create watch for pod %s/%s: %s", deployment.Namespace, podSpec.Name, err)
51
-	}
52
-	defer podWatch.Stop()
53
-
54 47
 	// Try to create the pod
55 48
 	pod, err := e.PodClient.CreatePod(deployment.Namespace, podSpec)
56 49
 	if err != nil {
... ...
@@ -63,28 +79,16 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.ExecNewPodHook, deploym
63 63
 
64 64
 	// Wait for the pod to finish.
65 65
 	// TODO: Delete pod before returning?
66
+	nextPod := e.PodClient.PodWatch(pod.Namespace, pod.Name, pod.ResourceVersion)
66 67
 	glog.V(0).Infof("Waiting for hook pod %s/%s to complete", pod.Namespace, pod.Name)
67 68
 	for {
68
-		select {
69
-		case event, ok := <-podWatch.ResultChan():
70
-			if !ok {
71
-				return fmt.Errorf("couldn't watch pod %s/%s", pod.Namespace, pod.Name)
72
-			}
73
-			if event.Type == watch.Error {
74
-				return kerrors.FromObject(event.Object)
75
-			}
76
-			pod, podOk := event.Object.(*kapi.Pod)
77
-			if !podOk {
78
-				return fmt.Errorf("expected a pod event, got a %s", reflect.TypeOf(event.Object))
79
-			}
80
-			glog.V(0).Infof("Lifecycle pod %s/%s in phase %s", pod.Namespace, pod.Name, pod.Status.Phase)
81
-			switch pod.Status.Phase {
82
-			case kapi.PodSucceeded:
83
-				return nil
84
-			case kapi.PodFailed:
85
-				// TODO: Add context
86
-				return fmt.Errorf("pod failed")
87
-			}
69
+		pod := nextPod()
70
+		switch pod.Status.Phase {
71
+		case kapi.PodSucceeded:
72
+			return nil
73
+		case kapi.PodFailed:
74
+			// TODO: Add context
75
+			return fmt.Errorf("pod failed")
88 76
 		}
89 77
 	}
90 78
 }
... ...
@@ -157,19 +161,40 @@ func buildContainer(hook *deployapi.ExecNewPodHook, deployment *kapi.Replication
157 157
 // HookExecutorPodClient abstracts access to pods.
158 158
 type HookExecutorPodClient interface {
159 159
 	CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
160
-	WatchPod(namespace, name string) (watch.Interface, error)
160
+	PodWatch(namespace, name, resourceVersion string) func() *kapi.Pod
161 161
 }
162 162
 
163 163
 // HookExecutorPodClientImpl is a pluggable HookExecutorPodClient.
164 164
 type HookExecutorPodClientImpl struct {
165 165
 	CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
166
-	WatchPodFunc  func(namespace, name string) (watch.Interface, error)
166
+	PodWatchFunc  func(namespace, name, resourceVersion string) func() *kapi.Pod
167 167
 }
168 168
 
169 169
 func (i *HookExecutorPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
170 170
 	return i.CreatePodFunc(namespace, pod)
171 171
 }
172 172
 
173
-func (i *HookExecutorPodClientImpl) WatchPod(namespace, name string) (watch.Interface, error) {
174
-	return i.WatchPodFunc(namespace, name)
173
+func (i *HookExecutorPodClientImpl) PodWatch(namespace, name, resourceVersion string) func() *kapi.Pod {
174
+	return i.PodWatchFunc(namespace, name, resourceVersion)
175
+}
176
+
177
+// NewPodWatch creates a pod watching function which is backed by a
178
+// FIFO/reflector pair. This avoids managing watches directly.
179
+func NewPodWatch(client kclient.Interface, namespace, name, resourceVersion string) func() *kapi.Pod {
180
+	fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
181
+	podLW := &deployutil.ListWatcherImpl{
182
+		ListFunc: func() (runtime.Object, error) {
183
+			return client.Pods(namespace).List(labels.Everything(), fieldSelector)
184
+		},
185
+		WatchFunc: func(resourceVersion string) (watch.Interface, error) {
186
+			return client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion)
187
+		},
188
+	}
189
+	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
190
+	cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).Run()
191
+
192
+	return func() *kapi.Pod {
193
+		obj := queue.Pop()
194
+		return obj.(*kapi.Pod)
195
+	}
175 196
 }
... ...
@@ -6,65 +6,31 @@ import (
6 6
 
7 7
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
8 8
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
9
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
10 9
 
11 10
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
12 11
 	deploytest "github.com/openshift/origin/pkg/deploy/api/test"
13 12
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
14 13
 )
15 14
 
16
-func TestHookExecutor_executeExecNewWatchFailure(t *testing.T) {
17
-	hook := &deployapi.LifecycleHook{
18
-		ExecNewPod: &deployapi.ExecNewPodHook{
19
-			ContainerName: "undefined",
20
-		},
21
-	}
22
-
23
-	deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
24
-
25
-	executor := &HookExecutor{
26
-		PodClient: &HookExecutorPodClientImpl{
27
-			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
28
-				t.Fatalf("unexpected call to CreatePod")
29
-				return nil, nil
30
-			},
31
-			WatchPodFunc: func(namespace, name string) (watch.Interface, error) {
32
-				return nil, fmt.Errorf("couldn't make watch")
33
-			},
34
-		},
35
-	}
36
-
37
-	err := executor.Execute(hook, deployment)
38
-
39
-	if err == nil {
40
-		t.Fatalf("expected an error")
41
-	}
42
-	t.Logf("got expected error: %s", err)
43
-}
44
-
45 15
 func TestHookExecutor_executeExecNewCreatePodFailure(t *testing.T) {
46
-	hook := &deployapi.LifecycleHook{
47
-		ExecNewPod: &deployapi.ExecNewPodHook{
48
-			ContainerName: "container1",
49
-		},
16
+	hook := &deployapi.ExecNewPodHook{
17
+		ContainerName: "container1",
50 18
 	}
51 19
 
52 20
 	deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
53 21
 
54
-	podWatch := newTestWatch()
55
-
56 22
 	executor := &HookExecutor{
57 23
 		PodClient: &HookExecutorPodClientImpl{
58 24
 			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
59 25
 				return nil, fmt.Errorf("couldn't create pod")
60 26
 			},
61
-			WatchPodFunc: func(namespace, name string) (watch.Interface, error) {
62
-				return podWatch, nil
27
+			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
28
+				return func() *kapi.Pod { return nil }
63 29
 			},
64 30
 		},
65 31
 	}
66 32
 
67
-	err := executor.Execute(hook, deployment)
33
+	err := executor.executeExecNewPod(hook, deployment)
68 34
 
69 35
 	if err == nil {
70 36
 		t.Fatalf("expected an error")
... ...
@@ -73,40 +39,28 @@ func TestHookExecutor_executeExecNewCreatePodFailure(t *testing.T) {
73 73
 }
74 74
 
75 75
 func TestHookExecutor_executeExecNewPodSucceeded(t *testing.T) {
76
-	hook := &deployapi.LifecycleHook{
77
-		ExecNewPod: &deployapi.ExecNewPodHook{
78
-			ContainerName: "container1",
79
-		},
76
+	hook := &deployapi.ExecNewPodHook{
77
+		ContainerName: "container1",
80 78
 	}
81 79
 
82 80
 	config := deploytest.OkDeploymentConfig(1)
83 81
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
84 82
 
85
-	podWatch := newTestWatch()
86
-
87 83
 	var createdPod *kapi.Pod
88 84
 	executor := &HookExecutor{
89 85
 		PodClient: &HookExecutorPodClientImpl{
90 86
 			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
91
-				go func() {
92
-					obj, _ := kapi.Scheme.Copy(pod)
93
-					cp := obj.(*kapi.Pod)
94
-					cp.Status.Phase = kapi.PodSucceeded
95
-					podWatch.events <- watch.Event{
96
-						Type:   watch.Modified,
97
-						Object: cp,
98
-					}
99
-				}()
100 87
 				createdPod = pod
101 88
 				return createdPod, nil
102 89
 			},
103
-			WatchPodFunc: func(namespace, name string) (watch.Interface, error) {
104
-				return podWatch, nil
90
+			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
91
+				createdPod.Status.Phase = kapi.PodSucceeded
92
+				return func() *kapi.Pod { return createdPod }
105 93
 			},
106 94
 		},
107 95
 	}
108 96
 
109
-	err := executor.Execute(hook, deployment)
97
+	err := executor.executeExecNewPod(hook, deployment)
110 98
 
111 99
 	if err != nil {
112 100
 		t.Fatalf("unexpected error: %s", err)
... ...
@@ -122,39 +76,27 @@ func TestHookExecutor_executeExecNewPodSucceeded(t *testing.T) {
122 122
 }
123 123
 
124 124
 func TestHookExecutor_executeExecNewPodFailed(t *testing.T) {
125
-	hook := &deployapi.LifecycleHook{
126
-		ExecNewPod: &deployapi.ExecNewPodHook{
127
-			ContainerName: "container1",
128
-		},
125
+	hook := &deployapi.ExecNewPodHook{
126
+		ContainerName: "container1",
129 127
 	}
130 128
 
131 129
 	deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
132 130
 
133
-	podWatch := newTestWatch()
134
-
135 131
 	var createdPod *kapi.Pod
136 132
 	executor := &HookExecutor{
137 133
 		PodClient: &HookExecutorPodClientImpl{
138 134
 			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
139
-				go func() {
140
-					obj, _ := kapi.Scheme.Copy(pod)
141
-					cp := obj.(*kapi.Pod)
142
-					cp.Status.Phase = kapi.PodFailed
143
-					podWatch.events <- watch.Event{
144
-						Type:   watch.Modified,
145
-						Object: cp,
146
-					}
147
-				}()
148 135
 				createdPod = pod
149 136
 				return createdPod, nil
150 137
 			},
151
-			WatchPodFunc: func(namespace, name string) (watch.Interface, error) {
152
-				return podWatch, nil
138
+			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
139
+				createdPod.Status.Phase = kapi.PodFailed
140
+				return func() *kapi.Pod { return createdPod }
153 141
 			},
154 142
 		},
155 143
 	}
156 144
 
157
-	err := executor.Execute(hook, deployment)
145
+	err := executor.executeExecNewPod(hook, deployment)
158 146
 
159 147
 	if err == nil {
160 148
 		t.Fatalf("expected an error", err)
... ...
@@ -262,18 +204,3 @@ func TestHookExecutor_buildContainerOk(t *testing.T) {
262 262
 		t.Fatalf("expected restart policy %s, got %s", e, a)
263 263
 	}
264 264
 }
265
-
266
-type testWatch struct {
267
-	events chan watch.Event
268
-}
269
-
270
-func newTestWatch() *testWatch {
271
-	return &testWatch{make(chan watch.Event)}
272
-}
273
-
274
-func (w *testWatch) Stop() {
275
-}
276
-
277
-func (w *testWatch) ResultChan() <-chan watch.Event {
278
-	return w.events
279
-}