package plugin
import (
"fmt"
"reflect"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
// testKeyFunc is the key function used by the string-based tests. A plain
// string is its own key, and a cache.DeletedFinalStateUnknown yields the Key
// it carries. Any other type produces a cache.KeyError.
func testKeyFunc(obj interface{}) (string, error) {
	switch v := obj.(type) {
	case cache.DeletedFinalStateUnknown:
		return v.Key, nil
	case string:
		return v, nil
	default:
		return "", cache.KeyError{Obj: obj, Err: fmt.Errorf("object not a string")}
	}
}
// deltaCompress runs deltaCompressor on deltas with the given keyFunc and
// converts a panic into return values instead of letting it propagate.
//
// On success it returns the compressed deltas with panicked == false. If
// deltaCompressor panics, newDeltas is left nil, panicked is true and msg
// holds the recovered value formatted with %#v.
func deltaCompress(deltas cache.Deltas, keyFunc cache.KeyFunc) (newDeltas cache.Deltas, panicked bool, msg string) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// The named results are overwritten here because the return
		// statement below never completed.
		panicked, msg = true, fmt.Sprintf("%#v", r)
	}()
	return deltaCompressor(deltas, keyFunc), false, ""
}
// compressTestDesc builds a short label for a compressor test case of the
// form "<initial types>=<compressed types>", where the delta types on each
// side are joined with "+". It is used to identify the case in failures.
func compressTestDesc(test compressTest) string {
	before := make([]string, 0, len(test.initial))
	for i := range test.initial {
		before = append(before, string(test.initial[i].Type))
	}

	after := make([]string, 0, len(test.compressed))
	for i := range test.compressed {
		after = append(after, string(test.compressed[i].Type))
	}

	lhs := strings.Join(before, "+")
	rhs := strings.Join(after, "+")
	return lhs + "=" + rhs
}
// compressTest is one table entry for TestEventQueueDeltaCompressor.
type compressTest struct {
	// initial is the delta list handed to the compressor.
	initial cache.Deltas
	// compressed is the delta list the compressor is expected to return.
	// It is not checked when expectPanic is true.
	compressed cache.Deltas
	// expectPanic is true when compressing initial is expected to panic.
	expectPanic bool
}
// TestEventQueueDeltaCompressor tests the delta compressor on its own: each
// case feeds a hand-built delta list through deltaCompress and compares the
// type and string key of every resulting delta, in order, with the expected
// list. Cases marked expectPanic only assert that the compressor panics.
func TestEventQueueDeltaCompressor(t *testing.T) {
	tests := []compressTest{
		// Rule 1: a cache.Added/cache.Sync enqueued with state X followed by a
		// cache.Updated with state Y is compressed into (Added/Sync, Y).
		{
			initial: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 1 again, with a second object interleaved.
		{
			initial: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{Type: cache.Added, Object: "obj2"},
				{Type: cache.Updated, Object: "obj2"},
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj2"},
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 1, starting from cache.Sync instead of cache.Added.
		{
			initial: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 1, cache.Sync start with a second object interleaved.
		{
			initial: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{Type: cache.Added, Object: "obj2"},
				{Type: cache.Updated, Object: "obj2"},
				{Type: cache.Updated, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj2"},
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 2: a cache.Added enqueued with state X followed by a
		// cache.Deleted with state Y drops both; consumers see neither event.
		{
			initial: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{Type: cache.Added, Object: "obj2"},
				{Type: cache.Deleted, Object: "obj2"},
				{Type: cache.Deleted, Object: "obj1"},
			},
			compressed: cache.Deltas{},
		},
		// Rule 3: a cache.Sync/cache.Updated enqueued with state X followed by
		// a cache.Deleted with state Y is compressed into (Deleted, Y).
		{
			initial: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{Type: cache.Sync, Object: "obj2"},
				{Type: cache.Updated, Object: "obj2"},
				{Type: cache.Deleted, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj2"},
				{Type: cache.Deleted, Object: "obj1"},
			},
		},
		// Rule 3, starting from cache.Updated.
		{
			initial: cache.Deltas{
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Deleted, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Deleted, Object: "obj1"},
			},
		},
		// Rule 4: a cache.Updated with state X followed by a cache.Updated
		// with state Y is compressed into (Updated, Y).
		{
			initial: cache.Deltas{
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Updated, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Updated, Object: "obj1"},
			},
		},
		// Rule 5: a cache.Added with state X followed by a cache.Sync with
		// state Y is compressed into (Added, Y).
		{
			initial: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
				{Type: cache.Sync, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 6: a cache.Sync with state X followed by a cache.Sync with
		// state Y is compressed into (Sync, Y).
		{
			initial: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
				{Type: cache.Sync, Object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 7: invalid combinations (eg, Sync + Added or Updated + Added)
		// result in a panic. This case covers Sync + Added.
		{
			initial: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
				{Type: cache.Added, Object: "obj1"},
			},
			compressed:  cache.Deltas{},
			expectPanic: true,
		},
		// Rule 7, covering Updated + Added.
		{
			initial: cache.Deltas{
				{Type: cache.Updated, Object: "obj1"},
				{Type: cache.Added, Object: "obj1"},
			},
			compressed:  cache.Deltas{},
			expectPanic: true,
		},
	}

	for _, tc := range tests {
		desc := compressTestDesc(tc)
		got, panicked, msg := deltaCompress(tc.initial, testKeyFunc)

		switch {
		case panicked != tc.expectPanic:
			t.Fatalf("(%s) unexpected panic result %v (expected %v): %v", desc, panicked, tc.expectPanic, msg)
		case tc.expectPanic:
			// The panic was the expected outcome; there is no output to compare.
			continue
		case len(got) != len(tc.compressed):
			t.Fatalf("(%s) wrong number of compressed deltas (got %d, expected %d): %v", desc, len(got), len(tc.compressed), got)
		}

		for i := range tc.compressed {
			want, have := tc.compressed[i], got[i]
			if want.Type != have.Type {
				t.Fatalf("(%s) wrong delta type (got %s, expected %s): %v", desc, have.Type, want.Type, got)
			}
			if want.Object.(string) != have.Object.(string) {
				t.Fatalf("(%s) wrong delta object key (got %s, expected %s)", desc, have.Object.(string), want.Object.(string))
			}
		}
	}
}
// TestEventQueueDeltaCompressorDeletedFinalStateUnknown verifies that a lone
// Deleted delta whose object is a cache.DeletedFinalStateUnknown comes out of
// the compressor unchanged when DeletionHandlingMetaNamespaceKeyFunc is used
// as the key function.
func TestEventQueueDeltaCompressorDeletedFinalStateUnknown(t *testing.T) {
	tombstone := cache.DeletedFinalStateUnknown{
		Key: "namespace1/obj1",
		Obj: &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"},
	}
	input := cache.Deltas{{Type: cache.Deleted, Object: tombstone}}

	got, panicked, msg := deltaCompress(input, DeletionHandlingMetaNamespaceKeyFunc)
	if panicked {
		t.Fatalf("unexpected panic: %v", msg)
	}
	if n := len(got); n != 1 {
		t.Fatalf("wrong number of compressed deltas (got %d, expected 1): %v", n, got)
	}

	only := got[0]
	if only.Type != cache.Deleted {
		t.Fatalf("unexpected delta type %v (expected Deleted)", only.Type)
	}
	if !reflect.DeepEqual(only.Object, tombstone) {
		t.Fatalf("unexpected delta object %v (expected %v)", only.Object, tombstone)
	}
}
// TestEventQueueDeltaCompressorDeletedFinalStateUnknown2 ensures the
// compressor panics when it's given a keyFunc that can't handle the delta
// object: here cache.MetaNamespaceKeyFunc is paired with a
// cache.DeletedFinalStateUnknown.
func TestEventQueueDeltaCompressorDeletedFinalStateUnknown2(t *testing.T) {
	tombstone := cache.DeletedFinalStateUnknown{
		Key: "namespace1/obj1",
		Obj: &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"},
	}
	input := cache.Deltas{{Type: cache.Deleted, Object: tombstone}}

	if _, panicked, _ := deltaCompress(input, cache.MetaNamespaceKeyFunc); !panicked {
		t.Fatalf("expected panic but didn't get one")
	}
}
// initialDelta describes one operation that addInitialDeltas applies to an
// EventQueue.
type initialDelta struct {
	// deltaType selects the queue call: Added -> Add, Updated -> Update,
	// Deleted -> Delete, Sync -> Replace.
	deltaType cache.DeltaType
	// object is the argument passed to Add/Update/Delete. It may be left nil
	// for a Sync delta, in which case the known-objects update is skipped.
	object interface{}
	// knownObjects should be given for Sync DeltaTypes; it is the list
	// passed to Replace.
	knownObjects []interface{}
}
// eventQueueTest is one table entry for TestEventQueueCompress.
type eventQueueTest struct {
	// initial is the sequence of operations applied to a fresh queue.
	initial []initialDelta
	// compressed is the delta list expected to be queued under key "obj1"
	// afterwards; an empty list means no entry is expected.
	compressed cache.Deltas
	// knownObjects is neither set nor read by the code visible in this file.
	// NOTE(review): possibly vestigial — confirm before removing.
	knownObjects []interface{}
	// expectPanic is true when applying initial is expected to panic.
	expectPanic bool
}
// testDesc builds a short label for an event queue test case of the form
// "<initial types>=<compressed types>", where the delta types on each side
// are joined with "+". It is used to identify the case in failures.
func testDesc(test eventQueueTest) string {
	before := make([]string, 0, len(test.initial))
	for i := range test.initial {
		before = append(before, string(test.initial[i].deltaType))
	}

	after := make([]string, 0, len(test.compressed))
	for i := range test.compressed {
		after = append(after, string(test.compressed[i].Type))
	}

	lhs := strings.Join(before, "+")
	rhs := strings.Join(after, "+")
	return lhs + "=" + rhs
}
// addInitialDeltas applies each entry of deltas to queue in order, using the
// queue method that matches the entry's delta type. After each operation with
// a non-nil object it also calls queue.updateKnownObjects with that delta.
//
// Returns panicked == false on success. If any call panics, the panic is
// recovered, panicked is true and msg holds the recovered value formatted
// with %#v.
func addInitialDeltas(queue *EventQueue, deltas []initialDelta) (panicked bool, msg string) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		panicked, msg = true, fmt.Sprintf("%#v", r)
	}()

	for i := range deltas {
		step := deltas[i]
		switch step.deltaType {
		case cache.Added:
			queue.Add(step.object)
		case cache.Updated:
			queue.Update(step.object)
		case cache.Deleted:
			queue.Delete(step.object)
		case cache.Sync:
			// knownObjects should be valid for Sync operations
			queue.Replace(step.knownObjects, "123")
		}

		if step.object == nil {
			continue
		}
		queue.updateKnownObjects(cache.Delta{Type: step.deltaType, Object: step.object})
	}
	return false, ""
}
// TestEventQueueCompress tests the whole event queue, not just the compressor
// itself; this will exercise DeltaFIFO constructs like
// DeletedFinalStateUnknown, the EventQueue internal store, the DeltaFIFO
// deletion compression, and the DeltaFIFO knownObjects array.
//
// Each case applies its operations to a fresh queue and then inspects only
// the deltas stored under key "obj1".
func TestEventQueueCompress(t *testing.T) {
	tests := []eventQueueTest{
		// Rule 1: a cache.Added/cache.Sync enqueued with state X followed by a
		// cache.Updated with state Y is compressed into (Added/Sync, Y).
		{
			initial: []initialDelta{
				{deltaType: cache.Added, object: "obj1"},
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Updated, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 1 again, with a second object interleaved.
		{
			initial: []initialDelta{
				{deltaType: cache.Added, object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{deltaType: cache.Added, object: "obj2"},
				{deltaType: cache.Updated, object: "obj2"},
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Updated, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 1, starting from cache.Sync instead of cache.Added.
		{
			initial: []initialDelta{
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Updated, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 1, cache.Sync start with a second object interleaved.
		{
			initial: []initialDelta{
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
				{deltaType: cache.Updated, object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{deltaType: cache.Added, object: "obj2"},
				{deltaType: cache.Updated, object: "obj2"},
				{deltaType: cache.Updated, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 2: a cache.Added enqueued with state X followed by a
		// cache.Deleted with state Y drops both; consumers see neither event.
		{
			initial: []initialDelta{
				{deltaType: cache.Added, object: "obj1"},
				// test that a second object doesn't affect compression of the first
				{deltaType: cache.Added, object: "obj2"},
				{deltaType: cache.Deleted, object: "obj2"},
				{deltaType: cache.Deleted, object: "obj1"},
			},
			compressed: cache.Deltas{},
		},
		// Rule 3: a cache.Sync/cache.Updated enqueued with state X followed by
		// a cache.Deleted with state Y is compressed into (Deleted, Y).
		{
			initial: []initialDelta{
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
				// test that a second object doesn't affect compression of the first
				{deltaType: cache.Sync, object: "obj2", knownObjects: []interface{}{"obj1", "obj2"}},
				{deltaType: cache.Updated, object: "obj2"},
				{deltaType: cache.Deleted, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Deleted, Object: "obj1"},
			},
		},
		// Rule 3, starting from cache.Updated.
		{
			initial: []initialDelta{
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Deleted, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Deleted, Object: "obj1"},
			},
		},
		// Rule 4: a cache.Updated with state X followed by a cache.Updated
		// with state Y is compressed into (Updated, Y).
		{
			initial: []initialDelta{
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Updated, object: "obj1"},
			},
			compressed: cache.Deltas{
				{Type: cache.Updated, Object: "obj1"},
			},
		},
		// Rule 5: a cache.Added with state X followed by a cache.Sync with
		// state Y is compressed into (Added, Y).
		{
			initial: []initialDelta{
				{deltaType: cache.Added, object: "obj1"},
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
			},
			compressed: cache.Deltas{
				{Type: cache.Added, Object: "obj1"},
			},
		},
		// Rule 6: a cache.Sync with state X followed by a cache.Sync with
		// state Y is compressed into (Sync, Y).
		{
			initial: []initialDelta{
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
			},
			compressed: cache.Deltas{
				{Type: cache.Sync, Object: "obj1"},
			},
		},
		// Rule 7: invalid combinations (eg, Sync + Added or Updated + Added)
		// result in a panic. This case covers Sync + Added.
		{
			initial: []initialDelta{
				{deltaType: cache.Sync, object: "obj1", knownObjects: []interface{}{"obj1"}},
				{deltaType: cache.Added, object: "obj1"},
			},
			compressed:  cache.Deltas{},
			expectPanic: true,
		},
		// Rule 7, covering Updated + Added.
		{
			initial: []initialDelta{
				{deltaType: cache.Updated, object: "obj1"},
				{deltaType: cache.Added, object: "obj1"},
			},
			compressed:  cache.Deltas{},
			expectPanic: true,
		},
		// Ensure DeletedFinalStateUnknown objects can be compressed
		{
			initial: []initialDelta{
				{deltaType: cache.Added, object: "obj1"},
				{deltaType: cache.Sync, knownObjects: []interface{}{}},
			},
			compressed: cache.Deltas{},
		},
	}

	for _, tc := range tests {
		desc := testDesc(tc)
		queue := NewEventQueue(testKeyFunc)

		panicked, msg := addInitialDeltas(queue, tc.initial)
		if panicked != tc.expectPanic {
			t.Fatalf("(%s) unexpected panic result %v (expected %v): %v", desc, panicked, tc.expectPanic, msg)
		}
		if tc.expectPanic {
			// The panic was the expected outcome; the queue is not inspected.
			continue
		}

		items, found, err := queue.Get("obj1")
		if err != nil {
			t.Fatalf("(%s) error getting expected object: %v", desc, err)
		}

		// Nothing expected: the key must be absent from the queue.
		if len(tc.compressed) == 0 {
			if found {
				t.Fatalf("(%s) unexpected object %v", desc, items)
			}
			continue
		}

		if !found {
			t.Fatalf("(%s) expected object doesn't exist", desc)
		}
		got := items.(cache.Deltas)
		if len(got) != len(tc.compressed) {
			t.Fatalf("(%s) wrong number of compressed deltas (got %d, expected %d)", desc, len(got), len(tc.compressed))
		}
		for i := range tc.compressed {
			want, have := tc.compressed[i], got[i]
			if want.Type != have.Type {
				t.Fatalf("(%s) wrong delta type (got %s, expected %s)", desc, have.Type, want.Type)
			}
			if want.Object.(string) != have.Object.(string) {
				t.Fatalf("(%s) wrong delta object key (got %s, expected %s)", desc, have.Object.(string), want.Object.(string))
			}
		}
	}
}
// TestEventQueueUncompressed tests that single events are passed through
// uncompressed: for each delta type, one operation on a fresh queue must pop
// back out as exactly one delta of that same type.
func TestEventQueueUncompressed(t *testing.T) {
	obj := "obj1"
	// accept is a process callback that approves every delta.
	accept := func(delta cache.Delta) error {
		return nil
	}

	for _, deltaType := range []cache.DeltaType{cache.Added, cache.Updated, cache.Deleted, cache.Sync} {
		queue := NewEventQueue(testKeyFunc)

		// Deleted requires the object to already be in the known objects
		// list, and we must pop that cache.Added off before testing
		// to ensure the Deleted delta comes through even when the queue
		// is empty.
		if deltaType == cache.Deleted {
			queue.Add(obj)
			primed, err := queue.Pop(accept, nil)
			if err != nil {
				t.Fatalf("(%s) unexpected error popping initial Added delta: %v", deltaType, err)
			}
			added := primed.(cache.Deltas)
			if len(added) != 1 {
				t.Fatalf("(%s) expected 1 delta popping initial Added, got %d", deltaType, len(added))
			}
			if first := added[0]; first.Type != cache.Added {
				t.Fatalf("(%s) expected initial Added delta, got %v", deltaType, first.Type)
			}
		}

		// Now add the real delta type under test
		switch deltaType {
		case cache.Added:
			queue.Add(obj)
		case cache.Updated:
			queue.Update(obj)
		case cache.Deleted:
			queue.Delete(obj)
		case cache.Sync:
			queue.Replace([]interface{}{obj}, "123")
		}

		// And pop the expected item out of the queue
		popped, err := queue.Pop(accept, nil)
		if err != nil {
			t.Fatalf("(%s) unexpected error popping delta: %v", deltaType, err)
		}
		got := popped.(cache.Deltas)
		if len(got) != 1 {
			t.Fatalf("(%s) expected 1 delta popping delta, got %d", deltaType, len(got))
		}
		if first := got[0]; first.Type != deltaType {
			t.Fatalf("(%s) expected same delta, got %v", deltaType, first.Type)
		}
	}
}
// TestEventQueueDeletedFinalStateUnknown tests that DeletedFinalStateUnknown
// objects are handled correctly by Pop. It runs two phases against freshly
// built queues:
//
//  1. With a nil expected type, the process callback must receive a
//     cache.DeletedFinalStateUnknown.
//  2. With &api.ObjectMeta{} as the expected type, the process callback must
//     receive an *api.ObjectMeta instead.
func TestEventQueueDeletedFinalStateUnknown(t *testing.T) {
	queue := NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc)
	obj1 := &api.ObjectMeta{Name: "obj1", Namespace: "namespace1"}
	obj2 := &api.ObjectMeta{Name: "obj2", Namespace: "namespace1"}

	// Make sure objects are in knownObjects but not in the delta queue,
	// to ensure we get DeletedFinalStateUnknown delta objects
	queue.knownObjects.Add(obj1)
	queue.knownObjects.Add(obj2)

	// This should create two DeletedFinalStateUnknown objects
	queue.Replace([]interface{}{}, "123")

	// First test that we get actual DeletedFinalStateUnknown objects
	var called bool
	var processErr error
	if _, err := queue.Pop(func(delta cache.Delta) error {
		called = true
		if _, ok := delta.Object.(cache.DeletedFinalStateUnknown); !ok {
			// Record the error ourselves: per the original note, Pop()
			// logs a process error but doesn't return it.
			processErr = fmt.Errorf("Unexpected item type %T", delta.Object)
			return processErr
		}
		return nil
	}, nil); err != nil {
		// Use a constant format string so a '%' in the error text is
		// printed verbatim instead of being parsed as a formatting verb.
		t.Fatalf("%v", err)
	}
	if !called {
		t.Fatalf("Delta pop function wasn't called")
	}
	if processErr != nil {
		t.Fatalf("Delta pop function returned error %v", processErr)
	}

	// Repeat but this time make sure we get the objects we want, not DeletedFinalStateUnknown
	queue = NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc)
	queue.knownObjects.Add(obj1)
	queue.knownObjects.Add(obj2)

	// This should create two DeletedFinalStateUnknown objects
	queue.Replace([]interface{}{}, "123")

	// Now test that we only get api.ObjectMeta objects since we passed that
	// as the expected type
	called = false
	processErr = nil
	if _, err := queue.Pop(func(delta cache.Delta) error {
		called = true
		if _, ok := delta.Object.(*api.ObjectMeta); !ok {
			// Record the error ourselves: per the original note, Pop()
			// logs a process error but doesn't return it.
			processErr = fmt.Errorf("Unexpected item type %T", delta.Object)
			return processErr
		}
		return nil
	}, &api.ObjectMeta{}); err != nil {
		t.Fatalf("%v", err)
	}
	if !called {
		t.Fatalf("Delta pop function wasn't called")
	}
	if processErr != nil {
		t.Fatalf("Delta pop function returned error %v", processErr)
	}
}