Browse code

Support network ingress on arbitrary ports

This change adds a new controller that allocates ips to services of
type load balancer from a range configured by IngressIPNetworkCIDR.
This is intended to support ip-based traffic ingress on bare metal.

Maru Newby authored on 2016/07/18 23:15:39
Showing 15 changed files
... ...
@@ -450,6 +450,11 @@ type MasterNetworkConfig struct {
450 450
 	// CIDR will be rejected. Rejections will be applied first, then the IP checked against one of the allowed CIDRs. You
451 451
 	// should ensure this range does not overlap with your nodes, pods, or service CIDRs for security reasons.
452 452
 	ExternalIPNetworkCIDRs []string
453
+	// IngressIPNetworkCIDR controls the range to assign ingress ips from for services of type LoadBalancer on bare
454
+	// metal. If empty, ingress ips will not be assigned. It may contain a single CIDR that will be allocated from.
455
+	// For security reasons, you should ensure that this range does not overlap with the CIDRs reserved for external ips,
456
+	// nodes, pods, or services.
457
+	IngressIPNetworkCIDR string
453 458
 }
454 459
 
455 460
 type ImageConfig struct {
... ...
@@ -482,6 +482,7 @@ var map_MasterNetworkConfig = map[string]string{
482 482
 	"hostSubnetLength":       "HostSubnetLength is the number of bits to allocate to each host's subnet e.g. 8 would mean a /24 network on the host",
483 483
 	"serviceNetworkCIDR":     "ServiceNetwork is the CIDR string to specify the service networks",
484 484
 	"externalIPNetworkCIDRs": "ExternalIPNetworkCIDRs controls what values are acceptable for the service external IP field. If empty, no externalIP may be set. It may contain a list of CIDRs which are checked for access. If a CIDR is prefixed with !, IPs in that CIDR will be rejected. Rejections will be applied first, then the IP checked against one of the allowed CIDRs. You should ensure this range does not overlap with your nodes, pods, or service CIDRs for security reasons.",
485
+	"ingressIPNetworkCIDR":   "IngressIPNetworkCIDR controls the range to assign ingress ips from for services of type LoadBalancer on bare metal. If empty, ingress ips will not be assigned. It may contain a single CIDR that will be allocated from. For security reasons, you should ensure that this range does not overlap with the CIDRs reserved for external ips, nodes, pods, or services.",
485 486
 }
486 487
 
487 488
 func (MasterNetworkConfig) SwaggerDoc() map[string]string {
... ...
@@ -401,6 +401,11 @@ type MasterNetworkConfig struct {
401 401
 	// CIDR will be rejected. Rejections will be applied first, then the IP checked against one of the allowed CIDRs. You
402 402
 	// should ensure this range does not overlap with your nodes, pods, or service CIDRs for security reasons.
403 403
 	ExternalIPNetworkCIDRs []string `json:"externalIPNetworkCIDRs"`
404
+	// IngressIPNetworkCIDR controls the range to assign ingress ips from for services of type LoadBalancer on bare
405
+	// metal. If empty, ingress ips will not be assigned. It may contain a single CIDR that will be allocated from.
406
+	// For security reasons, you should ensure that this range does not overlap with the CIDRs reserved for external ips,
407
+	// nodes, pods, or services.
408
+	IngressIPNetworkCIDR string `json:"ingressIPNetworkCIDR"`
404 409
 }
405 410
 
406 411
 // ImageConfig holds the necessary configuration options for building image names for system components
... ...
@@ -199,6 +199,7 @@ networkConfig:
199 199
   clusterNetworkCIDR: ""
200 200
   externalIPNetworkCIDRs: null
201 201
   hostSubnetLength: 0
202
+  ingressIPNetworkCIDR: ""
202 203
   networkPluginName: ""
203 204
   serviceNetworkCIDR: ""
204 205
 oauthConfig:
... ...
@@ -157,6 +157,12 @@ func ValidateMasterConfig(config *api.MasterConfig, fldPath *field.Path) Validat
157 157
 			}
158 158
 		}
159 159
 	}
160
+	if len(config.NetworkConfig.IngressIPNetworkCIDR) > 0 {
161
+		cidr := config.NetworkConfig.IngressIPNetworkCIDR
162
+		if _, ipNet, err := net.ParseCIDR(cidr); err != nil || ipNet.IP.IsUnspecified() {
163
+			validationResults.AddErrors(field.Invalid(fldPath.Child("networkConfig", "ingressIPNetworkCIDR").Index(0), cidr, "must be a valid CIDR notation IP range (e.g. 172.30.0.0/16)"))
164
+		}
165
+	}
160 166
 
161 167
 	validationResults.AddErrors(ValidateKubeConfig(config.MasterClients.OpenShiftLoopbackKubeConfig, fldPath.Child("masterClients", "openShiftLoopbackKubeConfig"))...)
162 168
 
... ...
@@ -60,6 +60,9 @@ const (
60 60
 
61 61
 	InfraEndpointControllerServiceAccountName = "endpoint-controller"
62 62
 	EndpointControllerRoleName                = "system:endpoint-controller"
63
+
64
+	InfraServiceIngressIPControllerServiceAccountName = "service-ingress-ip-controller"
65
+	ServiceIngressIPControllerRoleName                = "system:service-ingress-ip-controller"
63 66
 )
64 67
 
65 68
 type InfraServiceAccounts struct {
... ...
@@ -766,4 +769,41 @@ func init() {
766 766
 		panic(err)
767 767
 	}
768 768
 
769
+	err = InfraSAs.addServiceAccount(
770
+		InfraServiceIngressIPControllerServiceAccountName,
771
+		authorizationapi.ClusterRole{
772
+			ObjectMeta: kapi.ObjectMeta{
773
+				Name: ServiceIngressIPControllerRoleName,
774
+			},
775
+			Rules: []authorizationapi.PolicyRule{
776
+				// Listing and watching services
777
+				{
778
+					APIGroups: []string{kapi.GroupName},
779
+					Verbs:     sets.NewString("list", "watch"),
780
+					Resources: sets.NewString("services"),
781
+				},
782
+				// IngressIPController.persistSpec changes the spec of the service
783
+				{
784
+					APIGroups: []string{kapi.GroupName},
785
+					Verbs:     sets.NewString("update"),
786
+					Resources: sets.NewString("services"),
787
+				},
788
+				// IngressIPController.persistStatus changes the status of the service
789
+				{
790
+					APIGroups: []string{kapi.GroupName},
791
+					Verbs:     sets.NewString("update"),
792
+					Resources: sets.NewString("services/status"),
793
+				},
794
+				// IngressIPController.recorder
795
+				{
796
+					Verbs:     sets.NewString("create", "update", "patch"),
797
+					Resources: sets.NewString("events"),
798
+				},
799
+			},
800
+		},
801
+	)
802
+	if err != nil {
803
+		panic(err)
804
+	}
805
+
769 806
 }
... ...
@@ -464,7 +464,9 @@ func newAdmissionChain(pluginNames []string, admissionConfigFilename string, plu
464 464
 				// should have been caught with validation
465 465
 				return nil, err
466 466
 			}
467
-			plugins = append(plugins, serviceadmit.NewExternalIPRanger(reject, admit))
467
+			// TODO need to disallow if a cloud provider is configured
468
+			allowIngressIP := len(options.NetworkConfig.IngressIPNetworkCIDR) > 0
469
+			plugins = append(plugins, serviceadmit.NewExternalIPRanger(reject, admit, allowIngressIP))
468 470
 
469 471
 		case serviceadmit.RestrictedEndpointsPluginName:
470 472
 			// we need to set some customer parameters, so create by hand
... ...
@@ -44,6 +44,7 @@ import (
44 44
 	"github.com/openshift/origin/pkg/security/mcs"
45 45
 	"github.com/openshift/origin/pkg/security/uid"
46 46
 	"github.com/openshift/origin/pkg/security/uidallocator"
47
+	"github.com/openshift/origin/pkg/service/controller/ingressip"
47 48
 	servingcertcontroller "github.com/openshift/origin/pkg/service/controller/servingcert"
48 49
 
49 50
 	configapi "github.com/openshift/origin/pkg/cmd/server/api"
... ...
@@ -61,6 +62,8 @@ const (
61 61
 
62 62
 	// from CMServer MinResyncPeriod
63 63
 	defaultReplenishmentSyncPeriod time.Duration = 12 * time.Hour
64
+
65
+	defaultIngressIPSyncPeriod time.Duration = 10 * time.Minute
64 66
 )
65 67
 
66 68
 // RunProjectAuthorizationCache starts the project authorization cache
... ...
@@ -516,3 +519,19 @@ func (c *MasterConfig) RunClusterQuotaReconciliationController() {
516 516
 	c.ClusterQuotaMappingController.GetClusterQuotaMapper().AddListener(controller)
517 517
 	go controller.Run(5, utilwait.NeverStop)
518 518
 }
519
+
520
+// RunIngressIPController starts the ingress ip controller if IngressIPNetworkCIDR is configured.
521
+func (c *MasterConfig) RunIngressIPController(client *kclient.Client) {
522
+	// TODO need to disallow if a cloud provider is configured
523
+	if len(c.Options.NetworkConfig.IngressIPNetworkCIDR) == 0 {
524
+		return
525
+	}
526
+
527
+	_, ipNet, err := net.ParseCIDR(c.Options.NetworkConfig.IngressIPNetworkCIDR)
528
+	if err != nil {
529
+		// should have been caught with validation
530
+		glog.Fatalf("Unable to start ingress ip controller: %v", err)
531
+	}
532
+	ingressIPController := ingressip.NewIngressIPController(client, ipNet, defaultIngressIPSyncPeriod)
533
+	go ingressIPController.Run(utilwait.NeverStop)
534
+}
... ...
@@ -669,6 +669,12 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
669 669
 	}
670 670
 	oc.RunServiceServingCertController(serviceServingCertClient)
671 671
 
672
+	_, _, ingressIPClient, err := oc.GetServiceAccountClients(bootstrappolicy.InfraServiceIngressIPControllerServiceAccountName)
673
+	if err != nil {
674
+		glog.Fatalf("Could not get client: %v", err)
675
+	}
676
+	oc.RunIngressIPController(ingressIPClient)
677
+
672 678
 	glog.Infof("Started Origin Controllers")
