package kubernetes

import (
	"errors"
	"fmt"
	"net"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"time"

	dockertypes "github.com/docker/engine-api/types"
	dockerclient "github.com/fsouza/go-dockerclient"
	"github.com/golang/glog"

	kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	cadvisortesting "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
	proxy "k8s.io/kubernetes/pkg/proxy"
	pconfig "k8s.io/kubernetes/pkg/proxy/config"
	"k8s.io/kubernetes/pkg/proxy/iptables"
	"k8s.io/kubernetes/pkg/proxy/userspace"
	utildbus "k8s.io/kubernetes/pkg/util/dbus"
	kexec "k8s.io/kubernetes/pkg/util/exec"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	utilnet "k8s.io/kubernetes/pkg/util/net"
	"k8s.io/kubernetes/pkg/volume"

	configapi "github.com/openshift/origin/pkg/cmd/server/api"
	cmdutil "github.com/openshift/origin/pkg/cmd/util"
	dockerutil "github.com/openshift/origin/pkg/cmd/util/docker"
	"github.com/openshift/origin/pkg/volume/emptydir"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/fields"
)

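// commandExecutor abstracts looking up and running external commands so that
// callers (such as initializeVolumeDir) can substitute a fake implementation in tests.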
type commandExecutor interface {
	LookPath(executable string) (string, error)
	Run(command string, args ...string) error
}

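// defaultCommandExecutor runs commands using os/exec.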
type defaultCommandExecutor struct{}

func (ce defaultCommandExecutor) LookPath(executable string) (string, error) {
	return exec.LookPath(executable)
}

func (ce defaultCommandExecutor) Run(command string, args ...string) error {
	c := exec.Command(command, args...)
	return c.Run()
}

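// minimumDockerAPIVersionWithPullByID is the oldest Docker remote API version (Docker 1.6)
// that supports pulling images by ID, which the node requires.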
const minimumDockerAPIVersionWithPullByID = "1.18"

// EnsureKubeletAccess performs a number of test operations that the Kubelet requires to properly function.
// All errors here are fatal.
func (c *NodeConfig) EnsureKubeletAccess() {
	if _, err := os.Stat("/var/lib/docker"); os.IsPermission(err) {
		c.HandleDockerError("Unable to view the /var/lib/docker directory - are you running as root?")
	}
	if c.Containerized {
		if _, err := os.Stat("/rootfs"); os.IsPermission(err) || os.IsNotExist(err) {
			glog.Fatal("error: Running in containerized mode, but cannot find the /rootfs directory - be sure to mount the host filesystem at /rootfs (read-only) in the container.")
		}
		if !sameFileStat(true, "/rootfs/sys", "/sys") {
			glog.Fatal("error: Running in containerized mode, but the /sys directory in the container does not appear to match the host /sys directory - be sure to mount /sys into the container.")
		}
		if !sameFileStat(true, "/rootfs/var/run", "/var/run") {
			glog.Fatal("error: Running in containerized mode, but the /var/run directory in the container does not appear to match the host /var/run directory - be sure to mount /var/run (read-write) into the container.")
		}
	}
	// TODO: check whether we can mount disks (for volumes)
	// TODO: check things cAdvisor needs to properly function
	// TODO: test a cGroup move?
}

// sameFileStat checks whether the provided paths refer to the same file, to verify that a user has
// correctly mounted the host paths into the container
func sameFileStat(requireMode bool, src, dst string) bool {
	srcStat, err := os.Stat(src)
	if err != nil {
		glog.V(4).Infof("Unable to stat %q: %v", src, err)
		return false
	}
	dstStat, err := os.Stat(dst)
	if err != nil {
		glog.V(4).Infof("Unable to stat %q: %v", dst, err)
		return false
	}
	if requireMode && srcStat.Mode() != dstStat.Mode() {
		glog.V(4).Infof("Mode mismatch between %q (%s) and %q (%s)", src, srcStat.Mode(), dst, dstStat.Mode())
		return false
	}
	if !os.SameFile(srcStat, dstStat) {
		glog.V(4).Infof("inode and device mismatch between %q (%s) and %q (%s)", src, srcStat, dst, dstStat)
		return false
	}
	return true
}

// EnsureDocker attempts to connect to the Docker daemon defined by the helper. If it cannot,
// it reports the failure via HandleDockerError, which is fatal unless AllowDisabledDocker is set.
func (c *NodeConfig) EnsureDocker(docker *dockerutil.Helper) {
	dockerClient, dockerAddr, err := docker.GetKubeClient()
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to create a Docker client for %s - Docker must be installed and running to start containers.\n%v", dockerAddr, err))
		return
	}
	if url, err := url.Parse(dockerAddr); err == nil && url.Scheme == "unix" && len(url.Path) > 0 {
		s, err := os.Stat(url.Path)
		switch {
		case os.IsNotExist(err):
			c.HandleDockerError(fmt.Sprintf("No Docker socket found at %s. Have you started the Docker daemon?", url.Path))
			return
		case os.IsPermission(err):
			c.HandleDockerError(fmt.Sprintf("You do not have permission to connect to the Docker daemon (via %s). This process requires running as the root user.", url.Path))
			return
		case err == nil && s.IsDir():
			c.HandleDockerError(fmt.Sprintf("The Docker socket at %s is a directory instead of a unix socket - check that you have configured your connection to the Docker daemon properly.", url.Path))
			return
		}
	}
	if err := dockerClient.Ping(); err != nil {
		c.HandleDockerError(fmt.Sprintf("Docker could not be reached at %s.  Docker must be installed and running to start containers.\n%v", dockerAddr, err))
		return
	}

	glog.Infof("Connecting to Docker at %s", dockerAddr)

	version, err := dockerClient.Version()
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to check for Docker server version.\n%v", err))
		return
	}

	serverVersion, err := dockerclient.NewAPIVersion(version.APIVersion)
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to determine Docker server version from %q.\n%v", version.APIVersion, err))
		return
	}

	minimumPullByIDVersion, err := dockerclient.NewAPIVersion(minimumDockerAPIVersionWithPullByID)
	if err != nil {
		c.HandleDockerError(fmt.Sprintf("Unable to check for Docker server version.\n%v", err))
		return
	}

	if serverVersion.LessThan(minimumPullByIDVersion) {
		c.HandleDockerError(fmt.Sprintf("Docker 1.6 or later (server API version 1.18 or later) required."))
		return
	}

	c.DockerClient = dockerClient
}

// HandleDockerError handles an error from the Docker daemon
func (c *NodeConfig) HandleDockerError(message string) {
	if !c.AllowDisabledDocker {
		glog.Fatalf("error: %s", message)
	}
	glog.Errorf("WARNING: %s", message)
	c.DockerClient = &dockertools.FakeDockerClient{VersionInfo: dockertypes.Version{APIVersion: "1.18"}}
}

// EnsureVolumeDir attempts to convert the provided volume directory argument to
// an absolute path and create the directory if it does not exist. Will exit if
// an error is encountered.
func (c *NodeConfig) EnsureVolumeDir() {
	if volumeDir, err := c.initializeVolumeDir(&defaultCommandExecutor{}, c.VolumeDir); err != nil {
		glog.Fatal(err)
	} else {
		c.VolumeDir = volumeDir
	}
}

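// initializeVolumeDir resolves path to an absolute path, creates the directory if it does not
// exist, and attempts to set the svirt_sandbox_file_t SELinux context on it via chcon.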
func (c *NodeConfig) initializeVolumeDir(ce commandExecutor, path string) (string, error) {
	rootDirectory, err := filepath.Abs(path)
	if err != nil {
		return "", fmt.Errorf("Error converting volume directory to an absolute path: %v", err)
	}

	if _, err := os.Stat(rootDirectory); os.IsNotExist(err) {
		if err := os.MkdirAll(rootDirectory, 0750); err != nil {
			return "", fmt.Errorf("Couldn't create kubelet volume root directory '%s': %s", rootDirectory, err)
		}
	}
	// always try to chcon, in case the volume dir existed prior to the node starting
	if chconPath, err := ce.LookPath("chcon"); err != nil {
		glog.V(2).Infof("Couldn't locate 'chcon' to set the kubelet volume root directory SELinux context: %s", err)
	} else {
		if err := ce.Run(chconPath, "-t", "svirt_sandbox_file_t", rootDirectory); err != nil {
			glog.Warningf("Error running 'chcon' to set the kubelet volume root directory SELinux context: %s", err)
		}
	}
	return rootDirectory, nil
}

// EnsureLocalQuota checks if the node config specifies a local storage
// perFSGroup quota, and if so will test that the volumeDirectory is on a
// filesystem suitable for quota enforcement. If checks pass the k8s emptyDir
// volume plugin will be replaced with a wrapper version which adds quota
// functionality.
func (c *NodeConfig) EnsureLocalQuota(nodeConfig configapi.NodeConfig) {
	if nodeConfig.VolumeConfig.LocalQuota.PerFSGroup == nil {
		return
	}
	glog.V(4).Info("Replacing empty-dir volume plugin with quota wrapper")
	wrappedEmptyDirPlugin := false

	quotaApplicator, err := emptydir.NewQuotaApplicator(nodeConfig.VolumeDirectory)
	if err != nil {
		glog.Fatalf("Could not set up local quota, %s", err)
	}

	// Create a volume spec with emptyDir we can use to search for the
	// emptyDir plugin with CanSupport:
	emptyDirSpec := &volume.Spec{
		Volume: &kapi.Volume{
			VolumeSource: kapi.VolumeSource{
				EmptyDir: &kapi.EmptyDirVolumeSource{},
			},
		},
	}

	for idx, plugin := range c.KubeletConfig.VolumePlugins {
		// Can't really do type checking or use a constant here as they are not exported:
		if plugin.CanSupport(emptyDirSpec) {
			wrapper := emptydir.EmptyDirQuotaPlugin{
				VolumePlugin:    plugin,
				Quota:           *nodeConfig.VolumeConfig.LocalQuota.PerFSGroup,
				QuotaApplicator: quotaApplicator,
			}
			c.KubeletConfig.VolumePlugins[idx] = &wrapper
			wrappedEmptyDirPlugin = true
		}
	}
	// Because we can't look for the k8s emptyDir plugin by any means that would
	// survive a refactor, error out if we couldn't find it:
	if !wrappedEmptyDirPlugin {
		glog.Fatal(errors.New("No plugin handling EmptyDir was found, unable to apply local quotas"))
	}
}

// RunServiceStores starts reflectors for services (and, when the proxy is enabled, endpoints)
// from the master, and closes the ServicesReady channel once their initial sync completes.
func (c *NodeConfig) RunServiceStores(enableProxy, enableDNS bool) {
	if !enableProxy && !enableDNS {
		close(c.ServicesReady)
		return
	}

	serviceList := cache.NewListWatchFromClient(c.Client, "services", kapi.NamespaceAll, fields.Everything())
	serviceReflector := cache.NewReflector(serviceList, &kapi.Service{}, c.ServiceStore, c.ProxyConfig.ConfigSyncPeriod)
	serviceReflector.Run()

	if enableProxy {
		endpointList := cache.NewListWatchFromClient(c.Client, "endpoints", kapi.NamespaceAll, fields.Everything())
		endpointReflector := cache.NewReflector(endpointList, &kapi.Endpoints{}, c.EndpointsStore, c.ProxyConfig.ConfigSyncPeriod)
		endpointReflector.Run()

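		// Block until the endpoints reflector completes its initial list;
		// LastSyncResourceVersion is empty until then.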
		for len(endpointReflector.LastSyncResourceVersion()) == 0 {
			time.Sleep(100 * time.Millisecond)
		}
	}
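	// Likewise wait for the services reflector before signaling ServicesReady.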
	for len(serviceReflector.LastSyncResourceVersion()) == 0 {
		time.Sleep(100 * time.Millisecond)
	}
	close(c.ServicesReady)
}

// RunKubelet starts the Kubelet. If no cluster DNS address is configured, it attempts to
// resolve one from the master "kubernetes" service or its endpoints before starting.
func (c *NodeConfig) RunKubelet() {
	if c.KubeletConfig.ClusterDNS == nil {
		if service, err := c.Client.Services(kapi.NamespaceDefault).Get("kubernetes"); err == nil {
			if includesServicePort(service.Spec.Ports, 53, "dns") {
				// Use master service if service includes "dns" port 53.
				c.KubeletConfig.ClusterDNS = net.ParseIP(service.Spec.ClusterIP)
			}
		}
	}
	if c.KubeletConfig.ClusterDNS == nil {
		if endpoint, err := c.Client.Endpoints(kapi.NamespaceDefault).Get("kubernetes"); err == nil {
			if endpointIP, ok := firstEndpointIPWithNamedPort(endpoint, 53, "dns"); ok {
				// Use first endpoint if endpoint includes "dns" port 53.
				c.KubeletConfig.ClusterDNS = net.ParseIP(endpointIP)
			} else if endpointIP, ok := firstEndpointIP(endpoint, 53); ok {
				// Test and use first endpoint if endpoint includes any port 53.
				if err := cmdutil.WaitForSuccessfulDial(false, "tcp", fmt.Sprintf("%s:%d", endpointIP, 53), 50*time.Millisecond, 0, 2); err == nil {
					c.KubeletConfig.ClusterDNS = net.ParseIP(endpointIP)
				}
			}
		}
	}

	c.KubeletConfig.DockerClient = c.DockerClient
	// updated by NodeConfig.EnsureVolumeDir
	c.KubeletConfig.RootDirectory = c.VolumeDir

	// hook for overriding the cadvisor interface for integration tests
	c.KubeletConfig.CAdvisorInterface = defaultCadvisorInterface
	// hook for overriding the container manager interface for integration tests
	c.KubeletConfig.ContainerManager = defaultContainerManagerInterface

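	// Start the kubelet in the background; if it ever returns, treat that as fatal.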
	go func() {
		glog.Fatal(kubeletapp.Run(c.KubeletServer, c.KubeletConfig))
	}()
}

// defaultCadvisorInterface holds the overridden default interface.
// It exists only to allow stubbing for integration tests and should always be nil in production.
var defaultCadvisorInterface cadvisor.Interface = nil

// SetFakeCadvisorInterfaceForIntegrationTest sets a fake cadvisor implementation to allow the node to run in integration tests
func SetFakeCadvisorInterfaceForIntegrationTest() {
	defaultCadvisorInterface = &cadvisortesting.Fake{}
}

// defaultContainerManagerInterface holds the overridden default interface.
// It exists only to allow stubbing for integration tests and should always be nil in production.
var defaultContainerManagerInterface cm.ContainerManager = nil

// SetFakeContainerManagerInterfaceForIntegrationTest sets a fake container manager implementation to allow the node to run in integration tests
func SetFakeContainerManagerInterfaceForIntegrationTest() {
	defaultContainerManagerInterface = cm.NewStubContainerManager()
}

// RunPlugin starts the local SDN plugin, if enabled in configuration.
func (c *NodeConfig) RunPlugin() {
	if c.SDNPlugin == nil {
		return
	}
	if err := c.SDNPlugin.Start(); err != nil {
		glog.Fatalf("error: SDN node startup failed: %v", err)
	}
}

// RunDNS starts the DNS server as soon as services are loaded.
func (c *NodeConfig) RunDNS() {
	go func() {
		<-c.ServicesReady
		glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
		err := c.DNSServer.ListenAndServe()
		glog.Fatalf("DNS server failed to start: %v", err)
	}()
}

// RunProxy starts the kube-proxy in the configured mode (iptables or userspace) and wires up
// the service and endpoints handlers.
func (c *NodeConfig) RunProxy() {
	protocol := utiliptables.ProtocolIpv4
	bindAddr := net.ParseIP(c.ProxyConfig.BindAddress)
	if bindAddr.To4() == nil {
		protocol = utiliptables.ProtocolIpv6
	}

	portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange)

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(c.Client.Events(""))
	recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "kube-proxy", Host: c.KubeletConfig.NodeName})

	execer := kexec.New()
	dbus := utildbus.New()
	iptInterface := utiliptables.New(execer, dbus, protocol)

	var proxier proxy.ProxyProvider
	var endpointsHandler pconfig.EndpointsConfigHandler

	switch c.ProxyConfig.Mode {
	case componentconfig.ProxyModeIPTables:
		glog.V(0).Info("Using iptables Proxier.")
		if c.ProxyConfig.IPTablesMasqueradeBit == nil {
			// IPTablesMasqueradeBit must be specified or defaulted.
			glog.Fatalf("Unable to read IPTablesMasqueradeBit from config")
		}
		proxierIptables, err := iptables.NewProxier(iptInterface, execer, c.ProxyConfig.IPTablesSyncPeriod.Duration, c.ProxyConfig.MasqueradeAll, int(*c.ProxyConfig.IPTablesMasqueradeBit), c.ProxyConfig.ClusterCIDR)
		if err != nil {
			if c.Containerized {
				glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err)
			} else {
				glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root to use the service proxy: %v", err)
			}
		}
		proxier = proxierIptables
		endpointsHandler = proxierIptables
		// No turning back. Remove artifacts that might still exist from the userspace Proxier.
		glog.V(0).Info("Tearing down userspace rules.")
		userspace.CleanupLeftovers(iptInterface)
	case componentconfig.ProxyModeUserspace:
		glog.V(0).Info("Using userspace Proxier.")
		// loadBalancer is a proxy.LoadBalancer, which NewProxier needs; it also implements
		// config.EndpointsConfigHandler, so it doubles as our endpoints handler below.
		loadBalancer := userspace.NewLoadBalancerRR()
		// set EndpointsConfigHandler to our loadBalancer
		endpointsHandler = loadBalancer

		proxierUserspace, err := userspace.NewProxier(
			loadBalancer,
			bindAddr,
			iptInterface,
			*portRange,
			c.ProxyConfig.IPTablesSyncPeriod.Duration,
			c.ProxyConfig.UDPIdleTimeout.Duration,
		)
		if err != nil {
			if c.Containerized {
				glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err)
			} else {
				glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root to use the service proxy: %v", err)
			}
		}
		proxier = proxierUserspace
		// Remove artifacts from the pure-iptables Proxier.
		glog.V(0).Info("Tearing down pure-iptables proxy rules.")
		iptables.CleanupLeftovers(iptInterface)
	default:
		glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode)
	}
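	// Re-apply proxy rules whenever iptables is reloaded (for example by a firewall restart).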
	iptInterface.AddReloadFunc(proxier.Sync)

	// Create configs (i.e. Watches for Services and Endpoints)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	serviceConfig := pconfig.NewServiceConfig()
	serviceConfig.RegisterHandler(proxier)

	endpointsConfig := pconfig.NewEndpointsConfig()
	// customized handler registration that inserts a filter if needed
	if c.FilteringEndpointsHandler != nil {
		if err := c.FilteringEndpointsHandler.Start(endpointsHandler); err != nil {
			glog.Fatalf("error: node proxy plugin startup failed: %v", err)
		}
		endpointsHandler = c.FilteringEndpointsHandler
	}
	endpointsConfig.RegisterHandler(endpointsHandler)

	c.ServiceStore = pconfig.NewServiceStore(c.ServiceStore, serviceConfig.Channel("api"))
	c.EndpointsStore = pconfig.NewEndpointsStore(c.EndpointsStore, endpointsConfig.Channel("api"))
	// will be started by RunServiceStores

	recorder.Eventf(c.ProxyConfig.NodeRef, kapi.EventTypeNormal, "Starting", "Starting kube-proxy.")
	glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress)
}

// TODO: more generic location
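// includesServicePort returns true if ports contains an entry with the given port number and name.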
func includesServicePort(ports []kapi.ServicePort, port int, portName string) bool {
	for _, p := range ports {
		if p.Port == int32(port) && p.Name == portName {
			return true
		}
	}
	return false
}

// TODO: more generic location
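// includesEndpointPort returns true if ports contains an entry with the given port number.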
func includesEndpointPort(ports []kapi.EndpointPort, port int) bool {
	for _, p := range ports {
		if p.Port == int32(port) {
			return true
		}
	}
	return false
}

// TODO: more generic location
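// firstEndpointIP returns the IP of the first address in a subset that exposes the given port.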
func firstEndpointIP(endpoints *kapi.Endpoints, port int) (string, bool) {
	for _, s := range endpoints.Subsets {
		if !includesEndpointPort(s.Ports, port) {
			continue
		}
		for _, a := range s.Addresses {
			return a.IP, true
		}
	}
	return "", false
}

// TODO: more generic location
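// firstEndpointIPWithNamedPort returns the IP of the first address in a subset that exposes the
// given named port.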
func firstEndpointIPWithNamedPort(endpoints *kapi.Endpoints, port int, portName string) (string, bool) {
	for _, s := range endpoints.Subsets {
		if !includesNamedEndpointPort(s.Ports, port, portName) {
			continue
		}
		for _, a := range s.Addresses {
			return a.IP, true
		}
	}
	return "", false
}

// TODO: more generic location
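// includesNamedEndpointPort returns true if ports contains an entry with the given port number and name.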
func includesNamedEndpointPort(ports []kapi.EndpointPort, port int, portName string) bool {
	for _, p := range ports {
		if p.Port == int32(port) && p.Name == portName {
			return true
		}
	}
	return false
}