Browse code

deploy: use shared caches in the dc controller

Michail Kargakis authored on 2016/06/20 23:55:00
Showing 6 changed files
... ...
@@ -342,14 +342,15 @@ func (c *MasterConfig) RunDeployerPodController() {
342 342
 
343 343
 // RunDeploymentConfigController starts the deployment config controller process.
344 344
 func (c *MasterConfig) RunDeploymentConfigController() {
345
+	dcInformer := c.Informers.DeploymentConfigs().Informer()
346
+	rcInformer := c.Informers.ReplicationControllers().Informer()
345 347
 	osclient, kclient := c.DeploymentConfigControllerClients()
346
-	factory := deployconfigcontroller.DeploymentConfigControllerFactory{
347
-		Client:     osclient,
348
-		KubeClient: kclient,
349
-		Codec:      c.EtcdHelper.Codec(),
350
-	}
351
-	controller := factory.Create()
352
-	controller.Run()
348
+
349
+	controller := deployconfigcontroller.NewDeploymentConfigController(dcInformer, rcInformer, osclient, kclient, c.EtcdHelper.Codec())
350
+	// TODO: Make the stop channel actually work.
351
+	stopCh := make(chan struct{})
352
+	// TODO: Make the number of workers configurable.
353
+	go controller.Run(5, stopCh)
353 354
 }
354 355
 
355 356
 // RunDeploymentTriggerController starts the deployment trigger controller process.
... ...
@@ -13,12 +13,16 @@ import (
13 13
 	"k8s.io/kubernetes/pkg/watch"
14 14
 
15 15
 	oclient "github.com/openshift/origin/pkg/client"
16
+	oscache "github.com/openshift/origin/pkg/client/cache"
17
+	deployapi "github.com/openshift/origin/pkg/deploy/api"
16 18
 )
17 19
 
18 20
 type InformerFactory interface {
19 21
 	Start(stopCh <-chan struct{})
20 22
 
21 23
 	Pods() PodInformer
24
+	ReplicationControllers() ReplicationControllerInformer
25
+	DeploymentConfigs() DeploymentConfigInformer
22 26
 }
23 27
 
24 28
 type PodInformer interface {
... ...
@@ -27,6 +31,18 @@ type PodInformer interface {
27 27
 	Lister() *cache.StoreToPodLister
28 28
 }
29 29
 
30
+type ReplicationControllerInformer interface {
31
+	Informer() framework.SharedIndexInformer
32
+	Indexer() cache.Indexer
33
+	Lister() *cache.StoreToReplicationControllerLister
34
+}
35
+
36
+type DeploymentConfigInformer interface {
37
+	Informer() framework.SharedIndexInformer
38
+	Indexer() cache.Indexer
39
+	Lister() *oscache.StoreToDeploymentConfigLister
40
+}
41
+
30 42
 func NewInformerFactory(kubeClient kclient.Interface, originClient oclient.Interface, defaultResync time.Duration) InformerFactory {
31 43
 	return &sharedInformerFactory{
32 44
 		kubeClient:    kubeClient,
... ...
@@ -57,6 +73,14 @@ func (f *sharedInformerFactory) Pods() PodInformer {
57 57
 	return &podInformer{sharedInformerFactory: f}
58 58
 }
59 59
 
60
+func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
61
+	return &replicationControllerInformer{sharedInformerFactory: f}
62
+}
63
+
64
+func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
65
+	return &deploymentConfigInformer{sharedInformerFactory: f}
66
+}
67
+
60 68
 type podInformer struct {
61 69
 	*sharedInformerFactory
62 70
 }
... ...
@@ -99,3 +123,89 @@ func (f *podInformer) Lister() *cache.StoreToPodLister {
99 99
 	informer := f.Informer()
100 100
 	return &cache.StoreToPodLister{Indexer: informer.GetIndexer()}
101 101
 }
102
+
103
+type replicationControllerInformer struct {
104
+	*sharedInformerFactory
105
+}
106
+
107
+func (f *replicationControllerInformer) Informer() framework.SharedIndexInformer {
108
+	f.lock.Lock()
109
+	defer f.lock.Unlock()
110
+
111
+	informerObj := &kapi.ReplicationController{}
112
+	informerType := reflect.TypeOf(informerObj)
113
+	informer, exists := f.informers[informerType]
114
+	if exists {
115
+		return informer
116
+	}
117
+
118
+	informer = framework.NewSharedIndexInformer(
119
+		&cache.ListWatch{
120
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
121
+				return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).List(options)
122
+			},
123
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
124
+				return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(options)
125
+			},
126
+		},
127
+		informerObj,
128
+		f.defaultResync,
129
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
130
+	)
131
+	f.informers[informerType] = informer
132
+
133
+	return informer
134
+}
135
+
136
+func (f *replicationControllerInformer) Indexer() cache.Indexer {
137
+	informer := f.Informer()
138
+	return informer.GetIndexer()
139
+}
140
+
141
+func (f *replicationControllerInformer) Lister() *cache.StoreToReplicationControllerLister {
142
+	informer := f.Informer()
143
+	return &cache.StoreToReplicationControllerLister{Indexer: informer.GetIndexer()}
144
+}
145
+
146
+type deploymentConfigInformer struct {
147
+	*sharedInformerFactory
148
+}
149
+
150
+func (f *deploymentConfigInformer) Informer() framework.SharedIndexInformer {
151
+	f.lock.Lock()
152
+	defer f.lock.Unlock()
153
+
154
+	informerObj := &deployapi.DeploymentConfig{}
155
+	informerType := reflect.TypeOf(informerObj)
156
+	informer, exists := f.informers[informerType]
157
+	if exists {
158
+		return informer
159
+	}
160
+
161
+	informer = framework.NewSharedIndexInformer(
162
+		&cache.ListWatch{
163
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
164
+				return f.originClient.DeploymentConfigs(kapi.NamespaceAll).List(options)
165
+			},
166
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
167
+				return f.originClient.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
168
+			},
169
+		},
170
+		informerObj,
171
+		f.defaultResync,
172
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
173
+	)
174
+	f.informers[informerType] = informer
175
+
176
+	return informer
177
+}
178
+
179
+func (f *deploymentConfigInformer) Indexer() cache.Indexer {
180
+	informer := f.Informer()
181
+	return informer.GetIndexer()
182
+}
183
+
184
+func (f *deploymentConfigInformer) Lister() *oscache.StoreToDeploymentConfigLister {
185
+	informer := f.Informer()
186
+	return &oscache.StoreToDeploymentConfigLister{Indexer: informer.GetIndexer()}
187
+}
... ...
@@ -8,15 +8,26 @@ import (
8 8
 
9 9
 	kapi "k8s.io/kubernetes/pkg/api"
10 10
 	"k8s.io/kubernetes/pkg/api/errors"
11
+	"k8s.io/kubernetes/pkg/client/cache"
11 12
 	"k8s.io/kubernetes/pkg/client/record"
12 13
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
13 14
 	"k8s.io/kubernetes/pkg/runtime"
15
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
16
+	"k8s.io/kubernetes/pkg/util/workqueue"
14 17
 
15 18
 	osclient "github.com/openshift/origin/pkg/client"
19
+	oscache "github.com/openshift/origin/pkg/client/cache"
16 20
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
17 21
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
18 22
 )
