package ingressip

import (
	"errors"
	"fmt"
	"net"
	"reflect"
	"testing"
	"time"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/testing/core"
	"k8s.io/kubernetes/pkg/registry/service/ipallocator"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)

const namespace = "ns"

func newController(t *testing.T, client *fake.Clientset) *IngressIPController {
	_, ipNet, err := net.ParseCIDR("172.16.0.12/28")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if client == nil {
		client = fake.NewSimpleClientset()
	}
	return NewIngressIPController(client, ipNet, 10*time.Minute)
}

func controllerSetup(t *testing.T, startingObjects []runtime.Object) (*fake.Clientset, *watch.FakeWatcher, *IngressIPController) {
	client := fake.NewSimpleClientset(startingObjects...)

	fakeWatch := watch.NewFake()
	client.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))

	client.PrependReactor("create", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
		obj := action.(core.CreateAction).GetObject()
		fakeWatch.Add(obj)
		return true, obj, nil
	})

	// Ensure that updates the controller makes are passed through to the watcher.
	client.PrependReactor("update", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
		obj := action.(core.CreateAction).GetObject()
		fakeWatch.Modify(obj)
		return true, obj, nil
	})

	controller := newController(t, client)

	return client, fakeWatch, controller
}

func newService(name, ip string, typeLoadBalancer bool) *kapi.Service {
	serviceType := kapi.ServiceTypeClusterIP
	if typeLoadBalancer {
		serviceType = kapi.ServiceTypeLoadBalancer
	}
	service := &kapi.Service{
		ObjectMeta: kapi.ObjectMeta{
			Namespace: namespace,
			Name:      name,
		},
		Spec: kapi.ServiceSpec{
			Type: serviceType,
		},
	}
	if len(ip) > 0 {
		service.Status = kapi.ServiceStatus{
			LoadBalancer: kapi.LoadBalancerStatus{
				Ingress: []kapi.LoadBalancerIngress{
					{
						IP: ip,
					},
				},
			},
		}
	}
	return service
}

func TestProcessInitialSync(t *testing.T) {
	c := newController(t, nil)

	allocatedKey := "lb-allocated"
	allocatedIP := "172.16.0.1"
	services := []*kapi.Service{
		newService("regular", "", false),
		newService(allocatedKey, allocatedIP, true),
		newService("lb-reallocate", "foo", true),
		newService("lb-unallocated", "", true),
	}
	for _, service := range services {
		c.enqueueChange(service, nil)
		c.cache.Add(service)
	}
	// Queue a change without caching it to validate that it is ignored
	c.enqueueChange(newService("ignored", "", true), nil)

	// Enqueue post-sync changes to validate that they are added back
	// to the queue without being processed.
	postSyncUpdate := services[0]
	c.enqueueChange(postSyncUpdate, postSyncUpdate)
	c.cache.Update(postSyncUpdate)
	postSyncAddition := newService("lb-post-sync-addition", "", true)
	c.enqueueChange(postSyncAddition, nil)
	c.cache.Add(postSyncAddition)

	c.processInitialSync()

	// Validate allocation
	expectedMap := map[string]string{
		allocatedIP: fmt.Sprintf("%s/%s", namespace, allocatedKey),
	}
	if !reflect.DeepEqual(c.allocationMap, expectedMap) {
		t.Errorf("Expected allocation map %v, got %v", expectedMap, c.allocationMap)
	}
	if !c.ipAllocator.Has(net.ParseIP(allocatedIP)) {
		t.Errorf("IP %v was not marked as allocated", allocatedIP)
	}

	// Validate queue contents
	expectedQueueLength := 5 // 3 from initial sync, 2 from post-sync changes
	if c.queue.Len() != expectedQueueLength {
		t.Errorf("Expected queue length of %d, got %d", expectedQueueLength, c.queue.Len())
	}
}

