Browse code

Kill off SDN "Registry" type

This used to be an abstraction between kclient-based access and direct
etcd access, back when openshift-sdn could be compiled standalone, but
now it's just cruft.

Dan Winship authored on 2016/08/30 02:05:21
Showing 10 changed files
... ...
@@ -4,11 +4,16 @@ import (
4 4
 	"fmt"
5 5
 	"net"
6 6
 	"strings"
7
+	"time"
8
+
9
+	"github.com/golang/glog"
7 10
 
8 11
 	osclient "github.com/openshift/origin/pkg/client"
9 12
 	osapi "github.com/openshift/origin/pkg/sdn/api"
10 13
 
11 14
 	kapi "k8s.io/kubernetes/pkg/api"
15
+	kcache "k8s.io/kubernetes/pkg/client/cache"
16
+	"k8s.io/kubernetes/pkg/fields"
12 17
 )
13 18
 
14 19
 func getPodContainerID(pod *kapi.Pod) string {
... ...
@@ -25,6 +30,10 @@ func hostSubnetToString(subnet *osapi.HostSubnet) string {
25 25
 	return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
26 26
 }
27 27
 
28
+func clusterNetworkToString(n *osapi.ClusterNetwork) string {
29
+	return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName)
30
+}
31
+
28 32
 type NetworkInfo struct {
29 33
 	ClusterNetwork *net.IPNet
30 34
 	ServiceNetwork *net.IPNet
... ...
@@ -76,3 +85,68 @@ func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) {
76 76
 
77 77
 	return parseNetworkInfo(cn.Network, cn.ServiceNetwork)
78 78
 }
79
+
80
+type ResourceName string
81
+
82
+const (
83
+	Nodes                 ResourceName = "Nodes"
84
+	Namespaces            ResourceName = "Namespaces"
85
+	NetNamespaces         ResourceName = "NetNamespaces"
86
+	Services              ResourceName = "Services"
87
+	HostSubnets           ResourceName = "HostSubnets"
88
+	Pods                  ResourceName = "Pods"
89
+	EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
90
+)
91
+
92
+// Run event queue for the given resource. The 'process' function is called
93
+// repeatedly with each available cache.Delta that describes state changes
94
+// to an object. If the process function returns an error queued changes
95
+// for that object are dropped but processing continues with the next available
96
+// object's cache.Deltas.  The error is logged with call stack information.
97
+func runEventQueueForResource(client kcache.Getter, resourceName ResourceName, expectedType interface{}, selector fields.Selector, process ProcessEventFunc) {
98
+	rn := strings.ToLower(string(resourceName))
99
+	lw := kcache.NewListWatchFromClient(client, rn, kapi.NamespaceAll, selector)
100
+	eventQueue := NewEventQueue(kcache.MetaNamespaceKeyFunc)
101
+	// Repopulate event queue every 30 mins
102
+	// Existing items in the event queue will have watch.Modified event type
103
+	kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()
104
+
105
+	// Run the queue
106
+	for {
107
+		eventQueue.Pop(process)
108
+	}
109
+}
110
+
111
+// Run event queue for the given resource
112
+func RunEventQueue(client kcache.Getter, resourceName ResourceName, process ProcessEventFunc) {
113
+	var expectedType interface{}
114
+
115
+	switch resourceName {
116
+	case HostSubnets:
117
+		expectedType = &osapi.HostSubnet{}
118
+	case NetNamespaces:
119
+		expectedType = &osapi.NetNamespace{}
120
+	case Nodes:
121
+		expectedType = &kapi.Node{}
122
+	case Namespaces:
123
+		expectedType = &kapi.Namespace{}
124
+	case Services:
125
+		expectedType = &kapi.Service{}
126
+	case Pods:
127
+		expectedType = &kapi.Pod{}
128
+	case EgressNetworkPolicies:
129
+		expectedType = &osapi.EgressNetworkPolicy{}
130
+	default:
131
+		glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
132
+	}
133
+
134
+	runEventQueueForResource(client, resourceName, expectedType, fields.Everything(), process)
135
+}
136
+
137
+func RunLocalPodsEventQueue(client kcache.Getter, nodeName string, process ProcessEventFunc) {
138
+	if nodeName == "" {
139
+		glog.Fatalf("LocalPods resource requires a node name")
140
+	}
141
+
142
+	runEventQueueForResource(client, Pods, &kapi.Pod{}, fields.Set{"spec.host": nodeName}.AsSelector(), process)
143
+}
... ...
@@ -357,7 +357,7 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
357 357
 }
358 358
 
359 359
 func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) error {
360
-	node, err := plugin.registry.kClient.Nodes().Get(plugin.hostName)
360
+	node, err := plugin.kClient.Nodes().Get(plugin.hostName)
361 361
 	if err != nil {
362 362
 		return err
363 363
 	}
... ...
@@ -374,12 +374,12 @@ func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) erro
374 374
 		delete(node.Labels, EgressNetworkPolicyFailureLabel)
375 375
 	}
376 376
 
377
-	_, err = plugin.registry.kClient.Nodes().UpdateStatus(node)
377
+	_, err = plugin.kClient.Nodes().UpdateStatus(node)
378 378
 	return err
379 379
 }
380 380
 
381 381
 func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
