package idling

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	g "github.com/onsi/ginkgo"
	o "github.com/onsi/gomega"
	unidlingproxy "github.com/openshift/origin/pkg/proxy/unidler"
	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
	exutil "github.com/openshift/origin/test/extended/util"
	kapi "k8s.io/kubernetes/pkg/api"
)

// tryEchoUDPOnce sends expectedBuff to the UDP echo server at ip:udpPort and
// waits up to readTimeout for the reply.
//
// It returns the bytes read back when they match expectedBuff. A failed write
// deliberately yields (nil, nil) so that a polling caller sees a non-matching
// result instead of an error. Every other failure (dial, short write, deadline,
// read, short read, content mismatch) is returned as an error.
func tryEchoUDPOnce(ip net.IP, udpPort int, expectedBuff []byte, readTimeout time.Duration) ([]byte, error) {
	conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: ip, Port: udpPort})
	if err != nil {
		return nil, fmt.Errorf("unable to connect to service: %v", err)
	}
	defer conn.Close()

	n, err := conn.Write(expectedBuff)
	if err != nil {
		// It's technically possible to get some errors on write while switching over
		return nil, nil
	}
	if n != len(expectedBuff) {
		return nil, fmt.Errorf("unable to write entire %v bytes to UDP echo server socket", len(expectedBuff))
	}

	if err = conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
		return nil, fmt.Errorf("unable to set deadline on read from echo server: %v", err)
	}

	actualBuff := make([]byte, n)
	amtRead, _, err := conn.ReadFromUDP(actualBuff)
	if err != nil {
		return nil, fmt.Errorf("unable to read from UDP echo server: %v", err)
	}
	if amtRead != n {
		// we should never read back the *wrong* thing
		return nil, fmt.Errorf("read back incorrect number of bytes from echo server")
	}

	if string(expectedBuff) != string(actualBuff) {
		// err is always nil here, so it is intentionally not part of the message.
		return nil, fmt.Errorf("written contents %q didn't equal read contents %q from echo server", string(expectedBuff), string(actualBuff))
	}

	return actualBuff, nil
}

// tryEchoUDP repeatedly sends a fixed payload to the first UDP port of svc
// until it is echoed back, polling for up to two minutes.
//
// Failures are reported through gomega assertions; the returned error is
// always nil.
func tryEchoUDP(svc *kapi.Service) error {
	rawIP := svc.Spec.ClusterIP
	o.Expect(rawIP).NotTo(o.BeEmpty(), "The service should have a cluster IP set")
	ip := net.ParseIP(rawIP)
	o.Expect(ip).NotTo(o.BeNil(), "The service should have a valid cluster IP, but %q was not valid", rawIP)

	var udpPort int
	for _, port := range svc.Spec.Ports {
		if port.Protocol == "UDP" {
			udpPort = int(port.Port)
			break
		}
	}
	o.Expect(udpPort).NotTo(o.Equal(0), "The service should have a UDP port exposed")

	// For UDP, we just drop packets on the floor rather than queue them up
	readTimeout := 5 * time.Second

	expectedBuff := []byte("It's time to UDP!\n")
	o.Eventually(func() ([]byte, error) {
		return tryEchoUDPOnce(ip, udpPort, expectedBuff, readTimeout)
	}, 2*time.Minute, readTimeout).Should(o.Equal(expectedBuff))

	return nil
}

