package plugin import ( "sync" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" osapi "github.com/openshift/origin/pkg/sdn/api" ) type multiTenantPlugin struct { node *OsdnNode vnids *nodeVNIDMap vnidRefsLock sync.Mutex vnidRefs map[uint32]int } func NewMultiTenantPlugin() osdnPolicy { return &multiTenantPlugin{ vnidRefs: make(map[uint32]int), } } func (mp *multiTenantPlugin) Name() string { return osapi.MultiTenantPluginName } func (mp *multiTenantPlugin) Start(node *OsdnNode) error { mp.node = node mp.vnids = newNodeVNIDMap(mp, node.osClient) if err := mp.vnids.Start(); err != nil { return err } otx := node.ovs.NewTransaction() otx.AddFlow("table=80, priority=200, reg0=0, actions=output:NXM_NX_REG2[]") otx.AddFlow("table=80, priority=200, reg1=0, actions=output:NXM_NX_REG2[]") if err := otx.EndTransaction(); err != nil { return err } if err := mp.node.SetupEgressNetworkPolicy(); err != nil { return err } return nil } func (mp *multiTenantPlugin) updatePodNetwork(namespace string, oldNetID, netID uint32) { // FIXME: this is racy; traffic coming from the pods gets switched to the new // VNID before the service and firewall rules are updated to match. We need // to do the updates as a single transaction (ovs-ofctl --bundle). pods, err := mp.node.GetLocalPods(namespace) if err != nil { glog.Errorf("Could not get list of local pods in namespace %q: %v", namespace, err) } services, err := mp.node.kClient.Core().Services(namespace).List(kapi.ListOptions{}) if err != nil { glog.Errorf("Could not get list of services in namespace %q: %v", namespace, err) services = &kapi.ServiceList{} } movedVNIDRefs := 0 // Update OF rules for the existing/old pods in the namespace for _, pod := range pods { err = mp.node.UpdatePod(pod) if err == nil { movedVNIDRefs++ } else { glog.Errorf("Could not update pod %q in namespace %q: %v", pod.Name, namespace, err) } } // Update OF rules for the old services in the namespace for _, svc := range services.Items { if !kapi.IsServiceIPSet(&svc) { continue } mp.node.DeleteServiceRules(&svc) mp.node.AddServiceRules(&svc, netID) movedVNIDRefs++ } if movedVNIDRefs > 0 { mp.moveVNIDRefs(movedVNIDRefs, oldNetID, netID) } // Update namespace references in egress firewall rules mp.node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID) } func (mp *multiTenantPlugin) AddNetNamespace(netns *osapi.NetNamespace) { mp.updatePodNetwork(netns.Name, 0, netns.NetID) } func (mp *multiTenantPlugin) UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32) { mp.updatePodNetwork(netns.Name, oldNetID, netns.NetID) } func (mp *multiTenantPlugin) DeleteNetNamespace(netns *osapi.NetNamespace) { mp.updatePodNetwork(netns.Name, netns.NetID, 0) } func (mp *multiTenantPlugin) GetVNID(namespace string) (uint32, error) { return mp.vnids.WaitAndGetVNID(namespace) } func (mp *multiTenantPlugin) GetNamespaces(vnid uint32) []string { return mp.vnids.GetNamespaces(vnid) } func (mp *multiTenantPlugin) RefVNID(vnid uint32) { if vnid == 0 { return } mp.vnidRefsLock.Lock() defer mp.vnidRefsLock.Unlock() mp.vnidRefs[vnid] += 1 if mp.vnidRefs[vnid] > 1 { return } glog.V(5).Infof("RefVNID %d adding rule", vnid) otx := mp.node.ovs.NewTransaction() otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", vnid, vnid) if err := otx.EndTransaction(); err != nil { glog.Errorf("Error adding OVS flow for VNID: %v", err) } } func (mp *multiTenantPlugin) UnrefVNID(vnid uint32) { if vnid == 0 { return } mp.vnidRefsLock.Lock() defer mp.vnidRefsLock.Unlock() if mp.vnidRefs[vnid] == 0 { glog.Warningf("refcounting error on vnid %d", vnid) return } mp.vnidRefs[vnid] -= 1 if mp.vnidRefs[vnid] > 0 { return } glog.V(5).Infof("UnrefVNID %d removing rule", vnid) otx := mp.node.ovs.NewTransaction() otx.DeleteFlows("table=80, reg0=%d, reg1=%d", vnid, vnid) if err := otx.EndTransaction(); err != nil { glog.Errorf("Error deleting OVS flow for VNID: %v", err) } } func (mp *multiTenantPlugin) moveVNIDRefs(num int, oldVNID, newVNID uint32) { glog.V(5).Infof("moveVNIDRefs %d -> %d", oldVNID, newVNID) mp.vnidRefsLock.Lock() defer mp.vnidRefsLock.Unlock() otx := mp.node.ovs.NewTransaction() if mp.vnidRefs[oldVNID] <= num { otx.DeleteFlows("table=80, reg0=%d, reg1=%d", oldVNID, oldVNID) } if mp.vnidRefs[newVNID] == 0 { otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", newVNID, newVNID) } err := otx.EndTransaction() if err != nil { glog.Errorf("Error modifying OVS flows for VNID: %v", err) } mp.vnidRefs[oldVNID] -= num if mp.vnidRefs[oldVNID] < 0 { glog.Warningf("refcounting error on vnid %d", oldVNID) mp.vnidRefs[oldVNID] = 0 } mp.vnidRefs[newVNID] += num }