Browse code

deploy: add shared caches in the trigger controller

Michail Kargakis authored on 2016/06/18 00:20:32
Showing 10 changed files
... ...
@@ -351,14 +351,15 @@ func (c *MasterConfig) RunDeploymentConfigController() {
351 351
 
352 352
 // RunDeploymentTriggerController starts the deployment trigger controller process.
353 353
 func (c *MasterConfig) RunDeploymentTriggerController() {
354
+	dcInfomer := c.Informers.DeploymentConfigs().Informer()
355
+	streamInformer := c.Informers.ImageStreams().Informer()
354 356
 	osclient, kclient := c.DeploymentTriggerControllerClients()
355
-	factory := triggercontroller.DeploymentTriggerControllerFactory{
356
-		Client:     osclient,
357
-		KubeClient: kclient,
358
-		Codec:      c.EtcdHelper.Codec(),
359
-	}
360
-	controller := factory.Create()
361
-	controller.Run()
357
+
358
+	controller := triggercontroller.NewDeploymentTriggerController(dcInfomer, streamInformer, osclient, kclient, c.EtcdHelper.Codec())
359
+	// TODO: Make the stop channel actually work.
360
+	stopCh := make(chan struct{})
361
+	// TODO: Make the number of workers configurable.
362
+	go controller.Run(5, stopCh)
362 363
 }
363 364
 
364 365
 // RunDeploymentImageChangeTriggerController starts the image change trigger controller process.
... ...
@@ -21,10 +21,6 @@ type ClusterPolicyInformer interface {
21 21
 	Lister() client.SyncedClusterPoliciesListerInterface
22 22
 }
23 23
 
24
-func (f *sharedInformerFactory) ClusterPolicies() ClusterPolicyInformer {
25
-	return &clusterPolicyInformer{sharedInformerFactory: f}
26
-}
27
-
28 24
 type clusterPolicyInformer struct {
29 25
 	*sharedInformerFactory
30 26
 }
... ...
@@ -79,10 +75,6 @@ type ClusterPolicyBindingInformer interface {
79 79
 	Lister() client.SyncedClusterPolicyBindingsListerInterface
80 80
 }
81 81
 
82
-func (f *sharedInformerFactory) ClusterPolicyBindings() ClusterPolicyBindingInformer {
83
-	return &clusterPolicyBindingInformer{sharedInformerFactory: f}
84
-}
85
-
86 82
 type clusterPolicyBindingInformer struct {
87 83
 	*sharedInformerFactory
88 84
 }
... ...
@@ -137,10 +129,6 @@ type PolicyInformer interface {
137 137
 	Lister() client.SyncedPoliciesListerNamespacer
138 138
 }
139 139
 
140
-func (f *sharedInformerFactory) Policies() PolicyInformer {
141
-	return &policyInformer{sharedInformerFactory: f}
142
-}
143
-
144 140
 type policyInformer struct {
145 141
 	*sharedInformerFactory
146 142
 }
... ...
@@ -195,10 +183,6 @@ type PolicyBindingInformer interface {
195 195
 	Lister() client.SyncedPolicyBindingsListerNamespacer
196 196
 }
197 197
 
198
-func (f *sharedInformerFactory) PolicyBindings() PolicyBindingInformer {
199
-	return &policyBindingInformer{sharedInformerFactory: f}
200
-}
201
-
202 198
 type policyBindingInformer struct {
203 199
 	*sharedInformerFactory
204 200
 }
205 201
new file mode 100644
... ...
@@ -0,0 +1,62 @@
0
+package controller
1
+
2
+import (
3
+	"reflect"
4
+
5
+	kapi "k8s.io/kubernetes/pkg/api"
6
+	"k8s.io/kubernetes/pkg/client/cache"
7
+	"k8s.io/kubernetes/pkg/controller/framework"
8
+	"k8s.io/kubernetes/pkg/runtime"
9
+	"k8s.io/kubernetes/pkg/watch"
10
+
11
+	oscache "github.com/openshift/origin/pkg/client/cache"
12
+)
13
+
14
+type ImageStreamInformer interface {
15
+	Informer() framework.SharedIndexInformer
16
+	Indexer() cache.Indexer
17
+	Lister() *oscache.StoreToImageStreamLister
18
+}
19
+
20
+type imageStreamInformer struct {
21
+	*sharedInformerFactory
22
+}
23
+
24
+func (f *imageStreamInformer) Informer() framework.SharedIndexInformer {
25
+	f.lock.Lock()
26
+	defer f.lock.Unlock()
27
+
28
+	informerObj := &imageapi.ImageStream{}
29
+	informerType := reflect.TypeOf(informerObj)
30
+	informer, exists := f.informers[informerType]
31
+	if exists {
32
+		return informer
33
+	}
34
+
35
+	informer = framework.NewSharedIndexInformer(
36
+		&cache.ListWatch{
37
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
38
+				return f.originClient.ImageStreams(kapi.NamespaceAll).List(options)
39
+			},
40
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
41
+				return f.originClient.ImageStreams(kapi.NamespaceAll).Watch(options)
42
+			},
43
+		},
44
+		informerObj,
45
+		f.defaultResync,
46
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
47
+	)
48
+	f.informers[informerType] = informer
49
+
50
+	return informer
51
+}
52
+
53
+func (f *imageStreamInformer) Indexer() cache.Indexer {
54
+	informer := f.Informer()
55
+	return informer.GetIndexer()
56
+}
57
+
58
+func (f *imageStreamInformer) Lister() *oscache.StoreToImageStreamLister {
59
+	informer := f.Informer()
60
+	return &oscache.StoreToImageStreamLister{Indexer: informer.GetIndexer()}
61
+}
... ...
@@ -16,10 +16,6 @@ type PodInformer interface {
16 16
 	Lister() *cache.StoreToPodLister
17 17
 }
