// Package plugin implements the node-side OpenShift SDN network plugin:
// it wires the local OVS bridge, iptables rules, and the CNI pod manager
// to the cluster-level SDN state watched from the OpenShift/Kubernetes APIs.
package plugin

import (
	"fmt"
	"net"
	osexec "os/exec"
	"strings"
	"time"

	log "github.com/golang/glog"

	"github.com/openshift/origin/pkg/sdn/plugin/cniserver"

	osclient "github.com/openshift/origin/pkg/client"
	osapi "github.com/openshift/origin/pkg/sdn/api"
	"github.com/openshift/origin/pkg/util/ipcmd"
	"github.com/openshift/origin/pkg/util/netutils"
	"github.com/openshift/origin/pkg/util/ovs"

	docker "github.com/fsouza/go-dockerclient"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/fields"
	knetwork "k8s.io/kubernetes/pkg/kubelet/network"
	"k8s.io/kubernetes/pkg/labels"
	kexec "k8s.io/kubernetes/pkg/util/exec"
	kubeutilnet "k8s.io/kubernetes/pkg/util/net"
	kwait "k8s.io/kubernetes/pkg/util/wait"
)

// osdnPolicy abstracts the per-tenant isolation behavior of a plugin
// variant (single-tenant vs multi-tenant; see NewNodePlugin's switch on
// the plugin name). Implementations map namespaces to VNIDs and maintain
// reference counts on VNIDs as pods/services come and go.
type osdnPolicy interface {
	Name() string
	Start(node *OsdnNode) error

	AddNetNamespace(netns *osapi.NetNamespace)
	UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32)
	DeleteNetNamespace(netns *osapi.NetNamespace)

	GetVNID(namespace string) (uint32, error)
	GetNamespaces(vnid uint32) []string
	RefVNID(vnid uint32)
	UnrefVNID(vnid uint32)
}

// OsdnNode holds the state of the SDN plugin running on a single node.
type OsdnNode struct {
	policy             osdnPolicy
	kClient            *kclientset.Clientset
	osClient           *osclient.Client
	ovs                *ovs.Interface
	networkInfo        *NetworkInfo
	podManager         *podManager
	localSubnetCIDR    string
	localIP            string
	hostName           string
	podNetworkReady    chan struct{} // closed once the pod network is fully set up (see markPodNetworkReady)
	kubeletInitReady   chan struct{} // closed when kubelet has initialized the network plugin, giving us node.host
	iptablesSyncPeriod time.Duration
	mtu                uint32
	egressPolicies     map[uint32][]osapi.EgressNetworkPolicy

	host             knetwork.Host
	kubeletCniPlugin knetwork.NetworkPlugin

	// set when dockerPreCNICleanup detected a pre-CNI deployment whose
	// lbr0 iptables rule still needs to be removed
	clearLbr0IptablesRule bool
}

// NewNodePlugin is called by higher layers to create the plugin SDN node
// instance. It resolves the node hostname and IP if they were not
// configured, opens the OVS bridge interface, and performs pre-CNI
// cleanup if an older deployment is detected.
//
// Returns (nil, nil) — no plugin, no error — when pluginName is not an
// OpenShift SDN plugin name.
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclientset.Clientset, hostname string, selfIP string, iptablesSyncPeriod time.Duration, mtu uint32) (*OsdnNode, error) {
	var policy osdnPolicy
	switch strings.ToLower(pluginName) {
	case osapi.SingleTenantPluginName:
		policy = NewSingleTenantPlugin()
	case osapi.MultiTenantPluginName:
		policy = NewMultiTenantPlugin()
	default:
		// Not an OpenShift plugin
		return nil, nil
	}

	log.Infof("Initializing SDN node of type %q with configured hostname %q (IP %q), iptables sync period %q", pluginName, hostname, selfIP, iptablesSyncPeriod.String())

	// Fall back to the kernel hostname when none was configured.
	if hostname == "" {
		output, err := kexec.New().Command("uname", "-n").CombinedOutput()
		if err != nil {
			return nil, err
		}
		hostname = strings.TrimSpace(string(output))
		log.Infof("Resolved hostname to %q", hostname)
	}

	// Resolve the node IP from the hostname; if that fails, fall back to
	// the default host interface's address.
	if selfIP == "" {
		var err error
		selfIP, err = netutils.GetNodeIP(hostname)
		if err != nil {
			log.V(5).Infof("Failed to determine node address from hostname %s; using default interface (%v)", hostname, err)
			var defaultIP net.IP
			defaultIP, err = kubeutilnet.ChooseHostInterface()
			if err != nil {
				return nil, err
			}
			selfIP = defaultIP.String()
			log.Infof("Resolved IP address to %q", selfIP)
		}
	}

	ovsif, err := ovs.New(kexec.New(), BR)
	if err != nil {
		return nil, err
	}

	plugin := &OsdnNode{
		policy:             policy,
		kClient:            kClient,
		osClient:           osClient,
		ovs:                ovsif,
		localIP:            selfIP,
		hostName:           hostname,
		podNetworkReady:    make(chan struct{}),
		kubeletInitReady:   make(chan struct{}),
		iptablesSyncPeriod: iptablesSyncPeriod,
		mtu:                mtu,
		egressPolicies:     make(map[uint32][]osapi.EgressNetworkPolicy),
	}

	if err := plugin.dockerPreCNICleanup(); err != nil {
		return nil, err
	}

	return plugin, nil
}

