package controllers

import (


	kapi ""
	kclientset ""
	utilruntime ""

// DockerRegistryServiceControllerOptions contains options for the DockerRegistryServiceController
type DockerRegistryServiceControllerOptions struct {
	// Resync is the time.Duration at which to fully re-list services.
	// If zero, re-list will be delayed as long as possible
	Resync time.Duration

	RegistryNamespace   string
	RegistryServiceName string

	DockercfgController *DockercfgController

	// DockerURLsIntialized is used to send a signal to the DockercfgController that it has the correct set of docker urls
	DockerURLsIntialized chan struct{}

// NewDockerRegistryServiceController returns a new *DockerRegistryServiceController.
func NewDockerRegistryServiceController(cl kclientset.Interface, options DockerRegistryServiceControllerOptions) *DockerRegistryServiceController {
	e := &DockerRegistryServiceController{
		client:                cl,
		dockercfgController:   options.DockercfgController,
		registryLocationQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		secretsToUpdate:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		serviceName:           options.RegistryServiceName,
		serviceNamespace:      options.RegistryNamespace,
		dockerURLsIntialized:  options.DockerURLsIntialized,

	e.serviceCache, e.serviceController = framework.NewInformer(
			ListFunc: func(opts kapi.ListOptions) (runtime.Object, error) {
				opts.FieldSelector = fields.OneTermEqualSelector("", options.RegistryServiceName)
				return e.client.Core().Services(options.RegistryNamespace).List(opts)
			WatchFunc: func(opts kapi.ListOptions) (watch.Interface, error) {
				opts.FieldSelector = fields.OneTermEqualSelector("", options.RegistryServiceName)
				return e.client.Core().Services(options.RegistryNamespace).Watch(opts)
			AddFunc: func(obj interface{}) {
			UpdateFunc: func(old, cur interface{}) {
			DeleteFunc: func(obj interface{}) {
	e.servicesSynced = e.serviceController.HasSynced
	e.syncRegistryLocationHandler = e.syncRegistryLocationChange

	dockercfgOptions := kapi.ListOptions{FieldSelector: fields.SelectorFromSet(map[string]string{kapi.SecretTypeField: string(kapi.SecretTypeDockercfg)})}
	e.secretCache, e.secretController = framework.NewInformer(
			ListFunc: func(opts kapi.ListOptions) (runtime.Object, error) {
				return e.client.Core().Secrets(kapi.NamespaceAll).List(dockercfgOptions)
			WatchFunc: func(opts kapi.ListOptions) (watch.Interface, error) {
				return e.client.Core().Secrets(kapi.NamespaceAll).Watch(dockercfgOptions)
	e.secretsSynced = e.secretController.HasSynced
	e.syncSecretHandler = e.syncSecretUpdate

	return e

// DockerRegistryServiceController manages ServiceToken secrets for Service objects
type DockerRegistryServiceController struct {
	client kclientset.Interface

	serviceName      string
	serviceNamespace string

	dockercfgController *DockercfgController

	serviceController           *framework.Controller
	serviceCache                cache.Store
	servicesSynced              func() bool
	syncRegistryLocationHandler func(key string) error

	secretController  *framework.Controller
	secretCache       cache.Store
	secretsSynced     func() bool
	syncSecretHandler func(key string) error

	registryURLs          sets.String
	registryURLLock       sync.RWMutex
	registryLocationQueue workqueue.RateLimitingInterface
	secretsToUpdate       workqueue.RateLimitingInterface

	dockerURLsIntialized chan struct{}

// Runs controller loops and returns immediately
func (e *DockerRegistryServiceController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go e.serviceController.Run(stopCh)
	go e.secretController.Run(stopCh)

	// Wait for the store to sync before starting any work in this controller.
	ready := make(chan struct{})
	go e.waitForDockerURLs(ready, stopCh)
	select {
	case <-ready:
	case <-stopCh:

	go wait.Until(e.watchForDockerURLChanges, time.Second, stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(e.watchForDockercfgSecretUpdates, time.Second, stopCh)

	glog.Infof("Shutting down docker registry service controller")

// enqueue adds to our queue.  We only have one entry, but we never have to check it since we already know the things
// we're watching for.
func (e *DockerRegistryServiceController) enqueueRegistryLocationQueue() {

// waitForDockerURLs waits until all information required for fully determining the set of the internal docker registry
// hostnames and IPs are complete before continuing
// Once that work is done, the dockerconfig controller will be released to do work.
func (e *DockerRegistryServiceController) waitForDockerURLs(ready chan<- struct{}, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for !e.servicesSynced() || !e.secretsSynced() {
		// wait for the initialization to complete to be informed of a stop
		select {
		case <-time.After(100 * time.Millisecond):
		case <-stopCh:

	// after syncing, determine the current state and assume that we're up to date for it if you don't do this,
	// you'll get an initial storm as you mess with all the dockercfg secrets every time you startup
	urls := e.getDockerRegistryLocations()


func (e *DockerRegistryServiceController) setRegistryURLs(registryURLs ...string) {
	defer e.registryURLLock.Unlock()
	e.registryURLs = sets.NewString(registryURLs...)

func (e *DockerRegistryServiceController) getRegistryURLs() sets.String {
	defer e.registryURLLock.RUnlock()
	// return a copy to avoid any concurrent modification issues
	return sets.NewString(e.registryURLs.List()...)

// watchForDockerURLChanges runs a worker thread that just dequeues and processes items related to a docker URL change
func (e *DockerRegistryServiceController) watchForDockerURLChanges() {
	workFn := func() bool {
		key, quit := e.registryLocationQueue.Get()
		if quit {
			return true
		defer e.registryLocationQueue.Done(key)

		if err := e.syncRegistryLocationHandler(key.(string)); err == nil {
			// this means the request was successfully handled.  We should "forget" the item so that any retry
			// later on is reset

		} else {
			// if we had an error it means that we didn't handle it, which means that we want to requeue the work
			utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))

		return false

	for {
		if workFn() {

// getDockerRegistryLocations returns the dns form and the ip form of the secret
func (e *DockerRegistryServiceController) getDockerRegistryLocations() []string {
	key, err := controller.KeyFunc(&kapi.Service{ObjectMeta: kapi.ObjectMeta{Name: e.serviceName, Namespace: e.serviceNamespace}})
	if err != nil {
		return []string{}

	obj, exists, err := e.serviceCache.GetByKey(key)
	if err != nil {
		return []string{}
	if !exists {
		return []string{}
	service := obj.(*kapi.Service)

	hasClusterIP := (len(service.Spec.ClusterIP) > 0) && (net.ParseIP(service.Spec.ClusterIP) != nil)
	if hasClusterIP && len(service.Spec.Ports) > 0 {
		return []string{
			net.JoinHostPort(service.Spec.ClusterIP, fmt.Sprintf("%d", service.Spec.Ports[0].Port)),
			net.JoinHostPort(fmt.Sprintf("%s.%s.svc", service.Name, service.Namespace), fmt.Sprintf("%d", service.Spec.Ports[0].Port)),

	return []string{}

// syncRegistryLocationChange goes through all service account dockercfg secrets and updates them to point at a new docker-registry location
func (e *DockerRegistryServiceController) syncRegistryLocationChange(key string) error {
	newDockerRegistryLocations := sets.NewString(e.getDockerRegistryLocations()...)
	if e.getRegistryURLs().Equal(newDockerRegistryLocations) {
		glog.V(4).Infof("No effective update: %v", newDockerRegistryLocations)
		return nil

	// make sure that new dockercfg secrets get the correct locations

	// we've changed the docker registry URL.  Add items to the work queue for all known secrets
	// new secrets will already get the updated value.
	for _, obj := range e.secretCache.List() {
		key, err := controller.KeyFunc(obj)
		if err != nil {
			glog.Errorf("Couldn't get key for object %+v: %v", obj, err)

	return nil

// watchForDockercfgSecretUpdates watches the work queue for entries that indicate that it should modify dockercfg secrets with new
// docker registry URLs
func (e *DockerRegistryServiceController) watchForDockercfgSecretUpdates() {
	workFn := func() bool {
		key, quit := e.secretsToUpdate.Get()
		if quit {
			return true
		defer e.secretsToUpdate.Done(key)

		if err := e.syncSecretHandler(key.(string)); err == nil {
			// this means the request was successfully handled.  We should "forget" the item so that any retry
			// later on is reset

		} else {
			// if we had an error it means that we didn't handle it, which means that we want to requeue the work
			utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))

		return false

	for {
		if workFn() {

func (e *DockerRegistryServiceController) syncSecretUpdate(key string) error {
	obj, exists, err := e.secretCache.GetByKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve secret %v from store: %v", key, err))
		return err
	if !exists {
		return nil

	dockerRegistryURLs := e.getRegistryURLs()
	sharedDockercfgSecret := obj.(*kapi.Secret)

	dockercfg := &credentialprovider.DockerConfig{}
	// an error here doesn't matter.  If we can't deserialize this, we'll replace it with one that works.
	json.Unmarshal(sharedDockercfgSecret.Data[kapi.DockerConfigKey], dockercfg)

	dockercfgMap := map[string]credentialprovider.DockerConfigEntry(*dockercfg)
	existingDockercfgSecretLocations := sets.StringKeySet(dockercfgMap)
	// if the existingDockercfgSecretLocations haven't changed, don't make an update and check the next one
	if existingDockercfgSecretLocations.Equal(dockerRegistryURLs) {
		return nil

	// we need to update it, make a copy
	uncastObj, err := kapi.Scheme.DeepCopy(obj)
	if err != nil {
		return err
	dockercfgSecret := uncastObj.(*kapi.Secret)

	dockerCredentials := dockercfgSecret.Annotations[ServiceAccountTokenValueAnnotation]
	if len(dockerCredentials) == 0 && len(existingDockercfgSecretLocations) > 0 {
		dockerCredentials = dockercfgMap[existingDockercfgSecretLocations.List()[0]].Password
	if len(dockerCredentials) == 0 {
		tokenSecretKey := dockercfgSecret.Namespace + "/" + dockercfgSecret.Annotations[ServiceAccountTokenSecretNameKey]
		tokenSecret, exists, err := e.secretCache.GetByKey(tokenSecretKey)
		if !exists {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token due to missing secret: %v", tokenSecretKey))
			return nil
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token: %v", err))
			return nil
		dockerCredentials = string(tokenSecret.(*kapi.Secret).Data[kapi.ServiceAccountTokenKey])

	newDockercfgMap := credentialprovider.DockerConfig{}
	for key := range dockerRegistryURLs {
		newDockercfgMap[key] = credentialprovider.DockerConfigEntry{
			Username: "serviceaccount",
			Password: dockerCredentials,
			Email:    "",

	dockercfgContent, err := json.Marshal(&newDockercfgMap)
	if err != nil {
		return nil
	dockercfgSecret.Data[kapi.DockerConfigKey] = dockercfgContent

	if _, err := e.client.Core().Secrets(dockercfgSecret.Namespace).Update(dockercfgSecret); err != nil {
		return err

	return nil