18 18
 
19
-func (f *sharedInformerFactory) Pods() PodInformer {
20
-	return &podInformer{sharedInformerFactory: f}
21
-}
22
-
23 19
 type podInformer struct {
24 20
 	*sharedInformerFactory
25 21
 }
... ...
@@ -75,10 +71,6 @@ type ReplicationControllerInformer interface {
75 75
 	Lister() *cache.StoreToReplicationControllerLister
76 76
 }
77 77
 
78
-func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
79
-	return &replicationControllerInformer{sharedInformerFactory: f}
80
-}
81
-
82 78
 type replicationControllerInformer struct {
83 79
 	*sharedInformerFactory
84 80
 }
... ...
@@ -28,6 +28,7 @@ type InformerFactory interface {
28 28
 	PolicyBindings() PolicyBindingInformer
29 29
 
30 30
 	DeploymentConfigs() DeploymentConfigInformer
31
+	ImageStreams() ImageStreamInformer
31 32
 }
32 33
 
33 34
 // ListerWatcherOverrides allows a caller to specify special behavior for particular ListerWatchers
... ...
@@ -73,12 +74,40 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
73 73
 	}
74 74
 }
75 75
 
76
-func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
77
-	return &deploymentConfigInformer{sharedInformerFactory: f}
78
-}
79
-
80 76
 func (f *sharedInformerFactory) StartCore(stopCh <-chan struct{}) {
81 77
 	for _, informer := range f.coreInformers {
82 78
 		go informer.Run(stopCh)
83 79
 	}
84 80
 }
81
+
82
+func (f *sharedInformerFactory) Pods() PodInformer {
83
+	return &podInformer{sharedInformerFactory: f}
84
+}
85
+
86
+func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
87
+	return &replicationControllerInformer{sharedInformerFactory: f}
88
+}
89
+
90
+func (f *sharedInformerFactory) ClusterPolicies() ClusterPolicyInformer {
91
+	return &clusterPolicyInformer{sharedInformerFactory: f}
92
+}
93
+
94
+func (f *sharedInformerFactory) ClusterPolicyBindings() ClusterPolicyBindingInformer {
95
+	return &clusterPolicyBindingInformer{sharedInformerFactory: f}
96
+}
97
+
98
+func (f *sharedInformerFactory) Policies() PolicyInformer {
99
+	return &policyInformer{sharedInformerFactory: f}
100
+}
101
+
102
+func (f *sharedInformerFactory) PolicyBindings() PolicyBindingInformer {
103
+	return &policyBindingInformer{sharedInformerFactory: f}
104
+}
105
+
106
+func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
107
+	return &deploymentConfigInformer{sharedInformerFactory: f}
108
+}
109
+
110
+func (f *sharedInformerFactory) ImageStreams() ImageStreamInformer {
111
+	return &imageStreamInformer{sharedInformerFactory: f}
112
+}
... ...
@@ -1,7 +1,6 @@
1 1
 package deploymentconfig
2 2
 
