Browse code

Enable scheduled jobs controller and disruption budgets conditionally

Clayton Coleman authored on 2016/09/08 12:58:49
Showing 3 changed files
... ...
@@ -37,6 +37,9 @@ const (
37 37
 	InfraDaemonSetControllerServiceAccountName = "daemonset-controller"
38 38
 	DaemonSetControllerRoleName                = "system:daemonset-controller"
39 39
 
40
+	InfraDisruptionControllerServiceAccountName = "disruption-controller"
41
+	DisruptionControllerRoleName                = "system:disruption-controller"
42
+
40 43
 	InfraHPAControllerServiceAccountName = "hpa-controller"
41 44
 	HPAControllerRoleName                = "system:hpa-controller"
42 45
 
... ...
@@ -330,13 +333,13 @@ func init() {
330 330
 				{
331 331
 					APIGroups: []string{extensions.GroupName, batch.GroupName},
332 332
 					Verbs:     sets.NewString("list", "watch"),
333
-					Resources: sets.NewString("jobs"),
333
+					Resources: sets.NewString("jobs", "scheduledjobs"),
334 334
 				},
335 335
 				// JobController.syncJob() -> updateJobStatus()
336 336
 				{
337 337
 					APIGroups: []string{extensions.GroupName, batch.GroupName},
338 338
 					Verbs:     sets.NewString("update"),
339
-					Resources: sets.NewString("jobs/status"),
339
+					Resources: sets.NewString("jobs/status", "scheduledjobs/status"),
340 340
 				},
341 341
 				// JobController.podController.ListWatch
342 342
 				{
... ...
@@ -679,6 +682,50 @@ func init() {
679 679
 	}
680 680
 
681 681
 	err = InfraSAs.addServiceAccount(
682
+		InfraDisruptionControllerServiceAccountName,
683
+		authorizationapi.ClusterRole{
684
+			ObjectMeta: kapi.ObjectMeta{
685
+				Name: DisruptionControllerRoleName,
686
+			},
687
+			Rules: []authorizationapi.PolicyRule{
688
+				// DisruptionBudgetController.dStore.ListWatch
689
+				{
690
+					APIGroups: []string{extensions.GroupName},
691
+					Verbs:     sets.NewString("list", "watch"),
692
+					Resources: sets.NewString("deployments"),
693
+				},
694
+				// DisruptionBudgetController.rsStore.ListWatch
695
+				{
696
+					APIGroups: []string{extensions.GroupName},
697
+					Verbs:     sets.NewString("list", "watch"),
698
+					Resources: sets.NewString("replicasets"),
699
+				},
700
+				// DisruptionBudgetController.rcStore.ListWatch
701
+				{
702
+					APIGroups: []string{kapi.GroupName},
703
+					Verbs:     sets.NewString("list", "watch"),
704
+					Resources: sets.NewString("replicationcontrollers"),
705
+				},
706
+				// DisruptionBudgetController.dStore.ListWatch
707
+				{
708
+					APIGroups: []string{policy.GroupName},
709
+					Verbs:     sets.NewString("get", "list", "watch"),
710
+					Resources: sets.NewString("poddisruptionbudgets"),
711
+				},
712
+				// DisruptionBudgetController.dbControl
713
+				{
714
+					APIGroups: []string{policy.GroupName},
715
+					Verbs:     sets.NewString("update"),
716
+					Resources: sets.NewString("poddisruptionbudgets/status"),
717
+				},
718
+			},
719
+		},
720
+	)
721
+	if err != nil {
722
+		panic(err)
723
+	}
724
+
725
+	err = InfraSAs.addServiceAccount(
682 726
 		InfraNamespaceControllerServiceAccountName,
683 727
 		authorizationapi.ClusterRole{
684 728
 			ObjectMeta: kapi.ObjectMeta{
... ...
@@ -17,12 +17,18 @@ import (
17 17
 	"k8s.io/kubernetes/pkg/api/v1"
18 18
 	appsv1alpha1 "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
19 19
 	autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
20
+	"k8s.io/kubernetes/pkg/apis/batch"
20 21
 	batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
22
+	batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
23
+	certificatesv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
21 24
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
22 25
 	extv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
26
+	policyv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
23 27
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
24 28
 	"k8s.io/kubernetes/pkg/client/record"
29
+	"k8s.io/kubernetes/pkg/client/restclient"
25 30
 	"k8s.io/kubernetes/pkg/client/typed/dynamic"
31
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
26 32
 	clientadapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
27 33
 	"k8s.io/kubernetes/pkg/controller/deployment"
28 34
 	"k8s.io/kubernetes/pkg/master"
... ...
@@ -34,6 +40,7 @@ import (
34 34
 
35 35
 	client "k8s.io/kubernetes/pkg/client/unversioned"
36 36
 	"k8s.io/kubernetes/pkg/controller/daemon"
37
+	"k8s.io/kubernetes/pkg/controller/disruption"
37 38
 	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
38 39
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
39 40
 	namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
... ...
@@ -44,6 +51,7 @@ import (
44 44
 	gccontroller "k8s.io/kubernetes/pkg/controller/podgc"
45 45
 	replicasetcontroller "k8s.io/kubernetes/pkg/controller/replicaset"
46 46
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
47
+	"k8s.io/kubernetes/pkg/controller/scheduledjob"
47 48
 	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
48 49
 	attachdetachcontroller "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
49 50
 	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
... ...
@@ -124,11 +132,15 @@ func (c *MasterConfig) InstallAPI(container *restful.Container) ([]string, error
124 124
 		messages = append(messages, fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefix))
125 125
 	}
126 126
 
127
+	// TODO: this is a bit much - I exist in some code somewhere
127 128
 	versions := []unversioned.GroupVersion{
128 129
 		extv1beta1.SchemeGroupVersion,
129 130
 		batchv1.SchemeGroupVersion,
131
+		batchv2alpha1.SchemeGroupVersion,
130 132
 		autoscalingv1.SchemeGroupVersion,
133
+		certificatesv1alpha1.SchemeGroupVersion,
131 134
 		appsv1alpha1.SchemeGroupVersion,
135
+		policyv1alpha1.SchemeGroupVersion,
132 136
 		federationv1beta1.SchemeGroupVersion,
133 137
 	}
134 138
 	for _, ver := range versions {
... ...
@@ -290,6 +302,22 @@ func (c *MasterConfig) RunJobController(client *client.Client) {
290 290
 	go controller.Run(int(c.ControllerManager.ConcurrentJobSyncs), utilwait.NeverStop)
291 291
 }
292 292
 
293
+// RunScheduledJobController starts the Kubernetes scheduled job controller sync loop
294
+func (c *MasterConfig) RunScheduledJobController(config *restclient.Config) {
295
+	// TODO: this is a temp fix for allowing kubeClient list v2alpha1 jobs, should switch to using clientset
296
+	config.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
297
+	client, err := kclient.New(config)
298
+	if err != nil {
299
+		glog.Fatalf("Unable to configure scheduled job controller: %v", err)
300
+	}
301
+	go scheduledjob.NewScheduledJobController(client).Run(utilwait.NeverStop)
302
+}
303
+
304
+// RunDisruptionBudgetController starts the Kubernetes disruption budget controller
305
+func (c *MasterConfig) RunDisruptionBudgetController(client *client.Client) {
306
+	go disruption.NewDisruptionController(c.Informers.Pods().Informer(), client).Run(utilwait.NeverStop)
307
+}
308
+
293 309
 // RunHPAController starts the Kubernetes hpa controller sync loop
294 310
 func (c *MasterConfig) RunHPAController(oc *osclient.Client, kc *client.Client, heapsterNamespace string) {
295 311
 	clientsetClient := clientadapter.FromUnversionedClient(kc)
... ...
@@ -17,7 +17,11 @@ import (
17 17
 
18 18
 	cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
19 19
 	kerrors "k8s.io/kubernetes/pkg/api/errors"
20
+	"k8s.io/kubernetes/pkg/apis/apps"
21
+	"k8s.io/kubernetes/pkg/apis/autoscaling"
22
+	"k8s.io/kubernetes/pkg/apis/batch"
20 23
 	"k8s.io/kubernetes/pkg/apis/extensions"
24
+	"k8s.io/kubernetes/pkg/apis/policy"
21 25
 	"k8s.io/kubernetes/pkg/capabilities"
22 26
 	"k8s.io/kubernetes/pkg/client/typed/dynamic"
23 27
 	clientadapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
... ...
@@ -569,7 +573,7 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
569 569
 		if err != nil {
570 570
 			glog.Fatalf("Could not get client for deployment controller: %v", err)
571 571
 		}
572
-		_, _, jobClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraJobControllerServiceAccountName)
572
+		jobConfig, _, jobClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraJobControllerServiceAccountName)
573 573
 		if err != nil {
574 574
 			glog.Fatalf("Could not get client for job controller: %v", err)
575 575
 		}
... ...
@@ -593,6 +597,11 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
593 593
 			glog.Fatalf("Could not get client for daemonset controller: %v", err)
594 594
 		}
595 595
 
596
+		_, _, disruptionClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraDisruptionControllerServiceAccountName)
597
+		if err != nil {
598
+			glog.Fatalf("Could not get client for disruption budget controller: %v", err)
599
+		}
600
+
596 601
 		_, _, gcClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraGCControllerServiceAccountName)
597 602
 		if err != nil {
598 603
 			glog.Fatalf("Could not get client for pod gc controller: %v", err)
... ...
@@ -629,20 +638,27 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
629 629
 
630 630
 		extensionsEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, extensions.GroupName)) > 0
631 631
 
632
-		// TODO: enable this check once the job controller can use the batch API if the extensions API is disabled
633
-		// batchEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, batch.GroupName)) > 0
634
-		if extensionsEnabled /*|| batchEnabled*/ {
632
+		batchEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, batch.GroupName)) > 0
633
+		if extensionsEnabled || batchEnabled {
635 634
 			kc.RunJobController(jobClient)
636 635
 		}
636
+		if batchEnabled {
637
+			kc.RunScheduledJobController(jobConfig)
638
+		}
637 639
 		// TODO: enable this check once the HPA controller can use the autoscaling API if the extensions API is disabled
638
-		// autoscalingEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, autoscaling.GroupName)) > 0
639
-		if extensionsEnabled /*|| autoscalingEnabled*/ {
640
+		autoscalingEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, autoscaling.GroupName)) > 0
641
+		if extensionsEnabled || autoscalingEnabled {
640 642
 			kc.RunHPAController(hpaOClient, hpaKClient, oc.Options.PolicyConfig.OpenShiftInfrastructureNamespace)
641 643
 		}
642 644
 		if extensionsEnabled {
643 645
 			kc.RunDaemonSetsController(daemonSetClient)
644 646
 		}
645 647
 
648
+		policyEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, policy.GroupName)) > 0
649
+		if policyEnabled {
650
+			kc.RunDisruptionBudgetController(disruptionClient)
651
+		}
652
+
646 653
 		kc.RunEndpointController(endpointControllerClient)
647 654
 		kc.RunNamespaceController(namespaceControllerClientSet, namespaceControllerClientPool)
648 655
 		kc.RunPersistentVolumeController(binderClient, oc.Options.PolicyConfig.OpenShiftInfrastructureNamespace, oc.ImageFor("recycler"), bootstrappolicy.InfraPersistentVolumeRecyclerControllerServiceAccountName)
... ...
@@ -650,7 +666,11 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
650 650
 		kc.RunGCController(gcClient)
651 651
 
652 652
 		kc.RunServiceLoadBalancerController(serviceLoadBalancerClient)
653
-		kc.RunPetSetController(petSetClient)
653
+
654
+		appsEnabled := len(configapi.GetEnabledAPIVersionsForGroup(kc.Options, apps.GroupName)) > 0
655
+		if appsEnabled {
656
+			kc.RunPetSetController(petSetClient)
657
+		}
654 658
 
655 659
 		glog.Infof("Started Kubernetes Controllers")
656 660
 	}