func TestWorkRequeuesWhenFull(t *testing.T) {
	tests := []struct {
		testName        string
		requeuedChange  bool
		requeuedService bool
		requeued        bool
	}{
		{
			testName: "Previously requeued change should be requeued",
			requeued: true,
		},
		{
			testName:        "The only pending allocation should be requeued",
			requeuedChange:  true,
			requeuedService: true,
			requeued:        true,
		},
		{
			testName:        "Already requeued allocation should not be requeued",
			requeuedService: true,
			requeued:        false,
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		c.changeHandler = func(change *serviceChange) error {
			return ipallocator.ErrFull
		}
		// Use a queue with no delay to avoid timing issues
		c.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
		change := &serviceChange{
			key:                "foo",
			requeuedAllocation: test.requeuedChange,
		}
		if test.requeuedService {
			c.requeuedAllocations.Insert(change.key)
		}
		c.queue.Add(change)

		c.work()

		requeued := (c.queue.Len() == 1)
		if test.requeued != requeued {
			t.Errorf("Expected requeued == %v, got %v", test.requeued, requeued)
		}
	}
}

func TestProcessChange(t *testing.T) {
	tests := []struct {
		testName    string
		ip          string
		lb          bool
		deleted     bool
		allocatedIP string
		ipAllocated bool
	}{
		{
			testName: "Deleted service",
			deleted:  true,
		},
		{
			testName:    "Existing allocation",
			ip:          "172.16.0.1",
			lb:          true,
			allocatedIP: "172.16.0.1",
		},
		{
			testName:    "Needs allocation",
			lb:          true,
			ipAllocated: true,
		},
		{
			testName: "Needs deallocation",
			ip:       "172.16.0.1",
			lb:       false,
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		c.persistenceHandler = func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
			return nil
		}
		s := newService("svc", test.ip, test.lb)
		if !test.deleted {
			c.cache.Add(s)
		}
		key := fmt.Sprintf("%s/%s", namespace, s.Name)
		addAllocation := len(test.ip) > 0 && len(test.allocatedIP) == 0
		if addAllocation {
			c.allocationMap[test.ip] = key
		}
		change := &serviceChange{key: key}

		freeBefore := c.ipAllocator.Free()

		c.processChange(change)

		switch {
		case len(test.allocatedIP) > 0:
			if _, ok := c.allocationMap[test.allocatedIP]; !ok {
				t.Errorf("%s: %v was not allocated as expected", test.testName, test.allocatedIP)
			}
		case test.ipAllocated:
			if freeBefore == c.ipAllocator.Free() {
				t.Errorf("%s: ip was not allocated", test.testName)
			}
		case len(test.ip) > 0:
			if _, ok := c.allocationMap[test.ip]; ok {
				t.Errorf("%s: %v was not deallocated as expected", test.testName, test.ip)
			}
		}
	}
}

func TestClearOldAllocation(t *testing.T) {
	tests := []struct {
		testName string
		oldIP    string
		newIP    string
		cleared  bool
	}{
		{
			testName: "No old allocation",
			oldIP:    "",
			newIP:    "foo",
		},
		{
			testName: "Unchanged allocation",
			oldIP:    "172.16.0.1",
			newIP:    "172.16.0.1",
		},
		{
			testName: "Old allocation should be cleared",
			oldIP:    "172.16.0.1",
			newIP:    "172.16.0.2",
			cleared:  true,
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		new := newService("new", test.newIP, true)
		old := newService("old", test.oldIP, true)
		if cleared := c.clearOldAllocation(new, old); test.cleared != cleared {
			t.Errorf("%s: expected cleared %v, got %v", test.testName, test.cleared, cleared)
		}
	}
}

func TestRecordAllocationReallocates(t *testing.T) {
	c := newController(t, nil)
	var persisted *kapi.Service
	// Keep track of the last-persisted service
	c.persistenceHandler = func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
		persisted = service
		return nil
	}
	s := newService("bad-ip", "foo", true)
	key := fmt.Sprintf("%s/%s", namespace, s.Name)
	err := c.recordAllocation(s, key)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if persisted == nil {
		t.Errorf("Service was not persisted")
	}
	if len(c.allocationMap) != 1 {
		t.Errorf("Service ip was not reallocated")
	}
	if ingress := persisted.Status.LoadBalancer.Ingress; len(ingress) == 0 {
		t.Errorf("Ingress ip was not persisted")
	}
}

func TestAllocateReleasesOnPersistenceFailure(t *testing.T) {
	c := newController(t, nil)
	expectedFree := c.ipAllocator.Free()
	expectedErr := errors.New("Persistence failure")
	c.persistenceHandler = func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
		return expectedErr
	}
	s := newService("svc", "", true)
	key := fmt.Sprintf("%s/%s", namespace, s.Name)
	err := c.allocate(s, key)
	if !reflect.DeepEqual(expectedErr, err) {
		t.Fatalf("Expected err %v, got %v", expectedErr, err)
	}
	if expectedFree != c.ipAllocator.Free() {
		t.Fatalf("IP wasn't released on error")
	}
}

