Browse code

Implement deployment canary and fixes

Implement the deployment canary and other deployment improvements
defined in https://github.com/openshift/origin/pull/2892.

Dan Mace authored on 2015/06/05 21:48:01
Showing 15 changed files
... ...
@@ -60,6 +60,9 @@ type RollingUpdaterConfig struct {
60 60
 	// CleanupPolicy defines the cleanup action to take after the deployment is
61 61
 	// complete.
62 62
 	CleanupPolicy RollingUpdaterCleanupPolicy
63
+	// UpdateAcceptor is optional and drives acceptance of the first controller
64
+	// during scale-up. If nil, controllers are always accepted.
65
+	UpdateAcceptor UpdateAcceptor
63 66
 }
64 67
 
65 68
 // RollingUpdaterCleanupPolicy is a cleanup action to take after the
... ...
@@ -76,6 +79,20 @@ const (
76 76
 	RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
77 77
 )
78 78
 
79
+// UpdateAcceptor is given a chance to accept or reject the first controller
80
+// during a deployment.
81
+//
82
+// After the successful scale-up of the first replica, the replica is given
83
+// the the UpdateAcceptor. If the UpdateAcceptor rejects the replica, the
84
+// deployment is stopped with an error.
85
+//
86
+// Only the first replica scaled up during the deployment is checked for
87
+// acceptance.
88
+type UpdateAcceptor interface {
89
+	// Accept returns nil if the controller is okay, otherwise returns an error.
90
+	Accept(*api.ReplicationController) error
91
+}
92
+
79 93
 func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) {
80 94
 	if len(newName) == 0 {
81 95
 		return nil, nil
... ...
@@ -354,6 +371,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
354 354
 	}
355 355
 
356 356
 	// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
357
+	updateAccepted := false
357 358
 	for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
358 359
 		newRc.Spec.Replicas += 1
359 360
 		oldRc.Spec.Replicas -= 1
... ...
@@ -368,6 +386,14 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
368 368
 		if err != nil {
369 369
 			return err
370 370
 		}
371
+		// Perform the update acceptance check exactly once.
372
+		if config.UpdateAcceptor != nil && !updateAccepted {
373
+			err := config.UpdateAcceptor.Accept(newRc)
374
+			if err != nil {
375
+				return fmt.Errorf("Update rejected for %s: %v", newRc.Name, err)
376
+			}
377
+			updateAccepted = true
378
+		}
371 379
 		time.Sleep(updatePeriod)
372 380
 		oldRc, err = r.scaleAndWait(oldRc, retry, waitForReplicas)
373 381
 		if err != nil {
... ...
@@ -2,9 +2,12 @@ package deployer
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"sort"
6
+	"time"
5 7
 
6 8
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
7 9
 	kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
10
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
8 11
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
9 12
 	"github.com/golang/glog"
10 13
 	"github.com/spf13/cobra"
... ...
@@ -32,11 +35,6 @@ type config struct {
32 32
 	Namespace      string
33 33
 }
34 34
 
35
-type replicationControllerGetter interface {
36
-	Get(namespace, name string) (*kapi.ReplicationController, error)
37
-	List(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error)
38
-}
39
-
40 35
 // NewCommandDeployer provides a CLI handler for deploy.
41 36
 func NewCommandDeployer(name string) *cobra.Command {
42 37
 	cfg := &config{
... ...
@@ -61,7 +59,8 @@ func NewCommandDeployer(name string) *cobra.Command {
61 61
 				glog.Fatal("namespace is required")
62 62
 			}
63 63
 
64
-			if err = deploy(kClient, cfg.Namespace, cfg.DeploymentName); err != nil {
64
+			deployer := NewDeployer(kClient)
65
+			if err = deployer.Deploy(cfg.Namespace, cfg.DeploymentName); err != nil {
65 66
 				glog.Fatal(err)
66 67
 			}
67 68
 		},
... ...
@@ -77,88 +76,124 @@ func NewCommandDeployer(name string) *cobra.Command {
77 77
 	return cmd
78 78
 }
79 79
 
80
-// deploy executes a deployment strategy.
81
-func deploy(kClient kclient.Interface, namespace, deploymentName string) error {
82
-	deployment, oldDeployments, err := getDeployerContext(&realReplicationControllerGetter{kClient}, namespace, deploymentName)
80
+// NewDeployer makes a new Deployer from a kube client.
81
+func NewDeployer(client kclient.Interface) *Deployer {
82
+	scaler, _ := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(client))
83
+	return &Deployer{
84
+		getDeployment: func(namespace, name string) (*kapi.ReplicationController, error) {
85
+			return client.ReplicationControllers(namespace).Get(name)
86
+		},
87
+		getControllers: func(namespace string) (*kapi.ReplicationControllerList, error) {
88
+			return client.ReplicationControllers(namespace).List(labels.Everything())
89
+		},
90
+		scaler: scaler,
91
+		strategyFor: func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) {
92
+			switch config.Template.Strategy.Type {
93
+			case deployapi.DeploymentStrategyTypeRecreate:
94
+				return recreate.NewRecreateDeploymentStrategy(client, latest.Codec), nil
95
+			case deployapi.DeploymentStrategyTypeRolling:
96
+				recreate := recreate.NewRecreateDeploymentStrategy(client, latest.Codec)
97
+				return rolling.NewRollingDeploymentStrategy(config.Namespace, client, latest.Codec, recreate), nil
98
+			default:
99
+				return nil, fmt.Errorf("unsupported strategy type: %s", config.Template.Strategy.Type)
100
+			}
101
+		},
102
+	}
103
+}
104
+
105
+// Deployer prepares and executes the deployment process. It will:
106
+//
107
+// 1. Validate the deployment has a desired replica count and strategy.
108
+// 2. Find the last completed deployment.
109
+// 3. Scale down to 0 any old deployments which aren't the new deployment or
110
+// the last complete deployment.
111
+// 4. Pass the last completed deployment and the new deployment to a strategy
112
+// to perform the deployment.
113
+type Deployer struct {
114
+	// strategyFor returns a DeploymentStrategy for config.
115
+	strategyFor func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error)
116
+	// getDeployment finds the named deployment.
117
+	getDeployment func(namespace, name string) (*kapi.ReplicationController, error)
118
+	// getControllers finds all controllers in namespace.
119
+	getControllers func(namespace string) (*kapi.ReplicationControllerList, error)
120
+	// scaler is used to scale replication controllers.
121
+	scaler kubectl.Scaler
122
+}
123
+
124
+// Deploy starts the deployment process for deploymentName.
125
+func (d *Deployer) Deploy(namespace, deploymentName string) error {
126
+	// Look up the new deployment.
127
+	deployment, err := d.getDeployment(namespace, deploymentName)
83 128
 	if err != nil {
84
-		return err
129
+		return fmt.Errorf("couldn't get deployment %s/%s: %v", namespace, deploymentName, err)
85 130
 	}
86 131
 
132
+	// Decode the config from the deployment.
87 133
 	config, err := deployutil.DecodeDeploymentConfig(deployment, latest.Codec)
88 134
 	if err != nil {
89 135
 		return fmt.Errorf("couldn't decode DeploymentConfig from deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
90 136
 	}
91 137
 
92
-	var strategy strategy.DeploymentStrategy
93
-
94
-	switch config.Template.Strategy.Type {
95
-	case deployapi.DeploymentStrategyTypeRecreate:
96
-		strategy = recreate.NewRecreateDeploymentStrategy(kClient, latest.Codec)
97
-	case deployapi.DeploymentStrategyTypeRolling:
98
-		recreate := recreate.NewRecreateDeploymentStrategy(kClient, latest.Codec)
99
-		strategy = rolling.NewRollingDeploymentStrategy(deployment.Namespace, kClient, latest.Codec, recreate)
100
-	default:
101
-		return fmt.Errorf("unsupported strategy type: %s", config.Template.Strategy.Type)
138
+	// Get a strategy for the deployment.
139
+	strategy, err := d.strategyFor(config)
140
+	if err != nil {
141
+		return err
102 142
 	}
103 143
 
104
-	return strategy.Deploy(deployment, oldDeployments)
105
-}
106
-
107
-// getDeployerContext finds the target deployment and any deployments it considers to be prior to the
108
-// target deployment. Only deployments whose LatestVersion is less than the target deployment are
109
-// considered to be prior.
110
-func getDeployerContext(controllerGetter replicationControllerGetter, namespace, deploymentName string) (*kapi.ReplicationController, []*kapi.ReplicationController, error) {
111
-	var err error
112
-	var newDeployment *kapi.ReplicationController
113
-	var newConfig *deployapi.DeploymentConfig
114
-
115
-	// Look up the new deployment and its associated config.
116
-	if newDeployment, err = controllerGetter.Get(namespace, deploymentName); err != nil {
117
-		return nil, nil, err
144
+	// New deployments must have a desired replica count.
145
+	desiredReplicas, hasDesired := deployutil.DeploymentDesiredReplicas(deployment)
146
+	if !hasDesired {
147
+		return fmt.Errorf("deployment %s has no desired replica count", deployutil.LabelForDeployment(deployment))
118 148
 	}
119 149
 
120
-	if newConfig, err = deployutil.DecodeDeploymentConfig(newDeployment, latest.Codec); err != nil {
121
-		return nil, nil, err
150
+	// Find all controllers in order to pick out the deployments.
151
+	controllers, err := d.getControllers(namespace)
152
+	if err != nil {
153
+		return fmt.Errorf("couldn't get controllers in namespace %s: %v", namespace, err)
122 154
 	}
123 155
 
124
-	glog.Infof("Found new Deployment %s for DeploymentConfig %s/%s with latestVersion %d", newDeployment.Name, newConfig.Namespace, newConfig.Name, newConfig.LatestVersion)
125
-
126
-	// Collect all deployments that predate the new one by comparing all old ReplicationControllers with
127
-	// encoded DeploymentConfigs to the new one by LatestVersion. Treat a failure to interpret a given
128
-	// old deployment as a fatal error to prevent overlapping deployments.
129
-	var allControllers *kapi.ReplicationControllerList
130
-	oldDeployments := []*kapi.ReplicationController{}
156
+	// Find all deployments sorted by version.
157
+	deployments := deployutil.ConfigSelector(config.Name, controllers.Items)
158
+	sort.Sort(deployutil.DeploymentsByLatestVersionDesc(deployments))
131 159
 
132
-	if allControllers, err = controllerGetter.List(newDeployment.Namespace, labels.Everything()); err != nil {
133
-		return nil, nil, fmt.Errorf("unable to get list replication controllers in deployment namespace %s: %v", newDeployment.Namespace, err)
160
+	// Find any last completed deployment.
161
+	var lastDeployment *kapi.ReplicationController
162
+	for _, candidate := range deployments {
163
+		if candidate.Name == deployment.Name {
164
+			continue
165
+		}
166
+		if deployutil.DeploymentStatusFor(&candidate) == deployapi.DeploymentStatusComplete {
167
+			lastDeployment = &candidate
168
+			glog.Infof("Picked %s as the last completed deployment", deployutil.LabelForDeployment(&candidate))
169
+			break
170
+		}
171
+	}
172
+	if lastDeployment == nil {
173
+		glog.Info("No last completed deployment found")
134 174
 	}
135 175
 
136
-	glog.Infof("Inspecting %d potential prior deployments", len(allControllers.Items))
137
-	for i, controller := range allControllers.Items {
138
-		if oldName := deployutil.DeploymentConfigNameFor(&controller); oldName != newConfig.Name {
139
-			glog.Infof("Disregarding deployment %s (doesn't match target DeploymentConfig %s)", controller.Name, oldName)
176
+	// Scale down any deployments which aren't the new or last deployment.
177
+	for _, candidate := range deployments {
178
+		// Skip the from/to deployments.
179
+		if candidate.Name == deployment.Name {
140 180
 			continue
141 181
 		}
142
-
143
-		if deployutil.DeploymentVersionFor(&controller) < newConfig.LatestVersion {
144
-			glog.Infof("Marking deployment %s as a prior deployment", controller.Name)
145
-			oldDeployments = append(oldDeployments, &allControllers.Items[i])
182
+		if lastDeployment != nil && candidate.Name == lastDeployment.Name {
183
+			continue
184
+		}
185
+		// Skip the deployment if it's already scaled down.
186
+		if candidate.Spec.Replicas == 0 {
187
+			continue
188
+		}
189
+		// Scale the deployment down to zero.
190
+		retryWaitParams := kubectl.NewRetryParams(1*time.Second, 120*time.Second)
191
+		if err := d.scaler.Scale(candidate.Namespace, candidate.Name, uint(0), &kubectl.ScalePrecondition{-1, ""}, retryWaitParams, retryWaitParams); err != nil {
192
+			glog.Infof("Couldn't scale down prior deployment %s: %v", deployutil.LabelForDeployment(&candidate), err)
146 193
 		} else {
147
-			glog.Infof("Disregarding deployment %s (same as or newer than target)", controller.Name)
194
+			glog.Infof("Scaled down prior deployment %s", deployutil.LabelForDeployment(&candidate))
148 195
 		}
149 196
 	}
150 197
 
151
-	return newDeployment, oldDeployments, nil
152
-}
153
-
154
-type realReplicationControllerGetter struct {
155
-	kClient kclient.Interface
156
-}
157
-
158
-func (r *realReplicationControllerGetter) Get(namespace, name string) (*kapi.ReplicationController, error) {
159
-	return r.kClient.ReplicationControllers(namespace).Get(name)
160
-}
161
-
162
-func (r *realReplicationControllerGetter) List(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
163
-	return r.kClient.ReplicationControllers(namespace).List(selector)
198
+	// Perform the deployment.
199
+	return strategy.Deploy(lastDeployment, deployment, desiredReplicas)
164 200
 }
... ...
@@ -1,172 +1,204 @@
1 1
 package deployer
2 2
 
3 3
 import (
4
+	"fmt"
5
+	"strconv"
4 6
 	"testing"
5 7
 
6 8
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
7
-	kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
8
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
9 9
 
10 10
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
11 11
 	deploytest "github.com/openshift/origin/pkg/deploy/api/test"
12
+	scalertest "github.com/openshift/origin/pkg/deploy/scaler/test"
13
+	"github.com/openshift/origin/pkg/deploy/strategy"
12 14
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
13 15
 )
14 16
 
15
-func TestGetDeploymentContextMissingDeployment(t *testing.T) {
16
-	getter := &testReplicationControllerGetter{
17
-		getFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
18
-			return nil, kerrors.NewNotFound("replicationController", name)
19
-		},
20
-		listFunc: func(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
21
-			t.Fatal("unexpected list call")
17
+func TestDeployer_getDeploymentFail(t *testing.T) {
18
+	deployer := &Deployer{
19
+		strategyFor: func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) {
20
+			t.Fatal("unexpected call")
22 21
 			return nil, nil
23 22
 		},
24
-	}
25
-
26
-	newDeployment, oldDeployments, err := getDeployerContext(getter, kapi.NamespaceDefault, "deployment")
27
-
28
-	if newDeployment != nil {
29
-		t.Fatalf("unexpected newDeployment: %#v", newDeployment)
30
-	}
31
-
32
-	if oldDeployments != nil {
33
-		t.Fatalf("unexpected oldDeployments: %#v", oldDeployments)
34
-	}
35
-
36
-	if err == nil {
37
-		t.Fatal("expected an error")
38
-	}
39
-}
40
-
41
-func TestGetDeploymentContextInvalidEncodedConfig(t *testing.T) {
42
-	getter := &testReplicationControllerGetter{
43
-		getFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
44
-			return &kapi.ReplicationController{}, nil
23
+		getDeployment: func(namespace, name string) (*kapi.ReplicationController, error) {
24
+			return nil, fmt.Errorf("get error")
45 25
 		},
46
-		listFunc: func(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
47
-			return &kapi.ReplicationControllerList{}, nil
26
+		getControllers: func(namespace string) (*kapi.ReplicationControllerList, error) {
27
+			t.Fatal("unexpected call")
28
+			return nil, nil
48 29
 		},
30
+		scaler: &scalertest.FakeScaler{},
49 31
 	}
50 32
 
51
-	newDeployment, oldDeployments, err := getDeployerContext(getter, kapi.NamespaceDefault, "deployment")
52
-
53
-	if newDeployment != nil {
54
-		t.Fatalf("unexpected newDeployment: %#v", newDeployment)
55
-	}
56
-
57
-	if oldDeployments != nil {
58
-		t.Fatalf("unexpected oldDeployments: %#v", oldDeployments)
59
-	}
60
-
33
+	err := deployer.Deploy("namespace", "name")
61 34
 	if err == nil {
62
-		t.Fatal("expected an error")
35
+		t.Fatalf("expected an error")
63 36
 	}
37
+	t.Logf("got expected error: %v", err)
64 38
 }
65 39
 
66
-func TestGetDeploymentContextNoPriorDeployments(t *testing.T) {
67
-	getter := &testReplicationControllerGetter{
68
-		getFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
69
-			deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
70
-			return deployment, nil
40
+func TestDeployer_deployScenarios(t *testing.T) {
41
+	mkd := func(version int, status deployapi.DeploymentStatus, replicas int, desired int) *kapi.ReplicationController {
42
+		deployment := mkdeployment(version, status)
43
+		deployment.Spec.Replicas = replicas
44
+		if desired > 0 {
45
+			deployment.Annotations[deployapi.DesiredReplicasAnnotation] = strconv.Itoa(desired)
46
+		}
47
+		return deployment
48
+	}
49
+	type scaleEvent struct {
50
+		version int
51
+		size    int
52
+	}
53
+	scenarios := []struct {
54
+		name        string
55
+		deployments []*kapi.ReplicationController
56
+		fromVersion int
57
+		toVersion   int
58
+		scaleEvents []scaleEvent
59
+	}{
60
+		{
61
+			"initial deployment",
62
+			// existing deployments
63
+			[]*kapi.ReplicationController{
64
+				mkd(1, deployapi.DeploymentStatusNew, 0, 3),
65
+			},
66
+			// from and to version
67
+			0, 1,
68
+			// expected scale events
69
+			[]scaleEvent{},
71 70
 		},
72
-		listFunc: func(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
73
-			return &kapi.ReplicationControllerList{}, nil
71
+		{
72
+			"last deploy failed",
73
+			// existing deployments
74
+			[]*kapi.ReplicationController{
75
+				mkd(1, deployapi.DeploymentStatusComplete, 3, 0),
76
+				mkd(2, deployapi.DeploymentStatusFailed, 1, 3),
77
+				mkd(3, deployapi.DeploymentStatusNew, 0, 3),
78
+			},
79
+			// from and to version
80
+			1, 3,
81
+			// expected scale events
82
+			[]scaleEvent{
83
+				{2, 0},
84
+			},
74 85
 		},
75
-	}
76
-
77
-	newDeployment, oldDeployments, err := getDeployerContext(getter, kapi.NamespaceDefault, "deployment")
78
-
79
-	if err != nil {
80
-		t.Fatalf("unexpected error: %v", err)
81
-	}
82
-
83
-	if newDeployment == nil {
84
-		t.Fatal("expected deployment")
85
-	}
86
-
87
-	if oldDeployments == nil {
88
-		t.Fatal("expected non-nil oldDeployments")
89
-	}
90
-
91
-	if len(oldDeployments) > 0 {
92
-		t.Fatalf("unexpected non-empty oldDeployments: %#v", oldDeployments)
93
-	}
94
-}
95
-
96
-func TestGetDeploymentContextWithPriorDeployments(t *testing.T) {
97
-	getter := &testReplicationControllerGetter{
98
-		getFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
99
-			deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(3), kapi.Codec)
100
-			return deployment, nil
86
+		{
87
+			"sequential complete",
88
+			// existing deployments
89
+			[]*kapi.ReplicationController{
90
+				mkd(1, deployapi.DeploymentStatusComplete, 0, 0),
91
+				mkd(2, deployapi.DeploymentStatusComplete, 3, 0),
92
+				mkd(3, deployapi.DeploymentStatusNew, 0, 3),
93
+			},
94
+			// from and to version
95
+			2, 3,
96
+			// expected scale events
97
+			[]scaleEvent{},
101 98
 		},
102
-		listFunc: func(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
103
-			deployment1, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
104
-			deployment2, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(2), kapi.Codec)
105
-			deployment3, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(4), kapi.Codec)
106
-			deployment4, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
107
-			deployment4.Annotations[deployapi.DeploymentConfigAnnotation] = "another-config"
108
-			return &kapi.ReplicationControllerList{
109
-				Items: []kapi.ReplicationController{
110
-					*deployment1,
111
-					*deployment2,
112
-					*deployment3,
113
-					*deployment4,
114
-					{},
115
-				},
116
-			}, nil
99
+		{
100
+			"sequential failure",
101
+			// existing deployments
102
+			[]*kapi.ReplicationController{
103
+				mkd(1, deployapi.DeploymentStatusFailed, 1, 3),
104
+				mkd(2, deployapi.DeploymentStatusFailed, 1, 3),
105
+				mkd(3, deployapi.DeploymentStatusNew, 0, 3),
106
+			},
107
+			// from and to version
108
+			0, 3,
109
+			// expected scale events
110
+			[]scaleEvent{
111
+				{1, 0},
112
+				{2, 0},
113
+			},
117 114
 		},
118 115
 	}
119 116
 
120
-	newDeployment, oldDeployments, err := getDeployerContext(getter, kapi.NamespaceDefault, "deployment")
121
-
122
-	if err != nil {
123
-		t.Fatalf("unexpected error: %v", err)
124
-	}
117
+	for _, s := range scenarios {
118
+		t.Logf("executing scenario %s", s.name)
119
+		findDeployment := func(version int) *kapi.ReplicationController {
120
+			for _, d := range s.deployments {
121
+				if deployutil.DeploymentVersionFor(d) == version {
122
+					return d
123
+				}
124
+			}
125
+			return nil
126
+		}
125 127
 
126
-	if newDeployment == nil {
127
-		t.Fatal("expected deployment")
128
-	}
128
+		var actualFrom, actualTo *kapi.ReplicationController
129
+		var actualDesired int
130
+		to := findDeployment(s.toVersion)
131
+		scaler := &scalertest.FakeScaler{}
132
+
133
+		deployer := &Deployer{
134
+			strategyFor: func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) {
135
+				return &testStrategy{
136
+					deployFunc: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error {
137
+						actualFrom = from
138
+						actualTo = to
139
+						actualDesired = desiredReplicas
140
+						return nil
141
+					},
142
+				}, nil
143
+			},
144
+			getDeployment: func(namespace, name string) (*kapi.ReplicationController, error) {
145
+				return to, nil
146
+			},
147
+			getControllers: func(namespace string) (*kapi.ReplicationControllerList, error) {
148
+				list := &kapi.ReplicationControllerList{}
149
+				for _, d := range s.deployments {
150
+					list.Items = append(list.Items, *d)
151
+				}
152
+				return list, nil
153
+			},
154
+			scaler: scaler,
155
+		}
129 156
 
130
-	if oldDeployments == nil {
131
-		t.Fatal("expected non-nil oldDeployments")
132
-	}
157
+		err := deployer.Deploy(to.Namespace, to.Name)
158
+		if err != nil {
159
+			t.Fatalf("unexpcted error: %v", err)
160
+		}
133 161
 
134
-	expected := []string{"config-1", "config-2"}
135
-	for _, e := range expected {
136
-		found := false
137
-		for _, d := range oldDeployments {
138
-			if d.Name == e {
139
-				found = true
140
-				break
162
+		if s.fromVersion > 0 {
163
+			if e, a := s.fromVersion, deployutil.DeploymentVersionFor(actualFrom); e != a {
164
+				t.Fatalf("expected from.latestVersion %d, got %d", e, a)
141 165
 			}
142 166
 		}
143
-		if !found {
144
-			t.Errorf("expected to find old deployment %s", e)
167
+		if e, a := s.toVersion, deployutil.DeploymentVersionFor(actualTo); e != a {
168
+			t.Fatalf("expected to.latestVersion %d, got %d", e, a)
145 169
 		}
146
-	}
147
-	for _, d := range oldDeployments {
148
-		ok := false
149
-		for _, e := range expected {
150
-			if d.Name == e {
151
-				ok = true
152
-				break
153
-			}
170
+		if e, a := len(s.scaleEvents), len(scaler.Events); e != a {
171
+			t.Fatalf("expected %d scale events, got %d", e, a)
154 172
 		}
155
-		if !ok {
156
-			t.Errorf("unexpected old deployment %s", d.Name)
173
+		for _, expected := range s.scaleEvents {
174
+			expectedTo := findDeployment(expected.version)
175
+			expectedWasScaled := false
176
+			for _, actual := range scaler.Events {
177
+				if actual.Name != expectedTo.Name {
178
+					continue
179
+				}
180
+				if e, a := uint(expected.size), actual.Size; e != a {
181
+					t.Fatalf("expected version %d to be scaled to %d, got %d", expected.version, e, a)
182
+				}
183
+				expectedWasScaled = true
184
+			}
185
+			if !expectedWasScaled {
186
+				t.Fatalf("expected version %d to be scaled to %d, but it wasn't scaled at all", expected.version, expected.size)
187
+			}
157 188
 		}
158 189
 	}
159 190
 }
160 191
 
161
-type testReplicationControllerGetter struct {
162
-	getFunc  func(namespace, name string) (*kapi.ReplicationController, error)
163
-	listFunc func(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error)
192
+func mkdeployment(version int, status deployapi.DeploymentStatus) *kapi.ReplicationController {
193
+	deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(version), kapi.Codec)
194
+	deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(status)
195
+	return deployment
164 196
 }
165 197
 
166
-func (t *testReplicationControllerGetter) Get(namespace, name string) (*kapi.ReplicationController, error) {
167
-	return t.getFunc(namespace, name)
198
+type testStrategy struct {
199
+	deployFunc func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error
168 200
 }
169 201
 
170
-func (t *testReplicationControllerGetter) List(namespace string, selector labels.Selector) (*kapi.ReplicationControllerList, error) {
171
-	return t.listFunc(namespace, selector)
202
+func (t *testStrategy) Deploy(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error {
203
+	return t.deployFunc(from, to, desiredReplicas)
172 204
 }
... ...
@@ -2,6 +2,7 @@ package deploymentconfig
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"sort"
5 6
 	"strconv"
6 7
 
7 8
 	"github.com/golang/glog"
... ...
@@ -116,21 +117,19 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
116 116
 		return fatalError(fmt.Sprintf("couldn't make Deployment from (potentially invalid) DeploymentConfig %s: %v", deployutil.LabelForDeploymentConfig(config), err))
117 117
 	}
118 118
 
119
-	// Compute the desired replicas for the deployment. The count should match
120
-	// the existing deployment replica count. To find this, simply sum the
121
-	// replicas of existing deployments for this config. Any deactivated
122
-	// deployments should already be scaled down to zero, and so the sum should
123
-	// reflect the count of the latest active deployment.
124
-	//
125
-	// If there are no existing deployments, use the replica count from the
126
-	// config template.
119
+	// Compute the desired replicas for the deployment. Use the last completed
120
+	// deployment's current replica count, or the config template if there is no
121
+	// prior completed deployment available.
127 122
 	desiredReplicas := config.Template.ControllerTemplate.Replicas
128 123
 	if len(existingDeployments.Items) > 0 {
129
-		desiredReplicas = 0
124
+		sort.Sort(deployutil.DeploymentsByLatestVersionDesc(existingDeployments.Items))
130 125
 		for _, existing := range existingDeployments.Items {
131
-			desiredReplicas += existing.Spec.Replicas
126
+			if deployutil.DeploymentStatusFor(&existing) == deployapi.DeploymentStatusComplete {
127
+				desiredReplicas = existing.Spec.Replicas
128
+				glog.V(4).Infof("Desired replicas for %s set to %d based on prior completed deployment %s", deployutil.LabelForDeploymentConfig(config), desiredReplicas, existing.Name)
129
+				break
130
+			}
132 131
 		}
133
-		glog.V(4).Infof("Desired replicas for %s adjusted to %d based on %d existing deployments", deployutil.LabelForDeploymentConfig(config), desiredReplicas, len(existingDeployments.Items))
134 132
 	}
135 133
 	deployment.Annotations[deployapi.DesiredReplicasAnnotation] = strconv.Itoa(desiredReplicas)
136 134
 
... ...
@@ -4,7 +4,6 @@ import (
4 4
 	"fmt"
5 5
 	"reflect"
6 6
 	"sort"
7
-	"strconv"
8 7
 	"testing"
9 8
 
10 9
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
... ...
@@ -81,6 +80,7 @@ func TestHandle_updateOk(t *testing.T) {
81 81
 	type existing struct {
82 82
 		version  int
83 83
 		replicas int
84
+		status   deployapi.DeploymentStatus
84 85
 	}
85 86
 
86 87
 	type scenario struct {
... ...
@@ -90,14 +90,29 @@ func TestHandle_updateOk(t *testing.T) {
90 90
 	}
91 91
 
92 92
 	scenarios := []scenario{
93
-		// No existing deployments
94 93
 		{1, 1, []existing{}},
95
-		// A single existing deployment
96
-		{2, 1, []existing{{1, 1}}},
97
-		// An active and deactivated existing deployment
98
-		{3, 2, []existing{{2, 2}, {1, 0}}},
99
-		// An active and deactivated existing deployment with weird ordering
100
-		{4, 3, []existing{{1, 0}, {2, 0}, {3, 3}}},
94
+		{2, 1, []existing{
95
+			{1, 1, deployapi.DeploymentStatusComplete},
96
+		}},
97
+		{3, 4, []existing{
98
+			{1, 0, deployapi.DeploymentStatusComplete},
99
+			{2, 4, deployapi.DeploymentStatusComplete},
100
+		}},
101
+		{3, 4, []existing{
102
+			{1, 4, deployapi.DeploymentStatusComplete},
103
+			{2, 1, deployapi.DeploymentStatusFailed},
104
+		}},
105
+		{4, 2, []existing{
106
+			{1, 0, deployapi.DeploymentStatusComplete},
107
+			{2, 0, deployapi.DeploymentStatusFailed},
108
+			{3, 2, deployapi.DeploymentStatusComplete},
109
+		}},
110
+		// Scramble the order of the previous to ensure we still get it right.
111
+		{4, 2, []existing{
112
+			{2, 0, deployapi.DeploymentStatusFailed},
113
+			{3, 2, deployapi.DeploymentStatusComplete},
114
+			{1, 0, deployapi.DeploymentStatusComplete},
115
+		}},
101 116
 	}
102 117
 
103 118
 	for _, scenario := range scenarios {
... ...
@@ -107,7 +122,7 @@ func TestHandle_updateOk(t *testing.T) {
107 107
 		for _, e := range scenario.existing {
108 108
 			d, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(e.version), api.Codec)
109 109
 			d.Spec.Replicas = e.replicas
110
-			d.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete)
110
+			d.Annotations[deployapi.DeploymentStatusAnnotation] = string(e.status)
111 111
 			existingDeployments.Items = append(existingDeployments.Items, *d)
112 112
 		}
113 113
 		err := controller.Handle(config)
... ...
@@ -120,8 +135,12 @@ func TestHandle_updateOk(t *testing.T) {
120 120
 			t.Fatalf("unexpected error: %v", err)
121 121
 		}
122 122
 
123
-		if e, a := strconv.Itoa(scenario.expectedReplicas), deployed.Annotations[deployapi.DesiredReplicasAnnotation]; e != a {
124
-			t.Errorf("expected desired replicas %s, got %s", e, a)
123
+		desired, hasDesired := deployutil.DeploymentDesiredReplicas(deployed)
124
+		if !hasDesired {
125
+			t.Fatalf("expected desired replicas")
126
+		}
127
+		if e, a := scenario.expectedReplicas, desired; e != a {
128
+			t.Errorf("expected desired replicas %d, got %d", e, a)
125 129
 		}
126 130
 	}
127 131
 }
128 132
new file mode 100644
... ...
@@ -0,0 +1,25 @@
0
+package test
1
+
2
+import (
3
+	"fmt"
4
+
5
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
6
+)
7
+
8
+type FakeScaler struct {
9
+	Events []ScaleEvent
10
+}
11
+
12
+type ScaleEvent struct {
13
+	Name string
14
+	Size uint
15
+}
16
+
17
+func (t *FakeScaler) Scale(namespace, name string, newSize uint, preconditions *kubectl.ScalePrecondition, retry, wait *kubectl.RetryParams) error {
18
+	t.Events = append(t.Events, ScaleEvent{name, newSize})
19
+	return nil
20
+}
21
+
22
+func (t *FakeScaler) ScaleSimple(namespace, name string, preconditions *kubectl.ScalePrecondition, newSize uint) (string, error) {
23
+	return "error", fmt.Errorf("unexpected call to ScaleSimple")
24
+}
... ...
@@ -6,6 +6,6 @@ import (
6 6
 
7 7
 // DeploymentStrategy knows how to make a deployment active.
8 8
 type DeploymentStrategy interface {
9
-	// Deploy makes a deployment active.
10
-	Deploy(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error
9
+	// Deploy transitions an old deployment to a new one.
10
+	Deploy(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error
11 11
 }
... ...
@@ -2,14 +2,13 @@ package recreate
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"strconv"
6 5
 	"time"
7 6
 
8 7
 	"github.com/golang/glog"
9 8
 
10 9
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
11
-	kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
12 10
 	kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
11
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
13 12
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
14 13
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
15 14
 
... ...
@@ -19,28 +18,36 @@ import (
19 19
 )
20 20
 
21 21
 // RecreateDeploymentStrategy is a simple strategy appropriate as a default.
22
-// Its behavior is to decrease the replica count of previous deployments to zero,
23
-// and to increase the replica count of the new deployment to 1.
22
+// Its behavior is to scale down the last deployment to 0, and to scale up the
23
+// new deployment to 1.
24 24
 //
25 25
 // A failure to disable any existing deployments will be considered a
26 26
 // deployment failure.
27 27
 type RecreateDeploymentStrategy struct {
28
-	// client is used to interact with ReplicatonControllers.
29
-	client replicationControllerClient
28
+	// getReplicationController knows how to get a replication controller.
29
+	getReplicationController func(namespace, name string) (*kapi.ReplicationController, error)
30
+	// scaler is used to scale replication controllers.
31
+	scaler kubectl.Scaler
30 32
 	// codec is used to decode DeploymentConfigs contained in deployments.
31 33
 	codec runtime.Codec
32 34
 	// hookExecutor can execute a lifecycle hook.
33 35
 	hookExecutor hookExecutor
34
-
36
+	// retryTimeout is how long to wait for the replica count update to succeed
37
+	// before giving up.
35 38
 	retryTimeout time.Duration
36
-	retryPeriod  time.Duration
39
+	// retryPeriod is how often to try updating the replica count.
40
+	retryPeriod time.Duration
37 41
 }
38 42
 
39 43
 // NewRecreateDeploymentStrategy makes a RecreateDeploymentStrategy backed by
40 44
 // a real HookExecutor and client.
41 45
 func NewRecreateDeploymentStrategy(client kclient.Interface, codec runtime.Codec) *RecreateDeploymentStrategy {
46
+	scaler, _ := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(client))
42 47
 	return &RecreateDeploymentStrategy{
43
-		client: &realReplicationControllerClient{client},
48
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
49
+			return client.ReplicationControllers(namespace).Get(name)
50
+		},
51
+		scaler: scaler,
44 52
 		codec:  codec,
45 53
 		hookExecutor: &stratsupport.HookExecutor{
46 54
 			PodClient: &stratsupport.HookExecutorPodClientImpl{
... ...
@@ -52,126 +59,94 @@ func NewRecreateDeploymentStrategy(client kclient.Interface, codec runtime.Codec
52 52
 				},
53 53
 			},
54 54
 		},
55
-		retryTimeout: 10 * time.Second,
55
+		retryTimeout: 120 * time.Second,
56 56
 		retryPeriod:  1 * time.Second,
57 57
 	}
58 58
 }
59 59
 
60 60
 // Deploy makes deployment active and disables oldDeployments.
61
-func (s *RecreateDeploymentStrategy) Deploy(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
62
-	var err error
63
-	var deploymentConfig *deployapi.DeploymentConfig
61
+func (s *RecreateDeploymentStrategy) Deploy(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error {
62
+	return s.DeployWithAcceptor(from, to, desiredReplicas, nil)
63
+}
64 64
 
65
-	if deploymentConfig, err = deployutil.DecodeDeploymentConfig(deployment, s.codec); err != nil {
66
-		return fmt.Errorf("couldn't decode DeploymentConfig from Deployment %s: %v", deployment.Name, err)
65
+// DeployWithAcceptor scales down from and then scales up to. If
66
+// updateAcceptor is provided and the desired replica count is >1, the first
67
+// replica of to is rolled out and validated before performing the full scale
68
+// up.
69
+//
70
+// This is currently only used in conjunction with the rolling update strategy
71
+// for initial deployments.
72
+func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
73
+	config, err := deployutil.DecodeDeploymentConfig(to, s.codec)
74
+	if err != nil {
75
+		return fmt.Errorf("couldn't decode config from deployment %s: %v", to.Name, err)
67 76
 	}
68 77
 
69
-	params := deploymentConfig.Template.Strategy.RecreateParams
78
+	params := config.Template.Strategy.RecreateParams
79
+	retryParams := kubectl.NewRetryParams(s.retryPeriod, s.retryTimeout)
80
+	waitParams := kubectl.NewRetryParams(s.retryPeriod, s.retryTimeout)
81
+
70 82
 	// Execute any pre-hook.
71 83
 	if params != nil && params.Pre != nil {
72
-		err := s.hookExecutor.Execute(params.Pre, deployment, "prehook")
73
-		if err != nil {
84
+		if err := s.hookExecutor.Execute(params.Pre, to, "prehook"); err != nil {
74 85
 			return fmt.Errorf("Pre hook failed: %s", err)
86
+		} else {
87
+			glog.Infof("Pre hook finished")
75 88
 		}
76 89
 	}
77 90
 
78
-	// Prefer to use an explicitly set desired replica count, falling back to
79
-	// the value defined on the config.
80
-	desiredReplicas := deploymentConfig.Template.ControllerTemplate.Replicas
81
-	if desired, hasDesired := deployment.Annotations[deployapi.DesiredReplicasAnnotation]; hasDesired {
82
-		val, err := strconv.Atoi(desired)
91
+	// Scale down the from deployment.
92
+	if from != nil {
93
+		_, err := s.scaleAndWait(from, 0, retryParams, waitParams)
83 94
 		if err != nil {
84
-			util.HandleError(fmt.Errorf("deployment has an invalid desired replica count '%s'; falling back to config value %d", desired, desiredReplicas))
85
-		} else {
86
-			glog.V(4).Infof("Deployment has an explicit desired replica count %d", val)
87
-			desiredReplicas = val
95
+			return fmt.Errorf("couldn't scale down 'from' deployment %s: %v", deployutil.LabelForDeployment(from), err)
88 96
 		}
89
-	} else {
90
-		glog.V(4).Infof("Deployment has no explicit desired replica count; using the config value %d", desiredReplicas)
91 97
 	}
92 98
 
93
-	// Disable any old deployments.
94
-	glog.V(4).Infof("Found %d prior deployments to disable", len(oldDeployments))
95
-	allProcessed := true
96
-	for _, oldDeployment := range oldDeployments {
97
-		if err = s.updateReplicas(oldDeployment.Namespace, oldDeployment.Name, 0); err != nil {
98
-			util.HandleError(fmt.Errorf("%v", err))
99
-			allProcessed = false
99
+	// If an UpdateAcceptor is provided and we're trying to scale up to more
100
+	// than one replica, scale up to 1 and validate the replica, aborting if the
101
+	// replica isn't acceptable.
102
+	if updateAcceptor != nil && desiredReplicas > 1 {
103
+		glog.Infof("Validating first replica of %s", to.Name)
104
+		updatedTo, err := s.scaleAndWait(to, 1, retryParams, waitParams)
105
+		if err != nil {
106
+			return err
100 107
 		}
108
+		if err := updateAcceptor.Accept(updatedTo); err != nil {
109
+			return fmt.Errorf("First replica rejected for %s: %v", to.Name, err)
110
+		}
111
+		to = updatedTo
101 112
 	}
102 113
 
103
-	if !allProcessed {
104
-		return fmt.Errorf("failed to disable all prior deployments for new Deployment %s", deployment.Name)
105
-	}
106
-
107
-	// Scale up the new deployment.
108
-	if err = s.updateReplicas(deployment.Namespace, deployment.Name, desiredReplicas); err != nil {
114
+	// Complete the scale up.
115
+	updatedTo, err := s.scaleAndWait(to, desiredReplicas, retryParams, waitParams)
116
+	if err != nil {
109 117
 		return err
110 118
 	}
119
+	to = updatedTo
111 120
 
112 121
 	// Execute any post-hook. Errors are logged and ignored.
113 122
 	if params != nil && params.Post != nil {
114
-		err := s.hookExecutor.Execute(params.Post, deployment, "posthook")
115
-		if err != nil {
123
+		if err := s.hookExecutor.Execute(params.Post, to, "posthook"); err != nil {
116 124
 			util.HandleError(fmt.Errorf("post hook failed: %s", err))
117 125
 		} else {
118 126
 			glog.Infof("Post hook finished")
119 127
 		}
120 128
 	}
121 129
 
122
-	glog.Infof("Deployment %s successfully made active", deployment.Name)
130
+	glog.Infof("Deployment %s successfully made active", to.Name)
123 131
 	return nil
124 132
 }
125 133
 
126
-// updateReplicas attempts to set the given deployment's replicaCount using retry logic.
127
-func (s *RecreateDeploymentStrategy) updateReplicas(namespace, name string, replicaCount int) error {
128
-	var err error
129
-	var deployment *kapi.ReplicationController
130
-
131
-	timeout := time.After(s.retryTimeout)
132
-	for {
133
-		select {
134
-		case <-timeout:
135
-			return fmt.Errorf("couldn't successfully update Deployment %s/%s replica count to %d (timeout exceeded)", namespace, name, replicaCount)
136
-		default:
137
-			if deployment, err = s.client.getReplicationController(namespace, name); err != nil {
138
-				util.HandleError(fmt.Errorf("couldn't get Deployment %s/%s: %v", namespace, name, err))
139
-			} else {
140
-				deployment.Spec.Replicas = replicaCount
141
-				glog.V(4).Infof("Updating Deployment %s/%s replica count to %d", namespace, name, replicaCount)
142
-				if _, err = s.client.updateReplicationController(namespace, deployment); err == nil {
143
-					return nil
144
-				}
145
-				// For conflict errors, retry immediately
146
-				if kerrors.IsConflict(err) {
147
-					continue
148
-				}
149
-				util.HandleError(fmt.Errorf("error updating Deployment %s/%s replica count to %d: %v", namespace, name, replicaCount, err))
150
-			}
151
-
152
-			time.Sleep(s.retryPeriod)
153
-		}
134
+func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retry *kubectl.RetryParams, wait *kubectl.RetryParams) (*kapi.ReplicationController, error) {
135
+	if err := s.scaler.Scale(deployment.Namespace, deployment.Name, uint(replicas), &kubectl.ScalePrecondition{-1, ""}, retry, wait); err != nil {
136
+		return nil, err
154 137
 	}
155
-}
156
-
157
-// replicationControllerClient provides access to ReplicationControllers.
158
-type replicationControllerClient interface {
159
-	getReplicationController(namespace, name string) (*kapi.ReplicationController, error)
160
-	updateReplicationController(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error)
161
-}
162
-
163
-// realReplicationControllerClient is a replicationControllerClient which uses
164
-// a Kube client.
165
-type realReplicationControllerClient struct {
166
-	client kclient.Interface
167
-}
168
-
169
-func (r *realReplicationControllerClient) getReplicationController(namespace string, name string) (*kapi.ReplicationController, error) {
170
-	return r.client.ReplicationControllers(namespace).Get(name)
171
-}
172
-
173
-func (r *realReplicationControllerClient) updateReplicationController(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
174
-	return r.client.ReplicationControllers(namespace).Update(ctrl)
138
+	updatedDeployment, err := s.getReplicationController(deployment.Namespace, deployment.Name)
139
+	if err != nil {
140
+		return nil, err
141
+	}
142
+	return updatedDeployment, nil
175 143
 }
176 144
 
177 145
 // hookExecutor knows how to execute a deployment lifecycle hook.
... ...
@@ -10,351 +10,233 @@ import (
10 10
 	api "github.com/openshift/origin/pkg/api/latest"
11 11
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
12 12
 	deploytest "github.com/openshift/origin/pkg/deploy/api/test"
13
+	scalertest "github.com/openshift/origin/pkg/deploy/scaler/test"
13 14
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
14 15
 )
15 16
 
16 17
 func TestRecreate_initialDeployment(t *testing.T) {
17
-	var updatedController *kapi.ReplicationController
18 18
 	var deployment *kapi.ReplicationController
19
+	scaler := &scalertest.FakeScaler{}
19 20
 
20 21
 	strategy := &RecreateDeploymentStrategy{
21 22
 		codec:        api.Codec,
22 23
 		retryTimeout: 1 * time.Second,
23 24
 		retryPeriod:  1 * time.Millisecond,
24
-		client: &testControllerClient{
25
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
26
-				return deployment, nil
27
-			},
28
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
29
-				updatedController = ctrl
30
-				return ctrl, nil
31
-			},
25
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
26
+			return deployment, nil
32 27
 		},
28
+		scaler: scaler,
33 29
 	}
34 30
 
35
-	// Deployment replicas should follow the config as there's no explicit
36
-	// desired annotation.
37
-	deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
38
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
39
-	if err != nil {
40
-		t.Fatalf("unexpected deploy error: %#v", err)
41
-	}
42
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
43
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
44
-	}
45
-
46
-	// Deployment replicas should follow the explicit annotation.
47 31
 	deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
48
-	deployment.Annotations[deployapi.DesiredReplicasAnnotation] = "2"
49
-	err = strategy.Deploy(deployment, []*kapi.ReplicationController{})
32
+	err := strategy.Deploy(nil, deployment, 2)
50 33
 	if err != nil {
51 34
 		t.Fatalf("unexpected deploy error: %#v", err)
52 35
 	}
53
-	if e, a := 2, updatedController.Spec.Replicas; e != a {
54
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
55
-	}
56 36
 
57
-	// Deployment replicas should follow the config as the explicit value is
58
-	// invalid.
59
-	deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
60
-	deployment.Annotations[deployapi.DesiredReplicasAnnotation] = "invalid"
61
-	err = strategy.Deploy(deployment, []*kapi.ReplicationController{})
62
-	if err != nil {
63
-		t.Fatalf("unexpected deploy error: %#v", err)
37
+	if e, a := 1, len(scaler.Events); e != a {
38
+		t.Fatalf("expected %s scale calls, got %d", e, a)
64 39
 	}
65
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
66
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
40
+	if e, a := uint(2), scaler.Events[0].Size; e != a {
41
+		t.Errorf("expected scale up to %d, got %d", e, a)
67 42
 	}
68 43
 }
69 44
 
70
-func TestRecreate_secondDeploymentWithSuccessfulRetries(t *testing.T) {
71
-	updatedControllers := make(map[string]*kapi.ReplicationController)
72
-	oldDeployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
73
-	newConfig := deploytest.OkDeploymentConfig(2)
74
-	newDeployment, _ := deployutil.MakeDeployment(newConfig, kapi.Codec)
75
-
76
-	errorCounts := map[string]int{}
45
+func TestRecreate_deploymentPreHookSuccess(t *testing.T) {
46
+	config := deploytest.OkDeploymentConfig(1)
47
+	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyAbort, "")
48
+	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
49
+	scaler := &scalertest.FakeScaler{}
77 50
 
51
+	hookExecuted := false
78 52
 	strategy := &RecreateDeploymentStrategy{
79 53
 		codec:        api.Codec,
80 54
 		retryTimeout: 1 * time.Second,
81 55
 		retryPeriod:  1 * time.Millisecond,
82
-		client: &testControllerClient{
83
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
84
-				switch name {
85
-				case oldDeployment.Name:
86
-					return oldDeployment, nil
87
-				case newDeployment.Name:
88
-					return newDeployment, nil
89
-				default:
90
-					t.Fatalf("unexpected call to getReplicationController: %s/%s", namespace, name)
91
-					return nil, nil
92
-				}
93
-			},
94
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
95
-				if errorCounts[ctrl.Name] < 3 {
96
-					errorCounts[ctrl.Name] = errorCounts[ctrl.Name] + 1
97
-					return nil, fmt.Errorf("test error %d", errorCounts[ctrl.Name])
98
-				}
99
-				updatedControllers[ctrl.Name] = ctrl
100
-				return ctrl, nil
56
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
57
+			return deployment, nil
58
+		},
59
+		hookExecutor: &hookExecutorImpl{
60
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
61
+				hookExecuted = true
62
+				return nil
101 63
 			},
102 64
 		},
65
+		scaler: scaler,
103 66
 	}
104 67
 
105
-	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
106
-
68
+	err := strategy.Deploy(nil, deployment, 2)
107 69
 	if err != nil {
108 70
 		t.Fatalf("unexpected deploy error: %#v", err)
109 71
 	}
110
-
111
-	if e, a := 0, updatedControllers[oldDeployment.Name].Spec.Replicas; e != a {
112
-		t.Fatalf("expected old controller replicas to be %d, got %d", e, a)
113
-	}
114
-
115
-	if e, a := 1, updatedControllers[newDeployment.Name].Spec.Replicas; e != a {
116
-		t.Fatalf("expected new controller replicas to be %d, got %d", e, a)
72
+	if !hookExecuted {
73
+		t.Fatalf("exepcted hook execution")
117 74
 	}
118 75
 }
119 76
 
120
-func TestRecreate_secondDeploymentScaleUpRetries(t *testing.T) {
121
-	updatedControllers := make(map[string]*kapi.ReplicationController)
122
-	oldDeployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
123
-	newConfig := deploytest.OkDeploymentConfig(2)
124
-	newDeployment, _ := deployutil.MakeDeployment(newConfig, kapi.Codec)
77
+func TestRecreate_deploymentPreHookFail(t *testing.T) {
78
+	config := deploytest.OkDeploymentConfig(1)
79
+	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyAbort, "")
80
+	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
81
+	scaler := &scalertest.FakeScaler{}
125 82
 
126 83
 	strategy := &RecreateDeploymentStrategy{
127 84
 		codec:        api.Codec,
128
-		retryTimeout: 1 * time.Millisecond,
85
+		retryTimeout: 1 * time.Second,
129 86
 		retryPeriod:  1 * time.Millisecond,
130
-		client: &testControllerClient{
131
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
132
-				switch name {
133
-				case oldDeployment.Name:
134
-					return oldDeployment, nil
135
-				case newDeployment.Name:
136
-					return newDeployment, nil
137
-				default:
138
-					t.Fatalf("unexpected call to getReplicationController: %s/%s", namespace, name)
139
-					return nil, nil
140
-				}
141
-			},
142
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
143
-				return nil, fmt.Errorf("update failure")
144
-			},
87
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
88
+			return deployment, nil
145 89
 		},
146
-	}
147
-
148
-	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
149
-
150
-	if err == nil {
151
-		t.Fatalf("expected a deploy error: %#v", err)
152
-	}
153
-
154
-	if len(updatedControllers) > 0 {
155
-		t.Fatalf("unexpected controller updates: %v", updatedControllers)
156
-	}
157
-}
158
-
159
-func TestRecreate_secondDeploymentScaleDownRetries(t *testing.T) {
160
-	updatedControllers := make(map[string]*kapi.ReplicationController)
161
-	oldDeployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
162
-	newConfig := deploytest.OkDeploymentConfig(2)
163
-	newDeployment, _ := deployutil.MakeDeployment(newConfig, kapi.Codec)
164
-
165
-	strategy := &RecreateDeploymentStrategy{
166
-		codec:        api.Codec,
167
-		retryTimeout: 1 * time.Millisecond,
168
-		retryPeriod:  1 * time.Millisecond,
169
-		client: &testControllerClient{
170
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
171
-				switch name {
172
-				case oldDeployment.Name:
173
-					return oldDeployment, nil
174
-				case newDeployment.Name:
175
-					return newDeployment, nil
176
-				default:
177
-					t.Fatalf("unexpected call to getReplicationController: %s/%s", namespace, name)
178
-					return nil, nil
179
-				}
180
-			},
181
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
182
-				switch ctrl.Name {
183
-				case newDeployment.Name:
184
-					return newDeployment, nil
185
-				case oldDeployment.Name:
186
-					return nil, fmt.Errorf("update error")
187
-				default:
188
-					t.Fatalf("unexpected call to getReplicationController: %s/%s", namespace, ctrl.Name)
189
-					return nil, nil
190
-				}
90
+		hookExecutor: &hookExecutorImpl{
91
+			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
92
+				return fmt.Errorf("hook execution failure")
191 93
 			},
192 94
 		},
95
+		scaler: scaler,
193 96
 	}
194 97
 
195
-	err := strategy.Deploy(newDeployment, []*kapi.ReplicationController{oldDeployment})
196
-
98
+	err := strategy.Deploy(nil, deployment, 2)
197 99
 	if err == nil {
198
-		t.Fatalf("expected a deploy error: %#v", err)
100
+		t.Fatalf("expected a deploy error")
199 101
 	}
200
-
201
-	if len(updatedControllers) > 0 {
202
-		t.Fatalf("unexpected controller updates: %v", updatedControllers)
102
+	if len(scaler.Events) > 0 {
103
+		t.Fatalf("unexpected scaling events: %v", scaler.Events)
203 104
 	}
204 105
 }
205 106
 
206
-func TestRecreate_deploymentPreHookSuccess(t *testing.T) {
207
-	var updatedController *kapi.ReplicationController
107
+func TestRecreate_deploymentPostHookSuccess(t *testing.T) {
208 108
 	config := deploytest.OkDeploymentConfig(1)
209
-	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyAbort, "")
109
+	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyAbort)
210 110
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
111
+	scaler := &scalertest.FakeScaler{}
211 112
 
113
+	hookExecuted := false
212 114
 	strategy := &RecreateDeploymentStrategy{
213 115
 		codec:        api.Codec,
214 116
 		retryTimeout: 1 * time.Second,
215 117
 		retryPeriod:  1 * time.Millisecond,
216
-		client: &testControllerClient{
217
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
218
-				return deployment, nil
219
-			},
220
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
221
-				updatedController = ctrl
222
-				return ctrl, nil
223
-			},
118
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
119
+			return deployment, nil
224 120
 		},
225 121
 		hookExecutor: &hookExecutorImpl{
226 122
 			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
123
+				hookExecuted = true
227 124
 				return nil
228 125
 			},
229 126
 		},
127
+		scaler: scaler,
230 128
 	}
231 129
 
232
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
233
-
130
+	err := strategy.Deploy(nil, deployment, 2)
234 131
 	if err != nil {
235 132
 		t.Fatalf("unexpected deploy error: %#v", err)
236 133
 	}
237
-
238
-	if updatedController == nil {
239
-		t.Fatalf("expected a ReplicationController")
240
-	}
241
-
242
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
243
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
134
+	if !hookExecuted {
135
+		t.Fatalf("exepcted hook execution")
244 136
 	}
245 137
 }
246 138
 
247
-func TestRecreate_deploymentPreHookFail(t *testing.T) {
139
+func TestRecreate_deploymentPostHookFail(t *testing.T) {
248 140
 	config := deploytest.OkDeploymentConfig(1)
249
-	config.Template.Strategy.RecreateParams = recreateParams(deployapi.LifecycleHookFailurePolicyAbort, "")
141
+	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyAbort)
250 142
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
143
+	scaler := &scalertest.FakeScaler{}
251 144
 
145
+	hookExecuted := false
252 146
 	strategy := &RecreateDeploymentStrategy{
253 147
 		codec:        api.Codec,
254 148
 		retryTimeout: 1 * time.Second,
255 149
 		retryPeriod:  1 * time.Millisecond,
256
-		client: &testControllerClient{
257
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
258
-				t.Fatalf("unexpected call to getReplicationController")
259
-				return deployment, nil
260
-			},
261
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
262
-				t.Fatalf("unexpected call to updateReplicationController")
263
-				return ctrl, nil
264
-			},
150
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
151
+			return deployment, nil
265 152
 		},
266 153
 		hookExecutor: &hookExecutorImpl{
267 154
 			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
268
-				return fmt.Errorf("hook execution failure")
155
+				hookExecuted = true
156
+				return fmt.Errorf("post hook failure")
269 157
 			},
270 158
 		},
159
+		scaler: scaler,
271 160
 	}
272 161
 
273
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
274
-	if err == nil {
275
-		t.Fatalf("expected deploy error: %v", err)
162
+	err := strategy.Deploy(nil, deployment, 2)
163
+	if err != nil {
164
+		t.Fatalf("unexpected deploy error: %#v", err)
165
+	}
166
+	if !hookExecuted {
167
+		t.Fatalf("exepcted hook execution")
276 168
 	}
277 169
 }
278 170
 
279
-func TestRecreate_deploymentPostHookSuccess(t *testing.T) {
280
-	var updatedController *kapi.ReplicationController
281
-	config := deploytest.OkDeploymentConfig(1)
282
-	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyAbort)
283
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
171
+func TestRecreate_acceptorSuccess(t *testing.T) {
172
+	var deployment *kapi.ReplicationController
173
+	scaler := &scalertest.FakeScaler{}
284 174
 
285 175
 	strategy := &RecreateDeploymentStrategy{
286 176
 		codec:        api.Codec,
287 177
 		retryTimeout: 1 * time.Second,
288 178
 		retryPeriod:  1 * time.Millisecond,
289
-		client: &testControllerClient{
290
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
291
-				return deployment, nil
292
-			},
293
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
294
-				updatedController = ctrl
295
-				return ctrl, nil
296
-			},
297
-		},
298
-		hookExecutor: &hookExecutorImpl{
299
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
300
-				return nil
301
-			},
179
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
180
+			return deployment, nil
302 181
 		},
182
+		scaler: scaler,
303 183
 	}
304 184
 
305
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
185
+	acceptor := &testAcceptor{
186
+		acceptFn: func(deployment *kapi.ReplicationController) error {
187
+			return nil
188
+		},
189
+	}
306 190
 
191
+	deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
192
+	err := strategy.DeployWithAcceptor(nil, deployment, 2, acceptor)
307 193
 	if err != nil {
308 194
 		t.Fatalf("unexpected deploy error: %#v", err)
309 195
 	}
310 196
 
311
-	if updatedController == nil {
312
-		t.Fatalf("expected a ReplicationController")
197
+	if e, a := 2, len(scaler.Events); e != a {
198
+		t.Fatalf("expected %s scale calls, got %d", e, a)
313 199
 	}
314
-
315
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
316
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
200
+	if e, a := uint(1), scaler.Events[0].Size; e != a {
201
+		t.Errorf("expected scale up to %d, got %d", e, a)
202
+	}
203
+	if e, a := uint(2), scaler.Events[1].Size; e != a {
204
+		t.Errorf("expected scale up to %d, got %d", e, a)
317 205
 	}
318 206
 }
319 207
 
320
-func TestRecreate_deploymentPostHookFailureIgnored(t *testing.T) {
321
-	var updatedController *kapi.ReplicationController
322
-	config := deploytest.OkDeploymentConfig(1)
323
-	config.Template.Strategy.RecreateParams = recreateParams("", deployapi.LifecycleHookFailurePolicyIgnore)
324
-	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
208
+func TestRecreate_acceptorFail(t *testing.T) {
209
+	var deployment *kapi.ReplicationController
210
+	scaler := &scalertest.FakeScaler{}
325 211
 
326 212
 	strategy := &RecreateDeploymentStrategy{
327 213
 		codec:        api.Codec,
328 214
 		retryTimeout: 1 * time.Second,
329 215
 		retryPeriod:  1 * time.Millisecond,
330
-		client: &testControllerClient{
331
-			getReplicationControllerFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
332
-				return deployment, nil
333
-			},
334
-			updateReplicationControllerFunc: func(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
335
-				updatedController = ctrl
336
-				return ctrl, nil
337
-			},
338
-		},
339
-		hookExecutor: &hookExecutorImpl{
340
-			executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error {
341
-				return fmt.Errorf("hook execution failure")
342
-			},
216
+		getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) {
217
+			return deployment, nil
343 218
 		},
219
+		scaler: scaler,
344 220
 	}
345 221
 
346
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
347
-
348
-	if err != nil {
349
-		t.Fatalf("unexpected deploy error: %#v", err)
222
+	acceptor := &testAcceptor{
223
+		acceptFn: func(deployment *kapi.ReplicationController) error {
224
+			return fmt.Errorf("rejected")
225
+		},
350 226
 	}
351 227
 
352
-	if updatedController == nil {
353
-		t.Fatalf("expected a ReplicationController")
228
+	deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
229
+	err := strategy.DeployWithAcceptor(nil, deployment, 2, acceptor)
230
+	if err == nil {
231
+		t.Fatalf("expected a deployment failure")
354 232
 	}
233
+	t.Logf("got expected error: %v", err)
355 234
 
356
-	if e, a := 1, updatedController.Spec.Replicas; e != a {
357
-		t.Fatalf("expected controller replicas to be %d, got %d", e, a)
235
+	if e, a := 1, len(scaler.Events); e != a {
236
+		t.Fatalf("expected %s scale calls, got %d", e, a)
237
+	}
238
+	if e, a := uint(1), scaler.Events[0].Size; e != a {
239
+		t.Errorf("expected scale up to %d, got %d", e, a)
358 240
 	}
359 241
 }
360 242
 
... ...
@@ -392,3 +274,11 @@ func (t *testControllerClient) getReplicationController(namespace, name string)
392 392
 func (t *testControllerClient) updateReplicationController(namespace string, ctrl *kapi.ReplicationController) (*kapi.ReplicationController, error) {
393 393
 	return t.updateReplicationControllerFunc(namespace, ctrl)
394 394
 }
395
+
396
+type testAcceptor struct {
397
+	acceptFn func(*kapi.ReplicationController) error
398
+}
399
+
400
+func (t *testAcceptor) Accept(deployment *kapi.ReplicationController) error {
401
+	return t.acceptFn(deployment)
402
+}
... ...
@@ -15,7 +15,6 @@ import (
15 15
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
16 16
 
17 17
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
18
-	"github.com/openshift/origin/pkg/deploy/strategy"
19 18
 	stratsupport "github.com/openshift/origin/pkg/deploy/strategy/support"
20 19
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
21 20
 )
... ...
@@ -40,7 +39,7 @@ const sourceIdAnnotation = "kubectl.kubernetes.io/update-source-id"
40 40
 // [2] https://github.com/GoogleCloudPlatform/kubernetes/issues/7851
41 41
 type RollingDeploymentStrategy struct {
42 42
 	// initialStrategy is used when there are no prior deployments.
43
-	initialStrategy strategy.DeploymentStrategy
43
+	initialStrategy acceptingDeploymentStrategy
44 44
 	// client is used to deal with ReplicationControllers.
45 45
 	client kubectl.RollingUpdaterClient
46 46
 	// rollingUpdate knows how to perform a rolling update.
... ...
@@ -49,10 +48,26 @@ type RollingDeploymentStrategy struct {
49 49
 	codec runtime.Codec
50 50
 	// hookExecutor can execute a lifecycle hook.
51 51
 	hookExecutor hookExecutor
52
+	// getUpdateAcceptor returns an UpdateAcceptor to verify the first replica
53
+	// of the deployment.
54
+	getUpdateAcceptor func(timeout time.Duration) kubectl.UpdateAcceptor
52 55
 }
53 56
 
57
+// acceptingDeploymentStrategy is a DeploymentStrategy which accepts an
58
+// injected UpdateAcceptor as part of the deploy function. This is a hack to
59
+// support using the Recreate strategy for initial deployments and should be
60
+// removed when https://github.com/GoogleCloudPlatform/kubernetes/pull/7183 is
61
+// fixed.
62
+type acceptingDeploymentStrategy interface {
63
+	DeployWithAcceptor(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error
64
+}
65
+
66
+// NewFirstContainerReadyInterval is how often to check for container
67
+// readiness with the FirstContainerReady acceptor.
68
+const NewFirstContainerReadyInterval = 1 * time.Second
69
+
54 70
 // NewRollingDeploymentStrategy makes a new RollingDeploymentStrategy.
55
-func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, codec runtime.Codec, initialStrategy strategy.DeploymentStrategy) *RollingDeploymentStrategy {
71
+func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, codec runtime.Codec, initialStrategy acceptingDeploymentStrategy) *RollingDeploymentStrategy {
56 72
 	updaterClient := &rollingUpdaterClient{
57 73
 		ControllerHasDesiredReplicasFn: func(rc *kapi.ReplicationController) wait.ConditionFunc {
58 74
 			return kclient.ControllerHasDesiredReplicas(client, rc)
... ...
@@ -95,32 +110,30 @@ func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, co
95 95
 				},
96 96
 			},
97 97
 		},
98
+		getUpdateAcceptor: func(timeout time.Duration) kubectl.UpdateAcceptor {
99
+			return stratsupport.NewFirstContainerReady(client, timeout, NewFirstContainerReadyInterval)
100
+		},
98 101
 	}
99 102
 }
100 103
 
101
-func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
102
-	config, err := deployutil.DecodeDeploymentConfig(deployment, s.codec)
104
+func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error {
105
+	config, err := deployutil.DecodeDeploymentConfig(to, s.codec)
103 106
 	if err != nil {
104
-		return fmt.Errorf("couldn't decode DeploymentConfig from Deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
107
+		return fmt.Errorf("couldn't decode DeploymentConfig from deployment %s: %v", deployutil.LabelForDeployment(to), err)
105 108
 	}
106 109
 
107 110
 	params := config.Template.Strategy.RollingParams
108
-
109
-	// Find the latest deployment (if any).
110
-	latest, err := s.findLatestDeployment(oldDeployments)
111
-	if err != nil {
112
-		return fmt.Errorf("couldn't determine latest Deployment: %v", err)
113
-	}
111
+	updateAcceptor := s.getUpdateAcceptor(time.Duration(*params.TimeoutSeconds) * time.Second)
114 112
 
115 113
 	// If there's no prior deployment, delegate to another strategy since the
116 114
 	// rolling updater only supports transitioning between two deployments.
117 115
 	//
118 116
 	// Hook support is duplicated here for now. When the rolling updater can
119 117
 	// handle initial deployments, all of this code can go away.
120
-	if latest == nil {
118
+	if from == nil {
121 119
 		// Execute any pre-hook.
122 120
 		if params.Pre != nil {
123
-			err := s.hookExecutor.Execute(params.Pre, deployment, "prehook")
121
+			err := s.hookExecutor.Execute(params.Pre, to, "prehook")
124 122
 			if err != nil {
125 123
 				return fmt.Errorf("Pre hook failed: %s", err)
126 124
 			}
... ...
@@ -128,14 +141,14 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
128 128
 		}
129 129
 
130 130
 		// Execute the delegate strategy.
131
-		err := s.initialStrategy.Deploy(deployment, oldDeployments)
131
+		err := s.initialStrategy.DeployWithAcceptor(from, to, desiredReplicas, updateAcceptor)
132 132
 		if err != nil {
133 133
 			return err
134 134
 		}
135 135
 
136 136
 		// Execute any post-hook. Errors are logged and ignored.
137 137
 		if params.Post != nil {
138
-			err := s.hookExecutor.Execute(params.Post, deployment, "posthook")
138
+			err := s.hookExecutor.Execute(params.Post, to, "posthook")
139 139
 			if err != nil {
140 140
 				util.HandleError(fmt.Errorf("post hook failed: %s", err))
141 141
 			} else {
... ...
@@ -150,7 +163,7 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
150 150
 	// Prepare for a rolling update.
151 151
 	// Execute any pre-hook.
152 152
 	if params.Pre != nil {
153
-		err := s.hookExecutor.Execute(params.Pre, deployment, "prehook")
153
+		err := s.hookExecutor.Execute(params.Pre, to, "prehook")
154 154
 		if err != nil {
155 155
 			return fmt.Errorf("pre hook failed: %s", err)
156 156
 		}
... ...
@@ -162,16 +175,16 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
162 162
 	//
163 163
 	// Related upstream issue:
164 164
 	// https://github.com/GoogleCloudPlatform/kubernetes/pull/7183
165
-	deployment, err = s.client.GetReplicationController(deployment.Namespace, deployment.Name)
165
+	to, err = s.client.GetReplicationController(to.Namespace, to.Name)
166 166
 	if err != nil {
167
-		return fmt.Errorf("couldn't look up deployment %s: %s", deployutil.LabelForDeployment(deployment))
167
+		return fmt.Errorf("couldn't look up deployment %s: %s", deployutil.LabelForDeployment(to))
168 168
 	}
169
-	if _, hasSourceId := deployment.Annotations[sourceIdAnnotation]; !hasSourceId {
170
-		deployment.Annotations[sourceIdAnnotation] = fmt.Sprintf("%s:%s", latest.Name, latest.ObjectMeta.UID)
171
-		if updated, err := s.client.UpdateReplicationController(deployment.Namespace, deployment); err != nil {
172
-			return fmt.Errorf("couldn't assign source annotation to deployment %s: %v", deployutil.LabelForDeployment(deployment), err)
169
+	if _, hasSourceId := to.Annotations[sourceIdAnnotation]; !hasSourceId {
170
+		to.Annotations[sourceIdAnnotation] = fmt.Sprintf("%s:%s", from.Name, from.ObjectMeta.UID)
171
+		if updated, err := s.client.UpdateReplicationController(to.Namespace, to); err != nil {
172
+			return fmt.Errorf("couldn't assign source annotation to deployment %s: %v", deployutil.LabelForDeployment(to), err)
173 173
 		} else {
174
-			deployment = updated
174
+			to = updated
175 175
 		}
176 176
 	}
177 177
 
... ...
@@ -182,36 +195,34 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
182 182
 	//
183 183
 	// Related upstream issue:
184 184
 	// https://github.com/GoogleCloudPlatform/kubernetes/pull/7183
185
-	deployment.Spec.Replicas = 1
185
+	to.Spec.Replicas = 1
186 186
 
187
-	glog.Infof("OldRc: %s, replicas=%d", latest.Name, latest.Spec.Replicas)
188 187
 	// Perform a rolling update.
189 188
 	rollingConfig := &kubectl.RollingUpdaterConfig{
190
-		Out:           &rollingUpdaterWriter{},
191
-		OldRc:         latest,
192
-		NewRc:         deployment,
193
-		UpdatePeriod:  time.Duration(*params.UpdatePeriodSeconds) * time.Second,
194
-		Interval:      time.Duration(*params.IntervalSeconds) * time.Second,
195
-		Timeout:       time.Duration(*params.TimeoutSeconds) * time.Second,
196
-		CleanupPolicy: kubectl.PreserveRollingUpdateCleanupPolicy,
189
+		Out:            &rollingUpdaterWriter{},
190
+		OldRc:          from,
191
+		NewRc:          to,
192
+		UpdatePeriod:   time.Duration(*params.UpdatePeriodSeconds) * time.Second,
193
+		Interval:       time.Duration(*params.IntervalSeconds) * time.Second,
194
+		Timeout:        time.Duration(*params.TimeoutSeconds) * time.Second,
195
+		CleanupPolicy:  kubectl.PreserveRollingUpdateCleanupPolicy,
196
+		UpdateAcceptor: updateAcceptor,
197 197
 	}
198
-	glog.Infof("Starting rolling update with DeploymentConfig: %#v (UpdatePeriod %d, Interval %d, Timeout %d) (UpdatePeriodSeconds %d, IntervalSeconds %d, TimeoutSeconds %d)",
199
-		rollingConfig,
200
-		rollingConfig.UpdatePeriod,
201
-		rollingConfig.Interval,
202
-		rollingConfig.Timeout,
198
+	glog.Infof("Starting rolling update from %s to %s (desired replicas: %d, UpdatePeriodSeconds=%d, IntervalSeconds=%d, TimeoutSeconds=%d)",
199
+		deployutil.LabelForDeployment(from),
200
+		deployutil.LabelForDeployment(to),
201
+		desiredReplicas,
203 202
 		*params.UpdatePeriodSeconds,
204 203
 		*params.IntervalSeconds,
205 204
 		*params.TimeoutSeconds,
206 205
 	)
207
-	err = s.rollingUpdate(rollingConfig)
208
-	if err != nil {
206
+	if err := s.rollingUpdate(rollingConfig); err != nil {
209 207
 		return err
210 208
 	}
211 209
 
212 210
 	// Execute any post-hook. Errors are logged and ignored.
213 211
 	if params.Post != nil {
214
-		err := s.hookExecutor.Execute(params.Post, deployment, "posthook")
212
+		err := s.hookExecutor.Execute(params.Post, to, "posthook")
215 213
 		if err != nil {
216 214
 			util.HandleError(fmt.Errorf("Post hook failed: %s", err))
217 215
 		} else {
... ...
@@ -222,28 +233,6 @@ func (s *RollingDeploymentStrategy) Deploy(deployment *kapi.ReplicationControlle
222 222
 	return nil
223 223
 }
224 224
 
225
-// findLatestDeployment retrieves deployments identified by oldDeployments and
226
-// returns the latest one from the list, or nil if there are no old
227
-// deployments.
228
-func (s *RollingDeploymentStrategy) findLatestDeployment(oldDeployments []*kapi.ReplicationController) (*kapi.ReplicationController, error) {
229
-	// Find the latest deployment from the list of old deployments.
230
-	var latest *kapi.ReplicationController
231
-	latestVersion := 0
232
-	for _, deployment := range oldDeployments {
233
-		version := deployutil.DeploymentVersionFor(deployment)
234
-		if version > latestVersion {
235
-			latest = deployment
236
-			latestVersion = version
237
-		}
238
-	}
239
-	if latest != nil {
240
-		glog.Infof("Found latest Deployment %s", latest.Name)
241
-	} else {
242
-		glog.Info("No latest Deployment found")
243
-	}
244
-	return latest, nil
245
-}
246
-
247 225
 type rollingUpdaterClient struct {
248 226
 	GetReplicationControllerFn     func(namespace, name string) (*kapi.ReplicationController, error)
249 227
 	UpdateReplicationControllerFn  func(namespace string, rc *kapi.ReplicationController) (*kapi.ReplicationController, error)
... ...
@@ -6,7 +6,7 @@ import (
6 6
 	"time"
7 7
 
8 8
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
9
-	kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
9
+	// kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
10 10
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
11 11
 
12 12
 	api "github.com/openshift/origin/pkg/api/latest"
... ...
@@ -27,7 +27,7 @@ func TestRolling_deployInitial(t *testing.T) {
27 27
 			},
28 28
 		},
29 29
 		initialStrategy: &testStrategy{
30
-			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
30
+			deployFn: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
31 31
 				initialStrategyInvoked = true
32 32
 				return nil
33 33
 			},
... ...
@@ -36,12 +36,13 @@ func TestRolling_deployInitial(t *testing.T) {
36 36
 			t.Fatalf("unexpected call to rollingUpdate")
37 37
 			return nil
38 38
 		},
39
+		getUpdateAcceptor: getUpdateAcceptor,
39 40
 	}
40 41
 
41 42
 	config := deploytest.OkDeploymentConfig(1)
42 43
 	config.Template.Strategy = deploytest.OkRollingStrategy()
43 44
 	deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
44
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
45
+	err := strategy.Deploy(nil, deployment, 2)
45 46
 	if err != nil {
46 47
 		t.Fatalf("unexpected error: %v", err)
47 48
 	}
... ...
@@ -80,7 +81,7 @@ func TestRolling_deployRolling(t *testing.T) {
80 80
 			},
81 81
 		},
82 82
 		initialStrategy: &testStrategy{
83
-			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
83
+			deployFn: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
84 84
 				t.Fatalf("unexpected call to initial strategy")
85 85
 				return nil
86 86
 			},
... ...
@@ -89,9 +90,10 @@ func TestRolling_deployRolling(t *testing.T) {
89 89
 			rollingConfig = config
90 90
 			return nil
91 91
 		},
92
+		getUpdateAcceptor: getUpdateAcceptor,
92 93
 	}
93 94
 
94
-	err := strategy.Deploy(deployment, []*kapi.ReplicationController{latest})
95
+	err := strategy.Deploy(latest, deployment, 2)
95 96
 	if err != nil {
96 97
 		t.Fatalf("unexpected error: %v", err)
97 98
 	}
... ...
@@ -134,74 +136,6 @@ func TestRolling_deployRolling(t *testing.T) {
134 134
 	}
135 135
 }
136 136
 
137
-func TestRolling_findLatestDeployment(t *testing.T) {
138
-	deployments := map[string]*kapi.ReplicationController{}
139
-	for i := 1; i <= 10; i++ {
140
-		config := deploytest.OkDeploymentConfig(i)
141
-		config.Template.Strategy = deploytest.OkRollingStrategy()
142
-		deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)
143
-		deployments[deployment.Name] = deployment
144
-	}
145
-
146
-	strategy := &RollingDeploymentStrategy{
147
-		codec: api.Codec,
148
-		client: &rollingUpdaterClient{
149
-			GetReplicationControllerFn: func(namespace, name string) (*kapi.ReplicationController, error) {
150
-				deployment, found := deployments[name]
151
-				if !found {
152
-					return nil, kerrors.NewNotFound("ReplicationController", name)
153
-				}
154
-				return deployment, nil
155
-			},
156
-		},
157
-	}
158
-
159
-	type scenario struct {
160
-		old    []string
161
-		latest string
162
-	}
163
-
164
-	scenarios := []scenario{
165
-		{
166
-			old: []string{
167
-				"config-1",
168
-				"config-2",
169
-				"config-3",
170
-			},
171
-			latest: "config-3",
172
-		},
173
-		{
174
-			old: []string{
175
-				"config-3",
176
-				"config-1",
177
-				"config-7",
178
-			},
179
-			latest: "config-7",
180
-		},
181
-	}
182
-
183
-	for _, scenario := range scenarios {
184
-		old := []*kapi.ReplicationController{}
185
-		for _, oldName := range scenario.old {
186
-			old = append(old, deployments[oldName])
187
-		}
188
-		found, err := strategy.findLatestDeployment(old)
189
-		if err != nil {
190
-			t.Errorf("unexpected error for scenario: %v: %v", scenario, err)
191
-			continue
192
-		}
193
-
194
-		if found == nil {
195
-			t.Errorf("expected to find a deployment for scenario: %v", scenario)
196
-			continue
197
-		}
198
-
199
-		if e, a := scenario.latest, found.Name; e != a {
200
-			t.Errorf("expected latest %s, got %s for scenario: %v", e, a, scenario)
201
-		}
202
-	}
203
-}
204
-
205 137
 func TestRolling_deployRollingHooks(t *testing.T) {
206 138
 	config := deploytest.OkDeploymentConfig(1)
207 139
 	config.Template.Strategy = deploytest.OkRollingStrategy()
... ...
@@ -222,7 +156,7 @@ func TestRolling_deployRollingHooks(t *testing.T) {
222 222
 			},
223 223
 		},
224 224
 		initialStrategy: &testStrategy{
225
-			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
225
+			deployFn: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
226 226
 				t.Fatalf("unexpected call to initial strategy")
227 227
 				return nil
228 228
 			},
... ...
@@ -235,6 +169,7 @@ func TestRolling_deployRollingHooks(t *testing.T) {
235 235
 				return hookError
236 236
 			},
237 237
 		},
238
+		getUpdateAcceptor: getUpdateAcceptor,
238 239
 	}
239 240
 
240 241
 	cases := []struct {
... ...
@@ -257,7 +192,7 @@ func TestRolling_deployRollingHooks(t *testing.T) {
257 257
 		if tc.hookShouldFail {
258 258
 			hookError = fmt.Errorf("hook failure")
259 259
 		}
260
-		err := strategy.Deploy(deployment, []*kapi.ReplicationController{latest})
260
+		err := strategy.Deploy(latest, deployment, 2)
261 261
 		if err != nil && tc.deploymentShouldFail {
262 262
 			t.Logf("got expected error: %v", err)
263 263
 		}
... ...
@@ -287,7 +222,7 @@ func TestRolling_deployInitialHooks(t *testing.T) {
287 287
 			},
288 288
 		},
289 289
 		initialStrategy: &testStrategy{
290
-			deployFn: func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
290
+			deployFn: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
291 291
 				return nil
292 292
 			},
293 293
 		},
... ...
@@ -299,6 +234,7 @@ func TestRolling_deployInitialHooks(t *testing.T) {
299 299
 				return hookError
300 300
 			},
301 301
 		},
302
+		getUpdateAcceptor: getUpdateAcceptor,
302 303
 	}
303 304
 
304 305
 	cases := []struct {
... ...
@@ -320,7 +256,7 @@ func TestRolling_deployInitialHooks(t *testing.T) {
320 320
 		if tc.hookShouldFail {
321 321
 			hookError = fmt.Errorf("hook failure")
322 322
 		}
323
-		err := strategy.Deploy(deployment, []*kapi.ReplicationController{})
323
+		err := strategy.Deploy(nil, deployment, 2)
324 324
 		if err != nil && tc.deploymentShouldFail {
325 325
 			t.Logf("got expected error: %v", err)
326 326
 		}
... ...
@@ -334,11 +270,11 @@ func TestRolling_deployInitialHooks(t *testing.T) {
334 334
 }
335 335
 
336 336
 type testStrategy struct {
337
-	deployFn func(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error
337
+	deployFn func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error
338 338
 }
339 339
 
340
-func (s *testStrategy) Deploy(deployment *kapi.ReplicationController, oldDeployments []*kapi.ReplicationController) error {
341
-	return s.deployFn(deployment, oldDeployments)
340
+func (s *testStrategy) DeployWithAcceptor(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int, updateAcceptor kubectl.UpdateAcceptor) error {
341
+	return s.deployFn(from, to, desiredReplicas, updateAcceptor)
342 342
 }
343 343
 
344 344
 func mkintp(i int) *int64 {
... ...
@@ -370,3 +306,19 @@ func rollingParams(preFailurePolicy, postFailurePolicy deployapi.LifecycleHookFa
370 370
 		Post:                post,
371 371
 	}
372 372
 }
373
+
374
+func getUpdateAcceptor(timeout time.Duration) kubectl.UpdateAcceptor {
375
+	return &testAcceptor{
376
+		acceptFn: func(deployment *kapi.ReplicationController) error {
377
+			return nil
378
+		},
379
+	}
380
+}
381
+
382
+type testAcceptor struct {
383
+	acceptFn func(*kapi.ReplicationController) error
384
+}
385
+
386
+func (t *testAcceptor) Accept(deployment *kapi.ReplicationController) error {
387
+	return t.acceptFn(deployment)
388
+}
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
14 14
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
15 15
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
16
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
16 17
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
17 18
 
18 19
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
... ...
@@ -203,3 +204,118 @@ func NewPodWatch(client kclient.Interface, namespace, name, resourceVersion stri
203 203
 		return obj.(*kapi.Pod)
204 204
 	}
205 205
 }
206
+
207
+func NewFirstContainerReady(kclient kclient.Interface, timeout time.Duration, interval time.Duration) *FirstContainerReady {
208
+	return &FirstContainerReady{
209
+		timeout:  timeout,
210
+		interval: interval,
211
+		podsForDeployment: func(deployment *kapi.ReplicationController) (*kapi.PodList, error) {
212
+			selector := labels.Set(deployment.Spec.Selector).AsSelector()
213
+			return kclient.Pods(deployment.Namespace).List(selector, fields.Everything())
214
+		},
215
+		getPodStore: func(namespace, name string) (cache.Store, chan struct{}) {
216
+			sel, _ := fields.ParseSelector("metadata.name=" + name)
217
+			store := cache.NewStore(cache.MetaNamespaceKeyFunc)
218
+			lw := &deployutil.ListWatcherImpl{
219
+				ListFunc: func() (runtime.Object, error) {
220
+					return kclient.Pods(namespace).List(labels.Everything(), sel)
221
+				},
222
+				WatchFunc: func(resourceVersion string) (watch.Interface, error) {
223
+					return kclient.Pods(namespace).Watch(labels.Everything(), sel, resourceVersion)
224
+				},
225
+			}
226
+			stop := make(chan struct{})
227
+			cache.NewReflector(lw, &kapi.Pod{}, store, 10*time.Second).RunUntil(stop)
228
+			return store, stop
229
+		},
230
+	}
231
+}
232
+
233
+type FirstContainerReady struct {
234
+	podsForDeployment func(*kapi.ReplicationController) (*kapi.PodList, error)
235
+	getPodStore       func(namespace, name string) (cache.Store, chan struct{})
236
+	timeout           time.Duration
237
+	interval          time.Duration
238
+}
239
+
240
+func (c *FirstContainerReady) Accept(deployment *kapi.ReplicationController) error {
241
+	// For now, only validate the first replica.
242
+	if deployment.Spec.Replicas != 1 {
243
+		glog.Infof("automatically accepting deployment %s with %d replicas", deployutil.LabelForDeployment(deployment), deployment.Spec.Replicas)
244
+		return nil
245
+	}
246
+
247
+	// Try and find the pod for the deployment.
248
+	pods, err := c.podsForDeployment(deployment)
249
+	if err != nil {
250
+		return fmt.Errorf("couldn't get pods for deployment %s: %v", deployutil.LabelForDeployment(deployment), err)
251
+	}
252
+	if len(pods.Items) == 0 {
253
+		return fmt.Errorf("no pods found for deployment %s", deployutil.LabelForDeployment(deployment))
254
+	}
255
+
256
+	// If we found multiple, use the first one and log a warning.
257
+	// TODO: should finding multiple be an error?
258
+	pod := &pods.Items[0]
259
+	if len(pods.Items) > 1 {
260
+		glog.Infof("Warning: more than one pod for deployment %s; basing canary check on the first pod '%s'", deployutil.LabelForDeployment(deployment), pod.Name)
261
+	}
262
+
263
+	// Make a pod store to poll and ensure it gets cleaned up.
264
+	podStore, stopStore := c.getPodStore(pod.Namespace, pod.Name)
265
+	defer close(stopStore)
266
+
267
+	// Track container readiness based on those defined in the spec.
268
+	observedContainers := map[string]bool{}
269
+	for _, container := range pod.Spec.Containers {
270
+		observedContainers[container.Name] = false
271
+	}
272
+
273
+	// Start checking for pod updates.
274
+	glog.V(0).Infof("Waiting for pod %s/%s container readiness", pod.Namespace, pod.Name)
275
+	return wait.Poll(c.interval, c.timeout, func() (done bool, err error) {
276
+		// Get the latest state of the pod.
277
+		obj, exists, err := podStore.Get(pod)
278
+		// Try again later on error or if the pod isn't available yet.
279
+		if err != nil {
280
+			glog.V(0).Infof("Error getting pod %s/%s to inspect container readiness: %v", pod.Namespace, pod.Name, err)
281
+			return false, nil
282
+		}
283
+		if !exists {
284
+			glog.V(0).Infof("Couldn't find pod %s/%s to inspect container readiness", pod.Namespace, pod.Name)
285
+			return false, nil
286
+		}
287
+		// New pod state is available; update the observed ready status of any
288
+		// containers.
289
+		updatedPod := obj.(*kapi.Pod)
290
+		for _, status := range updatedPod.Status.ContainerStatuses {
291
+			// Ignore any containers which aren't defined in the deployment spec.
292
+			if _, known := observedContainers[status.Name]; !known {
293
+				glog.V(0).Infof("Ignoring readiness of container %s in pod %s/%s because it's not present in the pod spec", status.Name, pod.Namespace, pod.Name)
294
+				continue
295
+			}
296
+			// The status of the container could be transient; we only care if it
297
+			// was ever ready. If it was ready and then became not ready, we
298
+			// consider it ready.
299
+			if status.Ready {
300
+				observedContainers[status.Name] = true
301
+			}
302
+		}
303
+		// Check whether all containers have been observed as ready.
304
+		allReady := true
305
+		for _, ready := range observedContainers {
306
+			if !ready {
307
+				allReady = false
308
+				break
309
+			}
310
+		}
311
+		// If all containers have been ready once, return success.
312
+		if allReady {
313
+			glog.V(0).Infof("All containers ready for %s/%s", pod.Namespace, pod.Name)
314
+			return true, nil
315
+		}
316
+		// Otherwise, try again later.
317
+		glog.V(4).Infof("Still waiting for pod %s/%s container readiness; observed statuses: #%v", pod.Namespace, pod.Name, observedContainers)
318
+		return false, nil
319
+	})
320
+}
... ...
@@ -3,9 +3,11 @@ package support
3 3
 import (
4 4
 	"fmt"
5 5
 	"testing"
6
+	"time"
6 7
 
7 8
 	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
8 9
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
10
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
9 11
 
10 12
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
11 13
 	deploytest "github.com/openshift/origin/pkg/deploy/api/test"
... ...
@@ -260,3 +262,94 @@ func TestHookExecutor_makeHookPodRestart(t *testing.T) {
260 260
 		t.Errorf("expected pod restart policy %s, got %s", e, a)
261 261
 	}
262 262
 }
263
+
264
+func TestFirstContainerReady_scenarios(t *testing.T) {
265
+	type containerReady struct {
266
+		name  string
267
+		ready bool
268
+	}
269
+	scenarios := []struct {
270
+		name             string
271
+		specContainers   []string
272
+		initialReadiness []containerReady
273
+		updatedReadiness []containerReady
274
+		accept           bool
275
+	}{
276
+		{
277
+			"all ready",
278
+			[]string{"1", "2"},
279
+			[]containerReady{{"1", false}, {"2", false}},
280
+			[]containerReady{{"1", true}, {"2", true}},
281
+			true,
282
+		},
283
+		{
284
+			"none ready",
285
+			[]string{"1", "2"},
286
+			[]containerReady{{"1", false}, {"2", false}},
287
+			[]containerReady{{"1", false}, {"2", false}},
288
+			false,
289
+		},
290
+		{
291
+			"some ready",
292
+			[]string{"1", "2"},
293
+			[]containerReady{{"1", false}, {"2", false}},
294
+			[]containerReady{{"1", true}, {"2", false}},
295
+			false,
296
+		},
297
+	}
298
+	for _, s := range scenarios {
299
+		t.Logf("running scenario: %s", s.name)
300
+		mkpod := func(name string, readiness []containerReady) kapi.Pod {
301
+			containers := []kapi.Container{}
302
+			for _, c := range s.specContainers {
303
+				containers = append(containers, kapi.Container{Name: c})
304
+			}
305
+			containerStatuses := []kapi.ContainerStatus{}
306
+			for _, r := range readiness {
307
+				containerStatuses = append(containerStatuses, kapi.ContainerStatus{Name: r.name, Ready: r.ready})
308
+			}
309
+			return kapi.Pod{
310
+				ObjectMeta: kapi.ObjectMeta{
311
+					Name: name,
312
+				},
313
+				Spec: kapi.PodSpec{
314
+					Containers: containers,
315
+				},
316
+				Status: kapi.PodStatus{
317
+					ContainerStatuses: containerStatuses,
318
+				},
319
+			}
320
+		}
321
+		store := cache.NewStore(cache.MetaNamespaceKeyFunc)
322
+		ready := &FirstContainerReady{
323
+			podsForDeployment: func(deployment *kapi.ReplicationController) (*kapi.PodList, error) {
324
+				return &kapi.PodList{
325
+					Items: []kapi.Pod{
326
+						mkpod(deployment.Name+"-pod", s.initialReadiness),
327
+					},
328
+				}, nil
329
+			},
330
+			getPodStore: func(namespace, name string) (cache.Store, chan struct{}) {
331
+				return store, make(chan struct{})
332
+			},
333
+			timeout:  10 * time.Millisecond,
334
+			interval: 1 * time.Millisecond,
335
+		}
336
+
337
+		deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codec)
338
+		deployment.Spec.Replicas = 1
339
+		pod := mkpod(deployment.Name+"-pod", s.updatedReadiness)
340
+		store.Add(&pod)
341
+
342
+		err := ready.Accept(deployment)
343
+
344
+		if s.accept && err != nil {
345
+			t.Fatalf("unexpected error: %v", err)
346
+		}
347
+		if !s.accept && err == nil {
348
+			t.Fatalf("expected an error")
349
+		} else {
350
+			t.Logf("got expected error: %s", err)
351
+		}
352
+	}
353
+}
... ...
@@ -300,3 +300,30 @@ func mappedAnnotationFor(obj runtime.Object, key string) string {
300 300
 	}
301 301
 	return ""
302 302
 }
303
+
304
+// DeploymentsByLatestVersionAsc sorts deployments by LatestVersion ascending.
305
+type DeploymentsByLatestVersionAsc []api.ReplicationController
306
+
307
+func (d DeploymentsByLatestVersionAsc) Len() int {
308
+	return len(d)
309
+}
310
+func (d DeploymentsByLatestVersionAsc) Swap(i, j int) {
311
+	d[i], d[j] = d[j], d[i]
312
+}
313
+func (d DeploymentsByLatestVersionAsc) Less(i, j int) bool {
314
+	return DeploymentVersionFor(&d[i]) < DeploymentVersionFor(&d[j])
315
+}
316
+
317
+// DeploymentsByLatestVersionAsc sorts deployments by LatestVersion
318
+// descending.
319
+type DeploymentsByLatestVersionDesc []api.ReplicationController
320
+
321
+func (d DeploymentsByLatestVersionDesc) Len() int {
322
+	return len(d)
323
+}
324
+func (d DeploymentsByLatestVersionDesc) Swap(i, j int) {
325
+	d[i], d[j] = d[j], d[i]
326
+}
327
+func (d DeploymentsByLatestVersionDesc) Less(i, j int) bool {
328
+	return DeploymentVersionFor(&d[j]) < DeploymentVersionFor(&d[i])
329
+}
... ...
@@ -1,6 +1,7 @@
1 1
 package util
2 2
 
3 3
 import (
4
+	"sort"
4 5
 	"strconv"
5 6
 	"testing"
6 7
 
... ...
@@ -122,3 +123,28 @@ func TestMakeDeploymentOk(t *testing.T) {
122 122
 		t.Fatalf("expected selector DeploymentLabel=%s, got %s", e, a)
123 123
 	}
124 124
 }
125
+
126
+func TestDeploymentsByLatestVersion_sorting(t *testing.T) {
127
+	mkdeployment := func(version int) kapi.ReplicationController {
128
+		deployment, _ := MakeDeployment(deploytest.OkDeploymentConfig(version), kapi.Codec)
129
+		return *deployment
130
+	}
131
+	deployments := []kapi.ReplicationController{
132
+		mkdeployment(4),
133
+		mkdeployment(1),
134
+		mkdeployment(2),
135
+		mkdeployment(3),
136
+	}
137
+	sort.Sort(DeploymentsByLatestVersionAsc(deployments))
138
+	for i := 0; i < 4; i++ {
139
+		if e, a := i+1, DeploymentVersionFor(&deployments[i]); e != a {
140
+			t.Errorf("expected deployment[%d]=%d, got %d", i, e, a)
141
+		}
142
+	}
143
+	sort.Sort(DeploymentsByLatestVersionDesc(deployments))
144
+	for i := 0; i < 4; i++ {
145
+		if e, a := 4-i, DeploymentVersionFor(&deployments[i]); e != a {
146
+			t.Errorf("expected deployment[%d]=%d, got %d", i, e, a)
147
+		}
148
+	}
149
+}