package controller

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
	unidlingutil "github.com/openshift/origin/pkg/unidling/util"

	deployclient "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/typed/core/unversioned"
	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"

	"k8s.io/kubernetes/pkg/apimachinery/registered"
	kextapi "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/cache"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	kextclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/types"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"

	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"

	"github.com/golang/glog"
)

const MaxRetries = 5

type lastFiredCache struct {
	sync.RWMutex
	items map[types.NamespacedName]time.Time
}

func (c *lastFiredCache) Get(info types.NamespacedName) time.Time {
	c.RLock()
	defer c.RUnlock()

	return c.items[info]
}

func (c *lastFiredCache) Clear(info types.NamespacedName) {
	c.Lock()
	defer c.Unlock()

	delete(c.items, info)
}

func (c *lastFiredCache) AddIfNewer(info types.NamespacedName, newLastFired time.Time) bool {
	c.Lock()
	defer c.Unlock()

	if lastFired, hasLastFired := c.items[info]; !hasLastFired || lastFired.Before(newLastFired) {
		c.items[info] = newLastFired
		return true
	}

	return false
}

type UnidlingController struct {
	controller          *framework.Controller
	scaleNamespacer     kextclient.ScalesGetter
	endpointsNamespacer kcoreclient.EndpointsGetter
	queue               workqueue.RateLimitingInterface
	lastFiredCache      *lastFiredCache

	// TODO: remove these once we get the scale-source functionality in the scale endpoints
	dcNamespacer deployclient.DeploymentConfigsGetter
	rcNamespacer kcoreclient.ReplicationControllersGetter
}

func NewUnidlingController(scaleNS kextclient.ScalesGetter, endptsNS kcoreclient.EndpointsGetter, evtNS kcoreclient.EventsGetter, dcNamespacer deployclient.DeploymentConfigsGetter, rcNamespacer kcoreclient.ReplicationControllersGetter, resyncPeriod time.Duration) *UnidlingController {
	fieldSet := fields.Set{}
	fieldSet["reason"] = unidlingapi.NeedPodsReason
	fieldSelector := fieldSet.AsSelector()

	unidlingController := &UnidlingController{
		scaleNamespacer:     scaleNS,
		endpointsNamespacer: endptsNS,
		queue:               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		lastFiredCache: &lastFiredCache{
			items: make(map[types.NamespacedName]time.Time),
		},

		dcNamespacer: dcNamespacer,
		rcNamespacer: rcNamespacer,
	}

	_, controller := framework.NewInformer(
		&cache.ListWatch{
			// No need to list -- we only care about new events
			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
				options.FieldSelector = fieldSelector
				return evtNS.Events(kapi.NamespaceAll).List(options)
			},
			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
				options.FieldSelector = fieldSelector
				return evtNS.Events(kapi.NamespaceAll).Watch(options)
			},
		},
		&kapi.Event{},
		resyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    unidlingController.addEvent,
			UpdateFunc: unidlingController.updateEvent,
			// this is just to clean up our cache of the last seen times
			DeleteFunc: unidlingController.checkAndClearFromCache,
		},
	)

	unidlingController.controller = controller

	return unidlingController
}

func (c *UnidlingController) addEvent(obj interface{}) {
	evt, ok := obj.(*kapi.Event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", obj))
		return
	}

	c.enqueueEvent(evt)
}

func (c *UnidlingController) updateEvent(oldObj, newObj interface{}) {
	evt, ok := newObj.(*kapi.Event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", newObj))
		return
	}

	c.enqueueEvent(evt)
}

func (c *UnidlingController) checkAndClearFromCache(obj interface{}) {
	evt, objIsEvent := obj.(*kapi.Event)
	if !objIsEvent {
		tombstone, objIsTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !objIsTombstone {
			utilruntime.HandleError(fmt.Errorf("got non-event, non-tombstone object in event action: %v", obj))
			return
		}

		evt, objIsEvent = tombstone.Obj.(*kapi.Event)
		if !objIsEvent {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an Event in event action: %v", obj))
			return
		}
	}

	c.clearEventFromCache(evt)
}

