package controller
import (
kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// RunnableController is a controller which implements a Run loop.
type RunnableController interface {
// Run starts the asynchronous controller loop.
Run()
}
// RetryController is a RunnableController which delegates resource
// handling to a function and knows how to safely manage retries of a resource
// which failed to be successfully handled.
type RetryController struct {
// Queue is where work is retrieved for Handle.
Queue
// Handle is expected to process the next resource from the queue.
Handle func(interface{}) error
// RetryManager is fed the handled resource if Handle returns a Retryable
// error. If Handle returns no error, the RetryManager is asked to forget
// the resource.
RetryManager
}
// Queue is a narrow abstraction of a cache.FIFO.
type Queue interface {
Pop() interface{}
}
// Run begins processing resources from Queue asynchronously.
func (c *RetryController) Run() {
go kutil.Forever(func() { c.handleOne(c.Queue.Pop()) }, 0)
}
// RunUntil begins processing resources from Queue asynchronously until stopCh is closed.
func (c *RetryController) RunUntil(stopCh <-chan struct{}) {
go kutil.Until(func() { c.handleOne(c.Queue.Pop()) }, 0, stopCh)
}
// handleOne processes resource with Handle. If Handle returns a retryable
// error, the handled resource is passed to the RetryManager. If no error is
// returned from Handle, the RetryManager is asked to forget the processed
// resource.
func (c *RetryController) handleOne(resource interface{}) {
if err := c.Handle(resource); err != nil {
c.Retry(resource, err)
return
}
c.RetryManager.Forget(resource)
}
// RetryManager knows how to retry processing of a resource, and how to forget
// a resource it may be tracking the state of.
type RetryManager interface {
// Retry will cause resource processing to be retried (for example, by
// requeueing resource)
Retry(resource interface{}, err error)
// Forget will cause the manager to erase all prior knowledge of resource
// and reclaim internal resources associated with state tracking of
// resource.
Forget(resource interface{})
}
// RetryFunc should return true if the given object and error should be retried after
// the provided number of times.
type RetryFunc func(obj interface{}, err error, retries int) bool
// QueueRetryManager retries a resource by re-queueing it into a ReQueue as long as
// retryFunc returns true.
type QueueRetryManager struct {
// queue is where resources are re-queued.
queue ReQueue
// keyFunc is used to index resources.
keyFunc kcache.KeyFunc
// retryFunc returns true if the resource and error returned should be retried.
retryFunc RetryFunc
// retries maps resources to their current retry count.
retries map[string]int
}
// ReQueue is a queue that allows an object to be requeued
type ReQueue interface {
Queue
AddIfNotPresent(interface{}) error
}
// NewQueueRetryManager safely creates a new QueueRetryManager.
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc) *QueueRetryManager {
return &QueueRetryManager{
queue: queue,
keyFunc: keyFn,
retryFunc: retryFn,
retries: make(map[string]int),
}
}
// Retry will enqueue resource until retryFunc returns false for that resource has been
// exceeded, at which point resource will be forgotten and no longer retried. The current
// retry count will be passed to each invocation of retryFunc.
func (r *QueueRetryManager) Retry(resource interface{}, err error) {
id, _ := r.keyFunc(resource)
if _, exists := r.retries[id]; !exists {
r.retries[id] = 0
}
tries := r.retries[id]
if r.retryFunc(resource, err, tries) {
// It's important to use AddIfNotPresent to prevent overwriting newer
// state in the queue which may have arrived asynchronously.
r.queue.AddIfNotPresent(resource)
r.retries[id] = tries + 1
} else {
r.Forget(resource)
}
}
// Forget resets the retry count for resource.
func (r *QueueRetryManager) Forget(resource interface{}) {
id, _ := r.keyFunc(resource)
delete(r.retries, id)
}