package deploy import ( "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kubeclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" ) // A DeploymentController is responsible for executing Deployment objects stored in etcd type DeploymentController struct { osClient osclient.Interface kubeClient kubeclient.Interface syncTicker <-chan time.Time stateHandler DeploymentStateHandler } // DeploymentStateHandler holds methods that handle the possible deployment states. type DeploymentStateHandler interface { HandleNew(kapi.Context, *deployapi.Deployment) error HandlePending(kapi.Context, *deployapi.Deployment) error HandleRunning(kapi.Context, *deployapi.Deployment) error } // DefaultDeploymentRunner is the default implementation of DeploymentRunner interface. type DefaultDeploymentHandler struct { osClient osclient.Interface kubeClient kubeclient.Interface environment []kapi.EnvVar } // NewDeploymentController creates a new DeploymentController. func NewDeploymentController(kubeClient kubeclient.Interface, osClient osclient.Interface, initialEnvironment []kapi.EnvVar) *DeploymentController { dc := &DeploymentController{ kubeClient: kubeClient, osClient: osClient, stateHandler: &DefaultDeploymentHandler{ osClient: osClient, kubeClient: kubeClient, environment: initialEnvironment, }, } return dc } // Run begins watching and synchronizing deployment states. func (dc *DeploymentController) Run(period time.Duration) { ctx := kapi.NewContext() dc.syncTicker = time.Tick(period) go util.Forever(func() { dc.synchronize(ctx) }, period) } // The main synchronization loop. Iterates through all deployments and handles the current state // for each. func (dc *DeploymentController) synchronize(ctx kapi.Context) { deployments, err := dc.osClient.ListDeployments(ctx, labels.Everything()) if err != nil { glog.Errorf("Synchronization error: %v (%#v)", err, err) return } for ix := range deployments.Items { id := deployments.Items[ix].ID deployment, err := dc.osClient.GetDeployment(ctx, id) if err != nil { glog.Errorf("Got error retrieving deployment with id %s -- %v", id, err) continue } err = dc.syncDeployment(ctx, deployment) if err != nil { glog.Errorf("Error synchronizing: %#v", err) } } } // Invokes the appropriate handler for the current state of the given deployment. func (dc *DeploymentController) syncDeployment(ctx kapi.Context, deployment *deployapi.Deployment) error { glog.Infof("Synchronizing deployment id: %v state: %v resourceVersion: %v", deployment.ID, deployment.State, deployment.ResourceVersion) var err error = nil switch deployment.State { case deployapi.DeploymentNew: err = dc.stateHandler.HandleNew(ctx, deployment) case deployapi.DeploymentPending: err = dc.stateHandler.HandlePending(ctx, deployment) case deployapi.DeploymentRunning: err = dc.stateHandler.HandleRunning(ctx, deployment) } return err } func (dh *DefaultDeploymentHandler) saveDeployment(ctx kapi.Context, deployment *deployapi.Deployment) error { glog.Infof("Saving deployment %v state: %v", deployment.ID, deployment.State) _, err := dh.osClient.UpdateDeployment(ctx, deployment) if err != nil { glog.Errorf("Received error while saving deployment %v: %v", deployment.ID, err) } return err } func (dh *DefaultDeploymentHandler) makeDeploymentPod(deployment *deployapi.Deployment) *kapi.Pod { podID := deploymentPodID(deployment) envVars := deployment.Strategy.CustomPod.Environment envVars = append(envVars, kapi.EnvVar{Name: "KUBERNETES_DEPLOYMENT_ID", Value: deployment.ID}) for _, env := range dh.environment { envVars = append(envVars, env) } return &kapi.Pod{ JSONBase: kapi.JSONBase{ ID: podID, }, DesiredState: kapi.PodState{ Manifest: kapi.ContainerManifest{ Version: "v1beta1", Containers: []kapi.Container{ { Name: "deployment", Image: deployment.Strategy.CustomPod.Image, Env: envVars, }, }, RestartPolicy: kapi.RestartPolicy{ Never: &kapi.RestartPolicyNever{}, }, }, }, } } func deploymentPodID(deployment *deployapi.Deployment) string { return "deploy-" + deployment.ID } // Handler for a deployment in the 'new' state. func (dh *DefaultDeploymentHandler) HandleNew(ctx kapi.Context, deployment *deployapi.Deployment) error { deploymentPod := dh.makeDeploymentPod(deployment) glog.Infof("Attempting to create deployment pod: %+v", deploymentPod) if pod, err := dh.kubeClient.CreatePod(kapi.NewContext(), deploymentPod); err != nil { glog.Warningf("Received error creating pod: %v", err) deployment.State = deployapi.DeploymentFailed } else { glog.Infof("Successfully created pod %+v", pod) deployment.State = deployapi.DeploymentPending } return dh.saveDeployment(ctx, deployment) } // Handler for a deployment in the 'pending' state func (dh *DefaultDeploymentHandler) HandlePending(ctx kapi.Context, deployment *deployapi.Deployment) error { podID := deploymentPodID(deployment) glog.Infof("Retrieving deployment pod id %s", podID) pod, err := dh.kubeClient.GetPod(ctx, podID) if err != nil { glog.Errorf("Error retrieving pod for deployment ID %v: %#v", deployment.ID, err) deployment.State = deployapi.DeploymentFailed } else { glog.Infof("Deployment pod is %+v", pod) switch pod.CurrentState.Status { case kapi.PodRunning: deployment.State = deployapi.DeploymentRunning case kapi.PodTerminated: dh.checkForTerminatedDeploymentPod(deployment, pod) } } return dh.saveDeployment(ctx, deployment) } // Handler for a deployment in the 'running' state func (dh *DefaultDeploymentHandler) HandleRunning(ctx kapi.Context, deployment *deployapi.Deployment) error { podID := deploymentPodID(deployment) glog.Infof("Retrieving deployment pod id %s", podID) pod, err := dh.kubeClient.GetPod(ctx, podID) if err != nil { glog.Errorf("Error retrieving pod for deployment ID %v: %#v", deployment.ID, err) deployment.State = deployapi.DeploymentFailed } else { glog.Infof("Deployment pod is %+v", pod) dh.checkForTerminatedDeploymentPod(deployment, pod) } return dh.saveDeployment(ctx, deployment) } func (dh *DefaultDeploymentHandler) checkForTerminatedDeploymentPod(deployment *deployapi.Deployment, pod *kapi.Pod) { if pod.CurrentState.Status != kapi.PodTerminated { glog.Infof("The deployment has not yet finished. Pod status is %s. Continuing", pod.CurrentState.Status) return } deployment.State = deployapi.DeploymentComplete for _, info := range pod.CurrentState.Info { if info.State.Termination != nil && info.State.Termination.ExitCode != 0 { deployment.State = deployapi.DeploymentFailed } } if deployment.State == deployapi.DeploymentComplete { podID := deploymentPodID(deployment) glog.Infof("Removing deployment pod for ID %v", podID) dh.kubeClient.DeletePod(kapi.NewContext(), podID) } glog.Infof("The deployment pod has finished. Setting deployment state to %s", deployment.State) return }