673 679
 
674 680
 	return nil
... ...
@@ -16,14 +16,15 @@ const ExternalIPPluginName = "ExternalIPRanger"
16 16
 
17 17
 func init() {
18 18
 	kadmission.RegisterPlugin("ExternalIPRanger", func(client clientset.Interface, config io.Reader) (kadmission.Interface, error) {
19
-		return NewExternalIPRanger(nil, nil), nil
19
+		return NewExternalIPRanger(nil, nil, false), nil
20 20
 	})
21 21
 }
22 22
 
23 23
 type externalIPRanger struct {
24 24
 	*kadmission.Handler
25
-	reject []*net.IPNet
26
-	admit  []*net.IPNet
25
+	reject         []*net.IPNet
26
+	admit          []*net.IPNet
27
+	allowIngressIP bool
27 28
 }
28 29
 
29 30
 var _ kadmission.Interface = &externalIPRanger{}
... ...
@@ -51,11 +52,12 @@ func ParseRejectAdmitCIDRRules(rules []string) (reject, admit []*net.IPNet, err
51 51
 }
52 52
 
53 53
 // NewConstraint creates a new SCC constraint admission plugin.
54
-func NewExternalIPRanger(reject, admit []*net.IPNet) *externalIPRanger {
54
+func NewExternalIPRanger(reject, admit []*net.IPNet, allowIngressIP bool) *externalIPRanger {
55 55
 	return &externalIPRanger{
56
-		Handler: kadmission.NewHandler(kadmission.Create, kadmission.Update),
57
-		reject:  reject,
58
-		admit:   admit,
56
+		Handler:        kadmission.NewHandler(kadmission.Create, kadmission.Update),
57
+		reject:         reject,
58
+		admit:          admit,
59
+		allowIngressIP: allowIngressIP,
59 60
 	}
60 61
 }
61 62
 
... ...
@@ -84,11 +86,30 @@ func (r *externalIPRanger) Admit(a kadmission.Attributes) error {
84 84
 		return nil
85 85
 	}
86 86
 
87
+	// Determine if an ingress ip address should be allowed as an
88
+	// external ip by checking the loadbalancer status of the previous
89
+	// object state. Only updates need to be validated against the
90
+	// ingress ip since the loadbalancer status cannot be set on
91
+	// create.
92
+	ingressIP := ""
93
+	retrieveIngressIP := a.GetOperation() == kadmission.Update &&
94
+		r.allowIngressIP && svc.Spec.Type == kapi.ServiceTypeLoadBalancer
95
+	if retrieveIngressIP {
96
+		old, ok := a.GetOldObject().(*kapi.Service)
97
+		ipPresent := ok && old != nil && len(old.Status.LoadBalancer.Ingress) > 0
98
+		if ipPresent {
99
+			ingressIP = old.Status.LoadBalancer.Ingress[0].IP
100
+		}
101
+	}
102
+
87 103
 	var errs field.ErrorList
88 104
 	switch {
89 105
 	// administrator disabled externalIPs
90 106
 	case len(svc.Spec.ExternalIPs) > 0 && len(r.admit) == 0:
91
-		errs = append(errs, field.Forbidden(field.NewPath("spec", "externalIPs"), "externalIPs have been disabled"))
107
+		onlyIngressIP := len(svc.Spec.ExternalIPs) == 1 && svc.Spec.ExternalIPs[0] == ingressIP
108
+		if !onlyIngressIP {
109
+			errs = append(errs, field.Forbidden(field.NewPath("spec", "externalIPs"), "externalIPs have been disabled"))
110
+		}
92 111
 	// administrator has limited the range
93 112
 	case len(svc.Spec.ExternalIPs) > 0 && len(r.admit) > 0:
94 113
 		for i, s := range svc.Spec.ExternalIPs {
... ...
@@ -97,7 +118,8 @@ func (r *externalIPRanger) Admit(a kadmission.Attributes) error {
97 97
 				errs = append(errs, field.Forbidden(field.NewPath("spec", "externalIPs").Index(i), "externalIPs must be a valid address"))
98 98
 				continue
99 99
 			}
100
-			if NetworkSlice(r.reject).Contains(ip) || !NetworkSlice(r.admit).Contains(ip) {
100
+			notIngressIP := s != ingressIP
101
+			if (NetworkSlice(r.reject).Contains(ip) || !NetworkSlice(r.admit).Contains(ip)) && notIngressIP {
101 102
 				errs = append(errs, field.Forbidden(field.NewPath("spec", "externalIPs").Index(i), "externalIP is not allowed"))
102 103
 				continue
103 104
 			}
... ...
@@ -14,6 +14,7 @@ func TestAdmission(t *testing.T) {
14 14
 	svc := &kapi.Service{
15 15
 		ObjectMeta: kapi.ObjectMeta{Name: "test"},
16 16
 	}
17
+	var oldSvc *kapi.Service
17 18
 
18 19
 	_, ipv4, err := net.ParseCIDR("172.0.0.0/16")
19 20
 	if err != nil {
... ...
@@ -43,6 +44,8 @@ func TestAdmission(t *testing.T) {
43 43
 		externalIPs     []string
44 44
 		admit           bool
45 45
 		errFn           func(err error) bool
46
+		loadBalancer    bool
47
+		ingressIP       string
46 48
 	}{
47 49
 		{
48 50
 			admit:    true,
... ...
@@ -128,7 +131,6 @@ func TestAdmission(t *testing.T) {
128 128
 			op:          admission.Update,
129 129
 			testName:    "IP in range on update",
130 130
 		},
131
-
132 131
 		// other checks
133 132
 		{
134 133
 			admit:       false,
... ...
@@ -156,12 +158,65 @@ func TestAdmission(t *testing.T) {
156 156
 			op:          admission.Create,
157 157
 			testName:    "rejections can cover the entire range",
158 158
 		},
159
+		// Ingress IP checks
160
+		{
161
+			admit:        true,
162
+			externalIPs:  []string{"1.2.3.4"},
163
+			op:           admission.Update,
164
+			testName:     "Ingress ip allowed when external ips are disabled",
165
+			loadBalancer: true,
166
+			ingressIP:    "1.2.3.4",
167
+		},
168
+		{
169
+			admit:        true,
170
+			admits:       []*net.IPNet{ipv4},
171
+			externalIPs:  []string{"1.2.3.4", "172.0.0.1"},
172
+			op:           admission.Update,
173
+			testName:     "Ingress ip allowed when external ips are enabled",
174
+			loadBalancer: true,
175
+			ingressIP:    "1.2.3.4",
176
+		},
177
+		{
178
+			admit:        false,
179
+			admits:       []*net.IPNet{ipv4},
180
+			externalIPs:  []string{"1.2.3.4", "172.0.0.1"},
181
+			op:           admission.Update,
182
+			testName:     "Ingress ip not allowed for non-lb service",
183
+			loadBalancer: false,
184
+			ingressIP:    "1.2.3.4",
185
+		},
159 186
 	}
160 187
 	for _, test := range tests {
161 188
 		svc.Spec.ExternalIPs = test.externalIPs
162
-		handler := NewExternalIPRanger(test.rejects, test.admits)
189
+		allowIngressIP := len(test.ingressIP) > 0 || test.loadBalancer
190
+		handler := NewExternalIPRanger(test.rejects, test.admits, allowIngressIP)
191
+
192
+		if test.loadBalancer {
193
+			svc.Spec.Type = kapi.ServiceTypeLoadBalancer
194
+		} else {
195
+			svc.Spec.Type = kapi.ServiceTypeClusterIP
196
+		}
197
+
198
+		if len(test.ingressIP) > 0 {
199
+			// Provide an ingress ip via the previous object state
200
+			oldSvc = &kapi.Service{
201
+				ObjectMeta: kapi.ObjectMeta{Name: "test"},
202
+				Status: kapi.ServiceStatus{
203
+					LoadBalancer: kapi.LoadBalancerStatus{
204
+						Ingress: []kapi.LoadBalancerIngress{
205
+							{
206
+								IP: test.ingressIP,
207
+							},
208
+						},
209
+					},
210
+				},
211
+			}
212
+
213
+		} else {
214
+			oldSvc = nil
215
+		}
163 216
 
164
-		err := handler.Admit(admission.NewAttributesRecord(svc, nil, kapi.Kind("Service").WithVersion("version"), "namespace", svc.ObjectMeta.Name, kapi.Resource("services").WithVersion("version"), "", test.op, nil))
217
+		err := handler.Admit(admission.NewAttributesRecord(svc, oldSvc, kapi.Kind("Service").WithVersion("version"), "namespace", svc.ObjectMeta.Name, kapi.Resource("services").WithVersion("version"), "", test.op, nil))
165 218
 		if test.admit && err != nil {
166 219
 			t.Errorf("%s: expected no error but got: %s", test.testName, err)
167 220
 		} else if !test.admit && err == nil {
... ...
@@ -180,7 +235,7 @@ func TestHandles(t *testing.T) {
180 180
 		admission.Connect: false,
181 181
 		admission.Delete:  false,
182 182
 	} {
183
-		ranger := NewExternalIPRanger(nil, nil)
183
+		ranger := NewExternalIPRanger(nil, nil, false)
184 184
 		if e, a := shouldHandle, ranger.Handles(op); e != a {
185 185
 			t.Errorf("%v: shouldHandle=%t, handles=%t", op, e, a)
186 186
 		}
187 187
new file mode 100644
... ...
@@ -0,0 +1,698 @@
0
+package ingressip
1
+
2
+import (
3
+	"fmt"
4
+	"net"
5
+	"sort"
6
+	"sync"
7
+	"time"
8
+
9
+	"github.com/golang/glog"
10
+	kapi "k8s.io/kubernetes/pkg/api"
11
+	kerrors "k8s.io/kubernetes/pkg/api/errors"
12
+	"k8s.io/kubernetes/pkg/client/cache"
13
+	"k8s.io/kubernetes/pkg/client/record"
14
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
15
+	"k8s.io/kubernetes/pkg/controller"
16
+	"k8s.io/kubernetes/pkg/controller/framework"
17
+	"k8s.io/kubernetes/pkg/registry/service/allocator"
18
+	"k8s.io/kubernetes/pkg/registry/service/ipallocator"
19
+	"k8s.io/kubernetes/pkg/runtime"
20
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
21
+	"k8s.io/kubernetes/pkg/util/sets"
22
+	"k8s.io/kubernetes/pkg/util/wait"
23
+	"k8s.io/kubernetes/pkg/util/workqueue"
24
+	"k8s.io/kubernetes/pkg/watch"
25
+)
26
+
27
+const (
28
+	// It's necessary to allocate for the initial sync in consistent
29
+	// order rather than in the order received.  This requires waiting
30
+	// until the initial sync has been processed, and to avoid a hot
31
+	// loop, we'll wait this long between checks.
32
+	SyncProcessedPollPeriod = 100 * time.Millisecond
33
+
34
+	clientRetryCount    = 5
35
+	clientRetryInterval = 5 * time.Second
36
+)
37
+
38
+// IngressIPController is responsible for allocating ingress ip
39
+// addresses to Service objects of type LoadBalancer.
40
+type IngressIPController struct {
41
+	client kclient.ServicesNamespacer
42
+
43
+	controller *framework.Controller
44
+
45
+	maxRetries int
46
+
47
+	// Tracks ip allocation for the configured range
48
+	ipAllocator *ipallocator.Range
49
+	// Tracks ip -> service key to allow detection of duplicate ip
50
+	// allocations.
51
+	allocationMap map[string]string
52
+
53
+	// Tracks services requeued for allocation when the range is full.
54
+	requeuedAllocations sets.String
55
+
56
+	// Protects the transition between initial sync and regular processing
57
+	lock  sync.Mutex
58
+	cache cache.Store
59
+	queue workqueue.RateLimitingInterface
60
+
61
+	// recorder is used to record events.
62
+	recorder record.EventRecorder
63
+
64
+	// changeHandler does the work. It can be factored out for unit testing.
65
+	changeHandler func(change *serviceChange) error
66
+	// persistenceHandler persists service changes.  It can be factored out for unit testing
67
+	persistenceHandler func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error
68
+}
69
+
70
+type serviceChange struct {
71
+	key                string
72
+	oldService         *kapi.Service
73
+	requeuedAllocation bool
74
+}
75
+
76
+// NewIngressIPController creates a new IngressIPController.
77
+// TODO this should accept a shared informer
78
+func NewIngressIPController(kc kclient.Interface, ipNet *net.IPNet, resyncInterval time.Duration) *IngressIPController {
79
+	eventBroadcaster := record.NewBroadcaster()
80
+	eventBroadcaster.StartRecordingToSink(kc.Events(""))
81
+	recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "ingressip-controller"})
82
+
83
+	ic := &IngressIPController{
84
+		client:     kc,
85
+		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
86
+		maxRetries: 10,
87
+		recorder:   recorder,
88
+	}
89
+
90
+	ic.cache, ic.controller = framework.NewInformer(
91
+		&cache.ListWatch{
92
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
93
+				return ic.client.Services(kapi.NamespaceAll).List(options)
94
+			},
95
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
96
+				return ic.client.Services(kapi.NamespaceAll).Watch(options)
97
+			},
98
+		},
99
+		&kapi.Service{},
100
+		resyncInterval,
101
+		framework.ResourceEventHandlerFuncs{
102
+			AddFunc: func(obj interface{}) {
103
+				service := obj.(*kapi.Service)
104
+				glog.V(5).Infof("Adding service %s/%s", service.Namespace, service.Name)
105
+				ic.enqueueChange(obj, nil)
106
+			},
107
+			UpdateFunc: func(old, cur interface{}) {
108
+				service := cur.(*kapi.Service)
109
+				glog.V(5).Infof("Updating service %s/%s", service.Namespace, service.Name)
110
+				ic.enqueueChange(cur, old)
111
+			},
112
+			DeleteFunc: func(obj interface{}) {
113
+				service := obj.(*kapi.Service)
114
+				glog.V(5).Infof("Deleting service %s/%s", service.Namespace, service.Name)
115
+				ic.enqueueChange(nil, obj)
116
+			},
117
+		},
118
+	)
119
+
120
+	ic.changeHandler = ic.processChange
121
+	ic.persistenceHandler = persistService
122
+
123
+	ic.ipAllocator = ipallocator.NewAllocatorCIDRRange(ipNet, func(max int, rangeSpec string) allocator.Interface {
124
+		return allocator.NewAllocationMap(max, rangeSpec)
125
+	})
126
+
127
+	ic.allocationMap = make(map[string]string)
128
+	ic.requeuedAllocations = sets.NewString()
129
+
130
+	return ic
131
+}
132
+
133
+// enqueueChange transforms the old and new objects into a change
134
+// object and queues it.  A lock is shared with processInitialSync to
135
+// avoid enqueueing while the changes from the initial sync are being
136
+// processed.
137
+func (ic *IngressIPController) enqueueChange(new interface{}, old interface{}) {
138
+	ic.lock.Lock()
139
+	defer ic.lock.Unlock()
140
+
141
+	change := &serviceChange{}
142
+
143
+	if new != nil {
144
+		// Queue the key needed to retrieve the lastest state from the
145
+		// cache when the change is processed.
146
+		key, err := controller.KeyFunc(new)
147
+		if err != nil {
148
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", new, err))
149
+			return
150
+		}
151
+		change.key = key
152
+	}
153
+
154
+	if old != nil {
155
+		change.oldService = old.(*kapi.Service)
156
+	}
157
+
158
+	ic.queue.Add(change)
159
+}
160
+
161
+// Run begins watching and syncing.
162
+func (ic *IngressIPController) Run(stopCh <-chan struct{}) {
163
+	defer utilruntime.HandleCrash()
164
+	go ic.controller.Run(stopCh)
165
+
166
+	glog.V(5).Infof("Waiting for the initial sync to be completed")
167
+	for !ic.controller.HasSynced() {
168
+		select {
169
+		case <-time.After(SyncProcessedPollPeriod):
170
+		case <-stopCh:
171
+			return
172
+		}
173
+	}
174
+
175
+	if !ic.processInitialSync() {
176
+		return
177
+	}
178
+
179
+	glog.V(5).Infof("Starting normal worker")
180
+	for {
181
+		if !ic.work() {
182
+			break
183
+		}
184
+	}
185
+
186
+	glog.V(5).Infof("Shutting down ingress ip controller")
187
+	ic.queue.ShutDown()
188
+}
189
+
190
+type serviceAge []*kapi.Service
191
+
192
+func (s serviceAge) Len() int      { return len(s) }
193
+func (s serviceAge) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
194
+func (s serviceAge) Less(i, j int) bool {
195
+	if s[i].CreationTimestamp.Before(s[j].CreationTimestamp) {
196
+		return true
197
+	}
198
+	return (s[i].CreationTimestamp == s[j].CreationTimestamp && s[i].UID < s[j].UID)
199
+}
200
+
201
+// processInitialSync processes the items queued by informer's initial sync.
202
+// A lock is shared between this method and enqueueService to ensure
203
+// that queue additions are blocked while the sync is being processed.
204
+// Returns a boolean indication of whether processing should continue.
205
+func (ic *IngressIPController) processInitialSync() bool {
206
+	ic.lock.Lock()
207
+	defer ic.lock.Unlock()
208
+
209
+	glog.V(5).Infof("Processing initial sync")
210
+
211
+	// Track services that need to be processed after existing
212
+	// allocations are recorded.
213
+	var pendingServices []*kapi.Service
214
+
215
+	// Track post-sync changes that need to be added back the queue
216
+	// after allocations are recorded.  These changes may end up in
217
+	// the queue if watch events were queued before completion of the
218
+	// initial sync was detected.
219
+	var pendingChanges []*serviceChange
220
+
221
+	// Drain the queue.  Len() should be safe because enqueueService
222
+	// requires the same lock held by this method.
223
+	for ic.queue.Len() > 0 {
224
+		item, quit := ic.queue.Get()
225
+		if quit {
226
+			return false
227
+		}
228
+		ic.queue.Done(item)
229
+		ic.queue.Forget(item)
230
+		change := item.(*serviceChange)
231
+
232
+		// The initial sync only includes additions, so if an update
233
+		// or delete is seen (indicated by the presence of oldService),
234
+		// it and all subsequent changes are post-sync watch events that
235
+		// should be queued without processing.
236
+		postSyncChange := change.oldService != nil || len(pendingChanges) > 0
237
+		if postSyncChange {
238
+			pendingChanges = append(pendingChanges, change)
239
+			continue
240
+		}
241
+
242
+		service := ic.getCachedService(change.key)
243
+		if service == nil {
244
+			// Service was deleted
245
+			continue
246
+		}
247
+
248
+		if service.Spec.Type == kapi.ServiceTypeLoadBalancer {
249
+			// Save for subsequent addition back to the queue to
250
+			// ensure persistent state is updated during regular
251
+			// processing.
252
+			pendingServices = append(pendingServices, service)
253
+
254
+			if len(service.Status.LoadBalancer.Ingress) > 0 {
255
+				// The service has an existing allocation
256
+				ipString := service.Status.LoadBalancer.Ingress[0].IP
257
+				// Return values indicating that reallocation is
258
+				// necessary or that an error occurred can be ignored
259
+				// since the service will be processed again.
260
+				ic.recordLocalAllocation(change.key, ipString)
261
+			}
262
+		}
263
+	}
264
+
265
+	// Add pending service additions back to the queue in consistent order.
266
+	sort.Sort(serviceAge(pendingServices))
267
+	for _, service := range pendingServices {
268
+		if key, err := controller.KeyFunc(service); err == nil {
269
+			glog.V(5).Infof("Adding service back to queue: %v ", key)
270
+			change := &serviceChange{key: key}
271
+			ic.queue.Add(change)
272
+		} else {
273
+			// This error should have been caught by enqueueService
274
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for service %+v: %v", service, err))
275
+			continue
276
+		}
277
+	}
278
+
279
+	// Add watch events back to the queue
280
+	for _, change := range pendingChanges {
281
+		ic.queue.Add(change)
282
+	}
283
+
284
+	glog.V(5).Infof("Completed processing initial sync")
285
+
286
+	return true
287
+}
288
+
289
+// getCachedService logs if unable to retrieve a service for the given key.
290
+func (ic *IngressIPController) getCachedService(key string) *kapi.Service {
291
+	if len(key) == 0 {
292
+		return nil
293
+	}
294
+	if obj, exists, err := ic.cache.GetByKey(key); err != nil {
295
+		glog.V(5).Infof("Unable to retrieve service %v from store: %v", key, err)
296
+	} else if !exists {
297
+		glog.V(6).Infof("Service %v has been deleted", key)
298
+	} else {
299
+		return obj.(*kapi.Service)
300
+	}
301
+	return nil
302
+}
303
+
304
+// recordLocalAllocation attempts to update local state for the given
305
+// service key and ingress ip.  Returns a boolean indication of
306
+// whether reallocation is necessary and an error indicating the
307
+// reason for reallocation.  If reallocation is not indicated, a
308
+// non-nil error indicates an exceptional condition.
309
+func (ic *IngressIPController) recordLocalAllocation(key, ipString string) (reallocate bool, err error) {
310
+	ip := net.ParseIP(ipString)
311
+	if ip == nil {
312
+		return true, fmt.Errorf("Service %v has an invalid ingress ip %v.  A new ip will be allocated.", key, ipString)
313
+	}
314
+
315
+	ipKey, ok := ic.allocationMap[ipString]
316
+	switch {
317
+	case ok && ipKey == key:
318
+		// Allocation exists for this service
319
+		return false, nil
320
+	case ok && ipKey != key:
321
+		// TODO prefer removing the allocation from a service that does not have a matching LoadBalancerIP
322
+		return true, fmt.Errorf("Another service is using ingress ip %v.  A new ip will be allocated for %v.", ipString, key)
323
+	}
324
+
325
+	err = ic.ipAllocator.Allocate(ip)
326
+	switch {
327
+	case err == ipallocator.ErrNotInRange:
328
+		return true, fmt.Errorf("The ingress ip %v for service %v is not in the ingress range.  A new ip will be allocated.", ipString, key)
329
+	case err != nil:
330
+		// The only other error that Allocate() can throw is ErrAllocated, but that
331
+		// should not happen after the check against the allocation map.
332
+		return false, fmt.Errorf("Unexpected error from ip allocator for service %v: %v", key, err)
333
+	}
334
+	ic.allocationMap[ipString] = key
335
+	glog.V(5).Infof("Recorded allocation of ip %v for service %v", ipString, key)
336
+	return false, nil
337
+}
338
+
339
+// work dispatches the next item in the queue to the change handler.
340
+// If the change handler returns an error, the change will be added to
341
+// the end of the queue to be processed again.  Returns a boolean
342
+// indication of whether processing should continue.
343
+func (ic *IngressIPController) work() bool {
344
+	item, quit := ic.queue.Get()
345
+	if quit {
346
+		return false
347
+	}
348
+	change := item.(*serviceChange)
349
+	defer ic.queue.Done(change)
350
+
351
+	if change.requeuedAllocation {
352
+		// Reset the allocation state so that the change can be
353
+		// requeued if necessary.  Only additions/updates are requeued
354
+		// for allocation so change.key should be non-empty.
355
+		change.requeuedAllocation = false
356
+		ic.requeuedAllocations.Delete(change.key)
357
+	}
358
+
359
+	if err := ic.changeHandler(change); err == nil {
360
+		// No further processing required
361
+		ic.queue.Forget(change)
362
+	} else {
363
+		if err == ipallocator.ErrFull {
364
+			// When the range is full, avoid requeueing more than a
365
+			// single change requiring allocation per service.
366
+			// Otherwise the queue could grow without bounds as every
367
+			// service update would add another change that would be
368
+			// endlessly requeued.
369
+			if ic.requeuedAllocations.Has(change.key) {
370
+				return true
371
+			}
372
+			change.requeuedAllocation = true
373
+			ic.requeuedAllocations.Insert(change.key)
374
+			service := ic.getCachedService(change.key)
375
+			if service != nil {
376
+				ic.recorder.Eventf(service, kapi.EventTypeWarning, "IngressIPRangeFull", "No available ingress ip to allocate to service %s", change.key)
377
+			}
378
+		}
379
+		// Failed but can be retried
380
+		utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
381
+		ic.queue.AddRateLimited(change)
382
+	}
383
+
384
+	return true
385
+}
386
+
387
+// processChange responds to a service change by synchronizing the
388
+// local and persisted ingress ip allocation state of the service.
389
+func (ic *IngressIPController) processChange(change *serviceChange) error {
390
+	service := ic.getCachedService(change.key)
391
+
392
+	ic.clearOldAllocation(service, change.oldService)
393
+
394
+	if service == nil {
395
+		// Service was deleted - no further processing required
396
+		return nil
397
+	}
398
+
399
+	typeLoadBalancer := service.Spec.Type == kapi.ServiceTypeLoadBalancer
400
+	hasAllocation := len(service.Status.LoadBalancer.Ingress) > 0
401
+	switch {
402
+	case typeLoadBalancer && hasAllocation:
403
+		return ic.recordAllocation(service, change.key)
404
+	case typeLoadBalancer && !hasAllocation:
405
+		return ic.allocate(service, change.key)
406
+	case !typeLoadBalancer && hasAllocation:
407
+		return ic.deallocate(service, change.key)
408
+	default:
409
+		return nil
410
+	}
411
+}
412
+
413
+// clearOldAllocation clears the old allocation for a service if it
414
+// differs from a new allocation.  Returns a boolean indication of
415
+// whether the old allocation was cleared.
416
+func (ic *IngressIPController) clearOldAllocation(new, old *kapi.Service) bool {
417
+	oldIP := ""
418
+	if old != nil && old.Spec.Type == kapi.ServiceTypeLoadBalancer && len(old.Status.LoadBalancer.Ingress) > 0 {
419
+		oldIP = old.Status.LoadBalancer.Ingress[0].IP
420
+	}
421
+	noOldAllocation := len(oldIP) == 0
422
+	if noOldAllocation {
423
+		return false
424
+	}
425
+
426
+	newIP := ""
427
+	if new != nil && new.Spec.Type == kapi.ServiceTypeLoadBalancer && len(new.Status.LoadBalancer.Ingress) > 0 {
428
+		newIP = new.Status.LoadBalancer.Ingress[0].IP
429
+	}
430
+	allocationUnchanged := newIP == oldIP
431
+	if allocationUnchanged {
432
+		return false
433
+	}
434
+
435
+	// New allocation differs from old due to update or deletion
436
+
437
+	// Get the key from the old service since the new service may be nil
438
+	if key, err := controller.KeyFunc(old); err == nil {
439
+		ic.clearLocalAllocation(key, oldIP)
440
+		return true
441
+	} else {
442
+		// Recovery/retry not possible for this error
443
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", old, err))
444
+		return false
445
+	}
446
+}
447
+
448
+// recordAllocation updates local state with the ingress ip indicated
449
+// in a service's status and ensures that the ingress ip appears in
450
+// the service's list of external ips.  If the service's ingress ip is
451
+// invalid for any reason, a new ip will be allocated.
452
+func (ic *IngressIPController) recordAllocation(service *kapi.Service, key string) error {
453
+	// If more than one ingress ip is present, it will be ignored
454
+	ipString := service.Status.LoadBalancer.Ingress[0].IP
455
+
456
+	reallocate, err := ic.recordLocalAllocation(key, ipString)
457
+	if !reallocate && err != nil {
458
+		return err
459
+	}
460
+	reallocateMessage := ""
461
+	if err != nil {
462
+		reallocateMessage = err.Error()
463
+	}
464
+
465
+	// Make a copy to modify to avoid mutating cache state
466
+	t, err := kapi.Scheme.DeepCopy(service)
467
+	if err != nil {
468
+		return err
469
+	}
470
+	service = t.(*kapi.Service)
471
+
472
+	if reallocate {
473
+		// TODO update the external ips but not the status since
474
+		// allocate() will overwrite any existing allocation.
475
+		if err = ic.clearPersistedAllocation(service, key, reallocateMessage); err != nil {
476
+			return err
477
+		}
478
+		ic.recorder.Eventf(service, kapi.EventTypeWarning, "IngressIPReallocated", reallocateMessage)
479
+		return ic.allocate(service, key)
480
+	} else {
481
+		// Ensure that the ingress ip is present in the service's spec.
482
+		return ic.ensureExternalIP(service, key, ipString)
483
+	}
484
+}
485
+
486
+// allocate assigns an unallocated ip to a service and updates the
487
+// service's persisted state.
488
+func (ic *IngressIPController) allocate(service *kapi.Service, key string) error {
489
+	// Make a copy to avoid mutating cache state
490
+	t, err := kapi.Scheme.DeepCopy(service)
491
+	if err != nil {
492
+		return err
493
+	}
494
+	service = t.(*kapi.Service)
495
+
496
+	ip, err := ic.allocateIP(service.Spec.LoadBalancerIP)
497
+	if err != nil {
498
+		return err
499
+	}
500
+	ipString := ip.String()
501
+
502
+	glog.V(5).Infof("Allocating ip %v to service %v", ipString, key)
503
+	service.Status = kapi.ServiceStatus{
504
+		LoadBalancer: kapi.LoadBalancerStatus{
505
+			Ingress: []kapi.LoadBalancerIngress{
506
+				{
507
+					IP: ipString,
508
+				},
509
+			},
510
+		},
511
+	}
512
+	if err = ic.persistServiceStatus(service); err != nil {
513
+		if releaseErr := ic.ipAllocator.Release(ip); releaseErr != nil {
514
+			// Release from contiguous allocator should never return an error, but just in case...
515
+			utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, releaseErr))
516
+		}
517
+		return err
518
+	}
519
+	ic.allocationMap[ipString] = key
520
+
521
+	return ic.ensureExternalIP(service, key, ipString)
522
+}
523
+
524
+// deallocate ensures that the ip currently allocated to a service is
525
+// removed and that its loadbalancer status is cleared.
526
+func (ic *IngressIPController) deallocate(service *kapi.Service, key string) error {
527
+	glog.V(5).Infof("Clearing allocation state for %v", key)
528
+
529
+	// Make a copy to modify to avoid mutating cache state
530
+	t, err := kapi.Scheme.DeepCopy(service)
531
+	if err != nil {
532
+		return err
533
+	}
534
+	service = t.(*kapi.Service)
535
+
536
+	// Get the ingress ip to remove from local allocation state before
537
+	// it is removed from the service.
538
+	ipString := service.Status.LoadBalancer.Ingress[0].IP
539
+
540
+	if err = ic.clearPersistedAllocation(service, key, ""); err != nil {
541
+		return err
542
+	}
543
+
544
+	ic.clearLocalAllocation(key, ipString)
545
+	return nil
546
+}
547
+
548
+// clearLocalAllocation clears an in-memory allocation if it belongs
549
+// to the specified service key.
550
+func (ic *IngressIPController) clearLocalAllocation(key, ipString string) bool {
551
+	glog.V(5).Infof("Attempting to clear local allocation of ip %v for service %v", ipString, key)
552
+
553
+	ip := net.ParseIP(ipString)
554
+	if ip == nil {
555
+		// An invalid ip address cannot be deallocated
556
+		utilruntime.HandleError(fmt.Errorf("Error parsing ip: %v", ipString))
557
+		return false
558
+	}
559
+
560
+	ipKey, ok := ic.allocationMap[ipString]
561
+	switch {
562
+	case !ok:
563
+		glog.V(6).Infof("IP address %v is not currently allocated", ipString)
564
+		return false
565
+	case key != ipKey:
566
+		glog.V(6).Infof("IP address %v is not allocated to service %v", ipString, key)
567
+		return false
568
+	}
569
+
570
+	// Remove allocation
571
+	if err := ic.ipAllocator.Release(ip); err != nil {
572
+		// Release from contiguous allocator should never return an error.
573
+		utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, err))
574
+		return false
575
+	}
576
+	delete(ic.allocationMap, ipString)
577
+	glog.V(5).Infof("IP address %v is now available for allocation", ipString)
578
+	return true
579
+}
580
+
581
+// clearPersistedAllocation ensures there is no ingress ip in the
582
+// service's spec and that the service's status is cleared.
583
+func (ic *IngressIPController) clearPersistedAllocation(service *kapi.Service, key, errMessage string) error {
584
+	// Assume it is safe to modify the service without worrying about changing the local cache
585
+
586
+	if len(errMessage) > 0 {
587
+		utilruntime.HandleError(fmt.Errorf(errMessage))
588
+	} else {
589
+		glog.V(5).Infof("Attempting to clear persisted allocation for service: %v", key)
590
+	}
591
+
592
+	// An ingress ip is only allowed in ExternalIPs when a
593
+	// corresponding status exists, so update the spec first to avoid
594
+	// failing admission control.
595
+	ingressIP := service.Status.LoadBalancer.Ingress[0].IP
596
+	for i, ip := range service.Spec.ExternalIPs {
597
+		if ip == ingressIP {
598
+			glog.V(5).Infof("Removing ip %v from the external ips of service %v", ingressIP, key)
599
+			service.Spec.ExternalIPs = append(service.Spec.ExternalIPs[:i], service.Spec.ExternalIPs[i+1:]...)
600
+			if err := ic.persistServiceSpec(service); err != nil {
601
+				return err
602
+			}
603
+			break
604
+		}
605
+	}
606
+
607
+	service.Status.LoadBalancer = kapi.LoadBalancerStatus{}
608
+	glog.V(5).Infof("Clearing the load balancer status of service: %v", key)
609
+	return ic.persistServiceStatus(service)
610
+}
611
+
612
+// ensureExternalIP ensures that the provided service has the ingress
613
+// ip persisted as an external ip.
614
+func (ic *IngressIPController) ensureExternalIP(service *kapi.Service, key, ingressIP string) error {
615
+	// Assume it is safe to modify the service without worrying about changing the local cache
616
+
617
+	ipExists := false
618
+	for _, ip := range service.Spec.ExternalIPs {
619
+		if ip == ingressIP {
620
+			ipExists = true
621
+			glog.V(6).Infof("Service %v already has ip %v as an external ip", key, ingressIP)
622
+			break
623
+		}
624
+	}
625
+	if !ipExists {
626
+		service.Spec.ExternalIPs = append(service.Spec.ExternalIPs, ingressIP)
627
+		glog.V(5).Infof("Adding ip %v to service %v as an external ip", ingressIP, key)
628
+		return ic.persistServiceSpec(service)
629
+	}
630
+	return nil
631
+}
632
+
633
+// allocateIP attempts to allocate the requested ip, and if that is
634
+// not possible, allocates the next available address.
635
+func (ic *IngressIPController) allocateIP(requestedIP string) (net.IP, error) {
636
+	if len(requestedIP) == 0 {
637
+		// Specific ip not requested
638
+		return ic.ipAllocator.AllocateNext()
639
+	}
640
+	var ip net.IP
641
+	if ip = net.ParseIP(requestedIP); ip == nil {
642
+		// Invalid ip
643
+		return ic.ipAllocator.AllocateNext()
644
+	}
645
+	if err := ic.ipAllocator.Allocate(ip); err != nil {
646
+		// Unable to allocate requested ip
647
+		return ic.ipAllocator.AllocateNext()
648
+	}
649
+	// Allocated requested ip
650
+	return ip, nil
651
+}
652
+
653
+func (ic *IngressIPController) persistServiceSpec(service *kapi.Service) error {
654
+	return ic.persistenceHandler(ic.client, service, false)
655
+}
656
+
657
+func (ic *IngressIPController) persistServiceStatus(service *kapi.Service) error {
658
+	return ic.persistenceHandler(ic.client, service, true)
659
+}
660
+
661
+func persistService(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
662
+	backoff := wait.Backoff{
663
+		Steps:    clientRetryCount,
664
+		Duration: clientRetryInterval,
665
+	}
666
+	return wait.ExponentialBackoff(backoff, func() (bool, error) {
667
+		var err error
668
+		if targetStatus {
669
+			_, err = client.Services(service.Namespace).UpdateStatus(service)
670
+		} else {
671
+			_, err = client.Services(service.Namespace).Update(service)
672
+		}
673
+		switch {
674
+		case err == nil:
675
+			return true, nil
676
+		case kerrors.IsNotFound(err):
677
+			// If the service no longer exists, we don't want to recreate
678
+			// it. Just bail out so that we can process the delete, which
679
+			// we should soon be receiving if we haven't already.
680
+			glog.V(5).Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
681
+				service.Namespace, service.Name, err)
682
+			return true, nil
683
+		case kerrors.IsConflict(err):
684
+			// TODO: Try to resolve the conflict if the change was
685
+			// unrelated to load balancer status. For now, just rely on
686
+			// the fact that we'll also process the update that caused the
687
+			// resource version to change.
688
+			glog.V(5).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
689
+				service.Namespace, service.Name, err)
690
+			return true, nil
691
+		default:
692
+			err = fmt.Errorf("Failed to persist updated LoadBalancerStatus to service '%s/%s': %v",
693
+				service.Namespace, service.Name, err)
694
+			return false, err
695
+		}
696
+	})
697
+}
0 698
new file mode 100644
... ...
@@ -0,0 +1,610 @@
0
+package ingressip
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"net"
6
+	"reflect"
7
+	"testing"
8
+	"time"
9
+
10
+	kapi "k8s.io/kubernetes/pkg/api"
11
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
12
+	ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient"
13
+	"k8s.io/kubernetes/pkg/registry/service/ipallocator"
14
+	"k8s.io/kubernetes/pkg/runtime"
15
+	"k8s.io/kubernetes/pkg/util/workqueue"
16
+	"k8s.io/kubernetes/pkg/watch"
17
+)
18
+
19
+const namespace = "ns"
20
+
21
+func newController(t *testing.T, client *ktestclient.Fake) *IngressIPController {
22
+	_, ipNet, err := net.ParseCIDR("172.16.0.12/28")
23
+	if err != nil {
24
+		t.Fatalf("unexpected error: %v", err)
25
+	}
26
+	return NewIngressIPController(client, ipNet, 10*time.Minute)
27
+}
28
+
29
+func controllerSetup(t *testing.T, startingObjects []runtime.Object) (*ktestclient.Fake, *watch.FakeWatcher, *IngressIPController) {
30
+	client := ktestclient.NewSimpleFake(startingObjects...)
31
+
32
+	fakeWatch := watch.NewFake()
33
+	client.PrependWatchReactor("*", ktestclient.DefaultWatchReactor(fakeWatch, nil))
34
+
35
+	client.PrependReactor("create", "*", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
36
+		obj := action.(ktestclient.CreateAction).GetObject()
37
+		fakeWatch.Add(obj)
38
+		return true, obj, nil
39
+	})
40
+
41
+	// Ensure that updates the controller makes are passed through to the watcher.
42
+	client.PrependReactor("update", "*", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
43
+		obj := action.(ktestclient.CreateAction).GetObject()
44
+		fakeWatch.Modify(obj)
45
+		return true, obj, nil
46
+	})
47
+
48
+	controller := newController(t, client)
49
+
50
+	return client, fakeWatch, controller
51
+}
52
+
53
+func newService(name, ip string, typeLoadBalancer bool) *kapi.Service {
54
+	serviceType := kapi.ServiceTypeClusterIP
55
+	if typeLoadBalancer {
56
+		serviceType = kapi.ServiceTypeLoadBalancer
57
+	}
58
+	service := &kapi.Service{
59
+		ObjectMeta: kapi.ObjectMeta{
60
+			Namespace: namespace,
61
+			Name:      name,
62
+		},
63
+		Spec: kapi.ServiceSpec{
64
+			Type: serviceType,
65
+		},
66
+	}
67
+	if len(ip) > 0 {
68
+		service.Status = kapi.ServiceStatus{
69
+			LoadBalancer: kapi.LoadBalancerStatus{
70
+				Ingress: []kapi.LoadBalancerIngress{
71
+					{
72
+						IP: ip,
73
+					},
74
+				},
75
+			},
76
+		}
77
+	}
78
+	return service
79
+}
80
+
81
+func TestProcessInitialSync(t *testing.T) {
82
+	c := newController(t, nil)
83
+
84
+	allocatedKey := "lb-allocated"
85
+	allocatedIP := "172.16.0.1"
86
+	services := []*kapi.Service{
87
+		newService("regular", "", false),
88
+		newService(allocatedKey, allocatedIP, true),
89
+		newService("lb-reallocate", "foo", true),
90
+		newService("lb-unallocated", "", true),
91
+	}
92
+	for _, service := range services {
93
+		c.enqueueChange(service, nil)
94
+		c.cache.Add(service)
95
+	}
96
+	// Queue a change without caching it to validate that it is ignored
97
+	c.enqueueChange(newService("ignored", "", true), nil)
98
+
99
+	// Enqueue post-sync changes to validate that they are added back
100
+	// to the queue without being processed.
101
+	postSyncUpdate := services[0]
102
+	c.enqueueChange(postSyncUpdate, postSyncUpdate)
103
+	c.cache.Update(postSyncUpdate)
104
+	postSyncAddition := newService("lb-post-sync-addition", "", true)
105
+	c.enqueueChange(postSyncAddition, nil)
106
+	c.cache.Add(postSyncAddition)
107
+
108
+	c.processInitialSync()
109
+
110
+	// Validate allocation
111
+	expectedMap := map[string]string{
112
+		allocatedIP: fmt.Sprintf("%s/%s", namespace, allocatedKey),
113
+	}
114
+	if !reflect.DeepEqual(c.allocationMap, expectedMap) {
115
+		t.Errorf("Expected allocation map %v, got %v", expectedMap, c.allocationMap)
116
+	}
117
+	if !c.ipAllocator.Has(net.ParseIP(allocatedIP)) {
118
+		t.Errorf("IP %v was not marked as allocated", allocatedIP)
119
+	}
120
+
121
+	// Validate queue contents
122
+	expectedQueueLength := 5 // 3 from initial sync, 2 from post-sync changes
123
+	if c.queue.Len() != expectedQueueLength {
124
+		t.Errorf("Expected queue length of %d, got %d", expectedQueueLength, c.queue.Len())
125
+	}
126
+}
127
+
128
+func TestWorkRequeuesWhenFull(t *testing.T) {
129
+	tests := []struct {
130
+		testName        string
131
+		requeuedChange  bool
132
+		requeuedService bool
133
+		requeued        bool
134
+	}{
135
+		{
136
+			testName: "Previously requeued change should be requeued",
137
+			requeued: true,
138
+		},
139
+		{
140
+			testName:        "The only pending allocation should be requeued",
141
+			requeuedChange:  true,
142
+			requeuedService: true,
143
+			requeued:        true,
144
+		},
145
+		{
146
+			testName:        "Already requeued allocation should not be requeued",
147
+			requeuedService: true,
148
+			requeued:        false,
149
+		},
150
+	}
151
+	for _, test := range tests {
152
+		c := newController(t, nil)
153
+		c.changeHandler = func(change *serviceChange) error {
154
+			return ipallocator.ErrFull
155
+		}
156
+		// Use a queue with no delay to avoid timing issues
157
+		c.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
158
+		change := &serviceChange{
159
+			key:                "foo",
160
+			requeuedAllocation: test.requeuedChange,
161
+		}
162
+		if test.requeuedService {
163
+			c.requeuedAllocations.Insert(change.key)
164
+		}
165
+		c.queue.Add(change)
166
+
167
+		c.work()
168
+
169
+		requeued := (c.queue.Len() == 1)
170
+		if test.requeued != requeued {
171
+			t.Errorf("Expected requeued == %v, got %v", test.requeued, requeued)
172
+		}
173
+	}
174
+}
175
+
176
+func TestProcessChange(t *testing.T) {
177
+	tests := []struct {
178
+		testName    string
179
+		ip          string
180
+		lb          bool
181
+		deleted     bool
182
+		allocatedIP string
183
+		ipAllocated bool
184
+	}{
185
+		{
186
+			testName: "Deleted service",
187
+			deleted:  true,
188
+		},
189
+		{
190
+			testName:    "Existing allocation",
191
+			ip:          "172.16.0.1",
192
+			lb:          true,
193
+			allocatedIP: "172.16.0.1",
194
+		},
195
+		{
196
+			testName:    "Needs allocation",
197
+			lb:          true,
198
+			ipAllocated: true,
199
+		},
200
+		{
201
+			testName: "Needs deallocation",
202
+			ip:       "172.16.0.1",
203
+			lb:       false,
204
+		},
205
+	}
206
+	for _, test := range tests {
207
+		c := newController(t, nil)
208
+		c.persistenceHandler = func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
209
+			return nil
210
+		}
211
+		s := newService("svc", test.ip, test.lb)
212
+		if !test.deleted {
213
+			c.cache.Add(s)
214
+		}
215
+		key := fmt.Sprintf("%s/%s", namespace, s.Name)
216
+		addAllocation := len(test.ip) > 0 && len(test.allocatedIP) == 0
217
+		if addAllocation {
218
+			c.allocationMap[test.ip] = key
219
+		}
220
+		change := &serviceChange{key: key}
221
+
222
+		freeBefore := c.ipAllocator.Free()
223
+
224
+		c.processChange(change)
225
+
226
+		switch {
227
+		case len(test.allocatedIP) > 0:
228
+			if _, ok := c.allocationMap[test.allocatedIP]; !ok {
229
+				t.Errorf("%s: %v was not allocated as expected", test.testName, test.allocatedIP)
230
+			}
231
+		case test.ipAllocated:
232
+			if freeBefore == c.ipAllocator.Free() {
233
+				t.Errorf("%s: ip was not allocated", test.testName)
234
+			}
235
+		case len(test.ip) > 0:
236
+			if _, ok := c.allocationMap[test.ip]; ok {
237
+				t.Errorf("%s: %v was not deallocated as expected", test.testName, test.ip)
238
+			}
239
+		}
240
+	}
241
+}
242
+
243
+func TestClearOldAllocation(t *testing.T) {
244
+	tests := []struct {
245
+		testName string
246
+		oldIP    string
247
+		newIP    string
248
+		cleared  bool
249
+	}{
250
+		{
251
+			testName: "No old allocation",
252
+			oldIP:    "",
253
+			newIP:    "foo",
254
+		},
255
+		{
256
+			testName: "Unchanged allocation",
257
+			oldIP:    "172.16.0.1",
258
+			newIP:    "172.16.0.1",
259
+		},
260
+		{
261
+			testName: "Old allocation should be cleared",
262
+			oldIP:    "172.16.0.1",
263
+			newIP:    "172.16.0.2",
264
+			cleared:  true,
265
+		},
266
+	}
267
+	for _, test := range tests {
268
+		c := newController(t, nil)
269
+		new := newService("new", test.newIP, true)
270
+		old := newService("old", test.oldIP, true)
271
+		if cleared := c.clearOldAllocation(new, old); test.cleared != cleared {
272
+			t.Errorf("%s: expected cleared %v, got %v", test.testName, test.cleared, cleared)
273
+		}
274
+	}
275
+}
276
+
277
+func TestRecordAllocationReallocates(t *testing.T) {
278
+	c := newController(t, nil)
279
+	var persisted *kapi.Service
280
+	// Keep track of the last-persisted service
281
+	c.persistenceHandler = func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
282
+		persisted = service
283
+		return nil
284
+	}
285
+	s := newService("bad-ip", "foo", true)
286
+	key := fmt.Sprintf("%s/%s", namespace, s.Name)
287
+	err := c.recordAllocation(s, key)
288
+	if err != nil {
289
+		t.Fatalf("Unexpected error: %v", err)
290
+	}
291
+	if persisted == nil {
292
+		t.Errorf("Service was not persisted")
293
+	}
294
+	if len(c.allocationMap) != 1 {
295
+		t.Errorf("Service ip was not reallocated")
296
+	}
297
+	if ingress := persisted.Status.LoadBalancer.Ingress; len(ingress) == 0 {
298
+		t.Errorf("Ingress ip was not persisted")
299
+	}
300
+}
301
+
302
+func TestAllocateReleasesOnPersistenceFailure(t *testing.T) {
303
+	c := newController(t, nil)
304
+	expectedFree := c.ipAllocator.Free()
305
+	expectedErr := errors.New("Persistence failure")
306
+	c.persistenceHandler = func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
307
+		return expectedErr
308
+	}
309
+	s := newService("svc", "", true)
310
+	key := fmt.Sprintf("%s/%s", namespace, s.Name)
311
+	err := c.allocate(s, key)
312
+	if !reflect.DeepEqual(expectedErr, err) {
313
+		t.Fatalf("Expected err %v, got %v", expectedErr, err)
314
+	}
315
+	if expectedFree != c.ipAllocator.Free() {
316
+		t.Fatalf("IP wasn't released on error")
317
+	}
318
+}
319
+
320
+func TestClearLocalAllocation(t *testing.T) {
321
+	tests := []struct {
322
+		testName     string
323
+		key          string
324
+		ip           string
325
+		allocatedKey string
326
+		cleared      bool
327
+	}{
328
+		{
329
+			testName: "Invalid ip",
330
+			ip:       "foo",
331
+		},
332
+		{
333
+			testName: "IP not allocated",
334
+			ip:       "172.16.0.1",
335
+		},
336
+		{
337
+			testName:     "IP not allocated to service",
338
+			key:          "foo",
339
+			ip:           "172.16.0.1",
340
+			allocatedKey: "bar",
341
+		},
342
+		{
343
+			testName:     "Local ip allocation cleared",
344
+			key:          "foo",
345
+			ip:           "172.16.0.1",
346
+			allocatedKey: "foo",
347
+			cleared:      true,
348
+		},
349
+	}
350
+	for _, test := range tests {
351
+		c := newController(t, nil)
352
+		if len(test.allocatedKey) > 0 {
353
+			c.allocationMap[test.ip] = test.allocatedKey
354
+			c.ipAllocator.Allocate(net.ParseIP(test.ip))
355
+		}
356
+		if cleared := c.clearLocalAllocation(test.key, test.ip); test.cleared != cleared {
357
+			t.Errorf("%s: expected cleared %v, got %v", test.testName, test.cleared, cleared)
358
+		} else if cleared {
359
+			if _, ok := c.allocationMap[test.ip]; ok {
360
+				t.Errorf("%s: allocation map was not cleared", test.testName)
361
+			}
362
+			if c.ipAllocator.Has(net.ParseIP(test.ip)) {
363
+				t.Errorf("%s: ip %v is still allocated", test.testName, test.ip)
364
+			}
365
+		}
366
+	}
367
+}
368
+
369
+func TestEnsureExternalIPRespectsNonIngress(t *testing.T) {
370
+	c := newController(t, nil)
371
+	c.persistenceHandler = func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
372
+		return nil
373
+	}
374
+	ingressIP := "172.16.0.1"
375
+	s := newService("foo", ingressIP, true)
376
+	externalIP := "172.16.1.1"
377
+	s.Spec.ExternalIPs = append(s.Spec.ExternalIPs, externalIP)
378
+	c.ensureExternalIP(s, s.Name, ingressIP)
379
+	expectedExternalIPs := []string{externalIP, ingressIP}
380
+	externalIPs := s.Spec.ExternalIPs
381
+	if !reflect.DeepEqual(expectedExternalIPs, externalIPs) {
382
+		t.Errorf("Expected ExternalIPs %v, got %v", expectedExternalIPs, externalIPs)
383
+	}
384
+}
385
+
386
+func TestAllocateIP(t *testing.T) {
387
+	tests := []struct {
388
+		testName    string
389
+		requestedIP string
390
+		allocated   bool
391
+		asRequested bool
392
+	}{
393
+		{
394
+			testName:    "No requested ip",
395
+			requestedIP: "",
396
+			asRequested: false,
397
+		},
398
+		{
399
+			testName:    "Invalid ip",
400
+			requestedIP: "foo",
401
+			asRequested: false,
402
+		},
403
+		{
404
+			testName:    "IP not available",
405
+			requestedIP: "172.16.0.1",
406
+			allocated:   true,
407
+			asRequested: false,
408
+		},
409
+		{
410
+			testName:    "Available",
411
+			requestedIP: "172.16.0.1",
412
+			asRequested: true,
413
+		},
414
+	}
415
+	for _, test := range tests {
416
+		controller := newController(t, nil)
417
+		if test.allocated {
418
+			ip := net.ParseIP(test.requestedIP)
419
+			controller.ipAllocator.Allocate(ip)
420
+		}
421
+		// Expect no error for these
422
+		ip, err := controller.allocateIP(test.requestedIP)
423
+		if err != nil {
424
+			t.Errorf("Unexpected error: %v", err)
425
+		}
426
+		if test.asRequested && ip.String() != test.requestedIP {
427
+			t.Errorf("%s: expected %s but got %s", test.testName, test.requestedIP, ip.String())
428
+		}
429
+		if !test.asRequested && ip.String() == test.requestedIP {
430
+			t.Errorf("%s: did not expect %s", test.testName, test.requestedIP)
431
+		}
432
+	}
433
+}
434
+
435
+func TestRecordLocalAllocation(t *testing.T) {
436
+	key := "svc1"
437
+	ip := "172.16.0.1"
438
+	otherKey := "svc2"
439
+	tests := []struct {
440
+		testName      string
441
+		allocationMap map[string]string
442
+		ip            string
443
+		reallocate    bool
444
+		errExpected   bool
445
+	}{
446
+		{
447
+			testName:    "Invalid ip",
448
+			ip:          "foo",
449
+			reallocate:  true,
450
+			errExpected: true,
451
+		},
452
+		{
453
+			testName: "Allocation exists for service",
454
+			allocationMap: map[string]string{
455
+				ip: key,
456
+			},
457
+			ip: ip,
458
+		},
459
+		{
460
+			testName: "Allocation exists for another service",
461
+			allocationMap: map[string]string{
462
+				ip: otherKey,
463
+			},
464
+			ip:          ip,
465
+			reallocate:  true,
466
+			errExpected: true,
467
+		},
468
+		{
469
+			testName:    "IP not in range",
470
+			ip:          "172.16.1.1",
471
+			reallocate:  true,
472
+			errExpected: true,
473
+		},
474
+		{
475
+			testName: "Allocation successful",
476
+			ip:       "172.16.0.1",
477
+		},
478
+	}
479
+	for _, test := range tests {
480
+		c := newController(t, nil)
481
+		if test.allocationMap != nil {
482
+			c.allocationMap = test.allocationMap
483
+			for ipString := range test.allocationMap {
484
+				c.ipAllocator.Allocate(net.ParseIP(ipString))
485
+			}
486
+		}
487
+
488
+		reallocate, err := c.recordLocalAllocation(key, test.ip)
489
+
490
+		if test.reallocate != reallocate {
491
+			t.Errorf("%s: expected reallocate == %v but got %v", test.testName, test.reallocate, reallocate)
492
+		}
493
+		switch {
494
+		case test.errExpected && (err == nil):
495
+			t.Errorf("%s: expected error but didn't see one", test.testName)
496
+		case !test.errExpected && (err != nil):
497
+			t.Errorf("%s: saw unexpected error: %v", test.testName, err)
498
+		}
499
+
500
+		// Ensure allocation was successfully recorded
501
+		checkAllocation := !test.reallocate && !test.errExpected
502
+		if checkAllocation {
503
+			ipKey, ok := c.allocationMap[test.ip]
504
+			inMap := ok && ipKey == key
505
+			inAllocator := c.ipAllocator.Has(net.ParseIP(test.ip))
506
+			if !(inMap && inAllocator) {
507
+				t.Errorf("%s: allocation not recorded", test.testName)
508
+			}
509
+		}
510
+	}
511
+}
512
+
513
+func TestClearPersistedAllocation(t *testing.T) {
514
+	tests := []struct {
515
+		testName         string
516
+		persistenceError error
517
+		ingressIPCount   int
518
+	}{
519
+		{
520
+			testName:         "Status not cleared if external ip not removed",
521
+			persistenceError: errors.New(""),
522
+			ingressIPCount:   1,
523
+		},
524
+		{
525
+			testName: "Status cleared",
526
+		},
527
+	}
528
+	for _, test := range tests {
529
+		c := newController(t, nil)
530
+		var persistedService *kapi.Service
531
+		c.persistenceHandler = func(client kclient.ServicesNamespacer, service *kapi.Service, targetStatus bool) error {
532
+			// Save the last persisted service
533
+			persistedService = service
534
+			return test.persistenceError
535
+		}
536
+		ip := "172.16.0.1"
537
+		s := newService("svc", ip, true)
538
+		// Add other external ips to ensure they are not affected by controller
539
+		s.Spec.ExternalIPs = []string{"172.16.1.1", ip, "172.16.1.2"}
540
+		key := fmt.Sprintf("%s/%s", namespace, s.Name)
541
+		c.clearPersistedAllocation(s, key, "")
542
+
543
+		expectedExternalIPs := []string{"172.16.1.1", "172.16.1.2"}
544
+		externalIPs := persistedService.Spec.ExternalIPs
545
+		if !reflect.DeepEqual(expectedExternalIPs, externalIPs) {
546
+			t.Errorf("%s: Expected ExternalIPs %v, got %v", test.testName, expectedExternalIPs, externalIPs)
547
+		}
548
+		ingressIPCount := len(persistedService.Status.LoadBalancer.Ingress)
549
+		if test.ingressIPCount != ingressIPCount {
550
+			t.Errorf("%s: Expected %d ingress ips, got %d", test.testName, test.ingressIPCount, ingressIPCount)
551
+		}
552
+	}
553
+}
554
+
555
+// TestBasicControllerFlow validates controller start, initial sync
556
+// processing, and post-sync processing.
557
+func TestBasicControllerFlow(t *testing.T) {
558
+	startingObjects := []runtime.Object{
559
+		newService("lb-unallocated", "", true),
560
+	}
561
+
562
+	stopChannel := make(chan struct{})
563
+	defer close(stopChannel)
564
+
565
+	_, fakeWatch, controller := controllerSetup(t, startingObjects)
566
+
567
+	updated := make(chan bool)
568
+	deleted := make(chan bool)
569
+
570
+	controller.changeHandler = func(change *serviceChange) error {
571
+		defer func() {
572
+			if len(change.key) == 0 {
573
+				deleted <- true
574
+			} else if change.oldService != nil {
575
+				updated <- true
576
+			}
577
+		}()
578
+
579
+		err := controller.processChange(change)
580
+		if err != nil {
581
+			t.Errorf("unexpected error: %v", err)
582
+		}
583
+
584
+		return err
585
+	}
586
+
587
+	go controller.Run(stopChannel)
588
+
589
+	waitForUpdate := func(msg string) {
590
+		t.Logf("waiting for: %v", msg)
591
+		select {
592
+		case <-updated:
593
+		case <-time.After(time.Duration(30 * time.Second)):
594
+			t.Fatalf("failed to see: %v", msg)
595
+		}
596
+	}
597
+
598
+	waitForUpdate("spec update")
599
+	waitForUpdate("status update")
600
+
601
+	fakeWatch.Delete(startingObjects[0])
602
+
603
+	t.Log("waiting for the service to be deleted")
604
+	select {
605
+	case <-deleted:
606
+	case <-time.After(time.Duration(30 * time.Second)):
607
+		t.Fatalf("failed to see expected service deletion")
608
+	}
609
+}
0 610
new file mode 100644
... ...
@@ -0,0 +1,217 @@
0
+package integration
1
+
2
+import (
3
+	"math/rand"
4
+	"net"
5
+	"testing"
6
+	"time"
7
+
8
+	kapi "k8s.io/kubernetes/pkg/api"
9
+	"k8s.io/kubernetes/pkg/client/cache"
10
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
11
+	"k8s.io/kubernetes/pkg/controller/framework"
12
+	"k8s.io/kubernetes/pkg/runtime"
13
+	"k8s.io/kubernetes/pkg/util/sets"
14
+	"k8s.io/kubernetes/pkg/watch"
15
+
16
+	configapi "github.com/openshift/origin/pkg/cmd/server/api"
17
+	"github.com/openshift/origin/pkg/service/controller/ingressip"
18
+	testutil "github.com/openshift/origin/test/util"
19
+	testserver "github.com/openshift/origin/test/util/server"
20
+)
21
+
22
+const sentinelName = "sentinel"
23
+
24
+// TestIngressIPAllocation validates that ingress ip allocation is
25
+// performed correctly even when multiple controllers are running.
26
+func TestIngressIPAllocation(t *testing.T) {
27
+	testutil.RequireEtcd(t)
28
+
29
+	masterConfig, err := testserver.DefaultMasterOptions()
30
+	if err != nil {
31
+		t.Fatalf("Unexpected error: %v", err)
32
+	}
33
+	masterConfig.NetworkConfig.ExternalIPNetworkCIDRs = []string{"172.16.0.0/24"}
34
+	masterConfig.NetworkConfig.IngressIPNetworkCIDR = "172.16.1.0/24"
35
+	clusterAdminKubeConfig, err := testserver.StartConfiguredMasterWithOptions(masterConfig, testserver.TestOptions{})
36
+	if err != nil {
37
+		t.Fatalf("Unexpected error: %v", err)
38
+	}
39
+	kc, _, err := configapi.GetKubeClient(clusterAdminKubeConfig, &configapi.ClientConnectionOverrides{
40
+		QPS:   20,
41
+		Burst: 50,
42
+	})
43
+	if err != nil {
44
+		t.Fatalf("Unexpected error: %v", err)
45
+	}
46
+
47
+	stopChannel := make(chan struct{})
48
+	defer close(stopChannel)
49
+	received := make(chan bool)
50
+
51
+	rand.Seed(time.Now().UTC().UnixNano())
52
+
53
+	t.Log("start informer to watch for sentinel")
54
+	_, informerController := framework.NewInformer(
55
+		&cache.ListWatch{
56
+			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
57
+				return kc.Services(kapi.NamespaceAll).List(options)
58
+			},
59
+			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
60
+				return kc.Services(kapi.NamespaceAll).Watch(options)
61
+			},
62
+		},
63
+		&kapi.Service{},
64
+		time.Minute*10,
65
+		framework.ResourceEventHandlerFuncs{
66
+			UpdateFunc: func(old, cur interface{}) {
67
+				service := cur.(*kapi.Service)
68
+				if service.Name == sentinelName && len(service.Spec.ExternalIPs) > 0 {
69
+					received <- true
70
+				}
71
+			},
72
+		},
73
+	)
74
+	go informerController.Run(stopChannel)
75
+
76
+	t.Log("start generating service events")
77
+	go generateServiceEvents(t, kc)
78
+
79
+	// Start a second controller that will be out of sync with the first
80
+	_, ipNet, err := net.ParseCIDR(masterConfig.NetworkConfig.IngressIPNetworkCIDR)
81
+	c := ingressip.NewIngressIPController(kc, ipNet, 10*time.Minute)
82
+	go c.Run(stopChannel)
83
+
84
+	t.Log("waiting for sentinel to be updated with external ip")
85
+	select {
86
+	case <-received:
87
+	case <-time.After(time.Duration(90 * time.Second)):
88
+		t.Fatal("took too long")
89
+	}
90
+
91
+	// Validate that all services of type load balancer have a unique
92
+	// ingress ip and corresponding external ip.
93
+	services, err := kc.Services(kapi.NamespaceDefault).List(kapi.ListOptions{})
94
+	if err != nil {
95
+		t.Fatalf("Unexpected error: %v", err)
96
+	}
97
+	ips := sets.NewString()
98
+	for _, s := range services.Items {
99
+		typeLoadBalancer := s.Spec.Type == kapi.ServiceTypeLoadBalancer
100
+		hasAllocation := len(s.Status.LoadBalancer.Ingress) > 0
101
+		switch {
102
+		case !typeLoadBalancer && !hasAllocation:
103
+			continue
104
+		case !typeLoadBalancer && hasAllocation:
105
+			t.Errorf("A service not of type load balancer has an ingress ip allocation")
106
+			continue
107
+		case typeLoadBalancer && !hasAllocation:
108
+			t.Errorf("A service of type load balancer has not been allocated an ingress ip")
109
+			continue
110
+		}
111
+		ingressIP := s.Status.LoadBalancer.Ingress[0].IP
112
+		if ips.Has(ingressIP) {
113
+			t.Errorf("One or more services have the same ingress ip")
114
+			continue
115
+		}
116
+		ips.Insert(ingressIP)
117
+		if len(s.Spec.ExternalIPs) == 0 || s.Spec.ExternalIPs[0] != ingressIP {
118
+			t.Errorf("Service does not have the ingress ip as an external ip")
119
+			continue
120
+		}
121
+	}
122
+}
123
+
124
+const (
125
+	createOp = iota
126
+	updateOp
127
+	deleteOp
128
+)
129
+
130
+func generateServiceEvents(t *testing.T, kc kclient.Interface) {
131
+	maxMillisecondInterval := 25
132
+	minServiceCount := 10
133
+	maxOperations := minServiceCount + 30
134
+	var services []*kapi.Service
135
+	for i := 0; i < maxOperations; {
136
+		op := createOp
137
+		if len(services) > minServiceCount {
138
+			op = rand.Intn(deleteOp + 1)
139
+		}
140
+		switch op {
141
+		case createOp:
142
+			typeChoice := rand.Intn(2)
143
+			typeLoadBalancer := false
144
+			if typeChoice == 1 {
145
+				typeLoadBalancer = true
146
+			}
147
+			s, err := createService(kc, "", typeLoadBalancer)
148
+			if err != nil {
149
+				t.Fatalf("unexpected error: %v", err)
150
+			}
151
+			services = append(services, s)
152
+			t.Logf("Added service %s", s.Name)
153
+		case updateOp:
154
+			targetIndex := rand.Intn(len(services))
155
+			name := services[targetIndex].Name
156
+			s, err := kc.Services(kapi.NamespaceDefault).Get(name)
157
+			if err != nil {
158
+				continue
159
+			}
160
+			// Flip the service type
161
+			if s.Spec.Type == kapi.ServiceTypeLoadBalancer {
162
+				s.Spec.Type = kapi.ServiceTypeClusterIP
163
+				s.Spec.Ports[0].NodePort = 0
164
+			} else {
165
+				s.Spec.Type = kapi.ServiceTypeLoadBalancer
166
+			}
167
+			s, err = kc.Services(kapi.NamespaceDefault).Update(s)
168
+			if err != nil {
169
+				continue
170
+			}
171
+			t.Logf("Updated service %s", name)
172
+		case deleteOp:
173
+			targetIndex := rand.Intn(len(services))
174
+			name := services[targetIndex].Name
175
+			err := kc.Services(kapi.NamespaceDefault).Delete(name)
176
+			if err != nil {
177
+				continue
178
+			}
179
+			services = append(services[:targetIndex], services[targetIndex+1:]...)
180
+			t.Logf("Deleted service %s", name)
181
+		}
182
+		i++
183
+		time.Sleep(time.Duration(rand.Intn(maxMillisecondInterval)) * time.Millisecond)
184
+	}
185
+
186
+	// Create one last service to serve as a sentinel. The service
187
+	// will be created after a slight delay so that it can be assured
188
+	// of being the last service a controller will see, and with a
189
+	// known name so its processing can be detected.
190
+	time.Sleep(time.Millisecond * 100)
191
+	_, err := createService(kc, sentinelName, true)
192
+	if err != nil {
193
+		t.Fatalf("unexpected error: %v", err)
194
+	}
195
+}
196
+
197
+func createService(kc kclient.Interface, name string, typeLoadBalancer bool) (*kapi.Service, error) {
198
+	serviceType := kapi.ServiceTypeClusterIP
199
+	if typeLoadBalancer {
200
+		serviceType = kapi.ServiceTypeLoadBalancer
201
+	}
202
+	service := &kapi.Service{
203
+		ObjectMeta: kapi.ObjectMeta{
204
+			GenerateName: "service-",
205
+			Name:         name,
206
+		},
207
+		Spec: kapi.ServiceSpec{
208
+			Type: serviceType,
209
+			Ports: []kapi.ServicePort{{
210
+				Protocol: "TCP",
211
+				Port:     8080,
212
+			}},
213
+		},
214
+	}
215
+	return kc.Services(kapi.NamespaceDefault).Create(service)
216
+}
... ...
@@ -2894,6 +2894,43 @@ items:
2894 2894
   kind: ClusterRole
2895 2895
   metadata:
2896 2896
     creationTimestamp: null
2897
+    name: system:service-ingress-ip-controller
2898
+  rules:
2899
+  - apiGroups:
2900
+    - ""
2901
+    attributeRestrictions: null
2902
+    resources:
2903
+    - services
2904
+    verbs:
2905
+    - list
2906
+    - watch
2907
+  - apiGroups:
2908
+    - ""
2909
+    attributeRestrictions: null
2910
+    resources:
2911
+    - services
2912
+    verbs:
2913
+    - update
2914
+  - apiGroups:
2915
+    - ""
2916
+    attributeRestrictions: null
2917
+    resources:
2918
+    - services/status
2919
+    verbs:
2920
+    - update
2921
+  - apiGroups:
2922
+    - ""
2923
+    attributeRestrictions: null
2924
+    resources:
2925
+    - events
2926
+    verbs:
2927
+    - create
2928
+    - patch
2929
+    - update
2930
+- apiVersion: v1
2931
+  kind: ClusterRole
2932
+  metadata:
2933
+    creationTimestamp: null
2897 2934
     name: system:service-load-balancer-controller
2898 2935
   rules:
2899 2936
   - apiGroups: