package deployment

import (
	"fmt"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	kerrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/client/cache"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/runtime"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/workqueue"

	deployapi "github.com/openshift/origin/pkg/deploy/api"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
	"github.com/openshift/origin/pkg/util"
)

// fatalError is an error which can't be retried.
type fatalError string

func (e fatalError) Error() string { return "fatal error handling deployment: " + string(e) }

// actionableError is an error on which users can act.
type actionableError string

func (e actionableError) Error() string { return string(e) }

// DeploymentController starts a deployment by creating a deployer pod which
// implements a deployment strategy. The status of the deployment will follow
// the status of the deployer pod. The deployer pod is correlated to the
// deployment with annotations.
//
// When the deployment enters a terminal status:
//
// 1. If the deployment finished normally, the deployer pod is deleted.
// 2. If the deployment failed, the deployer pod is not deleted.
type DeploymentController struct {
// rn is used for updating replication controllers.
rn kcoreclient.ReplicationControllersGetter
// pn is used for creating, updating, and deleting deployer pods.
pn kcoreclient.PodsGetter
// queue contains replication controllers that need to be synced.
queue workqueue.RateLimitingInterface
// rcStore is a store of replication controllers.
rcStore cache.StoreToReplicationControllerLister
// podStore is a store of pods.
podStore cache.StoreToPodLister
	// rcStoreSynced makes sure the rc store is synced before reconciling any deployment.
	rcStoreSynced func() bool
	// podStoreSynced makes sure the pod store is synced before reconciling any deployment.
	podStoreSynced func() bool
// deployerImage specifies which Docker image can support the default strategies.
deployerImage string
// serviceAccount to create deployment pods with.
serviceAccount string
// environment is a set of environment variables which should be injected into all
// deployer pod containers.
environment []kapi.EnvVar
// codec is used for deserializing deploymentconfigs from replication controller
// annotations.
codec runtime.Codec
// recorder is used to record events.
recorder record.EventRecorder
}

// Handle processes a deployment and either creates a deployer pod or responds
// to a terminal deployment status. Since this controller started using caches,
// the provided rc MUST be deep-copied beforehand (see work() in factory.go).
func (c *DeploymentController) Handle(deployment *kapi.ReplicationController) error {
// Copy all the annotations from the deployment.
updatedAnnotations := make(map[string]string)
for key, value := range deployment.Annotations {
updatedAnnotations[key] = value
}
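	// Determine the deployment's current status and look up its deployer pod
	// in the cache; lookup errors (including NotFound) are handled per-phase
	// in the switch below.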
currentStatus := deployutil.DeploymentStatusFor(deployment)
nextStatus := currentStatus
deployerPodName := deployutil.DeployerPodNameForDeployment(deployment.Name)
deployer, deployerErr := c.podStore.Pods(deployment.Namespace).Get(deployerPodName)
if deployerErr == nil {
nextStatus = c.nextStatus(deployer, deployment, updatedAnnotations)
}
switch currentStatus {
case deployapi.DeploymentStatusNew:
// If the deployment has been cancelled, don't create a deployer pod.
// Instead try to delete any deployer pods found and transition the
// deployment to Pending so that the deployment config controller
// continues to see the deployment as in-flight. Eventually the deletion
// of the deployer pod should cause a requeue of this deployment and
// then it can be transitioned to Failed by this controller.
if deployutil.IsDeploymentCancelled(deployment) {
nextStatus = deployapi.DeploymentStatusPending
if err := c.cleanupDeployerPods(deployment); err != nil {
return err
}
break
}
switch {
case kerrors.IsNotFound(deployerErr):
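			// When the deployment is annotated to ignore its deployer pod,
			// skip creating one; presumably the pod is managed out of band.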
if _, ok := deployment.Annotations[deployapi.DeploymentIgnorePodAnnotation]; ok {
return nil
}
// Generate a deployer pod spec.
deployerPod, err := c.makeDeployerPod(deployment)
if err != nil {
return fatalError(fmt.Sprintf("couldn't make deployer pod for %s: %v", deployutil.LabelForDeployment(deployment), err))
}
// Create the deployer pod.
deploymentPod, err := c.pn.Pods(deployment.Namespace).Create(deployerPod)
// Retry on error.
if err != nil {
return actionableError(fmt.Sprintf("couldn't create deployer pod for %s: %v", deployutil.LabelForDeployment(deployment), err))
}
updatedAnnotations[deployapi.DeploymentPodAnnotation] = deploymentPod.Name
nextStatus = deployapi.DeploymentStatusPending
glog.V(4).Infof("Created deployer pod %s for deployment %s", deploymentPod.Name, deployutil.LabelForDeployment(deployment))
		case deployerErr != nil:
			// If a pod with the deployer's name already exists, it's possible
			// that a previous CreatePod succeeded but the deployment state
			// update failed and now we're re-entering. We can't verify the
			// pod's annotation until we can fetch it, so return a retryable
			// error and check again on the next sync.
			return fmt.Errorf("couldn't fetch existing deployer pod for %s: %v", deployutil.LabelForDeployment(deployment), deployerErr)
default: /* deployerErr == nil */
// Do a stronger check to validate that the existing deployer pod is
// actually for this deployment, and if not, fail this deployment.
//
// TODO: Investigate checking the container image of the running pod and
// comparing with the intended deployer pod image. If we do so, we'll need
// to ensure that changes to 'unrelated' pods don't result in updates to
// the deployment. So, the image check will have to be done in other areas
// of the code as well.
if deployutil.DeploymentNameFor(deployer) != deployment.Name {
nextStatus = deployapi.DeploymentStatusFailed
updatedAnnotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentFailedUnrelatedDeploymentExists
c.emitDeploymentEvent(deployment, kapi.EventTypeWarning, "FailedCreate", fmt.Sprintf("Error creating deployer pod since another pod with the same name (%q) exists", deployer.Name))
glog.V(2).Infof("Couldn't create deployer pod for %s since an unrelated pod with the same name (%q) exists", deployutil.LabelForDeployment(deployment), deployer.Name)
} else {
// Update to pending or to the appropriate status relative to the existing validated deployer pod.
updatedAnnotations[deployapi.DeploymentPodAnnotation] = deployer.Name
nextStatus = nextStatusComp(nextStatus, deployapi.DeploymentStatusPending)
glog.V(4).Infof("Detected existing deployer pod %s for deployment %s", deployer.Name, deployutil.LabelForDeployment(deployment))
}
}
case deployapi.DeploymentStatusPending, deployapi.DeploymentStatusRunning:
switch {
case kerrors.IsNotFound(deployerErr):
nextStatus = deployapi.DeploymentStatusFailed
// If the deployment is cancelled here then we deleted the deployer in a previous
// resync of the deployment.
if !deployutil.IsDeploymentCancelled(deployment) {
updatedAnnotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentFailedDeployerPodNoLongerExists
c.emitDeploymentEvent(deployment, kapi.EventTypeWarning, "Failed", fmt.Sprintf("Deployer pod %q has gone missing", deployerPodName))
deployerErr = fmt.Errorf("Failing deployment %q because its deployer pod %q disappeared", deployutil.LabelForDeployment(deployment), deployerPodName)
utilruntime.HandleError(deployerErr)
}
case deployerErr != nil:
// We'll try again later on resync. Continue to process cancellations.
deployerErr = fmt.Errorf("Error getting deployer pod %q for deployment %q: %v", deployerPodName, deployutil.LabelForDeployment(deployment), deployerErr)
utilruntime.HandleError(deployerErr)
		default: /* deployerErr == nil */
// If the deployment has been cancelled, delete any deployer pods
// found. Eventually the deletion of the deployer pod should cause
// a requeue of this deployment and then it can be transitioned to
// Failed.
if deployutil.IsDeploymentCancelled(deployment) {
if err := c.cleanupDeployerPods(deployment); err != nil {
return err
}
}
}
case deployapi.DeploymentStatusFailed:
		// Try once more to clean up a cancelled deployment in case hook pods
		// were created just after we issued the first cleanup request.
if deployutil.IsDeploymentCancelled(deployment) {
if err := c.cleanupDeployerPods(deployment); err != nil {
return err
}
}
case deployapi.DeploymentStatusComplete:
if err := c.cleanupDeployerPods(deployment); err != nil {
return err
}
}
// Update only if we need to transition to a new phase.
if deployutil.CanTransitionPhase(currentStatus, nextStatus) {
deployment, err := deployutil.DeploymentDeepCopy(deployment)
if err != nil {
return err
}
updatedAnnotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus)
deployment.Annotations = updatedAnnotations
		// If we are transitioning to Failed or Complete and the scale is non-zero,
		// check one more time whether this is a test deployment so that the test
		// invariant (test deployments are scaled back down to zero) holds.
if deployment.Spec.Replicas != 0 && deployutil.IsTerminatedDeployment(deployment) {
if config, err := deployutil.DecodeDeploymentConfig(deployment, c.codec); err == nil && config.Spec.Test {
deployment.Spec.Replicas = 0
}
}
if _, err := c.rn.ReplicationControllers(deployment.Namespace).Update(deployment); err != nil {
return fmt.Errorf("couldn't update deployment %s to status %s: %v", deployutil.LabelForDeployment(deployment), nextStatus, err)
}
glog.V(4).Infof("Updated deployment %s status from %s to %s (scale: %d)", deployutil.LabelForDeployment(deployment), currentStatus, nextStatus, deployment.Spec.Replicas)
if deployutil.IsDeploymentCancelled(deployment) && deployutil.IsFailedDeployment(deployment) {
c.emitDeploymentEvent(deployment, kapi.EventTypeNormal, "DeploymentCancelled", fmt.Sprintf("Deployment %q cancelled", deployutil.LabelForDeployment(deployment)))
}
}
return nil
}
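
// nextStatus maps the deployer pod's phase to a deployment status. When the
// pod has succeeded, it also reconciles annotations: a too-late cancellation
// is cleared (with a warning event) and the replica annotation is synced with
// the desired count so deployer scaling can be told apart from other scaling
// events.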
func (c *DeploymentController) nextStatus(pod *kapi.Pod, deployment *kapi.ReplicationController, updatedAnnotations map[string]string) deployapi.DeploymentStatus {
switch pod.Status.Phase {
case kapi.PodPending:
return deployapi.DeploymentStatusPending
case kapi.PodRunning:
return deployapi.DeploymentStatusRunning
case kapi.PodSucceeded:
// If the deployment was cancelled just prior to the deployer pod succeeding
// then we need to remove the cancel annotations from the complete deployment
// and emit an event letting users know their cancellation failed.
if deployutil.IsDeploymentCancelled(deployment) {
delete(updatedAnnotations, deployapi.DeploymentCancelledAnnotation)
delete(updatedAnnotations, deployapi.DeploymentStatusReasonAnnotation)
c.emitDeploymentEvent(deployment, kapi.EventTypeWarning, "FailedCancellation", "Succeeded before cancel recorded")
}
// Sync the internal replica annotation with the target so that we can
// distinguish deployer updates from other scaling events.
updatedAnnotations[deployapi.DeploymentReplicasAnnotation] = updatedAnnotations[deployapi.DesiredReplicasAnnotation]
delete(updatedAnnotations, deployapi.DesiredReplicasAnnotation)
return deployapi.DeploymentStatusComplete
case kapi.PodFailed:
return deployapi.DeploymentStatusFailed
}
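	// The pod phase is PodUnknown or unrecognized; report New and let the
	// phase-transition check in Handle decide whether an update is legal.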
return deployapi.DeploymentStatusNew
}
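
// nextStatusComp chooses between the status derived from the deployer pod and
// the status implied by the calling code path, preferring the deployer-derived
// status whenever transitioning to it is legal.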
func nextStatusComp(fromDeployer, fromPath deployapi.DeploymentStatus) deployapi.DeploymentStatus {
if deployutil.CanTransitionPhase(fromPath, fromDeployer) {
return fromDeployer
}
return fromPath
}

// makeDeployerPod creates a pod which implements deployment behavior. The pod is correlated to
// the deployment with an annotation.
func (c *DeploymentController) makeDeployerPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) {
deploymentConfig, err := deployutil.DecodeDeploymentConfig(deployment, c.codec)
if err != nil {
return nil, err
}
container := c.makeDeployerContainer(&deploymentConfig.Spec.Strategy)
	// Start with the strategy container's environment and add the deployment's
	// name and namespace so the deployer process can locate its target.
	envVars := make([]kapi.EnvVar, 0, len(container.Env)+2)
	envVars = append(envVars, container.Env...)
envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: deployment.Name})
envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: deployment.Namespace})
// Assigning to a variable since its address is required
maxDeploymentDurationSeconds := deployapi.MaxDeploymentDurationSeconds
if deploymentConfig.Spec.Strategy.ActiveDeadlineSeconds != nil {
maxDeploymentDurationSeconds = *(deploymentConfig.Spec.Strategy.ActiveDeadlineSeconds)
}
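	// Allow the deployer pod ten seconds to terminate gracefully when deleted.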
gracePeriod := int64(10)
pod := &kapi.Pod{
ObjectMeta: kapi.ObjectMeta{
Name: deployutil.DeployerPodNameForDeployment(deployment.Name),
Annotations: map[string]string{
deployapi.DeploymentAnnotation: deployment.Name,
},
Labels: map[string]string{
deployapi.DeployerPodForDeploymentLabel: deployment.Name,
},
},
Spec: kapi.PodSpec{
Containers: []kapi.Container{
{
Name: "deployment",
Command: container.Command,
Args: container.Args,
Image: container.Image,
Env: envVars,
Resources: deploymentConfig.Spec.Strategy.Resources,
},
},
ActiveDeadlineSeconds: &maxDeploymentDurationSeconds,
DNSPolicy: deployment.Spec.Template.Spec.DNSPolicy,
ImagePullSecrets: deployment.Spec.Template.Spec.ImagePullSecrets,
// Setting the node selector on the deployer pod so that it is created
// on the same set of nodes as the pods.
NodeSelector: deployment.Spec.Template.Spec.NodeSelector,
RestartPolicy: kapi.RestartPolicyNever,
ServiceAccountName: c.serviceAccount,
TerminationGracePeriodSeconds: &gracePeriod,
},
}
	// MergeInto will not overwrite values unless the flag OverwriteExistingDstKey is set.
util.MergeInto(pod.Labels, deploymentConfig.Spec.Strategy.Labels, 0)
util.MergeInto(pod.Annotations, deploymentConfig.Spec.Strategy.Annotations, 0)
pod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent
return pod, nil
}

// makeDeployerContainer creates containers in the following way:
//
// 1. For the Recreate and Rolling strategies, use the factory's
//    DeployerImage as the container image, and the factory's Environment
//    as the container environment.
// 2. For all Custom strategies, or if the CustomParams field is set, use
//    the strategy's image for the container image, and use the combination
//    of the factory's Environment and the strategy's environment as the
//    container environment.
func (c *DeploymentController) makeDeployerContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container {
image := c.deployerImage
var environment []kapi.EnvVar
var command []string
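	// set records the environment variable names supplied by the strategy so
	// the factory defaults added below don't duplicate or override them.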
set := sets.NewString()
// Use user-defined values from the strategy input.
if p := strategy.CustomParams; p != nil {
if len(p.Image) > 0 {
image = p.Image
}
if len(p.Command) > 0 {
command = p.Command
}
		for _, env := range p.Environment {
set.Insert(env.Name)
environment = append(environment, env)
}
}
// Set default environment values
for _, env := range c.environment {
if set.Has(env.Name) {
continue
}
environment = append(environment, env)
}
return &kapi.Container{
Image: image,
Command: command,
Env: environment,
}
}
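
// cleanupDeployerPods deletes every pod matching the deployment's deployer pod
// selector (including any hook pods). Individual deletion failures are logged
// and rolled up into a single retryable error.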
func (c *DeploymentController) cleanupDeployerPods(deployment *kapi.ReplicationController) error {
selector := deployutil.DeployerPodSelector(deployment.Name)
deployerList, err := c.podStore.Pods(deployment.Namespace).List(selector)
if err != nil {
return fmt.Errorf("couldn't fetch deployer pods for %q: %v", deployutil.LabelForDeployment(deployment), err)
}
cleanedAll := true
for _, deployerPod := range deployerList {
if err := c.pn.Pods(deployerPod.Namespace).Delete(deployerPod.Name, &kapi.DeleteOptions{}); err != nil && !kerrors.IsNotFound(err) {
			// If the pod deletion fails, log the error and continue; we will try
			// to delete any remaining deployer pods and return an error later.
utilruntime.HandleError(fmt.Errorf("couldn't delete completed deployer pod %q for deployment %q: %v", deployerPod.Name, deployutil.LabelForDeployment(deployment), err))
cleanedAll = false
}
}
if !cleanedAll {
		return actionableError(fmt.Sprintf("couldn't clean up all deployer pods for %s", deployutil.LabelForDeployment(deployment)))
}
return nil
}
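
// emitDeploymentEvent records the event on the owning deployment config when
// one can be decoded from the deployment's annotations, falling back to the
// replication controller itself otherwise.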
func (c *DeploymentController) emitDeploymentEvent(deployment *kapi.ReplicationController, eventType, title, message string) {
if config, _ := deployutil.DecodeDeploymentConfig(deployment, c.codec); config != nil {
c.recorder.Eventf(config, eventType, title, fmt.Sprintf("%s: %s", deployment.Name, message))
} else {
c.recorder.Eventf(deployment, eventType, title, message)
}
}
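
// handleErr implements the queue's retry policy: fatal errors are never
// requeued, other errors are retried a limited number of times, and an
// actionable error that exhausts its retries emits a warning event so users
// can intervene.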
func (c *DeploymentController) handleErr(err error, key interface{}, deployment *kapi.ReplicationController) {
if err == nil {
c.queue.Forget(key)
return
}
if _, isFatal := err.(fatalError); isFatal {
utilruntime.HandleError(err)
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 2 {
c.queue.AddRateLimited(key)
return
}
if _, isActionableErr := err.(actionableError); isActionableErr {
c.emitDeploymentEvent(deployment, kapi.EventTypeWarning, "FailedRetry", fmt.Sprintf("About to stop retrying %s: %v", deployment.Name, err))
}
c.queue.Forget(key)
}