package plugin
import (
"fmt"
"reflect"
"k8s.io/kubernetes/pkg/client/cache"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
// EventQueue is an enhanced DeltaFIFO that provides reliable Deleted deltas
// even if no knownObjects store is given, and compresses multiple deltas
// to reduce duplicate events.
//
// Without a store, DeltaFIFO will drop Deleted deltas when its queue is empty
// because the deleted object is not present in the queue and DeltaFIFO tries
// to protect against duplicate Deleted deltas resulting from Replace().
//
// To get reliable deletion, a store must be provided, and EventQueue provides
// one if the caller does not.
type EventQueue struct {
// Embedded DeltaFIFO holding the queued deltas; its methods are promoted
// onto EventQueue.
*cache.DeltaFIFO
// Private store used when the queue is not initialized with a
// caller-provided one, to ensure deletion events are always recognized.
// Left nil by NewEventQueueForStore, where the caller supplies the store.
knownObjects cache.Store
}
// DeletionHandlingMetaNamespaceKeyFunc returns the key for obj. When obj is a
// cache.DeletedFinalStateUnknown the key recorded in it is returned as-is;
// any other object is keyed with cache.MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
	if !isTombstone {
		return cache.MetaNamespaceKeyFunc(obj)
	}
	return tombstone.Key, nil
}
// NewEventQueue returns an EventQueue backed by its own private store, which
// is created with keyFunc and handed to the underlying DeltaFIFO as its known
// objects. Queued deltas are compressed with deltaCompressor.
func NewEventQueue(keyFunc cache.KeyFunc) *EventQueue {
	store := cache.NewStore(keyFunc)
	compress := func(pending cache.Deltas) cache.Deltas {
		return deltaCompressor(pending, keyFunc)
	}
	fifo := cache.NewDeltaFIFO(keyFunc, cache.DeltaCompressorFunc(compress), store)
	return &EventQueue{
		DeltaFIFO:    fifo,
		knownObjects: store,
	}
}
// NewEventQueueForStore returns an EventQueue whose underlying DeltaFIFO uses
// the caller-supplied knownObjects. No private store is created, so the
// queue's own knownObjects field stays nil. Queued deltas are compressed with
// deltaCompressor.
func NewEventQueueForStore(keyFunc cache.KeyFunc, knownObjects cache.KeyListerGetter) *EventQueue {
	compress := func(pending cache.Deltas) cache.Deltas {
		return deltaCompressor(pending, keyFunc)
	}
	fifo := cache.NewDeltaFIFO(keyFunc, cache.DeltaCompressorFunc(compress), knownObjects)
	return &EventQueue{DeltaFIFO: fifo}
}
// updateKnownObjects applies a single delta to the queue's private store:
// Added and Updated deltas are written with Add and Update respectively, a
// Sync delta is written with Update if the object is already stored and with
// Add otherwise, and a Deleted delta removes the object.
//
// The caller must ensure queue.knownObjects is non-nil.
//
// Errors returned by the store are reported through utilruntime.HandleError
// instead of being discarded, so a private store that drifts from the
// delivered events does not go unnoticed. The delta is still handed to the
// consumer regardless of a store failure.
func (queue *EventQueue) updateKnownObjects(delta cache.Delta) {
	var err error
	switch delta.Type {
	case cache.Added:
		err = queue.knownObjects.Add(delta.Object)
	case cache.Updated:
		err = queue.knownObjects.Update(delta.Object)
	case cache.Sync:
		// A Sync may refer to an object the store has not seen yet, so
		// choose between Update and Add based on its presence.
		_, exists, getErr := queue.knownObjects.Get(delta.Object)
		if getErr != nil {
			utilruntime.HandleError(fmt.Errorf("event queue: looking up known object for %v delta failed: %v", delta.Type, getErr))
		}
		if exists {
			err = queue.knownObjects.Update(delta.Object)
		} else {
			err = queue.knownObjects.Add(delta.Object)
		}
	case cache.Deleted:
		err = queue.knownObjects.Delete(delta.Object)
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("event queue: updating known objects for %v delta failed: %v", delta.Type, err))
	}
}
// ProcessEventFunc processes one object delta, which represents a change
// notification for a single object. It is passed the delta, which contains the
// changed object or the deleted final object state. When Pop is called with a
// non-nil expected type, the deleted final object state is extracted from the
// DeletedFinalStateUnknown passed by DeltaFIFO before the function is invoked.
// Returning an error makes Pop skip the remaining deltas of that object.
type ProcessEventFunc func(delta cache.Delta) error
// Pop processes the queued changes for one object. The 'process' function is
// invoked once per available cache.Delta, oldest first. Before each call the
// private store (if the queue owns one) is updated with the delta, and, when
// expectedType is non-nil, the delta's object is unwrapped and type-checked
// via extractDeltaObject.
//
// If the type check fails or 'process' returns an error, the error is
// reported through utilruntime.HandleError and the remaining deltas for that
// object are dropped; the callback still returns nil to DeltaFIFO.Pop so
// processing continues with the next object's deltas.
//
// The return values are those of the underlying DeltaFIFO.Pop.
func (queue *EventQueue) Pop(process ProcessEventFunc, expectedType interface{}) (interface{}, error) {
	handleDeltas := func(obj interface{}) error {
		// Deltas are ordered oldest to newest.
		objectDeltas := obj.(cache.Deltas)
		for _, delta := range objectDeltas {
			// Keep the private store current so deletions are tracked.
			if queue.knownObjects != nil {
				queue.updateKnownObjects(delta)
			}

			// Unwrap DeletedFinalStateUnknown and verify the object type.
			if expectedType != nil {
				extracted, err := extractDeltaObject(delta, expectedType)
				if err != nil {
					utilruntime.HandleError(err)
					return nil
				}
				delta.Object = extracted
			}

			// Hand a single delta to the consumer.
			if err := process(delta); err != nil {
				utilruntime.HandleError(fmt.Errorf("event processing failed: %v", err))
				return nil
			}
		}
		return nil
	}
	return queue.DeltaFIFO.Pop(handleDeltas)
}
// extractDeltaObject returns the object carried by delta, unwrapping a
// cache.DeletedFinalStateUnknown to the object it holds. The object is only
// returned when its dynamic type is identical to that of expectedType;
// otherwise nil and an error naming both types are returned.
func extractDeltaObject(delta cache.Delta, expectedType interface{}) (interface{}, error) {
	payload := delta.Object
	if tombstone, isTombstone := payload.(cache.DeletedFinalStateUnknown); isTombstone {
		payload = tombstone.Obj
	}

	gotType, wantType := reflect.TypeOf(payload), reflect.TypeOf(expectedType)
	if gotType == wantType {
		return payload, nil
	}
	return nil, fmt.Errorf("event processing failed: got delta object type %T but wanted type %T", payload, expectedType)
}
// actionType describes the action to take for a given combination of deltas.
type actionType string
// Actions the compressor can take for a combination of deltas.
const (
// The delta combination should result in the delta being added to the compressor cache.
actionAdd actionType = "ADD"
// The delta combination should be compressed into a single delta.
actionCompress actionType = "COMPRESS"
// The delta combination should result in the object being deleted from the compressor cache.
actionDelete actionType = "DELETE"
)
// deltaAction pairs the action to take for a delta combination with the type
// of the delta that results from it.
type deltaAction struct {
// The action to take for the delta combination.
action actionType
// The type for the new compressed delta.
deltaType cache.DeltaType
}
// deltaActionMatrix defines the valid delta sequences and how specific
// combinations of deltas are compressed. The outer key is the type of the
// delta already queued for an object, the inner key is the type of the newly
// received delta, and the value is the action to take together with the type
// of the resulting delta.
//
// A combination that is absent from the matrix is an invalid sequence and
// results in a panic in deltaCompressor.
var deltaActionMatrix = map[cache.DeltaType]map[cache.DeltaType]deltaAction{
	cache.Added: {
		cache.Sync:    {action: actionCompress, deltaType: cache.Added},
		cache.Updated: {action: actionCompress, deltaType: cache.Added},
		cache.Deleted: {action: actionDelete, deltaType: cache.Deleted},
	},
	cache.Sync: {
		cache.Sync:    {action: actionCompress, deltaType: cache.Sync},
		cache.Updated: {action: actionCompress, deltaType: cache.Sync},
		cache.Deleted: {action: actionCompress, deltaType: cache.Deleted},
	},
	cache.Updated: {
		cache.Updated: {action: actionCompress, deltaType: cache.Updated},
		cache.Deleted: {action: actionCompress, deltaType: cache.Deleted},
	},
	cache.Deleted: {
		cache.Added: {action: actionCompress, deltaType: cache.Updated},
		cache.Sync:  {action: actionCompress, deltaType: cache.Sync},
	},
}
// removeDeltasWithKey returns a new, non-nil list holding the deltas whose
// object key differs from removeKey, in their original order. Deltas whose
// object cannot be keyed by keyFunc are left out as well.
func removeDeltasWithKey(deltas cache.Deltas, removeKey string, keyFunc cache.KeyFunc) cache.Deltas {
	kept := cache.Deltas{}
	for i := range deltas {
		key, err := keyFunc(deltas[i].Object)
		if err != nil || key == removeKey {
			continue
		}
		kept = append(kept, deltas[i])
	}
	return kept
}
// deltaCompressor is the DeltaFIFO compressor used by EventQueue. It combines
// deltas for the same object (as identified by keyFunc) according to
// deltaActionMatrix:
//
//  1. Added/Sync with state X followed by Updated with state Y becomes
//     (Added/Sync, Y).
//
//  2. Added with state X followed by Deleted with state Y drops both; consumers
//     see neither event.
//
//  3. Sync/Updated with state X followed by Deleted with state Y becomes
//     (Deleted, Y).
//
//  4. Updated with state X followed by Updated with state Y becomes
//     (Updated, Y).
//
//  5. Added with state X followed by Sync with state Y becomes (Added, Y).
//
//  6. Sync with state X followed by Sync with state Y becomes (Sync, Y).
//
//  7. Deleted with state X followed by Added with state Y becomes (Updated, Y),
//     and Deleted followed by Sync with state Y becomes (Sync, Y).
//
//  8. Invalid combinations (eg, Sync + Added or Updated + Added) result in a
//     panic, as does an object that keyFunc cannot key.
//
// All events for the same object are compressed into a single delta.
func deltaCompressor(deltas cache.Deltas, keyFunc cache.KeyFunc) cache.Deltas {
	// Compressed result, built up as the input is scanned.
	compressed := cache.Deltas{}
	// Type of the delta currently held in the result for each object key.
	pendingType := make(map[string]cache.DeltaType)

	// Input is ordered oldest (index 0) to newest (last index).
	for _, incoming := range deltas {
		key, err := keyFunc(incoming.Object)
		if err != nil {
			panic(fmt.Sprintf("unkeyable object: %v, %v", incoming.Object, err))
		}

		// A first sighting of a key is simply recorded; otherwise the
		// matrix decides how old and new combine.
		transition := deltaAction{action: actionAdd, deltaType: incoming.Type}
		if previous, seen := pendingType[key]; seen {
			combined, valid := deltaActionMatrix[previous][incoming.Type]
			if !valid {
				panic(fmt.Sprintf("invalid state transition: %v -> %v", previous, incoming.Type))
			}
			transition = combined
		}

		switch transition.action {
		case actionDelete:
			// The pair cancels out: forget the object entirely.
			delete(pendingType, key)
			compressed = removeDeltasWithKey(compressed, key, keyFunc)
		case actionCompress:
			// Replace the pending delta with one carrying the newest state.
			pendingType[key] = transition.deltaType
			compressed = removeDeltasWithKey(compressed, key, keyFunc)
			compressed = append(compressed, cache.Delta{
				Type:   transition.deltaType,
				Object: incoming.Object,
			})
		case actionAdd:
			compressed = append(compressed, incoming)
			pendingType[key] = incoming.Type
		}
	}
	return compressed
}