19 23
 
24
+// fatalError is an error which can't be retried.
25
+type fatalError string
26
+
27
+func (e fatalError) Error() string {
28
+	return fmt.Sprintf("fatal error handling deployment config: %s", string(e))
29
+}
30
+
20 31
 // DeploymentConfigController is responsible for creating a new deployment
21 32
 // when:
22 33
 //
... ...
@@ -32,48 +43,41 @@ import (
32 32
 // deployments will be cancelled. The controller will not attempt to scale
33 33
 // running deployments.
34 34
 type DeploymentConfigController struct {
35
-	// kubeClient provides acceess to Kube resources.
36
-	kubeClient kclient.Interface
37
-	// osClient provides access to OpenShift resources.
38
-	osClient osclient.Interface
35
+	// dn provides access to deploymentconfigs.
36
+	dn osclient.DeploymentConfigsNamespacer
37
+	// rn provides access to replication controllers.
38
+	rn kclient.ReplicationControllersNamespacer
39
+
40
+	// queue contains deployment configs that need to be synced.
41
+	queue workqueue.RateLimitingInterface
42
+
43
+	// dcStore provides a local cache for deployment configs.
44
+	dcStore oscache.StoreToDeploymentConfigLister
45
+	// rcStore provides a local cache for replication controllers.
46
+	rcStore cache.StoreToReplicationControllerLister
47
+	// dcStoreSynced makes sure the dc store is synced before reconciling any deployment config.
48
+	dcStoreSynced func() bool
49
+	// rcStoreSynced makes sure the rc store is synced before reconciling any deployment config.
50
+	rcStoreSynced func() bool
51
+
39 52
 	// codec is used to build deployments from configs.
40 53
 	codec runtime.Codec
41 54
 	// recorder is used to record events.
42 55
 	recorder record.EventRecorder
43 56
 }
44 57
 
45
-// fatalError is an error which can't be retried.
46
-type fatalError string
47
-
48
-// transientError is an error which should always be retried (indefinitely).
49
-type transientError string
50
-
51
-func (e fatalError) Error() string {
52
-	return fmt.Sprintf("fatal error handling deployment config: %s", string(e))
53
-}
54
-func (e transientError) Error() string {
55
-	return "transient error handling deployment config: " + string(e)
56
-}
57
-
58
-func NewDeploymentConfigController(kubeClient kclient.Interface, osClient osclient.Interface, codec runtime.Codec, recorder record.EventRecorder) *DeploymentConfigController {
59
-	return &DeploymentConfigController{
60
-		kubeClient: kubeClient,
61
-		osClient:   osClient,
62
-		codec:      codec,
63
-		recorder:   recorder,
64
-	}
65
-}
66
-
58
+// Handle implements the loop that processes deployment configs. Since this controller started
59
+// using caches, the provided config MUST be deep-copied beforehand (see work() in factory.go).
67 60
 func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error {
68
-	// There's nothing to reconcile until the version is nonzero.
69
-	if config.Status.LatestVersion == 0 {
70
-		glog.V(5).Infof("Waiting for first version of %q", deployutil.LabelForDeploymentConfig(config))
61
+	// There's nothing to reconcile until the version is nonzero or when the
62
+	// deployment config has been marked for deletion.
63
+	if config.Status.LatestVersion == 0 || config.DeletionTimestamp != nil {
71 64
 		return c.updateStatus(config)
72 65
 	}
73 66
 
74
-	// Find all deployments owned by the deploymentConfig.
75
-	sel := deployutil.ConfigSelector(config.Name)
76
-	existingDeployments, err := c.kubeClient.ReplicationControllers(config.Namespace).List(kapi.ListOptions{LabelSelector: sel})
67
+	// Find all deployments owned by the deployment config.
68
+	selector := deployutil.ConfigSelector(config.Name)
69
+	existingDeployments, err := c.rcStore.ReplicationControllers(config.Namespace).List(selector)
77 70
 	if err != nil {
78 71
 		return err
79 72
 	}
... ...
@@ -83,7 +87,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
83 83
 	// deployments to allow them to be superseded by the new config version.
84 84
 	awaitingCancellations := false
85 85
 	if !latestIsDeployed {
86
-		for _, deployment := range existingDeployments.Items {
86
+		for _, deployment := range existingDeployments {
87 87
 			// Skip deployments with an outcome.
88 88
 			if deployutil.IsTerminatedDeployment(&deployment) {
89 89
 				continue
... ...
@@ -93,7 +97,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
93 93
 			if !deployutil.IsDeploymentCancelled(&deployment) {
94 94
 				deployment.Annotations[deployapi.DeploymentCancelledAnnotation] = deployapi.DeploymentCancelledAnnotationValue
95 95
 				deployment.Annotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentCancelledNewerDeploymentExists
96
-				_, err := c.kubeClient.ReplicationControllers(deployment.Namespace).Update(&deployment)
96
+				_, err := c.rn.ReplicationControllers(deployment.Namespace).Update(&deployment)
97 97
 				if err != nil {
98 98
 					c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCancellationFailed", "Failed to cancel deployment %q superceded by version %d: %s", deployment.Name, config.Status.LatestVersion, err)
99 99
 				} else {
... ...
@@ -106,8 +110,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
106 106
 	// deployment to avoid competing with existing deployment processes.
107 107
 	if awaitingCancellations {
108 108
 		c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentAwaitingCancellation", "Deployment of version %d awaiting cancellation of older running deployments", config.Status.LatestVersion)
109
-		// raise a transientError so that the deployment config can be re-queued
110
-		return transientError(fmt.Sprintf("found previous inflight deployment for %s - requeuing", deployutil.LabelForDeploymentConfig(config)))
109
+		return fmt.Errorf("found previous inflight deployment for %s - requeuing", deployutil.LabelForDeploymentConfig(config))
111 110
 	}
112 111
 	// If the latest deployment already exists, reconcile existing deployments
113 112
 	// and return early.
... ...
@@ -130,7 +133,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
130 130
 	if err != nil {
131 131
 		return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err))
132 132
 	}
133
-	created, err := c.kubeClient.ReplicationControllers(config.Namespace).Create(deployment)
133
+	created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment)
134 134
 	if err != nil {
135 135
 		// If the deployment was already created, just move on. The cache could be
136 136
 		// stale, or another process could have already handled this update.
... ...
@@ -157,7 +160,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
157 157
 // directly. To continue supporting that old behavior we must detect when the
158 158
 // deployment has been directly manipulated, and if so, preserve the directly
159 159
 // updated value and sync the config with the deployment.
160
-func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *kapi.ReplicationControllerList, config *deployapi.DeploymentConfig) error {
160
+func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []kapi.ReplicationController, config *deployapi.DeploymentConfig) error {
161 161
 	latestIsDeployed, latestDeployment := deployutil.LatestDeploymentInfo(config, existingDeployments)
162 162
 	if !latestIsDeployed {
163 163
 		// We shouldn't be reconciling if the latest deployment hasn't been
... ...
@@ -226,7 +229,7 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *k
226 226
 		oldReplicas := config.Spec.Replicas
227 227
 		config.Spec.Replicas = activeReplicas
228 228
 		var err error
229
-		config, err = c.osClient.DeploymentConfigs(config.Namespace).Update(config)
229
+		config, err = c.dn.DeploymentConfigs(config.Namespace).Update(config)
230 230
 		if err != nil {
231 231
 			return err
232 232
 		}
... ...
@@ -235,7 +238,7 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *k
235 235
 
236 236
 	// Reconcile deployments. The active deployment follows the config, and all
237 237
 	// other deployments should be scaled to zero.
238
-	for _, deployment := range existingDeployments.Items {
238
+	for _, deployment := range existingDeployments {
239 239
 		isActiveDeployment := activeDeployment != nil && deployment.Name == activeDeployment.Name
240 240
 
241 241
 		oldReplicaCount := deployment.Spec.Replicas
... ...
@@ -259,7 +262,7 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *k
259 259
 			copied.Spec.Replicas = newReplicaCount
260 260
 			copied.Annotations[deployapi.DeploymentReplicasAnnotation] = strconv.Itoa(int(newReplicaCount))
261 261
 
262
-			if _, err := c.kubeClient.ReplicationControllers(copied.Namespace).Update(copied); err != nil {
262
+			if _, err := c.rn.ReplicationControllers(copied.Namespace).Update(copied); err != nil {
263 263
 				c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentScaleFailed",
264 264
 					"Failed to scale deployment %q from %d to %d: %v", copied.Name, oldReplicaCount, newReplicaCount, err)
265 265
 				return err
... ...
@@ -278,17 +281,42 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *k
278 278
 }
279 279
 
280 280
 func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentConfig) error {
281
-	if config.Generation > config.Status.ObservedGeneration {
282
-		config.Status.ObservedGeneration = config.Generation
281
+	// NOTE: We should update the status of the deployment config only if we need to, otherwise
282
+	// we hotloop between updates.
283
+	if !needsUpdate(config) {
284
+		return nil
283 285
 	}
284
-	if _, err := c.osClient.DeploymentConfigs(config.Namespace).UpdateStatus(config); err != nil {
286
+	config.Status.ObservedGeneration = config.Generation
287
+	if _, err := c.dn.DeploymentConfigs(config.Namespace).UpdateStatus(config); err != nil {
285 288
 		glog.V(2).Infof("Cannot update the status for %q: %v", deployutil.LabelForDeploymentConfig(config), err)
286
-		return transientError(fmt.Sprintf("cannot update the status for %q - requeuing", deployutil.LabelForDeploymentConfig(config)))
289
+		return err
287 290
 	}
288 291
 	glog.V(4).Infof("Updated the status for %q (observed generation: %d)", deployutil.LabelForDeploymentConfig(config), config.Status.ObservedGeneration)
289 292
 	return nil
290 293
 }
291 294
 
295
+func (c *DeploymentConfigController) handleErr(err error, key interface{}) {
296
+	if err == nil {
297
+		return
298
+	}
299
+	if _, isFatal := err.(fatalError); isFatal {
300
+		utilruntime.HandleError(err)
301
+		c.queue.Forget(key)
302
+		return
303
+	}
304
+
305
+	if c.queue.NumRequeues(key) < 10 {
306
+		c.queue.AddRateLimited(key)
307
+	} else {
308
+		glog.V(2).Infof(err.Error())
309
+		c.queue.Forget(key)
310
+	}
311
+}
312
+
313
+func needsUpdate(config *deployapi.DeploymentConfig) bool {
314
+	return config.Generation > config.Status.ObservedGeneration
315
+}
316
+
292 317
 func deploymentCopy(rc *kapi.ReplicationController) (*kapi.ReplicationController, error) {
293 318
 	objCopy, err := kapi.Scheme.DeepCopy(rc)
294 319
 	if err != nil {
... ...
@@ -4,12 +4,15 @@ import (
4 4
 	"sort"
5 5
 	"strconv"
6 6
 	"testing"
7
+	"time"
7 8
 
8 9
 	kapi "k8s.io/kubernetes/pkg/api"
9
-	"k8s.io/kubernetes/pkg/client/record"
10
+	"k8s.io/kubernetes/pkg/client/cache"
10 11
 	ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient"
12
+	"k8s.io/kubernetes/pkg/controller/framework"
11 13
 	"k8s.io/kubernetes/pkg/runtime"
12 14
 	"k8s.io/kubernetes/pkg/util/diff"
15
+	"k8s.io/kubernetes/pkg/watch"
13 16
 
14 17
 	"github.com/openshift/origin/pkg/client/testclient"
15 18
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
... ...
@@ -620,19 +623,15 @@ func TestHandleScenarios(t *testing.T) {
620 620
 		t.Logf("evaluating test: %s", test.name)
621 621
 
622 622
 		deployments := map[string]kapi.ReplicationController{}
623
+		toStore := []kapi.ReplicationController{}
623 624
 		for _, template := range test.before {
624 625
 			deployment := mkdeployment(template)
625 626
 			deployments[deployment.Name] = deployment
627
+			toStore = append(toStore, deployment)
626 628
 		}
627 629
 
630
+		oc := &testclient.Fake{}
628 631
 		kc := &ktestclient.Fake{}
629
-		kc.AddReactor("list", "replicationcontrollers", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
630
-			list := []kapi.ReplicationController{}
631
-			for _, deployment := range deployments {
632
-				list = append(list, deployment)
633
-			}
634
-			return true, &kapi.ReplicationControllerList{Items: list}, nil
635
-		})
636 632
 		kc.AddReactor("create", "replicationcontrollers", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
637 633
 			rc := action.(ktestclient.CreateAction).GetObject().(*kapi.ReplicationController)
638 634
 			deployments[rc.Name] = *rc
... ...
@@ -643,15 +642,38 @@ func TestHandleScenarios(t *testing.T) {
643 643
 			deployments[rc.Name] = *rc
644 644
 			return true, rc, nil
645 645
 		})
646
+		codec := kapi.Codecs.LegacyCodec(deployapi.SchemeGroupVersion)
646 647
 
647
-		oc := &testclient.Fake{}
648
+		dcInformer := framework.NewSharedIndexInformer(
649
+			&cache.ListWatch{
650
+				ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
651
+					return oc.DeploymentConfigs(kapi.NamespaceAll).List(options)
652
+				},
653
+				WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
654
+					return oc.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
655
+				},
656
+			},
657
+			&deployapi.DeploymentConfig{},
658
+			2*time.Minute,
659
+			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
660
+		)
661
+		rcInformer := framework.NewSharedIndexInformer(
662
+			&cache.ListWatch{
663
+				ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
664
+					return kc.ReplicationControllers(kapi.NamespaceAll).List(options)
665
+				},
666
+				WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
667
+					return kc.ReplicationControllers(kapi.NamespaceAll).Watch(options)
668
+				},
669
+			},
670
+			&kapi.ReplicationController{},
671
+			2*time.Minute,
672
+			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
673
+		)
674
+		c := NewDeploymentConfigController(dcInformer, rcInformer, oc, kc, codec)
648 675
 
649
-		recorder := &record.FakeRecorder{}
650
-		controller := &DeploymentConfigController{
651
-			kubeClient: kc,
652
-			osClient:   oc,
653
-			codec:      kapi.Codecs.LegacyCodec(deployapi.SchemeGroupVersion),
654
-			recorder:   recorder,
676
+		for i := range toStore {
677
+			c.rcStore.Add(&toStore[i])
655 678
 		}
656 679
 
657 680
 		config := deploytest.OkDeploymentConfig(test.newVersion)
... ...
@@ -659,9 +681,10 @@ func TestHandleScenarios(t *testing.T) {
659 659
 			config = deploytest.TestDeploymentConfig(config)
660 660
 		}
661 661
 		config.Spec.Replicas = test.replicas
662
-		err := controller.Handle(config)
663
-		if err != nil && !test.errExpected {
664
-			t.Fatalf("unexpected error: %s", err)
662
+
663
+		if err := c.Handle(config); err != nil && !test.errExpected {
664
+			t.Errorf("unexpected error: %s", err)
665
+			continue
665 666
 		}
666 667
 
667 668
 		expectedDeployments := []kapi.ReplicationController{}
... ...
@@ -677,22 +700,14 @@ func TestHandleScenarios(t *testing.T) {
677 677
 
678 678
 		if e, a := test.expectedReplicas, config.Spec.Replicas; e != a {
679 679
 			t.Errorf("expected config replicas to be %d, got %d", e, a)
680
-			// TODO: Disable as the recorder.Events is now `chan string`
681
-			//t.Fatalf("events:\n%s", strings.Join(recorder.Events, "\t\n"))
680
+			continue
682 681
 		}
683
-		anyDeploymentMismatches := false
684 682
 		for i := 0; i < len(expectedDeployments); i++ {
685 683
 			expected, actual := expectedDeployments[i], actualDeployments[i]
686 684
 			if !kapi.Semantic.DeepEqual(expected, actual) {
687
-				anyDeploymentMismatches = true
688 685
 				t.Errorf("actual deployment don't match expected: %v", diff.ObjectDiff(expected, actual))
689 686
 			}
690 687
 		}
691
-		if anyDeploymentMismatches {
692
-			// TODO: Disable as the recorder.Events is now `chan string`
693
-			//t.Fatalf("events:\n%s", strings.Join(recorder.Events, "\t\n"))
694
-			t.Fatalf("deployment mismatches detected")
695
-		}
696 688
 	}
697 689
 }
698 690
 
... ...
@@ -9,82 +9,232 @@ import (
9 9
 	"k8s.io/kubernetes/pkg/client/cache"
10 10
 	"k8s.io/kubernetes/pkg/client/record"
11 11
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
12
+	kcontroller "k8s.io/kubernetes/pkg/controller"
13
+	"k8s.io/kubernetes/pkg/controller/framework"
12 14
 	"k8s.io/kubernetes/pkg/runtime"
13
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
14 15
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
15
-	"k8s.io/kubernetes/pkg/watch"
16
+	"k8s.io/kubernetes/pkg/util/wait"
17
+	"k8s.io/kubernetes/pkg/util/workqueue"
16 18
 
17 19
 	osclient "github.com/openshift/origin/pkg/client"
18
-	controller "github.com/openshift/origin/pkg/controller"
19 20
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
20 21
 )
21 22
 
22
-// DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains
23
-// DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs.
24
-type DeploymentConfigControllerFactory struct {
25
-	// Client is an OpenShift client.
26
-	Client osclient.Interface
27
-	// KubeClient is a Kubernetes client.
28
-	KubeClient kclient.Interface
29
-	// Codec is used to encode/decode.
30
-	Codec runtime.Codec
23
+const (
24
+	// We must avoid creating new replication controllers until the {replication controller,pods} store
25
+	// has synced. If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
26
+	StoreSyncedPollPeriod = 100 * time.Millisecond
27
+)
28
+
29
+// NewDeploymentConfigController creates a new DeploymentConfigController.
30
+func NewDeploymentConfigController(dcInformer, rcInformer framework.SharedIndexInformer, oc osclient.Interface, kc kclient.Interface, codec runtime.Codec) *DeploymentConfigController {
31
+	eventBroadcaster := record.NewBroadcaster()
32
+	eventBroadcaster.StartRecordingToSink(kc.Events(""))
33
+	recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deploymentconfig-controller"})
34
+
35
+	c := &DeploymentConfigController{
36
+		dn: oc,
37
+		rn: kc,
38
+
39
+		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
40
+
41
+		recorder: recorder,
42
+		codec:    codec,
43
+	}
44
+
45
+	c.dcStore.Indexer = dcInformer.GetIndexer()
46
+	dcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
47
+		AddFunc:    c.addDeploymentConfig,
48
+		UpdateFunc: c.updateDeploymentConfig,
49
+		DeleteFunc: c.deleteDeploymentConfig,
50
+	})
51
+
52
+	c.rcStore.Indexer = rcInformer.GetIndexer()
53
+	rcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
54
+		AddFunc:    c.addReplicationController,
55
+		UpdateFunc: c.updateReplicationController,
56
+		DeleteFunc: c.deleteReplicationController,
57
+	})
58
+
59
+	c.dcStoreSynced = dcInformer.HasSynced
60
+	c.rcStoreSynced = rcInformer.HasSynced
61
+
62
+	return c
31 63
 }
32 64
 
33
-// Create creates a DeploymentConfigController.
34
-func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableController {
35
-	deploymentConfigLW := &cache.ListWatch{
36
-		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
37
-			return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(options)
38
-		},
39
-		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
40
-			return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
41
-		},
65
+// Run begins watching and syncing.
66
+func (c *DeploymentConfigController) Run(workers int, stopCh <-chan struct{}) {
67
+	defer utilruntime.HandleCrash()
68
+
69
+	// Wait for the rc and dc stores to sync before starting any work in this controller.
70
+	ready := make(chan struct{})
71
+	go c.waitForSyncedStores(ready)
72
+	select {
73
+	case <-ready:
74
+	case <-stopCh:
75
+		return
42 76
 	}
43
-	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
44
-	cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue, 2*time.Minute).Run()
45 77
 
46
-	eventBroadcaster := record.NewBroadcaster()
47
-	eventBroadcaster.StartRecordingToSink(factory.KubeClient.Events(""))
48
-	recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deploymentconfig-controller"})
78
+	for i := 0; i < workers; i++ {
79
+		go wait.Until(c.worker, time.Second, stopCh)
80
+	}
81
+
82
+	<-stopCh
83
+	glog.Infof("Shutting down deploymentconfig controller")
84
+	c.queue.ShutDown()
85
+}
86
+
87
+func (c *DeploymentConfigController) waitForSyncedStores(ready chan struct{}) {
88
+	defer utilruntime.HandleCrash()
89
+
90
+	for !c.dcStoreSynced() || !c.rcStoreSynced() {
91
+		glog.V(4).Infof("Waiting for the dc and rc controllers to sync before starting the deployment config controller workers")
92
+		time.Sleep(StoreSyncedPollPeriod)
93
+	}
94
+	close(ready)
95
+}
96
+
97
+func (c *DeploymentConfigController) addDeploymentConfig(obj interface{}) {
98
+	dc := obj.(*deployapi.DeploymentConfig)
99
+	glog.V(4).Infof("Adding deployment config %q", dc.Name)
100
+	c.enqueueDeploymentConfig(dc)
101
+}
102
+
103
+func (c *DeploymentConfigController) updateDeploymentConfig(old, cur interface{}) {
104
+	oldDc := old.(*deployapi.DeploymentConfig)
105
+	glog.V(4).Infof("Updating deployment config %q", oldDc.Name)
106
+	c.enqueueDeploymentConfig(cur.(*deployapi.DeploymentConfig))
107
+}
108
+
109
+func (c *DeploymentConfigController) deleteDeploymentConfig(obj interface{}) {
110
+	dc, ok := obj.(*deployapi.DeploymentConfig)
111
+	if !ok {
112
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
113
+		if !ok {
114
+			glog.Errorf("Couldn't get object from tombstone %+v", obj)
115
+			return
116
+		}
117
+		dc, ok = tombstone.Obj.(*deployapi.DeploymentConfig)
118
+		if !ok {
119
+			glog.Errorf("Tombstone contained object that is not a deployment config: %+v", obj)
120
+			return
121
+		}
122
+	}
123
+	glog.V(4).Infof("Deleting deployment config %q", dc.Name)
124
+	c.enqueueDeploymentConfig(dc)
125
+}
126
+
127
+// addReplicationController enqueues the deployment that manages a replicationcontroller when the replicationcontroller is created.
128
+func (c *DeploymentConfigController) addReplicationController(obj interface{}) {
129
+	rc := obj.(*kapi.ReplicationController)
130
+	glog.V(4).Infof("Replication controller %q added.", rc.Name)
131
+	// We are waiting for the deployment config store to sync but still there are pathological
132
+	// cases of highly latent watches.
133
+	if dc, err := c.dcStore.GetConfigForController(rc); err == nil && dc != nil {
134
+		c.enqueueDeploymentConfig(dc)
135
+	}
136
+}
137
+
138
+// updateReplicationController figures out which deploymentconfig is managing this replication
139
+// controller and requeues the deployment config.
140
+func (c *DeploymentConfigController) updateReplicationController(old, cur interface{}) {
141
+	// A periodic relist will send update events for all known controllers.
142
+	if kapi.Semantic.DeepEqual(old, cur) {
143
+		return
144
+	}
145
+
146
+	curRC := cur.(*kapi.ReplicationController)
147
+	glog.V(4).Infof("Replication controller %q updated.", curRC.Name)
148
+	if dc, err := c.dcStore.GetConfigForController(curRC); err == nil && dc != nil {
149
+		c.enqueueDeploymentConfig(dc)
150
+	}
151
+}
152
+
153
+// deleteReplicationController enqueues the deployment that manages a replicationcontroller when
154
+// the replicationcontroller is deleted. obj could be an *kapi.ReplicationController, or
155
+// a DeletionFinalStateUnknown marker item.
156
+func (c *DeploymentConfigController) deleteReplicationController(obj interface{}) {
157
+	rc, ok := obj.(*kapi.ReplicationController)
158
+
159
+	// When a delete is dropped, the relist will notice a replication controller in the store not
160
+	// in the list, leading to the insertion of a tombstone object which contains
161
+	// the deleted key/value.
162
+	if !ok {
163
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
164
+		if !ok {
165
+			glog.Errorf("Couldn't get object from tombstone %#v", obj)
166
+			return
167
+		}
168
+		rc, ok = tombstone.Obj.(*kapi.ReplicationController)
169
+		if !ok {
170
+			glog.Errorf("Tombstone contained object that is not a replication controller %#v", obj)
171
+			return
172
+		}
173
+	}
174
+	glog.V(4).Infof("Replication controller %q deleted.", rc.Name)
175
+	if dc, err := c.dcStore.GetConfigForController(rc); err == nil && dc != nil {
176
+		c.enqueueDeploymentConfig(dc)
177
+	}
178
+}
179
+
180
+func (c *DeploymentConfigController) enqueueDeploymentConfig(dc *deployapi.DeploymentConfig) {
181
+	key, err := kcontroller.KeyFunc(dc)
182
+	if err != nil {
183
+		glog.Errorf("Couldn't get key for object %#v: %v", dc, err)
184
+		return
185
+	}
186
+	c.queue.Add(key)
187
+}
188
+
189
+func (c *DeploymentConfigController) worker() {
190
+	for {
191
+		if quit := c.work(); quit {
192
+			return
193
+		}
194
+	}
195
+}
49 196
 
50
-	configController := NewDeploymentConfigController(factory.KubeClient, factory.Client, factory.Codec, recorder)
51
-
52
-	return &controller.RetryController{
53
-		Queue: queue,
54
-		RetryManager: controller.NewQueueRetryManager(
55
-			queue,
56
-			cache.MetaNamespaceKeyFunc,
57
-			func(obj interface{}, err error, retries controller.Retry) bool {
58
-				config := obj.(*deployapi.DeploymentConfig)
59
-				// no retries for a fatal error
60
-				if _, isFatal := err.(fatalError); isFatal {
61
-					glog.V(4).Infof("Will not retry fatal error for deploymentConfig %s/%s: %v", config.Namespace, config.Name, err)
62
-					utilruntime.HandleError(err)
63
-					return false
64
-				}
65
-				// infinite retries for a transient error
66
-				if _, isTransient := err.(transientError); isTransient {
67
-					glog.V(4).Infof("Retrying deploymentConfig %s/%s with error: %v", config.Namespace, config.Name, err)
68
-					return true
69
-				}
70
-				utilruntime.HandleError(err)
71
-				// no retries for anything else
72
-				if retries.Count > 0 {
73
-					return false
74
-				}
75
-				return true
76
-			},
77
-			flowcontrol.NewTokenBucketRateLimiter(1, 10),
78
-		),
79
-		Handle: func(obj interface{}) error {
80
-			config := obj.(*deployapi.DeploymentConfig)
81
-			copied, err := dcCopy(config)
82
-			if err != nil {
83
-				return err
84
-			}
85
-			return configController.Handle(copied)
86
-		},
197
+func (c *DeploymentConfigController) work() bool {
198
+	key, quit := c.queue.Get()
199
+	if quit {
200
+		return true
87 201
 	}
202
+	defer c.queue.Done(key)
203
+
204
+	dc, err := c.getByKey(key.(string))
205
+	if err != nil {
206
+		glog.Error(err.Error())
207
+	}
208
+
209
+	if dc == nil {
210
+		return false
211
+	}
212
+
213
+	copied, err := dcCopy(dc)
214
+	if err != nil {
215
+		glog.Error(err.Error())
216
+		return false
217
+	}
218
+
219
+	err = c.Handle(copied)
220
+	c.handleErr(err, key)
221
+
222
+	return false
223
+}
224
+
225
+func (c *DeploymentConfigController) getByKey(key string) (*deployapi.DeploymentConfig, error) {
226
+	obj, exists, err := c.dcStore.Indexer.GetByKey(key)
227
+	if err != nil {
228
+		glog.V(2).Infof("Unable to retrieve deployment config %q from store: %v", key, err)
229
+		c.queue.AddRateLimited(key)
230
+		return nil, err
231
+	}
232
+	if !exists {
233
+		glog.V(4).Infof("Deployment config %q has been deleted", key)
234
+		return nil, nil
235
+	}
236
+
237
+	return obj.(*deployapi.DeploymentConfig), nil
88 238
 }
89 239
 
90 240
 func dcCopy(dc *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) {
... ...
@@ -22,22 +22,22 @@ func LatestDeploymentNameForConfig(config *deployapi.DeploymentConfig) string {
22 22
 // LatestDeploymentInfo returns info about the latest deployment for a config,
23 23
 // or nil if there is no latest deployment. The latest deployment is not
24 24
 // always the same as the active deployment.
25
-func LatestDeploymentInfo(config *deployapi.DeploymentConfig, deployments *api.ReplicationControllerList) (bool, *api.ReplicationController) {
26
-	if config.Status.LatestVersion == 0 || len(deployments.Items) == 0 {
25
+func LatestDeploymentInfo(config *deployapi.DeploymentConfig, deployments []api.ReplicationController) (bool, *api.ReplicationController) {
26
+	if config.Status.LatestVersion == 0 || len(deployments) == 0 {
27 27
 		return false, nil
28 28
 	}
29
-	sort.Sort(ByLatestVersionDesc(deployments.Items))
30
-	candidate := &deployments.Items[0]
29
+	sort.Sort(ByLatestVersionDesc(deployments))
30
+	candidate := &deployments[0]
31 31
 	return DeploymentVersionFor(candidate) == config.Status.LatestVersion, candidate
32 32
 }
33 33
 
34 34
 // ActiveDeployment returns the latest complete deployment, or nil if there is
35 35
 // no such deployment. The active deployment is not always the same as the
36 36
 // latest deployment.
37
-func ActiveDeployment(config *deployapi.DeploymentConfig, deployments *api.ReplicationControllerList) *api.ReplicationController {
38
-	sort.Sort(ByLatestVersionDesc(deployments.Items))
37
+func ActiveDeployment(config *deployapi.DeploymentConfig, deployments []api.ReplicationController) *api.ReplicationController {
38
+	sort.Sort(ByLatestVersionDesc(deployments))
39 39
 	var activeDeployment *api.ReplicationController
40
-	for _, deployment := range deployments.Items {
40
+	for _, deployment := range deployments {
41 41
 		if DeploymentStatusFor(&deployment) == deployapi.DeploymentStatusComplete {
42 42
 			activeDeployment = &deployment
43 43
 			break