3 3
 import (
4
-	"fmt"
5 4
 	"time"
6 5
 
7 6
 	"github.com/golang/glog"
... ...
@@ -18,11 +17,13 @@ import (
18 18
 
19 19
 	osclient "github.com/openshift/origin/pkg/client"
20 20
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
21
+	deployutil "github.com/openshift/origin/pkg/deploy/util"
21 22
 )
22 23
 
23 24
 const (
24
-	// We must avoid creating new replication controllers until the {replication controller,pods} store
25
-	// has synced. If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
25
+	// We must avoid creating new replication controllers until the deployment config and replication
26
+	// controller stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long
27
+	// between checks.
26 28
 	StoreSyncedPollPeriod = 100 * time.Millisecond
27 29
 )
28 30
 
... ...
@@ -69,7 +70,7 @@ func (c *DeploymentConfigController) Run(workers int, stopCh <-chan struct{}) {
69 69
 
70 70
 	// Wait for the rc and dc stores to sync before starting any work in this controller.
71 71
 	ready := make(chan struct{})
72
-	go c.waitForSyncedStores(ready)
72
+	go c.waitForSyncedStores(ready, stopCh)
73 73
 	select {
74 74
 	case <-ready:
75 75
 	case <-stopCh:
... ...
@@ -85,12 +86,16 @@ func (c *DeploymentConfigController) Run(workers int, stopCh <-chan struct{}) {
85 85
 	c.queue.ShutDown()
86 86
 }
87 87
 
88
-func (c *DeploymentConfigController) waitForSyncedStores(ready chan struct{}) {
88
+func (c *DeploymentConfigController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
89 89
 	defer utilruntime.HandleCrash()
90 90
 
91 91
 	for !c.dcStoreSynced() || !c.rcStoreSynced() || !c.podStoreSynced() {
92 92
 		glog.V(4).Infof("Waiting for the dc, rc, and pod caches to sync before starting the deployment config controller workers")
93
-		time.Sleep(StoreSyncedPollPeriod)
93
+		select {
94
+		case <-time.After(StoreSyncedPollPeriod):
95
+		case <-stopCh:
96
+			return
97
+		}
94 98
 	}
95 99
 	close(ready)
96 100
 }
... ...
@@ -102,9 +107,9 @@ func (c *DeploymentConfigController) addDeploymentConfig(obj interface{}) {
102 102
 }
103 103
 
104 104
 func (c *DeploymentConfigController) updateDeploymentConfig(old, cur interface{}) {
105
-	oldDc := old.(*deployapi.DeploymentConfig)
106
-	glog.V(4).Infof("Updating deployment config %q", oldDc.Name)
107
-	c.enqueueDeploymentConfig(cur.(*deployapi.DeploymentConfig))
105
+	dc := cur.(*deployapi.DeploymentConfig)
106
+	glog.V(4).Infof("Updating deployment config %q", dc.Name)
107
+	c.enqueueDeploymentConfig(dc)
108 108
 }
109 109
 
110 110
 func (c *DeploymentConfigController) deleteDeploymentConfig(obj interface{}) {
... ...
@@ -125,7 +130,10 @@ func (c *DeploymentConfigController) deleteDeploymentConfig(obj interface{}) {
125 125
 	c.enqueueDeploymentConfig(dc)
126 126
 }
127 127
 
128
-// addReplicationController enqueues the deployment that manages a replicationcontroller when the replicationcontroller is created.
128
+// addReplicationController figures out which deploymentconfig is managing this replication
129
+// controller and requeues the deployment config.
130
+// TODO: Determine if we need to resync here. Would be useful for adoption but we cannot
131
+// adopt right now.
129 132
 func (c *DeploymentConfigController) addReplicationController(obj interface{}) {
130 133
 	rc := obj.(*kapi.ReplicationController)
131 134
 	glog.V(4).Infof("Replication controller %q added.", rc.Name)
... ...
@@ -211,7 +219,7 @@ func (c *DeploymentConfigController) work() bool {
211 211
 		return false
212 212
 	}
213 213
 
214
-	copied, err := dcCopy(dc)
214
+	copied, err := deployutil.DeploymentConfigDeepCopy(dc)
215 215
 	if err != nil {
216 216
 		glog.Error(err.Error())
217 217
 		return false
... ...
@@ -237,15 +245,3 @@ func (c *DeploymentConfigController) getByKey(key string) (*deployapi.Deployment
237 237
 
238 238
 	return obj.(*deployapi.DeploymentConfig), nil
239 239
 }
240
-
241
-func dcCopy(dc *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) {
242
-	objCopy, err := kapi.Scheme.DeepCopy(dc)
243
-	if err != nil {
244
-		return nil, err
245
-	}
246
-	copied, ok := objCopy.(*deployapi.DeploymentConfig)
247
-	if !ok {
248
-		return nil, fmt.Errorf("expected DeploymentConfig, got %#v", objCopy)
249
-	}
250
-	return copied, nil
251
-}
... ...
@@ -6,8 +6,10 @@ import (
6 6
 	kapi "k8s.io/kubernetes/pkg/api"
7 7
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
8 8
 	"k8s.io/kubernetes/pkg/runtime"
9
+	"k8s.io/kubernetes/pkg/util/workqueue"
9 10
 
10 11
 	osclient "github.com/openshift/origin/pkg/client"
12
+	oscache "github.com/openshift/origin/pkg/client/cache"
11 13
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
12 14
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
13 15
 )
... ...
@@ -19,19 +21,19 @@ type DeploymentTriggerController struct {
19 19
 	dn osclient.DeploymentConfigsNamespacer
20 20
 	// rn is used for getting the latest deployment for a config.
21 21
 	rn kclient.ReplicationControllersNamespacer
22
+
23
+	// queue contains deployment configs that need to be synced.
24
+	queue workqueue.RateLimitingInterface
25
+
26
+	// dcStore provides a local cache for deployment configs.
27
+	dcStore oscache.StoreToDeploymentConfigLister
28
+	// dcStoreSynced makes sure the dc store is synced before reconcling any deployment config.
29
+	dcStoreSynced func() bool
30
+
22 31
 	// codec is used for decoding a config out of a deployment.
23 32
 	codec runtime.Codec
24 33
 }
25 34
 
26
-// NewDeploymentTriggerController returns a new DeploymentTriggerController.
27
-func NewDeploymentTriggerController(oc osclient.Interface, kc kclient.Interface, codec runtime.Codec) *DeploymentTriggerController {
28
-	return &DeploymentTriggerController{
29
-		dn:    oc,
30
-		rn:    kc,
31
-		codec: codec,
32
-	}
33
-}
34
-
35 35
 // fatalError is an error which can't be retried.
36 36
 type fatalError string
37 37
 
... ...
@@ -59,7 +61,12 @@ func (c *DeploymentTriggerController) Handle(config *deployapi.DeploymentConfig)
59 59
 		return nil
60 60
 	}
61 61
 
62
-	return c.update(config, causes)
62
+	copied, err := deployutil.DeploymentConfigDeepCopy(config)
63
+	if err != nil {
64
+		return err
65
+	}
66
+
67
+	return c.update(copied, causes)
63 68
 }
64 69
 
65 70
 // decodeFromLatest will try to return the decoded version of the current deploymentconfig found
... ...
@@ -2,19 +2,54 @@ package generictrigger
2 2
 
3 3
 import (
4 4
 	"testing"
5
+	"time"
5 6
 
6 7
 	kapi "k8s.io/kubernetes/pkg/api"
8
+	"k8s.io/kubernetes/pkg/client/cache"
7 9
 	ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient"
10
+	"k8s.io/kubernetes/pkg/controller/framework"
8 11
 	"k8s.io/kubernetes/pkg/runtime"
12
+	"k8s.io/kubernetes/pkg/watch"
9 13
 
10 14
 	"github.com/openshift/origin/pkg/client/testclient"
11 15
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
12 16
 	_ "github.com/openshift/origin/pkg/deploy/api/install"
13 17
 	testapi "github.com/openshift/origin/pkg/deploy/api/test"
14 18
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
19
+	imageapi "github.com/openshift/origin/pkg/image/api"
15 20
 )
16 21
 
17
-var codec = kapi.Codecs.LegacyCodec(deployapi.SchemeGroupVersion)
22
+var (
23
+	codec      = kapi.Codecs.LegacyCodec(deployapi.SchemeGroupVersion)
24
+	mock       = &testclient.Fake{}
25
+	dcInformer = framework.NewSharedIndexInformer(
26
+		&cache.ListWatch{
27
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
28
+				return mock.DeploymentConfigs(kapi.NamespaceAll).List(options)
29
+			},
30
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
31
+				return mock.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
32
+			},
33
+		},
34
+		&deployapi.DeploymentConfig{},
35
+		2*time.Minute,
36
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
37
+	)
38
+
39
+	streamInformer = framework.NewSharedIndexInformer(
40
+		&cache.ListWatch{
41
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
42
+				return mock.ImageStreams(kapi.NamespaceAll).List(options)
43
+			},
44
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
45
+				return mock.ImageStreams(kapi.NamespaceAll).Watch(options)
46
+			},
47
+		},
48
+		&imageapi.ImageStream{},
49
+		2*time.Minute,
50
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
51
+	)
52
+)
18 53
 
19 54
 // TestHandle_newConfigNoTriggers ensures that a change to a config with no
20 55
 // triggers doesn't result in a new config version bump.
... ...
@@ -22,7 +57,7 @@ func TestHandle_newConfigNoTriggers(t *testing.T) {
22 22
 	fake := &testclient.Fake{}
23 23
 	kFake := &ktestclient.Fake{}
24 24
 
25
-	controller := NewDeploymentTriggerController(fake, kFake, codec)
25
+	controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, kFake, codec)
26 26
 
27 27
 	config := testapi.OkDeploymentConfig(1)
28 28
 	config.Namespace = kapi.NamespaceDefault
... ...
@@ -51,7 +86,7 @@ func TestHandle_newConfigTriggers(t *testing.T) {
51 51
 	})
