package kubernetes

import (
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	kapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy"
	pconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	kexec "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
	dockerclient "github.com/fsouza/go-dockerclient"
	"github.com/golang/glog"

	cmdutil "github.com/openshift/origin/pkg/cmd/util"
	dockerutil "github.com/openshift/origin/pkg/cmd/util/docker"
	"github.com/openshift/origin/pkg/kubelet/app"
)

// commandExecutor abstracts locating and running external commands, so that
// code such as initializeVolumeDir can be handed a substitute implementation.
type commandExecutor interface {
	// LookPath resolves the named executable to a path, or returns an error
	// if it cannot be resolved.
	LookPath(name string) (path string, err error)
	// Run executes the program at path with the given arguments and returns
	// the error, if any, from that execution.
	Run(path string, args ...string) error
}

// defaultCommandExecutor locates and runs commands on the local system using
// the standard library's os/exec package.
type defaultCommandExecutor struct{}

// LookPath resolves executable with exec.LookPath and returns its result
// unchanged.
func (defaultCommandExecutor) LookPath(executable string) (string, error) {
	resolved, err := exec.LookPath(executable)
	return resolved, err
}

// Run builds an exec.Cmd for command with args, runs it, and returns the
// error (if any) reported by Cmd.Run.
func (defaultCommandExecutor) Run(command string, args ...string) error {
	return exec.Command(command, args...).Run()
}

// minimumDockerAPIVersionWithPullByID is the lowest Docker server API version
// that EnsureDocker accepts. Per its name this is presumably the first API
// version that supports pulling images by ID (TODO confirm).
const minimumDockerAPIVersionWithPullByID = "1.18"

// EnsureDocker attempts to connect to the Docker daemon defined by the helper
// and checks that the daemon's API version is at least
// minimumDockerAPIVersionWithPullByID. On success the client is stored in
// c.DockerClient.
//
// Every failure is reported through HandleDockerError, so whether a failure
// terminates the process or only logs a warning depends on
// c.AllowDisabledDocker.
func (c *NodeConfig) EnsureDocker(docker *dockerutil.Helper) {
	dockerClient, dockerAddr := docker.GetClientOrExit()
	if err := dockerClient.Ping(); err != nil {
		c.HandleDockerError(fmt.Sprintf("Docker could not be reached at %s.  Docker must be installed and running to start containers.\n%v", dockerAddr, err))
		return
	}

	glog.Infof("Connecting to Docker at %s", dockerAddr)

	env, err := dockerClient.Version()
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to check for Docker server version.\n%v", err))
		return
	}

	serverVersionString := env.Get("ApiVersion")
	serverVersion, err := dockerclient.NewAPIVersion(serverVersionString)
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to determine Docker server version from %q.\n%v", serverVersionString, err))
		return
	}

	minimumPullByIDVersion, err := dockerclient.NewAPIVersion(minimumDockerAPIVersionWithPullByID)
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to check for Docker server version.\n%v", err))
		return
	}

	if serverVersion.LessThan(minimumPullByIDVersion) {
		// The API version in the message comes from the constant that drives the
		// check, so the two cannot drift apart. The Docker release number
		// ("1.6") still has to be kept in step by hand.
		c.HandleDockerError(fmt.Sprintf("Docker 1.6 or later (server API version %s or later) required.", minimumDockerAPIVersionWithPullByID))
		return
	}

	c.DockerClient = dockerClient
}

// HandleDockerError handles an error from the Docker daemon. When
// c.AllowDisabledDocker is set, the message is logged as a warning and
// c.DockerClient is replaced with a fake client that reports API version 1.18.
// Otherwise the message is logged with glog.Fatalf, which terminates the
// process.
func (c *NodeConfig) HandleDockerError(message string) {
	if c.AllowDisabledDocker {
		glog.Errorf("WARNING: %s", message)
		fakeVersion := dockerclient.Env{"ApiVersion=1.18"}
		c.DockerClient = &dockertools.FakeDockerClient{VersionInfo: fakeVersion}
		return
	}
	glog.Fatalf("ERROR: %s", message)
}