// Detect whether we are upgrading from a pre-CNI openshift and clean up
// interfaces and iptables rules that are no longer required.
//
// Detection works by trying to down the legacy "lbr0" bridge: if that
// fails the interface doesn't exist, so there is nothing to clean up and
// we return nil immediately. Otherwise docker is restarted (via raw
// dbus-send calls) so it stops using lbr0, the legacy interfaces are
// deleted, and we wait for docker to come back before returning.
func (node *OsdnNode) dockerPreCNICleanup() error {
	exec := kexec.New()
	itx := ipcmd.NewTransaction(exec, "lbr0")
	itx.SetLink("down")
	if err := itx.EndTransaction(); err != nil {
		// no cleanup required
		return nil
	}

	node.clearLbr0IptablesRule = true

	// Restart docker to kill old pods and make it use docker0 again.
	// "systemctl restart" will bail out (unnecessarily) in the
	// OpenShift-in-a-container case, so we work around that by sending
	// the messages by hand.
	// Both dbus-send failures are logged but not fatal: the docker Ping
	// loop below is the real success check.
	if _, err := osexec.Command("dbus-send", "--system", "--print-reply", "--reply-timeout=2000", "--type=method_call", "--dest=org.freedesktop.systemd1", "/org/freedesktop/systemd1", "org.freedesktop.systemd1.Manager.Reload").CombinedOutput(); err != nil {
		log.Error(err)
	}
	if _, err := osexec.Command("dbus-send", "--system", "--print-reply", "--reply-timeout=2000", "--type=method_call", "--dest=org.freedesktop.systemd1", "/org/freedesktop/systemd1", "org.freedesktop.systemd1.Manager.RestartUnit", "string:'docker.service' string:'replace'").CombinedOutput(); err != nil {
		log.Error(err)
	}

	// Delete pre-CNI interfaces (errors deliberately ignored; the
	// interfaces may already be gone).
	for _, intf := range []string{"lbr0", "vovsbr", "vlinuxbr"} {
		itx := ipcmd.NewTransaction(exec, intf)
		itx.DeleteLink()
		itx.IgnoreError()
		itx.EndTransaction()
	}

	// Wait until docker has restarted since kubelet will exit if docker isn't running
	dockerClient, err := docker.NewClientFromEnv()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %v", err)
	}
	err = kwait.ExponentialBackoff(
		kwait.Backoff{
			Duration: 100 * time.Millisecond,
			Factor:   1.2,
			Steps:    6,
		},
		func() (bool, error) {
			if err := dockerClient.Ping(); err != nil {
				// wait longer
				return false, nil
			}
			return true, nil
		})
	if err != nil {
		return fmt.Errorf("failed to connect to docker after SDN cleanup restart: %v", err)
	}

	log.Infof("Cleaned up left-over openshift-sdn docker bridge and interfaces")

	return nil
}

// Start brings up the SDN on this node: fetches cluster network info,
// installs iptables rules, allocates the local subnet, configures the
// OVS bridge, starts the policy and the service watcher, then — after
// kubelet has initialized — starts the CNI pod manager. If SetupSDN
// reported a network configuration change, every local running pod is
// updated to the new configuration before the network is marked ready.
func (node *OsdnNode) Start() error {
	var err error
	node.networkInfo, err = getNetworkInfo(node.osClient)
	if err != nil {
		return fmt.Errorf("Failed to get network information: %v", err)
	}

	nodeIPTables := newNodeIPTables(node.networkInfo.ClusterNetwork.String(), node.iptablesSyncPeriod)
	if err = nodeIPTables.Setup(); err != nil {
		return fmt.Errorf("Failed to set up iptables: %v", err)
	}

	node.localSubnetCIDR, err = node.getLocalSubnet()
	if err != nil {
		return err
	}

	networkChanged, err := node.SetupSDN()
	if err != nil {
		return err
	}

	err = node.SubnetStartNode()
	if err != nil {
		return err
	}

	if err = node.policy.Start(node); err != nil {
		return err
	}

	go kwait.Forever(node.watchServices, 0)

	// Wait for kubelet to init the plugin so we get a knetwork.Host
	log.V(5).Infof("Waiting for kubelet network plugin initialization")
	<-node.kubeletInitReady
	// Wait for kubelet itself to finish initializing
	kwait.PollInfinite(100*time.Millisecond,
		func() (bool, error) {
			if node.host.GetRuntime() == nil {
				return false, nil
			}
			return true, nil
		})

	log.V(5).Infof("Creating and initializing openshift-sdn pod manager")
	node.podManager, err = newPodManager(node.host, node.localSubnetCIDR, node.networkInfo, node.kClient, node.policy, node.mtu)
	if err != nil {
		return err
	}
	if err := node.podManager.Start(cniserver.CNIServerSocketPath); err != nil {
		return err
	}

	if networkChanged {
		var pods []kapi.Pod
		pods, err = node.GetLocalPods(kapi.NamespaceAll)
		if err != nil {
			return err
		}
		for _, p := range pods {
			// Best effort per pod: log and continue on failure so one
			// broken pod doesn't block the rest (or node startup).
			err = node.UpdatePod(p)
			if err != nil {
				log.Warningf("Could not update pod %q: %s", p.Name, err)
				continue
			}
			if vnid, err := node.policy.GetVNID(p.Namespace); err == nil {
				node.policy.RefVNID(vnid)
			}
		}
	}

	log.V(5).Infof("openshift-sdn network plugin ready")
	node.markPodNetworkReady()

	return nil
}

