pkg/deploy/controller/deploymentconfig/controller.go
6c68632b
 package deploymentconfig
 
 import (
 	"fmt"
0e85054d
 	"reflect"
6c68632b
 
 	"github.com/golang/glog"
 
83c702b4
 	kapi "k8s.io/kubernetes/pkg/api"
95d647c8
 	kapierrors "k8s.io/kubernetes/pkg/api/errors"
cdbabcb7
 	"k8s.io/kubernetes/pkg/client/cache"
97e6f1de
 	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
d970f0a5
 	"k8s.io/kubernetes/pkg/client/record"
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
0e85054d
 	"k8s.io/kubernetes/pkg/labels"
d970f0a5
 	"k8s.io/kubernetes/pkg/runtime"
95d647c8
 	kutilerrors "k8s.io/kubernetes/pkg/util/errors"
cdbabcb7
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
6c68632b
 
0b38d810
 	osclient "github.com/openshift/origin/pkg/client"
cdbabcb7
 	oscache "github.com/openshift/origin/pkg/client/cache"
6c68632b
 	deployapi "github.com/openshift/origin/pkg/deploy/api"
 	deployutil "github.com/openshift/origin/pkg/deploy/util"
 )
 
cdbabcb7
 // fatalError describes a failure that can't be retried. The string value
 // holds the underlying reason.
 type fatalError string
 
 // Error implements the error interface: the stored reason is returned behind
 // a fixed prefix identifying where the failure happened.
 func (e fatalError) Error() string {
 	const prefix = "fatal error handling deployment config: "
 	return fmt.Sprintf("%s%s", prefix, string(e))
 }
 
d970f0a5
 // DeploymentConfigController is responsible for creating a new deployment
 // when:
6c68632b
 //
 //    1. The config version is > 0 and,
d970f0a5
 //    2. No deployment for the version exists.
6c68632b
 //
d970f0a5
 // The controller reconciles deployments with the replica count specified on
 // the config. The active deployment (that is, the latest successful
 // deployment) will always be scaled to the config replica count. All other
 // deployments will be scaled to zero.
6c68632b
 //
d970f0a5
 // If a new version is observed for which no deployment exists, any running
 // deployments will be cancelled. The controller will not attempt to scale
 // running deployments.
6c68632b
 type DeploymentConfigController struct {
cdbabcb7
 	// dn provides access to deploymentconfigs.
 	dn osclient.DeploymentConfigsNamespacer
 	// rn provides access to replication controllers.
97e6f1de
 	rn kcoreclient.ReplicationControllersGetter
cdbabcb7
 
 	// queue contains deployment configs that need to be synced.
 	queue workqueue.RateLimitingInterface
 
 	// dcStore provides a local cache for deployment configs.
 	dcStore oscache.StoreToDeploymentConfigLister
 	// rcStore provides a local cache for replication controllers.
 	rcStore cache.StoreToReplicationControllerLister
0e85054d
 	// podStore provides a local cache for pods.
 	podStore cache.StoreToPodLister
 
cdbabcb7
 	// dcStoreSynced makes sure the dc store is synced before reconcling any deployment config.
 	dcStoreSynced func() bool
 	// rcStoreSynced makes sure the rc store is synced before reconcling any deployment config.
 	rcStoreSynced func() bool
0e85054d
 	// podStoreSynced makes sure the pod store is synced before reconcling any deployment config.
 	podStoreSynced func() bool
cdbabcb7
 
d970f0a5
 	// codec is used to build deployments from configs.
 	codec runtime.Codec
 	// recorder is used to record events.
 	recorder record.EventRecorder
6c68632b
 }
 
cdbabcb7
 // Handle implements the loop that processes deployment configs. Since this controller started
 // using caches, the provided config MUST be deep-copied beforehand (see work() in factory.go).
