package ingressip

import (
	"fmt"
	"net"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"
	kapi "k8s.io/kubernetes/pkg/api"
	kerrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/client/cache"
	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/registry/service/allocator"
	"k8s.io/kubernetes/pkg/registry/service/ipallocator"
	"k8s.io/kubernetes/pkg/runtime"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)

const (
	// SyncProcessedPollPeriod is how long to wait between checks for
	// completion of the informer's initial sync.  Allocation for the
	// initial sync must happen in a consistent order rather than in
	// the order received, so processing waits for the sync to
	// complete; polling at this period avoids a hot loop.
	SyncProcessedPollPeriod = 100 * time.Millisecond

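	// Parameters for the exponential backoff applied when persisting
	// a service update fails with a retryable error.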
	clientRetryCount    = 5
	clientRetryInterval = 5 * time.Second
	clientRetryFactor   = 1.1
)

// IngressIPController is responsible for allocating ingress ip
// addresses to Service objects of type LoadBalancer.
type IngressIPController struct {
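	// client is used to read and update services.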
	client kcoreclient.ServicesGetter

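	// controller drives the informer that watches services and feeds
	// the queue.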
	controller *framework.Controller

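	// maxRetries bounds how many times a failed change should be retried.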
	maxRetries int

	// Tracks ip allocation for the configured range
	ipAllocator *ipallocator.Range
	// Tracks ip -> service key to allow detection of duplicate ip
	// allocations.
	allocationMap map[string]string

	// Tracks services requeued for allocation when the range is full.
	requeuedAllocations sets.String

	// Protects the transition between initial sync and regular processing
	lock  sync.Mutex
	cache cache.Store
	queue workqueue.RateLimitingInterface

	// recorder is used to record events.
	recorder record.EventRecorder

	// changeHandler does the work. It can be factored out for unit testing.
	changeHandler func(change *serviceChange) error
	// persistenceHandler persists service changes.  It can be factored out for unit testing.
	persistenceHandler func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error
}

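// serviceChange represents a single observed add, update, or delete
// of a service.  key is empty for deletions, and oldService is nil
// for additions.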
type serviceChange struct {
	key                string
	oldService         *kapi.Service
	requeuedAllocation bool
}

// NewIngressIPController creates a new IngressIPController.
// TODO this should accept a shared informer
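//
// A minimal usage sketch (kc, the range, and the resync interval are
// illustrative assumptions):
//
//	_, ipNet, _ := net.ParseCIDR("172.29.0.0/16")
//	controller := NewIngressIPController(kc, ipNet, 10*time.Minute)
//	go controller.Run(wait.NeverStop)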
func NewIngressIPController(kc kclientset.Interface, ipNet *net.IPNet, resyncInterval time.Duration) *IngressIPController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&kcoreclient.EventSinkImpl{Interface: kc.Core().Events("")})
	recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "ingressip-controller"})

	ic := &IngressIPController{
		client:     kc.Core(),
		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		maxRetries: 10,
		recorder:   recorder,
	}

	ic.cache, ic.controller = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
				return ic.client.Services(kapi.NamespaceAll).List(options)
			},
			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
				return ic.client.Services(kapi.NamespaceAll).Watch(options)
			},
		},
		&kapi.Service{},
		resyncInterval,
		framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				service := obj.(*kapi.Service)
				glog.V(5).Infof("Adding service %s/%s", service.Namespace, service.Name)
				ic.enqueueChange(obj, nil)
			},
			UpdateFunc: func(old, cur interface{}) {
				service := cur.(*kapi.Service)
				glog.V(5).Infof("Updating service %s/%s", service.Namespace, service.Name)
				ic.enqueueChange(cur, old)
			},
			DeleteFunc: func(obj interface{}) {
				// The object may be a cache.DeletedFinalStateUnknown
				// tombstone if the watch missed the delete event.
				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
					obj = tombstone.Obj
				}
				service := obj.(*kapi.Service)
				glog.V(5).Infof("Deleting service %s/%s", service.Namespace, service.Name)
				ic.enqueueChange(nil, obj)
			},
		},
	)

	ic.changeHandler = ic.processChange
	ic.persistenceHandler = persistService

	ic.ipAllocator = ipallocator.NewAllocatorCIDRRange(ipNet, func(max int, rangeSpec string) allocator.Interface {
		return allocator.NewAllocationMap(max, rangeSpec)
	})

	ic.allocationMap = make(map[string]string)
	ic.requeuedAllocations = sets.NewString()

	return ic
}

// enqueueChange transforms the old and new objects into a change
// object and queues it.  A lock is shared with processInitialSync to
// avoid enqueueing while the changes from the initial sync are being
// processed.
func (ic *IngressIPController) enqueueChange(new interface{}, old interface{}) {
	ic.lock.Lock()
	defer ic.lock.Unlock()

	change := &serviceChange{}

	if new != nil {
		// Queue the key needed to retrieve the latest state from the
		// cache when the change is processed.
		key, err := controller.KeyFunc(new)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", new, err))
			return
		}
		change.key = key
	}

	if old != nil {
		change.oldService = old.(*kapi.Service)
	}

	ic.queue.Add(change)
}

// Run begins watching and syncing.
func (ic *IngressIPController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go ic.controller.Run(stopCh)

	glog.V(5).Infof("Waiting for the initial sync to be completed")
	for !ic.controller.HasSynced() {
		select {
		case <-time.After(SyncProcessedPollPeriod):
		case <-stopCh:
			return
		}
	}

	if !ic.processInitialSync() {
		return
	}

	glog.V(5).Infof("Starting normal worker")
	for {
		if !ic.work() {
			break
		}
	}

	glog.V(5).Infof("Shutting down ingress ip controller")
	ic.queue.ShutDown()
}

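// serviceAge sorts services oldest first by creation timestamp,
// breaking ties by UID to ensure a deterministic order.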
type serviceAge []*kapi.Service

func (s serviceAge) Len() int      { return len(s) }
func (s serviceAge) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serviceAge) Less(i, j int) bool {
	if s[i].CreationTimestamp.Before(s[j].CreationTimestamp) {
		return true
	}
	return (s[i].CreationTimestamp == s[j].CreationTimestamp && s[i].UID < s[j].UID)
}

// processInitialSync processes the items queued by informer's initial sync.
// A lock is shared between this method and enqueueChange to ensure
// that queue additions are blocked while the sync is being processed.
// Returns a boolean indication of whether processing should continue.
func (ic *IngressIPController) processInitialSync() bool {
	ic.lock.Lock()
	defer ic.lock.Unlock()

	glog.V(5).Infof("Processing initial sync")

	// Track services that need to be processed after existing
	// allocations are recorded.
	var pendingServices []*kapi.Service

	// Track post-sync changes that need to be added back to the queue
	// after allocations are recorded.  These changes may end up in
	// the queue if watch events were queued before completion of the
	// initial sync was detected.
	var pendingChanges []*serviceChange

	// Drain the queue.  Len() should be safe because enqueueChange
	// requires the same lock held by this method.
	for ic.queue.Len() > 0 {
		item, quit := ic.queue.Get()
		if quit {
			return false
		}
		ic.queue.Done(item)
		ic.queue.Forget(item)
		change := item.(*serviceChange)

		// The initial sync only includes additions, so if an update
		// or delete is seen (indicated by the presence of oldService),
		// it and all subsequent changes are post-sync watch events that
		// should be queued without processing.
		postSyncChange := change.oldService != nil || len(pendingChanges) > 0
		if postSyncChange {
			pendingChanges = append(pendingChanges, change)
			continue
		}

		service := ic.getCachedService(change.key)
		if service == nil {
			// Service was deleted
			continue
		}

		if service.Spec.Type == kapi.ServiceTypeLoadBalancer {
			// Save for subsequent addition back to the queue to
			// ensure persistent state is updated during regular
			// processing.
			pendingServices = append(pendingServices, service)

			if len(service.Status.LoadBalancer.Ingress) > 0 {
				// The service has an existing allocation
				ipString := service.Status.LoadBalancer.Ingress[0].IP
				// Return values indicating that reallocation is
				// necessary or that an error occurred can be ignored
				// since the service will be processed again.
				ic.recordLocalAllocation(change.key, ipString)
			}
		}
	}

	// Add pending service additions back to the queue in consistent order.
	sort.Sort(serviceAge(pendingServices))
	for _, service := range pendingServices {
		if key, err := controller.KeyFunc(service); err == nil {
			glog.V(5).Infof("Adding service back to queue: %v ", key)
			change := &serviceChange{key: key}
			ic.queue.Add(change)
		} else {
			// This error should have been caught by enqueueChange
			utilruntime.HandleError(fmt.Errorf("Couldn't get key for service %+v: %v", service, err))
			continue
		}
	}

	// Add watch events back to the queue
	for _, change := range pendingChanges {
		ic.queue.Add(change)
	}

	glog.V(5).Infof("Completed processing initial sync")

	return true
}

// getCachedService returns the service for the given key from the
// cache, or nil (with logging) if it is missing or retrieval fails.
func (ic *IngressIPController) getCachedService(key string) *kapi.Service {
	if len(key) == 0 {
		return nil
	}
	if obj, exists, err := ic.cache.GetByKey(key); err != nil {
		glog.V(5).Infof("Unable to retrieve service %v from store: %v", key, err)
	} else if !exists {
		glog.V(6).Infof("Service %v has been deleted", key)
	} else {
		return obj.(*kapi.Service)
	}
	return nil
}

// recordLocalAllocation attempts to update local state for the given
// service key and ingress ip.  Returns a boolean indication of
// whether reallocation is necessary and an error indicating the
// reason for reallocation.  If reallocation is not indicated, a
// non-nil error indicates an exceptional condition.
func (ic *IngressIPController) recordLocalAllocation(key, ipString string) (reallocate bool, err error) {
	ip := net.ParseIP(ipString)
	if ip == nil {
		return true, fmt.Errorf("Service %v has an invalid ingress ip %v.  A new ip will be allocated.", key, ipString)
	}

	ipKey, ok := ic.allocationMap[ipString]
	switch {
	case ok && ipKey == key:
		// Allocation exists for this service
		return false, nil
	case ok && ipKey != key:
		// TODO prefer removing the allocation from a service that does not have a matching LoadBalancerIP
		return true, fmt.Errorf("Another service is using ingress ip %v.  A new ip will be allocated for %v.", ipString, key)
	}

	err = ic.ipAllocator.Allocate(ip)
	switch {
	case err == ipallocator.ErrNotInRange:
		return true, fmt.Errorf("The ingress ip %v for service %v is not in the ingress range.  A new ip will be allocated.", ipString, key)
	case err != nil:
		// The only other error that Allocate() can return is
		// ErrAllocated, but that should not happen after the check
		// against the allocation map.
		return false, fmt.Errorf("Unexpected error from ip allocator for service %v: %v", key, err)
	}
	ic.allocationMap[ipString] = key
	glog.V(5).Infof("Recorded allocation of ip %v for service %v", ipString, key)
	return false, nil
}

// work dispatches the next item in the queue to the change handler.
// If the change handler returns an error, the change is requeued with
// rate limiting to be processed again.  Returns a boolean indication
// of whether processing should continue.
func (ic *IngressIPController) work() bool {
	item, quit := ic.queue.Get()
	if quit {
		return false
	}
	change := item.(*serviceChange)
	defer ic.queue.Done(change)

	if change.requeuedAllocation {
		// Reset the allocation state so that the change can be
		// requeued if necessary.  Only additions/updates are requeued
		// for allocation so change.key should be non-empty.
		change.requeuedAllocation = false
		ic.requeuedAllocations.Delete(change.key)
	}

	if err := ic.changeHandler(change); err == nil {
		// No further processing required
		ic.queue.Forget(change)
	} else {
		if err == ipallocator.ErrFull {
			// When the range is full, avoid requeueing more than a
			// single change requiring allocation per service.
			// Otherwise the queue could grow without bounds as every
			// service update would add another change that would be
			// endlessly requeued.
			if ic.requeuedAllocations.Has(change.key) {
				return true
			}
			change.requeuedAllocation = true
			ic.requeuedAllocations.Insert(change.key)
			service := ic.getCachedService(change.key)
			if service != nil {
				ic.recorder.Eventf(service, kapi.EventTypeWarning, "IngressIPRangeFull", "No available ingress ip to allocate to service %s", change.key)
			}
		}
		// Failed but can be retried
		utilruntime.HandleError(fmt.Errorf("error syncing service; it will be retried: %v", err))
		ic.queue.AddRateLimited(change)
	}

	return true
}

// processChange responds to a service change by synchronizing the
// local and persisted ingress ip allocation state of the service.
func (ic *IngressIPController) processChange(change *serviceChange) error {
	service := ic.getCachedService(change.key)

	ic.clearOldAllocation(service, change.oldService)

	if service == nil {
		// Service was deleted - no further processing required
		return nil
	}

	typeLoadBalancer := service.Spec.Type == kapi.ServiceTypeLoadBalancer
	hasAllocation := len(service.Status.LoadBalancer.Ingress) > 0
	switch {
	case typeLoadBalancer && hasAllocation:
		return ic.recordAllocation(service, change.key)
	case typeLoadBalancer && !hasAllocation:
		return ic.allocate(service, change.key)
	case !typeLoadBalancer && hasAllocation:
		return ic.deallocate(service, change.key)
	default:
		return nil
	}
}

// clearOldAllocation clears the old allocation for a service if it
// differs from a new allocation.  Returns a boolean indication of
// whether the old allocation was cleared.
func (ic *IngressIPController) clearOldAllocation(new, old *kapi.Service) bool {
	oldIP := ""
	if old != nil && old.Spec.Type == kapi.ServiceTypeLoadBalancer && len(old.Status.LoadBalancer.Ingress) > 0 {
		oldIP = old.Status.LoadBalancer.Ingress[0].IP
	}
	noOldAllocation := len(oldIP) == 0
	if noOldAllocation {
		return false
	}

	newIP := ""
	if new != nil && new.Spec.Type == kapi.ServiceTypeLoadBalancer && len(new.Status.LoadBalancer.Ingress) > 0 {
		newIP = new.Status.LoadBalancer.Ingress[0].IP
	}
	allocationUnchanged := newIP == oldIP
	if allocationUnchanged {
		return false
	}

	// New allocation differs from old due to update or deletion

	// Get the key from the old service since the new service may be nil
	if key, err := controller.KeyFunc(old); err == nil {
		ic.clearLocalAllocation(key, oldIP)
		return true
	} else {
		// Recovery/retry not possible for this error
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", old, err))
		return false
	}
}

// recordAllocation updates local state with the ingress ip indicated
// in a service's status and ensures that the ingress ip appears in
// the service's list of external ips.  If the service's ingress ip is
// invalid for any reason, a new ip will be allocated.
func (ic *IngressIPController) recordAllocation(service *kapi.Service, key string) error {
	// If more than one ingress ip is present, all but the first are ignored
	ipString := service.Status.LoadBalancer.Ingress[0].IP

	reallocate, err := ic.recordLocalAllocation(key, ipString)
	if !reallocate && err != nil {
		return err
	}
	reallocateMessage := ""
	if err != nil {
		reallocateMessage = err.Error()
	}

	// Make a copy to avoid mutating cache state
	t, err := kapi.Scheme.DeepCopy(service)
	if err != nil {
		return err
	}
	service = t.(*kapi.Service)

	if reallocate {
		// TODO update the external ips but not the status since
		// allocate() will overwrite any existing allocation.
		if err = ic.clearPersistedAllocation(service, key, reallocateMessage); err != nil {
			return err
		}
		ic.recorder.Eventf(service, kapi.EventTypeWarning, "IngressIPReallocated", "%s", reallocateMessage)
		return ic.allocate(service, key)
	} else {
		// Ensure that the ingress ip is present in the service's spec.
		return ic.ensureExternalIP(service, key, ipString)
	}
}

// allocate assigns an unallocated ip to a service and updates the
// service's persisted state.
func (ic *IngressIPController) allocate(service *kapi.Service, key string) error {
	// Make a copy to avoid mutating cache state
	t, err := kapi.Scheme.DeepCopy(service)
	if err != nil {
		return err
	}
	service = t.(*kapi.Service)

	ip, err := ic.allocateIP(service.Spec.LoadBalancerIP)
	if err != nil {
		return err
	}
	ipString := ip.String()

	glog.V(5).Infof("Allocating ip %v to service %v", ipString, key)
	service.Status = kapi.ServiceStatus{
		LoadBalancer: kapi.LoadBalancerStatus{
			Ingress: []kapi.LoadBalancerIngress{
				{
					IP: ipString,
				},
			},
		},
	}
	if err = ic.persistServiceStatus(service); err != nil {
		if releaseErr := ic.ipAllocator.Release(ip); releaseErr != nil {
			// Release from contiguous allocator should never return an error, but just in case...
			utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, releaseErr))
		}
		return err
	}
	ic.allocationMap[ipString] = key

	return ic.ensureExternalIP(service, key, ipString)
}

// deallocate ensures that the ip currently allocated to a service is
// released and that the service's load balancer status is cleared.
func (ic *IngressIPController) deallocate(service *kapi.Service, key string) error {
	glog.V(5).Infof("Clearing allocation state for %v", key)

	// Make a copy to avoid mutating cache state
	t, err := kapi.Scheme.DeepCopy(service)
	if err != nil {
		return err
	}
	service = t.(*kapi.Service)

	// Get the ingress ip to remove from local allocation state before
	// it is removed from the service.
	ipString := service.Status.LoadBalancer.Ingress[0].IP

	if err = ic.clearPersistedAllocation(service, key, ""); err != nil {
		return err
	}

	ic.clearLocalAllocation(key, ipString)
	return nil
}

// clearLocalAllocation clears an in-memory allocation if it belongs
// to the specified service key.
func (ic *IngressIPController) clearLocalAllocation(key, ipString string) bool {
	glog.V(5).Infof("Attempting to clear local allocation of ip %v for service %v", ipString, key)

	ip := net.ParseIP(ipString)
	if ip == nil {
		// An invalid ip address cannot be deallocated
		utilruntime.HandleError(fmt.Errorf("Error parsing ip: %v", ipString))
		return false
	}

	ipKey, ok := ic.allocationMap[ipString]
	switch {
	case !ok:
		glog.V(6).Infof("IP address %v is not currently allocated", ipString)
		return false
	case key != ipKey:
		glog.V(6).Infof("IP address %v is not allocated to service %v", ipString, key)
		return false
	}

	// Remove allocation
	if err := ic.ipAllocator.Release(ip); err != nil {
		// Release from contiguous allocator should never return an error.
		utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, err))
		return false
	}
	delete(ic.allocationMap, ipString)
	glog.V(5).Infof("IP address %v is now available for allocation", ipString)
	return true
}

// clearPersistedAllocation ensures there is no ingress ip in the
// service's spec and that the service's status is cleared.
func (ic *IngressIPController) clearPersistedAllocation(service *kapi.Service, key, errMessage string) error {
	// Assume it is safe to modify the service without worrying about changing the local cache

	if len(errMessage) > 0 {
		utilruntime.HandleError(fmt.Errorf("%s", errMessage))
	} else {
		glog.V(5).Infof("Attempting to clear persisted allocation for service: %v", key)
	}

	// An ingress ip is only allowed in ExternalIPs when a
	// corresponding status exists, so update the spec first to avoid
	// failing admission control.
	ingressIP := service.Status.LoadBalancer.Ingress[0].IP
	for i, ip := range service.Spec.ExternalIPs {
		if ip == ingressIP {
			glog.V(5).Infof("Removing ip %v from the external ips of service %v", ingressIP, key)
			service.Spec.ExternalIPs = append(service.Spec.ExternalIPs[:i], service.Spec.ExternalIPs[i+1:]...)
			if err := ic.persistServiceSpec(service); err != nil {
				return err
			}
			break
		}
	}

	service.Status.LoadBalancer = kapi.LoadBalancerStatus{}
	glog.V(5).Infof("Clearing the load balancer status of service: %v", key)
	return ic.persistServiceStatus(service)
}

// ensureExternalIP ensures that the provided service has the ingress
// ip persisted as an external ip.
func (ic *IngressIPController) ensureExternalIP(service *kapi.Service, key, ingressIP string) error {
	// Assume it is safe to modify the service without worrying about changing the local cache

	ipExists := false
	for _, ip := range service.Spec.ExternalIPs {
		if ip == ingressIP {
			ipExists = true
			glog.V(6).Infof("Service %v already has ip %v as an external ip", key, ingressIP)
			break
		}
	}
	if !ipExists {
		service.Spec.ExternalIPs = append(service.Spec.ExternalIPs, ingressIP)
		glog.V(5).Infof("Adding ip %v to service %v as an external ip", ingressIP, key)
		return ic.persistServiceSpec(service)
	}
	return nil
}

// allocateIP attempts to allocate the requested ip, and if that is
// not possible, allocates the next available address.
func (ic *IngressIPController) allocateIP(requestedIP string) (net.IP, error) {
	if len(requestedIP) == 0 {
		// Specific ip not requested
		return ic.ipAllocator.AllocateNext()
	}
	var ip net.IP
	if ip = net.ParseIP(requestedIP); ip == nil {
		// Invalid ip
		return ic.ipAllocator.AllocateNext()
	}
	if err := ic.ipAllocator.Allocate(ip); err != nil {
		// Unable to allocate requested ip
		return ic.ipAllocator.AllocateNext()
	}
	// Allocated requested ip
	return ip, nil
}

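// persistServiceSpec persists a change to a service's spec (e.g. its
// external ips).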
func (ic *IngressIPController) persistServiceSpec(service *kapi.Service) error {
	return ic.persistenceHandler(ic.client, service, false)
}

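// persistServiceStatus persists a change to a service's load
// balancer status.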
func (ic *IngressIPController) persistServiceStatus(service *kapi.Service) error {
	return ic.persistenceHandler(ic.client, service, true)
}

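// persistService updates the status (if targetStatus is true) or the
// spec of the given service, retrying with exponential backoff on
// retryable errors.  Deletion and conflict are treated as success
// since the events that caused them will be processed separately.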
func persistService(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
	backoff := wait.Backoff{
		Steps:    clientRetryCount,
		Duration: clientRetryInterval,
		Factor:   clientRetryFactor,
	}
	return wait.ExponentialBackoff(backoff, func() (bool, error) {
		var err error
		if targetStatus {
			_, err = client.Services(service.Namespace).UpdateStatus(service)
		} else {
			_, err = client.Services(service.Namespace).Update(service)
		}
		switch {
		case err == nil:
			return true, nil
		case kerrors.IsNotFound(err):
			// If the service no longer exists, we don't want to recreate
			// it. Just bail out so that we can process the delete, which
			// we should soon be receiving if we haven't already.
			glog.V(5).Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
				service.Namespace, service.Name, err)
			return true, nil
		case kerrors.IsConflict(err):
			// TODO: Try to resolve the conflict if the change was
			// unrelated to load balancer status. For now, just rely on
			// the fact that we'll also process the update that caused the
			// resource version to change.
			glog.V(5).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
				service.Namespace, service.Name, err)
			return true, nil
		default:
			err = fmt.Errorf("Failed to persist update to service '%s/%s': %v",
				service.Namespace, service.Name, err)
			return false, err
		}
	})
}