package plugin import ( "fmt" "sync" "time" log "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" kerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" utilwait "k8s.io/kubernetes/pkg/util/wait" osclient "github.com/openshift/origin/pkg/client" osapi "github.com/openshift/origin/pkg/sdn/api" ) type nodeVNIDMap struct { // Synchronizes add or remove ids/namespaces lock sync.Mutex ids map[string]uint32 namespaces map[uint32]sets.String } func newNodeVNIDMap() *nodeVNIDMap { return &nodeVNIDMap{ ids: make(map[string]uint32), namespaces: make(map[uint32]sets.String), } } func (vmap *nodeVNIDMap) addNamespaceToSet(name string, vnid uint32) { set, found := vmap.namespaces[vnid] if !found { set = sets.NewString() vmap.namespaces[vnid] = set } set.Insert(name) } func (vmap *nodeVNIDMap) removeNamespaceFromSet(name string, vnid uint32) { if set, found := vmap.namespaces[vnid]; found { set.Delete(name) if set.Len() == 0 { delete(vmap.namespaces, vnid) } } } func (vmap *nodeVNIDMap) GetNamespaces(id uint32) []string { vmap.lock.Lock() defer vmap.lock.Unlock() if set, ok := vmap.namespaces[id]; ok { return set.List() } else { return nil } } func (vmap *nodeVNIDMap) GetVNID(name string) (uint32, error) { vmap.lock.Lock() defer vmap.lock.Unlock() if id, ok := vmap.ids[name]; ok { return id, nil } return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name) } // Nodes asynchronously watch for both NetNamespaces and services // NetNamespaces populates vnid map and services/pod-setup depend on vnid map // If for some reason, vnid map propagation from master to node is slow // and if service/pod-setup tries to lookup vnid map then it may fail. // So, use this method to alleviate this problem. This method will // retry vnid lookup before giving up. func (vmap *nodeVNIDMap) WaitAndGetVNID(name string) (uint32, error) { var id uint32 backoff := utilwait.Backoff{ Duration: 100 * time.Millisecond, Factor: 1.5, Steps: 5, } err := utilwait.ExponentialBackoff(backoff, func() (bool, error) { var err error id, err = vmap.GetVNID(name) return err == nil, nil }) if err == nil { return id, nil } else { return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name) } } func (vmap *nodeVNIDMap) setVNID(name string, id uint32) { vmap.lock.Lock() defer vmap.lock.Unlock() if oldId, found := vmap.ids[name]; found { vmap.removeNamespaceFromSet(name, oldId) } vmap.ids[name] = id vmap.addNamespaceToSet(name, id) log.Infof("Associate netid %d to namespace %q", id, name) } func (vmap *nodeVNIDMap) unsetVNID(name string) (id uint32, err error) { vmap.lock.Lock() defer vmap.lock.Unlock() id, found := vmap.ids[name] if !found { return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name) } vmap.removeNamespaceFromSet(name, id) delete(vmap.ids, name) log.Infof("Dissociate netid %d from namespace %q", id, name) return id, nil } func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error { nets, err := osClient.NetNamespaces().List(kapi.ListOptions{}) if err != nil { return err } for _, net := range nets.Items { vmap.setVNID(net.Name, net.NetID) } return nil } //------------------ Node Methods -------------------- func (node *OsdnNode) VnidStartNode() error { // Populate vnid map synchronously so that existing services can fetch vnid err := node.vnids.populateVNIDs(node.osClient) if err != nil { return err } go utilwait.Forever(node.watchNetNamespaces, 0) go utilwait.Forever(node.watchServices, 0) return nil } func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) error { // FIXME: this is racy; traffic coming from the pods gets switched to the new // VNID before the service and firewall rules are updated to match. We need // to do the updates as a single transaction (ovs-ofctl --bundle). pods, err := node.GetLocalPods(namespace) if err != nil { return err } services, err := node.kClient.Services(namespace).List(kapi.ListOptions{}) if err != nil { return err } errList := []error{} // Update OF rules for the existing/old pods in the namespace for _, pod := range pods { err = node.UpdatePod(pod) if err != nil { errList = append(errList, err) } } // Update OF rules for the old services in the namespace for _, svc := range services.Items { if !kapi.IsServiceIPSet(&svc) { continue } if err = node.DeleteServiceRules(&svc); err != nil { log.Error(err) } if err = node.AddServiceRules(&svc, netID); err != nil { errList = append(errList, err) } } // Update namespace references in egress firewall rules if err = node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID); err != nil { errList = append(errList, err) } return kerrors.NewAggregate(errList) } func (node *OsdnNode) watchNetNamespaces() { RunEventQueue(node.osClient, NetNamespaces, func(delta cache.Delta) error { netns := delta.Object.(*osapi.NetNamespace) log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name) switch delta.Type { case cache.Sync, cache.Added, cache.Updated: // Skip this event if the old and new network ids are same oldNetID, err := node.vnids.GetVNID(netns.NetName) if (err == nil) && (oldNetID == netns.NetID) { break } node.vnids.setVNID(netns.NetName, netns.NetID) err = node.updatePodNetwork(netns.NetName, oldNetID, netns.NetID) if err != nil { node.vnids.setVNID(netns.NetName, oldNetID) return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err) } case cache.Deleted: // updatePodNetwork needs vnid, so unset vnid after this call err := node.updatePodNetwork(netns.NetName, netns.NetID, osapi.GlobalVNID) if err != nil { return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", netns.NetName, err) } node.vnids.unsetVNID(netns.NetName) } return nil }) } func isServiceChanged(oldsvc, newsvc *kapi.Service) bool { if len(oldsvc.Spec.Ports) == len(newsvc.Spec.Ports) { for i := range oldsvc.Spec.Ports { if oldsvc.Spec.Ports[i].Protocol != newsvc.Spec.Ports[i].Protocol || oldsvc.Spec.Ports[i].Port != newsvc.Spec.Ports[i].Port { return true } } return false } return true } func (node *OsdnNode) watchServices() { services := make(map[string]*kapi.Service) RunEventQueue(node.kClient.CoreClient, Services, func(delta cache.Delta) error { serv := delta.Object.(*kapi.Service) // Ignore headless services if !kapi.IsServiceIPSet(serv) { return nil } log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name) switch delta.Type { case cache.Sync, cache.Added, cache.Updated: oldsvc, exists := services[string(serv.UID)] if exists { if !isServiceChanged(oldsvc, serv) { break } if err := node.DeleteServiceRules(oldsvc); err != nil { log.Error(err) } } netid, err := node.vnids.WaitAndGetVNID(serv.Namespace) if err != nil { return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err) } if err = node.AddServiceRules(serv, netid); err != nil { return err } services[string(serv.UID)] = serv case cache.Deleted: delete(services, string(serv.UID)) if err := node.DeleteServiceRules(serv); err != nil { return err } } return nil }) }