// tryEchoTCP opens a TCP connection to the first TCP port of svc, writes a
// fixed payload and checks that the same bytes are read back. The whole
// exchange is bounded by a two minute deadline on the connection.
//
// It returns a descriptive error on any failure and nil on success.
func tryEchoTCP(svc *kapi.Service) error {
	rawIP := svc.Spec.ClusterIP
	if rawIP == "" {
		return fmt.Errorf("no ClusterIP specified on service %s", svc.Name)
	}
	ip := net.ParseIP(rawIP)
	if ip == nil {
		// A nil IP must not reach DialTCP: it would not target the service.
		return fmt.Errorf("invalid ClusterIP %q specified on service %s", rawIP, svc.Name)
	}

	var tcpPort int
	for _, port := range svc.Spec.Ports {
		if port.Protocol == "TCP" {
			tcpPort = int(port.Port)
			break
		}
	}
	if tcpPort == 0 {
		return fmt.Errorf("Unable to find any TCP ports on service %s", svc.Name)
	}

	conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: ip, Port: tcpPort})
	if err != nil {
		return fmt.Errorf("unable to connect to service %s: %v", svc.Name, err)
	}
	// This helper is invoked many times (and concurrently) by the specs below,
	// so the socket must always be released.
	defer conn.Close()

	if err = conn.SetDeadline(time.Now().Add(2 * time.Minute)); err != nil {
		return fmt.Errorf("unable to set timeout on TCP connection to service %s: %v", svc.Name, err)
	}

	expectedBuff := []byte("It's time to TCP!\n")
	n, err := conn.Write(expectedBuff)
	if err != nil {
		return fmt.Errorf("unable to write data to echo server for service %s: %v", svc.Name, err)
	}
	if n != len(expectedBuff) {
		return fmt.Errorf("unable to write all data to echo server for service %s", svc.Name)
	}

	actualBuff := make([]byte, n)
	amtRead, err := conn.Read(actualBuff)
	if err != nil {
		return fmt.Errorf("unable to read data from echo server for service %s: %v", svc.Name, err)
	}
	if amtRead != n {
		// err is nil on this path, so report the byte counts instead.
		return fmt.Errorf("unable to read all data written from echo server for service %s: read %d of %d bytes", svc.Name, amtRead, n)
	}

	if string(expectedBuff) != string(actualBuff) {
		// err is nil on this path, so it is intentionally not part of the message.
		return fmt.Errorf("written contents %q didn't equal read contents %q from echo server for service %s", string(expectedBuff), string(actualBuff), svc.Name)
	}

	return nil
}

// createFixture runs `create -f path -o name` through oc and parses the
// output, which is expected to hold one "type/name" entry per line.
//
// It returns two parallel slices: the resource types and the resource names,
// in output order. Empty lines are skipped; any other line that does not split
// into exactly two "/"-separated parts yields an error.
func createFixture(oc *exutil.CLI, path string) ([]string, []string, error) {
	output, err := oc.Run("create").Args("-f", path, "-o", "name").Output()
	if err != nil {
		return nil, nil, err
	}

	lines := strings.Split(output, "\n")

	resources := make([]string, 0, len(lines)-1)
	names := make([]string, 0, len(lines)-1)

	for _, line := range lines {
		if line == "" {
			continue
		}
		parts := strings.Split(line, "/")
		if len(parts) != 2 {
			return nil, nil, fmt.Errorf("expected type/name syntax, got: %q", line)
		}

		resources = append(resources, parts[0])
		names = append(names, parts[1])
	}

	return resources, names, nil
}

// checkSingleIdle idles the services listed in idlingFile and then asserts
// that:
//   - the first object of type resourceName reports a replica count
//     containing "0" for 20 seconds,
//   - the endpoints of the first service carry the idled-at and unidle-target
//     annotations,
//   - the idled-at time parses as RFC 3339 and is within five minutes of now,
//   - the unidle targets consist of exactly one reference to that object with
//     the given kind and two replicas.
//
// resources maps a resource type to the names created for it.
func checkSingleIdle(oc *exutil.CLI, idlingFile string, resources map[string][]string, resourceName string, kind string) {
	g.By("Idling the service")
	_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
	o.Expect(err).ToNot(o.HaveOccurred())

	g.By("Ensuring the scale is zero (and stays zero)")
	objName := resources[resourceName][0]
	// make sure we don't get woken up by an incorrect router health check or anything like that
	o.Consistently(func() (string, error) {
		return oc.Run("get").Args(resourceName+"/"+objName, "--output=jsonpath=\"{.spec.replicas}\"").Output()
	}, 20*time.Second, 500*time.Millisecond).Should(o.ContainSubstring("0"))

	g.By("Fetching the service and checking the annotations are present")
	serviceName := resources["service"][0]
	endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
	o.Expect(err).NotTo(o.HaveOccurred())

	o.Expect(endpoints.Annotations).To(o.HaveKey(unidlingapi.IdledAtAnnotation))
	o.Expect(endpoints.Annotations).To(o.HaveKey(unidlingapi.UnidleTargetAnnotation))

	g.By("Checking the idled-at time")
	idledAtAnnotation := endpoints.Annotations[unidlingapi.IdledAtAnnotation]
	idledAtTime, err := time.Parse(time.RFC3339, idledAtAnnotation)
	o.Expect(err).ToNot(o.HaveOccurred())
	o.Expect(idledAtTime).To(o.BeTemporally("~", time.Now(), 5*time.Minute))

	g.By("Checking the idle targets")
	unidleTargetAnnotation := endpoints.Annotations[unidlingapi.UnidleTargetAnnotation]
	unidleTargets := []unidlingapi.RecordedScaleReference{}
	err = json.Unmarshal([]byte(unidleTargetAnnotation), &unidleTargets)
	o.Expect(err).ToNot(o.HaveOccurred())
	o.Expect(unidleTargets).To(o.Equal([]unidlingapi.RecordedScaleReference{
		{
			Replicas: 2,
			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
				Name: objName,
				Kind: kind,
			},
		},
	}))
}

