package plugin

import (
	"fmt"
	"net"
	"strings"
	"time"

	log "github.com/golang/glog"

	osclient "github.com/openshift/origin/pkg/client"
	osapi "github.com/openshift/origin/pkg/sdn/api"
	"github.com/openshift/origin/pkg/sdn/plugin/api"
	"github.com/openshift/origin/pkg/util/netutils"

	kapi "k8s.io/kubernetes/pkg/api"
	kclient "k8s.io/kubernetes/pkg/client/unversioned"
	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/container"
	kexec "k8s.io/kubernetes/pkg/util/exec"
	kubeutilnet "k8s.io/kubernetes/pkg/util/net"
)

// OsdnNode holds the per-node state of the SDN plugin.
type OsdnNode struct {
	// multitenant is set from IsOpenShiftMultitenantNetworkPlugin; when true,
	// Start additionally runs VnidStartNode and SetupEgressNetworkPolicy.
	multitenant bool

	// registry is built by newRegistry from the OpenShift and Kubernetes clients.
	registry *Registry

	// localIP is the node IP handed to NewNodePlugin, or the one it resolved.
	localIP string

	// localSubnet is not assigned anywhere in this part of the file.
	// NOTE(review): presumably populated during subnet startup - confirm.
	localSubnet *osapi.HostSubnet

	// hostName is the node name used by GetLocalPods to select this node's pods.
	hostName string

	// podNetworkReady is closed by markPodNetworkReady once Start has finished.
	podNetworkReady chan struct{}

	// vnids is created by newNodeVNIDMap.
	vnids *nodeVNIDMap

	// iptablesSyncPeriod is forwarded to newNodeIPTables in Start.
	iptablesSyncPeriod time.Duration

	// mtu is forwarded to SubnetStartNode in Start.
	mtu uint32

	// egressPolicies groups egress network policies under a uint32 key.
	// NOTE(review): the key is presumably a VNID - confirm against the users of this map.
	egressPolicies map[uint32][]*osapi.EgressNetworkPolicy
}

// NewNodePlugin is called by higher layers to create the plugin SDN node instance.
//
// It returns (nil, nil) when pluginName is not recognised by
// IsOpenShiftNetworkPlugin. An empty hostname is replaced by the trimmed
// output of `uname -n`. An empty selfIP is looked up from the hostname via
// netutils.GetNodeIP and, if that fails, taken from
// kubeutilnet.ChooseHostInterface. Any failure of the `uname` command or of
// the interface fallback is returned to the caller unchanged.
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclient.Client, hostname string, selfIP string, iptablesSyncPeriod time.Duration, mtu uint32) (api.OsdnNodePlugin, error) {
	if !IsOpenShiftNetworkPlugin(pluginName) {
		return nil, nil
	}

	log.Infof("Initializing SDN node of type %q with configured hostname %q (IP %q), iptables sync period %q", pluginName, hostname, selfIP, iptablesSyncPeriod.String())

	if hostname == "" {
		raw, err := kexec.New().Command("uname", "-n").CombinedOutput()
		if err != nil {
			return nil, err
		}
		hostname = strings.TrimSpace(string(raw))
		log.Infof("Resolved hostname to %q", hostname)
	}

	if selfIP == "" {
		nodeIP, err := netutils.GetNodeIP(hostname)
		if err != nil {
			// The hostname lookup failed, so fall back to the host interface
			// chosen by kubeutilnet.ChooseHostInterface.
			log.V(5).Infof("Failed to determine node address from hostname %s; using default interface (%v)", hostname, err)
			var fallback net.IP
			if fallback, err = kubeutilnet.ChooseHostInterface(); err != nil {
				return nil, err
			}
			nodeIP = fallback.String()
			log.Infof("Resolved IP address to %q", nodeIP)
		}
		selfIP = nodeIP
	}

	return &OsdnNode{
		multitenant:        IsOpenShiftMultitenantNetworkPlugin(pluginName),
		registry:           newRegistry(osClient, kClient),
		localIP:            selfIP,
		hostName:           hostname,
		vnids:              newNodeVNIDMap(),
		podNetworkReady:    make(chan struct{}),
		iptablesSyncPeriod: iptablesSyncPeriod,
		mtu:                mtu,
		egressPolicies:     make(map[uint32][]*osapi.EgressNetworkPolicy),
	}, nil
}

// Start brings up the SDN on this node.
//
// In order, it fetches the network information from the registry, sets up
// iptables, runs SubnetStartNode and, for multitenant plugins, VnidStartNode
// and SetupEgressNetworkPolicy. If SubnetStartNode reports that the network
// changed, every running local pod is passed to UpdatePod; a failure for an
// individual pod is only logged as a warning. On success the pod network is
// marked ready. Any earlier failure is returned and the network is not marked
// ready.
func (node *OsdnNode) Start() error {
	networkInfo, err := node.registry.GetNetworkInfo()
	if err != nil {
		return fmt.Errorf("Failed to get network information: %v", err)
	}

	ipt := newNodeIPTables(networkInfo.ClusterNetwork.String(), node.iptablesSyncPeriod)
	if err = ipt.Setup(); err != nil {
		return fmt.Errorf("Failed to set up iptables: %v", err)
	}

	networkChanged, err := node.SubnetStartNode(node.mtu)
	if err != nil {
		return err
	}

	if node.multitenant {
		if err = node.VnidStartNode(); err != nil {
			return err
		}
		if err = node.SetupEgressNetworkPolicy(); err != nil {
			return err
		}
	}

	if networkChanged {
		localPods, listErr := node.GetLocalPods(kapi.NamespaceAll)
		if listErr != nil {
			return listErr
		}
		for i := range localPods {
			pod := &localPods[i]
			containerID := getPodContainerID(pod)
			// Best effort: a pod that cannot be updated does not abort startup.
			if err = node.UpdatePod(pod.Namespace, pod.Name, kubeletTypes.DockerID(containerID)); err != nil {
				log.Warningf("Could not update pod %q (%s): %s", pod.Name, containerID, err)
			}
		}
	}

	node.markPodNetworkReady()
	return nil
}

// GetLocalPods returns the running pods that the registry reports for this
// node's hostname in the given namespace.
func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) {
	return node.registry.GetRunningPods(node.hostName, namespace)
}

// markPodNetworkReady closes podNetworkReady, which releases every caller
// blocked in WaitForPodNetworkReady.
func (node *OsdnNode) markPodNetworkReady() {
	close(node.podNetworkReady)
}

// WaitForPodNetworkReady blocks until the pod network has been marked ready,
// logging a progress message every 10 seconds. It returns an error if the
// network is still not ready after 12 such intervals (2 minutes).
func (node *OsdnNode) WaitForPodNetworkReady() error {
	const (
		logInterval  = 10 * time.Second
		numIntervals = 12 // timeout: 2 mins
	)

	for remaining := numIntervals; remaining > 0; remaining-- {
		select {
		case <-node.podNetworkReady:
			// Start finished the SDN setup and closed the channel.
			return nil
		case <-time.After(logInterval):
			log.Infof("Waiting for SDN pod network to be ready...")
		}
	}
	return fmt.Errorf("SDN pod network is not ready(timeout: 2 mins)")
}