package ratelimiter

import (
	kcache "k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/util/flowcontrol"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	utilwait "k8s.io/kubernetes/pkg/util/wait"
)

// HandlerFunc defines function signature for a RateLimitedFunction.
type HandlerFunc func() error

// RateLimitedFunction is a rate limited function controlling how often the function/handler is invoked.
type RateLimitedFunction struct {
	// Handler is the function to rate limit calls to.
	Handler HandlerFunc

	// Internal queue of requests to be processed.
	queue kcache.Queue

	// Rate limiting configuration.
	flowcontrol.RateLimiter
}

// NewRateLimitedFunction creates a new rate limited function.
func NewRateLimitedFunction(keyFunc kcache.KeyFunc, interval int, handlerFunc HandlerFunc) *RateLimitedFunction {
	fifo := kcache.NewFIFO(keyFunc)

	qps := float32(1000.0) // Call rate per second (SLA).
	if interval > 0 {
		qps = float32(1.0 / float32(interval))
	}

	limiter := flowcontrol.NewTokenBucketRateLimiter(qps, 1)

	return &RateLimitedFunction{handlerFunc, fifo, limiter}
}

// RunUntil begins processes the resources from queue asynchronously until
// stopCh is closed.
func (rlf *RateLimitedFunction) RunUntil(stopCh <-chan struct{}) {
	go utilwait.Until(rlf.pop, 0, stopCh)
}

// handleOne processes a request in the queue invoking the rate limited
// function.
func (rlf *RateLimitedFunction) pop() {
	rlf.RateLimiter.Accept()
	if _, err := rlf.queue.Pop(func(_ interface{}) error {
		return rlf.Handler()
	}); err != nil {
		utilruntime.HandleError(err)
	}
}

// Invoke adds a request if its not already present and waits for the
// background processor to execute it.
func (rlf *RateLimitedFunction) Invoke(resource interface{}) {
	rlf.queue.AddIfNotPresent(resource)
}