
Merge pull request #11378 from danwinship/new-isolation-rules

Merged by openshift-bot

OpenShift Bot authored on 2016/12/17 08:31:22
Showing 12 changed files
... ...
@@ -38,19 +38,14 @@ add_ovs_flows() {
     fi
 
     # from container
-    ovs-ofctl -O OpenFlow13 add-flow br0 "table=2, priority=100, in_port=${ovs_port}, arp, nw_src=${ipaddr}, arp_sha=${macaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:5"
-    ovs-ofctl -O OpenFlow13 add-flow br0 "table=2, priority=100, in_port=${ovs_port}, ip, nw_src=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:3"
+    ovs-ofctl -O OpenFlow13 add-flow br0 "table=20, priority=100, in_port=${ovs_port}, arp, nw_src=${ipaddr}, arp_sha=${macaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:30"
+    ovs-ofctl -O OpenFlow13 add-flow br0 "table=20, priority=100, in_port=${ovs_port}, ip, nw_src=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:30"
 
     # arp request/response to container (not isolated)
-    ovs-ofctl -O OpenFlow13 add-flow br0 "table=6, priority=100, arp, nw_dst=${ipaddr}, actions=output:${ovs_port}"
+    ovs-ofctl -O OpenFlow13 add-flow br0 "table=40, priority=100, arp, nw_dst=${ipaddr}, actions=output:${ovs_port}"
 
     # IP to container
-    if [ $tenant_id = "0" ]; then
-	ovs-ofctl -O OpenFlow13 add-flow br0 "table=7, priority=100, ip, nw_dst=${ipaddr}, actions=output:${ovs_port}"
-    else
-	ovs-ofctl -O OpenFlow13 add-flow br0 "table=7, priority=100, reg0=0, ip, nw_dst=${ipaddr}, actions=output:${ovs_port}"
-	ovs-ofctl -O OpenFlow13 add-flow br0 "table=7, priority=100, reg0=${tenant_id}, ip, nw_dst=${ipaddr}, actions=output:${ovs_port}"
-    fi
+    ovs-ofctl -O OpenFlow13 add-flow br0 "table=70, priority=100, ip, nw_dst=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG1[], load:${ovs_port}->NXM_NX_REG2[], goto_table:80"
 
     # Pod ingress == OVS bridge egress
     # linux-htb used here since that's the Kubernetes default traffic shaper too
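The per-pod flows no longer branch on tenant ID when delivering IP traffic: the destination pod's tenant ID and OVS port are loaded into REG1/REG2 and the output decision is deferred to the new policy table 80. A minimal Go sketch of the flow strings the script now emits (illustrative only; the real script is shell, and `podFlows` is a hypothetical helper, not part of this PR):

```go
package main

import "fmt"

// podFlows mirrors what openshift-sdn-ovs emits for a pod: table 20
// validates the source and tags traffic with the pod's tenant ID (REG0);
// table 70 records the destination tenant ID (REG1) and OVS port (REG2)
// and defers the actual output decision to the policy table (80).
func podFlows(ovsPort int, ipaddr, macaddr string, tenantID uint32) []string {
	return []string{
		fmt.Sprintf("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ovsPort, ipaddr, macaddr, tenantID),
		fmt.Sprintf("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ovsPort, ipaddr, tenantID),
		fmt.Sprintf("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", ipaddr, ovsPort),
		fmt.Sprintf("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipaddr, tenantID, ovsPort),
	}
}

func main() {
	// Example values only.
	for _, f := range podFlows(3, "10.128.0.2", "0a:58:0a:80:00:02", 7) {
		fmt.Println(f)
	}
}
```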
... ...
@@ -25,7 +25,7 @@ import (
 
 const (
 	// rule versioning; increment each time flow rules change
-	VERSION        = 1
+	VERSION        = 2
 	VERSION_TABLE  = "table=253"
 	VERSION_ACTION = "actions=note:"
 
... ...
@@ -36,16 +36,21 @@ const (
 	VXLAN_PORT = "4789"
 )
 
-func getPluginVersion(multitenant bool) []string {
+func (plugin *OsdnNode) getPluginVersion() []string {
 	if VERSION > 254 {
 		panic("Version too large!")
 	}
 	version := fmt.Sprintf("%02X", VERSION)
-	if multitenant {
-		return []string{"01", version}
-	}
-	// single-tenant
-	return []string{"00", version}
+	pluginId := ""
+	switch plugin.policy.(type) {
+	case *singleTenantPlugin:
+		pluginId = "00"
+	case *multiTenantPlugin:
+		pluginId = "01"
+	default:
+		panic("Not an OpenShift-SDN plugin")
+	}
+	return []string{pluginId, version}
 }
 
 func (plugin *OsdnNode) getLocalSubnet() (string, error) {
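getPluginVersion now derives the plugin-ID byte from the concrete policy type instead of a boolean. The version note written to table 253 is two hex bytes joined by '.': plugin ID ("00" single-tenant, "01" multi-tenant) and rule version. A small sketch of the encoding (`versionNote` is a hypothetical helper, not the PR's function):

```go
package main

import "fmt"

// versionNote builds the note payload installed in table 253:
// two hex digits of plugin ID, a dot, two hex digits of version.
func versionNote(multitenant bool, version int) string {
	pluginId := "00" // single-tenant
	if multitenant {
		pluginId = "01"
	}
	return fmt.Sprintf("%s.%02X", pluginId, version)
}

func main() {
	fmt.Println(versionNote(true, 2)) // "01.02" -> actions=note:01.02
}
```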
... ...
@@ -133,7 +138,7 @@ func (plugin *OsdnNode) alreadySetUp(localSubnetGatewayCIDR, clusterNetworkCIDR
 		// OVS note action format hex bytes separated by '.'; first
 		// byte is plugin type (multi-tenant/single-tenant) and second
 		// byte is flow rule version
-		expected := getPluginVersion(plugin.multitenant)
+		expected := plugin.getPluginVersion()
 		existing := strings.Split(flow[idx+len(VERSION_ACTION):], ".")
 		if len(existing) >= 2 && existing[0] == expected[0] && existing[1] == expected[1] {
 			found = true
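For the reverse direction, alreadySetUp splits the note back apart and compares it against the expected plugin ID and version. A self-contained sketch of that comparison (`matchesVersion` is hypothetical; the constant mirrors VERSION_ACTION):

```go
package main

import (
	"fmt"
	"strings"
)

// matchesVersion strips the "actions=note:" prefix from the version flow
// and compares the first two dot-separated hex bytes against the expected
// plugin ID and rule version, as alreadySetUp does above.
func matchesVersion(flow string, expected [2]string) bool {
	const versionAction = "actions=note:"
	idx := strings.Index(flow, versionAction)
	if idx < 0 {
		return false
	}
	existing := strings.Split(flow[idx+len(versionAction):], ".")
	return len(existing) >= 2 && existing[0] == expected[0] && existing[1] == expected[1]
}

func main() {
	fmt.Println(matchesVersion("table=253, actions=note:01.02", [2]string{"01", "02"})) // true
	fmt.Println(matchesVersion("table=253, actions=note:01.01", [2]string{"01", "02"})) // false
}
```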
... ...
@@ -232,64 +237,68 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
 	otx := plugin.ovs.NewTransaction()
 	// Table 0: initial dispatch based on in_port
 	// vxlan0
-	otx.AddFlow("table=0, priority=200, in_port=1, arp, nw_src=%s, nw_dst=%s, actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:1", clusterNetworkCIDR, localSubnetCIDR)
-	otx.AddFlow("table=0, priority=200, in_port=1, ip, nw_src=%s, nw_dst=%s, actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:1", clusterNetworkCIDR, localSubnetCIDR)
+	otx.AddFlow("table=0, priority=200, in_port=1, arp, nw_src=%s, nw_dst=%s, actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10", clusterNetworkCIDR, localSubnetCIDR)
+	otx.AddFlow("table=0, priority=200, in_port=1, ip, nw_src=%s, nw_dst=%s, actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10", clusterNetworkCIDR, localSubnetCIDR)
 	otx.AddFlow("table=0, priority=150, in_port=1, actions=drop")
 	// tun0
-	otx.AddFlow("table=0, priority=200, in_port=2, arp, nw_src=%s, nw_dst=%s, actions=goto_table:5", localSubnetGateway, clusterNetworkCIDR)
-	otx.AddFlow("table=0, priority=200, in_port=2, ip, actions=goto_table:5")
+	otx.AddFlow("table=0, priority=200, in_port=2, arp, nw_src=%s, nw_dst=%s, actions=goto_table:30", localSubnetGateway, clusterNetworkCIDR)
+	otx.AddFlow("table=0, priority=200, in_port=2, ip, actions=goto_table:30")
 	otx.AddFlow("table=0, priority=150, in_port=2, actions=drop")
 	// else, from a container
-	otx.AddFlow("table=0, priority=100, arp, actions=goto_table:2")
-	otx.AddFlow("table=0, priority=100, ip, actions=goto_table:2")
+	otx.AddFlow("table=0, priority=100, arp, actions=goto_table:20")
+	otx.AddFlow("table=0, priority=100, ip, actions=goto_table:20")
 	otx.AddFlow("table=0, priority=0, actions=drop")
 
-	// Table 1: VXLAN ingress filtering; filled in by AddHostSubnetRules()
-	// eg, "table=1, priority=100, tun_src=${remote_node_ip}, actions=goto_table:5"
-	otx.AddFlow("table=1, priority=0, actions=drop")
+	// Table 10: VXLAN ingress filtering; filled in by AddHostSubnetRules()
+	// eg, "table=10, priority=100, tun_src=${remote_node_ip}, actions=goto_table:30"
+	otx.AddFlow("table=10, priority=0, actions=drop")
 
-	// Table 2: from OpenShift container; validate IP/MAC, assign tenant-id; filled in by openshift-sdn-ovs
-	// eg, "table=2, priority=100, in_port=${ovs_port}, arp, nw_src=${ipaddr}, arp_sha=${macaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:5"
-	//     "table=2, priority=100, in_port=${ovs_port}, ip, nw_src=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:3"
+	// Table 20: from OpenShift container; validate IP/MAC, assign tenant-id; filled in by openshift-sdn-ovs
+	// eg, "table=20, priority=100, in_port=${ovs_port}, arp, nw_src=${ipaddr}, arp_sha=${macaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:30"
+	//     "table=20, priority=100, in_port=${ovs_port}, ip, nw_src=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG0[], goto_table:30"
 	// (${tenant_id} is always 0 for single-tenant)
-	otx.AddFlow("table=2, priority=0, actions=drop")
-
-	// Table 3: from OpenShift container; service vs non-service
-	otx.AddFlow("table=3, priority=100, ip, nw_dst=%s, actions=goto_table:4", serviceNetworkCIDR)
-	otx.AddFlow("table=3, priority=0, actions=goto_table:5")
-
-	// Table 4: from OpenShift container; service dispatch; filled in by AddServiceRules()
-	otx.AddFlow("table=4, priority=200, reg0=0, actions=output:2")
-	// eg, "table=4, priority=100, reg0=${tenant_id}, ${service_proto}, nw_dst=${service_ip}, tp_dst=${service_port}, actions=output:2"
-	otx.AddFlow("table=4, priority=0, actions=drop")
-
-	// Table 5: general routing
-	otx.AddFlow("table=5, priority=300, arp, nw_dst=%s, actions=output:2", localSubnetGateway)
-	otx.AddFlow("table=5, priority=300, ip, nw_dst=%s, actions=output:2", localSubnetGateway)
-	otx.AddFlow("table=5, priority=200, arp, nw_dst=%s, actions=goto_table:6", localSubnetCIDR)
-	otx.AddFlow("table=5, priority=200, ip, nw_dst=%s, actions=goto_table:7", localSubnetCIDR)
-	otx.AddFlow("table=5, priority=100, arp, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
-	otx.AddFlow("table=5, priority=100, ip, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
-	otx.AddFlow("table=5, priority=0, ip, actions=goto_table:9")
-	otx.AddFlow("table=5, priority=0, arp, actions=drop")
-
-	// Table 6: ARP to container, filled in by openshift-sdn-ovs
-	// eg, "table=6, priority=100, arp, nw_dst=${container_ip}, actions=output:${ovs_port}"
-	otx.AddFlow("table=6, priority=0, actions=drop")
-
-	// Table 7: IP to container; filled in by openshift-sdn-ovs
-	// eg, "table=7, priority=100, reg0=0, ip, nw_dst=${ipaddr}, actions=output:${ovs_port}"
-	// eg, "table=7, priority=100, reg0=${tenant_id}, ip, nw_dst=${ipaddr}, actions=output:${ovs_port}"
-	otx.AddFlow("table=7, priority=0, actions=drop")
-
-	// Table 8: to remote container; filled in by AddHostSubnetRules()
-	// eg, "table=8, priority=100, arp, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
-	// eg, "table=8, priority=100, ip, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
-	otx.AddFlow("table=8, priority=0, actions=drop")
-
-	// Table 9: egress network policy dispatch; edited by updateEgressNetworkPolicyRules()
-	// eg, "table=9, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop
-	otx.AddFlow("table=9, priority=0, actions=output:2")
+	otx.AddFlow("table=20, priority=0, actions=drop")
+
+	// Table 30: general routing
+	otx.AddFlow("table=30, priority=300, arp, nw_dst=%s, actions=output:2", localSubnetGateway)
+	otx.AddFlow("table=30, priority=200, arp, nw_dst=%s, actions=goto_table:40", localSubnetCIDR)
+	otx.AddFlow("table=30, priority=100, arp, nw_dst=%s, actions=goto_table:50", clusterNetworkCIDR)
+	otx.AddFlow("table=30, priority=300, ip, nw_dst=%s, actions=output:2", localSubnetGateway)
+	otx.AddFlow("table=30, priority=100, ip, nw_dst=%s, actions=goto_table:60", serviceNetworkCIDR)
+	otx.AddFlow("table=30, priority=200, ip, nw_dst=%s, actions=goto_table:70", localSubnetCIDR)
+	otx.AddFlow("table=30, priority=100, ip, nw_dst=%s, actions=goto_table:90", clusterNetworkCIDR)
+	otx.AddFlow("table=30, priority=0, ip, actions=goto_table:100")
+	otx.AddFlow("table=30, priority=0, arp, actions=drop")
+
+	// Table 40: ARP to local container, filled in by openshift-sdn-ovs
+	// eg, "table=40, priority=100, arp, nw_dst=${container_ip}, actions=output:${ovs_port}"
+	otx.AddFlow("table=40, priority=0, actions=drop")
+
+	// Table 50: ARP to remote container; filled in by AddHostSubnetRules()
+	// eg, "table=50, priority=100, arp, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
+	otx.AddFlow("table=50, priority=0, actions=drop")
+
+	// Table 60: IP to service: vnid/port mappings; filled in by AddServiceRules()
+	otx.AddFlow("table=60, priority=200, reg0=0, actions=output:2")
+	// eg, "table=60, priority=100, reg0=${tenant_id}, ${service_proto}, nw_dst=${service_ip}, tp_dst=${service_port}, actions=load:${tenant_id}->NXM_NX_REG1[], load:2->NXM_NX_REG2[], goto_table:80"
+	otx.AddFlow("table=60, priority=0, actions=drop")
+
+	// Table 70: IP to local container: vnid/port mappings; filled in by openshift-sdn-ovs
+	// eg, "table=70, priority=100, ip, nw_dst=${ipaddr}, actions=load:${tenant_id}->NXM_NX_REG1[], load:${ovs_port}->NXM_NX_REG2[], goto_table:80"
+	otx.AddFlow("table=70, priority=0, actions=drop")
+
+	// Table 80: IP policy enforcement; mostly managed by the osdnPolicy
+	otx.AddFlow("table=80, priority=300, ip, nw_src=%s/32, actions=output:NXM_NX_REG2[]", localSubnetGateway)
+	// eg, "table=80, priority=100, reg0=${tenant_id}, reg1=${tenant_id}, actions=output:NXM_NX_REG2[]"
+	otx.AddFlow("table=80, priority=0, actions=drop")
+
+	// Table 90: IP to remote container; filled in by AddHostSubnetRules()
+	// eg, "table=90, priority=100, ip, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
+	otx.AddFlow("table=90, priority=0, actions=drop")
+
+	// Table 100: egress network policy dispatch; edited by UpdateEgressNetworkPolicy()
+	// eg, "table=100, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop"
+	otx.AddFlow("table=100, priority=0, actions=output:2")
 
 	err = otx.EndTransaction()
 	if err != nil {
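The tables are renumbered in steps of 10, leaving room to insert stages later, and the old per-tenant service/output tables collapse into one policy decision point: traffic flows 0 → 10/20 → 30 → 40/50/60/70 → 80 → 90/100, with the source VNID in REG0 and the destination VNID/port in REG1/REG2. As a reading aid, a sketch summarizing each table's role under this PR's layout (the role strings are paraphrases, not code from the PR):

```go
package main

import "fmt"

// A reading aid for the renumbered pipeline set up by SetupSDN.
var tables = []struct {
	id   int
	role string
}{
	{0, "initial dispatch based on in_port (vxlan0, tun0, or pod)"},
	{10, "VXLAN ingress filtering by tun_src"},
	{20, "from pod: validate IP/MAC, load source VNID into REG0"},
	{30, "general routing: local, service, remote, or egress path"},
	{40, "ARP to local pod"},
	{50, "ARP to remote pod (VXLAN out)"},
	{60, "IP to service: load VNID/port into REG1/REG2"},
	{70, "IP to local pod: load VNID/port into REG1/REG2"},
	{80, "policy enforcement: output:REG2 if REG0/REG1 may talk"},
	{90, "IP to remote pod (VXLAN out)"},
	{100, "egress network policy dispatch"},
}

func main() {
	for _, t := range tables {
		fmt.Printf("table=%-3d %s\n", t.id, t.role)
	}
}
```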
... ...
@@ -322,7 +331,7 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
 
 	// Table 253: rule version; note action is hex bytes separated by '.'
 	otx = plugin.ovs.NewTransaction()
-	pluginVersion := getPluginVersion(plugin.multitenant)
+	pluginVersion := plugin.getPluginVersion()
 	otx.AddFlow("%s, %s%s.%s", VERSION_TABLE, VERSION_ACTION, pluginVersion[0], pluginVersion[1])
 	err = otx.EndTransaction()
 	if err != nil {
... ...
@@ -340,27 +349,27 @@ func policyNames(policies []osapi.EgressNetworkPolicy) string {
 	return strings.Join(names, ", ")
 }
 
-func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) error {
+func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) {
 	otx := plugin.ovs.NewTransaction()
 
 	policies := plugin.egressPolicies[vnid]
-	namespaces := plugin.vnids.GetNamespaces(vnid)
+	namespaces := plugin.policy.GetNamespaces(vnid)
 	if len(policies) == 0 {
-		otx.DeleteFlows("table=9, reg0=%d", vnid)
+		otx.DeleteFlows("table=100, reg0=%d", vnid)
 	} else if vnid == 0 {
 		glog.Errorf("EgressNetworkPolicy in global network namespace is not allowed (%s); ignoring", policyNames(policies))
 	} else if len(namespaces) > 1 {
 		glog.Errorf("EgressNetworkPolicy not allowed in shared NetNamespace (%s); dropping all traffic", strings.Join(namespaces, ", "))
-		otx.DeleteFlows("table=9, reg0=%d", vnid)
-		otx.AddFlow("table=9, reg0=%d, priority=1, actions=drop", vnid)
+		otx.DeleteFlows("table=100, reg0=%d", vnid)
+		otx.AddFlow("table=100, reg0=%d, priority=1, actions=drop", vnid)
 	} else if len(policies) > 1 {
 		glog.Errorf("multiple EgressNetworkPolicies in same network namespace (%s) is not allowed; dropping all traffic", policyNames(policies))
-		otx.DeleteFlows("table=9, reg0=%d", vnid)
-		otx.AddFlow("table=9, reg0=%d, priority=1, actions=drop", vnid)
+		otx.DeleteFlows("table=100, reg0=%d", vnid)
+		otx.AddFlow("table=100, reg0=%d, priority=1, actions=drop", vnid)
 	} else /* vnid != 0 && len(policies) == 1 */ {
 		// Temporarily drop all outgoing traffic, to avoid race conditions while modifying the other rules
-		otx.AddFlow("table=9, reg0=%d, cookie=1, priority=65535, actions=drop", vnid)
-		otx.DeleteFlows("table=9, reg0=%d, cookie=0/1", vnid)
+		otx.AddFlow("table=100, reg0=%d, cookie=1, priority=65535, actions=drop", vnid)
+		otx.DeleteFlows("table=100, reg0=%d, cookie=0/1", vnid)
 
 		for i, rule := range policies[0].Spec.Egress {
 			priority := len(policies[0].Spec.Egress) - i
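The cookie trick above keeps the VNID fail-closed while its egress rules are swapped: a max-priority drop tagged cookie=1 is installed first, the old (untagged) rules are deleted via cookie=0/1, the new rules are added, and only then is the cookie=1/1 guard removed. A runnable sketch of the pattern with a fake transaction (`replaceVnidFlows` and `ovsTx` are hypothetical; the PR uses its own otx transaction API):

```go
package main

import "fmt"

// ovsTx models the AddFlow/DeleteFlows transaction API used in the PR;
// this fake just records operations so the swap order is visible.
type ovsTx struct{ ops []string }

func (t *ovsTx) AddFlow(f string, a ...interface{}) { t.ops = append(t.ops, "add "+fmt.Sprintf(f, a...)) }
func (t *ovsTx) DeleteFlows(f string, a ...interface{}) {
	t.ops = append(t.ops, "del "+fmt.Sprintf(f, a...))
}

// replaceVnidFlows keeps the VNID fail-closed (high-priority drop)
// while its old rules are deleted and the new ones installed.
func replaceVnidFlows(otx *ovsTx, vnid uint32, newRules []string) {
	otx.AddFlow("table=100, reg0=%d, cookie=1, priority=65535, actions=drop", vnid)
	otx.DeleteFlows("table=100, reg0=%d, cookie=0/1", vnid) // matches only untagged flows
	for _, r := range newRules {
		otx.AddFlow(r) // new rules are untagged (cookie bit 0 clear)
	}
	otx.DeleteFlows("table=100, reg0=%d, cookie=1/1", vnid) // matches only the guard drop
}

func main() {
	otx := &ovsTx{}
	replaceVnidFlows(otx, 7, []string{"table=100, reg0=7, priority=1, ip, actions=output:2"})
	for _, op := range otx.ops {
		fmt.Println(op)
	}
}
```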
... ...
@@ -379,99 +388,77 @@ func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) error {
 				dst = fmt.Sprintf(", nw_dst=%s", rule.To.CIDRSelector)
 			}
 
-			otx.AddFlow("table=9, reg0=%d, priority=%d, ip%s, actions=%s", vnid, priority, dst, action)
+			otx.AddFlow("table=100, reg0=%d, priority=%d, ip%s, actions=%s", vnid, priority, dst, action)
 		}
-		otx.DeleteFlows("table=9, reg0=%d, cookie=1/1", vnid)
+		otx.DeleteFlows("table=100, reg0=%d, cookie=1/1", vnid)
 	}
 
-	err := otx.EndTransaction()
-	if err != nil {
-		return fmt.Errorf("Error updating OVS flows for EgressNetworkPolicy: %v", err)
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error updating OVS flows for EgressNetworkPolicy: %v", err)
 	}
-	return nil
 }
 
-func (plugin *OsdnNode) AddHostSubnetRules(subnet *osapi.HostSubnet) error {
+func (plugin *OsdnNode) AddHostSubnetRules(subnet *osapi.HostSubnet) {
 	glog.Infof("AddHostSubnetRules for %s", hostSubnetToString(subnet))
 	otx := plugin.ovs.NewTransaction()
 
-	otx.AddFlow("table=1, priority=100, tun_src=%s, actions=goto_table:5", subnet.HostIP)
+	otx.AddFlow("table=10, priority=100, tun_src=%s, actions=goto_table:30", subnet.HostIP)
 	if vnid, ok := subnet.Annotations[osapi.FixedVnidHost]; ok {
-		otx.AddFlow("table=8, priority=100, arp, nw_dst=%s, actions=load:%s->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, vnid, subnet.HostIP)
-		otx.AddFlow("table=8, priority=100, ip, nw_dst=%s, actions=load:%s->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, vnid, subnet.HostIP)
+		otx.AddFlow("table=50, priority=100, arp, nw_dst=%s, actions=load:%s->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, vnid, subnet.HostIP)
+		otx.AddFlow("table=90, priority=100, ip, nw_dst=%s, actions=load:%s->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, vnid, subnet.HostIP)
 	} else {
-		otx.AddFlow("table=8, priority=100, arp, nw_dst=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, subnet.HostIP)
-		otx.AddFlow("table=8, priority=100, ip, nw_dst=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, subnet.HostIP)
+		otx.AddFlow("table=50, priority=100, arp, nw_dst=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, subnet.HostIP)
+		otx.AddFlow("table=90, priority=100, ip, nw_dst=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", subnet.Subnet, subnet.HostIP)
 	}
 
-	err := otx.EndTransaction()
-	if err != nil {
-		return fmt.Errorf("Error adding OVS flows for subnet: %v, %v", subnet, err)
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error adding OVS flows for subnet %q: %v", subnet.Subnet, err)
 	}
-	return nil
 }
 
-func (plugin *OsdnNode) DeleteHostSubnetRules(subnet *osapi.HostSubnet) error {
+func (plugin *OsdnNode) DeleteHostSubnetRules(subnet *osapi.HostSubnet) {
 	glog.Infof("DeleteHostSubnetRules for %s", hostSubnetToString(subnet))
 
 	otx := plugin.ovs.NewTransaction()
-	otx.DeleteFlows("table=1, tun_src=%s", subnet.HostIP)
-	otx.DeleteFlows("table=8, ip, nw_dst=%s", subnet.Subnet)
-	otx.DeleteFlows("table=8, arp, nw_dst=%s", subnet.Subnet)
-	err := otx.EndTransaction()
-	if err != nil {
-		return fmt.Errorf("Error deleting OVS flows for subnet: %v, %v", subnet, err)
+	otx.DeleteFlows("table=10, tun_src=%s", subnet.HostIP)
+	otx.DeleteFlows("table=50, arp, nw_dst=%s", subnet.Subnet)
+	otx.DeleteFlows("table=90, ip, nw_dst=%s", subnet.Subnet)
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error deleting OVS flows for subnet %q: %v", subnet.Subnet, err)
 	}
-	return nil
 }
 
-func (plugin *OsdnNode) AddServiceRules(service *kapi.Service, netID uint32) error {
-	if !plugin.multitenant {
-		return nil
-	}
-
+func (plugin *OsdnNode) AddServiceRules(service *kapi.Service, netID uint32) {
 	glog.V(5).Infof("AddServiceRules for %v", service)
 
 	otx := plugin.ovs.NewTransaction()
 	for _, port := range service.Spec.Ports {
 		otx.AddFlow(generateAddServiceRule(netID, service.Spec.ClusterIP, port.Protocol, int(port.Port)))
-		err := otx.EndTransaction()
-		if err != nil {
-			return fmt.Errorf("Error adding OVS flows for service: %v, netid: %d, %v", service, netID, err)
+		if err := otx.EndTransaction(); err != nil {
+			glog.Errorf("Error adding OVS flows for service %v, netid %d: %v", service, netID, err)
 		}
 	}
-	return nil
 }
 
-func (plugin *OsdnNode) DeleteServiceRules(service *kapi.Service) error {
-	if !plugin.multitenant {
-		return nil
-	}
-
+func (plugin *OsdnNode) DeleteServiceRules(service *kapi.Service) {
 	glog.V(5).Infof("DeleteServiceRules for %v", service)
 
 	otx := plugin.ovs.NewTransaction()
 	for _, port := range service.Spec.Ports {
 		otx.DeleteFlows(generateDeleteServiceRule(service.Spec.ClusterIP, port.Protocol, int(port.Port)))
-		err := otx.EndTransaction()
-		if err != nil {
-			return fmt.Errorf("Error deleting OVS flows for service: %v, %v", service, err)
+		if err := otx.EndTransaction(); err != nil {
			glog.Errorf("Error deleting OVS flows for service %v: %v", service, err)
 		}
 	}
-	return nil
 }
 
 func generateBaseServiceRule(IP string, protocol kapi.Protocol, port int) string {
-	return fmt.Sprintf("table=4, %s, nw_dst=%s, tp_dst=%d", strings.ToLower(string(protocol)), IP, port)
+	return fmt.Sprintf("table=60, %s, nw_dst=%s, tp_dst=%d", strings.ToLower(string(protocol)), IP, port)
 }
 
 func generateAddServiceRule(netID uint32, IP string, protocol kapi.Protocol, port int) string {
 	baseRule := generateBaseServiceRule(IP, protocol, port)
-	if netID == 0 {
-		return fmt.Sprintf("%s, priority=100, actions=output:2", baseRule)
-	} else {
-		return fmt.Sprintf("%s, priority=100, reg0=%d, actions=output:2", baseRule, netID)
-	}
+	return fmt.Sprintf("%s, priority=100, actions=load:%d->NXM_NX_REG1[], load:2->NXM_NX_REG2[], goto_table:80", baseRule, netID)
 }
 
 func generateDeleteServiceRule(IP string, protocol kapi.Protocol, port int) string {
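Service flows now go through the same table-80 policy check as pod-to-pod traffic: instead of outputting straight to tun0, the rule records the service's VNID in REG1 and tun0's port (2) in REG2 and jumps to table 80. A sketch reproducing the rule string generateAddServiceRule now builds (`serviceRule` is a hypothetical stand-in using a plain string protocol instead of kapi.Protocol):

```go
package main

import (
	"fmt"
	"strings"
)

// serviceRule mirrors generateBaseServiceRule + generateAddServiceRule:
// match the ClusterIP/port in table 60, load destination VNID into REG1
// and tun0's OVS port (2) into REG2, and let table 80 decide whether the
// source tenant (REG0) may reach the service.
func serviceRule(netID uint32, ip, protocol string, port int) string {
	base := fmt.Sprintf("table=60, %s, nw_dst=%s, tp_dst=%d", strings.ToLower(protocol), ip, port)
	return fmt.Sprintf("%s, priority=100, actions=load:%d->NXM_NX_REG1[], load:2->NXM_NX_REG2[], goto_table:80", base, netID)
}

func main() {
	// Example values only.
	fmt.Println(serviceRule(7, "172.30.0.10", "TCP", 5454))
}
```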
... ...
@@ -19,7 +19,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
 	}
 
 	for _, policy := range policies.Items {
-		vnid, err := plugin.vnids.GetVNID(policy.Namespace)
+		vnid, err := plugin.policy.GetVNID(policy.Namespace)
 		if err != nil {
 			glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
 			continue
... ...
@@ -28,10 +28,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
 	}
 
 	for vnid := range plugin.egressPolicies {
-		err := plugin.updateEgressNetworkPolicyRules(vnid)
-		if err != nil {
-			return err
-		}
+		plugin.updateEgressNetworkPolicyRules(vnid)
 	}
 
 	go utilwait.Forever(plugin.watchEgressNetworkPolicies, 0)
... ...
@@ -42,7 +39,7 @@ func (plugin *OsdnNode) watchEgressNetworkPolicies() {
 	RunEventQueue(plugin.osClient, EgressNetworkPolicies, func(delta cache.Delta) error {
 		policy := delta.Object.(*osapi.EgressNetworkPolicy)
 
-		vnid, err := plugin.vnids.GetVNID(policy.Namespace)
+		vnid, err := plugin.policy.GetVNID(policy.Namespace)
 		if err != nil {
 			return fmt.Errorf("Could not find netid for namespace %q: %v", policy.Namespace, err)
 		}
... ...
@@ -59,15 +56,12 @@ func (plugin *OsdnNode) watchEgressNetworkPolicies() {
 		}
 		plugin.egressPolicies[vnid] = policies
 
-		err = plugin.updateEgressNetworkPolicyRules(vnid)
-		if err != nil {
-			return err
-		}
+		plugin.updateEgressNetworkPolicyRules(vnid)
 		return nil
 	})
 }
 
-func (plugin *OsdnNode) UpdateEgressNetworkPolicyVNID(namespace string, oldVnid, newVnid uint32) error {
+func (plugin *OsdnNode) UpdateEgressNetworkPolicyVNID(namespace string, oldVnid, newVnid uint32) {
 	var policy *osapi.EgressNetworkPolicy
 
 	policies := plugin.egressPolicies[oldVnid]
... ...
@@ -75,21 +69,13 @@ func (plugin *OsdnNode) UpdateEgressNetworkPolicyVNID(namespace string, oldVnid,
 		if oldPolicy.Namespace == namespace {
 			policy = &oldPolicy
 			plugin.egressPolicies[oldVnid] = append(policies[:i], policies[i+1:]...)
-			err := plugin.updateEgressNetworkPolicyRules(oldVnid)
-			if err != nil {
-				return err
-			}
+			plugin.updateEgressNetworkPolicyRules(oldVnid)
 			break
 		}
 	}
 
 	if policy != nil {
 		plugin.egressPolicies[newVnid] = append(plugin.egressPolicies[newVnid], *policy)
-		err := plugin.updateEgressNetworkPolicyRules(newVnid)
-		if err != nil {
-			return err
-		}
+		plugin.updateEgressNetworkPolicyRules(newVnid)
 	}
-
-	return nil
 }
new file mode 100644
@@ -0,0 +1,186 @@
+package plugin
+
+import (
+	"sync"
+
+	"github.com/golang/glog"
+
+	kapi "k8s.io/kubernetes/pkg/api"
+
+	osapi "github.com/openshift/origin/pkg/sdn/api"
+)
+
+type multiTenantPlugin struct {
+	node  *OsdnNode
+	vnids *nodeVNIDMap
+
+	vnidRefsLock sync.Mutex
+	vnidRefs     map[uint32]int
+}
+
+func NewMultiTenantPlugin() osdnPolicy {
+	return &multiTenantPlugin{
+		vnidRefs: make(map[uint32]int),
+	}
+}
+
+func (mp *multiTenantPlugin) Name() string {
+	return osapi.MultiTenantPluginName
+}
+
+func (mp *multiTenantPlugin) Start(node *OsdnNode) error {
+	mp.node = node
+	mp.vnids = newNodeVNIDMap(mp, node.osClient)
+	if err := mp.vnids.Start(); err != nil {
+		return err
+	}
+
+	otx := node.ovs.NewTransaction()
+	otx.AddFlow("table=80, priority=200, reg0=0, actions=output:NXM_NX_REG2[]")
+	otx.AddFlow("table=80, priority=200, reg1=0, actions=output:NXM_NX_REG2[]")
+	if err := otx.EndTransaction(); err != nil {
+		return err
+	}
+
+	if err := mp.node.SetupEgressNetworkPolicy(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (mp *multiTenantPlugin) updatePodNetwork(namespace string, oldNetID, netID uint32) {
+	// FIXME: this is racy; traffic coming from the pods gets switched to the new
+	// VNID before the service and firewall rules are updated to match. We need
+	// to do the updates as a single transaction (ovs-ofctl --bundle).
+
+	pods, err := mp.node.GetLocalPods(namespace)
+	if err != nil {
+		glog.Errorf("Could not get list of local pods in namespace %q: %v", namespace, err)
+	}
+	services, err := mp.node.kClient.Core().Services(namespace).List(kapi.ListOptions{})
+	if err != nil {
+		glog.Errorf("Could not get list of services in namespace %q: %v", namespace, err)
+		services = &kapi.ServiceList{}
+	}
+
+	movedVNIDRefs := 0
+
+	// Update OF rules for the existing/old pods in the namespace
+	for _, pod := range pods {
+		err = mp.node.UpdatePod(pod)
+		if err == nil {
+			movedVNIDRefs++
+		} else {
+			glog.Errorf("Could not update pod %q in namespace %q: %v", pod.Name, namespace, err)
+		}
+	}
+
+	// Update OF rules for the old services in the namespace
+	for _, svc := range services.Items {
+		if !kapi.IsServiceIPSet(&svc) {
+			continue
+		}
+
+		mp.node.DeleteServiceRules(&svc)
+		mp.node.AddServiceRules(&svc, netID)
+		movedVNIDRefs++
+	}
+
+	if movedVNIDRefs > 0 {
+		mp.moveVNIDRefs(movedVNIDRefs, oldNetID, netID)
+	}
+
+	// Update namespace references in egress firewall rules
+	mp.node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID)
+}
+
+func (mp *multiTenantPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
+	mp.updatePodNetwork(netns.Name, 0, netns.NetID)
+}
+
+func (mp *multiTenantPlugin) UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32) {
+	mp.updatePodNetwork(netns.Name, oldNetID, netns.NetID)
+}
+
+func (mp *multiTenantPlugin) DeleteNetNamespace(netns *osapi.NetNamespace) {
+	mp.updatePodNetwork(netns.Name, netns.NetID, 0)
+}
+
+func (mp *multiTenantPlugin) GetVNID(namespace string) (uint32, error) {
+	return mp.vnids.WaitAndGetVNID(namespace)
+}
+
+func (mp *multiTenantPlugin) GetNamespaces(vnid uint32) []string {
+	return mp.vnids.GetNamespaces(vnid)
+}
+
+func (mp *multiTenantPlugin) RefVNID(vnid uint32) {
+	if vnid == 0 {
+		return
+	}
+
+	mp.vnidRefsLock.Lock()
+	defer mp.vnidRefsLock.Unlock()
+	mp.vnidRefs[vnid] += 1
+	if mp.vnidRefs[vnid] > 1 {
+		return
+	}
+	glog.V(5).Infof("RefVNID %d adding rule", vnid)
+
+	otx := mp.node.ovs.NewTransaction()
+	otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", vnid, vnid)
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error adding OVS flow for VNID: %v", err)
+	}
+}
+
+func (mp *multiTenantPlugin) UnrefVNID(vnid uint32) {
+	if vnid == 0 {
+		return
+	}
+
+	mp.vnidRefsLock.Lock()
+	defer mp.vnidRefsLock.Unlock()
+	if mp.vnidRefs[vnid] == 0 {
+		glog.Warningf("refcounting error on vnid %d", vnid)
+		return
+	}
+	mp.vnidRefs[vnid] -= 1
+	if mp.vnidRefs[vnid] > 0 {
+		return
+	}
+	glog.V(5).Infof("UnrefVNID %d removing rule", vnid)
+
+	otx := mp.node.ovs.NewTransaction()
+	otx.DeleteFlows("table=80, reg0=%d, reg1=%d", vnid, vnid)
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error deleting OVS flow for VNID: %v", err)
+	}
+}
+
+func (mp *multiTenantPlugin) moveVNIDRefs(num int, oldVNID, newVNID uint32) {
+	glog.V(5).Infof("moveVNIDRefs %d -> %d", oldVNID, newVNID)
+
+	mp.vnidRefsLock.Lock()
+	defer mp.vnidRefsLock.Unlock()
+
+	otx := mp.node.ovs.NewTransaction()
+	if mp.vnidRefs[oldVNID] <= num {
+		otx.DeleteFlows("table=80, reg0=%d, reg1=%d", oldVNID, oldVNID)
+	}
+	if mp.vnidRefs[newVNID] == 0 {
+		otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", newVNID, newVNID)
+	}
+	err := otx.EndTransaction()
+	if err != nil {
+		glog.Errorf("Error modifying OVS flows for VNID: %v", err)
+	}
+
+	mp.vnidRefs[oldVNID] -= num
+	if mp.vnidRefs[oldVNID] < 0 {
+		glog.Warningf("refcounting error on vnid %d", oldVNID)
+		mp.vnidRefs[oldVNID] = 0
+	}
+	mp.vnidRefs[newVNID] += num
+}
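RefVNID/UnrefVNID refcount the local users of a VNID (pods and services on this node): the table-80 permit rule for a VNID exists exactly while the count is nonzero, and moveVNIDRefs transfers counts in bulk when a namespace changes VNID. A stripped-down, runnable sketch of that bookkeeping (`refCounter` is hypothetical; OVS calls are replaced by prints):

```go
package main

import (
	"fmt"
	"sync"
)

// refCounter mimics multiTenantPlugin's vnidRefs map: the first reference
// installs the table-80 permit rule for the VNID, the last dereference
// removes it.
type refCounter struct {
	mu   sync.Mutex
	refs map[uint32]int
}

func (rc *refCounter) Ref(vnid uint32) {
	if vnid == 0 {
		return // VNID 0 is globally allowed; never refcounted
	}
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.refs[vnid]++
	if rc.refs[vnid] == 1 {
		fmt.Printf("add: table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]\n", vnid, vnid)
	}
}

func (rc *refCounter) Unref(vnid uint32) {
	if vnid == 0 {
		return
	}
	rc.mu.Lock()
	defer rc.mu.Unlock()
	if rc.refs[vnid] == 0 {
		fmt.Printf("refcounting error on vnid %d\n", vnid)
		return
	}
	rc.refs[vnid]--
	if rc.refs[vnid] == 0 {
		fmt.Printf("del: table=80, reg0=%d, reg1=%d\n", vnid, vnid)
	}
}

func main() {
	rc := &refCounter{refs: make(map[uint32]int)}
	rc.Ref(7)   // first pod in the namespace: rule added
	rc.Ref(7)   // second pod: no new rule
	rc.Unref(7) // one pod gone: rule stays
	rc.Unref(7) // last user gone: rule deleted
}
```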
... ...
@@ -20,6 +20,7 @@ import (
 	docker "github.com/fsouza/go-dockerclient"
 
 	kapi "k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
 	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/fields"
 	knetwork "k8s.io/kubernetes/pkg/kubelet/network"
... ...
@@ -29,8 +30,23 @@ import (
 	kwait "k8s.io/kubernetes/pkg/util/wait"
 )
 
+type osdnPolicy interface {
+	Name() string
+	Start(node *OsdnNode) error
+
+	AddNetNamespace(netns *osapi.NetNamespace)
+	UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32)
+	DeleteNetNamespace(netns *osapi.NetNamespace)
+
+	GetVNID(namespace string) (uint32, error)
+	GetNamespaces(vnid uint32) []string
+
+	RefVNID(vnid uint32)
+	UnrefVNID(vnid uint32)
+}
+
 type OsdnNode struct {
-	multitenant        bool
+	policy             osdnPolicy
 	kClient            *kclientset.Clientset
 	osClient           *osclient.Client
 	ovs                *ovs.Interface
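Both the single-tenant and multi-tenant policies must implement this interface. A compile-time assertion is a common Go idiom for enforcing that; shown here as a suggestion, not something this PR adds:

```go
package plugin

// Compile-time assertions (hypothetical; not part of this PR) that both
// policy implementations satisfy the osdnPolicy interface.
var (
	_ osdnPolicy = &singleTenantPlugin{}
	_ osdnPolicy = &multiTenantPlugin{}
)
```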
... ...
@@ -41,7 +57,6 @@ type OsdnNode struct {
 	hostName           string
 	podNetworkReady    chan struct{}
 	kubeletInitReady   chan struct{}
-	vnids              *nodeVNIDMap
 	iptablesSyncPeriod time.Duration
 	mtu                uint32
 	egressPolicies     map[uint32][]osapi.EgressNetworkPolicy
... ...
@@ -54,7 +69,14 @@ type OsdnNode struct {
 
 // Called by higher layers to create the plugin SDN node instance
 func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclientset.Clientset, hostname string, selfIP string, iptablesSyncPeriod time.Duration, mtu uint32) (*OsdnNode, error) {
-	if !osapi.IsOpenShiftNetworkPlugin(pluginName) {
+	var policy osdnPolicy
+	switch strings.ToLower(pluginName) {
+	case osapi.SingleTenantPluginName:
+		policy = NewSingleTenantPlugin()
+	case osapi.MultiTenantPluginName:
+		policy = NewMultiTenantPlugin()
+	default:
+		// Not an OpenShift plugin
 		return nil, nil
 	}
 
... ...
@@ -88,13 +110,12 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien
 	}
 
 	plugin := &OsdnNode{
-		multitenant:        osapi.IsOpenShiftMultitenantNetworkPlugin(pluginName),
+		policy:             policy,
 		kClient:            kClient,
 		osClient:           osClient,
 		ovs:                ovsif,
 		localIP:            selfIP,
 		hostName:           hostname,
-		vnids:              newNodeVNIDMap(),
 		podNetworkReady:    make(chan struct{}),
 		kubeletInitReady:   make(chan struct{}),
 		iptablesSyncPeriod: iptablesSyncPeriod,
... ...
@@ -195,14 +216,10 @@ func (node *OsdnNode) Start() error {
 		return err
 	}
 
-	if node.multitenant {
-		if err = node.VnidStartNode(); err != nil {
-			return err
-		}
-		if err = node.SetupEgressNetworkPolicy(); err != nil {
-			return err
-		}
+	if err = node.policy.Start(node); err != nil {
+		return err
 	}
+	go kwait.Forever(node.watchServices, 0)
 
 	// Wait for kubelet to init the plugin so we get a knetwork.Host
 	log.V(5).Infof("Waiting for kubelet network plugin initialization")
... ...
@@ -217,7 +234,7 @@ func (node *OsdnNode) Start() error {
 		})
 
 	log.V(5).Infof("Creating and initializing openshift-sdn pod manager")
-	node.podManager, err = newPodManager(node.host, node.multitenant, node.localSubnetCIDR, node.networkInfo, node.kClient, node.vnids, node.mtu)
+	node.podManager, err = newPodManager(node.host, node.localSubnetCIDR, node.networkInfo, node.kClient, node.policy, node.mtu)
 	if err != nil {
 		return err
 	}
... ...
@@ -235,6 +252,10 @@ func (node *OsdnNode) Start() error {
 			err = node.UpdatePod(p)
 			if err != nil {
 				log.Warningf("Could not update pod %q: %s", p.Name, err)
+				continue
+			}
+			if vnid, err := node.policy.GetVNID(p.Namespace); err == nil {
+				node.policy.RefVNID(vnid)
 			}
 		}
 	}
... ...
@@ -295,3 +316,60 @@ func (node *OsdnNode) IsPodNetworkReady() error {
 		return fmt.Errorf("SDN pod network is not ready")
 	}
 }
+
+func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
+	if len(oldsvc.Spec.Ports) == len(newsvc.Spec.Ports) {
+		for i := range oldsvc.Spec.Ports {
+			if oldsvc.Spec.Ports[i].Protocol != newsvc.Spec.Ports[i].Protocol ||
+				oldsvc.Spec.Ports[i].Port != newsvc.Spec.Ports[i].Port {
+				return true
+			}
+		}
+		return false
+	}
+	return true
+}
+
+func (node *OsdnNode) watchServices() {
+	services := make(map[string]*kapi.Service)
+	RunEventQueue(node.kClient.CoreClient, Services, func(delta cache.Delta) error {
+		serv := delta.Object.(*kapi.Service)
+
+		// Ignore headless services
+		if !kapi.IsServiceIPSet(serv) {
+			return nil
+		}
+
+		log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name)
+		switch delta.Type {
+		case cache.Sync, cache.Added, cache.Updated:
+			oldsvc, exists := services[string(serv.UID)]
+			if exists {
+				if !isServiceChanged(oldsvc, serv) {
+					break
+				}
+				node.DeleteServiceRules(oldsvc)
+			}
+
+			netid, err := node.policy.GetVNID(serv.Namespace)
+			if err != nil {
+				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
+			}
+
+			node.AddServiceRules(serv, netid)
+			services[string(serv.UID)] = serv
+			if !exists {
+				node.policy.RefVNID(netid)
+			}
+		case cache.Deleted:
+			delete(services, string(serv.UID))
+			node.DeleteServiceRules(serv)
+
+			netid, err := node.policy.GetVNID(serv.Namespace)
+			if err == nil {
+				node.policy.UnrefVNID(netid)
+			}
+		}
+		return nil
+	})
+}
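watchServices caches services by UID so a resync event for an unchanged service never touches OVS; only a change in port count, protocol, or port number triggers a delete-and-re-add of the flows. A self-contained sketch of the isServiceChanged comparison (a local `port` type stands in for the Kubernetes service port struct):

```go
package main

import "fmt"

// port stands in for the protocol/port pair compared by isServiceChanged.
type port struct {
	protocol string
	port     int
}

// changed reports whether the flow rules need rewriting: any difference
// in length, protocol, or port number counts as a change.
func changed(oldPorts, newPorts []port) bool {
	if len(oldPorts) != len(newPorts) {
		return true
	}
	for i := range oldPorts {
		if oldPorts[i].protocol != newPorts[i].protocol || oldPorts[i].port != newPorts[i].port {
			return true
		}
	}
	return false
}

func main() {
	a := []port{{"TCP", 80}}
	fmt.Println(changed(a, []port{{"TCP", 80}})) // false: no flow rewrite
	fmt.Println(changed(a, []port{{"TCP", 81}})) // true: delete + re-add rules
}
```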
... ...
@@ -33,9 +33,8 @@ type podManager struct {
 	runningPods map[string]*kubehostport.RunningPod
 
 	// Live pod setup/teardown stuff not used in testing code
-	multitenant     bool
 	kClient         *kclientset.Clientset
-	vnids           *nodeVNIDMap
+	policy          osdnPolicy
 	ipamConfig      []byte
 	mtu             uint32
 	hostportHandler kubehostport.HostportHandler
... ...
@@ -43,11 +42,10 @@ type podManager struct {
 }
 
 // Creates a new live podManager; used by node code
-func newPodManager(host knetwork.Host, multitenant bool, localSubnetCIDR string, netInfo *NetworkInfo, kClient *kclientset.Clientset, vnids *nodeVNIDMap, mtu uint32) (*podManager, error) {
+func newPodManager(host knetwork.Host, localSubnetCIDR string, netInfo *NetworkInfo, kClient *kclientset.Clientset, policy osdnPolicy, mtu uint32) (*podManager, error) {
 	pm := newDefaultPodManager(host)
-	pm.multitenant = multitenant
 	pm.kClient = kClient
-	pm.vnids = vnids
+	pm.policy = policy
 	pm.mtu = mtu
 	pm.hostportHandler = kubehostport.NewHostportHandler()
 	pm.podHandler = pm
... ...
@@ -91,11 +91,9 @@ func (m *podManager) getPodConfig(req *cniserver.PodRequest) (*PodConfig, *kapi.
 	var err error
 
 	config := &PodConfig{}
-	if m.multitenant {
-		config.vnid, err = m.vnids.GetVNID(req.PodNamespace)
-		if err != nil {
-			return nil, nil, err
-		}
+	config.vnid, err = m.policy.GetVNID(req.PodNamespace)
+	if err != nil {
+		return nil, nil, err
 	}
 
 	pod, err := m.kClient.Pods(req.PodNamespace).Get(req.PodName)
... ...
@@ -449,6 +447,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *kubeho
 		return nil, nil, err
 	}
 
+	m.policy.RefVNID(podConfig.vnid)
 	success = true
 	return ipamResult, newPod, nil
 }
... ...
@@ -523,6 +522,9 @@ func (m *podManager) teardown(req *cniserver.PodRequest) error {
 			return err
 		}
 	}
+	if vnid, err := m.policy.GetVNID(req.PodNamespace); err == nil {
+		m.policy.UnrefVNID(vnid)
+	}
 
 	if err := m.ipamDel(req.ContainerId); err != nil {
 		return err
new file mode 100644
@@ -0,0 +1,44 @@
+package plugin
+
+import (
+	osapi "github.com/openshift/origin/pkg/sdn/api"
+)
+
+type singleTenantPlugin struct{}
+
+func NewSingleTenantPlugin() osdnPolicy {
+	return &singleTenantPlugin{}
+}
+
+func (sp *singleTenantPlugin) Name() string {
+	return osapi.SingleTenantPluginName
+}
+
+func (sp *singleTenantPlugin) Start(node *OsdnNode) error {
+	otx := node.ovs.NewTransaction()
+	otx.AddFlow("table=80, priority=200, actions=output:NXM_NX_REG2[]")
+	return otx.EndTransaction()
+}
+
+func (sp *singleTenantPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
+}
+
+func (sp *singleTenantPlugin) UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32) {
+}
+
+func (sp *singleTenantPlugin) DeleteNetNamespace(netns *osapi.NetNamespace) {
+}
+
+func (sp *singleTenantPlugin) GetVNID(namespace string) (uint32, error) {
+	return 0, nil
+}
+
+func (sp *singleTenantPlugin) GetNamespaces(vnid uint32) []string {
+	return nil
+}
+
+func (sp *singleTenantPlugin) RefVNID(vnid uint32) {
+}
+
+func (sp *singleTenantPlugin) UnrefVNID(vnid uint32) {
+}
@@ -278,9 +278,7 @@ func (node *OsdnNode) watchSubnets() {
 					break
 				} else {
 					// Delete old subnet rules
-					if err := node.DeleteHostSubnetRules(oldSubnet); err != nil {
-						return err
-					}
+					node.DeleteHostSubnetRules(oldSubnet)
 				}
 			}
 			if err := node.networkInfo.validateNodeIP(hs.HostIP); err != nil {
... ...
@@ -288,15 +286,11 @@ func (node *OsdnNode) watchSubnets() {
 				break
 			}
 
-			if err := node.AddHostSubnetRules(hs); err != nil {
-				return err
-			}
+			node.AddHostSubnetRules(hs)
 			subnets[string(hs.UID)] = hs
 		case cache.Deleted:
 			delete(subnets, string(hs.UID))
-			if err := node.DeleteHostSubnetRules(hs); err != nil {
-				return err
-			}
+			node.DeleteHostSubnetRules(hs)
 		}
 		return nil
 	})
... ...
@@ -9,7 +9,6 @@ import (
 
 	kapi "k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
-	kerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
 
... ...
@@ -18,14 +17,19 @@ import (
 )
 
 type nodeVNIDMap struct {
+	policy   osdnPolicy
+	osClient *osclient.Client
+
 	// Synchronizes add or remove ids/namespaces
 	lock       sync.Mutex
 	ids        map[string]uint32
 	namespaces map[uint32]sets.String
 }
 
-func newNodeVNIDMap() *nodeVNIDMap {
+func newNodeVNIDMap(policy osdnPolicy, osClient *osclient.Client) *nodeVNIDMap {
 	return &nodeVNIDMap{
+		policy:     policy,
+		osClient:   osClient,
 		ids:        make(map[string]uint32),
 		namespaces: make(map[uint32]sets.String),
 	}
... ...
@@ -122,8 +126,8 @@ func (vmap *nodeVNIDMap) unsetVNID(name string) (id uint32, err error) {
 	return id, nil
 }
 
-func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error {
-	nets, err := osClient.NetNamespaces().List(kapi.ListOptions{})
+func (vmap *nodeVNIDMap) populateVNIDs() error {
+	nets, err := vmap.osClient.NetNamespaces().List(kapi.ListOptions{})
 	if err != nil {
 		return err
 	}
... ...
@@ -134,147 +138,39 @@ func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error {
 	return nil
 }
 
-//------------------ Node Methods --------------------
-
-func (node *OsdnNode) VnidStartNode() error {
+func (vmap *nodeVNIDMap) Start() error {
 	// Populate vnid map synchronously so that existing services can fetch vnid
-	err := node.vnids.populateVNIDs(node.osClient)
+	err := vmap.populateVNIDs()
 	if err != nil {
 		return err
 	}
 
-	go utilwait.Forever(node.watchNetNamespaces, 0)
-	go utilwait.Forever(node.watchServices, 0)
+	go utilwait.Forever(vmap.watchNetNamespaces, 0)
 	return nil
 }
 
-func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) error {
-	// FIXME: this is racy; traffic coming from the pods gets switched to the new
-	// VNID before the service and firewall rules are updated to match. We need
-	// to do the updates as a single transaction (ovs-ofctl --bundle).
-
-	pods, err := node.GetLocalPods(namespace)
-	if err != nil {
-		return err
-	}
-	services, err := node.kClient.Services(namespace).List(kapi.ListOptions{})
-	if err != nil {
-		return err
-	}
-
-	errList := []error{}
-
-	// Update OF rules for the existing/old pods in the namespace
-	for _, pod := range pods {
-		err = node.UpdatePod(pod)
-		if err != nil {
-			errList = append(errList, err)
-		}
-	}
-
-	// Update OF rules for the old services in the namespace
-	for _, svc := range services.Items {
-		if !kapi.IsServiceIPSet(&svc) {
-			continue
-		}
-
-		if err = node.DeleteServiceRules(&svc); err != nil {
-			log.Error(err)
-		}
-		if err = node.AddServiceRules(&svc, netID); err != nil {
-			errList = append(errList, err)
-		}
-	}
-
-	// Update namespace references in egress firewall rules
-	if err = node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID); err != nil {
-		errList = append(errList, err)
-	}
-
-	return kerrors.NewAggregate(errList)
-}
-
-func (node *OsdnNode) watchNetNamespaces() {
-	RunEventQueue(node.osClient, NetNamespaces, func(delta cache.Delta) error {
+func (vmap *nodeVNIDMap) watchNetNamespaces() {
+	RunEventQueue(vmap.osClient, NetNamespaces, func(delta cache.Delta) error {
 		netns := delta.Object.(*osapi.NetNamespace)
 
 		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name)
 		switch delta.Type {
 		case cache.Sync, cache.Added, cache.Updated:
 			// Skip this event if the old and new network ids are same
-			oldNetID, err := node.vnids.GetVNID(netns.NetName)
+			oldNetID, err := vmap.GetVNID(netns.NetName)
 			if (err == nil) && (oldNetID == netns.NetID) {
 				break
 			}
-			node.vnids.setVNID(netns.NetName, netns.NetID)
+			vmap.setVNID(netns.NetName, netns.NetID)
 
-			err = node.updatePodNetwork(netns.NetName, oldNetID, netns.NetID)
-			if err != nil {
-				node.vnids.setVNID(netns.NetName, oldNetID)
-				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
+			if delta.Type == cache.Added {
+				vmap.policy.AddNetNamespace(netns)
+			} else {
+				vmap.policy.UpdateNetNamespace(netns, oldNetID)
 			}
 		case cache.Deleted:
-			// updatePodNetwork needs vnid, so unset vnid after this call
-			err := node.updatePodNetwork(netns.NetName, netns.NetID, osapi.GlobalVNID)
-			if err != nil {
-				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err)
-			}
-			node.vnids.unsetVNID(netns.NetName)
-		}
-		return nil
-	})
-}
-
-func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
-	if len(oldsvc.Spec.Ports) == len(newsvc.Spec.Ports) {
-		for i := range oldsvc.Spec.Ports {
-			if oldsvc.Spec.Ports[i].Protocol != newsvc.Spec.Ports[i].Protocol ||
-				oldsvc.Spec.Ports[i].Port != newsvc.Spec.Ports[i].Port {
-				return true
-			}
-		}
-		return false
-	}
-	return true
-}
-
-func (node *OsdnNode) watchServices() {
-	services := make(map[string]*kapi.Service)
-	RunEventQueue(node.kClient.CoreClient, Services, func(delta cache.Delta) error {
-		serv := delta.Object.(*kapi.Service)
-
-		// Ignore headless services
-		if !kapi.IsServiceIPSet(serv) {
-			return nil
-		}
-
-		log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name)
-		switch delta.Type {
-		case cache.Sync, cache.Added, cache.Updated:
-			oldsvc, exists := services[string(serv.UID)]
-			if exists {
-				if !isServiceChanged(oldsvc, serv) {
-					break
-				}
-				if err := node.DeleteServiceRules(oldsvc); err != nil {
-					log.Error(err)
-				}
-			}
-
-			netid, err := node.vnids.WaitAndGetVNID(serv.Namespace)
-			if err != nil {
-				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
-			}
-
-			if err = node.AddServiceRules(serv, netid); err != nil {
-				return err
-			}
-			services[string(serv.UID)] = serv
-		case cache.Deleted:
-			delete(services, string(serv.UID))
-			if err := node.DeleteServiceRules(serv); err != nil {
-				return err
-			}
+			vmap.policy.DeleteNetNamespace(netns)
+			vmap.unsetVNID(netns.NetName)
 		}
 		return nil
 	})
... ...
@@ -9,7 +9,7 @@ import (
 )
 
 func TestNodeVNIDMap(t *testing.T) {
-	vmap := newNodeVNIDMap()
+	vmap := newNodeVNIDMap(nil, nil)
 
 	// empty vmap
 
... ...
@@ -47,40 +47,14 @@ var _ = Describe("[networking] OVS", func() {
 				}
 			}
 
-			var otherFlows []string
-			var arpOut, ipOut, arpIn, ipInGeneric, ipInGlobal, ipInIsolated bool
+			foundPodFlow := false
 			for _, flow := range newFlows[deployNodeName] {
-				if strings.Contains(flow, ip) {
-					if strings.Contains(flow, "arp_spa="+ip) {
-						arpOut = true
-					} else if strings.Contains(flow, "arp_tpa="+ip) {
-						arpIn = true
-					} else if strings.Contains(flow, "nw_src="+ip) {
-						ipOut = true
-					} else if strings.Contains(flow, "nw_dst="+ip) {
-						if strings.Contains(flow, "reg0=0x") {
-							ipInIsolated = true
-						} else if strings.Contains(flow, "reg0=0") {
-							ipInGlobal = true
-						} else {
-							ipInGeneric = true
-						}
-					} else {
-						Fail("found unexpected OVS flow: " + flow)
-					}
-				} else {
-					otherFlows = append(otherFlows, flow)
+				if strings.Contains(flow, "="+ip+",") || strings.Contains(flow, "="+ip+" ") {
+					foundPodFlow = true
+					break
 				}
 			}
-			Expect(arpOut).To(BeTrue(), "Should have an outgoing ARP rule")
-			Expect(arpIn).To(BeTrue(), "Should have an incoming ARP rule")
-			Expect(ipOut).To(BeTrue(), "Should have an outgoing IP rule")
-			if pluginIsolatesNamespaces() {
-				Expect(ipInGlobal && ipInIsolated).To(BeTrue(), "Should have global and isolated incoming IP rules")
-			} else {
-				Expect(ipInGeneric).To(BeTrue(), "Should have a generic incoming IP rule")
-			}
-			Expect(reflect.DeepEqual(origFlows[deployNodeName], otherFlows)).To(BeTrue(), "Flows on deployed-to node should be unchanged except for the new pod")
+			Expect(foundPodFlow).To(BeTrue(), "Should have flows referring to pod IP address")
 
 			err := f1.Client.Pods(f1.Namespace.Name).Delete(podName, nil)
 			Expect(err).NotTo(HaveOccurred())
... ...
@@ -150,29 +124,14 @@ var _ = Describe("[networking] OVS", func() {
 
 			newFlows := getFlowsForAllNodes(oc, nodes.Items)
 			for nodeName := range newFlows {
-				var otherFlows []string
-				var tunIn, arpTunOut, ipTunOut bool
-
+				foundNodeFlow := false
 				for _, flow := range newFlows[nodeName] {
-					if strings.Contains(flow, newNodeIP) {
-						if strings.Contains(flow, "tun_src="+newNodeIP) {
-							tunIn = true
-						} else if strings.Contains(flow, "arp,") && strings.Contains(flow, newNodeIP+"->tun_dst") {
-							arpTunOut = true
-						} else if strings.Contains(flow, "ip,") && strings.Contains(flow, newNodeIP+"->tun_dst") {
-							ipTunOut = true
-						} else {
-							Fail("found unexpected OVS flow: " + flow)
-						}
-					} else {
-						otherFlows = append(otherFlows, flow)
+					if strings.Contains(flow, "="+newNodeIP+",") || strings.Contains(flow, "="+newNodeIP+" ") {
+						foundNodeFlow = true
+						break
 					}
 				}
-
-				Expect(tunIn).To(BeTrue(), "Should have an incoming VXLAN tunnel rule")
-				Expect(arpTunOut).To(BeTrue(), "Should have an outgoing ARP VXLAN tunnel rule")
-				Expect(ipTunOut).To(BeTrue(), "Should have an outgoing IP VXLAN tunnel rule")
-				Expect(reflect.DeepEqual(origFlows[nodeName], otherFlows)).To(BeTrue(), "Flows should be unchanged except for the new node")
+				Expect(foundNodeFlow).To(BeTrue(), "Should have flows referring to node IP address")
 			}
 
 			err = f1.Client.Nodes().Delete(node.Name)
... ...
@@ -208,7 +167,7 @@ var _ = Describe("[networking] OVS", func() {
 			for _, node := range nodes.Items {
 				foundServiceFlow := false
 				for _, flow := range newFlows[node.Name] {
-					if strings.Contains(flow, "nw_dst="+ip) {
+					if strings.Contains(flow, "nw_dst="+ip+",") || strings.Contains(flow, "nw_dst="+ip+" ") {
 						foundServiceFlow = true
 						break
 					}
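The tests now require a delimiter after the matched address because a bare Contains check can false-positive when one IP is a prefix of another (10.1.1.1 inside 10.1.1.10). A sketch of the matching rule as a helper (`flowHasField` is hypothetical; the tests inline the two Contains calls):

```go
package main

import (
	"fmt"
	"strings"
)

// flowHasField treats an address as present only when it is a complete
// field value, i.e. followed by the "," or " " that ovs-ofctl prints
// after it. A bare Contains would also match 10.1.1.1 inside 10.1.1.10.
func flowHasField(flow, value string) bool {
	return strings.Contains(flow, "="+value+",") || strings.Contains(flow, "="+value+" ")
}

func main() {
	flow := "table=70, priority=100, ip, nw_dst=10.1.1.10, actions=goto_table:80"
	fmt.Println(flowHasField(flow, "10.1.1.10")) // true
	fmt.Println(flowHasField(flow, "10.1.1.1"))  // false: only a prefix
}
```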