package cache

import (
	"fmt"
	"sync"

	kcache "k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/watch"
)

// EventQueue is a Store implementation that provides a sequence of compressed events to a consumer
// along with event types.  This differs from the FIFO implementation in that FIFO does not provide
// events when an object is deleted and does not provide the type of event.  Events are compressed
// in a manner similar to FIFO, but accounting for event types and deletions.  The exact
// compression semantics are as follows:
//
// 1.  If a watch.Added is enqueued with state X and a watch.Modified with state Y is received,
//     these are compressed into (Added, Y)
//
// 2.  If a watch.Added is enqueued with state X and a watch.Deleted is received, these are
//     compressed and the item is removed from the queue
//
// 3.  If a watch.Modified is enqueued with state X and a watch.Modified with state Y is received,
//     these two events are compressed into (Modified, Y)
//
// 4.  If a watch.Modified is enqueued with state X and a watch.Deleted is received, these are
//     compressed into (Deleted, X)
//
// It should be noted that the scenario where an object is deleted and re-added is not handled by
// this type nor is it in scope; the reflector uses UIDs for the IDs passed to stores, so you will
// never see a delete and a re-add for the same ID.
//
// This type maintains a backing store in order to provide the deleted state on watch.Deleted
// events.  This is necessary because the Store API does not receive the deleted state on a
// watch.Deleted event (though this state is delivered by the watch API itself, it is not passed on
// to the reflector Store).
type EventQueue struct {
	lock   sync.RWMutex
	cond   sync.Cond
	store  kcache.Store
	keyFn  kcache.KeyFunc
	events map[string]watch.EventType
	queue  []string
	// Tracks the last key added to the queue by the most recent call
	// to Replace().  A reflector replaces the queue contents on a
	// re-list by calling Replace() and the compression algorithm does
	// not apply to those items, so a non-empty key is valid until the
	// item it refers to is explicitly deleted from the store or the
	// event is read via Pop().
	lastReplaceKey string
}

// EventQueue implements kcache.Store
var _ kcache.Store = &EventQueue{}

// Describes the effect of processing a watch event on the event queue's state.
type watchEventEffect string

type EventQueueStopped struct{}

const (
	// The watch event should result in an add to the event queue
	watchEventEffectAdd watchEventEffect = "ADD"

	// The watch event should be compressed with an already enqueued event
	watchEventEffectCompress watchEventEffect = "COMPRESS"

	// The watch event should result in the ID being deleted from the queue
	watchEventEffectDelete watchEventEffect = "DELETE"
)

// The watch event effect matrix defines the valid event sequences and what their effects are on
// the state of the event queue.
//
// A watch event that produces an invalid sequence results in a panic.
var watchEventEffectMatrix = map[watch.EventType]map[watch.EventType]watchEventEffect{
	watch.Added: {
		watch.Modified: watchEventEffectCompress,
		watch.Deleted:  watchEventEffectDelete,
	},
	watch.Modified: {
		watch.Modified: watchEventEffectCompress,
		watch.Deleted:  watchEventEffectCompress,
	},
	watch.Deleted: {},
}

// The watch event compression matrix defines how two events should be compressed.
var watchEventCompressionMatrix = map[watch.EventType]map[watch.EventType]watch.EventType{
	watch.Added: {
		watch.Modified: watch.Added,
	},
	watch.Modified: {
		watch.Modified: watch.Modified,
		watch.Deleted:  watch.Deleted,
	},
	watch.Deleted: {},
}

func (es EventQueueStopped) Error() string {
	return fmt.Sprintf("Event queue was stopped.")
}

// handleEvent is called by Add, Update, and Delete to determine the effect
// of an event of the queue, realize that effect, and update the underlying store.
func (eq *EventQueue) handleEvent(obj interface{}, newEventType watch.EventType) error {
	key, err := eq.keyFn(obj)
	if err != nil {
		return err
	}

	eq.lock.Lock()
	defer eq.lock.Unlock()

	var (
		queuedEventType watch.EventType
		effect          watchEventEffect
		ok              bool
	)

	queuedEventType, ok = eq.events[key]
	if !ok {
		effect = watchEventEffectAdd
	} else {
		effect, ok = watchEventEffectMatrix[queuedEventType][newEventType]
		if !ok {
			panic(fmt.Sprintf("Invalid state transition: %v -> %v", queuedEventType, newEventType))
		}
	}

	if err := eq.updateStore(key, obj, newEventType); err != nil {
		return err
	}

	switch effect {
	case watchEventEffectAdd:
		eq.events[key] = newEventType
		eq.queue = append(eq.queue, key)
		eq.cond.Broadcast()
	case watchEventEffectCompress:
		newEventType, ok := watchEventCompressionMatrix[queuedEventType][newEventType]
		if !ok {
			panic(fmt.Sprintf("Invalid state transition: %v -> %v", queuedEventType, newEventType))
		}

		eq.events[key] = newEventType
	case watchEventEffectDelete:
		delete(eq.events, key)
		eq.queue = eq.queueWithout(key)
	}
	return nil
}

// Cancel function to force Pop function to unblock
func (eq *EventQueue) Cancel() {
	eq.cond.Broadcast()
}

// updateStore updates the stored value for the given key.  Note that deletions are not handled
// here; they are performed in Pop in order to provide the deleted value on watch.Deleted events.
func (eq *EventQueue) updateStore(key string, obj interface{}, eventType watch.EventType) error {
	if eventType == watch.Deleted {
		return nil
	}

	var err error
	if eventType == watch.Added {
		err = eq.store.Add(obj)
	} else {
		err = eq.store.Update(obj)
	}
	return err
}

// queueWithout returns the internal queue minus the given key.
func (eq *EventQueue) queueWithout(key string) []string {
	rq := make([]string, 0)
	for _, qkey := range eq.queue {
		if qkey == key {
			continue
		}

		rq = append(rq, qkey)
	}

	return rq
}

// Add enqueues a watch.Added event for the given state.
func (eq *EventQueue) Add(obj interface{}) error {
	return eq.handleEvent(obj, watch.Added)
}

// Update enqueues a watch.Modified event for the given state.
func (eq *EventQueue) Update(obj interface{}) error {
	return eq.handleEvent(obj, watch.Modified)
}

// Delete enqueues a watch.Delete event for the given object.
func (eq *EventQueue) Delete(obj interface{}) error {
	return eq.handleEvent(obj, watch.Deleted)
}

// List returns a list of all enqueued items.
func (eq *EventQueue) List() []interface{} {
	eq.lock.RLock()
	defer eq.lock.RUnlock()

	list := make([]interface{}, 0, len(eq.queue))
	for _, key := range eq.queue {
		item, ok, err := eq.store.GetByKey(key)
		if err != nil {
			panic(fmt.Sprintf("Failure to get by key %q: %v", key, err))
		}
		if !ok {
			panic(fmt.Sprintf("Tried to list an ID not in backing store: %v", key))
		}
		list = append(list, item)
	}

	return list
}

// ListKeys returns all enqueued keys.
func (eq *EventQueue) ListKeys() []string {
	eq.lock.RLock()
	defer eq.lock.RUnlock()

	list := make([]string, 0, len(eq.queue))
	copy(list, eq.queue)
	return list
}

// ContainedIDs returns a sets.String containing all IDs of the enqueued items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (eq *EventQueue) ContainedIDs() sets.String {
	eq.lock.RLock()
	defer eq.lock.RUnlock()

	s := sets.String{}
	for _, key := range eq.queue {
		s.Insert(key)
	}

	return s
}

// Get returns the requested item, or sets exists=false.
func (eq *EventQueue) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := eq.keyFn(obj)
	if err != nil {
		return nil, false, err
	}
	return eq.GetByKey(key)
}

