Browse code

add controller to regenerate service serving certs

deads2k authored on 2016/11/29 05:47:57
Showing 10 changed files
... ...
@@ -973,7 +973,7 @@ func init() {
973 973
 				},
974 974
 				{
975 975
 					APIGroups: []string{kapi.GroupName},
976
-					Verbs:     sets.NewString("get", "create"),
976
+					Verbs:     sets.NewString("get", "list", "watch", "create", "update"),
977 977
 					Resources: sets.NewString("secrets"),
978 978
 				},
979 979
 			},
... ...
@@ -395,6 +395,9 @@ func (c *MasterConfig) RunServiceServingCertController(client *kclientset.Client
395 395
 
396 396
 	servingCertController := servingcertcontroller.NewServiceServingCertController(client.Core(), client.Core(), ca, "cluster.local", 2*time.Minute)
397 397
 	go servingCertController.Run(1, make(chan struct{}))
398
+
399
+	servingCertUpdateController := servingcertcontroller.NewServiceServingCertUpdateController(client.Core(), client.Core(), ca, "cluster.local", 20*time.Minute)
400
+	go servingCertUpdateController.Run(5, make(chan struct{}))
398 401
 }
399 402
 
400 403
 // RunImageImportController starts the image import trigger controller process.
401 404
deleted file mode 100644
... ...
@@ -1,297 +0,0 @@
1
-package servingcert
2
-
3
-import (
4
-	"errors"
5
-	"fmt"
6
-	"strconv"
7
-	"time"
8
-
9
-	"github.com/golang/glog"
10
-
11
-	kapi "k8s.io/kubernetes/pkg/api"
12
-	kapierrors "k8s.io/kubernetes/pkg/api/errors"
13
-	"k8s.io/kubernetes/pkg/client/cache"
14
-	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
15
-	"k8s.io/kubernetes/pkg/controller"
16
-	"k8s.io/kubernetes/pkg/controller/framework"
17
-	"k8s.io/kubernetes/pkg/runtime"
18
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
19
-	"k8s.io/kubernetes/pkg/util/sets"
20
-	"k8s.io/kubernetes/pkg/util/wait"
21
-	"k8s.io/kubernetes/pkg/util/workqueue"
22
-	"k8s.io/kubernetes/pkg/watch"
23
-
24
-	"github.com/openshift/origin/pkg/cmd/server/crypto"
25
-)
26
-
27
-const (
28
-	// ServingCertSecretAnnotation stores the name of the secret to generate into.
29
-	ServingCertSecretAnnotation = "service.alpha.openshift.io/serving-cert-secret-name"
30
-	// ServingCertCreatedByAnnotation stores the of the signer common name.  This could be used later to see if the
31
-	// services need to have the the serving certs regenerated.  The presence and matching of this annotation prevents
32
-	// regeneration
33
-	ServingCertCreatedByAnnotation = "service.alpha.openshift.io/serving-cert-signed-by"
34
-	// ServingCertErrorAnnotation stores the error that caused cert generation failures.
35
-	ServingCertErrorAnnotation = "service.alpha.openshift.io/serving-cert-generation-error"
36
-	// ServingCertErrorNumAnnotation stores how many consecutive errors we've hit.  A value of the maxRetries will prevent
37
-	// the controller from reattempting until it is cleared.
38
-	ServingCertErrorNumAnnotation = "service.alpha.openshift.io/serving-cert-generation-error-num"
39
-	// ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID
40
-	ServiceUIDAnnotation = "service.alpha.openshift.io/originating-service-uid"
41
-	// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by Name to allow reverse lookups on services
42
-	// for comparison against UIDs
43
-	ServiceNameAnnotation = "service.alpha.openshift.io/originating-service-name"
44
-)
45
-
46
-// ServiceServingCertController is responsible for synchronizing Service objects stored
47
-// in the system with actual running replica sets and pods.
48
-type ServiceServingCertController struct {
49
-	serviceClient kcoreclient.ServicesGetter
50
-	secretClient  kcoreclient.SecretsGetter
51
-
52
-	// Services that need to be checked
53
-	queue      workqueue.RateLimitingInterface
54
-	maxRetries int
55
-
56
-	serviceCache      cache.Store
57
-	serviceController *framework.Controller
58
-
59
-	ca         *crypto.CA
60
-	publicCert string
61
-	dnsSuffix  string
62
-
63
-	// syncHandler does the work. It's factored out for unit testing
64
-	syncHandler func(serviceKey string) error
65
-}
66
-
67
-// NewServiceServingCertController creates a new ServiceServingCertController.
68
-// TODO this should accept a shared informer
69
-func NewServiceServingCertController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertController {
70
-	sc := &ServiceServingCertController{
71
-		serviceClient: serviceClient,
72
-		secretClient:  secretClient,
73
-
74
-		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
75
-		maxRetries: 10,
76
-
77
-		ca:        ca,
78
-		dnsSuffix: dnsSuffix,
79
-	}
80
-
81
-	sc.serviceCache, sc.serviceController = framework.NewInformer(
82
-		&cache.ListWatch{
83
-			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
84
-				return sc.serviceClient.Services(kapi.NamespaceAll).List(options)
85
-			},
86
-			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
87
-				return sc.serviceClient.Services(kapi.NamespaceAll).Watch(options)
88
-			},
89
-		},
90
-		&kapi.Service{},
91
-		resyncInterval,
92
-		framework.ResourceEventHandlerFuncs{
93
-			AddFunc: func(obj interface{}) {
94
-				service := obj.(*kapi.Service)
95
-				glog.V(4).Infof("Adding service %s", service.Name)
96
-				sc.enqueueService(obj)
97
-			},
98
-			UpdateFunc: func(old, cur interface{}) {
99
-				service := cur.(*kapi.Service)
100
-				glog.V(4).Infof("Updating service %s", service.Name)
101
-				// Resync on service object relist.
102
-				sc.enqueueService(cur)
103
-			},
104
-		},
105
-	)
106
-
107
-	sc.syncHandler = sc.syncService
108
-
109
-	return sc
110
-}
111
-
112
-// Run begins watching and syncing.
113
-func (sc *ServiceServingCertController) Run(workers int, stopCh <-chan struct{}) {
114
-	defer utilruntime.HandleCrash()
115
-	go sc.serviceController.Run(stopCh)
116
-	for i := 0; i < workers; i++ {
117
-		go wait.Until(sc.worker, time.Second, stopCh)
118
-	}
119
-
120
-	<-stopCh
121
-	glog.Infof("Shutting down service signing cert controller")
122
-	sc.queue.ShutDown()
123
-}
124
-
125
-func (sc *ServiceServingCertController) enqueueService(obj interface{}) {
126
-	key, err := controller.KeyFunc(obj)
127
-	if err != nil {
128
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
129
-		return
130
-	}
131
-
132
-	sc.queue.Add(key)
133
-}
134
-
135
-// worker runs a worker thread that just dequeues items, processes them, and marks them done.
136
-// It enforces that the syncHandler is never invoked concurrently with the same key.
137
-func (sc *ServiceServingCertController) worker() {
138
-	for {
139
-		if !sc.work() {
140
-			return
141
-		}
142
-	}
143
-}
144
-
145
-// work returns true if the worker thread should continue
146
-func (sc *ServiceServingCertController) work() bool {
147
-	key, quit := sc.queue.Get()
148
-	if quit {
149
-		return false
150
-	}
151
-	defer sc.queue.Done(key)
152
-
153
-	if err := sc.syncHandler(key.(string)); err == nil {
154
-		// this means the request was successfully handled.  We should "forget" the item so that any retry
155
-		// later on is reset
156
-		sc.queue.Forget(key)
157
-
158
-	} else {
159
-		// if we had an error it means that we didn't handle it, which means that we want to requeue the work
160
-		utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
161
-		sc.queue.AddRateLimited(key)
162
-	}
163
-
164
-	return true
165
-}
166
-
167
-// syncService will sync the service with the given key.
168
-// This function is not meant to be invoked concurrently with the same key.
169
-func (sc *ServiceServingCertController) syncService(key string) error {
170
-	obj, exists, err := sc.serviceCache.GetByKey(key)
171
-	if err != nil {
172
-		glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err)
173
-		return err
174
-	}
175
-	if !exists {
176
-		glog.V(4).Infof("Service has been deleted %v", key)
177
-		return nil
178
-	}
179
-
180
-	if !sc.requiresCertGeneration(obj.(*kapi.Service)) {
181
-		return nil
182
-	}
183
-
184
-	// make a copy to avoid mutating cache state
185
-	t, err := kapi.Scheme.DeepCopy(obj)
186
-	if err != nil {
187
-		return err
188
-	}
189
-	service := t.(*kapi.Service)
190
-	if service.Annotations == nil {
191
-		service.Annotations = map[string]string{}
192
-	}
193
-
194
-	dnsName := service.Name + "." + service.Namespace + ".svc"
195
-	fqDNSName := dnsName + "." + sc.dnsSuffix
196
-	servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName))
197
-	if err != nil {
198
-		return err
199
-	}
200
-	certBytes, keyBytes, err := servingCert.GetPEMBytes()
201
-	if err != nil {
202
-		return err
203
-	}
204
-
205
-	secret := &kapi.Secret{
206
-		ObjectMeta: kapi.ObjectMeta{
207
-			Namespace: service.Namespace,
208
-			Name:      service.Annotations[ServingCertSecretAnnotation],
209
-			Annotations: map[string]string{
210
-				ServiceUIDAnnotation:  string(service.UID),
211
-				ServiceNameAnnotation: service.Name,
212
-			},
213
-		},
214
-		Type: kapi.SecretTypeTLS,
215
-		Data: map[string][]byte{
216
-			kapi.TLSCertKey:       certBytes,
217
-			kapi.TLSPrivateKeyKey: keyBytes,
218
-		},
219
-	}
220
-
221
-	_, err = sc.secretClient.Secrets(service.Namespace).Create(secret)
222
-	if err != nil && !kapierrors.IsAlreadyExists(err) {
223
-		// if we have an error creating the secret, then try to update the service with that information.  If it fails,
224
-		// then we'll just try again later on  re-list or because the service had already been updated and we'll get triggered again.
225
-		service.Annotations[ServingCertErrorAnnotation] = err.Error()
226
-		service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
227
-		_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
228
-
229
-		// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
230
-		if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
231
-			return nil
232
-		}
233
-		return err
234
-	}
235
-	if kapierrors.IsAlreadyExists(err) {
236
-		actualSecret, err := sc.secretClient.Secrets(service.Namespace).Get(secret.Name)
237
-		if err != nil {
238
-			// if we have an error creating the secret, then try to update the service with that information.  If it fails,
239
-			// then we'll just try again later on  re-list or because the service had already been updated and we'll get triggered again.
240
-			service.Annotations[ServingCertErrorAnnotation] = err.Error()
241
-			service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
242
-			_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
243
-
244
-			// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
245
-			if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
246
-				return nil
247
-			}
248
-			return err
249
-		}
250
-
251
-		if actualSecret.Annotations[ServiceUIDAnnotation] != string(service.UID) {
252
-			service.Annotations[ServingCertErrorAnnotation] = fmt.Sprintf("secret/%v references serviceUID %v, which does not match %v", actualSecret.Name, actualSecret.Annotations[ServiceUIDAnnotation], service.UID)
253
-			service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
254
-			_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
255
-
256
-			// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
257
-			if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
258
-				return nil
259
-			}
260
-			return errors.New(service.Annotations[ServingCertErrorAnnotation])
261
-		}
262
-	}
263
-
264
-	service.Annotations[ServingCertCreatedByAnnotation] = sc.ca.Config.Certs[0].Subject.CommonName
265
-	delete(service.Annotations, ServingCertErrorAnnotation)
266
-	delete(service.Annotations, ServingCertErrorNumAnnotation)
267
-	_, err = sc.serviceClient.Services(service.Namespace).Update(service)
268
-
269
-	return err
270
-}
271
-
272
-func getNumFailures(service *kapi.Service) int {
273
-	numFailuresString := service.Annotations[ServingCertErrorNumAnnotation]
274
-	if len(numFailuresString) == 0 {
275
-		return 0
276
-	}
277
-
278
-	numFailures, err := strconv.Atoi(numFailuresString)
279
-	if err != nil {
280
-		return 0
281
-	}
282
-	return numFailures
283
-}
284
-
285
-func (sc *ServiceServingCertController) requiresCertGeneration(service *kapi.Service) bool {
286
-	if secretName := service.Annotations[ServingCertSecretAnnotation]; len(secretName) == 0 {
287
-		return false
288
-	}
289
-	if getNumFailures(service) >= sc.maxRetries {
290
-		return false
291
-	}
292
-	if service.Annotations[ServingCertCreatedByAnnotation] == sc.ca.Config.Certs[0].Subject.CommonName {
293
-		return false
294
-	}
295
-
296
-	return true
297
-}
298 1
deleted file mode 100644
... ...
@@ -1,420 +0,0 @@
1
-package servingcert
2
-
3
-import (
4
-	"fmt"
5
-	"io/ioutil"
6
-	"reflect"
7
-	"testing"
8
-	"time"
9
-
10
-	kapi "k8s.io/kubernetes/pkg/api"
11
-	kapierrors "k8s.io/kubernetes/pkg/api/errors"
12
-	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
13
-	"k8s.io/kubernetes/pkg/client/testing/core"
14
-	"k8s.io/kubernetes/pkg/runtime"
15
-	"k8s.io/kubernetes/pkg/types"
16
-	"k8s.io/kubernetes/pkg/watch"
17
-
18
-	"github.com/openshift/origin/pkg/cmd/server/admin"
19
-)
20
-
21
-func controllerSetup(startingObjects []runtime.Object, stopChannel chan struct{}, t *testing.T) ( /*caName*/ string, *fake.Clientset, *watch.FakeWatcher, *ServiceServingCertController) {
22
-	certDir, err := ioutil.TempDir("", "serving-cert-unit-")
23
-	if err != nil {
24
-		t.Fatalf("unexpected error: %v", err)
25
-	}
26
-	caInfo := admin.DefaultServiceSignerCAInfo(certDir)
27
-
28
-	caOptions := admin.CreateSignerCertOptions{
29
-		CertFile: caInfo.CertFile,
30
-		KeyFile:  caInfo.KeyFile,
31
-		Name:     admin.DefaultServiceServingCertSignerName(),
32
-		Output:   ioutil.Discard,
33
-	}
34
-	ca, err := caOptions.CreateSignerCert()
35
-	if err != nil {
36
-		t.Fatalf("unexpected error: %v", err)
37
-	}
38
-
39
-	kubeclient := fake.NewSimpleClientset(startingObjects...)
40
-	fakeWatch := watch.NewFake()
41
-	kubeclient.PrependReactor("create", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
42
-		return true, action.(core.CreateAction).GetObject(), nil
43
-	})
44
-	kubeclient.PrependReactor("update", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
45
-		return true, action.(core.UpdateAction).GetObject(), nil
46
-	})
47
-	kubeclient.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
48
-
49
-	controller := NewServiceServingCertController(kubeclient.Core(), kubeclient.Core(), ca, "cluster.local", 10*time.Minute)
50
-
51
-	return caOptions.Name, kubeclient, fakeWatch, controller
52
-}
53
-
54
-func TestBasicControllerFlow(t *testing.T) {
55
-	stopChannel := make(chan struct{})
56
-	defer close(stopChannel)
57
-	received := make(chan bool)
58
-
59
-	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
60
-	controller.syncHandler = func(serviceKey string) error {
61
-		defer func() { received <- true }()
62
-
63
-		err := controller.syncService(serviceKey)
64
-		if err != nil {
65
-			t.Errorf("unexpected error: %v", err)
66
-		}
67
-
68
-		return err
69
-	}
70
-	go controller.Run(1, stopChannel)
71
-
72
-	expectedSecretName := "new-secret"
73
-	serviceName := "svc-name"
74
-	serviceUID := "some-uid"
75
-	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
76
-	expectedSecretAnnotations := map[string]string{ServiceUIDAnnotation: serviceUID, ServiceNameAnnotation: serviceName}
77
-	namespace := "ns"
78
-
79
-	serviceToAdd := &kapi.Service{}
80
-	serviceToAdd.Name = serviceName
81
-	serviceToAdd.Namespace = namespace
82
-	serviceToAdd.UID = types.UID(serviceUID)
83
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
84
-	fakeWatch.Add(serviceToAdd)
85
-
86
-	t.Log("waiting to reach syncHandler")
87
-	select {
88
-	case <-received:
89
-	case <-time.After(time.Duration(30 * time.Second)):
90
-		t.Fatalf("failed to call into syncService")
91
-	}
92
-
93
-	foundSecret := false
94
-	foundServiceUpdate := false
95
-	for _, action := range kubeclient.Actions() {
96
-		switch {
97
-		case action.Matches("create", "secrets"):
98
-			createSecret := action.(core.CreateAction)
99
-			newSecret := createSecret.GetObject().(*kapi.Secret)
100
-			if newSecret.Name != expectedSecretName {
101
-				t.Errorf("expected %v, got %v", expectedSecretName, newSecret.Name)
102
-				continue
103
-			}
104
-			if newSecret.Namespace != namespace {
105
-				t.Errorf("expected %v, got %v", namespace, newSecret.Namespace)
106
-				continue
107
-			}
108
-			if !reflect.DeepEqual(newSecret.Annotations, expectedSecretAnnotations) {
109
-				t.Errorf("expected %v, got %v", expectedSecretAnnotations, newSecret.Annotations)
110
-				continue
111
-			}
112
-			foundSecret = true
113
-
114
-		case action.Matches("update", "services"):
115
-			updateService := action.(core.UpdateAction)
116
-			service := updateService.GetObject().(*kapi.Service)
117
-			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
118
-				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
119
-				continue
120
-			}
121
-			foundServiceUpdate = true
122
-
123
-		}
124
-	}
125
-
126
-	if !foundSecret {
127
-		t.Errorf("secret wasn't created.  Got %v\n", kubeclient.Actions())
128
-	}
129
-	if !foundServiceUpdate {
130
-		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
131
-	}
132
-}
133
-
134
-func TestAlreadyExistingSecretControllerFlow(t *testing.T) {
135
-	stopChannel := make(chan struct{})
136
-	defer close(stopChannel)
137
-	received := make(chan bool)
138
-
139
-	expectedSecretName := "new-secret"
140
-	serviceName := "svc-name"
141
-	serviceUID := "some-uid"
142
-	expectedSecretAnnotations := map[string]string{ServiceUIDAnnotation: serviceUID, ServiceNameAnnotation: serviceName}
143
-	namespace := "ns"
144
-
145
-	existingSecret := &kapi.Secret{}
146
-	existingSecret.Name = expectedSecretName
147
-	existingSecret.Namespace = namespace
148
-	existingSecret.Type = kapi.SecretTypeTLS
149
-	existingSecret.Annotations = expectedSecretAnnotations
150
-
151
-	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{existingSecret}, stopChannel, t)
152
-	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
153
-		return true, &kapi.Secret{}, kapierrors.NewAlreadyExists(kapi.Resource("secrets"), "new-secret")
154
-	})
155
-	controller.syncHandler = func(serviceKey string) error {
156
-		defer func() { received <- true }()
157
-
158
-		err := controller.syncService(serviceKey)
159
-		if err != nil {
160
-			t.Errorf("unexpected error: %v", err)
161
-		}
162
-
163
-		return err
164
-	}
165
-	go controller.Run(1, stopChannel)
166
-
167
-	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
168
-
169
-	serviceToAdd := &kapi.Service{}
170
-	serviceToAdd.Name = serviceName
171
-	serviceToAdd.Namespace = namespace
172
-	serviceToAdd.UID = types.UID(serviceUID)
173
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
174
-	fakeWatch.Add(serviceToAdd)
175
-
176
-	t.Log("waiting to reach syncHandler")
177
-	select {
178
-	case <-received:
179
-	case <-time.After(time.Duration(30 * time.Second)):
180
-		t.Fatalf("failed to call into syncService")
181
-	}
182
-
183
-	foundSecret := false
184
-	foundServiceUpdate := false
185
-	for _, action := range kubeclient.Actions() {
186
-		switch {
187
-		case action.Matches("get", "secrets"):
188
-			foundSecret = true
189
-
190
-		case action.Matches("update", "services"):
191
-			updateService := action.(core.UpdateAction)
192
-			service := updateService.GetObject().(*kapi.Service)
193
-			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
194
-				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
195
-				continue
196
-			}
197
-			foundServiceUpdate = true
198
-
199
-		}
200
-	}
201
-
202
-	if !foundSecret {
203
-		t.Errorf("secret wasn't retrieved.  Got %v\n", kubeclient.Actions())
204
-	}
205
-	if !foundServiceUpdate {
206
-		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
207
-	}
208
-
209
-}
210
-
211
-func TestAlreadyExistingSecretForDifferentUIDControllerFlow(t *testing.T) {
212
-	stopChannel := make(chan struct{})
213
-	defer close(stopChannel)
214
-	received := make(chan bool)
215
-
216
-	expectedError := "secret/new-secret references serviceUID wrong-uid, which does not match some-uid"
217
-	expectedSecretName := "new-secret"
218
-	serviceName := "svc-name"
219
-	serviceUID := "some-uid"
220
-	namespace := "ns"
221
-
222
-	existingSecret := &kapi.Secret{}
223
-	existingSecret.Name = expectedSecretName
224
-	existingSecret.Namespace = namespace
225
-	existingSecret.Type = kapi.SecretTypeTLS
226
-	existingSecret.Annotations = map[string]string{ServiceUIDAnnotation: "wrong-uid", ServiceNameAnnotation: serviceName}
227
-
228
-	_, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{existingSecret}, stopChannel, t)
229
-	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
230
-		return true, &kapi.Secret{}, kapierrors.NewAlreadyExists(kapi.Resource("secrets"), "new-secret")
231
-	})
232
-	controller.syncHandler = func(serviceKey string) error {
233
-		defer func() { received <- true }()
234
-
235
-		err := controller.syncService(serviceKey)
236
-		if err != nil && err.Error() != expectedError {
237
-			t.Errorf("unexpected error: %v", err)
238
-		}
239
-
240
-		return err
241
-	}
242
-	go controller.Run(1, stopChannel)
243
-
244
-	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: expectedError, ServingCertErrorNumAnnotation: "1"}
245
-
246
-	serviceToAdd := &kapi.Service{}
247
-	serviceToAdd.Name = serviceName
248
-	serviceToAdd.Namespace = namespace
249
-	serviceToAdd.UID = types.UID(serviceUID)
250
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
251
-	fakeWatch.Add(serviceToAdd)
252
-
253
-	t.Log("waiting to reach syncHandler")
254
-	select {
255
-	case <-received:
256
-	case <-time.After(time.Duration(30 * time.Second)):
257
-		t.Fatalf("failed to call into syncService")
258
-	}
259
-
260
-	foundSecret := false
261
-	foundServiceUpdate := false
262
-	for _, action := range kubeclient.Actions() {
263
-		switch {
264
-		case action.Matches("get", "secrets"):
265
-			foundSecret = true
266
-
267
-		case action.Matches("update", "services"):
268
-			updateService := action.(core.UpdateAction)
269
-			service := updateService.GetObject().(*kapi.Service)
270
-			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
271
-				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
272
-				continue
273
-			}
274
-			foundServiceUpdate = true
275
-
276
-		}
277
-	}
278
-
279
-	if !foundSecret {
280
-		t.Errorf("secret wasn't retrieved.  Got %v\n", kubeclient.Actions())
281
-	}
282
-	if !foundServiceUpdate {
283
-		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
284
-	}
285
-}
286
-
287
-func TestSecretCreationErrorControllerFlow(t *testing.T) {
288
-	stopChannel := make(chan struct{})
289
-	defer close(stopChannel)
290
-	received := make(chan bool)
291
-
292
-	expectedError := `secrets "new-secret" is forbidden: any reason`
293
-	expectedSecretName := "new-secret"
294
-	serviceName := "svc-name"
295
-	serviceUID := "some-uid"
296
-	namespace := "ns"
297
-
298
-	_, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
299
-	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
300
-		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("secrets"), "new-secret", fmt.Errorf("any reason"))
301
-	})
302
-	controller.syncHandler = func(serviceKey string) error {
303
-		defer func() { received <- true }()
304
-
305
-		err := controller.syncService(serviceKey)
306
-		if err != nil && err.Error() != expectedError {
307
-			t.Errorf("unexpected error: %v", err)
308
-		}
309
-
310
-		return err
311
-	}
312
-	go controller.Run(1, stopChannel)
313
-
314
-	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: expectedError, ServingCertErrorNumAnnotation: "1"}
315
-
316
-	serviceToAdd := &kapi.Service{}
317
-	serviceToAdd.Name = serviceName
318
-	serviceToAdd.Namespace = namespace
319
-	serviceToAdd.UID = types.UID(serviceUID)
320
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
321
-	fakeWatch.Add(serviceToAdd)
322
-
323
-	t.Log("waiting to reach syncHandler")
324
-	select {
325
-	case <-received:
326
-	case <-time.After(time.Duration(30 * time.Second)):
327
-		t.Fatalf("failed to call into syncService")
328
-	}
329
-
330
-	foundServiceUpdate := false
331
-	for _, action := range kubeclient.Actions() {
332
-		switch {
333
-		case action.Matches("update", "services"):
334
-			updateService := action.(core.UpdateAction)
335
-			service := updateService.GetObject().(*kapi.Service)
336
-			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
337
-				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
338
-				continue
339
-			}
340
-			foundServiceUpdate = true
341
-
342
-		}
343
-	}
344
-
345
-	if !foundServiceUpdate {
346
-		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
347
-	}
348
-}
349
-
350
-func TestSkipGenerationControllerFlow(t *testing.T) {
351
-	stopChannel := make(chan struct{})
352
-	defer close(stopChannel)
353
-	received := make(chan bool)
354
-
355
-	expectedSecretName := "new-secret"
356
-	serviceName := "svc-name"
357
-	serviceUID := "some-uid"
358
-	namespace := "ns"
359
-
360
-	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
361
-	kubeclient.PrependReactor("update", "service", func(action core.Action) (handled bool, ret runtime.Object, err error) {
362
-		return true, &kapi.Service{}, kapierrors.NewForbidden(kapi.Resource("fdsa"), "new-service", fmt.Errorf("any service reason"))
363
-	})
364
-	kubeclient.PrependReactor("create", "secret", func(action core.Action) (handled bool, ret runtime.Object, err error) {
365
-		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("asdf"), "new-secret", fmt.Errorf("any reason"))
366
-	})
367
-	kubeclient.PrependReactor("update", "secret", func(action core.Action) (handled bool, ret runtime.Object, err error) {
368
-		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("asdf"), "new-secret", fmt.Errorf("any reason"))
369
-	})
370
-	controller.syncHandler = func(serviceKey string) error {
371
-		defer func() { received <- true }()
372
-
373
-		err := controller.syncService(serviceKey)
374
-		if err != nil {
375
-			t.Errorf("unexpected error: %v", err)
376
-		}
377
-
378
-		return err
379
-	}
380
-	go controller.Run(1, stopChannel)
381
-
382
-	serviceToAdd := &kapi.Service{}
383
-	serviceToAdd.Name = serviceName
384
-	serviceToAdd.Namespace = namespace
385
-	serviceToAdd.UID = types.UID(serviceUID)
386
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: "any-error", ServingCertErrorNumAnnotation: "11"}
387
-	fakeWatch.Add(serviceToAdd)
388
-
389
-	t.Log("waiting to reach syncHandler")
390
-	select {
391
-	case <-received:
392
-	case <-time.After(time.Duration(30 * time.Second)):
393
-		t.Fatalf("failed to call into syncService")
394
-	}
395
-
396
-	for _, action := range kubeclient.Actions() {
397
-		switch action.GetVerb() {
398
-		case "update", "create":
399
-			t.Errorf("no mutation expected, but we got %v", action)
400
-		}
401
-	}
402
-
403
-	kubeclient.ClearActions()
404
-	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
405
-	fakeWatch.Add(serviceToAdd)
406
-
407
-	t.Log("waiting to reach syncHandler")
408
-	select {
409
-	case <-received:
410
-	case <-time.After(time.Duration(30 * time.Second)):
411
-		t.Fatalf("failed to call into syncService")
412
-	}
413
-
414
-	for _, action := range kubeclient.Actions() {
415
-		switch action.GetVerb() {
416
-		case "update", "create":
417
-			t.Errorf("no mutation expected, but we got %v", action)
418
-		}
419
-	}
420
-}
421 1
new file mode 100644
... ...
@@ -0,0 +1,301 @@
0
+package servingcert
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"strconv"
6
+	"time"
7
+
8
+	"github.com/golang/glog"
9
+
10
+	kapi "k8s.io/kubernetes/pkg/api"
11
+	kapierrors "k8s.io/kubernetes/pkg/api/errors"
12
+	"k8s.io/kubernetes/pkg/client/cache"
13
+	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
14
+	"k8s.io/kubernetes/pkg/controller"
15
+	"k8s.io/kubernetes/pkg/controller/framework"
16
+	"k8s.io/kubernetes/pkg/runtime"
17
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
18
+	"k8s.io/kubernetes/pkg/util/sets"
19
+	"k8s.io/kubernetes/pkg/util/wait"
20
+	"k8s.io/kubernetes/pkg/util/workqueue"
21
+	"k8s.io/kubernetes/pkg/watch"
22
+
23
+	"github.com/openshift/origin/pkg/cmd/server/crypto"
24
+)
25
+
26
+const (
27
+	// ServingCertSecretAnnotation stores the name of the secret to generate into.
28
+	ServingCertSecretAnnotation = "service.alpha.openshift.io/serving-cert-secret-name"
29
+	// ServingCertCreatedByAnnotation stores the of the signer common name.  This could be used later to see if the
30
+	// services need to have the the serving certs regenerated.  The presence and matching of this annotation prevents
31
+	// regeneration
32
+	ServingCertCreatedByAnnotation = "service.alpha.openshift.io/serving-cert-signed-by"
33
+	// ServingCertErrorAnnotation stores the error that caused cert generation failures.
34
+	ServingCertErrorAnnotation = "service.alpha.openshift.io/serving-cert-generation-error"
35
+	// ServingCertErrorNumAnnotation stores how many consecutive errors we've hit.  A value of the maxRetries will prevent
36
+	// the controller from reattempting until it is cleared.
37
+	ServingCertErrorNumAnnotation = "service.alpha.openshift.io/serving-cert-generation-error-num"
38
+	// ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID
39
+	ServiceUIDAnnotation = "service.alpha.openshift.io/originating-service-uid"
40
+	// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by Name to allow reverse lookups on services
41
+	// for comparison against UIDs
42
+	ServiceNameAnnotation = "service.alpha.openshift.io/originating-service-name"
43
+	// ServingCertExpiryAnnotation is an annotation that holds the expiry time of the certificate.  It accepts time in the
44
+	// RFC3339 format: 2018-11-29T17:44:39Z
45
+	ServingCertExpiryAnnotation = "service.alpha.openshift.io/expiry"
46
+)
47
+
48
+// ServiceServingCertController is responsible for synchronizing Service objects stored
49
+// in the system with actual running replica sets and pods.
50
+type ServiceServingCertController struct {
51
+	serviceClient kcoreclient.ServicesGetter
52
+	secretClient  kcoreclient.SecretsGetter
53
+
54
+	// Services that need to be checked
55
+	queue      workqueue.RateLimitingInterface
56
+	maxRetries int
57
+
58
+	serviceCache      cache.Store
59
+	serviceController *framework.Controller
60
+
61
+	ca         *crypto.CA
62
+	publicCert string
63
+	dnsSuffix  string
64
+
65
+	// syncHandler does the work. It's factored out for unit testing
66
+	syncHandler func(serviceKey string) error
67
+}
68
+
69
+// NewServiceServingCertController creates a new ServiceServingCertController.
70
+// TODO this should accept a shared informer
71
+func NewServiceServingCertController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertController {
72
+	sc := &ServiceServingCertController{
73
+		serviceClient: serviceClient,
74
+		secretClient:  secretClient,
75
+
76
+		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
77
+		maxRetries: 10,
78
+
79
+		ca:        ca,
80
+		dnsSuffix: dnsSuffix,
81
+	}
82
+
83
+	sc.serviceCache, sc.serviceController = framework.NewInformer(
84
+		&cache.ListWatch{
85
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
86
+				return sc.serviceClient.Services(kapi.NamespaceAll).List(options)
87
+			},
88
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
89
+				return sc.serviceClient.Services(kapi.NamespaceAll).Watch(options)
90
+			},
91
+		},
92
+		&kapi.Service{},
93
+		resyncInterval,
94
+		framework.ResourceEventHandlerFuncs{
95
+			AddFunc: func(obj interface{}) {
96
+				service := obj.(*kapi.Service)
97
+				glog.V(4).Infof("Adding service %s", service.Name)
98
+				sc.enqueueService(obj)
99
+			},
100
+			UpdateFunc: func(old, cur interface{}) {
101
+				service := cur.(*kapi.Service)
102
+				glog.V(4).Infof("Updating service %s", service.Name)
103
+				// Resync on service object relist.
104
+				sc.enqueueService(cur)
105
+			},
106
+		},
107
+	)
108
+
109
+	sc.syncHandler = sc.syncService
110
+
111
+	return sc
112
+}
113
+
114
+// Run begins watching and syncing.
115
+func (sc *ServiceServingCertController) Run(workers int, stopCh <-chan struct{}) {
116
+	defer utilruntime.HandleCrash()
117
+	go sc.serviceController.Run(stopCh)
118
+	for i := 0; i < workers; i++ {
119
+		go wait.Until(sc.worker, time.Second, stopCh)
120
+	}
121
+
122
+	<-stopCh
123
+	glog.Infof("Shutting down service signing cert controller")
124
+	sc.queue.ShutDown()
125
+}
126
+
127
+func (sc *ServiceServingCertController) enqueueService(obj interface{}) {
128
+	key, err := controller.KeyFunc(obj)
129
+	if err != nil {
130
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
131
+		return
132
+	}
133
+
134
+	sc.queue.Add(key)
135
+}
136
+
137
+// worker runs a worker thread that just dequeues items, processes them, and marks them done.
138
+// It enforces that the syncHandler is never invoked concurrently with the same key.
139
+func (sc *ServiceServingCertController) worker() {
140
+	for {
141
+		if !sc.work() {
142
+			return
143
+		}
144
+	}
145
+}
146
+
147
+// work returns true if the worker thread should continue
148
+func (sc *ServiceServingCertController) work() bool {
149
+	key, quit := sc.queue.Get()
150
+	if quit {
151
+		return false
152
+	}
153
+	defer sc.queue.Done(key)
154
+
155
+	if err := sc.syncHandler(key.(string)); err == nil {
156
+		// this means the request was successfully handled.  We should "forget" the item so that any retry
157
+		// later on is reset
158
+		sc.queue.Forget(key)
159
+
160
+	} else {
161
+		// if we had an error it means that we didn't handle it, which means that we want to requeue the work
162
+		utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
163
+		sc.queue.AddRateLimited(key)
164
+	}
165
+
166
+	return true
167
+}
168
+
169
+// syncService will sync the service with the given key.
170
+// This function is not meant to be invoked concurrently with the same key.
171
+func (sc *ServiceServingCertController) syncService(key string) error {
172
+	obj, exists, err := sc.serviceCache.GetByKey(key)
173
+	if err != nil {
174
+		glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err)
175
+		return err
176
+	}
177
+	if !exists {
178
+		glog.V(4).Infof("Service has been deleted %v", key)
179
+		return nil
180
+	}
181
+
182
+	if !sc.requiresCertGeneration(obj.(*kapi.Service)) {
183
+		return nil
184
+	}
185
+
186
+	// make a copy to avoid mutating cache state
187
+	t, err := kapi.Scheme.DeepCopy(obj)
188
+	if err != nil {
189
+		return err
190
+	}
191
+	service := t.(*kapi.Service)
192
+	if service.Annotations == nil {
193
+		service.Annotations = map[string]string{}
194
+	}
195
+
196
+	dnsName := service.Name + "." + service.Namespace + ".svc"
197
+	fqDNSName := dnsName + "." + sc.dnsSuffix
198
+	servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName))
199
+	if err != nil {
200
+		return err
201
+	}
202
+	certBytes, keyBytes, err := servingCert.GetPEMBytes()
203
+	if err != nil {
204
+		return err
205
+	}
206
+
207
+	secret := &kapi.Secret{
208
+		ObjectMeta: kapi.ObjectMeta{
209
+			Namespace: service.Namespace,
210
+			Name:      service.Annotations[ServingCertSecretAnnotation],
211
+			Annotations: map[string]string{
212
+				ServiceUIDAnnotation:        string(service.UID),
213
+				ServiceNameAnnotation:       service.Name,
214
+				ServingCertExpiryAnnotation: servingCert.Certs[0].NotAfter.Format(time.RFC3339),
215
+			},
216
+		},
217
+		Type: kapi.SecretTypeTLS,
218
+		Data: map[string][]byte{
219
+			kapi.TLSCertKey:       certBytes,
220
+			kapi.TLSPrivateKeyKey: keyBytes,
221
+		},
222
+	}
223
+
224
+	_, err = sc.secretClient.Secrets(service.Namespace).Create(secret)
225
+	if err != nil && !kapierrors.IsAlreadyExists(err) {
226
+		// if we have an error creating the secret, then try to update the service with that information.  If it fails,
227
+		// then we'll just try again later on  re-list or because the service had already been updated and we'll get triggered again.
228
+		service.Annotations[ServingCertErrorAnnotation] = err.Error()
229
+		service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
230
+		_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
231
+
232
+		// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
233
+		if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
234
+			return nil
235
+		}
236
+		return err
237
+	}
238
+	if kapierrors.IsAlreadyExists(err) {
239
+		actualSecret, err := sc.secretClient.Secrets(service.Namespace).Get(secret.Name)
240
+		if err != nil {
241
+			// if we have an error creating the secret, then try to update the service with that information.  If it fails,
242
+			// then we'll just try again later on  re-list or because the service had already been updated and we'll get triggered again.
243
+			service.Annotations[ServingCertErrorAnnotation] = err.Error()
244
+			service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
245
+			_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
246
+
247
+			// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
248
+			if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
249
+				return nil
250
+			}
251
+			return err
252
+		}
253
+
254
+		if actualSecret.Annotations[ServiceUIDAnnotation] != string(service.UID) {
255
+			service.Annotations[ServingCertErrorAnnotation] = fmt.Sprintf("secret/%v references serviceUID %v, which does not match %v", actualSecret.Name, actualSecret.Annotations[ServiceUIDAnnotation], service.UID)
256
+			service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(getNumFailures(service) + 1)
257
+			_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
258
+
259
+			// if we're past the max retries and we successfully updated, then the sync loop successfully handled this service and we want to forget it
260
+			if updateErr == nil && getNumFailures(service) >= sc.maxRetries {
261
+				return nil
262
+			}
263
+			return errors.New(service.Annotations[ServingCertErrorAnnotation])
264
+		}
265
+	}
266
+
267
+	service.Annotations[ServingCertCreatedByAnnotation] = sc.ca.Config.Certs[0].Subject.CommonName
268
+	delete(service.Annotations, ServingCertErrorAnnotation)
269
+	delete(service.Annotations, ServingCertErrorNumAnnotation)
270
+	_, err = sc.serviceClient.Services(service.Namespace).Update(service)
271
+
272
+	return err
273
+}
274
+
275
+func getNumFailures(service *kapi.Service) int {
276
+	numFailuresString := service.Annotations[ServingCertErrorNumAnnotation]
277
+	if len(numFailuresString) == 0 {
278
+		return 0
279
+	}
280
+
281
+	numFailures, err := strconv.Atoi(numFailuresString)
282
+	if err != nil {
283
+		return 0
284
+	}
285
+	return numFailures
286
+}
287
+
288
+func (sc *ServiceServingCertController) requiresCertGeneration(service *kapi.Service) bool {
289
+	if secretName := service.Annotations[ServingCertSecretAnnotation]; len(secretName) == 0 {
290
+		return false
291
+	}
292
+	if getNumFailures(service) >= sc.maxRetries {
293
+		return false
294
+	}
295
+	if service.Annotations[ServingCertCreatedByAnnotation] == sc.ca.Config.Certs[0].Subject.CommonName {
296
+		return false
297
+	}
298
+
299
+	return true
300
+}
0 301
new file mode 100644
... ...
@@ -0,0 +1,421 @@
0
+package servingcert
1
+
2
+import (
3
+	"fmt"
4
+	"io/ioutil"
5
+	"reflect"
6
+	"testing"
7
+	"time"
8
+
9
+	kapi "k8s.io/kubernetes/pkg/api"
10
+	kapierrors "k8s.io/kubernetes/pkg/api/errors"
11
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
12
+	"k8s.io/kubernetes/pkg/client/testing/core"
13
+	"k8s.io/kubernetes/pkg/runtime"
14
+	"k8s.io/kubernetes/pkg/types"
15
+	"k8s.io/kubernetes/pkg/watch"
16
+
17
+	"github.com/openshift/origin/pkg/cmd/server/admin"
18
+)
19
+
20
+func controllerSetup(startingObjects []runtime.Object, stopChannel chan struct{}, t *testing.T) ( /*caName*/ string, *fake.Clientset, *watch.FakeWatcher, *ServiceServingCertController) {
21
+	certDir, err := ioutil.TempDir("", "serving-cert-unit-")
22
+	if err != nil {
23
+		t.Fatalf("unexpected error: %v", err)
24
+	}
25
+	caInfo := admin.DefaultServiceSignerCAInfo(certDir)
26
+
27
+	caOptions := admin.CreateSignerCertOptions{
28
+		CertFile: caInfo.CertFile,
29
+		KeyFile:  caInfo.KeyFile,
30
+		Name:     admin.DefaultServiceServingCertSignerName(),
31
+		Output:   ioutil.Discard,
32
+	}
33
+	ca, err := caOptions.CreateSignerCert()
34
+	if err != nil {
35
+		t.Fatalf("unexpected error: %v", err)
36
+	}
37
+
38
+	kubeclient := fake.NewSimpleClientset(startingObjects...)
39
+	fakeWatch := watch.NewFake()
40
+	kubeclient.PrependReactor("create", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
41
+		return true, action.(core.CreateAction).GetObject(), nil
42
+	})
43
+	kubeclient.PrependReactor("update", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
44
+		return true, action.(core.UpdateAction).GetObject(), nil
45
+	})
46
+	kubeclient.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
47
+
48
+	controller := NewServiceServingCertController(kubeclient.Core(), kubeclient.Core(), ca, "cluster.local", 10*time.Minute)
49
+
50
+	return caOptions.Name, kubeclient, fakeWatch, controller
51
+}
52
+
53
+func TestBasicControllerFlow(t *testing.T) {
54
+	stopChannel := make(chan struct{})
55
+	defer close(stopChannel)
56
+	received := make(chan bool)
57
+
58
+	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
59
+	controller.syncHandler = func(serviceKey string) error {
60
+		defer func() { received <- true }()
61
+
62
+		err := controller.syncService(serviceKey)
63
+		if err != nil {
64
+			t.Errorf("unexpected error: %v", err)
65
+		}
66
+
67
+		return err
68
+	}
69
+	go controller.Run(1, stopChannel)
70
+
71
+	expectedSecretName := "new-secret"
72
+	serviceName := "svc-name"
73
+	serviceUID := "some-uid"
74
+	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
75
+	expectedSecretAnnotations := map[string]string{ServiceUIDAnnotation: serviceUID, ServiceNameAnnotation: serviceName}
76
+	namespace := "ns"
77
+
78
+	serviceToAdd := &kapi.Service{}
79
+	serviceToAdd.Name = serviceName
80
+	serviceToAdd.Namespace = namespace
81
+	serviceToAdd.UID = types.UID(serviceUID)
82
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
83
+	fakeWatch.Add(serviceToAdd)
84
+
85
+	t.Log("waiting to reach syncHandler")
86
+	select {
87
+	case <-received:
88
+	case <-time.After(time.Duration(30 * time.Second)):
89
+		t.Fatalf("failed to call into syncService")
90
+	}
91
+
92
+	foundSecret := false
93
+	foundServiceUpdate := false
94
+	for _, action := range kubeclient.Actions() {
95
+		switch {
96
+		case action.Matches("create", "secrets"):
97
+			createSecret := action.(core.CreateAction)
98
+			newSecret := createSecret.GetObject().(*kapi.Secret)
99
+			if newSecret.Name != expectedSecretName {
100
+				t.Errorf("expected %v, got %v", expectedSecretName, newSecret.Name)
101
+				continue
102
+			}
103
+			if newSecret.Namespace != namespace {
104
+				t.Errorf("expected %v, got %v", namespace, newSecret.Namespace)
105
+				continue
106
+			}
107
+			delete(newSecret.Annotations, ServingCertExpiryAnnotation)
108
+			if !reflect.DeepEqual(newSecret.Annotations, expectedSecretAnnotations) {
109
+				t.Errorf("expected %v, got %v", expectedSecretAnnotations, newSecret.Annotations)
110
+				continue
111
+			}
112
+			foundSecret = true
113
+
114
+		case action.Matches("update", "services"):
115
+			updateService := action.(core.UpdateAction)
116
+			service := updateService.GetObject().(*kapi.Service)
117
+			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
118
+				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
119
+				continue
120
+			}
121
+			foundServiceUpdate = true
122
+
123
+		}
124
+	}
125
+
126
+	if !foundSecret {
127
+		t.Errorf("secret wasn't created.  Got %v\n", kubeclient.Actions())
128
+	}
129
+	if !foundServiceUpdate {
130
+		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
131
+	}
132
+}
133
+
134
+func TestAlreadyExistingSecretControllerFlow(t *testing.T) {
135
+	stopChannel := make(chan struct{})
136
+	defer close(stopChannel)
137
+	received := make(chan bool)
138
+
139
+	expectedSecretName := "new-secret"
140
+	serviceName := "svc-name"
141
+	serviceUID := "some-uid"
142
+	expectedSecretAnnotations := map[string]string{ServiceUIDAnnotation: serviceUID, ServiceNameAnnotation: serviceName}
143
+	namespace := "ns"
144
+
145
+	existingSecret := &kapi.Secret{}
146
+	existingSecret.Name = expectedSecretName
147
+	existingSecret.Namespace = namespace
148
+	existingSecret.Type = kapi.SecretTypeTLS
149
+	existingSecret.Annotations = expectedSecretAnnotations
150
+
151
+	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{existingSecret}, stopChannel, t)
152
+	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
153
+		return true, &kapi.Secret{}, kapierrors.NewAlreadyExists(kapi.Resource("secrets"), "new-secret")
154
+	})
155
+	controller.syncHandler = func(serviceKey string) error {
156
+		defer func() { received <- true }()
157
+
158
+		err := controller.syncService(serviceKey)
159
+		if err != nil {
160
+			t.Errorf("unexpected error: %v", err)
161
+		}
162
+
163
+		return err
164
+	}
165
+	go controller.Run(1, stopChannel)
166
+
167
+	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
168
+
169
+	serviceToAdd := &kapi.Service{}
170
+	serviceToAdd.Name = serviceName
171
+	serviceToAdd.Namespace = namespace
172
+	serviceToAdd.UID = types.UID(serviceUID)
173
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
174
+	fakeWatch.Add(serviceToAdd)
175
+
176
+	t.Log("waiting to reach syncHandler")
177
+	select {
178
+	case <-received:
179
+	case <-time.After(time.Duration(30 * time.Second)):
180
+		t.Fatalf("failed to call into syncService")
181
+	}
182
+
183
+	foundSecret := false
184
+	foundServiceUpdate := false
185
+	for _, action := range kubeclient.Actions() {
186
+		switch {
187
+		case action.Matches("get", "secrets"):
188
+			foundSecret = true
189
+
190
+		case action.Matches("update", "services"):
191
+			updateService := action.(core.UpdateAction)
192
+			service := updateService.GetObject().(*kapi.Service)
193
+			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
194
+				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
195
+				continue
196
+			}
197
+			foundServiceUpdate = true
198
+
199
+		}
200
+	}
201
+
202
+	if !foundSecret {
203
+		t.Errorf("secret wasn't retrieved.  Got %v\n", kubeclient.Actions())
204
+	}
205
+	if !foundServiceUpdate {
206
+		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
207
+	}
208
+
209
+}
210
+
211
+func TestAlreadyExistingSecretForDifferentUIDControllerFlow(t *testing.T) {
212
+	stopChannel := make(chan struct{})
213
+	defer close(stopChannel)
214
+	received := make(chan bool)
215
+
216
+	expectedError := "secret/new-secret references serviceUID wrong-uid, which does not match some-uid"
217
+	expectedSecretName := "new-secret"
218
+	serviceName := "svc-name"
219
+	serviceUID := "some-uid"
220
+	namespace := "ns"
221
+
222
+	existingSecret := &kapi.Secret{}
223
+	existingSecret.Name = expectedSecretName
224
+	existingSecret.Namespace = namespace
225
+	existingSecret.Type = kapi.SecretTypeTLS
226
+	existingSecret.Annotations = map[string]string{ServiceUIDAnnotation: "wrong-uid", ServiceNameAnnotation: serviceName}
227
+
228
+	_, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{existingSecret}, stopChannel, t)
229
+	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
230
+		return true, &kapi.Secret{}, kapierrors.NewAlreadyExists(kapi.Resource("secrets"), "new-secret")
231
+	})
232
+	controller.syncHandler = func(serviceKey string) error {
233
+		defer func() { received <- true }()
234
+
235
+		err := controller.syncService(serviceKey)
236
+		if err != nil && err.Error() != expectedError {
237
+			t.Errorf("unexpected error: %v", err)
238
+		}
239
+
240
+		return err
241
+	}
242
+	go controller.Run(1, stopChannel)
243
+
244
+	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: expectedError, ServingCertErrorNumAnnotation: "1"}
245
+
246
+	serviceToAdd := &kapi.Service{}
247
+	serviceToAdd.Name = serviceName
248
+	serviceToAdd.Namespace = namespace
249
+	serviceToAdd.UID = types.UID(serviceUID)
250
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
251
+	fakeWatch.Add(serviceToAdd)
252
+
253
+	t.Log("waiting to reach syncHandler")
254
+	select {
255
+	case <-received:
256
+	case <-time.After(time.Duration(30 * time.Second)):
257
+		t.Fatalf("failed to call into syncService")
258
+	}
259
+
260
+	foundSecret := false
261
+	foundServiceUpdate := false
262
+	for _, action := range kubeclient.Actions() {
263
+		switch {
264
+		case action.Matches("get", "secrets"):
265
+			foundSecret = true
266
+
267
+		case action.Matches("update", "services"):
268
+			updateService := action.(core.UpdateAction)
269
+			service := updateService.GetObject().(*kapi.Service)
270
+			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
271
+				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
272
+				continue
273
+			}
274
+			foundServiceUpdate = true
275
+
276
+		}
277
+	}
278
+
279
+	if !foundSecret {
280
+		t.Errorf("secret wasn't retrieved.  Got %v\n", kubeclient.Actions())
281
+	}
282
+	if !foundServiceUpdate {
283
+		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
284
+	}
285
+}
286
+
287
+func TestSecretCreationErrorControllerFlow(t *testing.T) {
288
+	stopChannel := make(chan struct{})
289
+	defer close(stopChannel)
290
+	received := make(chan bool)
291
+
292
+	expectedError := `secrets "new-secret" is forbidden: any reason`
293
+	expectedSecretName := "new-secret"
294
+	serviceName := "svc-name"
295
+	serviceUID := "some-uid"
296
+	namespace := "ns"
297
+
298
+	_, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
299
+	kubeclient.PrependReactor("create", "secrets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
300
+		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("secrets"), "new-secret", fmt.Errorf("any reason"))
301
+	})
302
+	controller.syncHandler = func(serviceKey string) error {
303
+		defer func() { received <- true }()
304
+
305
+		err := controller.syncService(serviceKey)
306
+		if err != nil && err.Error() != expectedError {
307
+			t.Errorf("unexpected error: %v", err)
308
+		}
309
+
310
+		return err
311
+	}
312
+	go controller.Run(1, stopChannel)
313
+
314
+	expectedServiceAnnotations := map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: expectedError, ServingCertErrorNumAnnotation: "1"}
315
+
316
+	serviceToAdd := &kapi.Service{}
317
+	serviceToAdd.Name = serviceName
318
+	serviceToAdd.Namespace = namespace
319
+	serviceToAdd.UID = types.UID(serviceUID)
320
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName}
321
+	fakeWatch.Add(serviceToAdd)
322
+
323
+	t.Log("waiting to reach syncHandler")
324
+	select {
325
+	case <-received:
326
+	case <-time.After(time.Duration(30 * time.Second)):
327
+		t.Fatalf("failed to call into syncService")
328
+	}
329
+
330
+	foundServiceUpdate := false
331
+	for _, action := range kubeclient.Actions() {
332
+		switch {
333
+		case action.Matches("update", "services"):
334
+			updateService := action.(core.UpdateAction)
335
+			service := updateService.GetObject().(*kapi.Service)
336
+			if !reflect.DeepEqual(service.Annotations, expectedServiceAnnotations) {
337
+				t.Errorf("expected %v, got %v", expectedServiceAnnotations, service.Annotations)
338
+				continue
339
+			}
340
+			foundServiceUpdate = true
341
+
342
+		}
343
+	}
344
+
345
+	if !foundServiceUpdate {
346
+		t.Errorf("service wasn't updated.  Got %v\n", kubeclient.Actions())
347
+	}
348
+}
349
+
350
+func TestSkipGenerationControllerFlow(t *testing.T) {
351
+	stopChannel := make(chan struct{})
352
+	defer close(stopChannel)
353
+	received := make(chan bool)
354
+
355
+	expectedSecretName := "new-secret"
356
+	serviceName := "svc-name"
357
+	serviceUID := "some-uid"
358
+	namespace := "ns"
359
+
360
+	caName, kubeclient, fakeWatch, controller := controllerSetup([]runtime.Object{}, stopChannel, t)
361
+	kubeclient.PrependReactor("update", "service", func(action core.Action) (handled bool, ret runtime.Object, err error) {
362
+		return true, &kapi.Service{}, kapierrors.NewForbidden(kapi.Resource("fdsa"), "new-service", fmt.Errorf("any service reason"))
363
+	})
364
+	kubeclient.PrependReactor("create", "secret", func(action core.Action) (handled bool, ret runtime.Object, err error) {
365
+		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("asdf"), "new-secret", fmt.Errorf("any reason"))
366
+	})
367
+	kubeclient.PrependReactor("update", "secret", func(action core.Action) (handled bool, ret runtime.Object, err error) {
368
+		return true, &kapi.Secret{}, kapierrors.NewForbidden(kapi.Resource("asdf"), "new-secret", fmt.Errorf("any reason"))
369
+	})
370
+	controller.syncHandler = func(serviceKey string) error {
371
+		defer func() { received <- true }()
372
+
373
+		err := controller.syncService(serviceKey)
374
+		if err != nil {
375
+			t.Errorf("unexpected error: %v", err)
376
+		}
377
+
378
+		return err
379
+	}
380
+	go controller.Run(1, stopChannel)
381
+
382
+	serviceToAdd := &kapi.Service{}
383
+	serviceToAdd.Name = serviceName
384
+	serviceToAdd.Namespace = namespace
385
+	serviceToAdd.UID = types.UID(serviceUID)
386
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertErrorAnnotation: "any-error", ServingCertErrorNumAnnotation: "11"}
387
+	fakeWatch.Add(serviceToAdd)
388
+
389
+	t.Log("waiting to reach syncHandler")
390
+	select {
391
+	case <-received:
392
+	case <-time.After(time.Duration(30 * time.Second)):
393
+		t.Fatalf("failed to call into syncService")
394
+	}
395
+
396
+	for _, action := range kubeclient.Actions() {
397
+		switch action.GetVerb() {
398
+		case "update", "create":
399
+			t.Errorf("no mutation expected, but we got %v", action)
400
+		}
401
+	}
402
+
403
+	kubeclient.ClearActions()
404
+	serviceToAdd.Annotations = map[string]string{ServingCertSecretAnnotation: expectedSecretName, ServingCertCreatedByAnnotation: caName}
405
+	fakeWatch.Add(serviceToAdd)
406
+
407
+	t.Log("waiting to reach syncHandler")
408
+	select {
409
+	case <-received:
410
+	case <-time.After(time.Duration(30 * time.Second)):
411
+		t.Fatalf("failed to call into syncService")
412
+	}
413
+
414
+	for _, action := range kubeclient.Actions() {
415
+		switch action.GetVerb() {
416
+		case "update", "create":
417
+			t.Errorf("no mutation expected, but we got %v", action)
418
+		}
419
+	}
420
+}
0 421
new file mode 100644
... ...
@@ -0,0 +1,285 @@
0
+package servingcert
1
+
2
+import (
3
+	"fmt"
4
+	"time"
5
+
6
+	"github.com/golang/glog"
7
+
8
+	kapi "k8s.io/kubernetes/pkg/api"
9
+	"k8s.io/kubernetes/pkg/client/cache"
10
+	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
11
+	"k8s.io/kubernetes/pkg/controller"
12
+	"k8s.io/kubernetes/pkg/controller/framework"
13
+	"k8s.io/kubernetes/pkg/runtime"
14
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
15
+	"k8s.io/kubernetes/pkg/util/sets"
16
+	"k8s.io/kubernetes/pkg/util/wait"
17
+	"k8s.io/kubernetes/pkg/util/workqueue"
18
+	"k8s.io/kubernetes/pkg/watch"
19
+
20
+	"github.com/openshift/origin/pkg/cmd/server/crypto"
21
+)
22
+
23
+// ServiceServingCertUpdateController is responsible for synchronizing Service objects stored
24
+// in the system with actual running replica sets and pods.
25
+type ServiceServingCertUpdateController struct {
26
+	secretClient kcoreclient.SecretsGetter
27
+
28
+	// Services that need to be checked
29
+	queue workqueue.RateLimitingInterface
30
+
31
+	serviceCache      cache.Store
32
+	serviceController *framework.Controller
33
+	serviceHasSynced  informerSynced
34
+
35
+	secretCache      cache.Store
36
+	secretController *framework.Controller
37
+	secretHasSynced  informerSynced
38
+
39
+	ca         *crypto.CA
40
+	publicCert string
41
+	dnsSuffix  string
42
+	// minTimeLeftForCert is how much time is remaining for the serving cert before regenerating it.
43
+	minTimeLeftForCert time.Duration
44
+
45
+	// syncHandler does the work. It's factored out for unit testing
46
+	syncHandler func(serviceKey string) error
47
+}
48
+
49
+// NewServiceServingCertUpdateController creates a new ServiceServingCertUpdateController.
50
+// TODO this should accept a shared informer
51
+func NewServiceServingCertUpdateController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertUpdateController {
52
+	sc := &ServiceServingCertUpdateController{
53
+		secretClient: secretClient,
54
+
55
+		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
56
+
57
+		ca:        ca,
58
+		dnsSuffix: dnsSuffix,
59
+		// TODO base the expiry time on a percentage of the time for the lifespan of the cert
60
+		minTimeLeftForCert: 1 * time.Hour,
61
+	}
62
+
63
+	sc.serviceCache, sc.serviceController = framework.NewInformer(
64
+		&cache.ListWatch{
65
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
66
+				return serviceClient.Services(kapi.NamespaceAll).List(options)
67
+			},
68
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
69
+				return serviceClient.Services(kapi.NamespaceAll).Watch(options)
70
+			},
71
+		},
72
+		&kapi.Service{},
73
+		resyncInterval,
74
+		framework.ResourceEventHandlerFuncs{},
75
+	)
76
+	sc.serviceHasSynced = sc.serviceController.HasSynced
77
+
78
+	sc.secretCache, sc.secretController = framework.NewInformer(
79
+		&cache.ListWatch{
80
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
81
+				return sc.secretClient.Secrets(kapi.NamespaceAll).List(options)
82
+			},
83
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
84
+				return sc.secretClient.Secrets(kapi.NamespaceAll).Watch(options)
85
+			},
86
+		},
87
+		&kapi.Secret{},
88
+		resyncInterval,
89
+		framework.ResourceEventHandlerFuncs{
90
+			AddFunc:    sc.addSecret,
91
+			UpdateFunc: sc.updateSecret,
92
+		},
93
+	)
94
+	sc.secretHasSynced = sc.secretController.HasSynced
95
+
96
+	sc.syncHandler = sc.syncSecret
97
+
98
+	return sc
99
+}
100
+
101
+// Run begins watching and syncing.
102
+func (sc *ServiceServingCertUpdateController) Run(workers int, stopCh <-chan struct{}) {
103
+	defer utilruntime.HandleCrash()
104
+	defer glog.Infof("Shutting down service signing cert update controller")
105
+	defer sc.queue.ShutDown()
106
+
107
+	glog.Infof("starting service signing cert update controller")
108
+	go sc.serviceController.Run(stopCh)
109
+	go sc.secretController.Run(stopCh)
110
+
111
+	if !waitForCacheSync(stopCh, sc.serviceHasSynced, sc.secretHasSynced) {
112
+		return
113
+	}
114
+
115
+	for i := 0; i < workers; i++ {
116
+		go wait.Until(sc.runWorker, time.Second, stopCh)
117
+	}
118
+
119
+	<-stopCh
120
+}
121
+
122
+// TODO this is all in the kube library after the 1.5 rebase
123
+
124
+// informerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
125
+type informerSynced func() bool
126
+
127
+// syncedPollPeriod controls how often you look at the status of your sync funcs
128
+const syncedPollPeriod = 100 * time.Millisecond
129
+
130
+func waitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...informerSynced) bool {
131
+	err := wait.PollUntil(syncedPollPeriod,
132
+		func() (bool, error) {
133
+			for _, syncFunc := range cacheSyncs {
134
+				if !syncFunc() {
135
+					return false, nil
136
+				}
137
+			}
138
+			return true, nil
139
+		},
140
+		stopCh)
141
+	if err != nil {
142
+		glog.V(2).Infof("stop requested")
143
+		return false
144
+	}
145
+
146
+	glog.V(4).Infof("caches populated")
147
+	return true
148
+}
149
+
150
+func (sc *ServiceServingCertUpdateController) enqueueSecret(obj interface{}) {
151
+	key, err := controller.KeyFunc(obj)
152
+	if err != nil {
153
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
154
+		return
155
+	}
156
+
157
+	sc.queue.Add(key)
158
+}
159
+
160
+func (sc *ServiceServingCertUpdateController) addSecret(obj interface{}) {
161
+	secret := obj.(*kapi.Secret)
162
+	if len(secret.Annotations[ServiceNameAnnotation]) == 0 {
163
+		return
164
+	}
165
+
166
+	glog.V(4).Infof("adding %s", secret.Name)
167
+	sc.enqueueSecret(secret)
168
+}
169
+
170
+func (sc *ServiceServingCertUpdateController) updateSecret(old, cur interface{}) {
171
+	secret := cur.(*kapi.Secret)
172
+	if len(secret.Annotations[ServiceNameAnnotation]) == 0 {
173
+		// if the current doesn't have a service name, check the old
174
+		secret = old.(*kapi.Secret)
175
+		if len(secret.Annotations[ServiceNameAnnotation]) == 0 {
176
+			return
177
+		}
178
+	}
179
+
180
+	glog.V(4).Infof("updating %s", secret.Name)
181
+	sc.enqueueSecret(secret)
182
+}
183
+
184
+func (sc *ServiceServingCertUpdateController) runWorker() {
185
+	for sc.processNextWorkItem() {
186
+	}
187
+}
188
+
189
+// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
190
+func (sc *ServiceServingCertUpdateController) processNextWorkItem() bool {
191
+	key, quit := sc.queue.Get()
192
+	if quit {
193
+		return false
194
+	}
195
+	defer sc.queue.Done(key)
196
+
197
+	err := sc.syncHandler(key.(string))
198
+	if err == nil {
199
+		sc.queue.Forget(key)
200
+		return true
201
+	}
202
+
203
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
204
+	sc.queue.AddRateLimited(key)
205
+
206
+	return true
207
+}
208
+
209
+// syncSecret will sync the service with the given key.
210
+// This function is not meant to be invoked concurrently with the same key.
211
+func (sc *ServiceServingCertUpdateController) syncSecret(key string) error {
212
+	obj, exists, err := sc.secretCache.GetByKey(key)
213
+	if err != nil {
214
+		glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err)
215
+		return err
216
+	}
217
+	if !exists {
218
+		glog.V(4).Infof("Secret has been deleted %v", key)
219
+		return nil
220
+	}
221
+
222
+	if !sc.requiresRegeneration(obj.(*kapi.Secret)) {
223
+		return nil
224
+	}
225
+
226
+	// make a copy to avoid mutating cache state
227
+	t, err := kapi.Scheme.DeepCopy(obj)
228
+	if err != nil {
229
+		return err
230
+	}
231
+	secret := t.(*kapi.Secret)
232
+
233
+	dnsName := secret.Annotations[ServiceNameAnnotation] + "." + secret.Namespace + ".svc"
234
+	fqDNSName := dnsName + "." + sc.dnsSuffix
235
+	servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName))
236
+	if err != nil {
237
+		return err
238
+	}
239
+	secret.Annotations[ServingCertExpiryAnnotation] = servingCert.Certs[0].NotAfter.Format(time.RFC3339)
240
+	secret.Data[kapi.TLSCertKey], secret.Data[kapi.TLSPrivateKeyKey], err = servingCert.GetPEMBytes()
241
+	if err != nil {
242
+		return err
243
+	}
244
+
245
+	_, err = sc.secretClient.Secrets(secret.Namespace).Update(secret)
246
+	return err
247
+}
248
+
249
+func (sc *ServiceServingCertUpdateController) requiresRegeneration(secret *kapi.Secret) bool {
250
+	serviceName := secret.Annotations[ServiceNameAnnotation]
251
+	if len(serviceName) == 0 {
252
+		return false
253
+	}
254
+
255
+	serviceObj, exists, err := sc.serviceCache.GetByKey(secret.Namespace + "/" + serviceName)
256
+	if err != nil {
257
+		return false
258
+	}
259
+	if !exists {
260
+		return false
261
+	}
262
+
263
+	service := serviceObj.(*kapi.Service)
264
+	if secret.Annotations[ServiceUIDAnnotation] != string(service.UID) {
265
+		return false
266
+	}
267
+
268
+	// if we don't have the annotation for expiry, just go ahead and regenerate.  It's easier than writing a
269
+	// secondary logic flow that creates the expiry dates
270
+	expiryString, ok := secret.Annotations[ServingCertExpiryAnnotation]
271
+	if !ok {
272
+		return true
273
+	}
274
+	expiry, err := time.Parse(time.RFC3339, expiryString)
275
+	if err != nil {
276
+		return true
277
+	}
278
+
279
+	if time.Now().Add(sc.minTimeLeftForCert).After(expiry) {
280
+		return true
281
+	}
282
+
283
+	return false
284
+}
0 285
new file mode 100644
... ...
@@ -0,0 +1,148 @@
0
+package servingcert
1
+
2
+import (
3
+	"testing"
4
+	"time"
5
+
6
+	kapi "k8s.io/kubernetes/pkg/api"
7
+	"k8s.io/kubernetes/pkg/client/cache"
8
+	"k8s.io/kubernetes/pkg/controller/framework"
9
+	"k8s.io/kubernetes/pkg/types"
10
+)
11
+
12
+func TestRequiresRegenerationServiceUIDMismatch(t *testing.T) {
13
+	tests := []struct {
14
+		name          string
15
+		primeServices func(cache.Store)
16
+		secret        *kapi.Secret
17
+		expected      bool
18
+	}{
19
+		{
20
+			name:          "no service annotation",
21
+			primeServices: func(serviceCache cache.Store) {},
22
+			secret: &kapi.Secret{
23
+				ObjectMeta: kapi.ObjectMeta{
24
+					Namespace: "ns1", Name: "mysecret",
25
+					Annotations: map[string]string{},
26
+				},
27
+			},
28
+			expected: false,
29
+		},
30
+		{
31
+			name:          "missing service",
32
+			primeServices: func(serviceCache cache.Store) {},
33
+			secret: &kapi.Secret{
34
+				ObjectMeta: kapi.ObjectMeta{
35
+					Namespace: "ns1", Name: "mysecret",
36
+					Annotations: map[string]string{
37
+						ServiceNameAnnotation: "foo",
38
+					},
39
+				},
40
+			},
41
+			expected: false,
42
+		},
43
+		{
44
+			name: "service-uid-mismatch",
45
+			primeServices: func(serviceCache cache.Store) {
46
+				serviceCache.Add(&kapi.Service{
47
+					ObjectMeta: kapi.ObjectMeta{Namespace: "ns1", Name: "foo", UID: types.UID("uid-2")},
48
+				})
49
+			},
50
+			secret: &kapi.Secret{
51
+				ObjectMeta: kapi.ObjectMeta{
52
+					Namespace: "ns1", Name: "mysecret",
53
+					Annotations: map[string]string{
54
+						ServiceNameAnnotation: "foo",
55
+						ServiceUIDAnnotation:  "uid-1",
56
+					},
57
+				},
58
+			},
59
+			expected: false,
60
+		},
61
+		{
62
+			name: "no expiry",
63
+			primeServices: func(serviceCache cache.Store) {
64
+				serviceCache.Add(&kapi.Service{
65
+					ObjectMeta: kapi.ObjectMeta{Namespace: "ns1", Name: "foo", UID: types.UID("uid-1")},
66
+				})
67
+			},
68
+			secret: &kapi.Secret{
69
+				ObjectMeta: kapi.ObjectMeta{
70
+					Namespace: "ns1", Name: "mysecret",
71
+					Annotations: map[string]string{
72
+						ServiceNameAnnotation: "foo",
73
+						ServiceUIDAnnotation:  "uid-1",
74
+					},
75
+				},
76
+			},
77
+			expected: true,
78
+		},
79
+		{
80
+			name: "bad expiry",
81
+			primeServices: func(serviceCache cache.Store) {
82
+				serviceCache.Add(&kapi.Service{
83
+					ObjectMeta: kapi.ObjectMeta{Namespace: "ns1", Name: "foo", UID: types.UID("uid-1")},
84
+				})
85
+			},
86
+			secret: &kapi.Secret{
87
+				ObjectMeta: kapi.ObjectMeta{
88
+					Namespace: "ns1", Name: "mysecret",
89
+					Annotations: map[string]string{
90
+						ServiceNameAnnotation:       "foo",
91
+						ServiceUIDAnnotation:        "uid-1",
92
+						ServingCertExpiryAnnotation: "bad-format",
93
+					},
94
+				},
95
+			},
96
+			expected: true,
97
+		},
98
+		{
99
+			name: "expired expiry",
100
+			primeServices: func(serviceCache cache.Store) {
101
+				serviceCache.Add(&kapi.Service{
102
+					ObjectMeta: kapi.ObjectMeta{Namespace: "ns1", Name: "foo", UID: types.UID("uid-1")},
103
+				})
104
+			},
105
+			secret: &kapi.Secret{
106
+				ObjectMeta: kapi.ObjectMeta{
107
+					Namespace: "ns1", Name: "mysecret",
108
+					Annotations: map[string]string{
109
+						ServiceNameAnnotation:       "foo",
110
+						ServiceUIDAnnotation:        "uid-1",
111
+						ServingCertExpiryAnnotation: time.Now().Add(-30 * time.Minute).Format(time.RFC3339),
112
+					},
113
+				},
114
+			},
115
+			expected: true,
116
+		},
117
+		{
118
+			name: "distant expiry",
119
+			primeServices: func(serviceCache cache.Store) {
120
+				serviceCache.Add(&kapi.Service{
121
+					ObjectMeta: kapi.ObjectMeta{Namespace: "ns1", Name: "foo", UID: types.UID("uid-1")},
122
+				})
123
+			},
124
+			secret: &kapi.Secret{
125
+				ObjectMeta: kapi.ObjectMeta{
126
+					Namespace: "ns1", Name: "mysecret",
127
+					Annotations: map[string]string{
128
+						ServiceNameAnnotation:       "foo",
129
+						ServiceUIDAnnotation:        "uid-1",
130
+						ServingCertExpiryAnnotation: time.Now().Add(10 * time.Minute).Format(time.RFC3339),
131
+					},
132
+				},
133
+			},
134
+			expected: false,
135
+		},
136
+	}
137
+	for _, tc := range tests {
138
+		c := &ServiceServingCertUpdateController{
139
+			serviceCache: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
140
+		}
141
+		tc.primeServices(c.serviceCache)
142
+		actual := c.requiresRegeneration(tc.secret)
143
+		if tc.expected != actual {
144
+			t.Errorf("%s: expected %v, got %v", tc.name, tc.expected, actual)
145
+		}
146
+	}
147
+}
0 148
new file mode 100644
... ...
@@ -0,0 +1,149 @@
0
+package integration
1
+
2
+import (
3
+	"reflect"
4
+	"testing"
5
+	"time"
6
+
7
+	kapi "k8s.io/kubernetes/pkg/api"
8
+	"k8s.io/kubernetes/pkg/watch"
9
+
10
+	testutil "github.com/openshift/origin/test/util"
11
+	testserver "github.com/openshift/origin/test/util/server"
12
+
13
+	"github.com/openshift/origin/pkg/service/controller/servingcert"
14
+)
15
+
16
+func TestServiceServingCertSigner(t *testing.T) {
17
+	ns := "service-serving-cert-signer"
18
+
19
+	testutil.RequireEtcd(t)
20
+	defer testutil.DumpEtcdOnFailure(t)
21
+
22
+	_, clusterAdminKubeConfig, err := testserver.StartTestMaster()
23
+	if err != nil {
24
+		t.Fatal(err)
25
+	}
26
+	clusterAdminConfig, err := testutil.GetClusterAdminClientConfig(clusterAdminKubeConfig)
27
+	if err != nil {
28
+		t.Fatal(err)
29
+	}
30
+	clusterAdminClient, err := testutil.GetClusterAdminClient(clusterAdminKubeConfig)
31
+	if err != nil {
32
+		t.Fatal(err)
33
+	}
34
+	clusterAdminKubeClientset, err := testutil.GetClusterAdminKubeClient(clusterAdminKubeConfig)
35
+	if err != nil {
36
+		t.Fatal(err)
37
+	}
38
+	if _, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminConfig, "service-serving-cert-signer", "deads"); err != nil {
39
+		t.Fatal(err)
40
+	}
41
+
42
+	service := &kapi.Service{
43
+		ObjectMeta: kapi.ObjectMeta{
44
+			Name: "my-svc",
45
+			Annotations: map[string]string{
46
+				servingcert.ServingCertSecretAnnotation: "my-secret",
47
+			},
48
+		},
49
+		Spec: kapi.ServiceSpec{
50
+			Ports: []kapi.ServicePort{
51
+				{Port: 80},
52
+			},
53
+		},
54
+	}
55
+	actualService, err := clusterAdminKubeClientset.Services(ns).Create(service)
56
+	if err != nil {
57
+		t.Fatal(err)
58
+	}
59
+
60
+	var actualFirstSecret *kapi.Secret
61
+	secretWatcher1, err := clusterAdminKubeClientset.Secrets(ns).Watch(kapi.ListOptions{ResourceVersion: actualService.ResourceVersion})
62
+	if err != nil {
63
+		t.Fatal(err)
64
+	}
65
+	_, err = watch.Until(30*time.Second, secretWatcher1, func(event watch.Event) (bool, error) {
66
+		if event.Type != watch.Added {
67
+			return false, nil
68
+		}
69
+		secret := event.Object.(*kapi.Secret)
70
+		if secret.Name == "my-secret" {
71
+			actualFirstSecret = secret
72
+			return true, nil
73
+		}
74
+		return false, nil
75
+	})
76
+	if err != nil {
77
+		t.Fatal(err)
78
+	}
79
+	secretWatcher1.Stop()
80
+
81
+	// now check to make sure that regeneration works.  First, remove the annotation entirely, this simulates
82
+	// the "old data" case where the expiry didn't exist
83
+	delete(actualFirstSecret.Annotations, servingcert.ServingCertExpiryAnnotation)
84
+	actualSecondSecret, err := clusterAdminKubeClientset.Secrets(ns).Update(actualFirstSecret)
85
+	if err != nil {
86
+		t.Fatal(err)
87
+	}
88
+
89
+	var actualThirdSecret *kapi.Secret
90
+	secretWatcher2, err := clusterAdminKubeClientset.Secrets(ns).Watch(kapi.ListOptions{ResourceVersion: actualSecondSecret.ResourceVersion})
91
+	if err != nil {
92
+		t.Fatal(err)
93
+	}
94
+	_, err = watch.Until(30*time.Second, secretWatcher2, func(event watch.Event) (bool, error) {
95
+		if event.Type != watch.Modified {
96
+			return false, nil
97
+		}
98
+		secret := event.Object.(*kapi.Secret)
99
+		if secret.Name == "my-secret" {
100
+			actualThirdSecret = secret
101
+			return true, nil
102
+		}
103
+		return false, nil
104
+	})
105
+	if err != nil {
106
+		t.Fatal(err)
107
+	}
108
+	secretWatcher2.Stop()
109
+
110
+	if _, ok := actualThirdSecret.Annotations[servingcert.ServingCertExpiryAnnotation]; !ok {
111
+		t.Fatalf("missing annotation: %#v", actualThirdSecret)
112
+	}
113
+	if reflect.DeepEqual(actualThirdSecret.Data, actualSecondSecret.Data) {
114
+		t.Fatalf("didn't update secret content: %#v", actualThirdSecret)
115
+	}
116
+
117
+	// now change the annotation to indicate that we're about to expire.  The controller should regenerate.
118
+	actualThirdSecret.Annotations[servingcert.ServingCertExpiryAnnotation] = time.Now().Add(10 * time.Second).Format(time.RFC3339)
119
+	actualFourthSecret, err := clusterAdminKubeClientset.Secrets(ns).Update(actualThirdSecret)
120
+	if err != nil {
121
+		t.Fatal(err)
122
+	}
123
+
124
+	var actualFifthSecret *kapi.Secret
125
+	secretWatcher3, err := clusterAdminKubeClientset.Secrets(ns).Watch(kapi.ListOptions{ResourceVersion: actualFourthSecret.ResourceVersion})
126
+	if err != nil {
127
+		t.Fatal(err)
128
+	}
129
+	_, err = watch.Until(30*time.Second, secretWatcher3, func(event watch.Event) (bool, error) {
130
+		if event.Type != watch.Modified {
131
+			return false, nil
132
+		}
133
+		secret := event.Object.(*kapi.Secret)
134
+		if secret.Name == "my-secret" {
135
+			actualFifthSecret = secret
136
+			return true, nil
137
+		}
138
+		return false, nil
139
+	})
140
+	if err != nil {
141
+		t.Fatal(err)
142
+	}
143
+	secretWatcher3.Stop()
144
+
145
+	if reflect.DeepEqual(actualFourthSecret.Data, actualFifthSecret.Data) {
146
+		t.Fatalf("didn't update secret content: %#v", actualFifthSecret)
147
+	}
148
+}
... ...
@@ -3455,6 +3455,9 @@ items:
3455 3455
     verbs:
3456 3456
     - create
3457 3457
     - get
3458
+    - list
3459
+    - update
3460
+    - watch
3458 3461
 - apiVersion: v1
3459 3462
   kind: ClusterRole
3460 3463
   metadata: