Browse code

Merge pull request #10070 from dcbw/sdn-deltafifo

Merged by openshift-bot

OpenShift Bot authored on 2016/09/19 14:11:33
Showing 8 changed files
... ...
@@ -16,11 +16,10 @@ import (
16 16
 
17 17
 	kapi "k8s.io/kubernetes/pkg/api"
18 18
 	kapierrs "k8s.io/kubernetes/pkg/api/errors"
19
+	"k8s.io/kubernetes/pkg/client/cache"
19 20
 	kexec "k8s.io/kubernetes/pkg/util/exec"
20
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
21 21
 	"k8s.io/kubernetes/pkg/util/sysctl"
22 22
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
23
-	"k8s.io/kubernetes/pkg/watch"
24 23
 )
25 24
 
26 25
 const (
... ...
@@ -420,20 +419,12 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
420 420
 }
421 421
 
422 422
 func (plugin *OsdnNode) watchEgressNetworkPolicies() {
423
-	eventQueue := plugin.registry.RunEventQueue(EgressNetworkPolicies)
424
-
425
-	for {
426
-		eventType, obj, err := eventQueue.Pop()
427
-		if err != nil {
428
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressNetworkPolicy: %v", err))
429
-			return
430
-		}
431
-		policy := obj.(*osapi.EgressNetworkPolicy)
423
+	plugin.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error {
424
+		policy := delta.Object.(*osapi.EgressNetworkPolicy)
432 425
 
433 426
 		vnid, err := plugin.vnids.GetVNID(policy.Namespace)
434 427
 		if err != nil {
435
-			glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
436
-			continue
428
+			return fmt.Errorf("could not find netid for namespace %q: %v", policy.Namespace, err)
437 429
 		}
438 430
 
439 431
 		policies := plugin.egressPolicies[vnid]
... ...
@@ -443,17 +434,17 @@ func (plugin *OsdnNode) watchEgressNetworkPolicies() {
443 443
 				break
444 444
 			}
445 445
 		}
446
-		if eventType != watch.Deleted && len(policy.Spec.Egress) > 0 {
446
+		if delta.Type != cache.Deleted && len(policy.Spec.Egress) > 0 {
447 447
 			policies = append(policies, policy)
448 448
 		}
449 449
 		plugin.egressPolicies[vnid] = policies
450 450
 
451 451
 		err = plugin.updateEgressNetworkPolicy(vnid)
452 452
 		if err != nil {
453
-			utilruntime.HandleError(err)
454
-			return
453
+			return err
455 454
 		}
456
-	}
455
+		return nil
456
+	})
457 457
 }
458 458
 