d970f0a5
 func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error {
e343373c
 	glog.V(5).Infof("Reconciling %s/%s", config.Namespace, config.Name)
0e85054d
 	// There's nothing to reconcile until the version is nonzero.
 	if config.Status.LatestVersion == 0 {
 		return c.updateStatus(config, []kapi.ReplicationController{})
6c68632b
 	}
 
cdbabcb7
 	// Find all deployments owned by the deployment config.
 	selector := deployutil.ConfigSelector(config.Name)
 	existingDeployments, err := c.rcStore.ReplicationControllers(config.Namespace).List(selector)
c02e9cfa
 	if err != nil {
 		return err
 	}
 
0e85054d
 	// In case the deployment config has been marked for deletion, merely update its status with
 	// the latest available information. Some deletions make take some time to complete so there
 	// is value in doing this.
 	if config.DeletionTimestamp != nil {
 		return c.updateStatus(config, existingDeployments)
 	}
 
d970f0a5
 	latestIsDeployed, latestDeployment := deployutil.LatestDeploymentInfo(config, existingDeployments)
 	// If the latest deployment doesn't exist yet, cancel any running
 	// deployments to allow them to be superceded by the new config version.
 	awaitingCancellations := false
 	if !latestIsDeployed {
7e6a823f
 		for i := range existingDeployments {
 			deployment := existingDeployments[i]
d970f0a5
 			// Skip deployments with an outcome.
 			if deployutil.IsTerminatedDeployment(&deployment) {
4c5ddeff
 				continue
 			}
d970f0a5
 			// Cancel running deployments.
 			awaitingCancellations = true
066150dd
 			if deployutil.IsDeploymentCancelled(&deployment) {
 				continue
 			}
7e6a823f
 
066150dd
 			// Retry faster on conflicts
 			var updatedDeployment *kapi.ReplicationController
 			if err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
 				rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
 				if kapierrors.IsNotFound(err) {
 					return nil
 				}
 				if err != nil {
f550e74e
 					return err
d970f0a5
 				}
066150dd
 				copied, err := deployutil.DeploymentDeepCopy(rc)
 				if err != nil {
 					return err
 				}
 				copied.Annotations[deployapi.DeploymentCancelledAnnotation] = deployapi.DeploymentCancelledAnnotationValue
 				copied.Annotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentCancelledNewerDeploymentExists
 				updatedDeployment, err = c.rn.ReplicationControllers(copied.Namespace).Update(copied)
 				return err
 			}); err != nil {
 				c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCancellationFailed", "Failed to cancel deployment %q superceded by version %d: %s", deployment.Name, config.Status.LatestVersion, err)
 			} else if updatedDeployment != nil {
 				// replace the current deployment with the updated copy so that a future update has a chance at working
 				existingDeployments[i] = *updatedDeployment
 				c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCancelled", "Cancelled deployment %q superceded by version %d", deployment.Name, config.Status.LatestVersion)
4c5ddeff
 			}
 		}
 	}
d970f0a5
 	// Wait for deployment cancellations before reconciling or creating a new
 	// deployment to avoid competing with existing deployment processes.
 	if awaitingCancellations {
f638b86d
 		c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentAwaitingCancellation", "Deployment of version %d awaiting cancellation of older running deployments", config.Status.LatestVersion)
cdbabcb7
 		return fmt.Errorf("found previous inflight deployment for %s - requeuing", deployutil.LabelForDeploymentConfig(config))
f881f281
 	}
d970f0a5
 	// If the latest deployment already exists, reconcile existing deployments
 	// and return early.
 	if latestIsDeployed {
 		// If the latest deployment is still running, try again later. We don't
 		// want to compete with the deployer.
 		if !deployutil.IsTerminatedDeployment(latestDeployment) {
0e85054d
 			return c.updateStatus(config, existingDeployments)
d970f0a5
 		}
95d647c8
 
d970f0a5
 		return c.reconcileDeployments(existingDeployments, config)
 	}
9b2c4ab9
 	// If the config is paused we shouldn't create new deployments for it.
 	if config.Spec.Paused {
95d647c8
 		// in order for revision history limit cleanup to work for paused
 		// deployments, we need to trigger it here
 		if err := c.cleanupOldDeployments(existingDeployments, config); err != nil {
 			c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCleanupFailed", "Couldn't clean up deployments: %v", err)
 		}
 
0e85054d
 		return c.updateStatus(config, existingDeployments)
9b2c4ab9
 	}
d970f0a5
 	// No deployments are running and the latest deployment doesn't exist, so
 	// create the new deployment.
 	deployment, err := deployutil.MakeDeployment(config, c.codec)
6c68632b
 	if err != nil {
0b38d810
 		return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err))
6c68632b
 	}
cdbabcb7
 	created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment)
d970f0a5
 	if err != nil {
 		// If the deployment was already created, just move on. The cache could be
 		// stale, or another process could have already handled this update.
95d647c8
 		if kapierrors.IsAlreadyExists(err) {
0e85054d
 			return c.updateStatus(config, existingDeployments)
6c68632b
 		}
f638b86d
 		c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err)
e343373c
 		// We don't care about this error since we need to report the create failure.
 		cond := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionFalse, deployutil.FailedRcCreateReason, err.Error())
 		_ = c.updateStatus(config, existingDeployments, *cond)
0b38d810
 		return fmt.Errorf("couldn't create deployment for deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err)
6c68632b
 	}
e343373c
 	msg := fmt.Sprintf("Created new replication controller %q for version %d", created.Name, config.Status.LatestVersion)
 	c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCreated", msg)
dda90ccb
 
95d647c8
 	// As we've just created a new deployment, we need to make sure to clean
 	// up old deployments if we have reached our deployment history quota
 	existingDeployments = append(existingDeployments, *created)
 	if err := c.cleanupOldDeployments(existingDeployments, config); err != nil {
 		c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCleanupFailed", "Couldn't clean up deployments: %v", err)
 	}
 
24fa302c
 	cond := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.NewReplicationControllerReason, msg)
e343373c
 	return c.updateStatus(config, existingDeployments, *cond)
6c68632b
 }
 
d970f0a5
 // reconcileDeployments reconciles existing deployment replica counts which
 // could have diverged outside the deployment process (e.g. due to auto or
 // manual scaling, or partial deployments). The active deployment is the last
 // successful deployment, not necessarily the latest in terms of the config
 // version. The active deployment replica count should follow the config, and
 // all other deployments should be scaled to zero.
cdbabcb7
 func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []kapi.ReplicationController, config *deployapi.DeploymentConfig) error {
8127e853
 	activeDeployment := deployutil.ActiveDeployment(existingDeployments)
aa9b471e
 
d970f0a5
 	// Reconcile deployments. The active deployment follows the config, and all
 	// other deployments should be scaled to zero.
0e85054d
 	var updatedDeployments []kapi.ReplicationController
 	for i := range existingDeployments {
 		deployment := existingDeployments[i]
 		toAppend := deployment
 
d970f0a5
 		isActiveDeployment := activeDeployment != nil && deployment.Name == activeDeployment.Name
0b38d810
 
2429a35c
 		oldReplicaCount := deployment.Spec.Replicas
 		newReplicaCount := int32(0)
d970f0a5
 		if isActiveDeployment {
066150dd
 			newReplicaCount = config.Spec.Replicas
d970f0a5
 		}
aa9b471e
 		if config.Spec.Test {
 			glog.V(4).Infof("Deployment config %q is test and deployment %q will be scaled down", deployutil.LabelForDeploymentConfig(config), deployutil.LabelForDeployment(&deployment))
 			newReplicaCount = 0
 		}
066150dd
 
d970f0a5
 		// Only update if necessary.
f550e74e
 		var copied *kapi.ReplicationController
066150dd
 		if newReplicaCount != oldReplicaCount {
f550e74e
 			if err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
 				// refresh the replication controller version
 				rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
 				if err != nil {
 					return err
 				}
 				copied, err = deployutil.DeploymentDeepCopy(rc)
 				if err != nil {
 					glog.V(2).Infof("Deep copy of deployment %q failed: %v", rc.Name, err)
 					return err
 				}
 				copied.Spec.Replicas = newReplicaCount
066150dd
 				copied, err = c.rn.ReplicationControllers(copied.Namespace).Update(copied)
547a40ee
 				return err
f550e74e
 			}); err != nil {
066150dd
 				c.recorder.Eventf(config, kapi.EventTypeWarning, "ReplicationControllerScaleFailed",
 					"Failed to scale replication controler %q from %d to %d: %v", deployment.Name, oldReplicaCount, newReplicaCount, err)
d970f0a5
 				return err
 			}
f550e74e
 
066150dd
 			c.recorder.Eventf(config, kapi.EventTypeNormal, "ReplicationControllerScaled", "Scaled replication controller %q from %d to %d", copied.Name, oldReplicaCount, newReplicaCount)
0e85054d
 			toAppend = *copied
d970f0a5
 		}
0e85054d
 
 		updatedDeployments = append(updatedDeployments, toAppend)
0b38d810
 	}
dda90ccb
 
95d647c8
 	// As the deployment configuration has changed, we need to make sure to clean
 	// up old deployments if we have now reached our deployment history quota
066150dd
 	if err := c.cleanupOldDeployments(updatedDeployments, config); err != nil {
 		c.recorder.Eventf(config, kapi.EventTypeWarning, "ReplicationControllerCleanupFailed", "Couldn't clean up replication controllers: %v", err)
95d647c8
 	}
 
0e85054d
 	return c.updateStatus(config, updatedDeployments)