func TestClearLocalAllocation(t *testing.T) {
	tests := []struct {
		testName     string
		key          string
		ip           string
		allocatedKey string
		cleared      bool
	}{
		{
			testName: "Invalid ip",
			ip:       "foo",
		},
		{
			testName: "IP not allocated",
			ip:       "172.16.0.1",
		},
		{
			testName:     "IP not allocated to service",
			key:          "foo",
			ip:           "172.16.0.1",
			allocatedKey: "bar",
		},
		{
			testName:     "Local ip allocation cleared",
			key:          "foo",
			ip:           "172.16.0.1",
			allocatedKey: "foo",
			cleared:      true,
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		if len(test.allocatedKey) > 0 {
			c.allocationMap[test.ip] = test.allocatedKey
			c.ipAllocator.Allocate(net.ParseIP(test.ip))
		}
		if cleared := c.clearLocalAllocation(test.key, test.ip); test.cleared != cleared {
			t.Errorf("%s: expected cleared %v, got %v", test.testName, test.cleared, cleared)
		} else if cleared {
			if _, ok := c.allocationMap[test.ip]; ok {
				t.Errorf("%s: allocation map was not cleared", test.testName)
			}
			if c.ipAllocator.Has(net.ParseIP(test.ip)) {
				t.Errorf("%s: ip %v is still allocated", test.testName, test.ip)
			}
		}
	}
}

func TestEnsureExternalIPRespectsNonIngress(t *testing.T) {
	c := newController(t, nil)
	c.persistenceHandler = func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
		return nil
	}
	ingressIP := "172.16.0.1"
	s := newService("foo", ingressIP, true)
	externalIP := "172.16.1.1"
	s.Spec.ExternalIPs = append(s.Spec.ExternalIPs, externalIP)
	c.ensureExternalIP(s, s.Name, ingressIP)
	expectedExternalIPs := []string{externalIP, ingressIP}
	externalIPs := s.Spec.ExternalIPs
	if !reflect.DeepEqual(expectedExternalIPs, externalIPs) {
		t.Errorf("Expected ExternalIPs %v, got %v", expectedExternalIPs, externalIPs)
	}
}

func TestAllocateIP(t *testing.T) {
	tests := []struct {
		testName    string
		requestedIP string
		allocated   bool
		asRequested bool
	}{
		{
			testName:    "No requested ip",
			requestedIP: "",
			asRequested: false,
		},
		{
			testName:    "Invalid ip",
			requestedIP: "foo",
			asRequested: false,
		},
		{
			testName:    "IP not available",
			requestedIP: "172.16.0.1",
			allocated:   true,
			asRequested: false,
		},
		{
			testName:    "Available",
			requestedIP: "172.16.0.1",
			asRequested: true,
		},
	}
	for _, test := range tests {
		controller := newController(t, nil)
		if test.allocated {
			ip := net.ParseIP(test.requestedIP)
			controller.ipAllocator.Allocate(ip)
		}
		// Expect no error for these
		ip, err := controller.allocateIP(test.requestedIP)
		if err != nil {
			t.Errorf("Unexpected error: %v", err)
		}
		if test.asRequested && ip.String() != test.requestedIP {
			t.Errorf("%s: expected %s but got %s", test.testName, test.requestedIP, ip.String())
		}
		if !test.asRequested && ip.String() == test.requestedIP {
			t.Errorf("%s: did not expect %s", test.testName, test.requestedIP)
		}
	}
}

