package storage

import (

	utilruntime "k8s.io/kubernetes/pkg/util/runtime"


// CacherConfig contains the configuration for a given Cache.
type CacherConfig struct {
	// Maximum size of the history cached in memory.
	CacheCapacity int

	// An underlying storage.Interface.
	Storage Interface

	// An underlying storage.Versioner.
	Versioner Versioner

	// The Cache will be caching objects of a given Type and assumes that they
	// are all stored under ResourcePrefix directory in the underlying database.
	Type           interface{}
	ResourcePrefix string

	// KeyFunc is used to get a key in the underyling storage for a given object.
	KeyFunc func(runtime.Object) (string, error)

	// NewList is a function that creates new empty object storing a list of
	// objects of type Type.
	NewListFunc func() runtime.Object

// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {

	// Each user-facing method that is not simply redirected to the underlying
	// storage has to read-lock on this mutex before starting any processing.
	// This is necessary to prevent users from accessing structures that are
	// uninitialized or are being repopulated right now.
	// NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
	// interactions of Cacher with the underlying WatchCache. Since Cacher is
	// caling WatchCache directly and WatchCache is calling Cacher methods
	// via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
	// of both structures are held, the one from WatchCache is acquired first
	// to avoid deadlocks. Unfortunately, forcing this rule in startCaching
	// would be very difficult and introducing one more mutex seems to be much
	// easier.
	usable sync.RWMutex

	// Underlying storage.Interface.
	storage Interface

	// "sliding window" of recent changes of objects and the current state.
	watchCache *watchCache
	reflector  *cache.Reflector

	// Registered watchers.
	watcherIdx int
	watchers   map[int]*cacheWatcher

	// Versioner is used to handle resource versions.
	versioner Versioner

	// keyFunc is used to get a key in the underyling storage for a given object.
	keyFunc func(runtime.Object) (string, error)

	// Handling graceful termination.
	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup

// Create a new Cacher responsible from service WATCH and LIST requests from its
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacher(
	storage Interface,
	capacity int,
	versioner Versioner,
	objectType runtime.Object,
	resourcePrefix string,
	scopeStrategy rest.NamespaceScopedStrategy,
	newListFunc func() runtime.Object) Interface {
	config := CacherConfig{
		CacheCapacity:  capacity,
		Storage:        storage,
		Versioner:      versioner,
		Type:           objectType,
		ResourcePrefix: resourcePrefix,
		NewListFunc:    newListFunc,
	if scopeStrategy.NamespaceScoped() {
		config.KeyFunc = func(obj runtime.Object) (string, error) {
			return NamespaceKeyFunc(resourcePrefix, obj)
	} else {
		config.KeyFunc = func(obj runtime.Object) (string, error) {
			return NoNamespaceKeyFunc(resourcePrefix, obj)
	return NewCacherFromConfig(config)

// NewCacherFromConfig creates a new Cacher responsible for serving WATCH and
// LIST requests from its internal cache and updating its cache in the
// background based on the given configuration.
//
// It panics if config.Type is a runtime.Object for which
// runtime.CheckCodec(config.Storage.Codec(), obj) fails.
//
// NOTE(review): this copy of the function is missing lines — none of the
// braces/parens closing the codec-check ifs, the &Cacher{...} literal, the
// goroutine or the function itself are present, and several statements that
// the surrounding comments rely on are absent (see the notes below). Confirm
// against version control before restoring them.
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Storage.Codec(), obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
			// NOTE(review): closing braces of both ifs are missing here.

	cacher := &Cacher{
		usable:     sync.RWMutex{},
		storage:    config.Storage,
		watchCache: watchCache,
		reflector:  cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
		watcherIdx: 0,
		watchers:   make(map[int]*cacheWatcher),
		versioner:  config.Versioner,
		keyFunc:    config.KeyFunc,
		stopped:    false,
		// We need to (potentially) stop both:
		// - wait.Until go-routine
		// - reflector.ListAndWatch
		// and there are no guarantees on the order that they will stop.
		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
		stopCh: make(chan struct{}),
		stopWg: sync.WaitGroup{},
		// NOTE(review): the closing brace of this composite literal is missing.
	// See startCaching method for explanation and where this is unlocked.
	// NOTE(review): the statement the comment above refers to is missing —
	// startCaching documents "We start with usable (write) locked", so
	// cacher.usable is presumably write-locked here. Nothing in this function
	// registers cacher.processEvent as the watchCache OnEvent hook either,
	// although the Cacher type comment says WatchCache calls Cacher via OnEvent.

	// Local copy of the stop channel handed to the background loop below.
	stopCh := cacher.stopCh
	// NOTE(review): no cacher.stopWg.Add(1) precedes this goroutine, yet it
	// defers stopWg.Done(); a Done without a matching Add drives the
	// WaitGroup counter negative, which panics.
	go func() {
		defer cacher.stopWg.Done()
			// NOTE(review): the call taking "func() {...}, time.Second, stopCh"
			// as arguments is missing (the comment in the literal above names
			// wait.Until), as is the body of the isStopped branch (presumably
			// cacher.startCaching(stopCh)) and the closing braces/parens.
			func() {
				if !cacher.isStopped() {
			}, time.Second, stopCh,
	return cacher

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
	// It is safe to use the cache after a successful list until a disconnection.
	// We start with usable (write) locked. The below OnReplace function will
	// unlock it after a successful list. The below defer will then re-lock
	// it when this function exits (always due to disconnection), only if
	// we actually got a successful list. This cycle will repeat as needed.
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
	defer func() {
		if successfulList {

	// Note that since onReplace may be not called due to errors, we explicitly
	// need to retry it on errors under lock.
	// Also note that startCaching is called in a loop, so there's no need
	// to have another loop here.
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		glog.Errorf("unexpected ListAndWatch error: %v", err)

// Implements storage.Interface.
func (c *Cacher) Backends(ctx context.Context) []string {
	return c.storage.Backends(ctx)

// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
	return c.storage.Versioner()

// Implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)

// Implements storage.Interface.
func (c *Cacher) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Set(ctx, key, obj, out, ttl)

// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) error {
	return c.storage.Delete(ctx, key, out)

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
	watchRV, err := ParseWatchResourceVersion(resourceVersion)
	if err != nil {
		return nil, err

	// Do NOT allow Watch to start when the underlying structures are not propagated.
	defer c.usable.RUnlock()

	// We explicitly use thread unsafe version and do locking ourself to ensure that
	// no new events will be processed in the meantime. The watchCache will be unlocked
	// on return from this function.
	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
	// underlying watchCache is calling processEvent under its lock.
	defer c.watchCache.RUnlock()
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
	if err != nil {
		// To match the uncached watch implementation, once we have passed authn/authz/admission,
		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
		// rather than a directly returned error.
		return newErrWatcher(err), nil

	defer c.Unlock()
	watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
	c.watchers[c.watcherIdx] = watcher
	return watcher, nil

// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
	return c.Watch(ctx, key, resourceVersion, filter)

// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
	return c.storage.Get(ctx, key, objPtr, ignoreNotFound)

// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error {
	return c.storage.GetToList(ctx, key, filter, listObj)

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
	if resourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
		return c.storage.List(ctx, key, resourceVersion, filter, listObj)

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least that
	// fresh as the given resourceVersion.

	listRV, err := ParseListResourceVersion(resourceVersion)
	if err != nil {
		return err

	// To avoid situation when List is processed before the underlying
	// watchCache is propagated for the first time, we acquire and immediately
	// release the 'usable' lock.
	// We don't need to hold it all the time, because watchCache is thread-safe
	// and it would complicate already very difficult locking pattern.

	// List elements from cache, with at least 'listRV'.
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
	filterFunc := filterFunction(key, c.keyFunc, filter)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
	if err != nil {
		return fmt.Errorf("failed to wait for fresh list: %v", err)
	for _, obj := range objs {
		object, ok := obj.(runtime.Object)
		if !ok {
			return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
		if filterFunc(object) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
			return err
	return nil

// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error {
	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, tryUpdate)

// Implements storage.Interface.
func (c *Cacher) Codec() runtime.Codec {
	return c.storage.Codec()

func (c *Cacher) processEvent(event watchCacheEvent) {
	defer c.Unlock()
	for _, watcher := range c.watchers {

func (c *Cacher) terminateAllWatchers() {
	defer c.Unlock()
	for key, watcher := range c.watchers {
		delete(c.watchers, key)

func (c *Cacher) isStopped() bool {
	defer c.stopLock.RUnlock()
	return c.stopped

func (c *Cacher) Stop() {
	c.stopped = true

func forgetWatcher(c *Cacher, index int) func(bool) {
	return func(lock bool) {
		if lock {
			defer c.Unlock()
		// It's possible that the watcher is already not in the map (e.g. in case of
		// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
		delete(c.watchers, index)

func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc {
	return func(obj runtime.Object) bool {
		objKey, err := keyFunc(obj)
		if err != nil {
			glog.Errorf("invalid object for filter: %v", obj)
			return false
		if !strings.HasPrefix(objKey, key) {
			return false
		return filter(obj)

// Returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
	// To avoid situation when LastSyncResourceVersion is processed before the
	// underlying watchCache is propagated, we acquire 'usable' lock.
	defer c.usable.RUnlock()

	defer c.RUnlock()

	resourceVersion := c.reflector.LastSyncResourceVersion()
	if resourceVersion == "" {
		return 0, nil
	return strconv.ParseUint(resourceVersion, 10, 64)

// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
	storage        Interface
	resourcePrefix string
	newListFunc    func() runtime.Object

func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	return &cacherListerWatcher{
		storage:        storage,
		resourcePrefix: resourcePrefix,
		newListFunc:    newListFunc,

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options api.ListOptions) (runtime.Object, error) {
	list := lw.newListFunc()
	if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
		return nil, err
	return list, nil

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
	return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)

// cacherWatch implements watch.Interface to return a single error
type errWatcher struct {
	result chan watch.Event

func newErrWatcher(err error) *errWatcher {
	// Create an error event
	errEvent := watch.Event{Type: watch.Error}
	switch err := err.(type) {
	case runtime.Object:
		errEvent.Object = err
	case *errors.StatusError:
		errEvent.Object = &err.ErrStatus
		errEvent.Object = &unversioned.Status{
			Status:  unversioned.StatusFailure,
			Message: err.Error(),
			Reason:  unversioned.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,

	// Create a watcher with room for a single event, populate it, and close the channel
	watcher := &errWatcher{result: make(chan watch.Event, 1)}
	watcher.result <- errEvent

	return watcher

// Implements watch.Interface.
func (c *errWatcher) ResultChan() <-chan watch.Event {
	return c.result

// Implements watch.Interface.
func (c *errWatcher) Stop() {
	// no-op

// cacherWatch implements watch.Interface
type cacheWatcher struct {
	input   chan watchCacheEvent
	result  chan watch.Event
	filter  FilterFunc
	stopped bool
	forget  func(bool)

func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
	watcher := &cacheWatcher{
		input:   make(chan watchCacheEvent, 10),
		result:  make(chan watch.Event, 10),
		filter:  filter,
		stopped: false,
		forget:  forget,
	go watcher.process(initEvents, resourceVersion)
	return watcher

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
	return c.result

// Implements watch.Interface.
func (c *cacheWatcher) Stop() {

func (c *cacheWatcher) stop() {
	defer c.Unlock()
	if !c.stopped {
		c.stopped = true

func (c *cacheWatcher) add(event watchCacheEvent) {
	select {
	case c.input <- event:
	case <-time.After(5 * time.Second):
		// This means that we couldn't send event to that watcher.
		// Since we don't want to blockin on it infinitely,
		// we simply terminate it.

func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
	curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
	oldObjPasses := false
	if event.PrevObject != nil {
		oldObjPasses = c.filter(event.PrevObject)
	if !curObjPasses && !oldObjPasses {
		// Watcher is not interested in that object.

	object, err := api.Scheme.Copy(event.Object)
	if err != nil {
		glog.Errorf("unexpected copy error: %v", err)
	switch {
	case curObjPasses && !oldObjPasses:
		c.result <- watch.Event{Type: watch.Added, Object: object}
	case curObjPasses && oldObjPasses:
		c.result <- watch.Event{Type: watch.Modified, Object: object}
	case !curObjPasses && oldObjPasses:
		c.result <- watch.Event{Type: watch.Deleted, Object: object}

func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
	defer utilruntime.HandleCrash()

	for _, event := range initEvents {
	defer close(c.result)
	defer c.Stop()
	for {
		event, ok := <-c.input
		if !ok {
		// only send events newer than resourceVersion
		if event.ResourceVersion > resourceVersion {