Browse code

Drop the SDN endpoint filter pod watch

We no longer need to do any filtering of cluster network / service
network IPs, because the endpoint admission controller does it for us.

Dan Winship authored on 2016/07/14 01:09:17
Showing 2 changed files
... ...
@@ -3,8 +3,6 @@ package plugin
3 3
 import (
4 4
 	"fmt"
5 5
 	"net"
6
-	"strings"
7
-	"sync"
8 6
 
9 7
 	"github.com/golang/glog"
10 8
 
... ...
@@ -26,10 +24,8 @@ type proxyFirewallItem struct {
26 26
 }
27 27
 
28 28
 type ovsProxyPlugin struct {
29
-	registry  *Registry
30
-	podsByIP  map[string]*kapi.Pod
31
-	podsMutex sync.Mutex
32
-	firewall  map[string][]proxyFirewallItem
29
+	registry *Registry
30
+	firewall map[string][]proxyFirewallItem
33 31
 
34 32
 	baseEndpointsHandler pconfig.EndpointsConfigHandler
35 33
 }
... ...
@@ -42,7 +38,6 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclie
42 42
 
43 43
 	return &ovsProxyPlugin{
44 44
 		registry: newRegistry(osClient, kClient),
45
-		podsByIP: make(map[string]*kapi.Pod),
46 45
 		firewall: make(map[string][]proxyFirewallItem),
47 46
 	}, nil
48 47
 }
... ...
@@ -52,12 +47,6 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
52 52
 
53 53
 	proxy.baseEndpointsHandler = baseHandler
54 54
 
55
-	// Populate pod info map synchronously so that kube proxy can filter endpoints to support isolation
56
-	pods, err := proxy.registry.GetAllPods()
57
-	if err != nil {
58
-		return err
59
-	}
60
-
61 55
 	policies, err := proxy.registry.GetEgressNetworkPolicies()
62 56
 	if err != nil {
63 57
 		return fmt.Errorf("Could not get EgressNetworkPolicies: %s", err)
... ...
@@ -66,13 +55,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
66 66
 		proxy.updateNetworkPolicy(policy)
67 67
 	}
68 68
 
69
-	for _, pod := range pods {
70
-		proxy.trackPod(&pod)
71
-	}
72
-
73
-	go utilwait.Forever(proxy.watchPods, 0)
74 69
 	go utilwait.Forever(proxy.watchEgressNetworkPolicies, 0)
75
-
76 70
 	return nil
77 71
 }
78 72
 
... ...
@@ -113,74 +96,6 @@ func (proxy *ovsProxyPlugin) updateNetworkPolicy(policy osapi.EgressNetworkPolic
113 113
 	}
114 114
 }
115 115
 