// clearEventFromCache removes the entry for the given event from the lastFiredCache.
func (c *UnidlingController) clearEventFromCache(event *kapi.Event) {
	if event.Reason != unidlingapi.NeedPodsReason {
		return
	}

	info := types.NamespacedName{
		Namespace: event.InvolvedObject.Namespace,
		Name:      event.InvolvedObject.Name,
	}
	c.lastFiredCache.Clear(info)
}

// equeueEvent checks if the given event is relevant (i.e. if it's a NeedPods event),
// and, if so, extracts relevant information, and enqueues that information in the
// processing queue.
func (c *UnidlingController) enqueueEvent(event *kapi.Event) {
	if event.Reason != unidlingapi.NeedPodsReason {
		return
	}

	info := types.NamespacedName{
		Namespace: event.InvolvedObject.Namespace,
		Name:      event.InvolvedObject.Name,
	}

	// only add things to the queue if they're newer than what we already have
	if c.lastFiredCache.AddIfNewer(info, event.LastTimestamp.Time) {
		c.queue.Add(info)
	}
}

func (c *UnidlingController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go c.controller.Run(stopCh)
	go wait.Until(c.processRequests, time.Second, stopCh)
}

// processRequests calls awaitRequest repeatedly, until told to stop by
// the return value of awaitRequest.
func (c *UnidlingController) processRequests() {
	for {
		if !c.awaitRequest() {
			return
		}
	}
}

// awaitRequest awaits a new request on the queue, and sends it off for processing.
// If more requests on the queue should be processed, it returns true.  If we should
// stop processing, it returns false.
func (c *UnidlingController) awaitRequest() bool {
	infoRaw, stop := c.queue.Get()
	if stop {
		return false
	}

	defer c.queue.Done(infoRaw)

	info := infoRaw.(types.NamespacedName)
	lastFired := c.lastFiredCache.Get(info)

	var retry bool
	var err error
	if retry, err = c.handleRequest(info, lastFired); err == nil {
		// if there was no error, we succeeded in the unidling, and we need to
		// tell the rate limitter to stop tracking this request
		c.queue.Forget(infoRaw)
		return true
	}

	// check to see if we think the error was transient (e.g. server error on the update request),
	// and if not, do not retry
	if !retry {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s at (%s), will not retry: %v", info.Namespace, info.Name, lastFired, err))
		return true
	}

	// Otherwise, if we have an error, we were at least partially unsuccessful in unidling, so
	// we requeue the event to process later

	// don't try to process failing requests forever
	if c.queue.NumRequeues(infoRaw) > MaxRetries {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry again: %v", info.Namespace, info.Name, lastFired, err))
		c.queue.Forget(infoRaw)
		return true
	}

	glog.V(4).Infof("Unable to fully process unidling request for %s/%s (at %s), will retry: %v", info.Namespace, info.Name, lastFired, err)
	c.queue.AddRateLimited(infoRaw)
	return true
}