// UpdatePod reconfigures an existing pod's networking by sending a
// CNI_UPDATE request through the pod manager and waiting for the result.
//
// FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action
// See https://github.com/containernetworking/cni/issues/89
func (node *OsdnNode) UpdatePod(pod kapi.Pod) error {
	req := &cniserver.PodRequest{
		Command:      cniserver.CNI_UPDATE,
		PodNamespace: pod.Namespace,
		PodName:      pod.Name,
		ContainerId:  getPodContainerID(&pod),
		// netns is read from docker if needed, since we don't get it from kubelet
		Result: make(chan *cniserver.PodResult),
	}

	// Send request and wait for the result
	_, err := node.podManager.handleCNIRequest(req)
	return err
}

// GetLocalPods lists the pods scheduled on this node (selected by the
// spec.nodeName field) in the given namespace, returning only those in
// the Running phase.
func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) {
	fieldSelector := fields.Set{"spec.nodeName": node.hostName}.AsSelector()
	opts := kapi.ListOptions{
		LabelSelector: labels.Everything(),
		FieldSelector: fieldSelector,
	}
	podList, err := node.kClient.Core().Pods(namespace).List(opts)
	if err != nil {
		return nil, err
	}

	// Filter running pods
	pods := make([]kapi.Pod, 0, len(podList.Items))
	for _, pod := range podList.Items {
		if pod.Status.Phase == kapi.PodRunning {
			pods = append(pods, pod)
		}
	}
	return pods, nil
}

// markPodNetworkReady signals readiness by closing the podNetworkReady
// channel; must be called at most once.
func (node *OsdnNode) markPodNetworkReady() {
	close(node.podNetworkReady)
}

// IsPodNetworkReady returns nil once the pod network has been marked
// ready, and an error before that (non-blocking check).
func (node *OsdnNode) IsPodNetworkReady() error {
	select {
	case <-node.podNetworkReady:
		return nil
	default:
		return fmt.Errorf("SDN pod network is not ready")
	}
}

// isServiceChanged reports whether the service's ports differ between
// the old and new objects (count, protocol, or port number) — the only
// changes that require service rules to be rebuilt.
func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
	if len(oldsvc.Spec.Ports) == len(newsvc.Spec.Ports) {
		for i := range oldsvc.Spec.Ports {
			if oldsvc.Spec.Ports[i].Protocol != newsvc.Spec.Ports[i].Protocol ||
				oldsvc.Spec.Ports[i].Port != newsvc.Spec.Ports[i].Port {
				return true
			}
		}
		return false
	}
	return true
}

// watchServices runs the service event loop: it keeps OVS/iptables
// service rules and VNID reference counts in sync with Service
// add/update/delete events. Headless services (no cluster IP) are
// ignored. The local `services` map (keyed by UID) caches the last-seen
// object so updates can delete the old rules before adding new ones.
// Runs forever; intended to be invoked via kwait.Forever (see Start).
func (node *OsdnNode) watchServices() {
	services := make(map[string]*kapi.Service)
	RunEventQueue(node.kClient.CoreClient, Services, func(delta cache.Delta) error {
		serv := delta.Object.(*kapi.Service)

		// Ignore headless services
		if !kapi.IsServiceIPSet(serv) {
			return nil
		}

		log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name)
		switch delta.Type {
		case cache.Sync, cache.Added, cache.Updated:
			oldsvc, exists := services[string(serv.UID)]
			if exists {
				// Unchanged ports: nothing to do.
				if !isServiceChanged(oldsvc, serv) {
					break
				}
				node.DeleteServiceRules(oldsvc)
			}

			netid, err := node.policy.GetVNID(serv.Namespace)
			if err != nil {
				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
			}

			node.AddServiceRules(serv, netid)
			services[string(serv.UID)] = serv
			// Only take a VNID reference the first time we see this service.
			if !exists {
				node.policy.RefVNID(netid)
			}
		case cache.Deleted:
			delete(services, string(serv.UID))
			node.DeleteServiceRules(serv)

			netid, err := node.policy.GetVNID(serv.Namespace)
			if err == nil {
				node.policy.UnrefVNID(netid)
			}
		}
		return nil
	})
}