package kubernetes import ( "fmt" "io/ioutil" "os" "time" "github.com/emicklei/go-restful" "github.com/golang/glog" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" minioncontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" schedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api/latest" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" ) const ( KubeAPIPrefix = "/api" KubeAPIPrefixV1Beta1 = "/api/v1beta1" KubeAPIPrefixV1Beta2 = "/api/v1beta2" KubeAPIPrefixV1Beta3 = "/api/v1beta3" KubeAPIPrefixV1 = "/api/v1" ) // TODO: Longer term we should read this from some config store, rather than a flag. func (c *MasterConfig) EnsurePortalFlags() { if c.PortalNet == nil { glog.Fatal("No --portal-net specified") } } // InstallAPI starts a Kubernetes master and registers the supported REST APIs // into the provided mux, then returns an array of strings indicating what // endpoints were started (these are format strings that will expect to be sent // a single string value). func (c *MasterConfig) InstallAPI(container *restful.Container) []string { kubeletClient, err := kclient.NewKubeletClient(c.KubeletClientConfig) if err != nil { glog.Fatalf("Unable to configure Kubelet client: %v", err) } masterConfig := &master.Config{ PublicAddress: c.MasterIP, ReadWritePort: c.MasterPort, ReadOnlyPort: c.MasterPort, EtcdHelper: c.EtcdHelper, EventTTL: 2 * time.Hour, PortalNet: c.PortalNet, RequestContextMapper: c.RequestContextMapper, RestfulContainer: container, KubeletClient: kubeletClient, APIPrefix: KubeAPIPrefix, EnableV1: true, EnableCoreControllers: true, MasterCount: c.MasterCount, Authorizer: c.Authorizer, AdmissionControl: c.AdmissionControl, } _ = master.New(masterConfig) return []string{ fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefixV1Beta1), fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefixV1Beta3), fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefixV1Beta3), fmt.Sprintf("Started Kubernetes API at %%s%s (experimental)", KubeAPIPrefixV1), } } func (c *MasterConfig) RunNamespaceController() { namespaceController := namespace.NewNamespaceManager(c.KubeClient, 5*time.Minute) namespaceController.Run() glog.Infof("Started Kubernetes Namespace Manager") } func (c *MasterConfig) RunPersistentVolumeClaimBinder() { binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(c.KubeClient, 5*time.Minute) binder.Run() glog.Infof("Started Kubernetes Persistent Volume Claim Binder") } // RunReplicationController starts the Kubernetes replication controller sync loop func (c *MasterConfig) RunReplicationController() { controllerManager := controller.NewReplicationManager(c.KubeClient, controller.BurstReplicas) go controllerManager.Run(5, util.NeverStop) glog.Infof("Started Kubernetes Replication Manager") } // RunEndpointController starts the Kubernetes replication controller sync loop func (c *MasterConfig) RunEndpointController() { endpoints := service.NewEndpointController(c.KubeClient) go endpoints.Run(5, util.NeverStop) glog.Infof("Started Kubernetes Endpoint Controller") } // RunScheduler starts the Kubernetes scheduler func (c *MasterConfig) RunScheduler() { config, err := c.createSchedulerConfig() if err != nil { glog.Fatalf("Unable to start scheduler: %v", err) } eventcast := record.NewBroadcaster() config.Recorder = eventcast.NewRecorder(kapi.EventSource{Component: "scheduler"}) eventcast.StartRecordingToSink(c.KubeClient.Events("")) s := scheduler.New(config) s.Run() glog.Infof("Started Kubernetes Scheduler") } func (c *MasterConfig) RunResourceQuotaManager() { resourceQuotaManager := resourcequota.NewResourceQuotaManager(c.KubeClient) resourceQuotaManager.Run(10 * time.Second) } func (c *MasterConfig) RunMinionController() { nodeResources := &kapi.NodeResources{ Capacity: kapi.ResourceList{ kapi.ResourceCPU: *resource.NewMilliQuantity(int64(1*1000), resource.DecimalSI), kapi.ResourceMemory: *resource.NewQuantity(int64(3*1024*1024*1024), resource.BinarySI), }, } minionController := minioncontroller.NewNodeController( nil, "", c.NodeHosts, nodeResources, c.KubeClient, 10, // registerRetryCount 5*time.Minute, // podEvictionTimeout util.NewTokenBucketRateLimiter(0.1, 10), // deleting pods qps / burst 40*time.Second, // monitor grace 1*time.Minute, // startup grace 10*time.Second, // monitor period nil, // clusterCIDR false, // allocateNodeCIDRs ) minionController.Run(10*time.Second, true) glog.Infof("Started Kubernetes Minion Controller") } func (c *MasterConfig) createSchedulerConfig() (*scheduler.Config, error) { var policy schedulerapi.Policy var configData []byte configFactory := factory.NewConfigFactory(c.KubeClient) if _, err := os.Stat(c.SchedulerConfigFile); err == nil { configData, err = ioutil.ReadFile(c.SchedulerConfigFile) if err != nil { return nil, fmt.Errorf("Unable to read scheduler config: %v", err) } err = latestschedulerapi.Codec.DecodeInto(configData, &policy) if err != nil { return nil, fmt.Errorf("Invalid scheduler configuration: %v", err) } return configFactory.CreateFromConfig(policy) } // if the config file isn't provided, use the default provider return configFactory.CreateFromProvider(factory.DefaultProvider) }