package controller

import (
	"time"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/flowcontrol"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/watch"

	"github.com/openshift/origin/pkg/client"
	"github.com/openshift/origin/pkg/controller"
	"github.com/openshift/origin/pkg/image/api"
)

// ImportControllerFactory can create an ImportController.
type ImportControllerFactory struct {
	Client               client.Interface
	ResyncInterval       time.Duration
	MinimumCheckInterval time.Duration
	ImportRateLimiter    flowcontrol.RateLimiter
	ScheduleEnabled      bool
}

// Create creates an ImportController.
func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) {
	lw := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			return f.Client.ImageStreams(kapi.NamespaceAll).List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			return f.Client.ImageStreams(kapi.NamespaceAll).Watch(options)
		},
	}
	q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
	cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run()

	// instantiate a scheduled importer using a number of buckets
	buckets := 4
	switch {
	case f.MinimumCheckInterval > time.Hour:
		buckets = 8
	case f.MinimumCheckInterval < 10*time.Minute:
		buckets = 2
	}
	seconds := f.MinimumCheckInterval / time.Second
	bucketQPS := 1.0 / float32(seconds) * float32(buckets)

	limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1)
	b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter)

	// instantiate an importer for changes that happen to the image stream
	changed := &controller.RetryController{
		Queue: q,
		RetryManager: controller.NewQueueRetryManager(
			q,
			cache.MetaNamespaceKeyFunc,
			func(obj interface{}, err error, retries controller.Retry) bool {
				utilruntime.HandleError(err)
				return retries.Count < 5
			},
			flowcontrol.NewTokenBucketRateLimiter(1, 10),
		),
		Handle: b.Handle,
	}

	return changed, b.scheduler
}

type uniqueItem struct {
	uid             string
	resourceVersion string
}

// scheduled watches for changes to image streams and adds them to the list of streams to be
// periodically imported (later) or directly imported (now).
type scheduled struct {
	enabled     bool
	scheduler   *controller.Scheduler
	rateLimiter flowcontrol.RateLimiter
	controller  *ImportController
}

// newScheduled initializes a scheduled import object and sets its scheduler. Limiter is optional.
func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled {
	b := &scheduled{
		enabled:     enabled,
		rateLimiter: importLimiter,
		controller: &ImportController{
			streams: client,
		},
	}
	b.scheduler = controller.NewScheduler(buckets, bucketLimiter, b.HandleTimed)
	return b
}

// Handle ensures an image stream is checked for scheduling and then runs a direct import
func (b *scheduled) Handle(obj interface{}) error {
	stream := obj.(*api.ImageStream)
	if b.enabled && needsScheduling(stream) {
		key, _ := cache.MetaNamespaceKeyFunc(stream)
		b.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion})
	}
	return b.controller.Next(stream, b)
}

// HandleTimed is invoked when a key is ready to be processed.
func (b *scheduled) HandleTimed(key, value interface{}) {
	if !b.enabled {
		b.scheduler.Remove(key, value)
		return
	}
	glog.V(5).Infof("DEBUG: checking %s", key)
	if b.rateLimiter != nil && !b.rateLimiter.TryAccept() {
		glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key)
		return
	}
	namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string))
	if err := b.controller.NextTimedByName(namespace, name); err != nil {
		// the stream cannot be imported
		if err == ErrNotImportable {
			// value must match to be removed, so we avoid races against creation by ensuring that we only
			// remove the stream if the uid and resource version in the scheduler are exactly the same.
			b.scheduler.Remove(key, value)
			return
		}
		utilruntime.HandleError(err)
		return
	}
}

// Importing is invoked when the controller decides to import a stream in order to push back
// the next schedule time.
func (b *scheduled) Importing(stream *api.ImageStream) {
	if !b.enabled {
		return
	}
	glog.V(5).Infof("DEBUG: stream %s was just imported", stream.Name)
	// Push the current key back to the end of the queue because it's just been imported
	key, _ := cache.MetaNamespaceKeyFunc(stream)
	b.scheduler.Delay(key)
}