52 52
 	kFake := &ktestclient.Fake{}
53 53
 
54
-	controller := NewDeploymentTriggerController(fake, kFake, codec)
54
+	controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, kFake, codec)
55 55
 
56 56
 	config := testapi.OkDeploymentConfig(0)
57 57
 	config.Namespace = kapi.NamespaceDefault
... ...
@@ -123,7 +158,7 @@ func TestHandle_changeWithTemplateDiff(t *testing.T) {
123 123
 			return true, deployment, nil
124 124
 		})
125 125
 
126
-		controller := NewDeploymentTriggerController(fake, kFake, codec)
126
+		controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, kFake, codec)
127 127
 
128 128
 		s.modify(config)
129 129
 		if err := controller.Handle(config); err != nil {
... ...
@@ -169,7 +204,7 @@ func TestHandle_waitForImageController(t *testing.T) {
169 169
 		return true, nil, nil
170 170
 	})
171 171
 
172
-	controller := NewDeploymentTriggerController(fake, kFake, codec)
172
+	controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, kFake, codec)
173 173
 
174 174
 	config := testapi.OkDeploymentConfig(0)
175 175
 	config.Namespace = kapi.NamespaceDefault
... ...
@@ -236,7 +271,7 @@ func TestHandle_automaticImageUpdates(t *testing.T) {
236 236
 			return true, deployment, nil
237 237
 		})
