package deployment

import (
	"fmt"
	"time"

	kapi "k8s.io/kubernetes/pkg/api"
	kclient "k8s.io/kubernetes/pkg/client"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	kutil "k8s.io/kubernetes/pkg/util"
	"k8s.io/kubernetes/pkg/watch"

	controller "github.com/openshift/origin/pkg/controller"
	deployapi "github.com/openshift/origin/pkg/deploy/api"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
)

// DeploymentControllerFactory can create a DeploymentController that creates
// deployer pods in a configurable way.
type DeploymentControllerFactory struct {
	// KubeClient is a Kubernetes client.
	KubeClient kclient.Interface
	// Codec is used for encoding/decoding.
	Codec runtime.Codec
	// ServiceAccount is the service account name to run deployer pods as
	ServiceAccount string
	// Environment is a set of environment which should be injected into all deployer pod containers.
	Environment []kapi.EnvVar
	// DeployerImage specifies which Docker image can support the default strategies.
	DeployerImage string
}

// Create creates a DeploymentController.
func (factory *DeploymentControllerFactory) Create() controller.RunnableController {
	deploymentLW := &deployutil.ListWatcherImpl{
		// TODO: Investigate specifying annotation field selectors to fetch only 'deployments'
		// Currently field selectors are not supported for replication controllers
		ListFunc: func() (runtime.Object, error) {
			return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything())
		},
		WatchFunc: func(resourceVersion string) (watch.Interface, error) {
			return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
		},
	}
	deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
	cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue, 2*time.Minute).Run()

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(factory.KubeClient.Events(""))

	deployController := &DeploymentController{
		serviceAccount: factory.ServiceAccount,
		deploymentClient: &deploymentClientImpl{
			getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
				return factory.KubeClient.ReplicationControllers(namespace).Get(name)
			},
			updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
				return factory.KubeClient.ReplicationControllers(namespace).Update(deployment)
			},
		},
		podClient: &podClientImpl{
			getPodFunc: func(namespace, name string) (*kapi.Pod, error) {
				return factory.KubeClient.Pods(namespace).Get(name)
			},
			createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
				return factory.KubeClient.Pods(namespace).Create(pod)
			},
			deletePodFunc: func(namespace, name string) error {
				return factory.KubeClient.Pods(namespace).Delete(name, nil)
			},
			updatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
				return factory.KubeClient.Pods(namespace).Update(pod)
			},
			// Find deployer pods using the label they should all have which
			// correlates them to the named deployment.
			getDeployerPodsForFunc: func(namespace, name string) ([]kapi.Pod, error) {
				labelSel, err := labels.Parse(fmt.Sprintf("%s=%s", deployapi.DeployerPodForDeploymentLabel, name))
				if err != nil {
					return []kapi.Pod{}, err
				}
				pods, err := factory.KubeClient.Pods(namespace).List(labelSel, fields.Everything())
				if err != nil {
					return []kapi.Pod{}, err
				}
				return pods.Items, nil
			},
		},
		makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) {
			return factory.makeContainer(strategy)
		},
		decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) {
			return deployutil.DecodeDeploymentConfig(deployment, factory.Codec)
		},
		recorder: eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployer"}),
	}

	return &controller.RetryController{
		Queue: deploymentQueue,
		RetryManager: controller.NewQueueRetryManager(
			deploymentQueue,
			cache.MetaNamespaceKeyFunc,
			func(obj interface{}, err error, retries controller.Retry) bool {
				if _, isFatal := err.(fatalError); isFatal {
					kutil.HandleError(err)
					return false
				}
				if retries.Count > 1 {
					return false
				}
				return true
			},
			kutil.NewTokenBucketRateLimiter(1, 10),
		),
		Handle: func(obj interface{}) error {
			deployment := obj.(*kapi.ReplicationController)
			return deployController.Handle(deployment)
		},
	}
}

// makeContainer creates containers in the following way:
//
//   1. For the Recreate and Rolling strategies, strategy, use the factory's
//      DeployerImage as the container image, and the factory's Environment
//      as the container environment.
//   2. For all Custom strategy, use the strategy's image for the container
//      image, and use the combination of the factory's Environment and the
//      strategy's environment as the container environment.
//
// An error is returned if the deployment strategy type is not supported.
func (factory *DeploymentControllerFactory) makeContainer(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) {
	// Set default environment values
	environment := []kapi.EnvVar{}
	for _, env := range factory.Environment {
		environment = append(environment, env)
	}

	// Every strategy type should be handled here.
	switch strategy.Type {
	case deployapi.DeploymentStrategyTypeRecreate, deployapi.DeploymentStrategyTypeRolling:
		// Use the factory-configured image.
		return &kapi.Container{
			Image: factory.DeployerImage,
			Env:   environment,
		}, nil
	case deployapi.DeploymentStrategyTypeCustom:
		// Use user-defined values from the strategy input.
		for _, env := range strategy.CustomParams.Environment {
			environment = append(environment, env)
		}
		return &kapi.Container{
			Image: strategy.CustomParams.Image,
			Env:   environment,
		}, nil
	default:
		return nil, fmt.Errorf("unsupported deployment strategy type: %s", strategy.Type)
	}
}