package plugin

import (
	"fmt"
	"reflect"

	"k8s.io/kubernetes/pkg/client/cache"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

// EventQueue is an enhanced DeltaFIFO that provides reliable Deleted deltas
// even if no knownObjects store is given, and compresses multiple deltas
// to reduce duplicate events.
//
// Without a store, DeltaFIFO will drop Deleted deltas when its queue is empty
// because the deleted object is not present in the queue and DeltaFIFO tries
// to protect against duplicate Deleted deltas resulting from Replace().
//
// To get reliable deletion, a store must be provided, and EventQueue provides
// one if the caller does not.
type EventQueue struct {
	*cache.DeltaFIFO

	// Private store if not intitialized with one to ensure deletion
	// events are always recognized.
	knownObjects cache.Store
}

func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return cache.MetaNamespaceKeyFunc(obj)
}

func NewEventQueue(keyFunc cache.KeyFunc) *EventQueue {
	knownObjects := cache.NewStore(keyFunc)
	return &EventQueue{
		DeltaFIFO: cache.NewDeltaFIFO(
			keyFunc,
			cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
				return deltaCompressor(d, keyFunc)
			}),
			knownObjects),
		knownObjects: knownObjects,
	}
}

func NewEventQueueForStore(keyFunc cache.KeyFunc, knownObjects cache.KeyListerGetter) *EventQueue {
	return &EventQueue{
		DeltaFIFO: cache.NewDeltaFIFO(
			keyFunc,
			cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
				return deltaCompressor(d, keyFunc)
			}),
			knownObjects),
	}
}

func (queue *EventQueue) updateKnownObjects(delta cache.Delta) {
	switch delta.Type {
	case cache.Added:
		queue.knownObjects.Add(delta.Object)
	case cache.Updated:
		queue.knownObjects.Update(delta.Object)
	case cache.Sync:
		if _, ok, _ := queue.knownObjects.Get(delta.Object); ok {
			queue.knownObjects.Update(delta.Object)
		} else {
			queue.knownObjects.Add(delta.Object)
		}
	case cache.Deleted:
		queue.knownObjects.Delete(delta.Object)
	}
}

// Function should process one object delta, which represents a change notification
// for a single object. Function is passed the delta, which contains the
// changed object or the deleted final object state. The deleted final object
// state is extracted from the DeletedFinalStateUnknown passed by DeltaFIFO.
type ProcessEventFunc func(delta cache.Delta) error

// Process queued changes for an object.  The 'process' function is called
// repeatedly with each available cache.Delta that describes state changes
// for that object. If the process function returns an error queued changes
// for that object are dropped but processing continues with the next available
// object's cache.Deltas.  The error is logged with call stack information.
func (queue *EventQueue) Pop(process ProcessEventFunc, expectedType interface{}) (interface{}, error) {
	return queue.DeltaFIFO.Pop(func(obj interface{}) error {
		// Oldest to newest delta lists
		for _, delta := range obj.(cache.Deltas) {
			// Update private store to track object deletion
			if queue.knownObjects != nil {
				queue.updateKnownObjects(delta)
			}

			// Handle DeletedFinalStateUnknown delta objects
			var err error
			if expectedType != nil {
				delta.Object, err = extractDeltaObject(delta, expectedType)
				if err != nil {
					utilruntime.HandleError(err)
					return nil
				}
			}

			// Process one delta for the object
			if err = process(delta); err != nil {
				utilruntime.HandleError(fmt.Errorf("event processing failed: %v", err))
				return nil
			}
		}
		return nil
	})
}

// Helper function to extract the object from a Delta (including special handling
// of DeletedFinalStateUnknown delta objects) and check its type against
// an expected type.  The contained object is only returned if it matches the
// expected type, otherwise an error is returned.
func extractDeltaObject(delta cache.Delta, expectedType interface{}) (interface{}, error) {
	deltaObject := delta.Object
	if deleted, ok := deltaObject.(cache.DeletedFinalStateUnknown); ok {
		deltaObject = deleted.Obj
	}
	if reflect.TypeOf(deltaObject) != reflect.TypeOf(expectedType) {
		return nil, fmt.Errorf("event processing failed: got delta object type %T but wanted type %T", deltaObject, expectedType)
	}
	return deltaObject, nil
}

// Describes the action to take for a given combination of deltas
type actionType string

