package controller import ( "time" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/flowcontrol" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" "github.com/openshift/origin/pkg/client" "github.com/openshift/origin/pkg/controller" "github.com/openshift/origin/pkg/image/api" ) // ImportControllerFactory can create an ImportController. type ImportControllerFactory struct { Client client.Interface ResyncInterval time.Duration MinimumCheckInterval time.Duration ImportRateLimiter flowcontrol.RateLimiter ScheduleEnabled bool } // Create creates an ImportController. func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) { lw := &cache.ListWatch{ ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { return f.Client.ImageStreams(kapi.NamespaceAll).List(options) }, WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { return f.Client.ImageStreams(kapi.NamespaceAll).Watch(options) }, } q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc) cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run() // instantiate a scheduled importer using a number of buckets buckets := 4 switch { case f.MinimumCheckInterval > time.Hour: buckets = 8 case f.MinimumCheckInterval < 10*time.Minute: buckets = 2 } seconds := f.MinimumCheckInterval / time.Second bucketQPS := 1.0 / float32(seconds) * float32(buckets) limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1) b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter) // instantiate an importer for changes that happen to the image stream changed := &controller.RetryController{ Queue: q, RetryManager: controller.NewQueueRetryManager( q, cache.MetaNamespaceKeyFunc, func(obj interface{}, err error, retries controller.Retry) bool { utilruntime.HandleError(err) return retries.Count < 5 }, flowcontrol.NewTokenBucketRateLimiter(1, 10), ), Handle: b.Handle, } return changed, b.scheduler } type uniqueItem struct { uid string resourceVersion string } // scheduled watches for changes to image streams and adds them to the list of streams to be // periodically imported (later) or directly imported (now). type scheduled struct { enabled bool scheduler *controller.Scheduler rateLimiter flowcontrol.RateLimiter controller *ImportController } // newScheduled initializes a scheduled import object and sets its scheduler. Limiter is optional. func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled { b := &scheduled{ enabled: enabled, rateLimiter: importLimiter, controller: &ImportController{ streams: client, }, } b.scheduler = controller.NewScheduler(buckets, bucketLimiter, b.HandleTimed) return b } // Handle ensures an image stream is checked for scheduling and then runs a direct import func (b *scheduled) Handle(obj interface{}) error { stream := obj.(*api.ImageStream) if b.enabled && needsScheduling(stream) { key, _ := cache.MetaNamespaceKeyFunc(stream) b.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion}) } return b.controller.Next(stream, b) } // HandleTimed is invoked when a key is ready to be processed. func (b *scheduled) HandleTimed(key, value interface{}) { if !b.enabled { b.scheduler.Remove(key, value) return } glog.V(5).Infof("DEBUG: checking %s", key) if b.rateLimiter != nil && !b.rateLimiter.TryAccept() { glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key) return } namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string)) if err := b.controller.NextTimedByName(namespace, name); err != nil { // the stream cannot be imported if err == ErrNotImportable { // value must match to be removed, so we avoid races against creation by ensuring that we only // remove the stream if the uid and resource version in the scheduler are exactly the same. b.scheduler.Remove(key, value) return } utilruntime.HandleError(err) return } } // Importing is invoked when the controller decides to import a stream in order to push back // the next schedule time. func (b *scheduled) Importing(stream *api.ImageStream) { if !b.enabled { return } glog.V(5).Infof("DEBUG: stream %s was just imported", stream.Name) // Push the current key back to the end of the queue because it's just been imported key, _ := cache.MetaNamespaceKeyFunc(stream) b.scheduler.Delay(key) }