package deployment
import (
"time"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
kcontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
const (
// We must avoid creating processing deployment configs until the deployment config and image
// stream stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long
// between checks.
storeSyncedPollPeriod = 100 * time.Millisecond
)
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(rcInformer, podInformer framework.SharedIndexInformer, kc kclientset.Interface, sa, image string, env []kapi.EnvVar, codec runtime.Codec) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&kcoreclient.EventSinkImpl{Interface: kc.Core().Events("")})
recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployments-controller"})
c := &DeploymentController{
rn: kc.Core(),
pn: kc.Core(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
serviceAccount: sa,
deployerImage: image,
environment: env,
recorder: recorder,
codec: codec,
}
c.rcStore.Indexer = rcInformer.GetIndexer()
rcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: c.addReplicationController,
UpdateFunc: c.updateReplicationController,
})
c.podStore.Indexer = podInformer.GetIndexer()
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
UpdateFunc: c.updatePod,
DeleteFunc: c.deletePod,
})
c.rcStoreSynced = rcInformer.HasSynced
c.podStoreSynced = podInformer.HasSynced
return c
}
// Run begins watching and syncing.
func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Wait for the dc store to sync before starting any work in this controller.
ready := make(chan struct{})
go c.waitForSyncedStores(ready, stopCh)
select {
case <-ready:
case <-stopCh:
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment controller")
c.queue.ShutDown()
}
func (c *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for !c.rcStoreSynced() || !c.podStoreSynced() {
glog.V(4).Infof("Waiting for the rc and pod caches to sync before starting the deployment controller workers")
select {
case <-time.After(storeSyncedPollPeriod):
case <-stopCh:
return
}
}
close(ready)
}
func (c *DeploymentController) addReplicationController(obj interface{}) {
rc := obj.(*kapi.ReplicationController)
// Filter out all unrelated replication controllers.
if !deployutil.IsOwnedByConfig(rc) {
return
}
c.enqueueReplicationController(rc)
}
func (c *DeploymentController) updateReplicationController(old, cur interface{}) {
// A periodic relist will send update events for all known controllers.
curRC := cur.(*kapi.ReplicationController)
oldRC := old.(*kapi.ReplicationController)
if curRC.ResourceVersion == oldRC.ResourceVersion {
return
}
// Filter out all unrelated replication controllers.
if !deployutil.IsOwnedByConfig(curRC) {
return
}
c.enqueueReplicationController(curRC)
}
func (c *DeploymentController) updatePod(old, cur interface{}) {
// A periodic relist will send update events for all known pods.
curPod := cur.(*kapi.Pod)
oldPod := old.(*kapi.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
return
}
if rc, err := c.rcForDeployerPod(curPod); err == nil && rc != nil {
c.enqueueReplicationController(rc)
}
}
func (c *DeploymentController) deletePod(obj interface{}) {
pod, ok := obj.(*kapi.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone: %+v", obj)
return
}
pod, ok = tombstone.Obj.(*kapi.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod: %+v", obj)
return
}
}
if rc, err := c.rcForDeployerPod(pod); err == nil && rc != nil {
c.enqueueReplicationController(rc)
}
}
func (c *DeploymentController) enqueueReplicationController(rc *kapi.ReplicationController) {
key, err := kcontroller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", rc, err)
return
}
c.queue.Add(key)
}
func (c *DeploymentController) rcForDeployerPod(pod *kapi.Pod) (*kapi.ReplicationController, error) {
key := pod.Namespace + "/" + deployutil.DeploymentNameFor(pod)
return c.getByKey(key)
}
func (c *DeploymentController) worker() {
for {
if quit := c.work(); quit {
return
}
}
}
func (c *DeploymentController) work() bool {
key, quit := c.queue.Get()
if quit {
return true
}
defer c.queue.Done(key)
rc, err := c.getByKey(key.(string))
if err != nil {
glog.Error(err.Error())
}
if rc == nil {
return false
}
err = c.Handle(rc)
c.handleErr(err, key, rc)
return false
}
func (c *DeploymentController) getByKey(key string) (*kapi.ReplicationController, error) {
obj, exists, err := c.rcStore.Indexer.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve replication controller %q from store: %v", key, err)
c.queue.Add(key)
return nil, err
}
if !exists {
glog.Infof("Replication controller %q has been deleted", key)
return nil, nil
}
return obj.(*kapi.ReplicationController), nil
}