Browse code

Add a bucketed, rate-limited queue for periodic events

The queue is self balancing, periodically invoking an action on all
items in a bucket before moving to the next bucket. A ratelimiter sets
an upper bound on the number of buckets processed per unit time. The
queue has a key and a value, so both uniqueness and equality can be
tested (key must be unique, value can carry info for the next
processing).

Clayton Coleman authored on 2016/01/27 15:03:22
Showing 3 changed files
... ...
@@ -12,6 +12,13 @@ type RunnableController interface {
12 12
 	Run()
13 13
 }
14 14
 
15
+// StoppableController is a controller which implements a Run loop.
16
+type StoppableController interface {
17
+	// RunUntil starts the asynchronous controller loop, which runs until
18
+	// ch is closed.
19
+	RunUntil(ch <-chan struct{})
20
+}
21
+
15 22
 // RetryController is a RunnableController which delegates resource
16 23
 // handling to a function and knows how to safely manage retries of a resource
17 24
 // which failed to be successfully handled.
18 25
new file mode 100644
... ...
@@ -0,0 +1,166 @@
0
+package controller
1
+
2
+import (
3
+	"sync"
4
+
5
+	"github.com/golang/glog"
6
+
7
+	kutil "k8s.io/kubernetes/pkg/util"
8
+)
9
+
10
+type Scheduler struct {
11
+	handle   func(key, value interface{})
12
+	position int
13
+	limiter  kutil.RateLimiter
14
+
15
+	mu      sync.Mutex
16
+	buckets []bucket
17
+}
18
+
19
+type bucket map[interface{}]interface{}
20
+
21
+func NewScheduler(bucketCount int, bucketLimiter kutil.RateLimiter, fn func(key, value interface{})) *Scheduler {
22
+	// add one more bucket to serve as the "current" bucket
23
+	bucketCount++
24
+	buckets := make([]bucket, bucketCount)
25
+	for i := range buckets {
26
+		buckets[i] = make(bucket)
27
+	}
28
+	return &Scheduler{
29
+		handle:  fn,
30
+		buckets: buckets,
31
+		limiter: bucketLimiter,
32
+	}
33
+}
34
+
35
+func (s *Scheduler) RunUntil(ch <-chan struct{}) {
36
+	go kutil.Until(s.RunOnce, 0, ch)
37
+}
38
+
39
+func (s *Scheduler) RunOnce() {
40
+	key, value, last := s.next()
41
+	if last {
42
+		glog.V(5).Infof("DEBUG: scheduler: waiting for limit")
43
+		s.limiter.Accept()
44
+		return
45
+	}
46
+	glog.V(5).Infof("DEBUG: scheduler: handle %s", key)
47
+	s.handle(key, value)
48
+}
49
+
50
+func (s *Scheduler) at(inc int) int {
51
+	return (s.position + inc + len(s.buckets)) % len(s.buckets)
52
+}
53
+
54
+// next takes a key from the current bucket and places it in the last bucket
55
+func (s *Scheduler) next() (interface{}, interface{}, bool) {
56
+	s.mu.Lock()
57
+	defer s.mu.Unlock()
58
+	glog.V(5).Infof("DEBUG: scheduler: queue (%d):\n %#v", s.position, s.buckets)
59
+
60
+	last := s.buckets[s.position]
61
+	if len(last) == 0 {
62
+		s.position = s.at(1)
63
+		glog.V(5).Infof("DEBUG: scheduler: position: %d %d", s.position, len(s.buckets))
64
+		last = s.buckets[s.position]
65
+	}
66
+
67
+	for k, v := range last {
68
+		delete(last, k)
69
+		s.buckets[s.at(-1)][k] = v
70
+		return k, v, false
71
+	}
72
+	return nil, nil, true
73
+}
74
+
75
+// Add places the key in the bucket with the least entries (except the current bucket). The key is used to
76
+// determine uniqueness, while value can be used to associate additional data for later retrieval. An Add
77
+// removes the previous key and value and will place the item in a new bucket.
78
+func (s *Scheduler) Add(key, value interface{}) {
79
+	s.mu.Lock()
80
+	defer s.mu.Unlock()
81
+
82
+	for _, bucket := range s.buckets {
83
+		delete(bucket, key)
84
+	}
85
+
86
+	// pick the bucket with the least entries that is furthest from the current position
87
+	n := len(s.buckets)
88
+	base := s.position + n
89
+	target, least := 0, 0
90
+	for i := n - 1; i > 0; i-- {
91
+		position := (base + i) % n
92
+		size := len(s.buckets[position])
93
+		if size == 0 {
94
+			target = position
95
+			break
96
+		}
97
+		if size < least || least == 0 {
98
+			target = position
99
+			least = size
100
+		}
101
+	}
102
+	s.buckets[target][key] = value
103
+}
104
+
105
+// Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has
106
+// the same value. Returns true if the key was removed.
107
+func (s *Scheduler) Remove(key, value interface{}) bool {
108
+	s.mu.Lock()
109
+	defer s.mu.Unlock()
110
+
111
+	match := true
112
+	for _, bucket := range s.buckets {
113
+		if value != nil {
114
+			if old, ok := bucket[key]; ok && old != value {
115
+				match = false
116
+				continue
117
+			}
118
+		}
119
+		delete(bucket, key)
120
+	}
121
+	return match
122
+}
123
+
124
+// Delay moves the key to the end of the chain if it exists.
125
+func (s *Scheduler) Delay(key interface{}) {
126
+	s.mu.Lock()
127
+	defer s.mu.Unlock()
128
+
129
+	last := s.at(-1)
130
+	for i, bucket := range s.buckets {
131
+		if i == last {
132
+			continue
133
+		}
134
+		if value, ok := bucket[key]; ok {
135
+			delete(bucket, key)
136
+			s.buckets[last][key] = value
137
+		}
138
+	}
139
+}
140
+
141
+// Len returns the number of scheduled items.
142
+func (s *Scheduler) Len() int {
143
+	s.mu.Lock()
144
+	defer s.mu.Unlock()
145
+
146
+	count := 0
147
+	for _, bucket := range s.buckets {
148
+		count += len(bucket)
149
+	}
150
+	return count
151
+}
152
+
153
+// Map returns a copy of the scheduler contents for testing purposes.
154
+func (s *Scheduler) Map() map[interface{}]interface{} {
155
+	s.mu.Lock()
156
+	defer s.mu.Unlock()
157
+
158
+	out := make(map[interface{}]interface{})
159
+	for _, bucket := range s.buckets {
160
+		for k, v := range bucket {
161
+			out[k] = v
162
+		}
163
+	}
164
+	return out
165
+}
0 166
new file mode 100644
... ...
@@ -0,0 +1,111 @@
0
+package controller
1
+
2
+import (
3
+	"reflect"
4
+	"testing"
5
+
6
+	kutil "k8s.io/kubernetes/pkg/util"
7
+)
8
+
9
+func TestScheduler(t *testing.T) {
10
+	keys := []string{}
11
+	s := NewScheduler(2, kutil.NewFakeRateLimiter(), func(key, value interface{}) {
12
+		keys = append(keys, key.(string))
13
+	})
14
+
15
+	for i := 0; i < 6; i++ {
16
+		s.RunOnce()
17
+		if len(keys) > 0 {
18
+			t.Fatal(keys)
19
+		}
20
+		if s.position != (i+1)%3 {
21
+			t.Fatal(s.position)
22
+		}
23
+	}
24
+
25
+	s.Add("first", "test")
26
+	found := false
27
+	for i, buckets := range s.buckets {
28
+		if _, ok := buckets["first"]; ok {
29
+			found = true
30
+		} else {
31
+			continue
32
+		}
33
+		if i == s.position {
34
+			t.Fatal("should not insert into current bucket")
35
+		}
36
+	}
37
+	if !found {
38
+		t.Fatal("expected to find key in a bucket")
39
+	}
40
+
41
+	for i := 0; i < 10; i++ {
42
+		s.Delay("first")
43
+		if _, ok := s.buckets[(s.position-1+len(s.buckets))%len(s.buckets)]["first"]; !ok {
44
+			t.Fatal("key was not in the last bucket")
45
+		}
46
+	}
47
+
48
+	s.RunOnce()
49
+	if len(keys) != 0 {
50
+		t.Fatal(keys)
51
+	}
52
+	s.RunOnce()
53
+	if !reflect.DeepEqual(keys, []string{"first"}) {
54
+		t.Fatal(keys)
55
+	}
56
+}
57
+
58
+func TestSchedulerAdd(t *testing.T) {
59
+	s := NewScheduler(3, kutil.NewFakeRateLimiter(), func(key, value interface{}) {})
60
+	s.Add("first", "other")
61
+	if s.buckets[3]["first"] != "other" {
62
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
63
+	}
64
+	s.Add("second", "other")
65
+	if s.buckets[2]["second"] != "other" {
66
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
67
+	}
68
+	s.Add("third", "other")
69
+	if s.buckets[1]["third"] != "other" {
70
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
71
+	}
72
+	s.Add("fourth", "other")
73
+	if s.buckets[3]["fourth"] != "other" {
74
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
75
+	}
76
+	s.Add("fifth", "other")
77
+	if s.buckets[2]["fifth"] != "other" {
78
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
79
+	}
80
+	s.Remove("third", "other")
81
+	s.Add("sixth", "other")
82
+	if s.buckets[1]["sixth"] != "other" {
83
+		t.Fatalf("placed key in wrong bucket: %#v", s.buckets)
84
+	}
85
+}
86
+
87
+func TestSchedulerRemove(t *testing.T) {
88
+	s := NewScheduler(2, kutil.NewFakeRateLimiter(), func(key, value interface{}) {})
89
+	s.Add("test", "other")
90
+	if s.Remove("test", "value") {
91
+		t.Fatal(s)
92
+	}
93
+	if !s.Remove("test", "other") {
94
+		t.Fatal(s)
95
+	}
96
+	if s.Len() != 0 {
97
+		t.Fatal(s)
98
+	}
99
+	s.Add("test", "other")
100
+	s.Add("test", "new")
101
+	if s.Len() != 1 {
102
+		t.Fatal(s)
103
+	}
104
+	if s.Remove("test", "other") {
105
+		t.Fatal(s)
106
+	}
107
+	if !s.Remove("test", "new") {
108
+		t.Fatal(s)
109
+	}
110
+}