package plugin import ( "encoding/json" "fmt" "net" "github.com/openshift/origin/pkg/sdn/plugin/cniserver" "github.com/openshift/origin/pkg/util/netutils" "github.com/golang/glog" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" knetwork "k8s.io/kubernetes/pkg/kubelet/network" kubehostport "k8s.io/kubernetes/pkg/kubelet/network/hostport" cnitypes "github.com/containernetworking/cni/pkg/types" ) type podHandler interface { setup(req *cniserver.PodRequest) (*cnitypes.Result, *kubehostport.RunningPod, error) update(req *cniserver.PodRequest) error teardown(req *cniserver.PodRequest) error } type podManager struct { // Common stuff used for both live and testing code podHandler podHandler cniServer *cniserver.CNIServer // Request queue for pod operations incoming from the CNIServer requests chan (*cniserver.PodRequest) // Tracks pod :: IP address for hostport handling runningPods map[string]*kubehostport.RunningPod // Live pod setup/teardown stuff not used in testing code kClient *kclientset.Clientset policy osdnPolicy ipamConfig []byte mtu uint32 hostportHandler kubehostport.HostportHandler host knetwork.Host } // Creates a new live podManager; used by node code func newPodManager(host knetwork.Host, localSubnetCIDR string, netInfo *NetworkInfo, kClient *kclientset.Clientset, policy osdnPolicy, mtu uint32) (*podManager, error) { pm := newDefaultPodManager(host) pm.kClient = kClient pm.policy = policy pm.mtu = mtu pm.hostportHandler = kubehostport.NewHostportHandler() pm.podHandler = pm var err error pm.ipamConfig, err = getIPAMConfig(netInfo.ClusterNetwork, localSubnetCIDR) if err != nil { return nil, err } return pm, nil } // Creates a new basic podManager; used by testcases func newDefaultPodManager(host knetwork.Host) *podManager { return &podManager{ runningPods: make(map[string]*kubehostport.RunningPod), requests: make(chan *cniserver.PodRequest, 20), host: host, } } // Generates a CNI IPAM config from a given node cluster and local subnet that // CNI 'host-local' IPAM plugin will use to create an IP address lease for the // container func getIPAMConfig(clusterNetwork *net.IPNet, localSubnet string) ([]byte, error) { nodeNet, err := cnitypes.ParseCIDR(localSubnet) if err != nil { return nil, fmt.Errorf("error parsing node network '%s': %v", localSubnet, err) } type hostLocalIPAM struct { Type string `json:"type"` Subnet cnitypes.IPNet `json:"subnet"` Routes []cnitypes.Route `json:"routes"` } type cniNetworkConfig struct { Name string `json:"name"` Type string `json:"type"` IPAM *hostLocalIPAM `json:"ipam"` } return json.Marshal(&cniNetworkConfig{ Name: "openshift-sdn", Type: "openshift-sdn", IPAM: &hostLocalIPAM{ Type: "host-local", Subnet: cnitypes.IPNet{ IP: nodeNet.IP, Mask: nodeNet.Mask, }, Routes: []cnitypes.Route{ { Dst: net.IPNet{ IP: net.IPv4zero, Mask: net.IPMask(net.IPv4zero), }, GW: netutils.GenerateDefaultGateway(nodeNet), }, {Dst: *clusterNetwork}, }, }, }) } // Start the CNI server and start processing requests from it func (m *podManager) Start(socketPath string) error { go m.processCNIRequests() m.cniServer = cniserver.NewCNIServer(socketPath) return m.cniServer.Start(m.handleCNIRequest) } // Returns a key for use with the runningPods map func getPodKey(request *cniserver.PodRequest) string { return fmt.Sprintf("%s/%s", request.PodNamespace, request.PodName) } func (m *podManager) getPod(request *cniserver.PodRequest) *kubehostport.RunningPod { return m.runningPods[getPodKey(request)] } // Return a list of Kubernetes RunningPod objects for hostport operations func (m *podManager) getRunningPods() []*kubehostport.RunningPod { pods := make([]*kubehostport.RunningPod, 0) for _, runningPod := range m.runningPods { pods = append(pods, runningPod) } return pods } // Add a request to the podManager CNI request queue func (m *podManager) addRequest(request *cniserver.PodRequest) { m.requests <- request } // Wait for and return the result of a pod request func (m *podManager) waitRequest(request *cniserver.PodRequest) *cniserver.PodResult { return <-request.Result } // Enqueue incoming pod requests from the CNI server, wait on the result, // and return that result to the CNI client func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, error) { glog.V(5).Infof("Dispatching pod network request %v", request) m.addRequest(request) result := m.waitRequest(request) glog.V(5).Infof("Returning pod network request %v, result %s err %v", request, string(result.Response), result.Err) return result.Response, result.Err } // Process all CNI requests from the request queue serially. Our OVS interaction // and scripts currently cannot run in parallel, and doing so greatly complicates // setup/teardown logic func (m *podManager) processCNIRequests() { for request := range m.requests { pk := getPodKey(request) glog.V(5).Infof("Processing pod network request %v", request) result := &cniserver.PodResult{} switch request.Command { case cniserver.CNI_ADD: ipamResult, runningPod, err := m.podHandler.setup(request) if ipamResult != nil { result.Response, err = json.Marshal(ipamResult) if result.Err == nil { m.runningPods[pk] = runningPod } } if err != nil { result.Err = err } case cniserver.CNI_UPDATE: result.Err = m.podHandler.update(request) case cniserver.CNI_DEL: delete(m.runningPods, pk) result.Err = m.podHandler.teardown(request) default: result.Err = fmt.Errorf("unhandled CNI request %v", request.Command) } glog.V(5).Infof("Processed pod network request %v, result %s err %v", request, string(result.Response), result.Err) request.Result <- result } panic("stopped processing CNI pod requests!") }