package controllers

import (
	"encoding/json"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/credentialprovider"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/runtime"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)

// DockerRegistryServiceControllerOptions contains options for the DockerRegistryServiceController
type DockerRegistryServiceControllerOptions struct {
	// Resync is the time.Duration at which to fully re-list services.
	// If zero, re-list will be delayed as long as possible
	Resync time.Duration

	RegistryNamespace   string
	RegistryServiceName string

	DockercfgController *DockercfgController

	// DockerURLsIntialized is used to send a signal to the DockercfgController that it has the correct set of docker urls
	DockerURLsIntialized chan struct{}
}

// NewDockerRegistryServiceController returns a new *DockerRegistryServiceController.
func NewDockerRegistryServiceController(cl kclientset.Interface, options DockerRegistryServiceControllerOptions) *DockerRegistryServiceController {
	e := &DockerRegistryServiceController{
		client:                cl,
		dockercfgController:   options.DockercfgController,
		registryLocationQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		secretsToUpdate:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		serviceName:           options.RegistryServiceName,
		serviceNamespace:      options.RegistryNamespace,
		dockerURLsIntialized:  options.DockerURLsIntialized,
	}

	e.serviceCache, e.serviceController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(opts kapi.ListOptions) (runtime.Object, error) {
				opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", options.RegistryServiceName)
				return e.client.Core().Services(options.RegistryNamespace).List(opts)
			},
			WatchFunc: func(opts kapi.ListOptions) (watch.Interface, error) {
				opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", options.RegistryServiceName)
				return e.client.Core().Services(options.RegistryNamespace).Watch(opts)
			},
		},
		&kapi.Service{},
		options.Resync,
		framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				e.enqueueRegistryLocationQueue()
			},
			UpdateFunc: func(old, cur interface{}) {
				e.enqueueRegistryLocationQueue()
			},
			DeleteFunc: func(obj interface{}) {
				e.enqueueRegistryLocationQueue()
			},
		},
	)
	e.servicesSynced = e.serviceController.HasSynced
	e.syncRegistryLocationHandler = e.syncRegistryLocationChange

	dockercfgOptions := kapi.ListOptions{FieldSelector: fields.SelectorFromSet(map[string]string{kapi.SecretTypeField: string(kapi.SecretTypeDockercfg)})}
	e.secretCache, e.secretController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(opts kapi.ListOptions) (runtime.Object, error) {
				return e.client.Core().Secrets(kapi.NamespaceAll).List(dockercfgOptions)
			},
			WatchFunc: func(opts kapi.ListOptions) (watch.Interface, error) {
				return e.client.Core().Secrets(kapi.NamespaceAll).Watch(dockercfgOptions)
			},
		},
		&kapi.Secret{},
		options.Resync,
		framework.ResourceEventHandlerFuncs{},
	)
	e.secretsSynced = e.secretController.HasSynced
	e.syncSecretHandler = e.syncSecretUpdate

	return e
}

// DockerRegistryServiceController manages ServiceToken secrets for Service objects
type DockerRegistryServiceController struct {
	client kclientset.Interface

	serviceName      string
	serviceNamespace string

	dockercfgController *DockercfgController

	serviceController           *framework.Controller
	serviceCache                cache.Store
	servicesSynced              func() bool
	syncRegistryLocationHandler func(key string) error

	secretController  *framework.Controller
	secretCache       cache.Store
	secretsSynced     func() bool
	syncSecretHandler func(key string) error

	registryURLs          sets.String
	registryURLLock       sync.RWMutex
	registryLocationQueue workqueue.RateLimitingInterface
	secretsToUpdate       workqueue.RateLimitingInterface

	dockerURLsIntialized chan struct{}
}