238 238
 
239
-		controller := NewDeploymentTriggerController(fake, kFake, codec)
239
+		controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, kFake, codec)
240 240
 
241 241
 		config := testapi.OkDeploymentConfig(test.version)
242 242
 		config.Namespace = kapi.NamespaceDefault
... ...
@@ -4,67 +4,185 @@ import (
4 4
 	"time"
5 5
 
6 6
 	kapi "k8s.io/kubernetes/pkg/api"
7
-	"k8s.io/kubernetes/pkg/client/cache"
8
-	"k8s.io/kubernetes/pkg/client/record"
9 7
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
8
+	kcontroller "k8s.io/kubernetes/pkg/controller"
9
+	"k8s.io/kubernetes/pkg/controller/framework"
10 10
 	"k8s.io/kubernetes/pkg/runtime"
11
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
12 11
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
13
-	"k8s.io/kubernetes/pkg/watch"
12
+	"k8s.io/kubernetes/pkg/util/wait"
13
+	"k8s.io/kubernetes/pkg/util/workqueue"
14 14
 
15
+	"github.com/golang/glog"
15 16
 	osclient "github.com/openshift/origin/pkg/client"
16
-	controller "github.com/openshift/origin/pkg/controller"
17 17
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
18
+	imageapi "github.com/openshift/origin/pkg/image/api"
18 19
 )
19 20
 