// EnsureVolumeDir converts the configured volume directory to an absolute
// path, creating the directory if it does not exist, and stores the absolute
// path back in c.VolumeDir. If initialization fails the error is logged with
// glog.Fatal, which terminates the process.
func (c *NodeConfig) EnsureVolumeDir() {
	absVolumeDir, err := c.initializeVolumeDir(&defaultCommandExecutor{}, c.VolumeDir)
	if err != nil {
		glog.Fatal(err)
	}
	c.VolumeDir = absVolumeDir
}

// initializeVolumeDir resolves path to an absolute path and, if nothing exists
// at that location, creates the directory (mode 0750) and then tries to set
// its SELinux type to svirt_sandbox_file_t by running chcon through ce.
//
// Failing to find or run chcon is only logged. An error is returned when the
// path cannot be made absolute or the directory cannot be created; otherwise
// the absolute path is returned.
func (c *NodeConfig) initializeVolumeDir(ce commandExecutor, path string) (string, error) {
	absDir, err := filepath.Abs(path)
	if err != nil {
		return "", fmt.Errorf("Error converting volume directory to an absolute path: %v", err)
	}

	// Any stat outcome other than "does not exist" means there is nothing to
	// create, so the path is returned as-is.
	if _, statErr := os.Stat(absDir); !os.IsNotExist(statErr) {
		return absDir, nil
	}

	if mkdirErr := os.MkdirAll(absDir, 0750); mkdirErr != nil {
		return "", fmt.Errorf("Couldn't create kubelet volume root directory '%s': %s", absDir, mkdirErr)
	}

	chcon, lookErr := ce.LookPath("chcon")
	if lookErr != nil {
		glog.V(2).Infof("Couldn't locate 'chcon' to set the kubelet volume root directory SELinux context: %s", lookErr)
		return absDir, nil
	}
	if runErr := ce.Run(chcon, "-t", "svirt_sandbox_file_t", absDir); runErr != nil {
		glog.Warningf("Error running 'chcon' to set the kubelet volume root directory SELinux context: %s", runErr)
	}
	return absDir, nil
}