// GetByKey returns the requested item, or sets exists=false.
func (eq *EventQueue) GetByKey(key string) (item interface{}, exists bool, err error) {
	eq.lock.RLock()
	defer eq.lock.RUnlock()

	_, ok := eq.events[key]
	if !ok {
		return nil, false, nil
	}

	return eq.store.GetByKey(key) // Should always be populated and succeed
}

// Pop gets the event and object at the head of the queue.  If the event
// is a delete event, Pop deletes the key from the underlying cache.
func (eq *EventQueue) Pop() (watch.EventType, interface{}, error) {
	eq.lock.Lock()
	defer eq.lock.Unlock()

	for {
		for len(eq.queue) == 0 {
			eq.cond.Wait()
		}

		if len(eq.queue) == 0 {
			return watch.Error, nil, EventQueueStopped{}
		}
		key := eq.queue[0]
		eq.queue = eq.queue[1:]

		eventType := eq.events[key]
		delete(eq.events, key)

		// Track the last replace key immediately after the store
		// state has been changed to prevent subsequent errors from
		// leaving a stale key.
		if eq.lastReplaceKey != "" && eq.lastReplaceKey == key {
			eq.lastReplaceKey = ""
		}

		obj, exists, err := eq.store.GetByKey(key) // Should always succeed
		if err != nil {
			return watch.Error, nil, err
		}
		if !exists {
			panic(fmt.Sprintf("Pop() of key not in store: %v", key))
		}

		if eventType == watch.Deleted {
			if err := eq.store.Delete(obj); err != nil {
				return watch.Error, nil, err
			}
		}

		return eventType, obj, nil
	}
}

// Replace initializes 'eq' with the state contained in the given map and
// populates the queue with a watch.Modified event for each of the replaced
// objects.  The backing store takes ownership of keyToObjs; you should not
// reference the map again after calling this function.
func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) error {
	eq.lock.Lock()
	defer eq.lock.Unlock()

	eq.events = map[string]watch.EventType{}
	eq.queue = eq.queue[:0]

	for i := range objects {
		key, err := eq.keyFn(objects[i])
		if err != nil {
			return err
		}
		eq.queue = append(eq.queue, key)
		eq.events[key] = watch.Modified
	}
	if err := eq.store.Replace(objects, resourceVersion); err != nil {
		return err
	}

	if len(eq.queue) > 0 {
		eq.lastReplaceKey = eq.queue[len(eq.queue)-1]
		eq.cond.Broadcast()
	} else {
		eq.lastReplaceKey = ""
	}
	return nil
}

// ListConsumed indicates whether the items queued by a List/Relist
// operation have been consumed.
func (eq *EventQueue) ListConsumed() bool {
	eq.lock.Lock()
	defer eq.lock.Unlock()

	return eq.lastReplaceKey == ""
}

// Resync will touch all objects to put them into the processing queue
func (eq *EventQueue) Resync() error {
	eq.lock.Lock()
	defer eq.lock.Unlock()

	inQueue := sets.NewString()
	for _, id := range eq.queue {
		inQueue.Insert(id)
	}

	for _, id := range eq.store.ListKeys() {
		if !inQueue.Has(id) {
			eq.queue = append(eq.queue, id)
		}
	}

	if len(eq.queue) > 0 {
		eq.cond.Broadcast()
	} else {
		eq.lastReplaceKey = ""
	}
	return nil
}

// NewEventQueue returns a new EventQueue.
func NewEventQueue(keyFn kcache.KeyFunc) *EventQueue {
	q := &EventQueue{
		store:  kcache.NewStore(keyFn),
		events: map[string]watch.EventType{},
		queue:  []string{},
		keyFn:  keyFn,
	}
	q.cond.L = &q.lock
	return q
}

// NewEventQueueForStore returns a new EventQueue that uses the provided store.
func NewEventQueueForStore(keyFn kcache.KeyFunc, store kcache.Store) *EventQueue {
	q := &EventQueue{
		store:  store,
		events: map[string]watch.EventType{},
		queue:  []string{},
		keyFn:  keyFn,
	}
	q.cond.L = &q.lock
	return q
}