// The suite below creates the echo-server fixture selected by each context's
// BeforeEach, writes the created service names to a temporary idling file,
// and then exercises idling and unidling over TCP and UDP.
var _ = g.Describe("idling and unidling", func() {
	defer g.GinkgoRecover()
	var (
		oc                  = exutil.NewCLI("cli-idling", exutil.KubeConfigPath()).Verbose()
		echoServerFixture   = exutil.FixturePath("testdata", "idling-echo-server.yaml")
		echoServerRcFixture = exutil.FixturePath("testdata", "idling-echo-server-rc.yaml")
		framework           = oc.KubeFramework()
	)

	// path to the fixture
	var fixture string

	// path to the idling file
	var idlingFile string

	// map of all resources created from the fixtures
	var resources map[string][]string

	// JustBeforeEach runs after the per-context BeforeEach blocks, so fixture
	// has already been chosen by the time the resources are created here.
	g.JustBeforeEach(func() {
		g.By("Creating the resources")
		rawResources, rawResourceNames, err := createFixture(oc, fixture)
		o.Expect(err).ToNot(o.HaveOccurred())

		resources = make(map[string][]string)
		for i, resource := range rawResources {
			resources[resource] = append(resources[resource], rawResourceNames[i])
		}

		g.By("Creating the idling file")
		serviceNames := resources["service"]

		targetFile, err := ioutil.TempFile(exutil.TestContext.OutputDir, "idling-services-")
		o.Expect(err).ToNot(o.HaveOccurred())
		defer targetFile.Close()
		idlingFile = targetFile.Name()
		_, err = targetFile.Write([]byte(strings.Join(serviceNames, "\n")))
		o.Expect(err).ToNot(o.HaveOccurred())

		g.By("Waiting for the endpoints to exist")
		serviceName := resources["service"][0]
		g.By("Waiting for endpoints to be up")
		err = waitForEndpointsAvailable(oc, serviceName)
		o.Expect(err).ToNot(o.HaveOccurred())
	})

	g.AfterEach(func() {
		g.By("Cleaning up the idling file")
		// Best-effort cleanup: the removal error is intentionally ignored.
		os.Remove(idlingFile)
	})

	g.Describe("idling", func() {
		g.Context("with a single service and DeploymentConfig [Conformance]", func() {
			g.BeforeEach(func() {
				framework.BeforeEach()
				fixture = echoServerFixture
			})

			g.It("should idle the service and DeploymentConfig properly", func() {
				checkSingleIdle(oc, idlingFile, resources, "deploymentconfig", "DeploymentConfig")
			})
		})

		g.Context("with a single service and ReplicationController", func() {
			g.BeforeEach(func() {
				framework.BeforeEach()
				fixture = echoServerRcFixture
			})

			g.It("should idle the service and ReplicationController properly", func() {
				checkSingleIdle(oc, idlingFile, resources, "replicationcontroller", "ReplicationController")
			})
		})
	})

	g.Describe("unidling", func() {
		g.BeforeEach(func() {
			framework.BeforeEach()
			fixture = echoServerFixture
		})

		g.It("should work with TCP (when fully idled) [Conformance] [local]", func() {
			g.By("Idling the service")
			_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting for the pods to have terminated")
			err = waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().Core().Services(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			err = tryEchoTCP(svc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting until we have endpoints")
			err = waitForEndpointsAvailable(oc, serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Making sure the endpoints are no longer marked as idled")
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
		})

		g.It("should work with TCP (while idling) [local]", func() {
			g.By("Idling the service")
			_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP and repeatedly connecting, making sure we seamlessly idle and come back up")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().Core().Services(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			o.Consistently(func() error { return tryEchoTCP(svc) }, 10*time.Second, 500*time.Millisecond).ShouldNot(o.HaveOccurred())

			g.By("Waiting until we have endpoints")
			err = waitForEndpointsAvailable(oc, serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Making sure the endpoints are no longer marked as idled")
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
		})

		g.It("should handle many TCP connections by dropping those under a certain bound [local]", func() {
			g.By("Idling the service")
			_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting for the pods to have terminated")
			serviceName := resources["service"][0]
			err = waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP many times and checking the echo")
			svc, err := oc.KubeClient().Core().Services(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			connectionsToStart := 100

			errs := make([]error, connectionsToStart)
			var connWG sync.WaitGroup
			// spawn many connections
			for i := 0; i < connectionsToStart; i++ {
				connWG.Add(1)
				go func(ind int) {
					defer connWG.Done()
					// Each goroutine writes only to its own slot; sharing the
					// outer err variable here would be a data race.
					errs[ind] = tryEchoTCP(svc)
				}(i)
			}

			connWG.Wait()

			g.By(fmt.Sprintf("Expecting all but %v of those connections to fail", unidlingproxy.MaxHeldConnections))
			errCount := 0
			for _, connErr := range errs {
				if connErr != nil {
					errCount++
				}
			}
			o.Expect(errCount).To(o.Equal(connectionsToStart - unidlingproxy.MaxHeldConnections))

			g.By("Waiting until we have endpoints")
			err = waitForEndpointsAvailable(oc, serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Making sure the endpoints are no longer marked as idled")
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
		})

		g.It("should work with UDP [local]", func() {
			g.By("Idling the service")
			_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting for the pods to have terminated")
			err = waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().Core().Services(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			err = tryEchoUDP(svc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting until we have endpoints")
			err = waitForEndpointsAvailable(oc, serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Making sure the endpoints are no longer marked as idled")
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
		})

		// TODO: Work out how to make this test work correctly when run on AWS
		g.XIt("should handle many UDP senders (by continuing to drop all packets on the floor) [local]", func() {
			g.By("Idling the service")
			_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Waiting for the pods to have terminated")
			err = waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP many times and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().Core().Services(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			connectionsToStart := 100

			errs := make([]error, connectionsToStart)
			var connWG sync.WaitGroup
			// spawn many connectors
			for i := 0; i < connectionsToStart; i++ {
				connWG.Add(1)
				go func(ind int) {
					// tryEchoUDP makes gomega assertions, so failures raised in
					// this goroutine must be recovered here.
					defer g.GinkgoRecover()
					defer connWG.Done()
					// Each goroutine writes only to its own slot; sharing the
					// outer err variable here would be a data race.
					errs[ind] = tryEchoUDP(svc)
				}(i)
			}

			connWG.Wait()

			// all of the echoers should eventually succeed
			errCount := 0
			for _, connErr := range errs {
				if connErr != nil {
					errCount++
				}
			}
			o.Expect(errCount).To(o.Equal(0))

			g.By("Waiting until we have endpoints")
			err = waitForEndpointsAvailable(oc, serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			endpoints, err := oc.KubeClient().Core().Endpoints(oc.Namespace()).Get(serviceName)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Making sure the endpoints are no longer marked as idled")
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
			o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
		})
	})
})