// RunKubelet starts the Kubelet.
//
// The cluster DNS address is taken from c.ClusterDNS. When that is unset, the
// endpoints of the "kubernetes" service in the default namespace are searched
// for an address exposing port 53, and that address is used if a TCP dial to
// it succeeds. Discovery is best-effort: on any failure the kubelet's
// ClusterDNS setting is left nil.
//
// The process exits via glog.Fatalf if cadvisor cannot be instantiated or if
// c.BindAddress is not a host:port pair with a valid port number.
func (c *NodeConfig) RunKubelet() {
	// TODO: clean this up and make it more formal (service named 'dns'?). Use multiple ports.
	const dnsPort = 53
	clusterDNS := c.ClusterDNS
	if clusterDNS == nil {
		if service, err := c.Client.Endpoints(kapi.NamespaceDefault).Get("kubernetes"); err == nil {
			if ip, ok := firstIP(service, dnsPort); ok {
				// JoinHostPort brackets IPv6 literals; a plain "%s:%d" would
				// produce an address that cannot be dialed for them.
				dnsAddr := net.JoinHostPort(ip, strconv.Itoa(dnsPort))
				if err := cmdutil.WaitForSuccessfulDial(false, "tcp", dnsAddr, 50*time.Millisecond, 0, 2); err == nil {
					clusterDNS = net.ParseIP(ip)
				}
			}
		}
	}

	cadvisorInterface, err := cadvisor.New(4194)
	if err != nil {
		glog.Fatalf("Error instantiating cadvisor: %v", err)
	}

	hostNetworkCapabilities := []string{kubelet.ApiserverSource, kubelet.FileSource}

	imageGCPolicy := kubelet.ImageGCPolicy{
		HighThresholdPercent: 90,
		LowThresholdPercent:  80,
	}
	diskSpacePolicy := kubelet.DiskSpacePolicy{
		DockerFreeDiskMB: 256,
		RootFreeDiskMB:   256,
	}
	kubeAddress, kubePortStr, err := net.SplitHostPort(c.BindAddress)
	if err != nil {
		glog.Fatalf("Cannot parse node address: %v", err)
	}
	// Parse as an unsigned 16-bit value so that negative or out-of-range ports
	// are rejected here instead of silently wrapping when converted to uint.
	kubePort, err := strconv.ParseUint(kubePortStr, 10, 16)
	if err != nil {
		glog.Fatalf("Cannot parse node port: %v", err)
	}

	// tlsOptions stays nil (no TLS settings passed to the kubelet) unless c.TLS is set.
	var tlsOptions *kubelet.TLSOptions
	if c.TLS {
		tlsOptions = &kubelet.TLSOptions{
			Config: &tls.Config{
				// Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability)
				MinVersion: tls.VersionTLS10,
				// RequireAndVerifyClientCert lets us limit requests to ones with a valid client certificate
				ClientAuth: tls.RequireAndVerifyClientCert,
				ClientCAs:  c.ClientCAs,
			},
			CertFile: c.KubeletCertFile,
			KeyFile:  c.KubeletKeyFile,
		}
	}

	kcfg := kapp.KubeletConfig{
		Address: util.IP(net.ParseIP(kubeAddress)),
		// Allow privileged containers
		// TODO: make this configurable and not the default https://github.com/openshift/origin/issues/662
		AllowPrivileged:                true,
		HostNetworkSources:             hostNetworkCapabilities,
		HostnameOverride:               c.NodeHost,
		RootDirectory:                  c.VolumeDir,
		ConfigFile:                     c.PodManifestPath,
		ManifestURL:                    "",
		FileCheckFrequency:             time.Duration(c.PodManifestCheckIntervalSeconds) * time.Second,
		HTTPCheckFrequency:             0,
		PodInfraContainerImage:         c.ImageFor("pod"),
		SyncFrequency:                  10 * time.Second,
		RegistryPullQPS:                0.0,
		RegistryBurst:                  10,
		MinimumGCAge:                   10 * time.Second,
		MaxPerPodContainerCount:        5,
		MaxContainerCount:              100,
		ClusterDomain:                  c.ClusterDomain,
		ClusterDNS:                     util.IP(clusterDNS),
		Runonce:                        false,
		Port:                           uint(kubePort),
		ReadOnlyPort:                   0,
		CadvisorInterface:              cadvisorInterface,
		EnableServer:                   true,
		EnableDebuggingHandlers:        true,
		DockerClient:                   c.DockerClient,
		KubeClient:                     c.Client,
		MasterServiceNamespace:         kapi.NamespaceDefault,
		VolumePlugins:                  app.ProbeVolumePlugins(),
		NetworkPlugins:                 app.ProbeNetworkPlugins(),
		NetworkPluginName:              c.NetworkPluginName,
		StreamingConnectionIdleTimeout: 5 * time.Minute,
		TLSOptions:                     tlsOptions,
		ImageGCPolicy:                  imageGCPolicy,
		DiskSpacePolicy:                diskSpacePolicy,
		Cloud:                          nil,
		NodeStatusUpdateFrequency:      15 * time.Second,
		ResourceContainer:              "/kubelet",
		CgroupRoot:                     "",
		ContainerRuntime:               "docker",
		Mounter:                        mount.New(),
		DockerDaemonContainer:          "",
		ConfigureCBR0:                  false,
		MaxPods:                        200,
	}
	kapp.RunKubelet(&kcfg, nil)
}