459 459
 func (plugin *OsdnNode) UpdateEgressNetworkPolicyVNID(namespace string, oldVnid, newVnid uint32) error {
460 460
new file mode 100644
... ...
@@ -0,0 +1,220 @@
0
+package plugin
1
+
2
+import (
3
+	"fmt"
4
+
5
+	"k8s.io/kubernetes/pkg/client/cache"
6
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
7
+)
8
+
9
+// EventQueue is an enhanced DeltaFIFO that provides reliable Deleted deltas
10
+// even if no knownObjects store is given, and compresses multiple deltas
11
+// to reduce duplicate events.
12
+//
13
+// Without a store, DeltaFIFO will drop Deleted deltas when its queue is empty
14
+// because the deleted object is not present in the queue and DeltaFIFO tries
15
+// to protect against duplicate Deleted deltas resulting from Replace().
16
+//
17
+// To get reliable deletion, a store must be provided, and EventQueue provides
18
+// one if the caller does not.
19
+type EventQueue struct {
20
+	*cache.DeltaFIFO
21
+
22
+	// Private store if not intitialized with one to ensure deletion
23
+	// events are always recognized.
24
+	knownObjects cache.Store
25
+}
26
+
27
+func NewEventQueue(keyFunc cache.KeyFunc) *EventQueue {
28
+	knownObjects := cache.NewStore(keyFunc)
29
+	return &EventQueue{
30
+		DeltaFIFO: cache.NewDeltaFIFO(
31
+			keyFunc,
32
+			cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
33
+				return deltaCompressor(d, keyFunc)
34
+			}),
35
+			knownObjects),
36
+		knownObjects: knownObjects,
37
+	}
38
+}
39
+
40
+func NewEventQueueForStore(keyFunc cache.KeyFunc, knownObjects cache.KeyListerGetter) *EventQueue {
41
+	return &EventQueue{
42
+		DeltaFIFO: cache.NewDeltaFIFO(
43
+			keyFunc,
44
+			cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
45
+				return deltaCompressor(d, keyFunc)
46
+			}),
47
+			knownObjects),
48
+	}
49
+}
50
+
51
+func (queue *EventQueue) updateKnownObjects(delta cache.Delta) {
52
+	switch delta.Type {
53
+	case cache.Added:
54
+		queue.knownObjects.Add(delta.Object)
55
+	case cache.Updated:
56
+		queue.knownObjects.Update(delta.Object)
57
+	case cache.Sync:
58
+		if _, ok, _ := queue.knownObjects.Get(delta.Object); ok {
59
+			queue.knownObjects.Update(delta.Object)
60
+		} else {
61
+			queue.knownObjects.Add(delta.Object)
62
+		}
63
+	case cache.Deleted:
64
+		queue.knownObjects.Delete(delta.Object)
65
+	}
66
+}
67
+
68
+type ProcessEventFunc func(delta cache.Delta) error
69
+
70
+// Process queued changes for an object.  The 'process' function is called
71
+// repeatedly with each available cache.Delta that describes state changes
72
+// for that object. If the process function returns an error queued changes
73
+// for that object are dropped but processing continues with the next available
74
+// object's cache.Deltas.  The error is logged with call stack information.
75
+func (queue *EventQueue) Pop(process ProcessEventFunc) (interface{}, error) {
76
+	return queue.DeltaFIFO.Pop(func(obj interface{}) error {
77
+		// Oldest to newest delta lists
78
+		for _, delta := range obj.(cache.Deltas) {
79
+			// Update private store to track object deletion
80
+			if queue.knownObjects != nil {
81
+				queue.updateKnownObjects(delta)
82
+			}
83
+
84
+			// Process one delta for the object
85
+			if err := process(delta); err != nil {
86
+				utilruntime.HandleError(fmt.Errorf("event processing failed: %v", err))
87
+				return nil
88
+			}
89
+		}
90
+		return nil
91
+	})
92
+}
93
+
94
+// Describes the action to take for a given combination of deltas
95
+type actionType string
96
+
97
+const (
98
+	// The delta combination should result in the delta being added to the compressor cache
99
+	actionAdd actionType = "ADD"
100
+
101
+	// The delta combination should should be compressed into a single delta
102
+	actionCompress actionType = "COMPRESS"
103
+
104
+	// The delta combination should result in the object being deleted from the compressor cache
105
+	actionDelete actionType = "DELETE"
106
+)
107
+
108
+type deltaAction struct {
109
+	// The action to take for the delta combination
110
+	action actionType
111
+	// The type for the new compressed delta
112
+	deltaType cache.DeltaType
113
+}
114
+
115
+// The delta combination action matrix defines the valid delta sequences and
116
+// how to compress specific combinations of deltas.
117
+//
118
+// A delta combination that produces an invalid sequence results in a panic.
119
+var deltaActionMatrix = map[cache.DeltaType]map[cache.DeltaType]deltaAction{
120
+	cache.Added: {
121
+		cache.Sync:    {actionCompress, cache.Added},
122
+		cache.Updated: {actionCompress, cache.Added},
123
+		cache.Deleted: {actionDelete, cache.Deleted},
124
+	},
125
+	cache.Sync: {
126
+		cache.Sync:    {actionCompress, cache.Sync},
127
+		cache.Updated: {actionCompress, cache.Sync},
128
+		cache.Deleted: {actionCompress, cache.Deleted},
129
+	},
130
+	cache.Updated: {
131
+		cache.Updated: {actionCompress, cache.Updated},
132
+		cache.Deleted: {actionCompress, cache.Deleted},
133
+	},
134
+	cache.Deleted: {
135
+		cache.Added: {actionCompress, cache.Updated},
136
+		cache.Sync:  {actionCompress, cache.Sync},
137
+	},
138
+}
139
+
140
+func removeDeltasWithKey(deltas cache.Deltas, removeKey string, keyFunc cache.KeyFunc) cache.Deltas {
141
+	newDeltas := cache.Deltas{}
142
+	for _, d := range deltas {
143
+		key, err := keyFunc(d.Object)
144
+		if err == nil && key != removeKey {
145
+			newDeltas = append(newDeltas, d)
146
+		}
147
+	}
148
+	return newDeltas
149
+}
150
+
151
+// This DeltaFIFO compressor combines deltas for the same object, the exact
152
+// compression semantics of which are as follows:
153
+//
154
+// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
155
+//     is received, these are compressed into (Added/Sync, Y)
156
+//
157
+// 2.  If a cache.Added is enqueued with state X and a cache.Deleted is received with state Y,
158
+//     these are dropped and consumers will not see either event
159
+//
160
+// 3.  If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted
161
+//     is received with state Y, these are compressed into (Deleted, Y)
162
+//
163
+// 4.  If a cache.Updated is enqueued with state X and a cache.Updated with state Y is received,
164
+//     these two events are compressed into (Updated, Y)
165
+//
166
+// 5.  If a cache.Added is enqueued with state X and a cache.Sync with state Y is received,
167
+//     these are compressed into (Added, Y)
168
+//
169
+// 6.  If a cache.Sync is enqueued with state X and a cache.Sync with state Y is received,
170
+//     these are compressed into (Sync, Y)
171
+//
172
+// 7.  Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic.
173
+//
174
+// This function will compress all events for the same object into a single delta.
175
+func deltaCompressor(deltas cache.Deltas, keyFunc cache.KeyFunc) cache.Deltas {
176
+	// Final compressed deltas list
177
+	newDeltas := cache.Deltas{}
178
+
179
+	// Cache of object's current state including previous deltas
180
+	objects := make(map[string]cache.DeltaType)
181
+
182
+	// Deltas range from oldest (index 0) to newest (last index)
183
+	for _, d := range deltas {
184
+		key, err := keyFunc(d.Object)
185
+		if err != nil {
186
+			panic(fmt.Sprintf("unkeyable object: %v, %v", d.Object, err))
187
+		}
188
+
189
+		var compressAction deltaAction
190
+		if oldType, ok := objects[key]; !ok {
191
+			compressAction = deltaAction{actionAdd, d.Type}
192
+		} else {
193
+			// Older event exists; combine them
194
+			compressAction, ok = deltaActionMatrix[oldType][d.Type]
195
+			if !ok {
196
+				panic(fmt.Sprintf("invalid state transition: %v -> %v", oldType, d.Type))
197
+			}
198
+		}
199
+
200
+		switch compressAction.action {
201
+		case actionAdd:
202
+			newDeltas = append(newDeltas, d)
203
+			objects[key] = d.Type
204
+		case actionCompress:
205
+			newDelta := cache.Delta{
206
+				Type:   compressAction.deltaType,
207
+				Object: d.Object,
208
+			}
209
+			objects[key] = newDelta.Type
210
+			newDeltas = removeDeltasWithKey(newDeltas, key, keyFunc)
211
+			newDeltas = append(newDeltas, newDelta)
212
+		case actionDelete:
213
+			delete(objects, key)
214
+			newDeltas = removeDeltasWithKey(newDeltas, key, keyFunc)
215
+		}
216
+	}
217
+
218
+	return newDeltas
219
+}
0 220
new file mode 100644
... ...
@@ -0,0 +1,322 @@
0
+package plugin
1
+
2
+import (
3
+	"fmt"
4
+	"strings"
5
+	"testing"
6
+
7
+	"k8s.io/kubernetes/pkg/client/cache"
8
+)
9
+
10
+func testKeyFunc(obj interface{}) (string, error) {
11
+	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
12
+		return d.Key, nil
13
+	}
14
+	key, ok := obj.(string)
15
+	if !ok {
16
+		return "", cache.KeyError{Obj: obj, Err: fmt.Errorf("object not a string")}
17
+	}
18
+	return key, nil
19
+}
20
+
21
+type initialDelta struct {
22
+	deltaType cache.DeltaType
23
+	object    interface{}
24
+	// knownObjects should be given for Sync DeltaTypes
25
+	knownObjects []interface{}
26
+}
27
+
28
+type eventQueueTest struct {
29
+	initial      []initialDelta
30
+	compressed   []cache.Delta
31
+	knownObjects []interface{}
32
+	expectPanic  bool
33
+}
34
+
35
+func testDesc(test eventQueueTest) string {
36
+	var start, result []string
37
+	for _, delta := range test.initial {
38
+		start = append(start, string(delta.deltaType))
39
+	}
40
+	for _, delta := range test.compressed {
41
+		result = append(result, string(delta.Type))
42
+	}
43
+	return strings.Join(start, "+") + "=" + strings.Join(result, "+")
44
+}
45
+
46
+// Returns false on success, true on panic
47
+func addInitialDeltas(queue *EventQueue, deltas []initialDelta) (paniced bool, msg string) {
48
+	defer func() {
49
+		if r := recover(); r != nil {
50
+			paniced = true
51
+			msg = fmt.Sprintf("%#v", r)
52
+		}
53
+	}()
54
+
55
+	for _, initial := range deltas {
56
+		switch initial.deltaType {
57
+		case cache.Added:
58
+			queue.Add(initial.object)
59
+		case cache.Updated:
60
+			queue.Update(initial.object)
61
+		case cache.Deleted:
62
+			queue.Delete(initial.object)
63
+		case cache.Sync:
64
+			// knownObjects should be valid for Sync operations
65
+			queue.Replace(initial.knownObjects, "123")
66
+		}
67
+	}
68
+	return
69
+}
70
+
71
+func TestEventQueueCompress(t *testing.T) {
72
+	tests := []eventQueueTest{
73
+		// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
74
+		//     is received, these are compressed into (Added/Sync, Y)
75
+		{
76
+			initial: []initialDelta{
77
+				{deltaType: cache.Added, object: "obj1"},
78
+				{deltaType: cache.Updated, object: "obj1"},
79
+				{deltaType: cache.Updated, object: "obj1"},
80
+			},
81
+			compressed: []cache.Delta{
82
+				{Type: cache.Added, Object: "obj1"},
83
+			},
84
+		},
85
+
86
+		// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
87
+		//     is received, these are compressed into (Added/Sync, Y)
88
+		{
89
+			initial: []initialDelta{
90
+				{deltaType: cache.Added, object: "obj1"},
91
+				// test that a second object doesn't affect compression of the first
92
+				{deltaType: cache.Added, object: "obj2"},
93
+				{deltaType: cache.Updated, object: "obj2"},
94
+				{deltaType: cache.Updated, object: "obj1"},
95
+				{deltaType: cache.Updated, object: "obj1"},
96
+			},
97
+			compressed: []cache.Delta{
98
+				{Type: cache.Added, Object: "obj1"},
99
+			},
100
+		},
101
+
102
+		// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
103
+		//     is received, these are compressed into (Added/Sync, Y)
104
+		{
105
+			initial: []initialDelta{
106
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
107
+				{deltaType: cache.Updated, object: "obj1"},
108
+				{deltaType: cache.Updated, object: "obj1"},
109
+			},
110
+			compressed: []cache.Delta{
111
+				{Type: cache.Sync, Object: "obj1"},
112
+			},
113
+		},
114
+
115
+		// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
116
+		//     is received, these are compressed into (Added/Sync, Y)
117
+		{
118
+			initial: []initialDelta{
119
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
120
+				{deltaType: cache.Updated, object: "obj1"},
121
+				// test that a second object doesn't affect compression of the first
122
+				{deltaType: cache.Added, object: "obj2"},
123
+				{deltaType: cache.Updated, object: "obj2"},
124
+				{deltaType: cache.Updated, object: "obj1"},
125
+			},
126
+			compressed: []cache.Delta{
127
+				{Type: cache.Sync, Object: "obj1"},
128
+			},
129
+		},
130
+
131
+		// 2.  If a cache.Added is enqueued with state X and a cache.Deleted is received with state Y,
132
+		//     these are dropped and consumers will not see either event
133
+		{
134
+			initial: []initialDelta{
135
+				{deltaType: cache.Added, object: "obj1"},
136
+				// test that a second object doesn't affect compression of the first
137
+				{deltaType: cache.Added, object: "obj2"},
138
+				{deltaType: cache.Deleted, object: "obj2"},
139
+				{deltaType: cache.Deleted, object: "obj1"},
140
+			},
141
+			compressed: []cache.Delta{},
142
+		},
143
+
144
+		// 3.  If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted
145
+		//     is received with state Y, these are compressed into (Deleted, Y)
146
+		{
147
+			initial: []initialDelta{
148
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
149
+				// test that a second object doesn't affect compression of the first
150
+				{deltaType: cache.Sync, object: "obj2", knownObjects: []interface{}{"obj1", "obj2"}},
151
+				{deltaType: cache.Updated, object: "obj2"},
152
+				{deltaType: cache.Deleted, object: "obj1"},
153
+			},
154
+			compressed: []cache.Delta{
155
+				{Type: cache.Deleted, Object: "obj1"},
156
+			},
157
+		},
158
+
159
+		// 3.  If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted
160
+		//     is received with state Y, these are compressed into (Deleted, Y)
161
+		{
162
+			initial: []initialDelta{
163
+				{deltaType: cache.Updated, object: "obj1"},
164
+				{deltaType: cache.Deleted, object: "obj1"},
165
+			},
166
+			compressed: []cache.Delta{
167
+				{Type: cache.Deleted, Object: "obj1"},
168
+			},
169
+		},
170
+
171
+		// 4.  If a cache.Updated is enqueued with state X and a cache.Updated with state Y is received,
172
+		//     these two events are compressed into (Updated, Y)
173
+		{
174
+			initial: []initialDelta{
175
+				{deltaType: cache.Updated, object: "obj1"},
176
+				{deltaType: cache.Updated, object: "obj1"},
177
+			},
178
+			compressed: []cache.Delta{
179
+				{Type: cache.Updated, Object: "obj1"},
180
+			},
181
+		},
182
+
183
+		// 5.  If a cache.Added is enqueued with state X and a cache.Sync with state Y is received,
184
+		//     these are compressed into (Added, Y)
185
+		{
186
+			initial: []initialDelta{
187
+				{deltaType: cache.Added, object: "obj1"},
188
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
189
+			},
190
+			compressed: []cache.Delta{
191
+				{Type: cache.Added, Object: "obj1"},
192
+			},
193
+		},
194
+
195
+		// 6.  If a cache.Sync is enqueued with state X and a cache.Sync with state Y is received,
196
+		//     these are compressed into (Sync, Y)
197
+		{
198
+			initial: []initialDelta{
199
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
200
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
201
+			},
202
+			compressed: []cache.Delta{
203
+				{Type: cache.Sync, Object: "obj1"},
204
+			},
205
+		},
206
+
207
+		// 7.  Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic.
208
+		{
209
+			initial: []initialDelta{
210
+				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
211
+				{deltaType: cache.Added, object: "obj1"},
212
+			},
213
+			compressed:  []cache.Delta{},
214
+			expectPanic: true,
215
+		},
216
+
217
+		// 7.  Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic.
218
+		{
219
+			initial: []initialDelta{
220
+				{deltaType: cache.Updated, object: "obj1"},
221
+				{deltaType: cache.Added, object: "obj1"},
222
+			},
223
+			compressed:  []cache.Delta{},
224
+			expectPanic: true,
225
+		},
226
+	}
227
+
228
+	for _, test := range tests {
229
+		queue := NewEventQueue(testKeyFunc)
230
+
231
+		paniced, msg := addInitialDeltas(queue, test.initial)
232
+		if paniced != test.expectPanic {
233
+			t.Fatalf("(%s) unexpected panic result %v (expected %v): %v", testDesc(test), paniced, test.expectPanic, msg)
234
+		}
235
+		if test.expectPanic {
236
+			continue
237
+		}
238
+
239
+		items, ok, err := queue.Get("obj1")
240
+		if err != nil {
241
+			t.Fatalf("(%s) error getting expected object: %v", testDesc(test), err)
242
+		}
243
+		if len(test.compressed) > 0 {
244
+			if !ok {
245
+				t.Fatalf("(%s) expected object doesn't exist", testDesc(test))
246
+			}
247
+			compressedDeltas := items.(cache.Deltas)
248
+			if len(compressedDeltas) != len(test.compressed) {
249
+				t.Fatalf("(%s) wrong number of compressed deltas (got %d, expected %d)", testDesc(test), len(compressedDeltas), len(test.compressed))
250
+			}
251
+			for j, expected := range test.compressed {
252
+				have := compressedDeltas[j]
253
+				if expected.Type != have.Type {
254
+					t.Fatalf("(%s) wrong delta type (got %s, expected %s)", testDesc(test), have.Type, expected.Type)
255
+				}
256
+				if expected.Object.(string) != have.Object.(string) {
257
+					t.Fatalf("(%s) wrong delta object key (got %s, expected %s)", testDesc(test), have.Object.(string), expected.Object.(string))
258
+				}
259
+			}
260
+		} else if ok {
261
+			t.Fatalf("(%s) unexpected object", testDesc(test))
262
+		}
263
+	}
264
+}
265
+
266
+// Test that single events are passed through uncompressed
267
+func TestEventQueueUncompressed(t *testing.T) {
268
+	obj := "obj1"
269
+
270
+	for _, dtype := range []cache.DeltaType{cache.Added, cache.Updated, cache.Deleted, cache.Sync} {
271
+		queue := NewEventQueue(testKeyFunc)
272
+
273
+		// Deleted requires the object to already be in the known objects
274
+		// list, and we must pop that cache.Added off before testing
275
+		// to ensure the Deleted delta comes through even when the queue
276
+		// is empty.
277
+		if dtype == cache.Deleted {
278
+			queue.Add(obj)
279
+			items, err := queue.Pop(func(delta cache.Delta) error {
280
+				return nil
281
+			})
282
+			if err != nil {
283
+				t.Fatalf("(%s) unexpected error popping initial Added delta: %v", dtype, err)
284
+			}
285
+			deltas := items.(cache.Deltas)
286
+			if len(deltas) != 1 {
287
+				t.Fatalf("(%s) expected 1 delta popping initial Added, got %d", dtype, len(deltas))
288
+			}
289
+			if deltas[0].Type != cache.Added {
290
+				t.Fatalf("(%s) expected initial Added delta, got %v", dtype, deltas[0].Type)
291
+			}
292
+		}
293
+
294
+		// Now add the real delta type under test
295
+		switch dtype {
296
+		case cache.Added:
297
+			queue.Add(obj)
298
+		case cache.Updated:
299
+			queue.Update(obj)
300
+		case cache.Deleted:
301
+			queue.Delete(obj)
302
+		case cache.Sync:
303
+			queue.Replace([]interface{}{obj}, "123")
304
+		}
305
+
306
+		// And pop the expected item out of the queue
307
+		items, err := queue.Pop(func(delta cache.Delta) error {
308
+			return nil
309
+		})
310
+		if err != nil {
311
+			t.Fatalf("(%s) unexpected error popping delta: %v", dtype, err)
312
+		}
313
+		deltas := items.(cache.Deltas)
314
+		if len(deltas) != 1 {
315
+			t.Fatalf("(%s) expected 1 delta popping delta, got %d", dtype, len(deltas))
316
+		}
317
+		if deltas[0].Type != dtype {
318
+			t.Fatalf("(%s) expected same delta, got %v", dtype, deltas[0].Type)
319
+		}
320
+	}
321
+}
... ...
@@ -13,11 +13,10 @@ import (
13 13
 
14 14
 	kapi "k8s.io/kubernetes/pkg/api"
15 15
 	kapierrs "k8s.io/kubernetes/pkg/api/errors"
16
+	"k8s.io/kubernetes/pkg/client/cache"
16 17
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
17 18
 	pconfig "k8s.io/kubernetes/pkg/proxy/config"
18
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
19 19
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
20
-	"k8s.io/kubernetes/pkg/watch"
21 20
 )