dda90ccb
 }
 
e343373c
 // Update the status of the provided deployment config. Additional conditions will override any other condition in the
 // deployment config status.
 func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentConfig, deployments []kapi.ReplicationController, additional ...deployapi.DeploymentCondition) error {
 	newStatus, err := c.calculateStatus(*config, deployments, additional...)
0e85054d
 	if err != nil {
 		glog.V(2).Infof("Cannot calculate the status for %q: %v", deployutil.LabelForDeploymentConfig(config), err)
 		return err
 	}
 
cdbabcb7
 	// NOTE: We should update the status of the deployment config only if we need to, otherwise
 	// we hotloop between updates.
0e85054d
 	if reflect.DeepEqual(newStatus, config.Status) {
cdbabcb7
 		return nil
dda90ccb
 	}
0e85054d
 
8be964bd
 	copied, err := deployutil.DeploymentConfigDeepCopy(config)
 	if err != nil {
 		return err
 	}
 
 	copied.Status = newStatus
e343373c
 	// TODO: Retry update conficts
8be964bd
 	if _, err := c.dn.DeploymentConfigs(copied.Namespace).UpdateStatus(copied); err != nil {
 		glog.V(2).Infof("Cannot update the status for %q: %v", deployutil.LabelForDeploymentConfig(copied), err)
cdbabcb7
 		return err
dda90ccb
 	}
8be964bd
 	glog.V(4).Infof("Updated the status for %q (observed generation: %d)", deployutil.LabelForDeploymentConfig(copied), copied.Status.ObservedGeneration)
dda90ccb
 	return nil
0b38d810
 }
547a40ee
 
e343373c
 func (c *DeploymentConfigController) calculateStatus(config deployapi.DeploymentConfig, deployments []kapi.ReplicationController, additional ...deployapi.DeploymentCondition) (deployapi.DeploymentConfigStatus, error) {
0e85054d
 	selector := labels.Set(config.Spec.Selector).AsSelector()
e343373c
 	// TODO: Replace with using rc.status.availableReplicas that comes with the next rebase.
0e85054d
 	pods, err := c.podStore.Pods(config.Namespace).List(selector)
 	if err != nil {
 		return config.Status, err
 	}
baf9fa77
 	available := deployutil.GetAvailablePods(pods, config.Spec.MinReadySeconds)
0e85054d
 
 	// UpdatedReplicas represents the replicas that use the deployment config template which means
 	// we should inform about the replicas of the latest deployment and not the active.
 	latestReplicas := int32(0)
e343373c
 	latestExists, latestRC := deployutil.LatestDeploymentInfo(&config, deployments)
 	if !latestExists {
 		latestRC = nil
 	} else {
 		latestReplicas = deployutil.GetStatusReplicaCountForDeployments([]kapi.ReplicationController{*latestRC})
0e85054d
 	}
 
 	total := deployutil.GetReplicaCountForDeployments(deployments)
 
e343373c
 	status := deployapi.DeploymentConfigStatus{
0e85054d
 		LatestVersion:       config.Status.LatestVersion,
 		Details:             config.Status.Details,
 		ObservedGeneration:  config.Generation,
 		Replicas:            deployutil.GetStatusReplicaCountForDeployments(deployments),
 		UpdatedReplicas:     latestReplicas,
 		AvailableReplicas:   available,
 		UnavailableReplicas: total - available,
e343373c
 		Conditions:          config.Status.Conditions,
 	}
 
6b52405f
 	updateConditions(config, &status, latestRC)
e343373c
 	for _, cond := range additional {
 		deployutil.SetDeploymentCondition(&status, cond)
 	}
 
 	return status, nil
0e85054d
 }
 
