6c68632b |
package deploymentconfig
import (
"fmt" |
0e85054d |
"reflect" |
6c68632b |
"github.com/golang/glog"
|
83c702b4 |
kapi "k8s.io/kubernetes/pkg/api" |
95d647c8 |
kapierrors "k8s.io/kubernetes/pkg/api/errors" |
cdbabcb7 |
"k8s.io/kubernetes/pkg/client/cache" |
97e6f1de |
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" |
d970f0a5 |
"k8s.io/kubernetes/pkg/client/record"
kclient "k8s.io/kubernetes/pkg/client/unversioned" |
0e85054d |
"k8s.io/kubernetes/pkg/labels" |
d970f0a5 |
"k8s.io/kubernetes/pkg/runtime" |
95d647c8 |
kutilerrors "k8s.io/kubernetes/pkg/util/errors" |
cdbabcb7 |
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/workqueue" |
6c68632b |
|
0b38d810 |
osclient "github.com/openshift/origin/pkg/client" |
cdbabcb7 |
oscache "github.com/openshift/origin/pkg/client/cache" |
6c68632b |
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
|
cdbabcb7 |
// fatalError marks a failure that must not be retried. The wrapped string is
// the underlying reason for the failure.
type fatalError string

// Error implements the error interface by prefixing the stored reason with a
// fixed description of where the failure happened.
func (fe fatalError) Error() string {
	reason := string(fe)
	return fmt.Sprint("fatal error handling deployment config: ", reason)
}
|
d970f0a5 |
// DeploymentConfigController is responsible for creating a new deployment
// when:
6c68632b |
//
// 1. The config version is > 0 and,
d970f0a5 |
// 2. No deployment for the version exists.
6c68632b |
//
d970f0a5 |
// The controller reconciles deployments with the replica count specified on
// the config. The active deployment (that is, the latest successful
// deployment) will always be scaled to the config replica count. All other
// deployments will be scaled to zero.
6c68632b |
//
d970f0a5 |
// If a new version is observed for which no deployment exists, any running
// deployments will be cancelled. The controller will not attempt to scale
// running deployments.
6c68632b |
type DeploymentConfigController struct { |
cdbabcb7 |
// dn provides access to deploymentconfigs. It is the client used to write
// status updates for a deployment config.
dn osclient.DeploymentConfigsNamespacer
// rn provides access to replication controllers. It is the client used to
// create, update and delete the replication controllers backing deployments.
97e6f1de |
rn kcoreclient.ReplicationControllersGetter |
cdbabcb7 |
// queue contains deployment configs that need to be synced. Failed keys are
// re-added with rate limiting until the retry budget is exhausted.
queue workqueue.RateLimitingInterface
// dcStore provides a local cache for deployment configs.
dcStore oscache.StoreToDeploymentConfigLister
// rcStore provides a local cache for replication controllers. Deployments
// owned by a config are listed and re-read from this cache.
rcStore cache.StoreToReplicationControllerLister |
0e85054d |
// podStore provides a local cache for pods. It is consulted when counting
// the available pods for a deployment config status.
podStore cache.StoreToPodLister
|
cdbabcb7 |
// dcStoreSynced makes sure the dc store is synced before reconciling any deployment config.
dcStoreSynced func() bool
// rcStoreSynced makes sure the rc store is synced before reconciling any deployment config.
rcStoreSynced func() bool |
0e85054d |
// podStoreSynced makes sure the pod store is synced before reconciling any deployment config.
podStoreSynced func() bool |
cdbabcb7 |
|
d970f0a5 |
// codec is used to build deployments from configs.
codec runtime.Codec
// recorder is used to record events about the deployment config being handled.
recorder record.EventRecorder |
6c68632b |
}
|
cdbabcb7 |
// Handle implements the loop that processes deployment configs. Since this controller started
// using caches, the provided config MUST be deep-copied beforehand (see work() in factory.go). |
d970f0a5 |
func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error { |
e343373c |
glog.V(5).Infof("Reconciling %s/%s", config.Namespace, config.Name) |
0e85054d |
// There's nothing to reconcile until the version is nonzero.
if config.Status.LatestVersion == 0 {
return c.updateStatus(config, []kapi.ReplicationController{}) |
6c68632b |
}
|
cdbabcb7 |
// Find all deployments owned by the deployment config.
selector := deployutil.ConfigSelector(config.Name)
existingDeployments, err := c.rcStore.ReplicationControllers(config.Namespace).List(selector) |
c02e9cfa |
if err != nil {
return err
}
|
0e85054d |
// In case the deployment config has been marked for deletion, merely update its status with
// the latest available information. Some deletions make take some time to complete so there
// is value in doing this.
if config.DeletionTimestamp != nil {
return c.updateStatus(config, existingDeployments)
}
|
d970f0a5 |
latestIsDeployed, latestDeployment := deployutil.LatestDeploymentInfo(config, existingDeployments)
// If the latest deployment doesn't exist yet, cancel any running
// deployments to allow them to be superceded by the new config version.
awaitingCancellations := false
if !latestIsDeployed { |
7e6a823f |
for i := range existingDeployments {
deployment := existingDeployments[i] |
d970f0a5 |
// Skip deployments with an outcome.
if deployutil.IsTerminatedDeployment(&deployment) { |
4c5ddeff |
continue
} |
d970f0a5 |
// Cancel running deployments.
awaitingCancellations = true |
066150dd |
if deployutil.IsDeploymentCancelled(&deployment) {
continue
} |
7e6a823f |
|
066150dd |
// Retry faster on conflicts
var updatedDeployment *kapi.ReplicationController
if err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if kapierrors.IsNotFound(err) {
return nil
}
if err != nil { |
f550e74e |
return err |
d970f0a5 |
} |
066150dd |
copied, err := deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
}
copied.Annotations[deployapi.DeploymentCancelledAnnotation] = deployapi.DeploymentCancelledAnnotationValue
copied.Annotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentCancelledNewerDeploymentExists
updatedDeployment, err = c.rn.ReplicationControllers(copied.Namespace).Update(copied)
return err
}); err != nil {
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCancellationFailed", "Failed to cancel deployment %q superceded by version %d: %s", deployment.Name, config.Status.LatestVersion, err)
} else if updatedDeployment != nil {
// replace the current deployment with the updated copy so that a future update has a chance at working
existingDeployments[i] = *updatedDeployment
c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCancelled", "Cancelled deployment %q superceded by version %d", deployment.Name, config.Status.LatestVersion) |
4c5ddeff |
}
}
} |
d970f0a5 |
// Wait for deployment cancellations before reconciling or creating a new
// deployment to avoid competing with existing deployment processes.
if awaitingCancellations { |
f638b86d |
c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentAwaitingCancellation", "Deployment of version %d awaiting cancellation of older running deployments", config.Status.LatestVersion) |
cdbabcb7 |
return fmt.Errorf("found previous inflight deployment for %s - requeuing", deployutil.LabelForDeploymentConfig(config)) |
f881f281 |
} |
d970f0a5 |
// If the latest deployment already exists, reconcile existing deployments
// and return early.
if latestIsDeployed {
// If the latest deployment is still running, try again later. We don't
// want to compete with the deployer.
if !deployutil.IsTerminatedDeployment(latestDeployment) { |
0e85054d |
return c.updateStatus(config, existingDeployments) |
d970f0a5 |
} |
95d647c8 |
|
d970f0a5 |
return c.reconcileDeployments(existingDeployments, config)
} |
9b2c4ab9 |
// If the config is paused we shouldn't create new deployments for it.
if config.Spec.Paused { |
95d647c8 |
// in order for revision history limit cleanup to work for paused
// deployments, we need to trigger it here
if err := c.cleanupOldDeployments(existingDeployments, config); err != nil {
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCleanupFailed", "Couldn't clean up deployments: %v", err)
}
|
0e85054d |
return c.updateStatus(config, existingDeployments) |
9b2c4ab9 |
} |
d970f0a5 |
// No deployments are running and the latest deployment doesn't exist, so
// create the new deployment.
deployment, err := deployutil.MakeDeployment(config, c.codec) |
6c68632b |
if err != nil { |
0b38d810 |
return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err)) |
6c68632b |
} |
cdbabcb7 |
created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment) |
d970f0a5 |
if err != nil {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update. |
95d647c8 |
if kapierrors.IsAlreadyExists(err) { |
0e85054d |
return c.updateStatus(config, existingDeployments) |
6c68632b |
} |
f638b86d |
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err) |
e343373c |
// We don't care about this error since we need to report the create failure.
cond := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionFalse, deployutil.FailedRcCreateReason, err.Error())
_ = c.updateStatus(config, existingDeployments, *cond) |
0b38d810 |
return fmt.Errorf("couldn't create deployment for deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err) |
6c68632b |
} |
e343373c |
msg := fmt.Sprintf("Created new replication controller %q for version %d", created.Name, config.Status.LatestVersion)
c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCreated", msg) |
dda90ccb |
|
95d647c8 |
// As we've just created a new deployment, we need to make sure to clean
// up old deployments if we have reached our deployment history quota
existingDeployments = append(existingDeployments, *created)
if err := c.cleanupOldDeployments(existingDeployments, config); err != nil {
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCleanupFailed", "Couldn't clean up deployments: %v", err)
}
|
24fa302c |
cond := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.NewReplicationControllerReason, msg) |
e343373c |
return c.updateStatus(config, existingDeployments, *cond) |
6c68632b |
}
|
d970f0a5 |
// reconcileDeployments reconciles existing deployment replica counts which
// could have diverged outside the deployment process (e.g. due to auto or
// manual scaling, or partial deployments). The active deployment is the last
// successful deployment, not necessarily the latest in terms of the config
// version. The active deployment replica count should follow the config, and
// all other deployments should be scaled to zero. |
cdbabcb7 |
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []kapi.ReplicationController, config *deployapi.DeploymentConfig) error { |
8127e853 |
activeDeployment := deployutil.ActiveDeployment(existingDeployments) |
aa9b471e |
|
d970f0a5 |
// Reconcile deployments. The active deployment follows the config, and all
// other deployments should be scaled to zero. |
0e85054d |
var updatedDeployments []kapi.ReplicationController
for i := range existingDeployments {
deployment := existingDeployments[i]
toAppend := deployment
|
d970f0a5 |
isActiveDeployment := activeDeployment != nil && deployment.Name == activeDeployment.Name |
0b38d810 |
|
2429a35c |
oldReplicaCount := deployment.Spec.Replicas
newReplicaCount := int32(0) |
d970f0a5 |
if isActiveDeployment { |
066150dd |
newReplicaCount = config.Spec.Replicas |
d970f0a5 |
} |
aa9b471e |
if config.Spec.Test {
glog.V(4).Infof("Deployment config %q is test and deployment %q will be scaled down", deployutil.LabelForDeploymentConfig(config), deployutil.LabelForDeployment(&deployment))
newReplicaCount = 0
} |
066150dd |
|
d970f0a5 |
// Only update if necessary. |
f550e74e |
var copied *kapi.ReplicationController |
066150dd |
if newReplicaCount != oldReplicaCount { |
f550e74e |
if err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
// refresh the replication controller version
rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if err != nil {
return err
}
copied, err = deployutil.DeploymentDeepCopy(rc)
if err != nil {
glog.V(2).Infof("Deep copy of deployment %q failed: %v", rc.Name, err)
return err
}
copied.Spec.Replicas = newReplicaCount |
066150dd |
copied, err = c.rn.ReplicationControllers(copied.Namespace).Update(copied) |
547a40ee |
return err |
f550e74e |
}); err != nil { |
066150dd |
c.recorder.Eventf(config, kapi.EventTypeWarning, "ReplicationControllerScaleFailed",
"Failed to scale replication controler %q from %d to %d: %v", deployment.Name, oldReplicaCount, newReplicaCount, err) |
d970f0a5 |
return err
} |
f550e74e |
|
066150dd |
c.recorder.Eventf(config, kapi.EventTypeNormal, "ReplicationControllerScaled", "Scaled replication controller %q from %d to %d", copied.Name, oldReplicaCount, newReplicaCount) |
0e85054d |
toAppend = *copied |
d970f0a5 |
} |
0e85054d |
updatedDeployments = append(updatedDeployments, toAppend) |
0b38d810 |
} |
dda90ccb |
|
95d647c8 |
// As the deployment configuration has changed, we need to make sure to clean
// up old deployments if we have now reached our deployment history quota |
066150dd |
if err := c.cleanupOldDeployments(updatedDeployments, config); err != nil {
c.recorder.Eventf(config, kapi.EventTypeWarning, "ReplicationControllerCleanupFailed", "Couldn't clean up replication controllers: %v", err) |
95d647c8 |
}
|
0e85054d |
return c.updateStatus(config, updatedDeployments) |
dda90ccb |
}
|
e343373c |
// Update the status of the provided deployment config. Additional conditions will override any other condition in the
// deployment config status.
func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentConfig, deployments []kapi.ReplicationController, additional ...deployapi.DeploymentCondition) error {
newStatus, err := c.calculateStatus(*config, deployments, additional...) |
0e85054d |
if err != nil {
glog.V(2).Infof("Cannot calculate the status for %q: %v", deployutil.LabelForDeploymentConfig(config), err)
return err
}
|
cdbabcb7 |
// NOTE: We should update the status of the deployment config only if we need to, otherwise
// we hotloop between updates. |
0e85054d |
if reflect.DeepEqual(newStatus, config.Status) { |
cdbabcb7 |
return nil |
dda90ccb |
} |
0e85054d |
|
8be964bd |
copied, err := deployutil.DeploymentConfigDeepCopy(config)
if err != nil {
return err
}
copied.Status = newStatus |
e343373c |
// TODO: Retry update conficts |
8be964bd |
if _, err := c.dn.DeploymentConfigs(copied.Namespace).UpdateStatus(copied); err != nil {
glog.V(2).Infof("Cannot update the status for %q: %v", deployutil.LabelForDeploymentConfig(copied), err) |
cdbabcb7 |
return err |
dda90ccb |
} |
8be964bd |
glog.V(4).Infof("Updated the status for %q (observed generation: %d)", deployutil.LabelForDeploymentConfig(copied), copied.Status.ObservedGeneration) |
dda90ccb |
return nil |
0b38d810 |
} |
547a40ee |
|
e343373c |
func (c *DeploymentConfigController) calculateStatus(config deployapi.DeploymentConfig, deployments []kapi.ReplicationController, additional ...deployapi.DeploymentCondition) (deployapi.DeploymentConfigStatus, error) { |
0e85054d |
selector := labels.Set(config.Spec.Selector).AsSelector() |
e343373c |
// TODO: Replace with using rc.status.availableReplicas that comes with the next rebase. |
0e85054d |
pods, err := c.podStore.Pods(config.Namespace).List(selector)
if err != nil {
return config.Status, err
} |
baf9fa77 |
available := deployutil.GetAvailablePods(pods, config.Spec.MinReadySeconds) |
0e85054d |
// UpdatedReplicas represents the replicas that use the deployment config template which means
// we should inform about the replicas of the latest deployment and not the active.
latestReplicas := int32(0) |
e343373c |
latestExists, latestRC := deployutil.LatestDeploymentInfo(&config, deployments)
if !latestExists {
latestRC = nil
} else {
latestReplicas = deployutil.GetStatusReplicaCountForDeployments([]kapi.ReplicationController{*latestRC}) |
0e85054d |
}
total := deployutil.GetReplicaCountForDeployments(deployments)
|
e343373c |
status := deployapi.DeploymentConfigStatus{ |
0e85054d |
LatestVersion: config.Status.LatestVersion,
Details: config.Status.Details,
ObservedGeneration: config.Generation,
Replicas: deployutil.GetStatusReplicaCountForDeployments(deployments),
UpdatedReplicas: latestReplicas,
AvailableReplicas: available,
UnavailableReplicas: total - available, |
e343373c |
Conditions: config.Status.Conditions,
}
|
6b52405f |
updateConditions(config, &status, latestRC) |
e343373c |
for _, cond := range additional {
deployutil.SetDeploymentCondition(&status, cond)
}
return status, nil |
0e85054d |
}
|
6b52405f |
func updateConditions(config deployapi.DeploymentConfig, newStatus *deployapi.DeploymentConfigStatus, latestRC *kapi.ReplicationController) { |
48a4dbf8 |
// Availability condition. |
e343373c |
if newStatus.AvailableReplicas >= config.Spec.Replicas-deployutil.MaxUnavailable(config) && newStatus.AvailableReplicas > 0 { |
48a4dbf8 |
minAvailability := deployutil.NewDeploymentCondition(deployapi.DeploymentAvailable, kapi.ConditionTrue, "", "Deployment config has minimum availability.")
deployutil.SetDeploymentCondition(newStatus, *minAvailability)
} else {
noMinAvailability := deployutil.NewDeploymentCondition(deployapi.DeploymentAvailable, kapi.ConditionFalse, "", "Deployment config does not have minimum availability.")
deployutil.SetDeploymentCondition(newStatus, *noMinAvailability)
} |
e343373c |
|
48a4dbf8 |
// Condition about progress.
if latestRC != nil {
switch deployutil.DeploymentStatusFor(latestRC) { |
e343373c |
case deployapi.DeploymentStatusPending: |
56f46266 |
msg := fmt.Sprintf("Replication controller %q is waiting for pod %q to run", latestRC.Name, deployutil.DeployerPodNameForDeployment(latestRC.Name)) |
48a4dbf8 |
condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionUnknown, "", msg)
deployutil.SetDeploymentCondition(newStatus, *condition)
case deployapi.DeploymentStatusRunning: |
6b52405f |
if deployutil.IsProgressing(config, *newStatus) { |
e343373c |
deployutil.RemoveDeploymentCondition(newStatus, deployapi.DeploymentProgressing)
msg := fmt.Sprintf("Replication controller %q is progressing", latestRC.Name)
condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.ReplicationControllerUpdatedReason, msg)
// TODO: Right now, we use lastTransitionTime for storing the last time we had any progress instead
// of the last time the condition transitioned to a new status. We should probably change that.
deployutil.SetDeploymentCondition(newStatus, *condition) |
48a4dbf8 |
} |
e343373c |
case deployapi.DeploymentStatusFailed: |
48a4dbf8 |
msg := fmt.Sprintf("Replication controller %q has failed progressing", latestRC.Name)
condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionFalse, deployutil.TimedOutReason, msg)
deployutil.SetDeploymentCondition(newStatus, *condition)
case deployapi.DeploymentStatusComplete:
msg := fmt.Sprintf("Replication controller %q has completed progressing", latestRC.Name)
condition := deployutil.NewDeploymentCondition(deployapi.DeploymentProgressing, kapi.ConditionTrue, deployutil.NewRcAvailableReason, msg)
deployutil.SetDeploymentCondition(newStatus, *condition)
}
}
}
|
cdbabcb7 |
func (c *DeploymentConfigController) handleErr(err error, key interface{}) {
if err == nil { |
57d0a818 |
c.queue.Forget(key) |
cdbabcb7 |
return
} |
57d0a818 |
|
cdbabcb7 |
if _, isFatal := err.(fatalError); isFatal {
utilruntime.HandleError(err)
c.queue.Forget(key)
return
}
|
57d0a818 |
if c.queue.NumRequeues(key) < MaxRetries {
glog.V(2).Infof("Error syncing deployment config %v: %v", key, err) |
cdbabcb7 |
c.queue.AddRateLimited(key) |
57d0a818 |
return |
cdbabcb7 |
} |
57d0a818 |
utilruntime.HandleError(err)
c.queue.Forget(key) |
cdbabcb7 |
} |
95d647c8 |
// cleanupOldDeployments deletes old replication controller deployments if their quota has been reached
func (c *DeploymentConfigController) cleanupOldDeployments(existingDeployments []kapi.ReplicationController, deploymentConfig *deployapi.DeploymentConfig) error {
if deploymentConfig.Spec.RevisionHistoryLimit == nil {
// there is no past deplyoment quota set
return nil
}
prunableDeployments := deployutil.DeploymentsForCleanup(deploymentConfig, existingDeployments) |
b6c41af9 |
if len(prunableDeployments) <= int(*deploymentConfig.Spec.RevisionHistoryLimit) { |
95d647c8 |
// the past deployment quota has not been exceeded
return nil
}
deletionErrors := []error{} |
b6c41af9 |
for i := 0; i < (len(prunableDeployments) - int(*deploymentConfig.Spec.RevisionHistoryLimit)); i++ { |
95d647c8 |
deployment := prunableDeployments[i]
if deployment.Spec.Replicas != 0 {
// we do not want to clobber active older deployments, but we *do* want them to count
// against the quota so that they will be pruned when they're scaled down
continue
}
|
baf9fa77 |
err := c.rn.ReplicationControllers(deployment.Namespace).Delete(deployment.Name, nil) |
95d647c8 |
if err != nil && !kapierrors.IsNotFound(err) {
glog.V(2).Infof("Failed deleting old Replication Controller %q for Deployment Config %q: %v", deployment.Name, deploymentConfig.Name, err)
deletionErrors = append(deletionErrors, err)
}
}
return kutilerrors.NewAggregate(deletionErrors)
} |