package plugin import ( "fmt" "reflect" "strings" "testing" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" ) func testKeyFunc(obj interface{}) (string, error) { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { return d.Key, nil } key, ok := obj.(string) if !ok { return "", cache.KeyError{Obj: obj, Err: fmt.Errorf("object not a string")} } return key, nil } func deltaCompress(deltas cache.Deltas, keyFunc cache.KeyFunc) (newDeltas cache.Deltas, panicked bool, msg string) { defer func() { if r := recover(); r != nil { panicked = true msg = fmt.Sprintf("%#v", r) } }() newDeltas = deltaCompressor(deltas, keyFunc) return } func compressTestDesc(test compressTest) string { var start, result []string for _, delta := range test.initial { start = append(start, string(delta.Type)) } for _, delta := range test.compressed { result = append(result, string(delta.Type)) } return strings.Join(start, "+") + "=" + strings.Join(result, "+") } type compressTest struct { initial cache.Deltas compressed cache.Deltas expectPanic bool } // Test the delta compressor on its own func TestEventQueueDeltaCompressor(t *testing.T) { tests := []compressTest{ // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, // test that a second object doesn't affect compression of the first {Type: cache.Added, Object: "obj2"}, {Type: cache.Updated, Object: "obj2"}, {Type: cache.Updated, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj2"}, {Type: cache.Added, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, // test that a second object doesn't affect compression of the first {Type: cache.Added, Object: "obj2"}, {Type: cache.Updated, Object: "obj2"}, {Type: cache.Updated, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj2"}, {Type: cache.Sync, Object: "obj1"}, }, }, // 2. If a cache.Added is enqueued with state X and a cache.Deleted is received with state Y, // these are dropped and consumers will not see either event { initial: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, // test that a second object doesn't affect compression of the first {Type: cache.Added, Object: "obj2"}, {Type: cache.Deleted, Object: "obj2"}, {Type: cache.Deleted, Object: "obj1"}, }, compressed: cache.Deltas{}, }, // 3. If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted // is received with state Y, these are compressed into (Deleted, Y) { initial: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, // test that a second object doesn't affect compression of the first {Type: cache.Sync, Object: "obj2"}, {Type: cache.Updated, Object: "obj2"}, {Type: cache.Deleted, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj2"}, {Type: cache.Deleted, Object: "obj1"}, }, }, // 3. If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted // is received with state Y, these are compressed into (Deleted, Y) { initial: cache.Deltas{ {Type: cache.Updated, Object: "obj1"}, {Type: cache.Deleted, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Deleted, Object: "obj1"}, }, }, // 4. If a cache.Updated is enqueued with state X and a cache.Updated with state Y is received, // these two events are compressed into (Updated, Y) { initial: cache.Deltas{ {Type: cache.Updated, Object: "obj1"}, {Type: cache.Updated, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Updated, Object: "obj1"}, }, }, // 5. If a cache.Added is enqueued with state X and a cache.Sync with state Y is received, // these are compressed into (Added, Y) { initial: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, {Type: cache.Sync, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, }, }, // 6. If a cache.Sync is enqueued with state X and a cache.Sync with state Y is received, // these are compressed into (Sync, Y) { initial: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, {Type: cache.Sync, Object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, }, }, // 7. Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic. { initial: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, {Type: cache.Added, Object: "obj1"}, }, compressed: cache.Deltas{}, expectPanic: true, }, // 7. Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic. { initial: cache.Deltas{ {Type: cache.Updated, Object: "obj1"}, {Type: cache.Added, Object: "obj1"}, }, compressed: cache.Deltas{}, expectPanic: true, }, } for _, test := range tests { newDeltas, panicked, msg := deltaCompress(test.initial, testKeyFunc) if panicked != test.expectPanic { t.Fatalf("(%s) unexpected panic result %v (expected %v): %v", compressTestDesc(test), panicked, test.expectPanic, msg) } if test.expectPanic { continue } if len(newDeltas) != len(test.compressed) { t.Fatalf("(%s) wrong number of compressed deltas (got %d, expected %d): %v", compressTestDesc(test), len(newDeltas), len(test.compressed), newDeltas) } for j, expected := range test.compressed { have := newDeltas[j] if expected.Type != have.Type { t.Fatalf("(%s) wrong delta type (got %s, expected %s): %v", compressTestDesc(test), have.Type, expected.Type, newDeltas) } if expected.Object.(string) != have.Object.(string) { t.Fatalf("(%s) wrong delta object key (got %s, expected %s)", compressTestDesc(test), have.Object.(string), expected.Object.(string)) } } } } func TestEventQueueDeltaCompressorDeletedFinalStateUnknown(t *testing.T) { deletedObj := cache.DeletedFinalStateUnknown{ Key: "namespace1/obj1", Obj: &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"}, } initial := cache.Deltas{ { Type: cache.Deleted, Object: deletedObj, }, } newDeltas, panicked, msg := deltaCompress(initial, DeletionHandlingMetaNamespaceKeyFunc) if panicked { t.Fatalf("unexpected panic: %v", msg) } if len(newDeltas) != 1 { t.Fatalf("wrong number of compressed deltas (got %d, expected 1): %v", len(newDeltas), newDeltas) } if newDeltas[0].Type != cache.Deleted { t.Fatalf("unexpected delta type %v (expected Deleted)", newDeltas[0].Type) } if !reflect.DeepEqual(newDeltas[0].Object, deletedObj) { t.Fatalf("unexpected delta object %v (expected %v)", newDeltas[0].Object, deletedObj) } } // Ensure the compressor panics when its given a keyFunc that can't handle the delta object func TestEventQueueDeltaCompressorDeletedFinalStateUnknown2(t *testing.T) { initial := cache.Deltas{ { Type: cache.Deleted, Object: cache.DeletedFinalStateUnknown{ Key: "namespace1/obj1", Obj: &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"}, }, }, } _, panicked, _ := deltaCompress(initial, cache.MetaNamespaceKeyFunc) if !panicked { t.Fatalf("expected panic but didn't get one") } } type initialDelta struct { deltaType cache.DeltaType object interface{} // knownObjects should be given for Sync DeltaTypes knownObjects []interface{} } type eventQueueTest struct { initial []initialDelta compressed cache.Deltas knownObjects []interface{} expectPanic bool } func testDesc(test eventQueueTest) string { var start, result []string for _, delta := range test.initial { start = append(start, string(delta.deltaType)) } for _, delta := range test.compressed { result = append(result, string(delta.Type)) } return strings.Join(start, "+") + "=" + strings.Join(result, "+") } // Returns false on success, true on panic func addInitialDeltas(queue *EventQueue, deltas []initialDelta) (panicked bool, msg string) { defer func() { if r := recover(); r != nil { panicked = true msg = fmt.Sprintf("%#v", r) } }() for _, initial := range deltas { switch initial.deltaType { case cache.Added: queue.Add(initial.object) case cache.Updated: queue.Update(initial.object) case cache.Deleted: queue.Delete(initial.object) case cache.Sync: // knownObjects should be valid for Sync operations queue.Replace(initial.knownObjects, "123") } if initial.object != nil { queue.updateKnownObjects(cache.Delta{Type: initial.deltaType, Object: initial.object}) } } return } // Test the whole event queue, not just the compressor itself; this will exercise // DeltaFIFO constructs like DeletedFinalStateUnknown, the EventQueue internal // store, the DeltaFIFO deletion compression, and the DeltaFIFO knownObjects array func TestEventQueueCompress(t *testing.T) { tests := []eventQueueTest{ // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: []initialDelta{ {deltaType: cache.Added, object: "obj1"}, {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Updated, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: []initialDelta{ {deltaType: cache.Added, object: "obj1"}, // test that a second object doesn't affect compression of the first {deltaType: cache.Added, object: "obj2"}, {deltaType: cache.Updated, object: "obj2"}, {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Updated, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: []initialDelta{ {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Updated, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, }, }, // 1. If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y // is received, these are compressed into (Added/Sync, Y) { initial: []initialDelta{ {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, {deltaType: cache.Updated, object: "obj1"}, // test that a second object doesn't affect compression of the first {deltaType: cache.Added, object: "obj2"}, {deltaType: cache.Updated, object: "obj2"}, {deltaType: cache.Updated, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, }, }, // 2. If a cache.Added is enqueued with state X and a cache.Deleted is received with state Y, // these are dropped and consumers will not see either event { initial: []initialDelta{ {deltaType: cache.Added, object: "obj1"}, // test that a second object doesn't affect compression of the first {deltaType: cache.Added, object: "obj2"}, {deltaType: cache.Deleted, object: "obj2"}, {deltaType: cache.Deleted, object: "obj1"}, }, compressed: cache.Deltas{}, }, // 3. If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted // is received with state Y, these are compressed into (Deleted, Y) { initial: []initialDelta{ {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, // test that a second object doesn't affect compression of the first {deltaType: cache.Sync, object: "obj2", knownObjects: []interface{}{"obj1", "obj2"}}, {deltaType: cache.Updated, object: "obj2"}, {deltaType: cache.Deleted, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Deleted, Object: "obj1"}, }, }, // 3. If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted // is received with state Y, these are compressed into (Deleted, Y) { initial: []initialDelta{ {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Deleted, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Deleted, Object: "obj1"}, }, }, // 4. If a cache.Updated is enqueued with state X and a cache.Updated with state Y is received, // these two events are compressed into (Updated, Y) { initial: []initialDelta{ {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Updated, object: "obj1"}, }, compressed: cache.Deltas{ {Type: cache.Updated, Object: "obj1"}, }, }, // 5. If a cache.Added is enqueued with state X and a cache.Sync with state Y is received, // these are compressed into (Added, Y) { initial: []initialDelta{ {deltaType: cache.Added, object: "obj1"}, {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, }, compressed: cache.Deltas{ {Type: cache.Added, Object: "obj1"}, }, }, // 6. If a cache.Sync is enqueued with state X and a cache.Sync with state Y is received, // these are compressed into (Sync, Y) { initial: []initialDelta{ {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, }, compressed: cache.Deltas{ {Type: cache.Sync, Object: "obj1"}, }, }, // 7. Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic. { initial: []initialDelta{ {deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}}, {deltaType: cache.Added, object: "obj1"}, }, compressed: cache.Deltas{}, expectPanic: true, }, // 7. Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic. { initial: []initialDelta{ {deltaType: cache.Updated, object: "obj1"}, {deltaType: cache.Added, object: "obj1"}, }, compressed: cache.Deltas{}, expectPanic: true, }, // Ensure DeletedFinalStateUnknown objects can be compressed { initial: []initialDelta{ {deltaType: cache.Added, object: "obj1"}, {deltaType: cache.Sync, knownObjects: []interface{}{}}, }, compressed: cache.Deltas{}, }, } for _, test := range tests { queue := NewEventQueue(testKeyFunc) panicked, msg := addInitialDeltas(queue, test.initial) if panicked != test.expectPanic { t.Fatalf("(%s) unexpected panic result %v (expected %v): %v", testDesc(test), panicked, test.expectPanic, msg) } if test.expectPanic { continue } items, ok, err := queue.Get("obj1") if err != nil { t.Fatalf("(%s) error getting expected object: %v", testDesc(test), err) } if len(test.compressed) > 0 { if !ok { t.Fatalf("(%s) expected object doesn't exist", testDesc(test)) } compressedDeltas := items.(cache.Deltas) if len(compressedDeltas) != len(test.compressed) { t.Fatalf("(%s) wrong number of compressed deltas (got %d, expected %d)", testDesc(test), len(compressedDeltas), len(test.compressed)) } for j, expected := range test.compressed { have := compressedDeltas[j] if expected.Type != have.Type { t.Fatalf("(%s) wrong delta type (got %s, expected %s)", testDesc(test), have.Type, expected.Type) } if expected.Object.(string) != have.Object.(string) { t.Fatalf("(%s) wrong delta object key (got %s, expected %s)", testDesc(test), have.Object.(string), expected.Object.(string)) } } } else if ok { t.Fatalf("(%s) unexpected object %v", testDesc(test), items) } } } // Test that single events are passed through uncompressed func TestEventQueueUncompressed(t *testing.T) { obj := "obj1" for _, dtype := range []cache.DeltaType{cache.Added, cache.Updated, cache.Deleted, cache.Sync} { queue := NewEventQueue(testKeyFunc) // Deleted requires the object to already be in the known objects // list, and we must pop that cache.Added off before testing // to ensure the Deleted delta comes through even when the queue // is empty. if dtype == cache.Deleted { queue.Add(obj) items, err := queue.Pop(func(delta cache.Delta) error { return nil }, nil) if err != nil { t.Fatalf("(%s) unexpected error popping initial Added delta: %v", dtype, err) } deltas := items.(cache.Deltas) if len(deltas) != 1 { t.Fatalf("(%s) expected 1 delta popping initial Added, got %d", dtype, len(deltas)) } if deltas[0].Type != cache.Added { t.Fatalf("(%s) expected initial Added delta, got %v", dtype, deltas[0].Type) } } // Now add the real delta type under test switch dtype { case cache.Added: queue.Add(obj) case cache.Updated: queue.Update(obj) case cache.Deleted: queue.Delete(obj) case cache.Sync: queue.Replace([]interface{}{obj}, "123") } // And pop the expected item out of the queue items, err := queue.Pop(func(delta cache.Delta) error { return nil }, nil) if err != nil { t.Fatalf("(%s) unexpected error popping delta: %v", dtype, err) } deltas := items.(cache.Deltas) if len(deltas) != 1 { t.Fatalf("(%s) expected 1 delta popping delta, got %d", dtype, len(deltas)) } if deltas[0].Type != dtype { t.Fatalf("(%s) expected same delta, got %v", dtype, deltas[0].Type) } } } // Test that DeletedFinalStateUnknown objects are handled correctly func TestEventQueueDeletedFinalStateUnknown(t *testing.T) { queue := NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc) obj1 := &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"} obj2 := &api.ObjectMeta{Name: "obj2", Namespace: "namespace1"} // Make sure objects are in knownObjects but not in the delta queue, // to ensure we get DeletedFinalStateUnknown delta objects queue.knownObjects.Add(obj1) queue.knownObjects.Add(obj2) // This should create two DeletedFinalStateUnknown objects queue.Replace([]interface{}{}, "123") // First test that we get actual DeletedFinalStateUnknown objects var called bool var processErr error if _, err := queue.Pop(func(delta cache.Delta) error { called = true if _, ok := delta.Object.(cache.DeletedFinalStateUnknown); !ok { // Capture error that Pop() logs the error but doesn't return processErr = fmt.Errorf("Unexpected item type %T", delta.Object) return processErr } return nil }, nil); err != nil { t.Fatalf(fmt.Sprintf("%v", err)) } if !called { t.Fatalf("Delta pop function wasn't called") } if processErr != nil { t.Fatalf("Delta pop function returned error %v", processErr) } // Repeat but this time make sure we get the objects we want, not DeletedFinalStateUnknown queue = NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc) queue.knownObjects.Add(obj1) queue.knownObjects.Add(obj2) // This should create two DeletedFinalStateUnknown objects queue.Replace([]interface{}{}, "123") // Now test that we only get api.ObjectMeta objects since we passed that // as the expected type called = false if _, err := queue.Pop(func(delta cache.Delta) error { called = true if _, ok := delta.Object.(*api.ObjectMeta); !ok { // Capture error that Pop() logs the error but doesn't return processErr = fmt.Errorf("Unexpected item type %T", delta.Object) return processErr } return nil }, &api.ObjectMeta{}); err != nil { t.Fatalf(fmt.Sprintf("%v", err)) } if !called { t.Fatalf("Delta pop function wasn't called") } if processErr != nil { t.Fatalf("Delta pop function returned error %v", processErr) } }