6b52405f
 func updateConditions(config deployapi.DeploymentConfig, newStatus *deployapi.DeploymentConfigStatus, latestRC *kapi.ReplicationController) {
48a4dbf8
 	// Availability condition.
e343373c
 	if newStatus.AvailableReplicas >= config.Spec.Replicas-deployutil.MaxUnavailable(config) && newStatus.AvailableReplicas > 0 {
48a4dbf8
 		minAvailability := deployutil.NewDeploymentCondition(deployapi.DeploymentAvailable, kapi.ConditionTrue, "", "Deployment config has minimum availability.")
 		deployutil.SetDeploymentCondition(newStatus, *minAvailability)
 	} else {
 		noMinAvailability := deployutil.NewDeploymentCondition(deployapi.DeploymentAvailable, kapi.ConditionFalse, "", "Deployment config does not have minimum availability.")
 		deployutil.SetDeploymentCondition(newStatus, *noMinAvailability)
 	}
e343373c
 
48a4dbf8
 	// Condition about progress.
 	if latestRC != nil {
 		switch deployutil.DeploymentStatusFor(latestRC) {
e343373c
 		case deployapi.DeploymentStatusPending:
56f46266
 			msg := fmt.Sprintf("Replication controller %q is waiting for pod %q to run", latestRC.Name, deployutil.DeployerPodNameForDeployment(latestRC.Name))
48a4dbf8
 			condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionUnknown, "", msg)
 			deployutil.SetDeploymentCondition(newStatus, *condition)
 		case deployapi.DeploymentStatusRunning:
6b52405f
 			if deployutil.IsProgressing(config, *newStatus) {
e343373c
 				deployutil.RemoveDeploymentCondition(newStatus, deployapi.DeploymentProgressing)
 				msg := fmt.Sprintf("Replication controller %q is progressing", latestRC.Name)
 				condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.ReplicationControllerUpdatedReason, msg)
 				// TODO: Right now, we use lastTransitionTime for storing the last time we had any progress instead
 				// of the last time the condition transitioned to a new status. We should probably change that.
 				deployutil.SetDeploymentCondition(newStatus, *condition)
48a4dbf8
 			}
e343373c
 		case deployapi.DeploymentStatusFailed:
48a4dbf8
 			msg := fmt.Sprintf("Replication controller %q has failed progressing", latestRC.Name)
 			condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionFalse, deployutil.TimedOutReason, msg)
 			deployutil.SetDeploymentCondition(newStatus, *condition)
 		case deployapi.DeploymentStatusComplete:
 			msg := fmt.Sprintf("Replication controller %q has completed progressing", latestRC.Name)
 			condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.NewRcAvailableReason, msg)
 			deployutil.SetDeploymentCondition(newStatus, *condition)
 		}
 	}
 }
 
cdbabcb7
 func (c *DeploymentConfigController) handleErr(err error, key interface{}) {
 	if err == nil {
57d0a818
 		c.queue.Forget(key)
cdbabcb7
 		return
 	}
57d0a818
 
cdbabcb7
 	if _, isFatal := err.(fatalError); isFatal {
 		utilruntime.HandleError(err)
 		c.queue.Forget(key)
 		return
 	}
 
57d0a818
 	if c.queue.NumRequeues(key) < MaxRetries {
 		glog.V(2).Infof("Error syncing deployment config %v: %v", key, err)
cdbabcb7
 		c.queue.AddRateLimited(key)
57d0a818
 		return
cdbabcb7
 	}
57d0a818
 
 	utilruntime.HandleError(err)
 	c.queue.Forget(key)
cdbabcb7
 }
95d647c8
 
 // cleanupOldDeployments deletes old replication controller deployments if their quota has been reached
 func (c *DeploymentConfigController) cleanupOldDeployments(existingDeployments []kapi.ReplicationController, deploymentConfig *deployapi.DeploymentConfig) error {
 	if deploymentConfig.Spec.RevisionHistoryLimit == nil {
 		// there is no past deplyoment quota set
 		return nil
 	}
 
 	prunableDeployments := deployutil.DeploymentsForCleanup(deploymentConfig, existingDeployments)
b6c41af9
 	if len(prunableDeployments) <= int(*deploymentConfig.Spec.RevisionHistoryLimit) {
95d647c8
 		// the past deployment quota has not been exceeded
 		return nil
 	}
 
 	deletionErrors := []error{}
b6c41af9
 	for i := 0; i < (len(prunableDeployments) - int(*deploymentConfig.Spec.RevisionHistoryLimit)); i++ {
95d647c8
 		deployment := prunableDeployments[i]
 		if deployment.Spec.Replicas != 0 {
 			// we do not want to clobber active older deployments, but we *do* want them to count
 			// against the quota so that they will be pruned when they're scaled down
 			continue
 		}
 
baf9fa77
 		err := c.rn.ReplicationControllers(deployment.Namespace).Delete(deployment.Name, nil)
95d647c8
 		if err != nil && !kapierrors.IsNotFound(err) {
 			glog.V(2).Infof("Failed deleting old Replication Controller %q for Deployment Config %q: %v", deployment.Name, deploymentConfig.Name, err)
 			deletionErrors = append(deletionErrors, err)
 		}
 	}
 
 	return kutilerrors.NewAggregate(deletionErrors)
 }