116
-func (proxy *ovsProxyPlugin) watchPods() {
117
-	eventQueue := proxy.registry.RunEventQueue(Pods)
118
-
119
-	for {
120
-		eventType, obj, err := eventQueue.Pop()
121
-		if err != nil {
122
-			utilruntime.HandleError(fmt.Errorf("EventQueue failed for pods: %v", err))
123
-			return
124
-		}
125
-		pod := obj.(*kapi.Pod)
126
-
127
-		glog.V(5).Infof("Watch %s event for Pod %q", strings.Title(string(eventType)), pod.ObjectMeta.Name)
128
-		switch eventType {
129
-		case watch.Added, watch.Modified:
130
-			proxy.trackPod(pod)
131
-		case watch.Deleted:
132
-			proxy.unTrackPod(pod)
133
-		}
134
-	}
135
-}
136
-
137
-func (proxy *ovsProxyPlugin) getTrackedPod(ip string) (*kapi.Pod, bool) {
138
-	proxy.podsMutex.Lock()
139
-	defer proxy.podsMutex.Unlock()
140
-
141
-	pod, ok := proxy.podsByIP[ip]
142
-	return pod, ok
143
-}
144
-
145
-func (proxy *ovsProxyPlugin) trackPod(pod *kapi.Pod) {
146
-	if pod.Status.PodIP == "" {
147
-		return
148
-	}
149
-
150
-	proxy.podsMutex.Lock()
151
-	defer proxy.podsMutex.Unlock()
152
-	podInfo, ok := proxy.podsByIP[pod.Status.PodIP]
153
-
154
-	if pod.Status.Phase == kapi.PodPending || pod.Status.Phase == kapi.PodRunning {
155
-		// When a pod hits one of the states where the IP is in use then
156
-		// we need to add it to our IP -> namespace tracker.  There _should_ be no
157
-		// other entries for the IP if we caught all of the right messages, so warn
158
-		// if we see one, but clobber it anyway since the IPAM
159
-		// should ensure that each IP is uniquely assigned to a pod (when running)
160
-		if ok && podInfo.UID != pod.UID {
161
-			glog.Warningf("IP '%s' was marked as used by namespace '%s' (pod '%s')... updating to namespace '%s' (pod '%s')",
162
-				pod.Status.PodIP, podInfo.Namespace, podInfo.UID, pod.ObjectMeta.Namespace, pod.UID)
163
-		}
164
-
165
-		proxy.podsByIP[pod.Status.PodIP] = pod
166
-	} else if ok && podInfo.UID == pod.UID {
167
-		// If the UIDs match, then this pod is moving to a state that indicates it is not running
168
-		// so we need to remove it from the cache
169
-		delete(proxy.podsByIP, pod.Status.PodIP)
170
-	}
171
-}
172
-
173
-func (proxy *ovsProxyPlugin) unTrackPod(pod *kapi.Pod) {
174
-	proxy.podsMutex.Lock()
175
-	defer proxy.podsMutex.Unlock()
176
-
177
-	// Only delete if the pod ID is the one we are tracking (in case there is a failed or complete
178
-	// pod lying around that gets deleted while there is a running pod with the same IP)
179
-	if podInfo, ok := proxy.podsByIP[pod.Status.PodIP]; ok && podInfo.UID == pod.UID {
180
-		delete(proxy.podsByIP, pod.Status.PodIP)
181
-	}
182
-}
183
-
184 116
 func (proxy *ovsProxyPlugin) firewallBlocksIP(namespace string, ip net.IP) bool {
185 117
 	for _, item := range proxy.firewall[namespace] {
186 118
 		if item.net.Contains(ip) {
... ...
@@ -191,6 +106,11 @@ func (proxy *ovsProxyPlugin) firewallBlocksIP(namespace string, ip net.IP) bool
191 191
 }
192 192
 
193 193
 func (proxy *ovsProxyPlugin) OnEndpointsUpdate(allEndpoints []kapi.Endpoints) {
194
+	if len(proxy.firewall) == 0 {
195
+		proxy.baseEndpointsHandler.OnEndpointsUpdate(allEndpoints)
196
+		return
197
+	}
198
+
194 199
 	ni, err := proxy.registry.GetNetworkInfo()
195 200
 	if err != nil {
196 201
 		glog.Warningf("Error fetching network information: %v", err)
... ...
@@ -205,20 +125,7 @@ EndpointLoop:
205 205
 		for _, ss := range ep.Subsets {
206 206
 			for _, addr := range ss.Addresses {
207 207
 				IP := net.ParseIP(addr.IP)
208
-				if ni.ServiceNetwork.Contains(IP) {
209
-					glog.Warningf("Service '%s' in namespace '%s' has an Endpoint inside the service network (%s)", ep.ObjectMeta.Name, ns, addr.IP)
210
-					continue EndpointLoop
211
-				} else if ni.ClusterNetwork.Contains(IP) {
212
-					podInfo, ok := proxy.getTrackedPod(addr.IP)
213
-					if !ok {
214
-						glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to non-existent pod (%s)", ep.ObjectMeta.Name, ns, addr.IP)
215
-						continue EndpointLoop
216
-					}
217
-					if podInfo.ObjectMeta.Namespace != ns {
218
-						glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to pod %s in namespace '%s'", ep.ObjectMeta.Name, ns, addr.IP, podInfo.ObjectMeta.Namespace)
219
-						continue EndpointLoop
220
-					}
221
-				} else {
208
+				if !ni.ClusterNetwork.Contains(IP) && !ni.ServiceNetwork.Contains(IP) {
222 209
 					if proxy.firewallBlocksIP(ns, IP) {
223 210
 						glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP)
224 211
 						continue EndpointLoop
... ...
@@ -85,15 +85,6 @@ func (registry *Registry) UpdateSubnet(hs *osapi.HostSubnet) (*osapi.HostSubnet,
85 85
 	return registry.oClient.HostSubnets().Update(hs)
86 86
 }
87 87
 
88
-func (registry *Registry) GetAllPods() ([]kapi.Pod, error) {
89
-	podList, err := registry.kClient.Pods(kapi.NamespaceAll).List(kapi.ListOptions{})
90
-	if err != nil {
91
-		return nil, err
92
-	}
93
-
94
-	return podList.Items, nil
95
-}
96
-
97 88
 func (registry *Registry) GetRunningPods(nodeName, namespace string) ([]kapi.Pod, error) {
98 89
 	fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector()
99 90
 	opts := kapi.ListOptions{