Browse code

Introduce Unidling Controller

The unidling controller listens for "NeedPods" events. Upon receiving
such an event, it pulls up the related service/endpoints, ensures that
the event is not out of date, and then attempts to wake up the replication
controllers associated with the service, and remove the idled
annotations from the service's endpoints (which were presumably
placed on the service by the `oc idle` command).

Solly Ross authored on 2016/01/15 01:44:15
Showing 10 changed files
... ...
@@ -59,6 +59,9 @@ const (
59 59
 	InfraPetSetControllerServiceAccountName = "pet-set-controller"
60 60
 	PetSetControllerRoleName                = "system:pet-set-controller"
61 61
 
62
+	InfraUnidlingControllerServiceAccountName = "unidling-controller"
63
+	UnidlingControllerRoleName                = "system:unidling-controller"
64
+
62 65
 	ServiceServingCertServiceAccountName = "service-serving-cert-controller"
63 66
 	ServiceServingCertControllerRoleName = "system:service-serving-cert-controller"
64 67
 
... ...
@@ -774,6 +777,55 @@ func init() {
774 774
 	}
775 775
 
776 776
 	err = InfraSAs.addServiceAccount(
777
+		InfraUnidlingControllerServiceAccountName,
778
+		authorizationapi.ClusterRole{
779
+			ObjectMeta: kapi.ObjectMeta{
780
+				Name: UnidlingControllerRoleName,
781
+			},
782
+			Rules: []authorizationapi.PolicyRule{
783
+				{
784
+					APIGroups: []string{kapi.GroupName, extensions.GroupName},
785
+					Verbs:     sets.NewString("get", "update"),
786
+					Resources: sets.NewString("replicationcontrollers/scale"),
787
+				},
788
+				{
789
+					APIGroups: []string{extensions.GroupName},
790
+					Verbs:     sets.NewString("get", "update"),
791
+					Resources: sets.NewString("replicasets/scale", "deployments/scale"),
792
+				},
793
+				{
794
+					Verbs:     sets.NewString("get", "update"),
795
+					Resources: sets.NewString("deploymentconfigs/scale"),
796
+				},
797
+				{
798
+					Verbs:     sets.NewString("list", "watch"),
799
+					Resources: sets.NewString("events"),
800
+				},
801
+				{
802
+					APIGroups: []string{kapi.GroupName},
803
+					Verbs:     sets.NewString("get", "update"),
804
+					Resources: sets.NewString("endpoints"),
805
+				},
806
+				// these are used to "manually" scale and annotate known objects, and should be
807
+				// removed once we can set the last-scale-reason field via the scale subresource
808
+				{
809
+					APIGroups: []string{kapi.GroupName},
810
+					Verbs:     sets.NewString("get", "update"),
811
+					Resources: sets.NewString("replicationcontrollers"),
812
+				},
813
+				{
814
+					APIGroups: []string{},
815
+					Verbs:     sets.NewString("get", "update"),
816
+					Resources: sets.NewString("deploymentconfigs"),
817
+				},
818
+			},
819
+		},
820
+	)
821
+	if err != nil {
822
+		panic(err)
823
+	}
824
+
825
+	err = InfraSAs.addServiceAccount(
777 826
 		ServiceServingCertServiceAccountName,
778 827
 		authorizationapi.ClusterRole{
779 828
 			ObjectMeta: kapi.ObjectMeta{
... ...
@@ -924,6 +924,15 @@ func (c *MasterConfig) OriginNamespaceControllerClients() (*osclient.Client, *kc
924 924
 	return c.PrivilegedLoopbackOpenShiftClient, c.PrivilegedLoopbackKubernetesClient
925 925
 }
926 926
 
927
+// UnidlingControllerClients returns the unidling controller clients
928
+func (c *MasterConfig) UnidlingControllerClients() (*osclient.Client, *kclient.Client) {
929
+	_, osClient, kClient, err := c.GetServiceAccountClients(bootstrappolicy.InfraUnidlingControllerServiceAccountName)
930
+	if err != nil {
931
+		glog.Fatal(err)
932
+	}
933
+	return osClient, kClient
934
+}
935
+
927 936
 // NewEtcdStorage returns a storage interface for the provided storage version.
928 937
 func NewEtcdStorage(client newetcdclient.Client, version unversioned.GroupVersion, prefix string) (oshelper storage.Interface, err error) {
929 938
 	return etcdstorage.NewEtcdStorage(client, kapi.Codecs.LegacyCodec(version), prefix, false, genericapiserveroptions.DefaultDeserializationCacheSize), nil
... ...
@@ -9,6 +9,7 @@ import (
9 9
 
10 10
 	"github.com/golang/glog"
11 11
 
12
+	deployclient "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/typed/core/unversioned"
12 13
 	kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
13 14
 	cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
14 15
 	"k8s.io/kubernetes/pkg/admission"
... ...
@@ -30,6 +31,7 @@ import (
30 30
 	buildclient "github.com/openshift/origin/pkg/build/client"
31 31
 	buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory"
32 32
 	buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy"
33
+	osclient "github.com/openshift/origin/pkg/client"
33 34
 	cmdadmission "github.com/openshift/origin/pkg/cmd/server/admission"
34 35
 	"github.com/openshift/origin/pkg/cmd/server/crypto"
35 36
 	cmdutil "github.com/openshift/origin/pkg/cmd/util"
... ...
@@ -48,6 +50,7 @@ import (
48 48
 	"github.com/openshift/origin/pkg/security/uidallocator"
49 49
 	"github.com/openshift/origin/pkg/service/controller/ingressip"
50 50
 	servingcertcontroller "github.com/openshift/origin/pkg/service/controller/servingcert"
51
+	unidlingcontroller "github.com/openshift/origin/pkg/unidling/controller"
51 52
 
52 53
 	configapi "github.com/openshift/origin/pkg/cmd/server/api"
53 54
 	"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
... ...
@@ -540,3 +543,15 @@ func (c *MasterConfig) RunIngressIPController(client *kclient.Client) {
540 540
 	ingressIPController := ingressip.NewIngressIPController(client, ipNet, defaultIngressIPSyncPeriod)
541 541
 	go ingressIPController.Run(utilwait.NeverStop)
542 542
 }
543
+
544
+// RunUnidlingController starts the unidling controller
545
+func (c *MasterConfig) RunUnidlingController() {
546
+	oc, kc := c.UnidlingControllerClients()
547
+	resyncPeriod := 2 * time.Hour
548
+	scaleNamespacer := osclient.NewDelegatingScaleNamespacer(oc, kc)
549
+	coreClient := clientadapter.FromUnversionedClient(kc).Core()
550
+	dcCoreClient := deployclient.New(oc.RESTClient)
551
+	cont := unidlingcontroller.NewUnidlingController(scaleNamespacer, coreClient, coreClient, dcCoreClient, coreClient, resyncPeriod)
552
+
553
+	cont.Run(utilwait.NeverStop)
554
+}
... ...
@@ -674,6 +674,7 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
674 674
 		glog.Fatalf("Could not get client: %v", err)
675 675
 	}
676 676
 	oc.RunServiceServingCertController(serviceServingCertClient)
677
+	oc.RunUnidlingController()
677 678
 
678 679
 	_, _, ingressIPClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraServiceIngressIPControllerServiceAccountName)
679 680
 	if err != nil {
... ...
@@ -12,3 +12,28 @@ const (
12 12
 	// NeedPodsReason is the reason for the event emitted to indicate that endpoints should be unidled
13 13
 	NeedPodsReason = "NeedPods"
14 14
 )
15
+
16
+// NB: if these get changed, you'll need to actually add in the full API machinery for them
17
+
18
+// RecordedScaleReference is a CrossGroupObjectReference to a scale subresource that also
19
+// has the previous replica count recorded
20
+type RecordedScaleReference struct {
21
+	// Reference to the idled resource
22
+	CrossGroupObjectReference `json:",inline" protobuf:"bytes,1,opt,name=crossVersionObjectReference"`
23
+	// The last seen scale of the idled resource (before idling)
24
+	Replicas int32 `json:"replicas" protobuf:"varint,2,opt,name=replicas"`
25
+}
26
+
27
+// CrossGroupObjectReference is a reference to an object in the same
28
+// namespace in the specified group.  It is similar to
29
+// autoscaling.CrossVersionObjectReference.
30
+type CrossGroupObjectReference struct {
31
+	// Kind of the referent; More info: http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds"
32
+	Kind string `json:"kind" protobuf:"bytes,1,opt,name=kind"`
33
+	// Name of the referent; More info: http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
34
+	Name string `json:"name" protobuf:"bytes,2,opt,name=name"`
35
+	// API version of the referent (deprecated, prefer usng Group instead)
36
+	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,3,opt,name=apiVersion"`
37
+	// Group of the referent
38
+	Group string `json:"group,omitempty" protobuf:"bytes,3,opt,name=group"`
39
+}
15 40
new file mode 100644
... ...
@@ -0,0 +1,339 @@
0
+package controller
1
+
2
+import (
3
+	"encoding/json"
4
+	"fmt"
5
+	"sync"
6
+	"time"
7
+
8
+	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
9
+	unidlingutil "github.com/openshift/origin/pkg/unidling/util"
10
+
11
+	deployclient "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/typed/core/unversioned"
12
+	kapi "k8s.io/kubernetes/pkg/api"
13
+	"k8s.io/kubernetes/pkg/api/errors"
14
+	kextapi "k8s.io/kubernetes/pkg/apis/extensions"
15
+	"k8s.io/kubernetes/pkg/client/cache"
16
+	kclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
17
+	kextclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
18
+	"k8s.io/kubernetes/pkg/controller/framework"
19
+	"k8s.io/kubernetes/pkg/fields"
20
+	"k8s.io/kubernetes/pkg/runtime"
21
+	"k8s.io/kubernetes/pkg/types"
22
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
23
+	"k8s.io/kubernetes/pkg/util/wait"
24
+	"k8s.io/kubernetes/pkg/util/workqueue"
25
+	"k8s.io/kubernetes/pkg/watch"
26
+
27
+	"github.com/golang/glog"
28
+)
29
+
30
+const MaxRetries = 5
31
+
32
+type lastFiredCache struct {
33
+	sync.RWMutex
34
+	items map[types.NamespacedName]time.Time
35
+}
36
+
37
+func (c *lastFiredCache) Get(info types.NamespacedName) time.Time {
38
+	c.RLock()
39
+	defer c.RUnlock()
40
+
41
+	return c.items[info]
42
+}
43
+
44
+func (c *lastFiredCache) Clear(info types.NamespacedName) {
45
+	c.Lock()
46
+	defer c.Unlock()
47
+
48
+	delete(c.items, info)
49
+}
50
+
51
+func (c *lastFiredCache) AddIfNewer(info types.NamespacedName, newLastFired time.Time) bool {
52
+	c.Lock()
53
+	defer c.Unlock()
54
+
55
+	if lastFired, hasLastFired := c.items[info]; !hasLastFired || lastFired.Before(newLastFired) {
56
+		c.items[info] = newLastFired
57
+		return true
58
+	}
59
+
60
+	return false
61
+}
62
+
63
+type UnidlingController struct {
64
+	controller          *framework.Controller
65
+	scaleNamespacer     kextclient.ScalesGetter
66
+	endpointsNamespacer kclient.EndpointsGetter
67
+	queue               workqueue.RateLimitingInterface
68
+	lastFiredCache      *lastFiredCache
69
+
70
+	// TODO: remove these once we get the scale-source functionality in the scale endpoints
71
+	dcNamespacer deployclient.DeploymentConfigsGetter
72
+	rcNamespacer kclient.ReplicationControllersGetter
73
+}
74
+
75
+func NewUnidlingController(scaleNS kextclient.ScalesGetter, endptsNS kclient.EndpointsGetter, evtNS kclient.EventsGetter, dcNamespacer deployclient.DeploymentConfigsGetter, rcNamespacer kclient.ReplicationControllersGetter, resyncPeriod time.Duration) *UnidlingController {
76
+	fieldSet := fields.Set{}
77
+	fieldSet["reason"] = unidlingapi.NeedPodsReason
78
+	fieldSelector := fieldSet.AsSelector()
79
+
80
+	unidlingController := &UnidlingController{
81
+		scaleNamespacer:     scaleNS,
82
+		endpointsNamespacer: endptsNS,
83
+		queue:               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
84
+		lastFiredCache: &lastFiredCache{
85
+			items: make(map[types.NamespacedName]time.Time),
86
+		},
87
+
88
+		dcNamespacer: dcNamespacer,
89
+		rcNamespacer: rcNamespacer,
90
+	}
91
+
92
+	_, controller := framework.NewInformer(
93
+		&cache.ListWatch{
94
+			// No need to list -- we only care about new events
95
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
96
+				return &kapi.EventList{}, nil
97
+			},
98
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
99
+				options.FieldSelector = fieldSelector
100
+				return evtNS.Events(kapi.NamespaceAll).Watch(options)
101
+			},
102
+		},
103
+		&kapi.Event{},
104
+		resyncPeriod,
105
+		framework.ResourceEventHandlerFuncs{
106
+			AddFunc: func(obj interface{}) {
107
+				event := obj.(*kapi.Event)
108
+				unidlingController.enqueueEvent(event)
109
+			},
110
+			UpdateFunc: func(oldObj interface{}, newObj interface{}) {
111
+				// retrigger on new last-seen times
112
+				event := newObj.(*kapi.Event)
113
+				unidlingController.enqueueEvent(event)
114
+			},
115
+			DeleteFunc: func(obj interface{}) {
116
+				// this is just to clean up our cache of the last seen times
117
+				event := obj.(*kapi.Event)
118
+				unidlingController.clearEventFromCache(event)
119
+			},
120
+		},
121
+	)
122
+
123
+	unidlingController.controller = controller
124
+
125
+	return unidlingController
126
+}
127
+
128
+// clearEventFromCache removes the entry for the given event from the lastFiredCache.
129
+func (c *UnidlingController) clearEventFromCache(event *kapi.Event) {
130
+	if event.Reason != unidlingapi.NeedPodsReason {
131
+		return
132
+	}
133
+
134
+	info := types.NamespacedName{
135
+		Namespace: event.InvolvedObject.Namespace,
136
+		Name:      event.InvolvedObject.Name,
137
+	}
138
+	c.lastFiredCache.Clear(info)
139
+}
140
+
141
+// equeueEvent checks if the given event is relevant (i.e. if it's a NeedPods event),
142
+// and, if so, extracts relevant information, and enqueues that information in the
143
+// processing queue.
144
+func (c *UnidlingController) enqueueEvent(event *kapi.Event) {
145
+	if event.Reason != unidlingapi.NeedPodsReason {
146
+		return
147
+	}
148
+
149
+	info := types.NamespacedName{
150
+		Namespace: event.InvolvedObject.Namespace,
151
+		Name:      event.InvolvedObject.Name,
152
+	}
153
+
154
+	// only add things to the queue if they're newer than what we already have
155
+	if c.lastFiredCache.AddIfNewer(info, event.LastTimestamp.Time) {
156
+		c.queue.Add(info)
157
+	}
158
+}
159
+
160
+func (c *UnidlingController) Run(stopCh <-chan struct{}) {
161
+	defer utilruntime.HandleCrash()
162
+	go c.controller.Run(stopCh)
163
+	go wait.Until(c.processRequests, time.Second, stopCh)
164
+}
165
+
166
+// processRequests calls awaitRequest repeatedly, until told to stop by
167
+// the return value of awaitRequest.
168
+func (c *UnidlingController) processRequests() {
169
+	for {
170
+		if !c.awaitRequest() {
171
+			return
172
+		}
173
+	}
174
+}
175
+
176
+// awaitRequest awaits a new request on the queue, and sends it off for processing.
177
+// If more requests on the queue should be processed, it returns true.  If we should
178
+// stop processing, it returns false.
179
+func (c *UnidlingController) awaitRequest() bool {
180
+	infoRaw, stop := c.queue.Get()
181
+	if stop {
182
+		return false
183
+	}
184
+
185
+	defer c.queue.Done(infoRaw)
186
+
187
+	info := infoRaw.(types.NamespacedName)
188
+	lastFired := c.lastFiredCache.Get(info)
189
+
190
+	var retry bool
191
+	var err error
192
+	if retry, err = c.handleRequest(info, lastFired); err == nil {
193
+		// if there was no error, we succeeded in the unidling, and we need to
194
+		// tell the rate limitter to stop tracking this request
195
+		c.queue.Forget(infoRaw)
196
+		return true
197
+	}
198
+
199
+	// check to see if we think the error was transient (e.g. server error on the update request),
200
+	// and if not, do not retry
201
+	if !retry {
202
+		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s at (%s), will not retry: %v", info.Namespace, info.Name, lastFired, err))
203
+		return true
204
+	}
205
+
206
+	// Otherwise, if we have an error, we were at least partially unsucessful in unidling, so
207
+	// we requeue the event to process later
208
+
209
+	// don't try to process failing requests forever
210
+	if c.queue.NumRequeues(infoRaw) > MaxRetries {
211
+		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry again: %v", info.Namespace, info.Name, lastFired, err))
212
+		c.queue.Forget(infoRaw)
213
+		return true
214
+	}
215
+
216
+	glog.V(4).Infof("Unable to fully process unidling request for %s/%s (at %s), will retry: %v", info.Namespace, info.Name, lastFired, err)
217
+	c.queue.AddRateLimited(infoRaw)
218
+	return true
219
+}
220
+
221
+// handleRequest handles a single request to unidle.  After checking the validity of the request,
222
+// it will examine the endpoints in question to determine which scalables to scale, and will scale
223
+// them and remove them from the endpoints' list of idled scalables.  If it is unable to properly
224
+// process the request, it will return a boolean indicating whether or not we should retry later,
225
+// as well as an error (e.g. if we're unable to parse an annotation, retrying later won't help,
226
+// so it will return false).
227
+func (c *UnidlingController) handleRequest(info types.NamespacedName, lastFired time.Time) (bool, error) {
228
+	// fetch the endpoints associated with the service in question
229
+	targetEndpoints, err := c.endpointsNamespacer.Endpoints(info.Namespace).Get(info.Name)
230
+	if err != nil {
231
+		return true, fmt.Errorf("unable to retrieve endpoints: %v", err)
232
+	}
233
+
234
+	// make sure we actually were idled...
235
+	idledTimeRaw, wasIdled := targetEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
236
+	if !wasIdled {
237
+		glog.V(5).Infof("UnidlingController received a NeedPods event for a service that was not idled, ignoring")
238
+		return false, nil
239
+	}
240
+
241
+	// ...and make sure this request was to wake up from the most recent idling, and not a previous one
242
+	idledTime, err := time.Parse(time.RFC3339, idledTimeRaw)
243
+	if err != nil {
244
+		// retrying here won't help, we're just stuck as idle since we can't get parse the idled time
245
+		return false, fmt.Errorf("unable to check idled-at time: %v", err)
246
+	}
247
+	if lastFired.Before(idledTime) {
248
+		glog.V(5).Infof("UnidlingController received an out-of-date NeedPods event, ignoring")
249
+		return false, nil
250
+	}
251
+
252
+	// TODO: ew, this is unversioned.  Such is life when working with annotations.
253
+	var targetScalables []unidlingapi.RecordedScaleReference
254
+	if targetScalablesStr, hasTargetScalables := targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]; hasTargetScalables {
255
+		if err = json.Unmarshal([]byte(targetScalablesStr), &targetScalables); err != nil {
256
+			// retrying here won't help, we're just stuck as idled since we can't parse the idled scalables list
257
+			return false, fmt.Errorf("unable to unmarshal target scalable references: %v", err)
258
+		}
259
+	} else {
260
+		glog.V(4).Infof("Service %s/%s had no scalables to unidle", info.Namespace, info.Name)
261
+		targetScalables = []unidlingapi.RecordedScaleReference{}
262
+	}
263
+
264
+	targetScalablesSet := make(map[unidlingapi.RecordedScaleReference]struct{}, len(targetScalables))
265
+	for _, v := range targetScalables {
266
+		targetScalablesSet[v] = struct{}{}
267
+	}
268
+
269
+	deleteIdledAtAnnotation := func(annotations map[string]string) {
270
+		delete(annotations, unidlingapi.IdledAtAnnotation)
271
+	}
272
+
273
+	scaleAnnotater := unidlingutil.NewScaleAnnotater(c.scaleNamespacer, c.dcNamespacer, c.rcNamespacer, deleteIdledAtAnnotation)
274
+
275
+	for _, scalableRef := range targetScalables {
276
+		var scale *kextapi.Scale
277
+		var obj runtime.Object
278
+
279
+		obj, scale, err = scaleAnnotater.GetObjectWithScale(info.Namespace, scalableRef.CrossGroupObjectReference)
280
+		if err != nil {
281
+			if errors.IsNotFound(err) {
282
+				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
283
+				delete(targetScalablesSet, scalableRef)
284
+			} else {
285
+				utilruntime.HandleError(fmt.Errorf("Unable to get scale for %s %q while unidling service %s/%s, will try again later: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
286
+			}
287
+			continue
288
+		}
289
+
290
+		if scale.Spec.Replicas > 0 {
291
+			glog.V(4).Infof("%s %q is not idle, skipping while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
292
+			continue
293
+		}
294
+
295
+		scale.Spec.Replicas = scalableRef.Replicas
296
+
297
+		if err = scaleAnnotater.UpdateObjectScale(info.Namespace, scalableRef.CrossGroupObjectReference, obj, scale); err != nil {
298
+			if errors.IsNotFound(err) {
299
+				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
300
+				delete(targetScalablesSet, scalableRef)
301
+			} else {
302
+				utilruntime.HandleError(fmt.Errorf("Unable to scale up %s %q while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
303
+			}
304
+			continue
305
+		} else {
306
+			glog.V(4).Infof("Scaled up %s %q while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
307
+		}
308
+
309
+		delete(targetScalablesSet, scalableRef)
310
+	}
311
+
312
+	newAnnotationList := make([]unidlingapi.RecordedScaleReference, 0, len(targetScalablesSet))
313
+	for k := range targetScalablesSet {
314
+		newAnnotationList = append(newAnnotationList, k)
315
+	}
316
+
317
+	if len(newAnnotationList) == 0 {
318
+		delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
319
+		delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
320
+	} else {
321
+		var newAnnotationBytes []byte
322
+		newAnnotationBytes, err = json.Marshal(newAnnotationList)
323
+		if err != nil {
324
+			utilruntime.HandleError(fmt.Errorf("unable to update/remove idle annotations from %s/%s: unable to marshal list of remaining scalables, removing list entirely: %v", info.Namespace, info.Name, err))
325
+
326
+			delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
327
+			delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
328
+		} else {
329
+			targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation] = string(newAnnotationBytes)
330
+		}
331
+	}
332
+
333
+	if _, err = c.endpointsNamespacer.Endpoints(info.Namespace).Update(targetEndpoints); err != nil {
334
+		return true, fmt.Errorf("unable to update/remove idle annotations from %s/%s: %v", info.Namespace, info.Name, err)
335
+	}
336
+
337
+	return false, nil
338
+}
0 339
new file mode 100644
... ...
@@ -0,0 +1,671 @@
0
+package controller
1
+
2
+import (
3
+	"encoding/json"
4
+	"fmt"
5
+	"testing"
6
+	"time"
7
+
8
+	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
9
+
10
+	deployapi "github.com/openshift/origin/pkg/deploy/api"
11
+	deployfake "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/fake"
12
+	kapi "k8s.io/kubernetes/pkg/api"
13
+	"k8s.io/kubernetes/pkg/api/errors"
14
+	kunversioned "k8s.io/kubernetes/pkg/api/unversioned"
15
+	kextapi "k8s.io/kubernetes/pkg/apis/extensions"
16
+	kfake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
17
+	ktestingcore "k8s.io/kubernetes/pkg/client/testing/core"
18
+	"k8s.io/kubernetes/pkg/runtime"
19
+	"k8s.io/kubernetes/pkg/types"
20
+)
21
+
22
+type fakeResults struct {
23
+	resMap       map[unidlingapi.CrossGroupObjectReference]kextapi.Scale
24
+	resEndpoints *kapi.Endpoints
25
+}
26
+
27
+func prepFakeClient(t *testing.T, nowTime time.Time, scales ...kextapi.Scale) (*kfake.Clientset, *deployfake.Clientset, *fakeResults) {
28
+	fakeClient := &kfake.Clientset{}
29
+	fakeDeployClient := &deployfake.Clientset{}
30
+
31
+	nowTimeStr := nowTime.Format(time.RFC3339)
32
+
33
+	targets := make([]unidlingapi.RecordedScaleReference, len(scales))
34
+	for i, scale := range scales {
35
+		targets[i] = unidlingapi.RecordedScaleReference{
36
+			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
37
+				Name: scale.Name,
38
+				Kind: scale.Kind,
39
+			},
40
+			Replicas: 2,
41
+		}
42
+	}
43
+	targetsAnnotation, err := json.Marshal(targets)
44
+	if err != nil {
45
+		t.Fatalf("unexpected error: %v", err)
46
+	}
47
+
48
+	endpointsObj := kapi.Endpoints{
49
+		ObjectMeta: kapi.ObjectMeta{
50
+			Name: "somesvc",
51
+			Annotations: map[string]string{
52
+				unidlingapi.IdledAtAnnotation:      nowTimeStr,
53
+				unidlingapi.UnidleTargetAnnotation: string(targetsAnnotation),
54
+			},
55
+		},
56
+	}
57
+	fakeClient.PrependReactor("get", "endpoints", func(action ktestingcore.Action) (bool, runtime.Object, error) {
58
+		if action.(ktestingcore.GetAction).GetName() == endpointsObj.Name {
59
+			return true, &endpointsObj, nil
60
+		}
61
+
62
+		return false, nil, nil
63
+	})
64
+
65
+	fakeDeployClient.PrependReactor("get", "deploymentconfigs", func(action ktestingcore.Action) (bool, runtime.Object, error) {
66
+		objName := action.(ktestingcore.GetAction).GetName()
67
+		for _, scale := range scales {
68
+			if scale.Kind == "DeploymentConfig" && objName == scale.Name {
69
+				return true, &deployapi.DeploymentConfig{
70
+					ObjectMeta: kapi.ObjectMeta{
71
+						Name: objName,
72
+					},
73
+					Spec: deployapi.DeploymentConfigSpec{
74
+						Replicas: scale.Spec.Replicas,
75
+					},
76
+				}, nil
77
+			}
78
+		}
79
+
80
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), objName)
81
+	})
82
+
83
+	fakeClient.PrependReactor("get", "replicationcontrollers", func(action ktestingcore.Action) (bool, runtime.Object, error) {
84
+		objName := action.(ktestingcore.GetAction).GetName()
85
+		for _, scale := range scales {
86
+			if scale.Kind == "ReplicationController" && objName == scale.Name {
87
+				return true, &kapi.ReplicationController{
88
+					ObjectMeta: kapi.ObjectMeta{
89
+						Name: objName,
90
+					},
91
+					Spec: kapi.ReplicationControllerSpec{
92
+						Replicas: scale.Spec.Replicas,
93
+					},
94
+				}, nil
95
+			}
96
+		}
97
+
98
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), objName)
99
+	})
100
+
101
+	res := &fakeResults{
102
+		resMap: make(map[unidlingapi.CrossGroupObjectReference]kextapi.Scale),
103
+	}
104
+
105
+	fakeDeployClient.PrependReactor("update", "deploymentconfigs", func(action ktestingcore.Action) (bool, runtime.Object, error) {
106
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*deployapi.DeploymentConfig)
107
+		for _, scale := range scales {
108
+			if scale.Kind == "DeploymentConfig" && obj.Name == scale.Name {
109
+				newScale := scale
110
+				newScale.Spec.Replicas = obj.Spec.Replicas
111
+				res.resMap[unidlingapi.CrossGroupObjectReference{Name: obj.Name, Kind: "DeploymentConfig"}] = newScale
112
+				return true, &deployapi.DeploymentConfig{}, nil
113
+			}
114
+		}
115
+
116
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), obj.Name)
117
+	})
118
+
119
+	fakeClient.PrependReactor("update", "replicationcontrollers", func(action ktestingcore.Action) (bool, runtime.Object, error) {
120
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*kapi.ReplicationController)
121
+		for _, scale := range scales {
122
+			if scale.Kind == "ReplicationController" && obj.Name == scale.Name {
123
+				newScale := scale
124
+				newScale.Spec.Replicas = obj.Spec.Replicas
125
+				res.resMap[unidlingapi.CrossGroupObjectReference{Name: obj.Name, Kind: "ReplicationController"}] = newScale
126
+				return true, &kapi.ReplicationController{}, nil
127
+			}
128
+		}
129
+
130
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), obj.Name)
131
+	})
132
+
133
+	fakeClient.AddReactor("*", "endpoints", func(action ktestingcore.Action) (bool, runtime.Object, error) {
134
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*kapi.Endpoints)
135
+		if obj.Name != endpointsObj.Name {
136
+			return false, nil, nil
137
+		}
138
+
139
+		res.resEndpoints = obj
140
+
141
+		return true, obj, nil
142
+	})
143
+
144
+	return fakeClient, fakeDeployClient, res
145
+}
146
+
147
+func TestControllerHandlesStaleEvents(t *testing.T) {
148
+	nowTime := time.Now().Truncate(time.Second)
149
+	fakeClient, fakeDeployClient, res := prepFakeClient(t, nowTime)
150
+	controller := &UnidlingController{
151
+		scaleNamespacer:     fakeClient.Extensions(),
152
+		endpointsNamespacer: fakeClient.Core(),
153
+		rcNamespacer:        fakeClient.Core(),
154
+		dcNamespacer:        fakeDeployClient.Core(),
155
+	}
156
+
157
+	retry, err := controller.handleRequest(types.NamespacedName{
158
+		Namespace: "somens",
159
+		Name:      "somesvc",
160
+	}, nowTime.Add(-10*time.Second))
161
+
162
+	if err != nil {
163
+		t.Fatalf("Unable to unidle: unexpected error (retry: %v): %v", retry, err)
164
+	}
165
+
166
+	if len(res.resMap) != 0 {
167
+		t.Errorf("Did not expect to have anything scaled, but got %v", res.resMap)
168
+	}
169
+
170
+	if res.resEndpoints != nil {
171
+		t.Errorf("Did not expect to have endpoints object updated, but got %v", res.resEndpoints)
172
+	}
173
+}
174
+
175
+func TestControllerIgnoresAlreadyScaledObjects(t *testing.T) {
176
+	// truncate to avoid conversion comparison issues
177
+	nowTime := time.Now().Truncate(time.Second)
178
+	baseScales := []kextapi.Scale{
179
+		{
180
+			ObjectMeta: kapi.ObjectMeta{
181
+				Name: "somerc",
182
+			},
183
+			TypeMeta: kunversioned.TypeMeta{
184
+				Kind: "ReplicationController",
185
+			},
186
+			Spec: kextapi.ScaleSpec{
187
+				Replicas: 0,
188
+			},
189
+		},
190
+		{
191
+			ObjectMeta: kapi.ObjectMeta{
192
+				Name: "somedc",
193
+			},
194
+			TypeMeta: kunversioned.TypeMeta{
195
+				Kind: "DeploymentConfig",
196
+			},
197
+			Spec: kextapi.ScaleSpec{
198
+				Replicas: 5,
199
+			},
200
+		},
201
+	}
202
+
203
+	idledTime := nowTime.Add(-10 * time.Second)
204
+	fakeClient, fakeDeployClient, res := prepFakeClient(t, idledTime, baseScales...)
205
+
206
+	controller := &UnidlingController{
207
+		scaleNamespacer:     fakeClient.Extensions(),
208
+		endpointsNamespacer: fakeClient.Core(),
209
+		rcNamespacer:        fakeClient.Core(),
210
+		dcNamespacer:        fakeDeployClient.Core(),
211
+	}
212
+
213
+	retry, err := controller.handleRequest(types.NamespacedName{
214
+		Namespace: "somens",
215
+		Name:      "somesvc",
216
+	}, nowTime)
217
+
218
+	if err != nil {
219
+		t.Fatalf("Unable to unidle: unexpected error (retry: %v): %v", retry, err)
220
+	}
221
+
222
+	if len(res.resMap) != 1 {
223
+		t.Errorf("Incorrect unidling results: got %v, expected to end up with 1 objects scaled to 1", res.resMap)
224
+	}
225
+
226
+	stillPresent := make(map[unidlingapi.CrossGroupObjectReference]struct{})
227
+
228
+	for _, scale := range baseScales {
229
+		scaleRef := unidlingapi.CrossGroupObjectReference{Kind: scale.Kind, Name: scale.Name}
230
+		resScale, ok := res.resMap[scaleRef]
231
+		if scale.Spec.Replicas != 0 {
232
+			stillPresent[scaleRef] = struct{}{}
233
+			if ok {
234
+				t.Errorf("Expected to %s %q to not have been scaled, but it was scaled to %v", scale.Kind, scale.Name, resScale.Spec.Replicas)
235
+			}
236
+			continue
237
+		} else if !ok {
238
+			t.Errorf("Expected to %s %q to have been scaled, but it was not", scale.Kind, scale.Name)
239
+			continue
240
+		}
241
+
242
+		if resScale.Spec.Replicas != 2 {
243
+			t.Errorf("Expected %s %q to have been scaled to 2, but it was scaled to %v", scale.Kind, scale.Name, resScale.Spec.Replicas)
244
+		}
245
+	}
246
+
247
+	if res.resEndpoints == nil {
248
+		t.Fatalf("Expected endpoints object to be updated, but it was not")
249
+	}
250
+
251
+	resTargetsRaw, hadTargets := res.resEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]
252
+	resIdledTimeRaw, hadIdledTime := res.resEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
253
+
254
+	if !hadTargets {
255
+		t.Errorf("Expected targets annotation to still be present, but it was not")
256
+	}
257
+	var resTargets []unidlingapi.RecordedScaleReference
258
+	if err = json.Unmarshal([]byte(resTargetsRaw), &resTargets); err != nil {
259
+		t.Fatalf("Unexpected error: %v", err)
260
+	}
261
+	if len(resTargets) != len(stillPresent) {
262
+		t.Errorf("Expected the new target list to contain the unscaled scalables only, but it was %v", resTargets)
263
+	}
264
+	for _, target := range resTargets {
265
+		if _, ok := stillPresent[target.CrossGroupObjectReference]; !ok {
266
+			t.Errorf("Expected new target list to contain the unscaled scalables only, but it was %v", resTargets)
267
+		}
268
+	}
269
+
270
+	if !hadIdledTime {
271
+		t.Errorf("Expected idled-at annotation to still be present, but it was not")
272
+	}
273
+	resIdledTime, err := time.Parse(time.RFC3339, resIdledTimeRaw)
274
+	if err != nil {
275
+		t.Fatalf("Unexpected error: %v", err)
276
+	}
277
+	if !resIdledTime.Equal(idledTime) {
278
+		t.Errorf("Expected output idled time annotation to be %s, but was changed to %s", idledTime, resIdledTime)
279
+	}
280
+}
281
+
282
+func TestControllerUnidlesProperly(t *testing.T) {
283
+	nowTime := time.Now().Truncate(time.Second)
284
+	baseScales := []kextapi.Scale{
285
+		{
286
+			ObjectMeta: kapi.ObjectMeta{
287
+				Name: "somerc",
288
+			},
289
+			TypeMeta: kunversioned.TypeMeta{
290
+				Kind: "ReplicationController",
291
+			},
292
+			Spec: kextapi.ScaleSpec{
293
+				Replicas: 0,
294
+			},
295
+		},
296
+		{
297
+			ObjectMeta: kapi.ObjectMeta{
298
+				Name: "somedc",
299
+			},
300
+			TypeMeta: kunversioned.TypeMeta{
301
+				Kind: "DeploymentConfig",
302
+			},
303
+			Spec: kextapi.ScaleSpec{
304
+				Replicas: 0,
305
+			},
306
+		},
307
+	}
308
+
309
+	fakeClient, fakeDeployClient, res := prepFakeClient(t, nowTime.Add(-10*time.Second), baseScales...)
310
+
311
+	controller := &UnidlingController{
312
+		scaleNamespacer:     fakeClient.Extensions(),
313
+		endpointsNamespacer: fakeClient.Core(),
314
+		rcNamespacer:        fakeClient.Core(),
315
+		dcNamespacer:        fakeDeployClient.Core(),
316
+	}
317
+
318
+	retry, err := controller.handleRequest(types.NamespacedName{
319
+		Namespace: "somens",
320
+		Name:      "somesvc",
321
+	}, nowTime)
322
+
323
+	if err != nil {
324
+		t.Fatalf("Unable to unidle: unexpected error (retry: %v): %v", retry, err)
325
+	}
326
+
327
+	if len(res.resMap) != len(baseScales) {
328
+		t.Errorf("Incorrect unidling results: got %v, expected to end up with %v objects scaled to 1", res.resMap, len(baseScales))
329
+	}
330
+
331
+	for _, scale := range baseScales {
332
+		resScale, ok := res.resMap[unidlingapi.CrossGroupObjectReference{Kind: scale.Kind, Name: scale.Name}]
333
+		if !ok {
334
+			t.Errorf("Expected to %s %q to have been scaled, but it was not", scale.Kind, scale.Name)
335
+			continue
336
+		}
337
+
338
+		if resScale.Spec.Replicas != 2 {
339
+			t.Errorf("Expected %s %q to have been scaled to 2, but it was scaled to %v", scale.Kind, scale.Name, resScale.Spec.Replicas)
340
+		}
341
+	}
342
+
343
+	if res.resEndpoints == nil {
344
+		t.Fatalf("Expected endpoints object to be updated, but it was not")
345
+	}
346
+
347
+	resTargets, hadTargets := res.resEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]
348
+	resIdledTime, hadIdledTime := res.resEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
349
+
350
+	if hadTargets {
351
+		t.Errorf("Expected targets annotation to be removed, but it was %q", resTargets)
352
+	}
353
+
354
+	if hadIdledTime {
355
+		t.Errorf("Expected idled-at annotation to be removed, but it was %q", resIdledTime)
356
+	}
357
+}
358
+
359
+type failureTestInfo struct {
360
+	name                   string
361
+	endpointsGet           *kapi.Endpoints
362
+	scaleGets              []kextapi.Scale
363
+	scaleUpdatesNotFound   []bool
364
+	preventEndpointsUpdate bool
365
+
366
+	errorExpected       bool
367
+	retryExpected       bool
368
+	annotationsExpected map[string]string
369
+}
370
+
371
+func prepareFakeClientForFailureTest(test failureTestInfo) (*kfake.Clientset, *deployfake.Clientset) {
372
+	fakeClient := &kfake.Clientset{}
373
+	fakeDeployClient := &deployfake.Clientset{}
374
+
375
+	fakeClient.PrependReactor("get", "endpoints", func(action ktestingcore.Action) (bool, runtime.Object, error) {
376
+		objName := action.(ktestingcore.GetAction).GetName()
377
+		if test.endpointsGet != nil && objName == test.endpointsGet.Name {
378
+			return true, test.endpointsGet, nil
379
+		}
380
+
381
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), objName)
382
+	})
383
+
384
+	fakeDeployClient.PrependReactor("get", "deploymentconfigs", func(action ktestingcore.Action) (bool, runtime.Object, error) {
385
+		objName := action.(ktestingcore.GetAction).GetName()
386
+		for _, scale := range test.scaleGets {
387
+			if scale.Kind == "DeploymentConfig" && objName == scale.Name {
388
+				return true, &deployapi.DeploymentConfig{
389
+					ObjectMeta: kapi.ObjectMeta{
390
+						Name: objName,
391
+					},
392
+					Spec: deployapi.DeploymentConfigSpec{
393
+						Replicas: scale.Spec.Replicas,
394
+					},
395
+				}, nil
396
+			}
397
+		}
398
+
399
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), objName)
400
+	})
401
+
402
+	fakeClient.PrependReactor("get", "replicationcontrollers", func(action ktestingcore.Action) (bool, runtime.Object, error) {
403
+		objName := action.(ktestingcore.GetAction).GetName()
404
+		for _, scale := range test.scaleGets {
405
+			if scale.Kind == "ReplicationController" && objName == scale.Name {
406
+				return true, &kapi.ReplicationController{
407
+					ObjectMeta: kapi.ObjectMeta{
408
+						Name: objName,
409
+					},
410
+					Spec: kapi.ReplicationControllerSpec{
411
+						Replicas: scale.Spec.Replicas,
412
+					},
413
+				}, nil
414
+			}
415
+		}
416
+
417
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), objName)
418
+	})
419
+
420
+	fakeDeployClient.PrependReactor("update", "deploymentconfigs", func(action ktestingcore.Action) (bool, runtime.Object, error) {
421
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*deployapi.DeploymentConfig)
422
+		for i, scale := range test.scaleGets {
423
+			if scale.Kind == "DeploymentConfig" && obj.Name == scale.Name {
424
+				if test.scaleUpdatesNotFound != nil && test.scaleUpdatesNotFound[i] {
425
+					return false, nil, nil
426
+				}
427
+
428
+				return true, &deployapi.DeploymentConfig{}, nil
429
+			}
430
+		}
431
+
432
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), obj.Name)
433
+	})
434
+
435
+	fakeClient.PrependReactor("update", "replicationcontrollers", func(action ktestingcore.Action) (bool, runtime.Object, error) {
436
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*kapi.ReplicationController)
437
+		for i, scale := range test.scaleGets {
438
+			if scale.Kind == "ReplicationController" && obj.Name == scale.Name {
439
+				if test.scaleUpdatesNotFound != nil && test.scaleUpdatesNotFound[i] {
440
+					return false, nil, nil
441
+				}
442
+				return true, &kapi.ReplicationController{}, nil
443
+			}
444
+		}
445
+
446
+		return true, nil, errors.NewNotFound(action.GetResource().GroupResource(), obj.Name)
447
+	})
448
+
449
+	fakeClient.PrependReactor("update", "endpoints", func(action ktestingcore.Action) (bool, runtime.Object, error) {
450
+		obj := action.(ktestingcore.UpdateAction).GetObject().(*kapi.Endpoints)
451
+		if obj.Name != test.endpointsGet.Name {
452
+			return false, nil, nil
453
+		}
454
+
455
+		if test.preventEndpointsUpdate {
456
+			return true, nil, fmt.Errorf("some problem updating the endpoints")
457
+		}
458
+
459
+		return true, obj, nil
460
+	})
461
+
462
+	return fakeClient, fakeDeployClient
463
+}
464
+
465
+func TestControllerPerformsCorrectlyOnFailures(t *testing.T) {
466
+	nowTime := time.Now().Truncate(time.Second)
467
+
468
+	baseScalables := []unidlingapi.RecordedScaleReference{
469
+		{
470
+			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
471
+				Kind: "ReplicationController",
472
+				Name: "somerc",
473
+			},
474
+			Replicas: 2,
475
+		},
476
+		{
477
+			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
478
+				Kind: "DeploymentConfig",
479
+				Name: "somedc",
480
+			},
481
+			Replicas: 2,
482
+		},
483
+	}
484
+	baseScalablesBytes, err := json.Marshal(baseScalables)
485
+	if err != nil {
486
+		t.Fatalf("Unexpected error: %v", err)
487
+	}
488
+
489
+	outScalables := []unidlingapi.RecordedScaleReference{
490
+		{
491
+			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
492
+				Kind: "DeploymentConfig",
493
+				Name: "somedc",
494
+			},
495
+			Replicas: 2,
496
+		},
497
+	}
498
+	outScalablesBytes, err := json.Marshal(outScalables)
499
+	if err != nil {
500
+		t.Fatalf("Unexpected error: %v", err)
501
+	}
502
+
503
+	tests := []failureTestInfo{
504
+		{
505
+			name:          "retry on failed endpoints get",
506
+			endpointsGet:  nil,
507
+			errorExpected: true,
508
+			retryExpected: true,
509
+		},
510
+		{
511
+			name: "not retry on failure to parse time",
512
+			endpointsGet: &kapi.Endpoints{
513
+				ObjectMeta: kapi.ObjectMeta{
514
+					Name: "somesvc",
515
+					Annotations: map[string]string{
516
+						unidlingapi.IdledAtAnnotation: "cheddar",
517
+					},
518
+				},
519
+			},
520
+			errorExpected: true,
521
+			retryExpected: false,
522
+		},
523
+		{
524
+			name: "not retry on failure to unmarshal target scalables",
525
+			endpointsGet: &kapi.Endpoints{
526
+				ObjectMeta: kapi.ObjectMeta{
527
+					Name: "somesvc",
528
+					Annotations: map[string]string{
529
+						unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
530
+						unidlingapi.UnidleTargetAnnotation: "pecorino romano",
531
+					},
532
+				},
533
+			},
534
+			errorExpected: true,
535
+			retryExpected: false,
536
+		},
537
+		{
538
+			name: "remove a scalable from the list if it cannot be found (while getting)",
539
+			endpointsGet: &kapi.Endpoints{
540
+				ObjectMeta: kapi.ObjectMeta{
541
+					Name: "somesvc",
542
+					Annotations: map[string]string{
543
+						unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
544
+						unidlingapi.UnidleTargetAnnotation: string(baseScalablesBytes),
545
+					},
546
+				},
547
+			},
548
+			scaleGets: []kextapi.Scale{
549
+				{
550
+					TypeMeta: kunversioned.TypeMeta{
551
+						Kind: "DeploymentConfig",
552
+					},
553
+					ObjectMeta: kapi.ObjectMeta{
554
+						Name: "somedc",
555
+					},
556
+					Spec: kextapi.ScaleSpec{Replicas: 0},
557
+				},
558
+			},
559
+			errorExpected: false,
560
+			annotationsExpected: map[string]string{
561
+				unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
562
+				unidlingapi.UnidleTargetAnnotation: string(outScalablesBytes),
563
+			},
564
+		},
565
+		{
566
+			name: "should remove a scalable from the list if it cannot be found (while updating)",
567
+			endpointsGet: &kapi.Endpoints{
568
+				ObjectMeta: kapi.ObjectMeta{
569
+					Name: "somesvc",
570
+					Annotations: map[string]string{
571
+						unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
572
+						unidlingapi.UnidleTargetAnnotation: string(baseScalablesBytes),
573
+					},
574
+				},
575
+			},
576
+			scaleGets: []kextapi.Scale{
577
+				{
578
+					TypeMeta: kunversioned.TypeMeta{
579
+						Kind: "ReplicationController",
580
+					},
581
+					ObjectMeta: kapi.ObjectMeta{
582
+						Name: "somerc",
583
+					},
584
+					Spec: kextapi.ScaleSpec{Replicas: 0},
585
+				},
586
+				{
587
+					TypeMeta: kunversioned.TypeMeta{
588
+						Kind: "DeploymentConfig",
589
+					},
590
+					ObjectMeta: kapi.ObjectMeta{
591
+						Name: "somedc",
592
+					},
593
+					Spec: kextapi.ScaleSpec{Replicas: 0},
594
+				},
595
+			},
596
+			scaleUpdatesNotFound: []bool{false, true},
597
+			errorExpected:        false,
598
+			annotationsExpected: map[string]string{
599
+				unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
600
+				unidlingapi.UnidleTargetAnnotation: string(outScalablesBytes),
601
+			},
602
+		},
603
+		{
604
+			name: "retry on failed endpoints update",
605
+			endpointsGet: &kapi.Endpoints{
606
+				ObjectMeta: kapi.ObjectMeta{
607
+					Name: "somesvc",
608
+					Annotations: map[string]string{
609
+						unidlingapi.IdledAtAnnotation:      nowTime.Format(time.RFC3339),
610
+						unidlingapi.UnidleTargetAnnotation: string(baseScalablesBytes),
611
+					},
612
+				},
613
+			},
614
+			scaleGets: []kextapi.Scale{
615
+				{
616
+					TypeMeta: kunversioned.TypeMeta{
617
+						Kind: "ReplicationController",
618
+					},
619
+					ObjectMeta: kapi.ObjectMeta{
620
+						Name: "somerc",
621
+					},
622
+					Spec: kextapi.ScaleSpec{Replicas: 0},
623
+				},
624
+				{
625
+					TypeMeta: kunversioned.TypeMeta{
626
+						Kind: "DeploymentConfig",
627
+					},
628
+					ObjectMeta: kapi.ObjectMeta{
629
+						Name: "somedc",
630
+					},
631
+					Spec: kextapi.ScaleSpec{Replicas: 0},
632
+				},
633
+			},
634
+			preventEndpointsUpdate: true,
635
+			errorExpected:          true,
636
+			retryExpected:          true,
637
+		},
638
+	}
639
+
640
+	for _, test := range tests {
641
+		fakeClient, fakeDeployClient := prepareFakeClientForFailureTest(test)
642
+		controller := &UnidlingController{
643
+			scaleNamespacer:     fakeClient.Extensions(),
644
+			endpointsNamespacer: fakeClient.Core(),
645
+			rcNamespacer:        fakeClient.Core(),
646
+			dcNamespacer:        fakeDeployClient.Core(),
647
+		}
648
+
649
+		var retry bool
650
+		retry, err = controller.handleRequest(types.NamespacedName{
651
+			Namespace: "somens",
652
+			Name:      "somesvc",
653
+		}, nowTime.Add(10*time.Second))
654
+
655
+		if err != nil && !test.errorExpected {
656
+			t.Errorf("for test 'it should %s': unexpected error while idling: %v", test.name, err)
657
+			continue
658
+		}
659
+
660
+		if err == nil && test.errorExpected {
661
+			t.Errorf("for test 'it should %s': expected error, but did not get one", test.name)
662
+			continue
663
+		}
664
+
665
+		if test.errorExpected && (test.retryExpected != retry) {
666
+			t.Errorf("for test 'it should %s': expected retry to be %v, but it was %v with error %v", test.name, test.retryExpected, retry, err)
667
+			return
668
+		}
669
+	}
670
+}
0 671
new file mode 100644
... ...
@@ -0,0 +1,109 @@
0
+package util
1
+
2
+import (
3
+	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
4
+
5
+	deployapi "github.com/openshift/origin/pkg/deploy/api"
6
+	kapi "k8s.io/kubernetes/pkg/api"
7
+	kextapi "k8s.io/kubernetes/pkg/apis/extensions"
8
+	"k8s.io/kubernetes/pkg/runtime"
9
+
10
+	deployclient "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/typed/core/unversioned"
11
+	kclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
12
+	kextclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
13
+
14
+	"github.com/golang/glog"
15
+)
16
+
17
+// TODO: remove the below functions once we get a way to mark/unmark an object as idled
18
+// via the scale endpoint
19
+
20
+type AnnotationFunc func(annotations map[string]string)
21
+
22
+func NewScaleAnnotater(scales kextclient.ScalesGetter, dcs deployclient.DeploymentConfigsGetter, rcs kclient.ReplicationControllersGetter, changeAnnots AnnotationFunc) *ScaleAnnotater {
23
+	return &ScaleAnnotater{
24
+		scales:            scales,
25
+		dcs:               dcs,
26
+		rcs:               rcs,
27
+		changeAnnotations: changeAnnots,
28
+	}
29
+}
30
+
31
+type ScaleAnnotater struct {
32
+	scales            kextclient.ScalesGetter
33
+	dcs               deployclient.DeploymentConfigsGetter
34
+	rcs               kclient.ReplicationControllersGetter
35
+	changeAnnotations AnnotationFunc
36
+}
37
+
38
+// getObjectWithScale either fetches a known type of object and constructs a Scale from that, or uses the scale
39
+// subresource to fetch a Scale by itself.
40
+func (c *ScaleAnnotater) GetObjectWithScale(namespace string, ref unidlingapi.CrossGroupObjectReference) (runtime.Object, *kextapi.Scale, error) {
41
+	var obj runtime.Object
42
+	var err error
43
+	var scale *kextapi.Scale
44
+
45
+	switch {
46
+	case ref.Kind == "DeploymentConfig" && ref.Group == deployapi.GroupName:
47
+		var dc *deployapi.DeploymentConfig
48
+		dc, err = c.dcs.DeploymentConfigs(namespace).Get(ref.Name)
49
+		if err != nil {
50
+			return nil, nil, err
51
+		}
52
+		scale = &kextapi.Scale{
53
+			Spec: kextapi.ScaleSpec{Replicas: dc.Spec.Replicas},
54
+		}
55
+		obj = dc
56
+	case ref.Kind == "ReplicationController" && ref.Group == kapi.GroupName:
57
+		var rc *kapi.ReplicationController
58
+		rc, err = c.rcs.ReplicationControllers(namespace).Get(ref.Name)
59
+		if err != nil {
60
+			return nil, nil, err
61
+		}
62
+		scale = &kextapi.Scale{
63
+			Spec: kextapi.ScaleSpec{Replicas: rc.Spec.Replicas},
64
+		}
65
+		obj = rc
66
+	default:
67
+		scale, err = c.scales.Scales(namespace).Get(ref.Kind, ref.Name)
68
+		if err != nil {
69
+			return nil, nil, err
70
+		}
71
+	}
72
+
73
+	return obj, scale, err
74
+}
75
+
76
+// updateObjectScale updates the scale of an object and removes unidling annotations for objects of a know type.
77
+// For objects of an unknown type, it scales the object using the scale subresource
78
+// (and does not change annotations).
79
+func (c *ScaleAnnotater) UpdateObjectScale(namespace string, ref unidlingapi.CrossGroupObjectReference, obj runtime.Object, scale *kextapi.Scale) error {
80
+	var err error
81
+
82
+	if obj == nil {
83
+		_, err = c.scales.Scales(namespace).Update(ref.Kind, scale)
84
+		return err
85
+	}
86
+
87
+	switch typedObj := obj.(type) {
88
+	case *deployapi.DeploymentConfig:
89
+		if typedObj.Annotations == nil {
90
+			typedObj.Annotations = make(map[string]string)
91
+		}
92
+		c.changeAnnotations(typedObj.Annotations)
93
+		typedObj.Spec.Replicas = scale.Spec.Replicas
94
+		_, err = c.dcs.DeploymentConfigs(namespace).Update(typedObj)
95
+	case *kapi.ReplicationController:
96
+		if typedObj.Annotations == nil {
97
+			typedObj.Annotations = make(map[string]string)
98
+		}
99
+		c.changeAnnotations(typedObj.Annotations)
100
+		typedObj.Spec.Replicas = scale.Spec.Replicas
101
+		_, err = c.rcs.ReplicationControllers(namespace).Update(typedObj)
102
+	default:
103
+		glog.V(2).Infof("Unidling unknown type %t: using scale interface and not removing annotations")
104
+		_, err = c.scales.Scales(namespace).Update(ref.Kind, scale)
105
+	}
106
+
107
+	return err
108
+}
... ...
@@ -10,6 +10,7 @@ import (
10 10
 	kapi "k8s.io/kubernetes/pkg/api"
11 11
 	kapierror "k8s.io/kubernetes/pkg/api/errors"
12 12
 	"k8s.io/kubernetes/pkg/api/unversioned"
13
+	kunvapi "k8s.io/kubernetes/pkg/api/unversioned"
13 14
 	extensionsapi "k8s.io/kubernetes/pkg/apis/extensions"
14 15
 	"k8s.io/kubernetes/pkg/util/sets"
15 16
 	"k8s.io/kubernetes/pkg/util/wait"
... ...
@@ -27,6 +28,63 @@ import (
27 27
 	testserver "github.com/openshift/origin/test/util/server"
28 28
 )
29 29
 
30
+func prettyPrintAction(act *authorizationapi.Action, defaultNamespaceStr string) string {
31
+	nsStr := fmt.Sprintf("in namespace %q", act.Namespace)
32
+	if act.Namespace == "" {
33
+		nsStr = defaultNamespaceStr
34
+	}
35
+
36
+	var resourceStr string
37
+	if act.Group == "" && act.Version == "" {
38
+		resourceStr = act.Resource
39
+	} else {
40
+		groupVer := kunvapi.GroupVersion{Group: act.Group, Version: act.Version}
41
+		resourceStr = fmt.Sprintf("%s/%s", act.Resource, groupVer.String())
42
+	}
43
+
44
+	var base string
45
+	if act.ResourceName == "" {
46
+		base = fmt.Sprintf("who can %s %s %s", act.Verb, resourceStr, nsStr)
47
+	} else {
48
+		base = fmt.Sprintf("who can %s the %s named %q %s", act.Verb, resourceStr, act.ResourceName, nsStr)
49
+	}
50
+
51
+	if act.Content != nil {
52
+		return fmt.Sprintf("%s with content %#v", base, act.Content)
53
+	}
54
+
55
+	return base
56
+}
57
+
58
+func prettyPrintReviewResponse(resp *authorizationapi.ResourceAccessReviewResponse) string {
59
+	nsStr := fmt.Sprintf("(in the namespace %q)\n", resp.Namespace)
60
+	if resp.Namespace == "" {
61
+		nsStr = "(in all namespaces)\n"
62
+	}
63
+
64
+	var usersStr string
65
+	if resp.Users.Len() > 0 {
66
+		userStrList := make([]string, 0, len(resp.Users))
67
+		for userName := range resp.Users {
68
+			userStrList = append(userStrList, fmt.Sprintf("    - %s\n", userName))
69
+		}
70
+
71
+		usersStr = fmt.Sprintf("  users:\n%s", strings.Join(userStrList, ""))
72
+	}
73
+
74
+	var groupsStr string
75
+	if resp.Groups.Len() > 0 {
76
+		groupStrList := make([]string, 0, len(resp.Groups))
77
+		for groupName := range resp.Groups {
78
+			groupStrList = append(groupStrList, fmt.Sprintf("    - %s\n", groupName))
79
+		}
80
+
81
+		groupsStr = fmt.Sprintf("  groups:\n%s", strings.Join(groupStrList, ""))
82
+	}
83
+
84
+	return fmt.Sprintf(nsStr + usersStr + groupsStr)
85
+}
86
+
30 87
 func TestClusterReaderCoverage(t *testing.T) {
31 88
 	testutil.RequireEtcd(t)
32 89
 	defer testutil.DumpEtcdOnFailure(t)
... ...
@@ -277,6 +335,9 @@ var globalClusterAdminGroups = sets.NewString("system:cluster-admins", "system:m
277 277
 var globalClusterReaderUsers = sets.NewString("system:serviceaccount:openshift-infra:namespace-controller", "system:admin")
278 278
 var globalClusterReaderGroups = sets.NewString("system:cluster-readers", "system:cluster-admins", "system:masters")
279 279
 
280
+// this list includes any other users who can get DeploymentConfigs
281
+var globalDeploymentConfigGetterUsers = sets.NewString("system:serviceaccount:openshift-infra:unidling-controller")
282
+
280 283
 type resourceAccessReviewTest struct {
281 284
 	description     string
282 285
 	clientInterface client.ResourceAccessReviewInterface
... ...
@@ -313,7 +374,7 @@ func (test resourceAccessReviewTest) run(t *testing.T) {
313 313
 			!reflect.DeepEqual(actualResponse.Users.List(), test.response.Users.List()) ||
314 314
 			!reflect.DeepEqual(actualResponse.Groups.List(), test.response.Groups.List()) ||
315 315
 			actualResponse.EvaluationError != test.response.EvaluationError {
316
-			failMessage = fmt.Sprintf("%s: %#v: expected %#v, got %#v", test.description, test.review, test.response, actualResponse)
316
+			failMessage = fmt.Sprintf("%s:\n  %s:\n  expected %s\n  got %s", test.description, prettyPrintAction(&test.review.Action, "(in any namespace)"), prettyPrintReviewResponse(&test.response), prettyPrintReviewResponse(actualResponse))
317 317
 			return false, nil
318 318
 		}
319 319
 
... ...
@@ -366,7 +427,7 @@ func (test localResourceAccessReviewTest) run(t *testing.T) {
366 366
 			!reflect.DeepEqual(actualResponse.Users.List(), test.response.Users.List()) ||
367 367
 			!reflect.DeepEqual(actualResponse.Groups.List(), test.response.Groups.List()) ||
368 368
 			actualResponse.EvaluationError != test.response.EvaluationError {
369
-			failMessage = fmt.Sprintf("%s: %#v: expected %#v, got %#v", test.description, test.review, test.response, actualResponse)
369
+			failMessage = fmt.Sprintf("%s:\n  %s:\n  expected %s\n  got %s", test.description, prettyPrintAction(&test.review.Action, "(in the current namespace)"), prettyPrintReviewResponse(&test.response), prettyPrintReviewResponse(actualResponse))
370 370
 			return false, nil
371 371
 		}
372 372
 
... ...
@@ -451,6 +512,7 @@ func TestAuthorizationResourceAccessReview(t *testing.T) {
451 451
 			},
452 452
 		}
453 453
 		test.response.Users.Insert(globalClusterReaderUsers.List()...)
454
+		test.response.Users.Insert(globalDeploymentConfigGetterUsers.List()...)
454 455
 		test.response.Groups.Insert(globalClusterReaderGroups.List()...)
455 456
 		test.run(t)
456 457
 	}
... ...
@@ -466,6 +528,7 @@ func TestAuthorizationResourceAccessReview(t *testing.T) {
466 466
 			},
467 467
 		}
468 468
 		test.response.Users.Insert(globalClusterReaderUsers.List()...)
469
+		test.response.Users.Insert(globalDeploymentConfigGetterUsers.List()...)
469 470
 		test.response.Groups.Insert(globalClusterReaderGroups.List()...)
470 471
 		test.run(t)
471 472
 	}
... ...
@@ -493,6 +556,7 @@ func TestAuthorizationResourceAccessReview(t *testing.T) {
493 493
 			},
494 494
 		}
495 495
 		test.response.Users.Insert(globalClusterReaderUsers.List()...)
496
+		test.response.Users.Insert(globalDeploymentConfigGetterUsers.List()...)
496 497
 		test.response.Groups.Insert(globalClusterReaderGroups.List()...)
497 498
 		test.run(t)
498 499
 	}
