package controller

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/golang/glog"

	deployclient "github.com/openshift/origin/pkg/deploy/client/clientset_generated/internalclientset/typed/core/unversioned"
	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
	unidlingutil "github.com/openshift/origin/pkg/unidling/util"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/apimachinery/registered"
	kextapi "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/cache"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	kextclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/types"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)
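
// MaxRetries is the number of times a NeedPods event will be retried before
// the controller gives up on it.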
const MaxRetries = 5
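
// lastFiredCache tracks, per service, the timestamp of the most recent
// NeedPods event seen, so that older or duplicate events can be ignored.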
type lastFiredCache struct {
	sync.RWMutex
	items map[types.NamespacedName]time.Time
}
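
// Get returns the last-fired time recorded for the given service, or the
// zero time if no event has been recorded for it.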
func (c *lastFiredCache) Get(info types.NamespacedName) time.Time {
	c.RLock()
	defer c.RUnlock()
	return c.items[info]
}
func (c *lastFiredCache) Clear(info types.NamespacedName) {
	c.Lock()
	defer c.Unlock()
	delete(c.items, info)
}
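
// AddIfNewer records the given last-fired time for the service if it is newer
// than the time already recorded (or if none is recorded), and returns whether
// the cache was updated.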
func (c *lastFiredCache) AddIfNewer(info types.NamespacedName, newLastFired time.Time) bool {
	c.Lock()
	defer c.Unlock()
	if lastFired, hasLastFired := c.items[info]; !hasLastFired || lastFired.Before(newLastFired) {
		c.items[info] = newLastFired
		return true
	}
	return false
}
type UnidlingController struct {
	controller          *framework.Controller
	scaleNamespacer     kextclient.ScalesGetter
	endpointsNamespacer kcoreclient.EndpointsGetter
	queue               workqueue.RateLimitingInterface
	lastFiredCache      *lastFiredCache

	// TODO: remove these once we get the scale-source functionality in the scale endpoints
	dcNamespacer deployclient.DeploymentConfigsGetter
	rcNamespacer kcoreclient.ReplicationControllersGetter
}
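
// NewUnidlingController creates an UnidlingController that watches NeedPods
// events across all namespaces and resyncs with the given period.
//
// A minimal wiring sketch (the kclient and deployClient variables are
// hypothetical and stand in for an internal Kubernetes clientset and an
// OpenShift deploy client constructed by the caller):
//
//	controller := NewUnidlingController(
//		kclient.Extensions(),
//		kclient.Core(),
//		kclient.Core(),
//		deployClient,
//		kclient.Core(),
//		2*time.Hour,
//	)
//	controller.Run(wait.NeverStop)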
func NewUnidlingController(scaleNS kextclient.ScalesGetter, endptsNS kcoreclient.EndpointsGetter, evtNS kcoreclient.EventsGetter, dcNamespacer deployclient.DeploymentConfigsGetter, rcNamespacer kcoreclient.ReplicationControllersGetter, resyncPeriod time.Duration) *UnidlingController {
	fieldSet := fields.Set{}
	fieldSet["reason"] = unidlingapi.NeedPodsReason
	fieldSelector := fieldSet.AsSelector()

	unidlingController := &UnidlingController{
		scaleNamespacer:     scaleNS,
		endpointsNamespacer: endptsNS,
		queue:               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		lastFiredCache: &lastFiredCache{
			items: make(map[types.NamespacedName]time.Time),
		},

		dcNamespacer: dcNamespacer,
		rcNamespacer: rcNamespacer,
	}

	_, controller := framework.NewInformer(
		&cache.ListWatch{
			// No need to list -- we only care about new events
			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
				options.FieldSelector = fieldSelector
				return evtNS.Events(kapi.NamespaceAll).List(options)
			},
			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
				options.FieldSelector = fieldSelector
				return evtNS.Events(kapi.NamespaceAll).Watch(options)
			},
		},
		&kapi.Event{},
		resyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    unidlingController.addEvent,
			UpdateFunc: unidlingController.updateEvent,
			// this is just to clean up our cache of the last seen times
			DeleteFunc: unidlingController.checkAndClearFromCache,
		},
	)
	unidlingController.controller = controller

	return unidlingController
}
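
// addEvent enqueues newly observed events for processing.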
func (c *UnidlingController) addEvent(obj interface{}) {
	evt, ok := obj.(*kapi.Event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", obj))
		return
	}

	c.enqueueEvent(evt)
}
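
// updateEvent enqueues updated events for processing.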
func (c *UnidlingController) updateEvent(oldObj, newObj interface{}) {
	evt, ok := newObj.(*kapi.Event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", newObj))
		return
	}

	c.enqueueEvent(evt)
}
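
// checkAndClearFromCache clears deleted events from the last-fired cache,
// unwrapping deletion tombstones as necessary.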
func (c *UnidlingController) checkAndClearFromCache(obj interface{}) {
	evt, objIsEvent := obj.(*kapi.Event)
	if !objIsEvent {
		tombstone, objIsTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !objIsTombstone {
			utilruntime.HandleError(fmt.Errorf("got non-Event, non-tombstone object in event action: %v", obj))
			return
		}

		evt, objIsEvent = tombstone.Obj.(*kapi.Event)
		if !objIsEvent {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an Event in event action: %v", obj))
			return
		}
	}

	c.clearEventFromCache(evt)
}

// clearEventFromCache removes the entry for the given event from the lastFiredCache.
func (c *UnidlingController) clearEventFromCache(event *kapi.Event) {
	if event.Reason != unidlingapi.NeedPodsReason {
		return
	}

	info := types.NamespacedName{
		Namespace: event.InvolvedObject.Namespace,
		Name:      event.InvolvedObject.Name,
	}
	c.lastFiredCache.Clear(info)
}

// enqueueEvent checks whether the given event is relevant (i.e. whether it's a
// NeedPods event) and, if so, extracts the relevant information and adds it to
// the processing queue.
func (c *UnidlingController) enqueueEvent(event *kapi.Event) {
	if event.Reason != unidlingapi.NeedPodsReason {
		return
	}

	info := types.NamespacedName{
		Namespace: event.InvolvedObject.Namespace,
		Name:      event.InvolvedObject.Name,
	}

	// only add things to the queue if they're newer than what we already have
	if c.lastFiredCache.AddIfNewer(info, event.LastTimestamp.Time) {
		c.queue.Add(info)
	}
}
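
// Run starts watching for events and processing unidling requests; both loops
// stop when stopCh is closed.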
func (c *UnidlingController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go c.controller.Run(stopCh)
	go wait.Until(c.processRequests, time.Second, stopCh)
}

// processRequests calls awaitRequest repeatedly, until told to stop by
// the return value of awaitRequest.
func (c *UnidlingController) processRequests() {
	for {
		if !c.awaitRequest() {
			return
		}
	}
}

// awaitRequest awaits a new request on the queue and sends it off for processing.
// It returns true if more requests on the queue should be processed, and false
// if processing should stop.
func (c *UnidlingController) awaitRequest() bool {
	infoRaw, stop := c.queue.Get()
	if stop {
		return false
	}
	defer c.queue.Done(infoRaw)

	info := infoRaw.(types.NamespacedName)
	lastFired := c.lastFiredCache.Get(info)

	var retry bool
	var err error
	if retry, err = c.handleRequest(info, lastFired); err == nil {
		// if there was no error, we succeeded in unidling, and we need to
		// tell the rate limiter to stop tracking this request
		c.queue.Forget(infoRaw)
		return true
	}

	// check to see if we think the error was transient (e.g. a server error on the
	// update request); if not, do not retry
	if !retry {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry: %v", info.Namespace, info.Name, lastFired, err))
		return true
	}

	// otherwise, we were at least partially unsuccessful in unidling, so requeue
	// the event to process later, but don't retry failing requests forever
	if c.queue.NumRequeues(infoRaw) > MaxRetries {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry again: %v", info.Namespace, info.Name, lastFired, err))
		c.queue.Forget(infoRaw)
		return true
	}

	glog.V(4).Infof("Unable to fully process unidling request for %s/%s (at %s), will retry: %v", info.Namespace, info.Name, lastFired, err)
	c.queue.AddRateLimited(infoRaw)

	return true
}

// handleRequest handles a single request to unidle. After checking the validity of
// the request, it examines the endpoints in question to determine which scalables
// to scale up, scales them, and removes them from the endpoints' list of idled
// scalables. If it is unable to fully process the request, it returns a boolean
// indicating whether the request should be retried later, as well as an error (e.g.
// if an annotation cannot be parsed, retrying later won't help, so it returns false).
func (c *UnidlingController) handleRequest(info types.NamespacedName, lastFired time.Time) (bool, error) {
	// fetch the endpoints associated with the service in question
	targetEndpoints, err := c.endpointsNamespacer.Endpoints(info.Namespace).Get(info.Name)
	if err != nil {
		return true, fmt.Errorf("unable to retrieve endpoints: %v", err)
	}

	// make sure we actually were idled...
	idledTimeRaw, wasIdled := targetEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
	if !wasIdled {
		glog.V(5).Infof("UnidlingController received a NeedPods event for a service that was not idled, ignoring")
		return false, nil
	}

	// ...and make sure this request was to wake up from the most recent idling, and not a previous one
	idledTime, err := time.Parse(time.RFC3339, idledTimeRaw)
	if err != nil {
		// retrying here won't help, we're just stuck as idled since we can't parse the idled-at time
		return false, fmt.Errorf("unable to check idled-at time: %v", err)
	}
	if lastFired.Before(idledTime) {
		glog.V(5).Infof("UnidlingController received an out-of-date NeedPods event, ignoring")
		return false, nil
	}

	// TODO: ew, this is unversioned. Such is life when working with annotations.
	var targetScalables []unidlingapi.RecordedScaleReference
	if targetScalablesStr, hasTargetScalables := targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]; hasTargetScalables {
		if err = json.Unmarshal([]byte(targetScalablesStr), &targetScalables); err != nil {
			// retrying here won't help, we're just stuck as idled since we can't parse the idled scalables list
			return false, fmt.Errorf("unable to unmarshal target scalable references: %v", err)
		}
	} else {
		glog.V(4).Infof("Service %s/%s had no scalables to unidle", info.Namespace, info.Name)
		targetScalables = []unidlingapi.RecordedScaleReference{}
	}

	targetScalablesSet := make(map[unidlingapi.RecordedScaleReference]struct{}, len(targetScalables))
	for _, v := range targetScalables {
		targetScalablesSet[v] = struct{}{}
	}

	deleteIdlingAnnotations := func(_ int32, annotations map[string]string) {
		delete(annotations, unidlingapi.IdledAtAnnotation)
		delete(annotations, unidlingapi.PreviousScaleAnnotation)
	}

	scaleAnnotater := unidlingutil.NewScaleAnnotater(c.scaleNamespacer, c.dcNamespacer, c.rcNamespacer, deleteIdlingAnnotations)

	for _, scalableRef := range targetScalables {
		var scale *kextapi.Scale
		var obj runtime.Object
		obj, scale, err = scaleAnnotater.GetObjectWithScale(info.Namespace, scalableRef.CrossGroupObjectReference)
		if err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(targetScalablesSet, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to get scale for %s %q while unidling service %s/%s, will try again later: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
			}
			continue
		}

		if scale.Spec.Replicas > 0 {
			glog.V(4).Infof("%s %q is not idle, skipping while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
			continue
		}

		scale.Spec.Replicas = scalableRef.Replicas

		updater := unidlingutil.NewScaleUpdater(kapi.Codecs.LegacyCodec(registered.EnabledVersions()...), info.Namespace, c.dcNamespacer, c.rcNamespacer)
		if err = scaleAnnotater.UpdateObjectScale(updater, info.Namespace, scalableRef.CrossGroupObjectReference, obj, scale); err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(targetScalablesSet, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to scale up %s %q while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
			}
			continue
		}
		glog.V(4).Infof("Scaled up %s %q while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)

		delete(targetScalablesSet, scalableRef)
	}

	newAnnotationList := make([]unidlingapi.RecordedScaleReference, 0, len(targetScalablesSet))
	for k := range targetScalablesSet {
		newAnnotationList = append(newAnnotationList, k)
	}

	if len(newAnnotationList) == 0 {
		delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
		delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
	} else {
		var newAnnotationBytes []byte
		newAnnotationBytes, err = json.Marshal(newAnnotationList)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("unable to update/remove idle annotations from %s/%s: unable to marshal list of remaining scalables, removing list entirely: %v", info.Namespace, info.Name, err))
			delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
			delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
		} else {
			targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation] = string(newAnnotationBytes)
		}
	}

	if _, err = c.endpointsNamespacer.Endpoints(info.Namespace).Update(targetEndpoints); err != nil {
		return true, fmt.Errorf("unable to update/remove idle annotations from %s/%s: %v", info.Namespace, info.Name, err)
	}

	return false, nil
}