// handleRequest handles a single request to unidle.  After checking the validity of the request,
// it will examine the endpoints in question to determine which scalables to scale, and will scale
// them and remove them from the endpoints' list of idled scalables.  If it is unable to properly
// process the request, it will return a boolean indicating whether or not we should retry later,
// as well as an error (e.g. if we're unable to parse an annotation, retrying later won't help,
// so it will return false).
func (c *UnidlingController) handleRequest(info types.NamespacedName, lastFired time.Time) (bool, error) {
	// fetch the endpoints associated with the service in question
	targetEndpoints, err := c.endpointsNamespacer.Endpoints(info.Namespace).Get(info.Name)
	if err != nil {
		return true, fmt.Errorf("unable to retrieve endpoints: %v", err)
	}

	// make sure we actually were idled...
	idledTimeRaw, wasIdled := targetEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
	if !wasIdled {
		glog.V(5).Infof("UnidlingController received a NeedPods event for a service that was not idled, ignoring")
		return false, nil
	}

	// ...and make sure this request was to wake up from the most recent idling, and not a previous one
	idledTime, err := time.Parse(time.RFC3339, idledTimeRaw)
	if err != nil {
		// retrying here won't help, we're just stuck as idle since we can't get parse the idled time
		return false, fmt.Errorf("unable to check idled-at time: %v", err)
	}
	if lastFired.Before(idledTime) {
		glog.V(5).Infof("UnidlingController received an out-of-date NeedPods event, ignoring")
		return false, nil
	}

	// TODO: ew, this is unversioned.  Such is life when working with annotations.
	var targetScalables []unidlingapi.RecordedScaleReference
	if targetScalablesStr, hasTargetScalables := targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]; hasTargetScalables {
		if err = json.Unmarshal([]byte(targetScalablesStr), &targetScalables); err != nil {
			// retrying here won't help, we're just stuck as idled since we can't parse the idled scalables list
			return false, fmt.Errorf("unable to unmarshal target scalable references: %v", err)
		}
	} else {
		glog.V(4).Infof("Service %s/%s had no scalables to unidle", info.Namespace, info.Name)
		targetScalables = []unidlingapi.RecordedScaleReference{}
	}

	targetScalablesSet := make(map[unidlingapi.RecordedScaleReference]struct{}, len(targetScalables))
	for _, v := range targetScalables {
		targetScalablesSet[v] = struct{}{}
	}

	deleteIdlingAnnotations := func(_ int32, annotations map[string]string) {
		delete(annotations, unidlingapi.IdledAtAnnotation)
		delete(annotations, unidlingapi.PreviousScaleAnnotation)
	}

	scaleAnnotater := unidlingutil.NewScaleAnnotater(c.scaleNamespacer, c.dcNamespacer, c.rcNamespacer, deleteIdlingAnnotations)

	for _, scalableRef := range targetScalables {
		var scale *kextapi.Scale
		var obj runtime.Object

		obj, scale, err = scaleAnnotater.GetObjectWithScale(info.Namespace, scalableRef.CrossGroupObjectReference)
		if err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(targetScalablesSet, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to get scale for %s %q while unidling service %s/%s, will try again later: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
			}
			continue
		}

		if scale.Spec.Replicas > 0 {
			glog.V(4).Infof("%s %q is not idle, skipping while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
			continue
		}

		scale.Spec.Replicas = scalableRef.Replicas

		updater := unidlingutil.NewScaleUpdater(kapi.Codecs.LegacyCodec(registered.EnabledVersions()...), info.Namespace, c.dcNamespacer, c.rcNamespacer)
		if err = scaleAnnotater.UpdateObjectScale(updater, info.Namespace, scalableRef.CrossGroupObjectReference, obj, scale); err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(targetScalablesSet, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to scale up %s %q while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
			}
			continue
		} else {
			glog.V(4).Infof("Scaled up %s %q while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
		}

		delete(targetScalablesSet, scalableRef)
	}

	newAnnotationList := make([]unidlingapi.RecordedScaleReference, 0, len(targetScalablesSet))
	for k := range targetScalablesSet {
		newAnnotationList = append(newAnnotationList, k)
	}

	if len(newAnnotationList) == 0 {
		delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
		delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
	} else {
		var newAnnotationBytes []byte
		newAnnotationBytes, err = json.Marshal(newAnnotationList)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("unable to update/remove idle annotations from %s/%s: unable to marshal list of remaining scalables, removing list entirely: %v", info.Namespace, info.Name, err))

			delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
			delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
		} else {
			targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation] = string(newAnnotationBytes)
		}
	}

	if _, err = c.endpointsNamespacer.Endpoints(info.Namespace).Update(targetEndpoints); err != nil {
		return true, fmt.Errorf("unable to update/remove idle annotations from %s/%s: %v", info.Namespace, info.Name, err)
	}

	return false, nil
}