package controller

import (
	"fmt"
	"sync"
	"testing"

	kcache "k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/util/flowcontrol"
)

func TestRetryController_handleOneRetryableError(t *testing.T) {
	retried := false
	retryErr := fmt.Errorf("retryable error")

	controller := &RetryController{
		Handle: func(obj interface{}) error {
			return retryErr
		},
		RetryManager: &testRetryManager{
			RetryFunc: func(resource interface{}, err error) {
				if err != retryErr {
					t.Fatalf("unexpected error: %v", err)
				}
				retried = true
			},
			ForgetFunc: func(resource interface{}) {
				t.Fatalf("unexpected call to forget %v", resource)
			},
		},
	}

	controller.handleOne(struct{}{})

	if !retried {
		t.Fatalf("expected a retry")
	}
}

func TestRetryController_handleOneNoError(t *testing.T) {
	forgotten := false

	controller := &RetryController{
		Handle: func(obj interface{}) error {
			return nil
		},
		RetryManager: &testRetryManager{
			RetryFunc: func(resource interface{}, err error) {
				t.Fatalf("unexpected call to retry %v", resource)
			},
			ForgetFunc: func(resource interface{}) {
				forgotten = true
			},
		},
	}

	controller.handleOne(struct{}{})

	if !forgotten {
		t.Fatalf("expected to forget")
	}
}

func TestQueueRetryManager_retries(t *testing.T) {
	retries := 5
	requeued := map[string]int{}

	manager := &QueueRetryManager{
		queue: &testFifo{
			// Track re-queues
			AddIfNotPresentFunc: func(obj interface{}) error {
				id := obj.(testObj).id
				if _, exists := requeued[id]; !exists {
					requeued[id] = 0
				}
				requeued[id] = requeued[id] + 1
				return nil
			},
		},
		keyFunc: func(obj interface{}) (string, error) {
			return obj.(testObj).id, nil
		},
		retryFunc: func(obj interface{}, err error, r Retry) bool {
			return r.Count < 5 && !r.StartTimestamp.IsZero()
		},
		retries: make(map[string]Retry),
		limiter: flowcontrol.NewTokenBucketRateLimiter(1000, 1000),
	}

	objects := []testObj{
		{"a", 1},
		{"b", 2},
		{"c", 3},
	}

	// Retry one more than the max
	for _, obj := range objects {
		for i := 0; i < retries+1; i++ {
			manager.Retry(obj, nil)
		}
	}

	// Should only have re-queued up to the max retry setting
	for _, obj := range objects {
		if e, a := retries, requeued[obj.id]; e != a {
			t.Fatalf("expected requeue count %d for obj %s, got %d", e, obj.id, a)
		}
	}

	// Should have no more state since all objects were retried beyond max
	if e, a := 0, len(manager.retries); e != a {
		t.Fatalf("expected retry len %d, got %d", e, a)
	}
}

// This test ensures that when an asynchronous state update is received
// on the queue during failed event handling, that the updated state is
// retried, NOT the event that failed (which is now stale).
func TestRetryController_realFifoEventOrdering(t *testing.T) {
	keyFunc := func(obj interface{}) (string, error) {
		return obj.(testObj).id, nil
	}

	fifo := kcache.NewFIFO(keyFunc)

	wg := sync.WaitGroup{}
	wg.Add(1)

	controller := &RetryController{
		Queue:        fifo,
		RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ Retry) bool { return true }, flowcontrol.NewTokenBucketRateLimiter(1000, 10)),
		Handle: func(obj interface{}) error {
			if e, a := 1, obj.(testObj).value; e != a {
				t.Fatalf("expected to handle test value %d, got %d", e, a)
			}

			go func() {
				fifo.Add(testObj{"a", 2})
				wg.Done()
			}()
			wg.Wait()
			return fmt.Errorf("retryable error")
		},
	}

	fifo.Add(testObj{"a", 1})
	controller.handleOne(kcache.Pop(fifo))

	if e, a := 1, len(fifo.List()); e != a {
		t.Fatalf("expected queue length %d, got %d", e, a)
	}

	obj := kcache.Pop(fifo)
	if e, a := 2, obj.(testObj).value; e != a {
		t.Fatalf("expected queued value %d, got %d", e, a)
	}
}

// This test ensures that when events are retried, the
// requeue rate does not exceed the configured rate limit,
// including burst behavior.
func TestRetryController_ratelimit(t *testing.T) {
	keyFunc := func(obj interface{}) (string, error) {
		return "key", nil
	}
	fifo := kcache.NewFIFO(keyFunc)
	limiter := &mockLimiter{}
	retryManager := NewQueueRetryManager(fifo,
		keyFunc,
		func(_ interface{}, _ error, r Retry) bool {
			return r.Count < 15
		},
		limiter,
	)
	for i := 0; i < 10; i++ {
		retryManager.Retry("key", nil)
	}

	if limiter.count != 10 {
		t.Fatalf("Retries did not invoke rate limiter, expected %d got %d", 10, limiter.count)
	}
}

type mockLimiter struct {
	count int
}

func (l *mockLimiter) TryAccept() bool {
	return true
}

func (l *mockLimiter) Accept() {
	l.count++
}

func (l *mockLimiter) Stop() {}

func (t *mockLimiter) Saturation() float64 {
	return float64(0.0)
}

type testObj struct {
	id    string
	value int
}

type testFifo struct {
	AddIfNotPresentFunc func(interface{}) error
	PopFunc             func() interface{}
}

func (t *testFifo) AddIfNotPresent(obj interface{}) error {
	return t.AddIfNotPresentFunc(obj)
}

func (t *testFifo) Pop(fn kcache.PopProcessFunc) (interface{}, error) {
	obj := t.PopFunc()
	err := fn(obj)
	return obj, err
}

type testRetryManager struct {
	RetryFunc  func(resource interface{}, err error)
	ForgetFunc func(resource interface{})
}

func (m *testRetryManager) Retry(resource interface{}, err error) {
	m.RetryFunc(resource, err)
}

func (m *testRetryManager) Forget(resource interface{}) {
	m.ForgetFunc(resource)
}