const (
	// The delta combination should result in the delta being added to the compressor cache
	actionAdd actionType = "ADD"

	// The delta combination should should be compressed into a single delta
	actionCompress actionType = "COMPRESS"

	// The delta combination should result in the object being deleted from the compressor cache
	actionDelete actionType = "DELETE"
)

type deltaAction struct {
	// The action to take for the delta combination
	action actionType
	// The type for the new compressed delta
	deltaType cache.DeltaType
}

// The delta combination action matrix defines the valid delta sequences and
// how to compress specific combinations of deltas.
//
// A delta combination that produces an invalid sequence results in a panic.
var deltaActionMatrix = map[cache.DeltaType]map[cache.DeltaType]deltaAction{
	cache.Added: {
		cache.Sync:    {actionCompress, cache.Added},
		cache.Updated: {actionCompress, cache.Added},
		cache.Deleted: {actionDelete, cache.Deleted},
	},
	cache.Sync: {
		cache.Sync:    {actionCompress, cache.Sync},
		cache.Updated: {actionCompress, cache.Sync},
		cache.Deleted: {actionCompress, cache.Deleted},
	},
	cache.Updated: {
		cache.Updated: {actionCompress, cache.Updated},
		cache.Deleted: {actionCompress, cache.Deleted},
	},
	cache.Deleted: {
		cache.Added: {actionCompress, cache.Updated},
		cache.Sync:  {actionCompress, cache.Sync},
	},
}

func removeDeltasWithKey(deltas cache.Deltas, removeKey string, keyFunc cache.KeyFunc) cache.Deltas {
	newDeltas := cache.Deltas{}
	for _, d := range deltas {
		key, err := keyFunc(d.Object)
		if err == nil && key != removeKey {
			newDeltas = append(newDeltas, d)
		}
	}
	return newDeltas
}

// This DeltaFIFO compressor combines deltas for the same object, the exact
// compression semantics of which are as follows:
//
// 1.  If a cache.Added/cache.Sync is enqueued with state X and a cache.Updated with state Y
//     is received, these are compressed into (Added/Sync, Y)
//
// 2.  If a cache.Added is enqueued with state X and a cache.Deleted is received with state Y,
//     these are dropped and consumers will not see either event
//
// 3.  If a cache.Sync/cache.Updated is enqueued with state X and a cache.Deleted
//     is received with state Y, these are compressed into (Deleted, Y)
//
// 4.  If a cache.Updated is enqueued with state X and a cache.Updated with state Y is received,
//     these two events are compressed into (Updated, Y)
//
// 5.  If a cache.Added is enqueued with state X and a cache.Sync with state Y is received,
//     these are compressed into (Added, Y)
//
// 6.  If a cache.Sync is enqueued with state X and a cache.Sync with state Y is received,
//     these are compressed into (Sync, Y)
//
// 7.  Invalid combinations (eg, Sync + Added or Updated + Added) result in a panic.
//
// This function will compress all events for the same object into a single delta.
func deltaCompressor(deltas cache.Deltas, keyFunc cache.KeyFunc) cache.Deltas {
	// Final compressed deltas list
	newDeltas := cache.Deltas{}

	// Cache of object's current state including previous deltas
	objects := make(map[string]cache.DeltaType)

	// Deltas range from oldest (index 0) to newest (last index)
	for _, d := range deltas {
		key, err := keyFunc(d.Object)
		if err != nil {
			panic(fmt.Sprintf("unkeyable object: %v, %v", d.Object, err))
		}

		var compressAction deltaAction
		if oldType, ok := objects[key]; !ok {
			compressAction = deltaAction{actionAdd, d.Type}
		} else {
			// Older event exists; combine them
			compressAction, ok = deltaActionMatrix[oldType][d.Type]
			if !ok {
				panic(fmt.Sprintf("invalid state transition: %v -> %v", oldType, d.Type))
			}
		}

		switch compressAction.action {
		case actionAdd:
			newDeltas = append(newDeltas, d)
			objects[key] = d.Type
		case actionCompress:
			newDelta := cache.Delta{
				Type:   compressAction.deltaType,
				Object: d.Object,
			}
			objects[key] = newDelta.Type
			newDeltas = removeDeltasWithKey(newDeltas, key, keyFunc)
			newDeltas = append(newDeltas, newDelta)
		case actionDelete:
			delete(objects, key)
			newDeltas = removeDeltasWithKey(newDeltas, key, keyFunc)
		}
	}

	return newDeltas
}