20
-// DeploymentTriggerControllerFactory can create a DeploymentTriggerController that watches all DeploymentConfigs.
21
-type DeploymentTriggerControllerFactory struct {
22
-	// Client is an OpenShift client.
23
-	Client osclient.Interface
24
-	// KubeClient is a Kubernetes client.
25
-	KubeClient kclient.Interface
26
-	// Codec is used for encoding/decoding.
27
-	Codec runtime.Codec
21
+const (
22
+	// We must avoid creating processing deployment configs until the deployment config and image
23
+	// stream stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long
24
+	// between checks.
25
+	StoreSyncedPollPeriod = 100 * time.Millisecond
26
+)
27
+
28
+// NewDeploymentTriggerController returns a new DeploymentTriggerController.
29
+func NewDeploymentTriggerController(dcInformer, streamInformer framework.SharedIndexInformer, oc osclient.Interface, kc kclient.Interface, codec runtime.Codec) *DeploymentTriggerController {
30
+	c := &DeploymentTriggerController{
31
+		dn: oc,
32
+		rn: kc,
33
+
34
+		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
35
+
36
+		codec: codec,
37
+	}
38
+
39
+	c.dcStore.Indexer = dcInformer.GetIndexer()
40
+	dcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
41
+		AddFunc:    c.addDeploymentConfig,
42
+		UpdateFunc: c.updateDeploymentConfig,
43
+	})
44
+	c.dcStoreSynced = dcInformer.HasSynced
45
+
46
+	streamInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
47
+		AddFunc:    c.addImageStream,
48
+		UpdateFunc: c.updateImageStream,
49
+	})
50
+
51
+	return c
52
+}
53
+
54
+// Run begins watching and syncing.
55
+func (c *DeploymentTriggerController) Run(workers int, stopCh <-chan struct{}) {
56
+	defer utilruntime.HandleCrash()
57
+
58
+	// Wait for the dc store to sync before starting any work in this controller.
59
+	ready := make(chan struct{})
60
+	go c.waitForSyncedStore(ready, stopCh)
61
+	select {
62
+	case <-ready:
63
+	case <-stopCh:
64
+		return
65
+	}
66
+
67
+	for i := 0; i < workers; i++ {
68
+		go wait.Until(c.worker, time.Second, stopCh)
69
+	}
70
+	<-stopCh
71
+	glog.Infof("Shutting down deployment trigger controller")
72
+	c.queue.ShutDown()
73
+}
74
+
75
+func (c *DeploymentTriggerController) waitForSyncedStore(ready chan<- struct{}, stopCh <-chan struct{}) {
76
+	defer utilruntime.HandleCrash()
77
+
78
+	for !c.dcStoreSynced() {
79
+		glog.V(4).Infof("Waiting for the deployment config cache to sync before starting the trigger controller workers")
80
+		select {
81
+		case <-time.After(StoreSyncedPollPeriod):
82
+		case <-stopCh:
83
+			return
84
+		}
85
+	}
86
+	close(ready)
87
+}
88
+
89
+func (c *DeploymentTriggerController) addDeploymentConfig(obj interface{}) {
90
+	dc := obj.(*deployapi.DeploymentConfig)
91
+	c.enqueueDeploymentConfig(dc)
92
+}
93
+
94
+func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{}) {
95
+	dc := cur.(*deployapi.DeploymentConfig)
96
+	c.enqueueDeploymentConfig(dc)
97
+}
98
+
99
+// addImageStream enqueues the deployment configs that point to the new image stream.
100
+func (c *DeploymentTriggerController) addImageStream(obj interface{}) {
101
+	stream := obj.(*imageapi.ImageStream)
102
+	glog.V(4).Infof("Image stream %q added.", stream.Name)
103
+	dcList, err := c.dcStore.GetConfigsForImageStream(stream)
104
+	if err != nil {
105
+		return
106
+	}
107
+	for _, dc := range dcList {
108
+		c.enqueueDeploymentConfig(dc)
109
+	}
110
+}
111
+
112
+// updateImageStream enqueues the deployment configs that point to the updated image stream.
113
+func (c *DeploymentTriggerController) updateImageStream(old, cur interface{}) {
114
+	// A periodic relist will send update events for all known streams.
115
+	if kapi.Semantic.DeepEqual(old, cur) {
116
+		return
117
+	}
118
+
119
+	stream := cur.(*imageapi.ImageStream)
120
+	glog.V(4).Infof("Image stream %q updated.", stream.Name)
121
+	dcList, err := c.dcStore.GetConfigsForImageStream(stream)
122
+	if err != nil {
123
+		return
124
+	}
125
+	for _, dc := range dcList {
126
+		c.enqueueDeploymentConfig(dc)
127
+	}
128
+}
129
+
130
+func (c *DeploymentTriggerController) enqueueDeploymentConfig(dc *deployapi.DeploymentConfig) {
131
+	key, err := kcontroller.KeyFunc(dc)
132
+	if err != nil {
133
+		glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
134
+		return
135
+	}
136
+	c.queue.Add(key)
137
+}
138
+
139
+func (c *DeploymentTriggerController) worker() {
140
+	for {
141
+		if quit := c.work(); quit {
142
+			return
143
+		}
144
+	}
28 145
 }
