package controller import ( "k8s.io/kubernetes/pkg/api/unversioned" kcache "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/util/flowcontrol" utilwait "k8s.io/kubernetes/pkg/util/wait" ) // RunnableController is a controller which implements a Run loop. type RunnableController interface { // Run starts the asynchronous controller loop. Run() } // StoppableController is a controller which implements a Run loop. type StoppableController interface { // RunUntil starts the asynchronous controller loop, which runs until // ch is closed. RunUntil(ch <-chan struct{}) } // RetryController is a RunnableController which delegates resource // handling to a function and knows how to safely manage retries of a resource // which failed to be successfully handled. type RetryController struct { // Queue is where work is retrieved for Handle. Queue // Handle is expected to process the next resource from the queue. Handle func(interface{}) error // RetryManager is fed the handled resource if Handle returns a Retryable // error. If Handle returns no error, the RetryManager is asked to forget // the resource. RetryManager } // Queue is a narrow abstraction of a cache.FIFO. type Queue interface { Pop(kcache.PopProcessFunc) (interface{}, error) } // Run begins processing resources from Queue asynchronously. func (c *RetryController) Run() { go utilwait.Forever(c.pop, 0) } // RunUntil begins processing resources from Queue asynchronously until stopCh is closed. func (c *RetryController) RunUntil(stopCh <-chan struct{}) { go utilwait.Until(c.pop, 0, stopCh) } // pop removes the next item from the stack and handles it. If Handle returns // a retryable error, the handled resource is passed to the RetryManager. If // no error is returned from Handle, the RetryManager is asked to forget the // processed resource. // TODO: Pop holds the lock on the queue, which means we can't AddIfNotPresent // from within Handle. We need to fix that. func (c *RetryController) pop() { resource, err := c.Queue.Pop(c.Handle) if err != nil { c.Retry(resource, err) return } c.Forget(resource) } // handleOne processes resource with Handle. If Handle returns a retryable // error, the handled resource is passed to the RetryManager. If no error is // returned from Handle, the RetryManager is asked to forget the processed // resource. func (c *RetryController) handleOne(resource interface{}) { if err := c.Handle(resource); err != nil { c.Retry(resource, err) return } c.Forget(resource) } // RetryManager knows how to retry processing of a resource, and how to forget // a resource it may be tracking the state of. type RetryManager interface { // Retry will cause resource processing to be retried (for example, by // requeueing resource) Retry(resource interface{}, err error) // Forget will cause the manager to erase all prior knowledge of resource // and reclaim internal resources associated with state tracking of // resource. Forget(resource interface{}) } // RetryFunc should return true if the given object and error should be retried after // the provided number of times. type RetryFunc func(obj interface{}, err error, retries Retry) bool // RetryNever is a RetryFunc implementation that will never retry func RetryNever(obj interface{}, err error, retries Retry) bool { return false } // RetryNever is a RetryFunc implementation that will always retry func RetryAlways(obj interface{}, err error, retries Retry) bool { return true } // QueueRetryManager retries a resource by re-queueing it into a ReQueue as long as // retryFunc returns true. type QueueRetryManager struct { // queue is where resources are re-queued. queue ReQueue // keyFunc is used to index resources. keyFunc kcache.KeyFunc // retryFunc returns true if the resource and error returned should be retried. retryFunc RetryFunc // retries maps resources to their current retry retries map[string]Retry // limits how fast retries can be enqueued to ensure you can't tight // loop on retries. limiter flowcontrol.RateLimiter } // Retry describes provides additional information regarding retries. type Retry struct { // Count is the number of retries Count int // StartTimestamp is retry start timestamp StartTimestamp unversioned.Time } // ReQueue is a queue that allows an object to be requeued type ReQueue interface { Queue AddIfNotPresent(interface{}) error } // NewQueueRetryManager safely creates a new QueueRetryManager. func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc, limiter flowcontrol.RateLimiter) *QueueRetryManager { return &QueueRetryManager{ queue: queue, keyFunc: keyFn, retryFunc: retryFn, retries: make(map[string]Retry), limiter: limiter, } } // Retry will enqueue resource until retryFunc returns false for that resource has been // exceeded, at which point resource will be forgotten and no longer retried. The current // retry count will be passed to each invocation of retryFunc. func (r *QueueRetryManager) Retry(resource interface{}, err error) { id, _ := r.keyFunc(resource) if _, exists := r.retries[id]; !exists { r.retries[id] = Retry{0, unversioned.Now()} } tries := r.retries[id] if r.retryFunc(resource, err, tries) { r.limiter.Accept() // It's important to use AddIfNotPresent to prevent overwriting newer // state in the queue which may have arrived asynchronously. r.queue.AddIfNotPresent(resource) tries.Count = tries.Count + 1 r.retries[id] = tries } else { r.Forget(resource) } } // Forget resets the retry count for resource. func (r *QueueRetryManager) Forget(resource interface{}) { id, _ := r.keyFunc(resource) delete(r.retries, id) }