22 21
 
23 22
 type proxyFirewallItem struct {
... ...
@@ -68,16 +67,9 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
68 68
 }
69 69
 
70 70
 func (proxy *ovsProxyPlugin) watchEgressNetworkPolicies() {
71
-	eventQueue := proxy.registry.RunEventQueue(EgressNetworkPolicies)
72
-
73
-	for {
74
-		eventType, obj, err := eventQueue.Pop()
75
-		if err != nil {
76
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressNetworkPolicy: %v", err))
77
-			return
78
-		}
79
-		policy := obj.(*osapi.EgressNetworkPolicy)
80
-		if eventType == watch.Deleted {
71
+	proxy.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error {
72
+		policy := delta.Object.(*osapi.EgressNetworkPolicy)
73
+		if delta.Type == cache.Deleted {
81 74
 			policy.Spec.Egress = nil
82 75
 		}
83 76
 
... ...
@@ -89,7 +81,8 @@ func (proxy *ovsProxyPlugin) watchEgressNetworkPolicies() {
89 89
 				proxy.updateEndpoints()
90 90
 			}
91 91
 		}()
92
-	}
92
+		return nil
93
+	})
93 94
 }
94 95
 
95 96
 func (proxy *ovsProxyPlugin) updateNetworkPolicy(policy osapi.EgressNetworkPolicy) {
... ...
@@ -16,7 +16,6 @@ import (
16 16
 	"k8s.io/kubernetes/pkg/labels"
17 17
 
18 18
 	osclient "github.com/openshift/origin/pkg/client"
19
-	oscache "github.com/openshift/origin/pkg/client/cache"
20 19
 	osapi "github.com/openshift/origin/pkg/sdn/api"
21 20
 )
22 21
 
... ...
@@ -263,8 +262,12 @@ func (registry *Registry) GetEgressNetworkPolicies() ([]osapi.EgressNetworkPolic
263 263
 	return policyList.Items, nil
264 264
 }
265 265
 
266
-// Run event queue for the given resource
267
-func (registry *Registry) RunEventQueue(resourceName ResourceName) *oscache.EventQueue {
266
+// Run event queue for the given resource. The 'process' function is called
267
+// repeatedly with each available cache.Delta that describes state changes
268
+// to an object. If the process function returns an error queued changes
269
+// for that object are dropped but processing continues with the next available
270
+// object's cache.Deltas.  The error is logged with call stack information.
271
+func (registry *Registry) RunEventQueue(resourceName ResourceName, process ProcessEventFunc) {
268 272
 	var client cache.Getter
269 273
 	var expectedType interface{}
270 274
 
... ...
@@ -295,11 +298,15 @@ func (registry *Registry) RunEventQueue(resourceName ResourceName) *oscache.Even
295 295
 	}
296 296
 
297 297
 	lw := cache.NewListWatchFromClient(client, strings.ToLower(string(resourceName)), kapi.NamespaceAll, fields.Everything())
298
-	eventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc)
298
+	eventQueue := NewEventQueue(cache.MetaNamespaceKeyFunc)
299 299
 	// Repopulate event queue every 30 mins
300 300
 	// Existing items in the event queue will have watch.Modified event type
301 301
 	cache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()
302
-	return eventQueue
302
+
303
+	// Run the queue
304
+	for {
305
+		eventQueue.Pop(process)
306
+	}
303 307
 }
