package deployment import ( "time" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" kcontroller "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" deployutil "github.com/openshift/origin/pkg/deploy/util" ) const ( // We must avoid creating processing deployment configs until the deployment config and image // stream stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long // between checks. storeSyncedPollPeriod = 100 * time.Millisecond ) // NewDeploymentController creates a new DeploymentController. func NewDeploymentController(rcInformer, podInformer framework.SharedIndexInformer, kc kclientset.Interface, sa, image string, env []kapi.EnvVar, codec runtime.Codec) *DeploymentController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&kcoreclient.EventSinkImpl{Interface: kc.Core().Events("")}) recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployments-controller"}) c := &DeploymentController{ rn: kc.Core(), pn: kc.Core(), queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), serviceAccount: sa, deployerImage: image, environment: env, recorder: recorder, codec: codec, } c.rcStore.Indexer = rcInformer.GetIndexer() rcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ AddFunc: c.addReplicationController, UpdateFunc: c.updateReplicationController, }) c.podStore.Indexer = podInformer.GetIndexer() podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ UpdateFunc: c.updatePod, DeleteFunc: c.deletePod, }) c.rcStoreSynced = rcInformer.HasSynced c.podStoreSynced = podInformer.HasSynced return c } // Run begins watching and syncing. func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // Wait for the dc store to sync before starting any work in this controller. ready := make(chan struct{}) go c.waitForSyncedStores(ready, stopCh) select { case <-ready: case <-stopCh: return } for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down deployment controller") c.queue.ShutDown() } func (c *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() for !c.rcStoreSynced() || !c.podStoreSynced() { glog.V(4).Infof("Waiting for the rc and pod caches to sync before starting the deployment controller workers") select { case <-time.After(storeSyncedPollPeriod): case <-stopCh: return } } close(ready) } func (c *DeploymentController) addReplicationController(obj interface{}) { rc := obj.(*kapi.ReplicationController) // Filter out all unrelated replication controllers. if !deployutil.IsOwnedByConfig(rc) { return } c.enqueueReplicationController(rc) } func (c *DeploymentController) updateReplicationController(old, cur interface{}) { // A periodic relist will send update events for all known controllers. curRC := cur.(*kapi.ReplicationController) oldRC := old.(*kapi.ReplicationController) if curRC.ResourceVersion == oldRC.ResourceVersion { return } // Filter out all unrelated replication controllers. if !deployutil.IsOwnedByConfig(curRC) { return } c.enqueueReplicationController(curRC) } func (c *DeploymentController) updatePod(old, cur interface{}) { // A periodic relist will send update events for all known pods. curPod := cur.(*kapi.Pod) oldPod := old.(*kapi.Pod) if curPod.ResourceVersion == oldPod.ResourceVersion { return } if rc, err := c.rcForDeployerPod(curPod); err == nil && rc != nil { c.enqueueReplicationController(rc) } } func (c *DeploymentController) deletePod(obj interface{}) { pod, ok := obj.(*kapi.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.Errorf("Couldn't get object from tombstone: %+v", obj) return } pod, ok = tombstone.Obj.(*kapi.Pod) if !ok { glog.Errorf("Tombstone contained object that is not a pod: %+v", obj) return } } if rc, err := c.rcForDeployerPod(pod); err == nil && rc != nil { c.enqueueReplicationController(rc) } } func (c *DeploymentController) enqueueReplicationController(rc *kapi.ReplicationController) { key, err := kcontroller.KeyFunc(rc) if err != nil { glog.Errorf("Couldn't get key for object %#v: %v", rc, err) return } c.queue.Add(key) } func (c *DeploymentController) rcForDeployerPod(pod *kapi.Pod) (*kapi.ReplicationController, error) { key := pod.Namespace + "/" + deployutil.DeploymentNameFor(pod) return c.getByKey(key) } func (c *DeploymentController) worker() { for { if quit := c.work(); quit { return } } } func (c *DeploymentController) work() bool { key, quit := c.queue.Get() if quit { return true } defer c.queue.Done(key) rc, err := c.getByKey(key.(string)) if err != nil { glog.Error(err.Error()) } if rc == nil { return false } err = c.Handle(rc) c.handleErr(err, key, rc) return false } func (c *DeploymentController) getByKey(key string) (*kapi.ReplicationController, error) { obj, exists, err := c.rcStore.Indexer.GetByKey(key) if err != nil { glog.Infof("Unable to retrieve replication controller %q from store: %v", key, err) c.queue.Add(key) return nil, err } if !exists { glog.Infof("Replication controller %q has been deleted", key) return nil, nil } return obj.(*kapi.ReplicationController), nil }