package kubernetes

import (
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"time"

	"github.com/emicklei/go-restful"
	"github.com/golang/glog"

	kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
	federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/v1"
	appsv1alpha1 "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
	autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
	"k8s.io/kubernetes/pkg/apis/batch"
	batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
	batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
	certificatesv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	extv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
	policyv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/client/typed/dynamic"
	kclient "k8s.io/kubernetes/pkg/client/unversioned"
	adapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/deployment"
	"k8s.io/kubernetes/pkg/controller/disruption"
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
	"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
	namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
	nodecontroller "k8s.io/kubernetes/pkg/controller/node"
	petsetcontroller "k8s.io/kubernetes/pkg/controller/petset"
	podautoscalercontroller "k8s.io/kubernetes/pkg/controller/podautoscaler"
	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
	gccontroller "k8s.io/kubernetes/pkg/controller/podgc"
	replicasetcontroller "k8s.io/kubernetes/pkg/controller/replicaset"
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
	"k8s.io/kubernetes/pkg/controller/scheduledjob"
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
	attachdetachcontroller "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
	"k8s.io/kubernetes/pkg/master"
	"k8s.io/kubernetes/pkg/registry/generic"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/runtime/serializer"
	"k8s.io/kubernetes/pkg/storage"
	storagefactory "k8s.io/kubernetes/pkg/storage/storagebackend/factory"
	utilwait "k8s.io/kubernetes/pkg/util/wait"

	"k8s.io/kubernetes/pkg/registry/endpoint"
	endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/aws_ebs"
	"k8s.io/kubernetes/pkg/volume/cinder"
	"k8s.io/kubernetes/pkg/volume/flexvolume"
	"k8s.io/kubernetes/pkg/volume/gce_pd"
	"k8s.io/kubernetes/pkg/volume/glusterfs"
	"k8s.io/kubernetes/pkg/volume/host_path"
	"k8s.io/kubernetes/pkg/volume/nfs"
	"k8s.io/kubernetes/pkg/volume/rbd"
	"k8s.io/kubernetes/pkg/volume/vsphere_volume"

	"k8s.io/kubernetes/plugin/pkg/scheduler"
	_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
	latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"

	osclient "github.com/openshift/origin/pkg/client"
	configapi "github.com/openshift/origin/pkg/cmd/server/api"
	"github.com/openshift/origin/pkg/cmd/server/election"
)

const (
	KubeAPIPrefix      = "/api"
	KubeAPIGroupPrefix = "/apis"
)

// InstallAPI starts a Kubernetes master and registers the supported REST APIs
// into the provided mux, then returns a slice of strings describing the
// endpoints that were started. Each entry is a format string that expects to
// be given a single string value (the server's base address).
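//
// A caller might consume the result like this (hypothetical sketch):
//
//	messages, err := c.InstallAPI(container)
//	if err != nil {
//		glog.Fatalf("Failed to install API: %v", err)
//	}
//	for _, msg := range messages {
//		glog.Infof(msg, "https://master.example.com:8443")
//	}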
func (c *MasterConfig) InstallAPI(container *restful.Container) ([]string, error) {
	c.Master.RestfulContainer = container

	if c.Master.EnableCoreControllers {
		glog.V(2).Info("Using the lease endpoint reconciler")
		config, err := c.Master.StorageFactory.NewConfig(kapi.Resource("apiServerIPInfo"))
		if err != nil {
			return nil, err
		}
		leaseStorage, _, err := storagefactory.Create(*config)
		if err != nil {
			return nil, err
		}
		masterLeases := newMasterLeases(leaseStorage)

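		// The lease-based reconciler rewrites the endpoints of the
		// "kubernetes" service in the default namespace from the set of
		// master leases above, so it needs storage-backed access to the
		// endpoints registry, which is wired up next.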
		endpointConfig, err := c.Master.StorageFactory.NewConfig(kapi.Resource("endpoints"))
		if err != nil {
			return nil, err
		}
		endpointsStorage := endpointsetcd.NewREST(generic.RESTOptions{
			StorageConfig:           endpointConfig,
			Decorator:               generic.UndecoratedStorage,
			DeleteCollectionWorkers: 0,
			ResourcePrefix:          c.Master.StorageFactory.ResourcePrefix(kapi.Resource("endpoints")),
		})

		endpointRegistry := endpoint.NewRegistry(endpointsStorage)

		c.Master.EndpointReconcilerConfig = master.EndpointReconcilerConfig{
			Reconciler: election.NewLeaseEndpointReconciler(endpointRegistry, masterLeases),
			Interval:   master.DefaultEndpointReconcilerInterval,
		}
	}

	_, err := master.New(c.Master)
	if err != nil {
		return nil, err
	}

	messages := []string{}
	// v1 has to be printed separately since it's served from a different endpoint than the API groups
	if configapi.HasKubernetesAPIVersion(c.Options, v1.SchemeGroupVersion) {
		messages = append(messages, fmt.Sprintf("Started Kubernetes API at %%s%s", KubeAPIPrefix))
	}

	// TODO: this is a bit much - this list already exists in some form elsewhere and should be shared
	versions := []unversioned.GroupVersion{
		extv1beta1.SchemeGroupVersion,
		batchv1.SchemeGroupVersion,
		batchv2alpha1.SchemeGroupVersion,
		autoscalingv1.SchemeGroupVersion,
		certificatesv1alpha1.SchemeGroupVersion,
		appsv1alpha1.SchemeGroupVersion,
		policyv1alpha1.SchemeGroupVersion,
		federationv1beta1.SchemeGroupVersion,
	}
	for _, ver := range versions {
		if configapi.HasKubernetesAPIVersion(c.Options, ver) {
			messages = append(messages, fmt.Sprintf("Started Kubernetes API %s at %%s%s", ver.String(), KubeAPIGroupPrefix))
		}
	}

	return messages, nil
}

func newMasterLeases(storage storage.Interface) election.Leases {
	// leaseTTL is a bare count of seconds (i.e. 15 means 15 seconds), not a
	// time.Duration; do NOT pass something like 15*time.Second here.
	leaseTTL := uint64((master.DefaultEndpointReconcilerInterval + 5*time.Second) / time.Second) // add 5 seconds for wiggle room
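	// For example, with master.DefaultEndpointReconcilerInterval at its
	// upstream default of 10*time.Second, this works out to
	// (10s + 5s) / 1s = 15, i.e. a 15-second TTL.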
	return election.NewLeases(storage, "/masterleases/", leaseTTL)
}

// RunNamespaceController starts the Kubernetes namespace controller
func (c *MasterConfig) RunNamespaceController(kubeClient kclientset.Interface, clientPool dynamic.ClientPool) {
	// Use discovery to find the set of namespaced resources the namespace controller must manage
	groupVersionResources, err := kubeClient.Discovery().ServerPreferredNamespacedResources()
	if err != nil {
		glog.Fatalf("Failed to get supported resources from server: %v", err)
	}
	namespaceController := namespacecontroller.NewNamespaceController(kubeClient, clientPool, groupVersionResources, c.ControllerManager.NamespaceSyncPeriod.Duration, kapi.FinalizerKubernetes)
	go namespaceController.Run(int(c.ControllerManager.ConcurrentNamespaceSyncs), utilwait.NeverStop)
}

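// RunPersistentVolumeController starts the Kubernetes persistent volume controller sync loop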
func (c *MasterConfig) RunPersistentVolumeController(client *kclientset.Clientset, namespace, recyclerImageName, recyclerServiceAccountName string) {
	s := c.ControllerManager

	alphaProvisioner, err := kctrlmgr.NewAlphaVolumeProvisioner(c.CloudProvider, s.VolumeConfiguration)
	if err != nil {
		glog.Fatalf("A backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
	}

	volumeController := persistentvolumecontroller.NewPersistentVolumeController(
		client,
		s.PVClaimBinderSyncPeriod.Duration,
		alphaProvisioner,
		probeRecyclableVolumePlugins(s.VolumeConfiguration, namespace, recyclerImageName, recyclerServiceAccountName),
		c.CloudProvider,
		s.ClusterName,
		nil, // volumeSource: fall back to the controller's default ListerWatcher
		nil, // claimSource: fall back to the controller's default ListerWatcher
		nil, // classSource: fall back to the controller's default ListerWatcher
		nil, // eventRecorder: fall back to the controller's default recorder
		s.VolumeConfiguration.EnableDynamicProvisioning,
	)
	volumeController.Run(utilwait.NeverStop)
}

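// RunPersistentVolumeAttachDetachController starts the Kubernetes volume attach/detach controller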
func (c *MasterConfig) RunPersistentVolumeAttachDetachController(client *kclientset.Clientset) {
	s := c.ControllerManager
	attachDetachController, err :=
		attachdetachcontroller.NewAttachDetachController(
			client,
			c.Informers.Pods().Informer(),
			c.Informers.Nodes().Informer(),
			c.Informers.PersistentVolumeClaims().Informer(),
			c.Informers.PersistentVolumes().Informer(),
			c.CloudProvider,
			kctrlmgr.ProbeAttachableVolumePlugins(s.VolumeConfiguration),
			nil,
		)
	if err != nil {
		glog.Fatalf("Failed to start attach/detach controller: %v", err)
	} else {
		go attachDetachController.Run(utilwait.NeverStop)
	}
}

// probeRecyclableVolumePlugins collects all persistent volume plugins into an easy-to-use list.
func probeRecyclableVolumePlugins(config componentconfig.VolumeConfiguration, namespace, recyclerImageName, recyclerServiceAccountName string) []volume.VolumePlugin {
	uid := int64(0)
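	// The scrub pod runs OpenShift's recycler binary as root (UID 0) against
	// the volume mounted at /scrub, in the namespace and under the service
	// account supplied by the caller.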
	defaultScrubPod := volume.NewPersistentVolumeRecyclerPodTemplate()
	defaultScrubPod.Namespace = namespace
	defaultScrubPod.Spec.ServiceAccountName = recyclerServiceAccountName
	defaultScrubPod.Spec.Containers[0].Image = recyclerImageName
	defaultScrubPod.Spec.Containers[0].Command = []string{"/usr/bin/openshift-recycle"}
	defaultScrubPod.Spec.Containers[0].Args = []string{"/scrub"}
	defaultScrubPod.Spec.Containers[0].SecurityContext = &kapi.SecurityContext{RunAsUser: &uid}
	defaultScrubPod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent

	allPlugins := []volume.VolumePlugin{}

	// The list of plugins to probe is decided by this binary, not
	// by dynamic linking or other "magic".  Plugins will be analyzed and
	// initialized later.

	// Each plugin can make use of VolumeConfig.  The config arg to this func
	// contains *all* enumerated options meant to configure volume plugins.
	// From that single config, create an instance of volume.VolumeConfig for a
	// specific plugin and pass that instance to the plugin's
	// ProbeVolumePlugins(config) func.

	// HostPath recycling is for testing and development purposes only!
	hostPathConfig := volume.VolumeConfig{
		RecyclerMinimumTimeout:   int(config.PersistentVolumeRecyclerConfiguration.MinimumTimeoutHostPath),
		RecyclerTimeoutIncrement: int(config.PersistentVolumeRecyclerConfiguration.IncrementTimeoutHostPath),
		RecyclerPodTemplate:      defaultScrubPod,
	}
	if err := kctrlmgr.AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil {
		glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, err)
	}
	allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...)

	nfsConfig := volume.VolumeConfig{
		RecyclerMinimumTimeout:   int(config.PersistentVolumeRecyclerConfiguration.MinimumTimeoutNFS),
		RecyclerTimeoutIncrement: int(config.PersistentVolumeRecyclerConfiguration.IncrementTimeoutNFS),
		RecyclerPodTemplate:      defaultScrubPod,
	}
	if err := kctrlmgr.AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil {
		glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, err)
	}
	allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)

	allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
	allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
	allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
	allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(config.FlexVolumePluginDir)...)
	allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
	allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
	allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)

	return allPlugins
}

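// RunReplicaSetController starts the Kubernetes replica set controller sync loop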
func (c *MasterConfig) RunReplicaSetController(client *kclientset.Clientset) {
	controller := replicasetcontroller.NewReplicaSetController(
		c.Informers.Pods().Informer(),
		client,
		kctrlmgr.ResyncPeriod(c.ControllerManager),
		replicasetcontroller.BurstReplicas,
		int(c.ControllerManager.LookupCacheSizeForRS),
		c.ControllerManager.EnableGarbageCollector,
	)
	go controller.Run(int(c.ControllerManager.ConcurrentRSSyncs), utilwait.NeverStop)
}

// RunReplicationController starts the Kubernetes replication controller sync loop
func (c *MasterConfig) RunReplicationController(client *kclientset.Clientset) {
	controllerManager := replicationcontroller.NewReplicationManager(
		c.Informers.Pods().Informer(),
		client,
		kctrlmgr.ResyncPeriod(c.ControllerManager),
		replicationcontroller.BurstReplicas,
		int(c.ControllerManager.LookupCacheSizeForRC),
		c.ControllerManager.EnableGarbageCollector,
	)
	go controllerManager.Run(int(c.ControllerManager.ConcurrentRCSyncs), utilwait.NeverStop)
}

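// RunDeploymentController starts the Kubernetes deployment controller sync loop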
func (c *MasterConfig) RunDeploymentController(client *kclientset.Clientset) {
	controller := deployment.NewDeploymentController(
		client,
		kctrlmgr.ResyncPeriod(c.ControllerManager),
	)
	go controller.Run(int(c.ControllerManager.ConcurrentDeploymentSyncs), utilwait.NeverStop)
}

// RunJobController starts the Kubernetes job controller sync loop
func (c *MasterConfig) RunJobController(client *kclientset.Clientset) {
	controller := jobcontroller.NewJobController(c.Informers.Pods().Informer(), client)
	go controller.Run(int(c.ControllerManager.ConcurrentJobSyncs), utilwait.NeverStop)
}

// RunScheduledJobController starts the Kubernetes scheduled job controller sync loop
func (c *MasterConfig) RunScheduledJobController(config *restclient.Config) {
	// TODO: this is a temporary fix that lets the kube client list v2alpha1 jobs; it should switch to using a generated clientset
	config.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
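	// With the override above, the client's job requests are sent to the
	// batch/v2alpha1 group version (i.e. paths under /apis/batch/v2alpha1/)
	// instead of the client's default group version.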
	client, err := kclient.New(config)
	if err != nil {
		glog.Fatalf("Unable to configure scheduled job controller: %v", err)
	}
	go scheduledjob.NewScheduledJobController(client).Run(utilwait.NeverStop)
}

// RunDisruptionBudgetController starts the Kubernetes disruption budget controller
func (c *MasterConfig) RunDisruptionBudgetController(client *kclient.Client) {
	go disruption.NewDisruptionController(c.Informers.Pods().Informer(), client).Run(utilwait.NeverStop)
}

// RunHPAController starts the Kubernetes horizontal pod autoscaler (HPA) controller sync loop
func (c *MasterConfig) RunHPAController(oc *osclient.Client, kc *kclientset.Clientset, heapsterNamespace string) {
	delegatingScaleNamespacer := osclient.NewDelegatingScaleNamespacer(oc, kc)
	podautoscaler := podautoscalercontroller.NewHorizontalController(
		kc,
		delegatingScaleNamespacer,
		kc,
		metrics.NewHeapsterMetricsClient(kc, heapsterNamespace, "https", "heapster", ""), // heapster namespace, scheme, service name, port ("" = default)
		c.ControllerManager.HorizontalPodAutoscalerSyncPeriod.Duration,
	)
	go podautoscaler.Run(utilwait.NeverStop)
}

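// RunDaemonSetsController starts the Kubernetes daemon set controller sync loop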
func (c *MasterConfig) RunDaemonSetsController(client *kclientset.Clientset) {
	controller := daemon.NewDaemonSetsController(
		c.Informers.Pods().Informer(),
		client,
		kctrlmgr.ResyncPeriod(c.ControllerManager),
		int(c.ControllerManager.LookupCacheSizeForDaemonSet),
	)
	go controller.Run(int(c.ControllerManager.ConcurrentDaemonSetSyncs), utilwait.NeverStop)
}

// RunEndpointController starts the Kubernetes endpoint controller sync loop
func (c *MasterConfig) RunEndpointController(client *kclientset.Clientset) {
	endpoints := endpointcontroller.NewEndpointController(c.Informers.Pods().Informer(), client)
	go endpoints.Run(int(c.ControllerManager.ConcurrentEndpointSyncs), utilwait.NeverStop)
}

// RunScheduler starts the Kubernetes scheduler
func (c *MasterConfig) RunScheduler() {
	config, err := c.createSchedulerConfig()
	if err != nil {
		glog.Fatalf("Unable to start scheduler: %v", err)
	}
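	// Wire up an event recorder so scheduling decisions (e.g. Scheduled and
	// FailedScheduling events) are published to the cluster's event stream.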
	eventcast := record.NewBroadcaster()
	config.Recorder = eventcast.NewRecorder(kapi.EventSource{Component: kapi.DefaultSchedulerName})
	eventcast.StartRecordingToSink(c.KubeClient.Events(""))

	s := scheduler.New(config)
	s.Run()
}

// RunGCController starts the pod garbage collector, which handles deletion of terminated pods.
func (c *MasterConfig) RunGCController(client *kclientset.Clientset) {
	if c.ControllerManager.TerminatedPodGCThreshold > 0 {
		gcController := gccontroller.New(client, kctrlmgr.ResyncPeriod(c.ControllerManager), int(c.ControllerManager.TerminatedPodGCThreshold))
		go gcController.Run(utilwait.NeverStop)
	}
}

// RunGarbageCollectorController starts generic garbage collection for the cluster.
func (c *MasterConfig) RunGarbageCollectorController(client *osclient.Client, config *restclient.Config) {
	if !c.ControllerManager.EnableGarbageCollector {
		return
	}

	groupVersionResources, err := client.Discovery().ServerPreferredResources()
	if err != nil {
		glog.Fatalf("Failed to get supported resources from server: %v", err)
	}

	config = restclient.AddUserAgent(config, "generic-garbage-collector")
	config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
	// TODO: needs to take GVR
	metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
	config.ContentConfig.NegotiatedSerializer = nil
	// TODO: needs to take GVR
	clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
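	// Two client pools are deliberate: the metadata-only pool backs the
	// collector's scans (only ObjectMeta is needed to build the dependency
	// graph), while the full pool is used to issue the actual deletes.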
	garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, groupVersionResources)
	if err != nil {
		glog.Fatalf("Failed to start the garbage collector: %v", err)
	}

	workers := int(c.ControllerManager.ConcurrentGCSyncs)
	go garbageCollector.Run(workers, utilwait.NeverStop)
}

// RunNodeController starts the node controller
// TODO: handle node CIDR and route allocation
func (c *MasterConfig) RunNodeController() {
	s := c.ControllerManager

	// these CIDRs have been validated already
	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)

	controller, err := nodecontroller.NewNodeController(
		c.Informers.Pods().Informer(),
		c.CloudProvider,
		adapter.FromUnversionedClient(c.KubeClient),
		s.PodEvictionTimeout.Duration,

		s.NodeEvictionRate,
		s.SecondaryNodeEvictionRate,
		s.LargeClusterSizeThreshold,
		s.UnhealthyZoneThreshold,

		s.NodeMonitorGracePeriod.Duration,
		s.NodeStartupGracePeriod.Duration,
		s.NodeMonitorPeriod.Duration,

		clusterCIDR,
		serviceCIDR,

		int(s.NodeCIDRMaskSize),
		s.AllocateNodeCIDRs,
	)
	if err != nil {
		glog.Fatalf("Unable to start node controller: %v", err)
	}

	controller.Run()
}

// RunServiceLoadBalancerController starts the service loadbalancer controller if the cloud provider is configured.
func (c *MasterConfig) RunServiceLoadBalancerController(client *kclientset.Clientset) {
	if c.CloudProvider == nil {
		glog.V(2).Infof("Service controller will not start - no cloud provider configured")
		return
	}
	serviceController, err := servicecontroller.New(c.CloudProvider, client, c.ControllerManager.ClusterName)
	if err != nil {
		glog.Errorf("Unable to start service controller: %v", err)
	} else {
		serviceController.Run(int(c.ControllerManager.ConcurrentServiceSyncs))
	}
}

// RunPetSetController starts the PetSet controller
func (c *MasterConfig) RunPetSetController(client *kclient.Client) {
	ps := petsetcontroller.NewPetSetController(c.Informers.Pods().Informer(), client, kctrlmgr.ResyncPeriod(c.ControllerManager)())
	go ps.Run(1, utilwait.NeverStop)
}

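// createSchedulerConfig builds a scheduler.Config, either from the configured
// policy file or, if none is provided, from the default algorithm provider.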
func (c *MasterConfig) createSchedulerConfig() (*scheduler.Config, error) {
	var policy schedulerapi.Policy
	var configData []byte

	// TODO make the rate limiter configurable
	configFactory := factory.NewConfigFactory(c.KubeClient, c.SchedulerServer.SchedulerName, int(c.SchedulerServer.HardPodAffinitySymmetricWeight), c.SchedulerServer.FailureDomains)
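
	// When a policy file is provided it is decoded into schedulerapi.Policy
	// below. An illustrative (hypothetical) minimal policy file:
	//
	//	{
	//	    "kind": "Policy",
	//	    "apiVersion": "v1",
	//	    "predicates": [{"name": "PodFitsResources"}],
	//	    "priorities": [{"name": "LeastRequestedPriority", "weight": 1}]
	//	}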
	if _, err := os.Stat(c.Options.SchedulerConfigFile); err == nil {
		configData, err = ioutil.ReadFile(c.Options.SchedulerConfigFile)
		if err != nil {
			return nil, fmt.Errorf("unable to read scheduler config: %v", err)
		}
		err = runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy)
		if err != nil {
			return nil, fmt.Errorf("invalid scheduler configuration: %v", err)
		}

		return configFactory.CreateFromConfig(policy)
	}

	// if the config file isn't provided, use the default provider
	return configFactory.CreateFromProvider(factory.DefaultProvider)
}