package controller import ( "sync" "github.com/golang/glog" "k8s.io/kubernetes/pkg/util/flowcontrol" utilwait "k8s.io/kubernetes/pkg/util/wait" ) // Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke // an action on all items in a bucket before moving to the next bucket. A ratelimiter sets // an upper bound on the number of buckets processed per unit time. The queue has a key and a // value, so both uniqueness and equality can be tested (key must be unique, value can carry // info for the next processing). Items remain in the queue until removed by a call to Remove(). type Scheduler struct { handle func(key, value interface{}) position int limiter flowcontrol.RateLimiter mu sync.Mutex buckets []bucket } type bucket map[interface{}]interface{} // NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting // the rate at which buckets are processed, and a function to invoke when items are scanned in // a bucket. // TODO: remove DEBUG statements from this file once this logic has been adequately validated. func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler { // add one more bucket to serve as the "current" bucket bucketCount++ buckets := make([]bucket, bucketCount) for i := range buckets { buckets[i] = make(bucket) } return &Scheduler{ handle: fn, buckets: buckets, limiter: bucketLimiter, } } // RunUntil launches the scheduler until ch is closed. func (s *Scheduler) RunUntil(ch <-chan struct{}) { go utilwait.Until(s.RunOnce, 0, ch) } // RunOnce takes a single item out of the current bucket and processes it. If // the bucket is empty, we wait for the rate limiter before returning. func (s *Scheduler) RunOnce() { key, value, last := s.next() if last { glog.V(5).Infof("DEBUG: scheduler: waiting for limit") s.limiter.Accept() return } glog.V(5).Infof("DEBUG: scheduler: handle %s", key) s.handle(key, value) } // at returns the bucket index relative to the current bucket. func (s *Scheduler) at(inc int) int { return (s.position + inc + len(s.buckets)) % len(s.buckets) } // next takes a key from the current bucket and places it in the last bucket, returns the // removed key. Returns true if the current bucket is empty and no key and value were returned. func (s *Scheduler) next() (interface{}, interface{}, bool) { s.mu.Lock() defer s.mu.Unlock() glog.V(5).Infof("DEBUG: scheduler: queue (%d):\n %#v", s.position, s.buckets) last := s.buckets[s.position] if len(last) == 0 { s.position = s.at(1) glog.V(5).Infof("DEBUG: scheduler: position: %d %d", s.position, len(s.buckets)) last = s.buckets[s.position] } for k, v := range last { delete(last, k) s.buckets[s.at(-1)][k] = v return k, v, false } return nil, nil, true } // Add places the key in the bucket with the least entries (except the current bucket). The key is used to // determine uniqueness, while value can be used to associate additional data for later retrieval. An Add // removes the previous key and value and will place the item in a new bucket. This allows callers to ensure // that Add'ing a new item to the queue purges old versions of the item, while Remove can be conditional on // removing only the known old version. func (s *Scheduler) Add(key, value interface{}) { s.mu.Lock() defer s.mu.Unlock() for _, bucket := range s.buckets { delete(bucket, key) } // pick the bucket with the least entries that is furthest from the current position n := len(s.buckets) base := s.position + n target, least := 0, 0 for i := n - 1; i > 0; i-- { position := (base + i) % n size := len(s.buckets[position]) if size == 0 { target = position break } if size < least || least == 0 { target = position least = size } } s.buckets[target][key] = value } // Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has // the same value. Returns true if the key was removed. func (s *Scheduler) Remove(key, value interface{}) bool { s.mu.Lock() defer s.mu.Unlock() match := true for _, bucket := range s.buckets { if value != nil { if old, ok := bucket[key]; ok && old != value { match = false continue } } delete(bucket, key) } return match } // Delay moves the key to the end of the chain if it exists. func (s *Scheduler) Delay(key interface{}) { s.mu.Lock() defer s.mu.Unlock() last := s.at(-1) for i, bucket := range s.buckets { if i == last { continue } if value, ok := bucket[key]; ok { delete(bucket, key) s.buckets[last][key] = value } } } // Len returns the number of scheduled items. func (s *Scheduler) Len() int { s.mu.Lock() defer s.mu.Unlock() count := 0 for _, bucket := range s.buckets { count += len(bucket) } return count } // Map returns a copy of the scheduler contents, but does not copy the keys or values themselves. // If values and keys are not immutable, changing the value will affect the value in the queue. func (s *Scheduler) Map() map[interface{}]interface{} { s.mu.Lock() defer s.mu.Unlock() out := make(map[interface{}]interface{}) for _, bucket := range s.buckets { for k, v := range bucket { out[k] = v } } return out }