package generictrigger
import (
"reflect"
"time"
"github.com/golang/glog"
kcontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
osclient "github.com/openshift/origin/pkg/client"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
imageapi "github.com/openshift/origin/pkg/image/api"
)
const (
// We must avoid creating processing deployment configs until the deployment config and image
// stream stores have synced. If it hasn't synced, to avoid a hot loop, we'll wait this long
// between checks.
storeSyncedPollPeriod = 100 * time.Millisecond
// MaxRetries is the number of times a deployment config will be retried before it is dropped
// out of the queue.
MaxRetries = 5
)
// NewDeploymentTriggerController returns a new DeploymentTriggerController.
func NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer framework.SharedIndexInformer, oc osclient.Interface, codec runtime.Codec) *DeploymentTriggerController {
c := &DeploymentTriggerController{
dn: oc,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
codec: codec,
}
dcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: c.addDeploymentConfig,
UpdateFunc: c.updateDeploymentConfig,
})
streamInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: c.addImageStream,
UpdateFunc: c.updateImageStream,
})
c.dcLister.Indexer = dcInformer.GetIndexer()
c.rcLister.Indexer = rcInformer.GetIndexer()
c.dcListerSynced = dcInformer.HasSynced
c.rcListerSynced = rcInformer.HasSynced
return c
}
// Run begins watching and syncing.
func (c *DeploymentTriggerController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Wait for the dc store to sync before starting any work in this controller.
ready := make(chan struct{})
go c.waitForSyncedStore(ready, stopCh)
select {
case <-ready:
case <-stopCh:
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment trigger controller")
c.queue.ShutDown()
}
func (c *DeploymentTriggerController) waitForSyncedStore(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for !c.dcListerSynced() || !c.rcListerSynced() {
glog.V(4).Infof("Waiting for the dc and rc caches to sync before starting the trigger controller workers")
select {
case <-time.After(storeSyncedPollPeriod):
case <-stopCh:
return
}
}
close(ready)
}
func (c *DeploymentTriggerController) addDeploymentConfig(obj interface{}) {
dc := obj.(*deployapi.DeploymentConfig)
// No need to enqueue deployment configs that have no triggers or are paused.
if len(dc.Spec.Triggers) == 0 || dc.Spec.Paused {
return
}
// We don't want to compete with the main deployment config controller. Let's process this
// config once it's synced.
if !deployutil.HasSynced(dc, dc.Generation) {
return
}
c.enqueueDeploymentConfig(dc)
}
func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{}) {
newDc := cur.(*deployapi.DeploymentConfig)
oldDc := old.(*deployapi.DeploymentConfig)
// A periodic relist will send update events for all known deployment configs.
if newDc.ResourceVersion == oldDc.ResourceVersion {
return
}
// No need to enqueue deployment configs that have no triggers or are paused.
if len(newDc.Spec.Triggers) == 0 || newDc.Spec.Paused {
return
}
// We don't want to compete with the main deployment config controller. Let's process this
// config once it's synced. Note that this does not eliminate conflicts between the two
// controllers because the main controller is constantly updating deployment configs as
// owning replication controllers and pods are updated.
if !deployutil.HasSynced(newDc, newDc.Generation) {
return
}
// Enqueue the deployment config if it hasn't been deployed yet.
if newDc.Status.LatestVersion == 0 {
c.enqueueDeploymentConfig(newDc)
return
}
// Compare deployment config templates before enqueueing. This reduces the amount of times
// we will try to instantiate a deployment config at the expense of duplicating some of the
// work that the instantiate endpoint is already doing but I think this is fine.
shouldInstantiate := true
latestRc, err := c.rcLister.ReplicationControllers(newDc.Namespace).Get(deployutil.LatestDeploymentNameForConfig(newDc))
if err != nil {
// If we get an error here it may be due to the rc cache lagging behind. In such a case
// just defer to the api server (instantiate REST) where we will retry this.
glog.V(2).Infof("Cannot get latest rc for dc %s:%d (%v) - will defer to instantiate", deployutil.LabelForDeploymentConfig(newDc), newDc.Status.LatestVersion, err)
} else {
initial, err := deployutil.DecodeDeploymentConfig(latestRc, c.codec)
if err != nil {
glog.V(2).Infof("Cannot decode dc from replication controller %s: %v", deployutil.LabelForDeployment(latestRc), err)
return
}
shouldInstantiate = !reflect.DeepEqual(newDc.Spec.Template, initial.Spec.Template)
}
if !shouldInstantiate {
return
}
c.enqueueDeploymentConfig(newDc)
}
// addImageStream enqueues the deployment configs that point to the new image stream.
func (c *DeploymentTriggerController) addImageStream(obj interface{}) {
stream := obj.(*imageapi.ImageStream)
glog.V(4).Infof("Image stream %q added.", stream.Name)
dcList, err := c.dcLister.GetConfigsForImageStream(stream)
if err != nil {
return
}
// TODO: We could check image stream tags here and enqueue only deployment configs
// with stale lastTriggeredImages.
for _, dc := range dcList {
c.enqueueDeploymentConfig(dc)
}
}
// updateImageStream enqueues the deployment configs that point to the updated image stream.
func (c *DeploymentTriggerController) updateImageStream(old, cur interface{}) {
// A periodic relist will send update events for all known streams.
newStream := cur.(*imageapi.ImageStream)
oldStream := old.(*imageapi.ImageStream)
if newStream.ResourceVersion == oldStream.ResourceVersion {
return
}
glog.V(4).Infof("Image stream %q updated.", newStream.Name)
dcList, err := c.dcLister.GetConfigsForImageStream(newStream)
if err != nil {
return
}
// TODO: We could check image stream tags here and enqueue only deployment configs
// with stale lastTriggeredImages.
for _, dc := range dcList {
c.enqueueDeploymentConfig(dc)
}
}
func (c *DeploymentTriggerController) enqueueDeploymentConfig(dc *deployapi.DeploymentConfig) {
key, err := kcontroller.KeyFunc(dc)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
return
}
c.queue.Add(key)
}
func (c *DeploymentTriggerController) worker() {
for {
if quit := c.work(); quit {
return
}
}
}
func (c *DeploymentTriggerController) work() bool {
key, quit := c.queue.Get()
if quit {
return true
}
defer c.queue.Done(key)
dc, err := c.getByKey(key.(string))
if err != nil {
glog.Error(err.Error())
}
if dc == nil {
return false
}
err = c.Handle(dc)
c.handleErr(err, key)
return false
}
func (c *DeploymentTriggerController) getByKey(key string) (*deployapi.DeploymentConfig, error) {
obj, exists, err := c.dcLister.Indexer.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve deployment config %q from store: %v", key, err)
c.queue.Add(key)
return nil, err
}
if !exists {
glog.Infof("Deployment config %q has been deleted", key)
return nil, nil
}
return obj.(*deployapi.DeploymentConfig), nil
}