... ...
@@ -513,6 +577,7 @@ func TestAuthorizationResourceAccessReview(t *testing.T) {
513 513
 			},
514 514
 		}
515 515
 		test.response.Users.Insert(globalClusterReaderUsers.List()...)
516
+		test.response.Users.Insert(globalDeploymentConfigGetterUsers.List()...)
516 517
 		test.response.Groups.Insert(globalClusterReaderGroups.List()...)
517 518
 		test.run(t)
518 519
 	}
... ...
@@ -3095,5 +3095,69 @@ items:
3095 3095
     verbs:
3096 3096
     - create
3097 3097
     - get
3098
+- apiVersion: v1
3099
+  kind: ClusterRole
3100
+  metadata:
3101
+    creationTimestamp: null
3102
+    name: system:unidling-controller
3103
+  rules:
3104
+  - apiGroups:
3105
+    - ""
3106
+    - extensions
3107
+    attributeRestrictions: null
3108
+    resources:
3109
+    - replicationcontrollers/scale
3110
+    verbs:
3111
+    - get
3112
+    - update
3113
+  - apiGroups:
3114
+    - extensions
3115
+    attributeRestrictions: null
3116
+    resources:
3117
+    - deployments/scale
3118
+    - replicasets/scale
3119
+    verbs:
3120
+    - get
3121
+    - update
3122
+  - apiGroups:
3123
+    - ""
3124
+    attributeRestrictions: null
3125
+    resources:
3126
+    - deploymentconfigs/scale
3127
+    verbs:
3128
+    - get
3129
+    - update
3130
+  - apiGroups:
3131
+    - ""
3132
+    attributeRestrictions: null
3133
+    resources:
3134
+    - events
3135
+    verbs:
3136
+    - list
3137
+    - watch
3138
+  - apiGroups:
3139
+    - ""
3140
+    attributeRestrictions: null
3141
+    resources:
3142
+    - endpoints
3143
+    verbs:
3144
+    - get
3145
+    - update
3146
+  - apiGroups:
3147
+    - ""
3148
+    attributeRestrictions: null
3149
+    resources:
3150
+    - replicationcontrollers
3151
+    verbs:
3152
+    - get
3153
+    - update
3154
+  - apiGroups:
3155
+    - ""
3156
+    attributeRestrictions: null
3157
+    resources:
3158
+    - deploymentconfigs
3159
+    verbs:
3160
+    - get
3161
+    - update
3098 3162
 kind: List
3099 3163
 metadata: {}