382
-	policies, err := plugin.registry.GetEgressNetworkPolicies()
382
+	policies, err := plugin.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
383 383
 	if err != nil {
384 384
 		if kapierrs.IsForbidden(err) {
385 385
 			// 1.3 node running with 1.2-bootstrapped policies
... ...
@@ -398,7 +398,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
398 398
 		}
399 399
 	}
400 400
 
401
-	for _, policy := range policies {
401
+	for _, policy := range policies.Items {
402 402
 		vnid, err := plugin.vnids.GetVNID(policy.Namespace)
403 403
 		if err != nil {
404 404
 			glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
... ...
@@ -419,7 +419,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
419 419
 }
420 420
 
421 421
 func (plugin *OsdnNode) watchEgressNetworkPolicies() {
422
-	plugin.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error {
422
+	RunEventQueue(plugin.osClient, EgressNetworkPolicies, func(delta cache.Delta) error {
423 423
 		policy := delta.Object.(*osapi.EgressNetworkPolicy)
424 424
 
425 425
 		vnid, err := plugin.vnids.GetVNID(policy.Namespace)
... ...
@@ -18,7 +18,8 @@ import (
18 18
 )
19 19
 
20 20
 type OsdnMaster struct {
21
-	registry        *Registry
21
+	kClient         *kclient.Client
22
+	osClient        *osclient.Client
22 23
 	networkInfo     *NetworkInfo
23 24
 	subnetAllocator *netutils.SubnetAllocator
24 25
 	vnids           *masterVNIDMap
... ...
@@ -32,7 +33,8 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie
32 32
 	log.Infof("Initializing SDN master of type %q", networkConfig.NetworkPluginName)
33 33
 
34 34
 	master := &OsdnMaster{
35
-		registry: newRegistry(osClient, kClient),
35
+		kClient:  kClient,
36
+		osClient: osClient,
36 37
 	}
37 38
 
38 39
 	var err error
... ...
@@ -43,7 +45,7 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie
43 43
 
44 44
 	createConfig := false
45 45
 	updateConfig := false
46
-	cn, err := master.registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault)
46
+	cn, err := master.osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault)
47 47
 	if err == nil {
48 48
 		if master.networkInfo.ClusterNetwork.String() != cn.Network ||
49 49
 			networkConfig.HostSubnetLength != cn.HostSubnetLength ||
... ...
@@ -73,13 +75,13 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie
73 73
 	}
74 74
 
75 75
 	if createConfig {
76
-		cn, err := master.registry.oClient.ClusterNetwork().Create(cn)
76
+		cn, err := master.osClient.ClusterNetwork().Create(cn)
77 77
 		if err != nil {
78 78
 			return err
79 79
 		}
80 80
 		log.Infof("Created ClusterNetwork %s", clusterNetworkToString(cn))
81 81
 	} else if updateConfig {
82
-		cn, err := master.registry.oClient.ClusterNetwork().Update(cn)
82
+		cn, err := master.osClient.ClusterNetwork().Update(cn)
83 83
 		if err != nil {
84 84
 			return err
85 85
 		}
... ...
@@ -127,11 +129,11 @@ func (master *OsdnMaster) validateNetworkConfig() error {
127 127
 	}
128 128
 
129 129
 	// Ensure each host subnet is within the cluster network
130
-	subnets, err := master.registry.GetSubnets()
130
+	subnets, err := master.osClient.HostSubnets().List(kapi.ListOptions{})
131 131
 	if err != nil {
132 132
 		return fmt.Errorf("Error in initializing/fetching subnets: %v", err)
133 133
 	}
134
-	for _, sub := range subnets {
134
+	for _, sub := range subnets.Items {
135 135
 		subnetIP, _, _ := net.ParseCIDR(sub.Subnet)
136 136
 		if subnetIP == nil {
137 137
 			errList = append(errList, fmt.Errorf("Failed to parse network address: %s", sub.Subnet))
... ...
@@ -143,11 +145,11 @@ func (master *OsdnMaster) validateNetworkConfig() error {
143 143
 	}
144 144
 
145 145
 	// Ensure each service is within the services network
146
-	services, err := master.registry.GetServices()
146
+	services, err := master.kClient.Services(kapi.NamespaceAll).List(kapi.ListOptions{})
147 147
 	if err != nil {
148 148
 		return err
149 149
 	}
150
-	for _, svc := range services {
150
+	for _, svc := range services.Items {
151 151
 		if !ni.ServiceNetwork.Contains(net.ParseIP(svc.Spec.ClusterIP)) {
152 152
 			errList = append(errList, fmt.Errorf("Error: Existing service with IP: %s is not part of service network: %s", svc.Spec.ClusterIP, ni.ServiceNetwork.String()))
153 153
 		}
... ...
@@ -15,14 +15,17 @@ import (
15 15
 
16 16
 	kapi "k8s.io/kubernetes/pkg/api"
17 17
 	kclient "k8s.io/kubernetes/pkg/client/unversioned"
18
+	"k8s.io/kubernetes/pkg/fields"
18 19
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/container"
20
+	"k8s.io/kubernetes/pkg/labels"
19 21
 	kexec "k8s.io/kubernetes/pkg/util/exec"
20 22
 	kubeutilnet "k8s.io/kubernetes/pkg/util/net"
21 23
 )
22 24
 
23 25
 type OsdnNode struct {
24 26
 	multitenant        bool
25
-	registry           *Registry
27
+	kClient            *kclient.Client
28
+	osClient           *osclient.Client
26 29
 	networkInfo        *NetworkInfo
27 30
 	localIP            string
28 31
 	localSubnet        *osapi.HostSubnet
... ...
@@ -66,7 +69,8 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien
66 66
 
67 67
 	plugin := &OsdnNode{
68 68
 		multitenant:        IsOpenShiftMultitenantNetworkPlugin(pluginName),
69
-		registry:           newRegistry(osClient, kClient),
69
+		kClient:            kClient,
70
+		osClient:           osClient,
70 71
 		localIP:            selfIP,
71 72
 		hostName:           hostname,
72 73
 		vnids:              newNodeVNIDMap(),
... ...
@@ -80,7 +84,7 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien
80 80
 
81 81
 func (node *OsdnNode) Start() error {
82 82
 	var err error
83
-	node.networkInfo, err = getNetworkInfo(node.registry.oClient)
83
+	node.networkInfo, err = getNetworkInfo(node.osClient)
84 84
 	if err != nil {
85 85
 		return fmt.Errorf("Failed to get network information: %v", err)
86 86
 	}
... ...
@@ -126,7 +130,24 @@ func (node *OsdnNode) Start() error {
126 126
 }
127 127
 
128 128
 func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) {
129
-	return node.registry.GetRunningPods(node.hostName, namespace)
129
+	fieldSelector := fields.Set{"spec.host": node.hostName}.AsSelector()
130
+	opts := kapi.ListOptions{
131
+		LabelSelector: labels.Everything(),
132
+		FieldSelector: fieldSelector,
133
+	}
134
+	podList, err := node.kClient.Pods(namespace).List(opts)
135
+	if err != nil {
136
+		return nil, err
137
+	}
138
+
139
+	// Filter running pods
140
+	pods := make([]kapi.Pod, 0, len(podList.Items))
141
+	for _, pod := range podList.Items {
142
+		if pod.Status.Phase == kapi.PodRunning {
143
+			pods = append(pods, pod)
144
+		}
145
+	}
146
+	return pods, nil
130 147
 }
131 148
 
132 149
 func (node *OsdnNode) markPodNetworkReady() {
... ...
@@ -11,8 +11,10 @@ import (
11 11
 	kapi "k8s.io/kubernetes/pkg/api"
12 12
 	"k8s.io/kubernetes/pkg/api/resource"
13 13
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
14
+	"k8s.io/kubernetes/pkg/fields"
14 15
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/container"
15 16
 	knetwork "k8s.io/kubernetes/pkg/kubelet/network"
17
+	"k8s.io/kubernetes/pkg/labels"
16 18
 	utilsets "k8s.io/kubernetes/pkg/util/sets"
17 19
 )
18 20
 
... ...
@@ -153,13 +155,32 @@ func getScriptError(output []byte) string {
153 153
 	return string(output)
154 154
 }
155 155
 
156
+func (plugin *OsdnNode) getPod(nodeName, namespace, podName string) (*kapi.Pod, error) {
157
+	fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector()
158
+	opts := kapi.ListOptions{
159
+		LabelSelector: labels.Everything(),
160
+		FieldSelector: fieldSelector,
161
+	}
162
+	podList, err := plugin.kClient.Pods(namespace).List(opts)
163
+	if err != nil {
164
+		return nil, err
165
+	}
166
+
167
+	for _, pod := range podList.Items {
168
+		if pod.ObjectMeta.Name == podName {
169
+			return &pod, nil
170
+		}
171
+	}
172
+	return nil, nil
173
+}
174
+
156 175
 func (plugin *OsdnNode) SetUpPod(namespace string, name string, id kubeletTypes.ContainerID) error {
157 176
 	err := plugin.WaitForPodNetworkReady()
158 177
 	if err != nil {
159 178
 		return err
160 179
 	}
161 180
 
162
-	pod, err := plugin.registry.GetPod(plugin.hostName, namespace, name)
181
+	pod, err := plugin.getPod(plugin.hostName, namespace, name)
163 182
 	if err != nil {
164 183
 		return err
165 184
 	}
... ...
@@ -25,7 +25,8 @@ type proxyFirewallItem struct {
25 25
 }
26 26
 
27 27
 type ovsProxyPlugin struct {
28
-	registry             *Registry
28
+	kClient              *kclient.Client
29
+	osClient             *osclient.Client
29 30
 	networkInfo          *NetworkInfo
30 31
 	baseEndpointsHandler pconfig.EndpointsConfigHandler
31 32
 
... ...
@@ -41,7 +42,8 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclie
41 41
 	}
42 42
 
43 43
 	return &ovsProxyPlugin{
44
-		registry: newRegistry(osClient, kClient),
44
+		kClient:  kClient,
45
+		osClient: osClient,
45 46
 		firewall: make(map[string][]proxyFirewallItem),
46 47
 	}, nil
47 48
 }
... ...
@@ -50,13 +52,13 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
50 50
 	glog.Infof("Starting multitenant SDN proxy endpoint filter")
51 51
 
52 52
 	var err error
53
-	proxy.networkInfo, err = getNetworkInfo(proxy.registry.oClient)
53
+	proxy.networkInfo, err = getNetworkInfo(proxy.osClient)
54 54
 	if err != nil {
55 55
 		return fmt.Errorf("could not get network info: %s", err)
56 56
 	}
57 57
 	proxy.baseEndpointsHandler = baseHandler
58 58
 
59
-	policies, err := proxy.registry.GetEgressNetworkPolicies()
59
+	policies, err := proxy.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
60 60
 	if err != nil {
61 61
 		if kapierrs.IsForbidden(err) {
62 62
 			// controller.go will log an error about this
... ...
@@ -64,7 +66,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
64 64
 		}
65 65
 		return fmt.Errorf("could not get EgressNetworkPolicies: %s", err)
66 66
 	}
67
-	for _, policy := range policies {
67
+	for _, policy := range policies.Items {
68 68
 		proxy.updateNetworkPolicy(policy)
69 69
 	}
70 70
 
... ...
@@ -73,7 +75,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
73 73
 }
74 74
 
75 75
 func (proxy *ovsProxyPlugin) watchEgressNetworkPolicies() {
76
-	proxy.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error {
76
+	RunEventQueue(proxy.osClient, EgressNetworkPolicies, func(delta cache.Delta) error {
77 77
 		policy := delta.Object.(*osapi.EgressNetworkPolicy)
78 78
 		if delta.Type == cache.Deleted {
79 79
 			policy.Spec.Egress = nil
80 80
deleted file mode 100644
... ...
@@ -1,228 +0,0 @@
1
-package plugin
2
-
3
-import (
4
-	"fmt"
5
-	"strings"
6
-	"time"
7
-
8
-	log "github.com/golang/glog"
9
-
10
-	kapi "k8s.io/kubernetes/pkg/api"
11
-	"k8s.io/kubernetes/pkg/api/unversioned"
12
-	"k8s.io/kubernetes/pkg/client/cache"
13
-	kclient "k8s.io/kubernetes/pkg/client/unversioned"
14
-	"k8s.io/kubernetes/pkg/fields"
15
-	"k8s.io/kubernetes/pkg/labels"
16
-
17
-	osclient "github.com/openshift/origin/pkg/client"
18
-	osapi "github.com/openshift/origin/pkg/sdn/api"
19
-)
20
-
21
-type Registry struct {
22
-	oClient *osclient.Client
23
-	kClient *kclient.Client
24
-}
25
-
26
-type ResourceName string
27
-
28
-const (
29
-	Nodes                 ResourceName = "Nodes"
30
-	Namespaces            ResourceName = "Namespaces"
31
-	NetNamespaces         ResourceName = "NetNamespaces"
32
-	Services              ResourceName = "Services"
33
-	HostSubnets           ResourceName = "HostSubnets"
34
-	Pods                  ResourceName = "Pods"
35
-	EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
36
-)
37
-
38
-func newRegistry(osClient *osclient.Client, kClient *kclient.Client) *Registry {
39
-	return &Registry{
40
-		oClient: osClient,
41
-		kClient: kClient,
42
-	}
43
-}
44
-
45
-func (registry *Registry) GetSubnets() ([]osapi.HostSubnet, error) {
46
-	hostSubnetList, err := registry.oClient.HostSubnets().List(kapi.ListOptions{})
47
-	if err != nil {
48
-		return nil, err
49
-	}
50
-	return hostSubnetList.Items, nil
51
-}
52
-
53
-func (registry *Registry) GetSubnet(nodeName string) (*osapi.HostSubnet, error) {
54
-	return registry.oClient.HostSubnets().Get(nodeName)
55
-}
56
-
57
-func (registry *Registry) DeleteSubnet(nodeName string) error {
58
-	return registry.oClient.HostSubnets().Delete(nodeName)
59
-}
60
-
61
-func (registry *Registry) CreateSubnet(nodeName, nodeIP, subnetCIDR string) (*osapi.HostSubnet, error) {
62
-	hs := &osapi.HostSubnet{
63
-		TypeMeta:   unversioned.TypeMeta{Kind: "HostSubnet"},
64
-		ObjectMeta: kapi.ObjectMeta{Name: nodeName},
65
-		Host:       nodeName,
66
-		HostIP:     nodeIP,
67
-		Subnet:     subnetCIDR,
68
-	}
69
-	return registry.oClient.HostSubnets().Create(hs)
70
-}
71
-
72
-func (registry *Registry) UpdateSubnet(hs *osapi.HostSubnet) (*osapi.HostSubnet, error) {
73
-	return registry.oClient.HostSubnets().Update(hs)
74
-}
75
-
76
-func (registry *Registry) GetRunningPods(nodeName, namespace string) ([]kapi.Pod, error) {
77
-	fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector()
78
-	opts := kapi.ListOptions{
79
-		LabelSelector: labels.Everything(),
80
-		FieldSelector: fieldSelector,
81
-	}
82
-	podList, err := registry.kClient.Pods(namespace).List(opts)
83
-	if err != nil {
84
-		return nil, err
85
-	}
86
-
87
-	// Filter running pods
88
-	pods := make([]kapi.Pod, 0, len(podList.Items))
89
-	for _, pod := range podList.Items {
90
-		if pod.Status.Phase == kapi.PodRunning {
91
-			pods = append(pods, pod)
92
-		}
93
-	}
94
-	return pods, nil
95
-}
96
-
97
-func (registry *Registry) GetPod(nodeName, namespace, podName string) (*kapi.Pod, error) {
98
-	fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector()
99
-	opts := kapi.ListOptions{
100
-		LabelSelector: labels.Everything(),
101
-		FieldSelector: fieldSelector,
102
-	}
103
-	podList, err := registry.kClient.Pods(namespace).List(opts)
104
-	if err != nil {
105
-		return nil, err
106
-	}
107
-
108
-	for _, pod := range podList.Items {
109
-		if pod.ObjectMeta.Name == podName {
110
-			return &pod, nil
111
-		}
112
-	}
113
-	return nil, nil
114
-}
115
-
116
-func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, error) {
117
-	netNamespaceList, err := registry.oClient.NetNamespaces().List(kapi.ListOptions{})
118
-	if err != nil {
119
-		return nil, err
120
-	}
121
-	return netNamespaceList.Items, nil
122
-}
123
-
124
-func (registry *Registry) GetNetNamespace(name string) (*osapi.NetNamespace, error) {
125
-	return registry.oClient.NetNamespaces().Get(name)
126
-}
127
-
128
-func (registry *Registry) CreateNetNamespace(name string, id uint32) error {
129
-	netns := &osapi.NetNamespace{
130
-		TypeMeta:   unversioned.TypeMeta{Kind: "NetNamespace"},
131
-		ObjectMeta: kapi.ObjectMeta{Name: name},
132
-		NetName:    name,
133
-		NetID:      id,
134
-	}
135
-	_, err := registry.oClient.NetNamespaces().Create(netns)
136
-	return err
137
-}
138
-
139
-func (registry *Registry) UpdateNetNamespace(netns *osapi.NetNamespace) (*osapi.NetNamespace, error) {
140
-	return registry.oClient.NetNamespaces().Update(netns)
141
-}
142
-
143
-func (registry *Registry) DeleteNetNamespace(name string) error {
144
-	return registry.oClient.NetNamespaces().Delete(name)
145
-}
146
-
147
-func (registry *Registry) GetServicesForNamespace(namespace string) ([]kapi.Service, error) {
148
-	return registry.getServices(namespace)
149
-}
150
-
151
-func (registry *Registry) GetServices() ([]kapi.Service, error) {
152
-	return registry.getServices(kapi.NamespaceAll)
153
-}
154
-
155
-func (registry *Registry) getServices(namespace string) ([]kapi.Service, error) {
156
-	kServList, err := registry.kClient.Services(namespace).List(kapi.ListOptions{})
157
-	if err != nil {
158
-		return nil, err
159
-	}
160
-
161
-	servList := make([]kapi.Service, 0, len(kServList.Items))
162
-	for _, service := range kServList.Items {
163
-		if !kapi.IsServiceIPSet(&service) {
164
-			continue
165
-		}
166
-		servList = append(servList, service)
167
-	}
168
-	return servList, nil
169
-}
170
-
171
-func (registry *Registry) GetEgressNetworkPolicies() ([]osapi.EgressNetworkPolicy, error) {
172
-	policyList, err := registry.oClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
173
-	if err != nil {
174
-		return nil, err
175
-	}
176
-	return policyList.Items, nil
177
-}
178
-
179
-// Run event queue for the given resource. The 'process' function is called
180
-// repeatedly with each available cache.Delta that describes state changes
181
-// to an object. If the process function returns an error queued changes
182
-// for that object are dropped but processing continues with the next available
183
-// object's cache.Deltas.  The error is logged with call stack information.
184
-func (registry *Registry) RunEventQueue(resourceName ResourceName, process ProcessEventFunc) {
185
-	var client cache.Getter
186
-	var expectedType interface{}
187
-
188
-	switch resourceName {
189
-	case HostSubnets:
190
-		expectedType = &osapi.HostSubnet{}
191
-		client = registry.oClient
192
-	case NetNamespaces:
193
-		expectedType = &osapi.NetNamespace{}
194
-		client = registry.oClient
195
-	case Nodes:
196
-		expectedType = &kapi.Node{}
197
-		client = registry.kClient
198
-	case Namespaces:
199
-		expectedType = &kapi.Namespace{}
200
-		client = registry.kClient
201
-	case Services:
202
-		expectedType = &kapi.Service{}
203
-		client = registry.kClient
204
-	case Pods:
205
-		expectedType = &kapi.Pod{}
206
-		client = registry.kClient
207
-	case EgressNetworkPolicies:
208
-		expectedType = &osapi.EgressNetworkPolicy{}
209
-		client = registry.oClient
210
-	default:
211
-		log.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
212
-	}
213
-
214
-	lw := cache.NewListWatchFromClient(client, strings.ToLower(string(resourceName)), kapi.NamespaceAll, fields.Everything())
215
-	eventQueue := NewEventQueue(cache.MetaNamespaceKeyFunc)
216
-	// Repopulate event queue every 30 mins
217
-	// Existing items in the event queue will have watch.Modified event type
218
-	cache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()
219
-
220
-	// Run the queue
221
-	for {
222
-		eventQueue.Pop(process)
223
-	}
224
-}
225
-
226
-func clusterNetworkToString(n *osapi.ClusterNetwork) string {
227
-	return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName)
228
-}
... ...
@@ -21,12 +21,12 @@ import (
21 21
 
22 22
 func (master *OsdnMaster) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubnetLength uint32) error {
23 23
 	subrange := make([]string, 0)
24
-	subnets, err := master.registry.GetSubnets()
24
+	subnets, err := master.osClient.HostSubnets().List(kapi.ListOptions{})
25 25
 	if err != nil {
26 26
 		log.Errorf("Error in initializing/fetching subnets: %v", err)
27 27
 		return err
28 28
 	}
29
-	for _, sub := range subnets {
29
+	for _, sub := range subnets.Items {
30 30
 		subrange = append(subrange, sub.Subnet)
31 31
 		if err = master.networkInfo.validateNodeIP(sub.HostIP); err != nil {
32 32
 			// Don't error out; just warn so the error can be corrected with 'oc'
... ...
@@ -52,14 +52,14 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error {
52 52
 	}
53 53
 
54 54
 	// Check if subnet needs to be created or updated
55
-	sub, err := master.registry.GetSubnet(nodeName)
55
+	sub, err := master.osClient.HostSubnets().Get(nodeName)
56 56
 	if err == nil {
57 57
 		if sub.HostIP == nodeIP {
58 58
 			return nil
59 59
 		} else {
60 60
 			// Node IP changed, update old subnet
61 61
 			sub.HostIP = nodeIP
62
-			sub, err = master.registry.UpdateSubnet(sub)
62
+			sub, err = master.osClient.HostSubnets().Update(sub)
63 63
 			if err != nil {
64 64
 				return fmt.Errorf("Error updating subnet %s for node %s: %v", sub.Subnet, nodeName, err)
65 65
 			}
... ...
@@ -74,7 +74,14 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error {
74 74
 		return fmt.Errorf("Error allocating network for node %s: %v", nodeName, err)
75 75
 	}
76 76
 
77
-	sub, err = master.registry.CreateSubnet(nodeName, nodeIP, sn.String())
77
+	sub = &osapi.HostSubnet{
78
+		TypeMeta:   kapiunversioned.TypeMeta{Kind: "HostSubnet"},
79
+		ObjectMeta: kapi.ObjectMeta{Name: nodeName},
80
+		Host:       nodeName,
81
+		HostIP:     nodeIP,
82
+		Subnet:     sn.String(),
83
+	}
84
+	sub, err = master.osClient.HostSubnets().Create(sub)
78 85
 	if err != nil {
79 86
 		master.subnetAllocator.ReleaseNetwork(sn)
80 87
 		return fmt.Errorf("Error creating subnet %s for node %s: %v", sn.String(), nodeName, err)
... ...
@@ -84,7 +91,7 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error {
84 84
 }
85 85
 
86 86
 func (master *OsdnMaster) deleteNode(nodeName string) error {
87
-	sub, err := master.registry.GetSubnet(nodeName)
87
+	sub, err := master.osClient.HostSubnets().Get(nodeName)
88 88
 	if err != nil {
89 89
 		return fmt.Errorf("Error fetching subnet for node %q for deletion: %v", nodeName, err)
90 90
 	}
... ...
@@ -93,7 +100,7 @@ func (master *OsdnMaster) deleteNode(nodeName string) error {
93 93
 		return fmt.Errorf("Error parsing subnet %q for node %q for deletion: %v", sub.Subnet, nodeName, err)
94 94
 	}
95 95
 	master.subnetAllocator.ReleaseNetwork(ipnet)
96
-	err = master.registry.DeleteSubnet(nodeName)
96
+	err = master.osClient.HostSubnets().Delete(nodeName)
97 97
 	if err != nil {
98 98
 		return fmt.Errorf("Error deleting subnet %v for node %q: %v", sub, nodeName, err)
99 99
 	}
... ...
@@ -123,7 +130,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi
123 123
 		var err error
124 124
 
125 125
 		if knode != node {
126
-			knode, err = master.registry.kClient.Nodes().Get(node.ObjectMeta.Name)
126
+			knode, err = master.kClient.Nodes().Get(node.ObjectMeta.Name)
127 127
 			if err != nil {
128 128
 				return err
129 129
 			}
... ...
@@ -136,7 +143,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi
136 136
 			condition.Reason = "RouteCreated"
137 137
 			condition.Message = "openshift-sdn cleared kubelet-set NoRouteCreated"
138 138
 			condition.LastTransitionTime = kapiunversioned.Now()
139
-			knode, err = master.registry.kClient.Nodes().UpdateStatus(knode)
139
+			knode, err = master.kClient.Nodes().UpdateStatus(knode)
140 140
 			if err == nil {
141 141
 				cleared = true
142 142
 			}
... ...
@@ -152,7 +159,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi
152 152
 
153 153
 func (master *OsdnMaster) watchNodes() {
154 154
 	nodeAddressMap := map[types.UID]string{}
155
-	master.registry.RunEventQueue(Nodes, func(delta cache.Delta) error {
155
+	RunEventQueue(master.kClient, Nodes, func(delta cache.Delta) error {
156 156
 		node := delta.Object.(*kapi.Node)
157 157
 		name := node.ObjectMeta.Name
158 158
 		uid := node.ObjectMeta.UID
... ...
@@ -215,7 +222,7 @@ func (node *OsdnNode) initSelfSubnet() error {
215 215
 	// Try every retryInterval and bail-out if it exceeds max retries
216 216
 	for i := 0; i < retries; i++ {
217 217
 		// Get subnet for current node
218
-		subnet, err = node.registry.GetSubnet(node.hostName)
218
+		subnet, err = node.osClient.HostSubnets().Get(node.hostName)
219 219
 		if err == nil {
220 220
 			break
221 221
 		}
... ...
@@ -238,7 +245,7 @@ func (node *OsdnNode) initSelfSubnet() error {
238 238
 // Only run on the nodes
239 239
 func (node *OsdnNode) watchSubnets() {
240 240
 	subnets := make(map[string]*osapi.HostSubnet)
241
-	node.registry.RunEventQueue(HostSubnets, func(delta cache.Delta) error {
241
+	RunEventQueue(node.osClient, HostSubnets, func(delta cache.Delta) error {
242 242
 		hs := delta.Object.(*osapi.HostSubnet)
243 243
 		if hs.HostIP == node.localIP {
244 244
 			return nil
... ...
@@ -7,10 +7,12 @@ import (
7 7
 	log "github.com/golang/glog"
8 8
 
9 9
 	kapi "k8s.io/kubernetes/pkg/api"
10
+	"k8s.io/kubernetes/pkg/api/unversioned"
10 11
 	"k8s.io/kubernetes/pkg/client/cache"
11 12
 	"k8s.io/kubernetes/pkg/util/sets"
12 13
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
13 14
 
15
+	osclient "github.com/openshift/origin/pkg/client"
14 16
 	osapi "github.com/openshift/origin/pkg/sdn/api"
15 17
 	pnetid "github.com/openshift/origin/pkg/sdn/plugin/netid"
16 18
 )
... ...
@@ -69,13 +71,13 @@ func (vmap *masterVNIDMap) isAdminNamespace(nsName string) bool {
69 69
 	return false
70 70
 }
71 71
 
72
-func (vmap *masterVNIDMap) populateVNIDs(registry *Registry) error {
73
-	netnsList, err := registry.GetNetNamespaces()
72
+func (vmap *masterVNIDMap) populateVNIDs(osClient *osclient.Client) error {
73
+	netnsList, err := osClient.NetNamespaces().List(kapi.ListOptions{})
74 74
 	if err != nil {
75 75
 		return err
76 76
 	}
77 77
 
78
-	for _, netns := range netnsList {
78
+	for _, netns := range netnsList.Items {
79 79
 		vmap.setVNID(netns.NetName, netns.NetID)
80 80
 
81 81
 		// Skip GlobalVNID, not part of netID allocation range
... ...
@@ -194,7 +196,7 @@ func (vmap *masterVNIDMap) updateNetID(nsName string, action osapi.PodNetworkAct
194 194
 }
195 195
 
196 196
 // assignVNID, revokeVNID and updateVNID methods updates in-memory structs and persists etcd objects
197
-func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error {
197
+func (vmap *masterVNIDMap) assignVNID(osClient *osclient.Client, nsName string) error {
198 198
 	vmap.lock.Lock()
199 199
 	defer vmap.lock.Unlock()
200 200
 
... ...
@@ -205,7 +207,13 @@ func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error {
205 205
 
206 206
 	if !exists {
207 207
 		// Create NetNamespace Object and update vnid map
208
-		err = registry.CreateNetNamespace(nsName, netid)
208
+		netns := &osapi.NetNamespace{
209
+			TypeMeta:   unversioned.TypeMeta{Kind: "NetNamespace"},
210
+			ObjectMeta: kapi.ObjectMeta{Name: nsName},
211
+			NetName:    nsName,
212
+			NetID:      netid,
213
+		}
214
+		_, err := osClient.NetNamespaces().Create(netns)
209 215
 		if err != nil {
210 216
 			vmap.releaseNetID(nsName)
211 217
 			return err
... ...
@@ -214,12 +222,12 @@ func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error {
214 214
 	return nil
215 215
 }
216 216
 
217
-func (vmap *masterVNIDMap) revokeVNID(registry *Registry, nsName string) error {
217
+func (vmap *masterVNIDMap) revokeVNID(osClient *osclient.Client, nsName string) error {
218 218
 	vmap.lock.Lock()
219 219
 	defer vmap.lock.Unlock()
220 220
 
221 221
 	// Delete NetNamespace object
222
-	if err := registry.DeleteNetNamespace(nsName); err != nil {
222
+	if err := osClient.NetNamespaces().Delete(nsName); err != nil {
223 223
 		return err
224 224
 	}
225 225
 
... ...
@@ -229,7 +237,7 @@ func (vmap *masterVNIDMap) revokeVNID(registry *Registry, nsName string) error {
229 229
 	return nil
230 230
 }
231 231
 
232
-func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamespace) error {
232
+func (vmap *masterVNIDMap) updateVNID(osClient *osclient.Client, netns *osapi.NetNamespace) error {
233 233
 	action, args, err := osapi.GetChangePodNetworkAnnotation(netns)
234 234
 	if err == osapi.ErrorPodNetworkAnnotationNotFound {
235 235
 		// Nothing to update
... ...
@@ -246,7 +254,7 @@ func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamesp
246 246
 	netns.NetID = netid
247 247
 	osapi.DeleteChangePodNetworkAnnotation(netns)
248 248
 
249
-	if _, err := registry.UpdateNetNamespace(netns); err != nil {
249
+	if _, err := osClient.NetNamespaces().Update(netns); err != nil {
250 250
 		return err
251 251
 	}
252 252
 	return nil
... ...
@@ -255,7 +263,7 @@ func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamesp
255 255
 //--------------------- Master methods ----------------------
256 256
 
257 257
 func (master *OsdnMaster) VnidStartMaster() error {
258
-	err := master.vnids.populateVNIDs(master.registry)
258
+	err := master.vnids.populateVNIDs(master.osClient)
259 259
 	if err != nil {
260 260
 		return err
261 261
 	}
... ...
@@ -266,20 +274,18 @@ func (master *OsdnMaster) VnidStartMaster() error {
266 266
 }
267 267
 
268 268
 func (master *OsdnMaster) watchNamespaces() {
269
-	registry := master.registry
270
-
271
-	registry.RunEventQueue(Namespaces, func(delta cache.Delta) error {
269
+	RunEventQueue(master.kClient, Namespaces, func(delta cache.Delta) error {
272 270
 		ns := delta.Object.(*kapi.Namespace)
273 271
 		name := ns.ObjectMeta.Name
274 272
 
275 273
 		log.V(5).Infof("Watch %s event for Namespace %q", delta.Type, name)
276 274
 		switch delta.Type {
277 275
 		case cache.Sync, cache.Added, cache.Updated:
278
-			if err := master.vnids.assignVNID(registry, name); err != nil {
276
+			if err := master.vnids.assignVNID(master.osClient, name); err != nil {
279 277
 				return fmt.Errorf("Error assigning netid: %v", err)
280 278
 			}
281 279
 		case cache.Deleted:
282
-			if err := master.vnids.revokeVNID(registry, name); err != nil {
280
+			if err := master.vnids.revokeVNID(master.osClient, name); err != nil {
283 281
 				return fmt.Errorf("Error revoking netid: %v", err)
284 282
 			}
285 283
 		}
... ...
@@ -288,16 +294,14 @@ func (master *OsdnMaster) watchNamespaces() {
288 288
 }
289 289
 
290 290
 func (master *OsdnMaster) watchNetNamespaces() {
291
-	registry := master.registry
292
-
293
-	registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error {
291
+	RunEventQueue(master.osClient, NetNamespaces, func(delta cache.Delta) error {
294 292
 		netns := delta.Object.(*osapi.NetNamespace)
295 293
 		name := netns.ObjectMeta.Name
296 294
 
297 295
 		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, name)
298 296
 		switch delta.Type {
299 297
 		case cache.Sync, cache.Added, cache.Updated:
300
-			err := master.vnids.updateVNID(registry, netns)
298
+			err := master.vnids.updateVNID(master.osClient, netns)
301 299
 			if err != nil {
302 300
 				return fmt.Errorf("Error updating netid: %v", err)
303 301
 			}
... ...
@@ -14,6 +14,7 @@ import (
14 14
 	"k8s.io/kubernetes/pkg/util/sets"
15 15
 	utilwait "k8s.io/kubernetes/pkg/util/wait"
16 16
 
17
+	osclient "github.com/openshift/origin/pkg/client"
17 18
 	osapi "github.com/openshift/origin/pkg/sdn/api"
18 19
 )
19 20
 
... ...
@@ -117,13 +118,13 @@ func (vmap *nodeVNIDMap) unsetVNID(name string) (id uint32, err error) {
117 117
 	return id, nil
118 118
 }
119 119
 
120
-func (vmap *nodeVNIDMap) populateVNIDs(registry *Registry) error {
121
-	nets, err := registry.GetNetNamespaces()
120
+func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error {
121
+	nets, err := osClient.NetNamespaces().List(kapi.ListOptions{})
122 122
 	if err != nil {
123 123
 		return err
124 124
 	}
125 125
 
126
-	for _, net := range nets {
126
+	for _, net := range nets.Items {
127 127
 		vmap.setVNID(net.Name, net.NetID)
128 128
 	}
129 129
 	return nil
... ...
@@ -133,7 +134,7 @@ func (vmap *nodeVNIDMap) populateVNIDs(registry *Registry) error {
133 133
 
134 134
 func (node *OsdnNode) VnidStartNode() error {
135 135
 	// Populate vnid map synchronously so that existing services can fetch vnid
136
-	err := node.vnids.populateVNIDs(node.registry)
136
+	err := node.vnids.populateVNIDs(node.osClient)
137 137
 	if err != nil {
138 138
 		return err
139 139
 	}
... ...
@@ -152,7 +153,7 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32)
152 152
 	if err != nil {
153 153
 		return err
154 154
 	}
155
-	services, err := node.registry.GetServicesForNamespace(namespace)
155
+	services, err := node.kClient.Services(namespace).List(kapi.ListOptions{})
156 156
 	if err != nil {
157 157
 		return err
158 158
 	}
... ...
@@ -168,7 +169,11 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32)
168 168
 	}
169 169
 
170 170
 	// Update OF rules for the old services in the namespace
171
-	for _, svc := range services {
171
+	for _, svc := range services.Items {
172
+		if !kapi.IsServiceIPSet(&svc) {
173
+			continue
174
+		}
175
+
172 176
 		if err = node.DeleteServiceRules(&svc); err != nil {
173 177
 			log.Error(err)
174 178
 		}
... ...
@@ -186,7 +191,7 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32)
186 186
 }
187 187
 
188 188
 func (node *OsdnNode) watchNetNamespaces() {
189
-	node.registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error {
189
+	RunEventQueue(node.osClient, NetNamespaces, func(delta cache.Delta) error {
190 190
 		netns := delta.Object.(*osapi.NetNamespace)
191 191
 
192 192
 		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name)
... ...
@@ -231,7 +236,7 @@ func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
231 231
 
232 232
 func (node *OsdnNode) watchServices() {
233 233
 	services := make(map[string]*kapi.Service)
234
-	node.registry.RunEventQueue(Services, func(delta cache.Delta) error {
234
+	RunEventQueue(node.kClient, Services, func(delta cache.Delta) error {
235 235
 		serv := delta.Object.(*kapi.Service)
236 236
 
237 237
 		// Ignore headless services