package controller
import (
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilwait "k8s.io/kubernetes/pkg/util/wait"
)
// Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
// an action on all items in a bucket before moving to the next bucket. A ratelimiter sets
// an upper bound on the number of buckets processed per unit time. The queue has a key and a
// value, so both uniqueness and equality can be tested (key must be unique, value can carry
// info for the next processing). Items remain in the queue until removed by a call to Remove().
type Scheduler struct {
handle func(key, value interface{})
position int
limiter flowcontrol.RateLimiter
mu sync.Mutex
buckets []bucket
}
type bucket map[interface{}]interface{}
// NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
// the rate at which buckets are processed, and a function to invoke when items are scanned in
// a bucket.
// TODO: remove DEBUG statements from this file once this logic has been adequately validated.
func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler {
// add one more bucket to serve as the "current" bucket
bucketCount++
buckets := make([]bucket, bucketCount)
for i := range buckets {
buckets[i] = make(bucket)
}
return &Scheduler{
handle: fn,
buckets: buckets,
limiter: bucketLimiter,
}
}
// RunUntil launches the scheduler until ch is closed.
func (s *Scheduler) RunUntil(ch <-chan struct{}) {
go utilwait.Until(s.RunOnce, 0, ch)
}
// RunOnce takes a single item out of the current bucket and processes it. If
// the bucket is empty, we wait for the rate limiter before returning.
func (s *Scheduler) RunOnce() {
key, value, last := s.next()
if last {
glog.V(5).Infof("DEBUG: scheduler: waiting for limit")
s.limiter.Accept()
return
}
glog.V(5).Infof("DEBUG: scheduler: handle %s", key)
s.handle(key, value)
}
// at returns the bucket index relative to the current bucket.
func (s *Scheduler) at(inc int) int {
return (s.position + inc + len(s.buckets)) % len(s.buckets)
}
// next takes a key from the current bucket and places it in the last bucket, returns the
// removed key. Returns true if the current bucket is empty and no key and value were returned.
func (s *Scheduler) next() (interface{}, interface{}, bool) {
s.mu.Lock()
defer s.mu.Unlock()
glog.V(5).Infof("DEBUG: scheduler: queue (%d):\n %#v", s.position, s.buckets)
last := s.buckets[s.position]
if len(last) == 0 {
s.position = s.at(1)
glog.V(5).Infof("DEBUG: scheduler: position: %d %d", s.position, len(s.buckets))
last = s.buckets[s.position]
}
for k, v := range last {
delete(last, k)
s.buckets[s.at(-1)][k] = v
return k, v, false
}
return nil, nil, true
}
// Add places the key in the bucket with the least entries (except the current bucket). The key is used to
// determine uniqueness, while value can be used to associate additional data for later retrieval. An Add
// removes the previous key and value and will place the item in a new bucket. This allows callers to ensure
// that Add'ing a new item to the queue purges old versions of the item, while Remove can be conditional on
// removing only the known old version.
func (s *Scheduler) Add(key, value interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
for _, bucket := range s.buckets {
delete(bucket, key)
}
// pick the bucket with the least entries that is furthest from the current position
n := len(s.buckets)
base := s.position + n
target, least := 0, 0
for i := n - 1; i > 0; i-- {
position := (base + i) % n
size := len(s.buckets[position])
if size == 0 {
target = position
break
}
if size < least || least == 0 {
target = position
least = size
}
}
s.buckets[target][key] = value
}
// Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has
// the same value. Returns true if the key was removed.
func (s *Scheduler) Remove(key, value interface{}) bool {
s.mu.Lock()
defer s.mu.Unlock()
match := true
for _, bucket := range s.buckets {
if value != nil {
if old, ok := bucket[key]; ok && old != value {
match = false
continue
}
}
delete(bucket, key)
}
return match
}
// Delay moves the key to the end of the chain if it exists.
func (s *Scheduler) Delay(key interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
last := s.at(-1)
for i, bucket := range s.buckets {
if i == last {
continue
}
if value, ok := bucket[key]; ok {
delete(bucket, key)
s.buckets[last][key] = value
}
}
}
// Len returns the number of scheduled items.
func (s *Scheduler) Len() int {
s.mu.Lock()
defer s.mu.Unlock()
count := 0
for _, bucket := range s.buckets {
count += len(bucket)
}
return count
}
// Map returns a copy of the scheduler contents, but does not copy the keys or values themselves.
// If values and keys are not immutable, changing the value will affect the value in the queue.
func (s *Scheduler) Map() map[interface{}]interface{} {
s.mu.Lock()
defer s.mu.Unlock()
out := make(map[interface{}]interface{})
for _, bucket := range s.buckets {
for k, v := range bucket {
out[k] = v
}
}
return out
}