// Runs controller loops and returns immediately
func (e *DockerRegistryServiceController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go e.serviceController.Run(stopCh)
	go e.secretController.Run(stopCh)

	// Wait for the store to sync before starting any work in this controller.
	ready := make(chan struct{})
	go e.waitForDockerURLs(ready, stopCh)
	select {
	case <-ready:
	case <-stopCh:
		return
	}

	go wait.Until(e.watchForDockerURLChanges, time.Second, stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(e.watchForDockercfgSecretUpdates, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down docker registry service controller")
	e.registryLocationQueue.ShutDown()
}

// enqueue adds to our queue.  We only have one entry, but we never have to check it since we already know the things
// we're watching for.
func (e *DockerRegistryServiceController) enqueueRegistryLocationQueue() {
	e.registryLocationQueue.Add("check")
}

// waitForDockerURLs waits until all information required for fully determining the set of the internal docker registry
// hostnames and IPs are complete before continuing
// Once that work is done, the dockerconfig controller will be released to do work.
func (e *DockerRegistryServiceController) waitForDockerURLs(ready chan<- struct{}, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for !e.servicesSynced() || !e.secretsSynced() {
		// wait for the initialization to complete to be informed of a stop
		select {
		case <-time.After(100 * time.Millisecond):
		case <-stopCh:
			return
		}
	}

	// after syncing, determine the current state and assume that we're up to date for it if you don't do this,
	// you'll get an initial storm as you mess with all the dockercfg secrets every time you startup
	urls := e.getDockerRegistryLocations()
	e.setRegistryURLs(urls...)
	e.dockercfgController.SetDockerURLs(urls...)
	close(e.dockerURLsIntialized)
	close(ready)

	return
}

func (e *DockerRegistryServiceController) setRegistryURLs(registryURLs ...string) {
	e.registryURLLock.Lock()
	defer e.registryURLLock.Unlock()
	e.registryURLs = sets.NewString(registryURLs...)
}

func (e *DockerRegistryServiceController) getRegistryURLs() sets.String {
	e.registryURLLock.RLock()
	defer e.registryURLLock.RUnlock()
	// return a copy to avoid any concurrent modification issues
	return sets.NewString(e.registryURLs.List()...)
}

// watchForDockerURLChanges runs a worker thread that just dequeues and processes items related to a docker URL change
func (e *DockerRegistryServiceController) watchForDockerURLChanges() {
	workFn := func() bool {
		key, quit := e.registryLocationQueue.Get()
		if quit {
			return true
		}
		defer e.registryLocationQueue.Done(key)

		if err := e.syncRegistryLocationHandler(key.(string)); err == nil {
			// this means the request was successfully handled.  We should "forget" the item so that any retry
			// later on is reset
			e.registryLocationQueue.Forget(key)

		} else {
			// if we had an error it means that we didn't handle it, which means that we want to requeue the work
			utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
			e.registryLocationQueue.AddRateLimited(key)
		}

		return false
	}

	for {
		if workFn() {
			return
		}
	}
}

// getDockerRegistryLocations returns the dns form and the ip form of the secret
func (e *DockerRegistryServiceController) getDockerRegistryLocations() []string {
	key, err := controller.KeyFunc(&kapi.Service{ObjectMeta: kapi.ObjectMeta{Name: e.serviceName, Namespace: e.serviceNamespace}})
	if err != nil {
		return []string{}
	}

	obj, exists, err := e.serviceCache.GetByKey(key)
	if err != nil {
		return []string{}
	}
	if !exists {
		return []string{}
	}
	service := obj.(*kapi.Service)

	hasClusterIP := (len(service.Spec.ClusterIP) > 0) && (net.ParseIP(service.Spec.ClusterIP) != nil)
	if hasClusterIP && len(service.Spec.Ports) > 0 {
		return []string{
			net.JoinHostPort(service.Spec.ClusterIP, fmt.Sprintf("%d", service.Spec.Ports[0].Port)),
			net.JoinHostPort(fmt.Sprintf("%s.%s.svc", service.Name, service.Namespace), fmt.Sprintf("%d", service.Spec.Ports[0].Port)),
		}
	}

	return []string{}
}

// syncRegistryLocationChange goes through all service account dockercfg secrets and updates them to point at a new docker-registry location
func (e *DockerRegistryServiceController) syncRegistryLocationChange(key string) error {
	newDockerRegistryLocations := sets.NewString(e.getDockerRegistryLocations()...)
	if e.getRegistryURLs().Equal(newDockerRegistryLocations) {
		glog.V(4).Infof("No effective update: %v", newDockerRegistryLocations)
		return nil
	}

	// make sure that new dockercfg secrets get the correct locations
	e.dockercfgController.SetDockerURLs(newDockerRegistryLocations.List()...)
	e.setRegistryURLs(newDockerRegistryLocations.List()...)

	// we've changed the docker registry URL.  Add items to the work queue for all known secrets
	// new secrets will already get the updated value.
	for _, obj := range e.secretCache.List() {
		key, err := controller.KeyFunc(obj)
		if err != nil {
			glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
			continue
		}
		e.secretsToUpdate.Add(key)
	}

	return nil
}

// watchForDockercfgSecretUpdates watches the work queue for entries that indicate that it should modify dockercfg secrets with new
// docker registry URLs
func (e *DockerRegistryServiceController) watchForDockercfgSecretUpdates() {
	workFn := func() bool {
		key, quit := e.secretsToUpdate.Get()
		if quit {
			return true
		}
		defer e.secretsToUpdate.Done(key)

		if err := e.syncSecretHandler(key.(string)); err == nil {
			// this means the request was successfully handled.  We should "forget" the item so that any retry
			// later on is reset
			e.secretsToUpdate.Forget(key)

		} else {
			// if we had an error it means that we didn't handle it, which means that we want to requeue the work
			utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
			e.secretsToUpdate.AddRateLimited(key)
		}

		return false
	}

	for {
		if workFn() {
			return
		}
	}
}

func (e *DockerRegistryServiceController) syncSecretUpdate(key string) error {
	obj, exists, err := e.secretCache.GetByKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve secret %v from store: %v", key, err))
		return err
	}
	if !exists {
		return nil
	}

	dockerRegistryURLs := e.getRegistryURLs()
	sharedDockercfgSecret := obj.(*kapi.Secret)

	dockercfg := &credentialprovider.DockerConfig{}
	// an error here doesn't matter.  If we can't deserialize this, we'll replace it with one that works.
	json.Unmarshal(sharedDockercfgSecret.Data[kapi.DockerConfigKey], dockercfg)

	dockercfgMap := map[string]credentialprovider.DockerConfigEntry(*dockercfg)
	existingDockercfgSecretLocations := sets.StringKeySet(dockercfgMap)
	// if the existingDockercfgSecretLocations haven't changed, don't make an update and check the next one
	if existingDockercfgSecretLocations.Equal(dockerRegistryURLs) {
		return nil
	}

	// we need to update it, make a copy
	uncastObj, err := kapi.Scheme.DeepCopy(obj)
	if err != nil {
		return err
	}
	dockercfgSecret := uncastObj.(*kapi.Secret)

	dockerCredentials := dockercfgSecret.Annotations[ServiceAccountTokenValueAnnotation]
	if len(dockerCredentials) == 0 && len(existingDockercfgSecretLocations) > 0 {
		dockerCredentials = dockercfgMap[existingDockercfgSecretLocations.List()[0]].Password
	}
	if len(dockerCredentials) == 0 {
		tokenSecretKey := dockercfgSecret.Namespace + "/" + dockercfgSecret.Annotations[ServiceAccountTokenSecretNameKey]
		tokenSecret, exists, err := e.secretCache.GetByKey(tokenSecretKey)
		if !exists {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token due to missing secret: %v", tokenSecretKey))
			return nil
		}
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token: %v", err))
			return nil
		}
		dockerCredentials = string(tokenSecret.(*kapi.Secret).Data[kapi.ServiceAccountTokenKey])
	}

	newDockercfgMap := credentialprovider.DockerConfig{}
	for key := range dockerRegistryURLs {
		newDockercfgMap[key] = credentialprovider.DockerConfigEntry{
			Username: "serviceaccount",
			Password: dockerCredentials,
			Email:    "serviceaccount@example.org",
		}
	}

	dockercfgContent, err := json.Marshal(&newDockercfgMap)
	if err != nil {
		utilruntime.HandleError(err)
		return nil
	}
	dockercfgSecret.Data[kapi.DockerConfigKey] = dockercfgContent

	if _, err := e.client.Core().Secrets(dockercfgSecret.Namespace).Update(dockercfgSecret); err != nil {
		return err
	}

	return nil
}