304 308
 
305 309
 func (registry *Registry) ValidateNodeIP(nodeIP string) error {
... ...
@@ -3,18 +3,17 @@ package plugin
3 3
 import (
4 4
 	"fmt"
5 5
 	"net"
6
-	"strings"
7 6
 	"time"
8 7
 
9 8
 	log "github.com/golang/glog"
10 9
 
11 10
 	kapi "k8s.io/kubernetes/pkg/api"
12 11
 	kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned"
12
+	"k8s.io/kubernetes/pkg/client/cache"
13 13
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
14 14
 	"k8s.io/kubernetes/pkg/types"
15 15
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
16 16
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
17
-	"k8s.io/kubernetes/pkg/watch"
18 17
 
19 18
 	osapi "github.com/openshift/origin/pkg/sdn/api"
20 19
 	"github.com/openshift/origin/pkg/util/netutils"
... ...
@@ -152,51 +151,43 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi
152 152
 }
153 153
 
154 154
 func (master *OsdnMaster) watchNodes() {
155
-	eventQueue := master.registry.RunEventQueue(Nodes)
156 155
 	nodeAddressMap := map[types.UID]string{}
157
-
158
-	for {
159
-		eventType, obj, err := eventQueue.Pop()
160
-		if err != nil {
161
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for nodes: %v", err))
162
-			return
163
-		}
164
-		node := obj.(*kapi.Node)
156
+	master.registry.RunEventQueue(Nodes, func(delta cache.Delta) error {
157
+		node := delta.Object.(*kapi.Node)
165 158
 		name := node.ObjectMeta.Name
166 159
 		uid := node.ObjectMeta.UID
167 160
 
168 161
 		nodeIP, err := getNodeIP(node)
169 162
 		if err != nil {
170
-			log.Errorf("Failed to get node IP for %s, skipping event: %v, node: %v", name, eventType, node)
171
-			continue
163
+			return fmt.Errorf("failed to get node IP for %s, skipping event: %v, node: %v", name, delta.Type, node)
172 164
 		}
173 165
 
174
-		switch eventType {
175
-		case watch.Added, watch.Modified:
166
+		switch delta.Type {
167
+		case cache.Sync, cache.Added, cache.Updated:
176 168
 			master.clearInitialNodeNetworkUnavailableCondition(node)
177 169
 
178 170
 			if oldNodeIP, ok := nodeAddressMap[uid]; ok && (oldNodeIP == nodeIP) {
179
-				continue
171
+				break
180 172
 			}
181 173
 			// Node status is frequently updated by kubelet, so log only if the above condition is not met
182
-			log.V(5).Infof("Watch %s event for Node %q", strings.Title(string(eventType)), name)
174
+			log.V(5).Infof("Watch %s event for Node %q", delta.Type, name)
183 175
 
184 176
 			err = master.addNode(name, nodeIP)
185 177
 			if err != nil {
186
-				log.Errorf("Error creating subnet for node %s, ip %s: %v", name, nodeIP, err)
187
-				continue
178
+				return fmt.Errorf("error creating subnet for node %s, ip %s: %v", name, nodeIP, err)
188 179
 			}
189 180
 			nodeAddressMap[uid] = nodeIP
190
-		case watch.Deleted:
191
-			log.V(5).Infof("Watch %s event for Node %q", strings.Title(string(eventType)), name)
181
+		case cache.Deleted:
182
+			log.V(5).Infof("Watch %s event for Node %q", delta.Type, name)
192 183
 			delete(nodeAddressMap, uid)
193 184
 
194 185
 			err = master.deleteNode(name)
195 186
 			if err != nil {
196
-				log.Errorf("Error deleting node %s: %v", name, err)
187
+				return fmt.Errorf("Error deleting node %s: %v", name, err)
197 188
 			}
198 189
 		}
199
-	}
190
+		return nil
191
+	})
200 192
 }
201 193
 
202 194
 func (node *OsdnNode) SubnetStartNode(mtu uint32) (bool, error) {
... ...
@@ -252,50 +243,41 @@ func (node *OsdnNode) initSelfSubnet() error {
252 252
 // Only run on the nodes
253 253
 func (node *OsdnNode) watchSubnets() {
254 254
 	subnets := make(map[string]*osapi.HostSubnet)
255
-	eventQueue := node.registry.RunEventQueue(HostSubnets)
256
-
257
-	for {
258
-		eventType, obj, err := eventQueue.Pop()
259
-		if err != nil {
260
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for subnets: %v", err))
261
-			return
262
-		}
263
-		hs := obj.(*osapi.HostSubnet)
264
-
255
+	node.registry.RunEventQueue(HostSubnets, func(delta cache.Delta) error {
256
+		hs := delta.Object.(*osapi.HostSubnet)
265 257
 		if hs.HostIP == node.localIP {
266
-			continue
258
+			return nil
267 259
 		}
268 260
 
269
-		log.V(5).Infof("Watch %s event for HostSubnet %q", strings.Title(string(eventType)), hs.ObjectMeta.Name)
270
-		switch eventType {
271
-		case watch.Added, watch.Modified:
261
+		log.V(5).Infof("Watch %s event for HostSubnet %q", delta.Type, hs.ObjectMeta.Name)
262
+		switch delta.Type {
263
+		case cache.Sync, cache.Added, cache.Updated:
272 264
 			oldSubnet, exists := subnets[string(hs.UID)]
273 265
 			if exists {
274 266
 				if oldSubnet.HostIP == hs.HostIP {
275
-					continue
267
+					break
276 268
 				} else {
277 269
 					// Delete old subnet rules
278
-					if err = node.DeleteHostSubnetRules(oldSubnet); err != nil {
279
-						log.Error(err)
270
+					if err := node.DeleteHostSubnetRules(oldSubnet); err != nil {
271
+						return err
280 272
 					}
281 273
 				}
282 274
 			}
283
-			if err = node.registry.ValidateNodeIP(hs.HostIP); err != nil {
284
-				log.Errorf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err)
285
-				continue
275
+			if err := node.registry.ValidateNodeIP(hs.HostIP); err != nil {
276
+				log.Warningf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err)
277
+				break
286 278
 			}
287 279
 
288
-			if err = node.AddHostSubnetRules(hs); err != nil {
289
-				log.Error(err)
290
-				continue
280
+			if err := node.AddHostSubnetRules(hs); err != nil {
281
+				return err
291 282
 			}
292 283
 			subnets[string(hs.UID)] = hs
293
-		case watch.Deleted:
284
+		case cache.Deleted:
294 285
 			delete(subnets, string(hs.UID))
295
-
296
-			if err = node.DeleteHostSubnetRules(hs); err != nil {
297
-				log.Error(err)
286
+			if err := node.DeleteHostSubnetRules(hs); err != nil {
287
+				return err
298 288
 			}
299 289
 		}
300
-	}
290
+		return nil
291
+	})
301 292
 }
... ...
@@ -2,16 +2,14 @@ package plugin
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"strings"
6 5
 	"sync"
7 6
 
8 7
 	log "github.com/golang/glog"
9 8
 
10 9
 	kapi "k8s.io/kubernetes/pkg/api"
11
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
10
+	"k8s.io/kubernetes/pkg/client/cache"
12 11
 	"k8s.io/kubernetes/pkg/util/sets"
13 12
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
14
-	"k8s.io/kubernetes/pkg/watch"
15 13
 
16 14
 	osapi "github.com/openshift/origin/pkg/sdn/api"
17 15
 	pnetid "github.com/openshift/origin/pkg/sdn/plugin/netid"
... ...
@@ -269,56 +267,41 @@ func (master *OsdnMaster) VnidStartMaster() error {
269 269
 
270 270
 func (master *OsdnMaster) watchNamespaces() {
271 271
 	registry := master.registry
272
-	eventQueue := registry.RunEventQueue(Namespaces)
273 272
 
274
-	for {
275
-		eventType, obj, err := eventQueue.Pop()
276
-		if err != nil {
277
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for namespaces: %v", err))
278
-			return
279
-		}
280
-		ns := obj.(*kapi.Namespace)
273
+	registry.RunEventQueue(Namespaces, func(delta cache.Delta) error {
274
+		ns := delta.Object.(*kapi.Namespace)
281 275
 		name := ns.ObjectMeta.Name
282 276
 
283
-		log.V(5).Infof("Watch %s event for Namespace %q", strings.Title(string(eventType)), name)
284
-		switch eventType {
285
-		case watch.Added, watch.Modified:
286
-			err = master.vnids.assignVNID(registry, name)
287
-			if err != nil {
288
-				log.Errorf("Error assigning netid: %v", err)
289
-				continue
277
+		log.V(5).Infof("Watch %s event for Namespace %q", delta.Type, name)
278
+		switch delta.Type {
279
+		case cache.Sync, cache.Added, cache.Updated:
280
+			if err := master.vnids.assignVNID(registry, name); err != nil {
281
+				return fmt.Errorf("Error assigning netid: %v", err)
290 282
 			}
291
-		case watch.Deleted:
292
-			err = master.vnids.revokeVNID(registry, name)
293
-			if err != nil {
294
-				log.Errorf("Error revoking netid: %v", err)
295
-				continue
283
+		case cache.Deleted:
284
+			if err := master.vnids.revokeVNID(registry, name); err != nil {
285
+				return fmt.Errorf("Error revoking netid: %v", err)
296 286
 			}
297 287
 		}
298
-	}
288
+		return nil
289
+	})
299 290
 }
300 291
 
301 292
 func (master *OsdnMaster) watchNetNamespaces() {
302 293
 	registry := master.registry
303
-	eventQueue := registry.RunEventQueue(NetNamespaces)
304 294
 
305
-	for {
306
-		eventType, obj, err := eventQueue.Pop()
307
-		if err != nil {
308
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for network namespaces: %v", err))
309
-			return
310
-		}
311
-		netns := obj.(*osapi.NetNamespace)
295
+	registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error {
296
+		netns := delta.Object.(*osapi.NetNamespace)
312 297
 		name := netns.ObjectMeta.Name
313 298
 
314
-		log.V(5).Infof("Watch %s event for NetNamespace %q", strings.Title(string(eventType)), name)
315
-		switch eventType {
316
-		case watch.Added, watch.Modified:
317
-			err = master.vnids.updateVNID(registry, netns)
299
+		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, name)
300
+		switch delta.Type {
301
+		case cache.Sync, cache.Added, cache.Updated:
302
+			err := master.vnids.updateVNID(registry, netns)
318 303
 			if err != nil {
319
-				log.Errorf("Error updating netid: %v", err)
320
-				continue
304
+				return fmt.Errorf("Error updating netid: %v", err)
321 305
 			}
322 306
 		}
323
-	}
307
+		return nil
308
+	})
324 309
 }
... ...
@@ -2,19 +2,17 @@ package plugin
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"strings"
6 5
 	"sync"
7 6
 	"time"
8 7
 
9 8
 	log "github.com/golang/glog"
10 9
 
11 10
 	kapi "k8s.io/kubernetes/pkg/api"
11
+	"k8s.io/kubernetes/pkg/client/cache"
12 12
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/container"
13 13
 	kerrors "k8s.io/kubernetes/pkg/util/errors"
14
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
15 14
 	"k8s.io/kubernetes/pkg/util/sets"
16 15
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
17
-	"k8s.io/kubernetes/pkg/watch"
18 16
 
19 17
 	osapi "github.com/openshift/origin/pkg/sdn/api"
20 18
 )
... ...
@@ -188,42 +186,34 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32)
188 188
 }
189 189
 
190 190
 func (node *OsdnNode) watchNetNamespaces() {
191
-	eventQueue := node.registry.RunEventQueue(NetNamespaces)
191
+	node.registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error {
192
+		netns := delta.Object.(*osapi.NetNamespace)
192 193
 
193
-	for {
194
-		eventType, obj, err := eventQueue.Pop()
195
-		if err != nil {
196
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for network namespaces: %v", err))
197
-			return
198
-		}
199
-		netns := obj.(*osapi.NetNamespace)
200
-
201
-		log.V(5).Infof("Watch %s event for NetNamespace %q", strings.Title(string(eventType)), netns.ObjectMeta.Name)
202
-		switch eventType {
203
-		case watch.Added, watch.Modified:
194
+		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name)
195
+		switch delta.Type {
196
+		case cache.Sync, cache.Added, cache.Updated:
204 197
 			// Skip this event if the old and new network ids are same
205
-			var oldNetID uint32
206
-			oldNetID, err = node.vnids.GetVNID(netns.NetName)
198
+			oldNetID, err := node.vnids.GetVNID(netns.NetName)
207 199
 			if (err == nil) && (oldNetID == netns.NetID) {
208
-				continue
200
+				break
209 201
 			}
210 202
 			node.vnids.setVNID(netns.NetName, netns.NetID)
211 203
 
212 204
 			err = node.updatePodNetwork(netns.NetName, oldNetID, netns.NetID)
213 205
 			if err != nil {
214
-				log.Errorf("Failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
215 206
 				node.vnids.setVNID(netns.NetName, oldNetID)
216
-				continue
207
+				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
217 208
 			}
218
-		case watch.Deleted:
209
+		case cache.Deleted:
219 210
 			// updatePodNetwork needs vnid, so unset vnid after this call
220
-			err = node.updatePodNetwork(netns.NetName, netns.NetID, osapi.GlobalVNID)
211
+			err := node.updatePodNetwork(netns.NetName, netns.NetID, osapi.GlobalVNID)
221 212
 			if err != nil {
222
-				log.Errorf("Failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
213
+				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
223 214
 			}
224 215
 			node.vnids.unsetVNID(netns.NetName)
225 216
 		}
226
-	}
217
+		return nil
218
+	})
227 219
 }
228 220
 
229 221
 func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
... ...
@@ -241,52 +231,42 @@ func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
241 241
 
242 242
 func (node *OsdnNode) watchServices() {
243 243
 	services := make(map[string]*kapi.Service)
244
-	eventQueue := node.registry.RunEventQueue(Services)
245
-
246
-	for {
247
-		eventType, obj, err := eventQueue.Pop()
248
-		if err != nil {
249
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for services: %v", err))
250
-			return
251
-		}
252
-		serv := obj.(*kapi.Service)
244
+	node.registry.RunEventQueue(Services, func(delta cache.Delta) error {
245
+		serv := delta.Object.(*kapi.Service)
253 246
 
254 247
 		// Ignore headless services
255 248
 		if !kapi.IsServiceIPSet(serv) {
256
-			continue
249
+			return nil
257 250
 		}
258 251
 
259
-		log.V(5).Infof("Watch %s event for Service %q", strings.Title(string(eventType)), serv.ObjectMeta.Name)
260
-		switch eventType {
261
-		case watch.Added, watch.Modified:
252
+		log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name)
253
+		switch delta.Type {
254
+		case cache.Sync, cache.Added, cache.Updated:
262 255
 			oldsvc, exists := services[string(serv.UID)]
263 256
 			if exists {
264 257
 				if !isServiceChanged(oldsvc, serv) {
265
-					continue
258
+					break
266 259
 				}
267
-				if err = node.DeleteServiceRules(oldsvc); err != nil {
260
+				if err := node.DeleteServiceRules(oldsvc); err != nil {
268 261
 					log.Error(err)
269 262
 				}
270 263
 			}
271 264
 
272
-			var netid uint32
273
-			netid, err = node.vnids.WaitAndGetVNID(serv.Namespace)
265
+			netid, err := node.vnids.WaitAndGetVNID(serv.Namespace)
274 266
 			if err != nil {
275
-				log.Errorf("Skipped adding service rules for serviceEvent: %v, Error: %v", eventType, err)
276
-				continue
267
+				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
277 268
 			}
278 269
 
279 270
 			if err = node.AddServiceRules(serv, netid); err != nil {
280
-				log.Error(err)
281
-				continue
271
+				return err
282 272
 			}
283 273
 			services[string(serv.UID)] = serv
284
-		case watch.Deleted:
274
+		case cache.Deleted:
285 275
 			delete(services, string(serv.UID))
286
-
287
-			if err = node.DeleteServiceRules(serv); err != nil {
288
-				log.Error(err)
276
+			if err := node.DeleteServiceRules(serv); err != nil {
277
+				return err
289 278
 			}
290 279
 		}
291
-	}
280
+		return nil
281
+	})
292 282
 }