func TestRecordLocalAllocation(t *testing.T) {
	key := "svc1"
	ip := "172.16.0.1"
	otherKey := "svc2"
	tests := []struct {
		testName      string
		allocationMap map[string]string
		ip            string
		reallocate    bool
		errExpected   bool
	}{
		{
			testName:    "Invalid ip",
			ip:          "foo",
			reallocate:  true,
			errExpected: true,
		},
		{
			testName: "Allocation exists for service",
			allocationMap: map[string]string{
				ip: key,
			},
			ip: ip,
		},
		{
			testName: "Allocation exists for another service",
			allocationMap: map[string]string{
				ip: otherKey,
			},
			ip:          ip,
			reallocate:  true,
			errExpected: true,
		},
		{
			testName:    "IP not in range",
			ip:          "172.16.1.1",
			reallocate:  true,
			errExpected: true,
		},
		{
			testName: "Allocation successful",
			ip:       "172.16.0.1",
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		if test.allocationMap != nil {
			c.allocationMap = test.allocationMap
			for ipString := range test.allocationMap {
				c.ipAllocator.Allocate(net.ParseIP(ipString))
			}
		}

		reallocate, err := c.recordLocalAllocation(key, test.ip)

		if test.reallocate != reallocate {
			t.Errorf("%s: expected reallocate == %v but got %v", test.testName, test.reallocate, reallocate)
		}
		switch {
		case test.errExpected && (err == nil):
			t.Errorf("%s: expected error but didn't see one", test.testName)
		case !test.errExpected && (err != nil):
			t.Errorf("%s: saw unexpected error: %v", test.testName, err)
		}

		// Ensure allocation was successfully recorded
		checkAllocation := !test.reallocate && !test.errExpected
		if checkAllocation {
			ipKey, ok := c.allocationMap[test.ip]
			inMap := ok && ipKey == key
			inAllocator := c.ipAllocator.Has(net.ParseIP(test.ip))
			if !(inMap && inAllocator) {
				t.Errorf("%s: allocation not recorded", test.testName)
			}
		}
	}
}

func TestClearPersistedAllocation(t *testing.T) {
	tests := []struct {
		testName         string
		persistenceError error
		ingressIPCount   int
	}{
		{
			testName:         "Status not cleared if external ip not removed",
			persistenceError: errors.New(""),
			ingressIPCount:   1,
		},
		{
			testName: "Status cleared",
		},
	}
	for _, test := range tests {
		c := newController(t, nil)
		var persistedService *kapi.Service
		c.persistenceHandler = func(client kcoreclient.ServicesGetter, service *kapi.Service, targetStatus bool) error {
			// Save the last persisted service
			persistedService = service
			return test.persistenceError
		}
		ip := "172.16.0.1"
		s := newService("svc", ip, true)
		// Add other external ips to ensure they are not affected by controller
		s.Spec.ExternalIPs = []string{"172.16.1.1", ip, "172.16.1.2"}
		key := fmt.Sprintf("%s/%s", namespace, s.Name)
		c.clearPersistedAllocation(s, key, "")

		expectedExternalIPs := []string{"172.16.1.1", "172.16.1.2"}
		externalIPs := persistedService.Spec.ExternalIPs
		if !reflect.DeepEqual(expectedExternalIPs, externalIPs) {
			t.Errorf("%s: Expected ExternalIPs %v, got %v", test.testName, expectedExternalIPs, externalIPs)
		}
		ingressIPCount := len(persistedService.Status.LoadBalancer.Ingress)
		if test.ingressIPCount != ingressIPCount {
			t.Errorf("%s: Expected %d ingress ips, got %d", test.testName, test.ingressIPCount, ingressIPCount)
		}
	}
}

// TestBasicControllerFlow validates controller start, initial sync
// processing, and post-sync processing.
func TestBasicControllerFlow(t *testing.T) {
	startingObjects := []runtime.Object{
		newService("lb-unallocated", "", true),
	}

	stopChannel := make(chan struct{})
	defer close(stopChannel)

	_, fakeWatch, controller := controllerSetup(t, startingObjects)

	updated := make(chan bool)
	deleted := make(chan bool)

	controller.changeHandler = func(change *serviceChange) error {
		defer func() {
			if len(change.key) == 0 {
				deleted <- true
			} else if change.oldService != nil {
				updated <- true
			}
		}()

		err := controller.processChange(change)
		if err != nil {
			t.Errorf("unexpected error: %v", err)
		}

		return err
	}

	go controller.Run(stopChannel)

	waitForUpdate := func(msg string) {
		t.Logf("waiting for: %v", msg)
		select {
		case <-updated:
		case <-time.After(time.Duration(30 * time.Second)):
			t.Fatalf("failed to see: %v", msg)
		}
	}

	waitForUpdate("spec update")
	waitForUpdate("status update")

	fakeWatch.Delete(startingObjects[0])

	t.Log("waiting for the service to be deleted")
	select {
	case <-deleted:
	case <-time.After(time.Duration(30 * time.Second)):
		t.Fatalf("failed to see expected service deletion")
	}
}