29 146
 
30
-// Create creates a DeploymentTriggerController.
31
-func (factory *DeploymentTriggerControllerFactory) Create() controller.RunnableController {
32
-	deploymentConfigLW := &cache.ListWatch{
33
-		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
34
-			return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(options)
35
-		},
36
-		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
37
-			return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
38
-		},
39
-	}
40
-	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
41
-	cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue, 2*time.Minute).Run()
42
-
43
-	eventBroadcaster := record.NewBroadcaster()
44
-	eventBroadcaster.StartRecordingToSink(factory.KubeClient.Events(""))
45
-
46
-	triggerController := NewDeploymentTriggerController(factory.Client, factory.KubeClient, factory.Codec)
47
-
48
-	return &controller.RetryController{
49
-		Queue: queue,
50
-		RetryManager: controller.NewQueueRetryManager(
51
-			queue,
52
-			cache.MetaNamespaceKeyFunc,
53
-			func(obj interface{}, err error, retries controller.Retry) bool {
54
-				utilruntime.HandleError(err)
55
-				if _, isFatal := err.(fatalError); isFatal {
56
-					return false
57
-				}
58
-				if retries.Count > 0 {
59
-					return false
60
-				}
61
-				return true
62
-			},
63
-			flowcontrol.NewTokenBucketRateLimiter(1, 10),
64
-		),
65
-		Handle: func(obj interface{}) error {
66
-			config := obj.(*deployapi.DeploymentConfig)
67
-			return triggerController.Handle(config)
68
-		},
147
+func (c *DeploymentTriggerController) work() bool {
148
+	key, quit := c.queue.Get()
149
+	if quit {
150
+		return true
69 151
 	}
152
+	defer c.queue.Done(key)
153
+
154
+	dc, err := c.getByKey(key.(string))
155
+	if err != nil {
156
+		glog.Error(err.Error())
157
+	}
158
+
159
+	if dc == nil {
160
+		return false
161
+	}
162
+
163
+	if err := c.Handle(dc); err != nil {
164
+		utilruntime.HandleError(err)
165
+
166
+		if c.queue.NumRequeues(key) < 2 {
167
+			c.queue.AddRateLimited(key)
168
+			return false
169
+		}
170
+	}
171
+	c.queue.Forget(key)
172
+	return false
173
+}
174
+
175
+func (c *DeploymentTriggerController) getByKey(key string) (*deployapi.DeploymentConfig, error) {
176
+	obj, exists, err := c.dcStore.Indexer.GetByKey(key)
177
+	if err != nil {
178
+		glog.Infof("Unable to retrieve deployment config %q from store: %v", key, err)
179
+		c.queue.Add(key)
180
+		return nil, err
181
+	}
182
+	if !exists {
183
+		glog.Infof("Deployment config %q has been deleted", key)
184
+		return nil, nil
185
+	}
186
+
187
+	return obj.(*deployapi.DeploymentConfig), nil
70 188
 }
... ...
@@ -105,6 +105,18 @@ func HasChangeTrigger(config *deployapi.DeploymentConfig) bool {
105 105
 	return false
106 106
 }
107 107
 
108
+func DeploymentConfigDeepCopy(dc *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) {
109
+	objCopy, err := api.Scheme.DeepCopy(dc)
110
+	if err != nil {
111
+		return nil, err
112
+	}
113
+	copied, ok := objCopy.(*deployapi.DeploymentConfig)
114
+	if !ok {
115
+		return nil, fmt.Errorf("expected DeploymentConfig, got %#v", objCopy)
116
+	}
117
+	return copied, nil
118
+}
119
+
108 120
 // DecodeDeploymentConfig decodes a DeploymentConfig from controller using codec. An error is returned
109 121
 // if the controller doesn't contain an encoded config.
110 122
 func DecodeDeploymentConfig(controller *api.ReplicationController, decoder runtime.Decoder) (*deployapi.DeploymentConfig, error) {