package kubernetes

import (
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"time"

	"github.com/emicklei/go-restful"
	"github.com/golang/glog"

	kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
	federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/v1"
	appsv1alpha1 "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
	autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
	"k8s.io/kubernetes/pkg/apis/batch"
	batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
	batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
	certificatesv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	extv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
	policyv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/client/typed/dynamic"
	kclient "k8s.io/kubernetes/pkg/client/unversioned"
	adapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/deployment"
	"k8s.io/kubernetes/pkg/controller/disruption"
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
	"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
	namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
	nodecontroller "k8s.io/kubernetes/pkg/controller/node"
	petsetcontroller "k8s.io/kubernetes/pkg/controller/petset"
	podautoscalercontroller "k8s.io/kubernetes/pkg/controller/podautoscaler"
	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
	gccontroller "k8s.io/kubernetes/pkg/controller/podgc"
	replicasetcontroller "k8s.io/kubernetes/pkg/controller/replicaset"
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
	"k8s.io/kubernetes/pkg/controller/scheduledjob"
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
	attachdetachcontroller "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
	"k8s.io/kubernetes/pkg/master"
	"k8s.io/kubernetes/pkg/registry/endpoint"
	endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
	"k8s.io/kubernetes/pkg/registry/generic"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/runtime/serializer"
	"k8s.io/kubernetes/pkg/storage"
	storagefactory "k8s.io/kubernetes/pkg/storage/storagebackend/factory"
	utilwait "k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/aws_ebs"
	"k8s.io/kubernetes/pkg/volume/cinder"
	"k8s.io/kubernetes/pkg/volume/flexvolume"
	"k8s.io/kubernetes/pkg/volume/gce_pd"
	"k8s.io/kubernetes/pkg/volume/glusterfs"
	"k8s.io/kubernetes/pkg/volume/host_path"
	"k8s.io/kubernetes/pkg/volume/nfs"
	"k8s.io/kubernetes/pkg/volume/rbd"
	"k8s.io/kubernetes/pkg/volume/vsphere_volume"
	"k8s.io/kubernetes/plugin/pkg/scheduler"
	_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
	latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"

	osclient "github.com/openshift/origin/pkg/client"
	configapi "github.com/openshift/origin/pkg/cmd/server/api"
	"github.com/openshift/origin/pkg/cmd/server/election"
)
const (
KubeAPIPrefix = "/api"
KubeAPIGroupPrefix = "/apis"
)
// InstallAPI starts a Kubernetes master and registers the supported REST APIs
// into the provided mux, then returns a slice of strings indicating which
// endpoints were started. Each entry is a format string that expects a single
// string value, such as the server address (see the illustrative
// logStartedEndpoints sketch after this function).
func (c *MasterConfig) InstallAPI(container *restful.Container) ([]string, error) {
c.Master.RestfulContainer = container
if c.Master.EnableCoreControllers {
glog.V(2).Info("Using the lease endpoint reconciler")
config, err := c.Master.StorageFactory.NewConfig(kapi.Resource("apiServerIPInfo"))
if err != nil {
return nil, err
}
leaseStorage, _, err := storagefactory.Create(*config)
if err != nil {
return nil, err
}
masterLeases := newMasterLeases(leaseStorage)
endpointConfig, err := c.Master.StorageFactory.NewConfig(kapi.Resource("endpoints"))
if err != nil {
return nil, err
}
endpointsStorage := endpointsetcd.NewREST(generic.RESTOptions{
StorageConfig: endpointConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 0,
ResourcePrefix: c.Master.StorageFactory.ResourcePrefix(kapi.Resource("endpoints")),
})
endpointRegistry := endpoint.NewRegistry(endpointsStorage)
c.Master.EndpointReconcilerConfig = master.EndpointReconcilerConfig{
Reconciler: election.NewLeaseEndpointReconciler(endpointRegistry, masterLeases),
Interval: master.DefaultEndpointReconcilerInterval,
}
}
_, err := master.New(c.Master)
if err != nil {
return nil, err
}
messages := []string{}
	// v1 has to be printed separately since it is served from a different endpoint than the API groups
if configapi.HasKubernetesAPIVersion(c.Options, v1.SchemeGroupVersion) {
messages = append(messages, fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefix))
}
	// TODO: this is a bit much - this list already exists in some code somewhere
versions := []unversioned.GroupVersion{
extv1beta1.SchemeGroupVersion,
batchv1.SchemeGroupVersion,
batchv2alpha1.SchemeGroupVersion,
autoscalingv1.SchemeGroupVersion,
certificatesv1alpha1.SchemeGroupVersion,
appsv1alpha1.SchemeGroupVersion,
policyv1alpha1.SchemeGroupVersion,
federationv1beta1.SchemeGroupVersion,
}
for _, ver := range versions {
if configapi.HasKubernetesAPIVersion(c.Options, ver) {
messages = append(messages, fmt.Sprintf("Started Kubernetes API %s at %%s%s", ver.String(), KubeAPIGroupPrefix))
}
}
return messages, nil
}
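
// logStartedEndpoints is an illustrative sketch, not part of the original file: it shows how
// the format strings returned by InstallAPI are intended to be consumed, with the caller
// supplying the single string argument (assumed here to be the externally visible master
// address). The function name and parameter are hypothetical.
func logStartedEndpoints(messages []string, masterAddr string) {
	for _, msg := range messages {
		// Each message is a format string such as "Started Kubernetes API at %s/api".
		glog.Infof(msg, masterAddr)
	}
}
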
func newMasterLeases(storage storage.Interface) election.Leases {
	// leaseTTL is expressed as a whole number of seconds (a plain count, not a time.Duration)
leaseTTL := uint64((master.DefaultEndpointReconcilerInterval + 5*time.Second) / time.Second) // add 5 seconds for wiggle room
return election.NewLeases(storage, "/masterleases/", leaseTTL)
}
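
// leaseTTLSeconds is an illustrative sketch, not part of the original file: it spells out the
// unit conversion performed in newMasterLeases, collapsing durations to a whole-second count
// because the lease TTL is handed to election.NewLeases as a plain uint64 of seconds.
// For example, if the reconciler interval is 10s and the wiggle room is 5s, the TTL is 15.
func leaseTTLSeconds(interval, wiggleRoom time.Duration) uint64 {
	return uint64((interval + wiggleRoom) / time.Second)
}
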
// RunNamespaceController starts the Kubernetes Namespace Manager
func (c *MasterConfig) RunNamespaceController(kubeClient kclientset.Interface, clientPool dynamic.ClientPool) {
// Find the list of namespaced resources via discovery that the namespace controller must manage
groupVersionResources, err := kubeClient.Discovery().ServerPreferredNamespacedResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(kubeClient, clientPool, groupVersionResources, c.ControllerManager.NamespaceSyncPeriod.Duration, kapi.FinalizerKubernetes)
go namespaceController.Run(int(c.ControllerManager.ConcurrentNamespaceSyncs), utilwait.NeverStop)
}
func (c *MasterConfig) RunPersistentVolumeController(client *kclientset.Clientset, namespace, recyclerImageName, recyclerServiceAccountName string) {
s := c.ControllerManager
alphaProvisioner, err := kctrlmgr.NewAlphaVolumeProvisioner(c.CloudProvider, s.VolumeConfiguration)
if err != nil {
glog.Fatalf("A backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
}
volumeController := persistentvolumecontroller.NewPersistentVolumeController(
client,
s.PVClaimBinderSyncPeriod.Duration,
alphaProvisioner,
probeRecyclableVolumePlugins(s.VolumeConfiguration, namespace, recyclerImageName, recyclerServiceAccountName),
c.CloudProvider,
s.ClusterName,
		nil, nil, nil, // volumeSource, claimSource, classSource (nil means the controller builds its defaults)
		nil,           // event recorder
s.VolumeConfiguration.EnableDynamicProvisioning,
)
volumeController.Run(utilwait.NeverStop)
}
func (c *MasterConfig) RunPersistentVolumeAttachDetachController(client *kclientset.Clientset) {
s := c.ControllerManager
attachDetachController, err :=
attachdetachcontroller.NewAttachDetachController(
client,
c.Informers.Pods().Informer(),
c.Informers.Nodes().Informer(),
c.Informers.PersistentVolumeClaims().Informer(),
c.Informers.PersistentVolumes().Informer(),
c.CloudProvider,
kctrlmgr.ProbeAttachableVolumePlugins(s.VolumeConfiguration),
nil,
)
if err != nil {
glog.Fatalf("Failed to start attach/detach controller: %v", err)
} else {
go attachDetachController.Run(utilwait.NeverStop)
}
}
// probeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list.
func probeRecyclableVolumePlugins(config componentconfig.VolumeConfiguration, namespace, recyclerImageName, recyclerServiceAccountName string) []volume.VolumePlugin {
uid := int64(0)
defaultScrubPod := volume.NewPersistentVolumeRecyclerPodTemplate()
defaultScrubPod.Namespace = namespace
defaultScrubPod.Spec.ServiceAccountName = recyclerServiceAccountName
defaultScrubPod.Spec.Containers[0].Image = recyclerImageName
defaultScrubPod.Spec.Containers[0].Command = []string{"/usr/bin/openshift-recycle"}
defaultScrubPod.Spec.Containers[0].Args = []string{"/scrub"}
defaultScrubPod.Spec.Containers[0].SecurityContext = &kapi.SecurityContext{RunAsUser: &uid}
defaultScrubPod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent
allPlugins := []volume.VolumePlugin{}
// The list of plugins to probe is decided by this binary, not
// by dynamic linking or other "magic". Plugins will be analyzed and
// initialized later.
// Each plugin can make use of VolumeConfig. The single arg to this func contains *all* enumerated
// options meant to configure volume plugins. From that single config, create an instance of volume.VolumeConfig
// for a specific plugin and pass that instance to the plugin's ProbeVolumePlugins(config) func.
// HostPath recycling is for testing and development purposes only!
hostPathConfig := volume.VolumeConfig{
RecyclerMinimumTimeout: int(config.PersistentVolumeRecyclerConfiguration.MinimumTimeoutHostPath),
RecyclerTimeoutIncrement: int(config.PersistentVolumeRecyclerConfiguration.IncrementTimeoutHostPath),
RecyclerPodTemplate: defaultScrubPod,
}
if err := kctrlmgr.AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil {
glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, err)
}
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...)
nfsConfig := volume.VolumeConfig{
RecyclerMinimumTimeout: int(config.PersistentVolumeRecyclerConfiguration.MinimumTimeoutNFS),
RecyclerTimeoutIncrement: int(config.PersistentVolumeRecyclerConfiguration.IncrementTimeoutNFS),
RecyclerPodTemplate: defaultScrubPod,
}
if err := kctrlmgr.AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil {
glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, err)
}
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(config.FlexVolumePluginDir)...)
allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
return allPlugins
}
func (c *MasterConfig) RunReplicaSetController(client *kclientset.Clientset) {
controller := replicasetcontroller.NewReplicaSetController(
c.Informers.Pods().Informer(),
client,
kctrlmgr.ResyncPeriod(c.ControllerManager),
replicasetcontroller.BurstReplicas,
int(c.ControllerManager.LookupCacheSizeForRC),
c.ControllerManager.EnableGarbageCollector,
)
go controller.Run(int(c.ControllerManager.ConcurrentRSSyncs), utilwait.NeverStop)
}
// RunReplicationController starts the Kubernetes replication controller sync loop
func (c *MasterConfig) RunReplicationController(client *kclientset.Clientset) {
controllerManager := replicationcontroller.NewReplicationManager(
c.Informers.Pods().Informer(),
client,
kctrlmgr.ResyncPeriod(c.ControllerManager),
replicationcontroller.BurstReplicas,
int(c.ControllerManager.LookupCacheSizeForRC),
c.ControllerManager.EnableGarbageCollector,
)
go controllerManager.Run(int(c.ControllerManager.ConcurrentRCSyncs), utilwait.NeverStop)
}
func (c *MasterConfig) RunDeploymentController(client *kclientset.Clientset) {
controller := deployment.NewDeploymentController(
client,
kctrlmgr.ResyncPeriod(c.ControllerManager),
)
go controller.Run(int(c.ControllerManager.ConcurrentDeploymentSyncs), utilwait.NeverStop)
}
// RunJobController starts the Kubernetes job controller sync loop
func (c *MasterConfig) RunJobController(client *kclientset.Clientset) {
controller := jobcontroller.NewJobController(c.Informers.Pods().Informer(), client)
go controller.Run(int(c.ControllerManager.ConcurrentJobSyncs), utilwait.NeverStop)
}
// RunScheduledJobController starts the Kubernetes scheduled job controller sync loop
func (c *MasterConfig) RunScheduledJobController(config *restclient.Config) {
	// TODO: this is a temporary fix to allow the kube client to list v2alpha1 jobs; it should switch to using a clientset
config.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
client, err := kclient.New(config)
if err != nil {
glog.Fatalf("Unable to configure scheduled job controller: %v", err)
}
go scheduledjob.NewScheduledJobController(client).Run(utilwait.NeverStop)
}
// RunDisruptionBudgetController starts the Kubernetes disruption budget controller
func (c *MasterConfig) RunDisruptionBudgetController(client *kclient.Client) {
go disruption.NewDisruptionController(c.Informers.Pods().Informer(), client).Run(utilwait.NeverStop)
}
// RunHPAController starts the Kubernetes hpa controller sync loop
func (c *MasterConfig) RunHPAController(oc *osclient.Client, kc *kclientset.Clientset, heapsterNamespace string) {
delegatingScaleNamespacer := osclient.NewDelegatingScaleNamespacer(oc, kc)
podautoscaler := podautoscalercontroller.NewHorizontalController(
kc,
delegatingScaleNamespacer,
kc,
		metrics.NewHeapsterMetricsClient(kc, heapsterNamespace, "https", "heapster", ""), // namespace, scheme, service, port ("" uses the service's default port)
c.ControllerManager.HorizontalPodAutoscalerSyncPeriod.Duration,
)
go podautoscaler.Run(utilwait.NeverStop)
}
func (c *MasterConfig) RunDaemonSetsController(client *kclientset.Clientset) {
controller := daemon.NewDaemonSetsController(
c.Informers.Pods().Informer(),
client,
kctrlmgr.ResyncPeriod(c.ControllerManager),
int(c.ControllerManager.LookupCacheSizeForDaemonSet),
)
go controller.Run(int(c.ControllerManager.ConcurrentDaemonSetSyncs), utilwait.NeverStop)
}
// RunEndpointController starts the Kubernetes endpoint controller sync loop
func (c *MasterConfig) RunEndpointController(client *kclientset.Clientset) {
endpoints := endpointcontroller.NewEndpointController(c.Informers.Pods().Informer(), client)
go endpoints.Run(int(c.ControllerManager.ConcurrentEndpointSyncs), utilwait.NeverStop)
}
// RunScheduler starts the Kubernetes scheduler
func (c *MasterConfig) RunScheduler() {
config, err := c.createSchedulerConfig()
if err != nil {
glog.Fatalf("Unable to start scheduler: %v", err)
}
eventcast := record.NewBroadcaster()
config.Recorder = eventcast.NewRecorder(kapi.EventSource{Component: kapi.DefaultSchedulerName})
eventcast.StartRecordingToSink(c.KubeClient.Events(""))
s := scheduler.New(config)
s.Run()
}
// RunGCController handles deletion of terminated pods.
func (c *MasterConfig) RunGCController(client *kclientset.Clientset) {
if c.ControllerManager.TerminatedPodGCThreshold > 0 {
gcController := gccontroller.New(client, kctrlmgr.ResyncPeriod(c.ControllerManager), int(c.ControllerManager.TerminatedPodGCThreshold))
go gcController.Run(utilwait.NeverStop)
}
}
// RunGarbageCollectorController starts generic garbage collection for the cluster.
func (c *MasterConfig) RunGarbageCollectorController(client *osclient.Client, config *restclient.Config) {
if !c.ControllerManager.EnableGarbageCollector {
return
}
groupVersionResources, err := client.Discovery().ServerPreferredResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
config = restclient.AddUserAgent(config, "generic-garbage-collector")
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
// TODO: needs to take GVR
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
// TODO: needs to take GVR
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, groupVersionResources)
if err != nil {
glog.Fatalf("Failed to start the garbage collector: %v", err)
}
workers := int(c.ControllerManager.ConcurrentGCSyncs)
go garbageCollector.Run(workers, utilwait.NeverStop)
}
// RunNodeController starts the node controller
// TODO: handle node CIDR and route allocation
func (c *MasterConfig) RunNodeController() {
s := c.ControllerManager
// this cidr has been validated already
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
controller, err := nodecontroller.NewNodeController(
c.Informers.Pods().Informer(),
c.CloudProvider,
adapter.FromUnversionedClient(c.KubeClient),
s.PodEvictionTimeout.Duration,
s.NodeEvictionRate,
s.SecondaryNodeEvictionRate,
s.LargeClusterSizeThreshold,
s.UnhealthyZoneThreshold,
s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration,
s.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(s.NodeCIDRMaskSize),
s.AllocateNodeCIDRs,
)
if err != nil {
glog.Fatalf("Unable to start node controller: %v", err)
}
controller.Run()
}
// RunServiceLoadBalancerController starts the service load balancer controller if the cloud provider is configured.
func (c *MasterConfig) RunServiceLoadBalancerController(client *kclientset.Clientset) {
if c.CloudProvider == nil {
glog.V(2).Infof("Service controller will not start - no cloud provider configured")
return
}
serviceController, err := servicecontroller.New(c.CloudProvider, client, c.ControllerManager.ClusterName)
if err != nil {
glog.Errorf("Unable to start service controller: %v", err)
} else {
serviceController.Run(int(c.ControllerManager.ConcurrentServiceSyncs))
}
}
// RunPetSetController starts the PetSet controller
func (c *MasterConfig) RunPetSetController(client *kclient.Client) {
ps := petsetcontroller.NewPetSetController(c.Informers.Pods().Informer(), client, kctrlmgr.ResyncPeriod(c.ControllerManager)())
go ps.Run(1, utilwait.NeverStop)
}
func (c *MasterConfig) createSchedulerConfig() (*scheduler.Config, error) {
var policy schedulerapi.Policy
var configData []byte
// TODO make the rate limiter configurable
configFactory := factory.NewConfigFactory(c.KubeClient, c.SchedulerServer.SchedulerName, int(c.SchedulerServer.HardPodAffinitySymmetricWeight), c.SchedulerServer.FailureDomains)
if _, err := os.Stat(c.Options.SchedulerConfigFile); err == nil {
		// read the same file whose existence was just checked
		configData, err = ioutil.ReadFile(c.Options.SchedulerConfigFile)
if err != nil {
return nil, fmt.Errorf("unable to read scheduler config: %v", err)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy)
if err != nil {
return nil, fmt.Errorf("invalid scheduler configuration: %v", err)
}
return configFactory.CreateFromConfig(policy)
}
// if the config file isn't provided, use the default provider
return configFactory.CreateFromProvider(factory.DefaultProvider)
}
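
// examplePolicyConfig is an illustrative sketch, not part of the original file: it shows the
// shape of scheduler policy data that createSchedulerConfig decodes via latestschedulerapi.Codec
// when a scheduler config file is provided. The specific predicate and priority names are
// assumptions used for illustration only.
const examplePolicyConfig = `{
	"kind": "Policy",
	"apiVersion": "v1",
	"predicates": [
		{"name": "PodFitsResources"},
		{"name": "MatchNodeSelector"}
	],
	"priorities": [
		{"name": "LeastRequestedPriority", "weight": 1}
	]
}`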