// RunProxy starts the proxy.
//
// c.BindAddress must be an ip:port pair; otherwise the process exits via
// glog.Fatalf. The IP's family selects whether iptables is driven in IPv4 or
// IPv6 mode.
//
// Proxy startup runs in a background goroutine under util.Forever. If the
// proxier cannot be created because iptables is locked by another user, the
// closure returns so that it can be retried. For any other creation error a
// warning is logged and the goroutine blocks forever, leaving the proxy
// disabled. On success the goroutine also blocks forever after wiring the
// API source and handlers.
func (c *NodeConfig) RunProxy() {
	// initialize kube proxy
	serviceConfig := pconfig.NewServiceConfig()
	endpointsConfig := pconfig.NewEndpointsConfig()
	loadBalancer := proxy.NewLoadBalancerRR()
	endpointsConfig.RegisterHandler(loadBalancer)

	host, _, err := net.SplitHostPort(c.BindAddress)
	if err != nil {
		glog.Fatalf("The provided value to bind to must be an ip:port %q", c.BindAddress)
	}
	ip := net.ParseIP(host)
	if ip == nil {
		glog.Fatalf("The provided value to bind to must be an ip:port: %q", c.BindAddress)
	}

	protocol := iptables.ProtocolIpv4
	if ip.To4() == nil {
		protocol = iptables.ProtocolIpv6
	}

	go util.Forever(func() {
		proxier, err := proxy.NewProxier(loadBalancer, ip, iptables.New(kexec.New(), protocol))
		if err != nil {
			switch {
			// conflicting use of iptables, retry
			case proxy.IsProxyLocked(err):
				glog.Errorf("Unable to start proxy, will retry: %v", err)
				return
			// on a system without iptables
			//
			// Match on the text of exec.ErrNotFound rather than a hand-written
			// copy: the real message ends in "$PATH" ("%PATH%" on Windows), so
			// the previous literal "... not found in path" could never match.
			// A substring match is kept because the error may arrive wrapped.
			case strings.Contains(err.Error(), exec.ErrNotFound.Error()):
				glog.V(4).Infof("kube-proxy initialization error: %v", err)
				glog.Warningf("WARNING: Could not find the iptables command. The service proxy requires iptables and will be disabled.")
			case err == proxy.ErrProxyOnLocalhost:
				glog.Warningf("WARNING: The service proxy cannot bind to localhost and will be disabled.")
			case strings.Contains(err.Error(), "you must be root"):
				glog.Warningf("WARNING: Could not modify iptables. You must run this process as root to use the service proxy.")
			default:
				glog.Warningf("WARNING: Could not modify iptables. You must run this process as root to use the service proxy: %v", err)
			}
			// Non-retryable failure: block forever so the closure is not re-run.
			select {}
		}

		pconfig.NewSourceAPI(
			c.Client.Services(kapi.NamespaceAll),
			c.Client.Endpoints(kapi.NamespaceAll),
			30*time.Second,
			serviceConfig.Channel("api"),
			endpointsConfig.Channel("api"))

		serviceConfig.RegisterHandler(proxier)
		glog.Infof("Started Kubernetes Proxy on %s", host)
		// Setup is complete; block forever so the closure never returns and is not re-run.
		select {}
	}, 5*time.Second)
}

// includesPort reports whether any entry in ports has a Port equal to port.
// TODO: more generic location
func includesPort(ports []kapi.EndpointPort, port int) bool {
	for i := range ports {
		if ports[i].Port == port {
			return true
		}
	}
	return false
}

// firstIP returns the IP of the first address in the first endpoint subset
// that both exposes port and has at least one address. The boolean result is
// false when no subset qualifies.
// TODO: more generic location
func firstIP(endpoints *kapi.Endpoints, port int) (string, bool) {
	for _, subset := range endpoints.Subsets {
		if includesPort(subset.Ports, port) && len(subset.Addresses) > 0 {
			return subset.Addresses[0].IP, true
		}
	}
	return "", false
}