package generictrigger import ( "time" kapi "k8s.io/kubernetes/pkg/api" kclient "k8s.io/kubernetes/pkg/client/unversioned" kcontroller "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "github.com/golang/glog" osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" imageapi "github.com/openshift/origin/pkg/image/api" ) const ( // We must avoid creating processing deployment configs until the deployment config and image // stream stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long // between checks. StoreSyncedPollPeriod = 100 * time.Millisecond ) // NewDeploymentTriggerController returns a new DeploymentTriggerController. func NewDeploymentTriggerController(dcInformer, streamInformer framework.SharedIndexInformer, oc osclient.Interface, kc kclient.Interface, codec runtime.Codec) *DeploymentTriggerController { c := &DeploymentTriggerController{ dn: oc, rn: kc, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), codec: codec, } c.dcStore.Indexer = dcInformer.GetIndexer() dcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ AddFunc: c.addDeploymentConfig, UpdateFunc: c.updateDeploymentConfig, }) c.dcStoreSynced = dcInformer.HasSynced streamInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ AddFunc: c.addImageStream, UpdateFunc: c.updateImageStream, }) return c } // Run begins watching and syncing. func (c *DeploymentTriggerController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // Wait for the dc store to sync before starting any work in this controller. ready := make(chan struct{}) go c.waitForSyncedStore(ready, stopCh) select { case <-ready: case <-stopCh: return } for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down deployment trigger controller") c.queue.ShutDown() } func (c *DeploymentTriggerController) waitForSyncedStore(ready chan<- struct{}, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() for !c.dcStoreSynced() { glog.V(4).Infof("Waiting for the deployment config cache to sync before starting the trigger controller workers") select { case <-time.After(StoreSyncedPollPeriod): case <-stopCh: return } } close(ready) } func (c *DeploymentTriggerController) addDeploymentConfig(obj interface{}) { dc := obj.(*deployapi.DeploymentConfig) c.enqueueDeploymentConfig(dc) } func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{}) { dc := cur.(*deployapi.DeploymentConfig) c.enqueueDeploymentConfig(dc) } // addImageStream enqueues the deployment configs that point to the new image stream. func (c *DeploymentTriggerController) addImageStream(obj interface{}) { stream := obj.(*imageapi.ImageStream) glog.V(4).Infof("Image stream %q added.", stream.Name) dcList, err := c.dcStore.GetConfigsForImageStream(stream) if err != nil { return } for _, dc := range dcList { c.enqueueDeploymentConfig(dc) } } // updateImageStream enqueues the deployment configs that point to the updated image stream. func (c *DeploymentTriggerController) updateImageStream(old, cur interface{}) { // A periodic relist will send update events for all known streams. if kapi.Semantic.DeepEqual(old, cur) { return } stream := cur.(*imageapi.ImageStream) glog.V(4).Infof("Image stream %q updated.", stream.Name) dcList, err := c.dcStore.GetConfigsForImageStream(stream) if err != nil { return } for _, dc := range dcList { c.enqueueDeploymentConfig(dc) } } func (c *DeploymentTriggerController) enqueueDeploymentConfig(dc *deployapi.DeploymentConfig) { key, err := kcontroller.KeyFunc(dc) if err != nil { glog.Errorf("Couldn't get key for object %+v: %v", dc, err) return } c.queue.Add(key) } func (c *DeploymentTriggerController) worker() { for { if quit := c.work(); quit { return } } } func (c *DeploymentTriggerController) work() bool { key, quit := c.queue.Get() if quit { return true } defer c.queue.Done(key) dc, err := c.getByKey(key.(string)) if err != nil { glog.Error(err.Error()) } if dc == nil { return false } if err := c.Handle(dc); err != nil { utilruntime.HandleError(err) if c.queue.NumRequeues(key) < 2 { c.queue.AddRateLimited(key) return false } } c.queue.Forget(key) return false } func (c *DeploymentTriggerController) getByKey(key string) (*deployapi.DeploymentConfig, error) { obj, exists, err := c.dcStore.Indexer.GetByKey(key) if err != nil { glog.Infof("Unable to retrieve deployment config %q from store: %v", key, err) c.queue.Add(key) return nil, err } if !exists { glog.Infof("Deployment config %q has been deleted", key) return nil, nil } return obj.(*deployapi.DeploymentConfig), nil }