// +build linux
package plugin
import (
"fmt"
"io/ioutil"
"net"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
sdnapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/sdn/plugin/cniserver"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
kubehostport "k8s.io/kubernetes/pkg/kubelet/network/hostport"
kbandwidth "k8s.io/kubernetes/pkg/util/bandwidth"
ksets "k8s.io/kubernetes/pkg/util/sets"
"github.com/containernetworking/cni/pkg/invoke"
"github.com/containernetworking/cni/pkg/ip"
"github.com/containernetworking/cni/pkg/ipam"
"github.com/containernetworking/cni/pkg/ns"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/vishvananda/netlink"
)
const (
	// sdnScript is the helper script invoked to program OVS for a pod.
	sdnScript = "openshift-sdn-ovs"
	// Subcommands passed as the first argument to sdnScript.
	setUpCmd    = "setup"
	tearDownCmd = "teardown"
	updateCmd   = "update"

	// podInterfaceName is the name of the pod-side network interface.
	podInterfaceName = knetwork.DefaultInterfaceName
)
// PodConfig describes the openshift-sdn specific attributes of a pod's
// network configuration.
type PodConfig struct {
	// vnid is the virtual network ID of the pod's namespace.
	vnid uint32
	// ingressBandwidth and egressBandwidth are decimal strings parsed from
	// the pod's kubernetes bandwidth annotations; empty means no limit.
	ingressBandwidth string
	egressBandwidth  string
	// wantMacvlan is true if the (privileged) pod requested a macvlan
	// interface via the assign-macvlan annotation.
	wantMacvlan bool
}
// getBandwidth extracts the pod's ingress and egress bandwidth limits from
// its kubernetes bandwidth annotations, formatted as decimal strings for the
// openshift-sdn-ovs script. A missing limit yields an empty string.
func getBandwidth(pod *kapi.Pod) (string, string, error) {
	ingress, egress, err := kbandwidth.ExtractPodBandwidthResources(pod.Annotations)
	if err != nil {
		return "", "", fmt.Errorf("failed to parse pod bandwidth: %v", err)
	}
	var ingressStr, egressStr string
	if ingress != nil {
		// Quantity.Value() is an int64; strconv is the idiomatic (and
		// cheaper) way to render it compared to fmt.Sprintf.
		ingressStr = strconv.FormatInt(ingress.Value(), 10)
	}
	if egress != nil {
		egressStr = strconv.FormatInt(egress.Value(), 10)
	}
	return ingressStr, egressStr, nil
}
// wantsMacvlan returns true if the pod requested a macvlan interface (for
// the egress router feature) via the AssignMacvlanAnnotation annotation.
// The annotation is only honored for pods with at least one privileged
// container; a non-privileged pod carrying it is an error.
func wantsMacvlan(pod *kapi.Pod) (bool, error) {
	val, ok := pod.Annotations[sdnapi.AssignMacvlanAnnotation]
	if !ok || val != "true" {
		return false, nil
	}

	for _, container := range pod.Spec.Containers {
		// SecurityContext is a pointer and may be nil on containers that
		// never set one; guard the dereference to avoid a panic.
		if container.SecurityContext != nil && container.SecurityContext.Privileged != nil && *container.SecurityContext.Privileged {
			return true, nil
		}
	}
	return false, fmt.Errorf("pod has %q annotation but is not privileged", sdnapi.AssignMacvlanAnnotation)
}
// getPodConfig builds a PodConfig describing the openshift-sdn specific
// attributes of the pod named in the request (namespace VNID, bandwidth
// limits, macvlan request), and returns the fetched pod alongside it.
func (m *podManager) getPodConfig(req *cniserver.PodRequest) (*PodConfig, *kapi.Pod, error) {
	vnid, err := m.policy.GetVNID(req.PodNamespace)
	if err != nil {
		return nil, nil, err
	}

	pod, err := m.kClient.Pods(req.PodNamespace).Get(req.PodName)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read pod %s/%s: %v", req.PodNamespace, req.PodName, err)
	}

	macvlan, err := wantsMacvlan(pod)
	if err != nil {
		return nil, nil, err
	}

	ingress, egress, err := getBandwidth(pod)
	if err != nil {
		return nil, nil, err
	}

	return &PodConfig{
		vnid:             vnid,
		ingressBandwidth: ingress,
		egressBandwidth:  egress,
		wantMacvlan:      macvlan,
	}, pod, nil
}
// For a given container, returns host veth name, container veth MAC, and pod IP
func getVethInfo(netns, containerIfname string) (string, string, string, error) {
	var (
		peerIfindex int
		contVeth    netlink.Link
		err         error
		podIP       string
	)

	containerNs, err := ns.GetNS(netns)
	if err != nil {
		return "", "", "", fmt.Errorf("failed to get container netns: %v", err)
	}
	defer containerNs.Close()

	// Read the container end of the veth from inside the container's netns;
	// contVeth/peerIfindex/podIP are captured from the enclosing scope.
	err = containerNs.Do(func(ns.NetNS) error {
		contVeth, err = netlink.LinkByName(containerIfname)
		if err != nil {
			return err
		}
		// For a veth, ParentIndex is the ifindex of its peer (the host side).
		peerIfindex = contVeth.Attrs().ParentIndex

		addrs, err := netlink.AddrList(contVeth, syscall.AF_INET)
		if err != nil {
			return fmt.Errorf("failed to get container IP addresses: %v", err)
		}
		if len(addrs) == 0 {
			return fmt.Errorf("container had no addresses")
		}
		// Use the first IPv4 address as the pod IP.
		podIP = addrs[0].IP.String()
		return nil
	})
	if err != nil {
		return "", "", "", fmt.Errorf("failed to inspect container interface: %v", err)
	}

	// Back in the host netns: resolve the host-side veth by the peer ifindex.
	hostVeth, err := netlink.LinkByIndex(peerIfindex)
	if err != nil {
		return "", "", "", fmt.Errorf("failed to get host veth: %v", err)
	}

	return hostVeth.Attrs().Name, contVeth.Attrs().HardwareAddr.String(), podIP, nil
}
// Adds a macvlan interface to a container for use with the egress router feature
func addMacvlan(netns string) error {
	var defIface netlink.Link
	var err error

	// Find interface with the default route
	routes, err := netlink.RouteList(nil, netlink.FAMILY_V4)
	if err != nil {
		return fmt.Errorf("failed to read routes: %v", err)
	}

	for _, r := range routes {
		// A nil Dst marks a default route. NOTE(review): the loop does not
		// break, so with multiple default routes the last one wins — confirm
		// this is intended.
		if r.Dst == nil {
			defIface, err = netlink.LinkByIndex(r.LinkIndex)
			if err != nil {
				return fmt.Errorf("failed to get default route interface: %v", err)
			}
		}
	}
	if defIface == nil {
		return fmt.Errorf("failed to find default route interface")
	}

	podNs, err := ns.GetNS(netns)
	if err != nil {
		return fmt.Errorf("could not open netns %q", netns)
	}
	defer podNs.Close()

	// Create the macvlan parented to the default-route interface and move it
	// into the pod's netns in one LinkAdd call (via the Namespace attr).
	err = netlink.LinkAdd(&netlink.Macvlan{
		LinkAttrs: netlink.LinkAttrs{
			MTU:         defIface.Attrs().MTU,
			Name:        "macvlan0",
			ParentIndex: defIface.Attrs().Index,
			Namespace:   netlink.NsFd(podNs.Fd()),
		},
		Mode: netlink.MACVLAN_MODE_PRIVATE,
	})
	if err != nil {
		return fmt.Errorf("failed to create macvlan interface: %v", err)
	}

	// Bring the new interface up from inside the pod's netns.
	return podNs.Do(func(netns ns.NetNS) error {
		l, err := netlink.LinkByName("macvlan0")
		if err != nil {
			return fmt.Errorf("failed to find macvlan interface: %v", err)
		}
		err = netlink.LinkSetUp(l)
		if err != nil {
			return fmt.Errorf("failed to set macvlan interface up: %v", err)
		}
		return nil
	})
}
// createIPAMArgs assembles the CNI plugin invocation arguments for the given
// action (ADD/DEL) against the pod interface of container id.
func createIPAMArgs(netnsPath string, action cniserver.CNICommand, id string) *invoke.Args {
	args := &invoke.Args{
		Command:     string(action),
		ContainerID: id,
		NetNS:       netnsPath,
		IfName:      podInterfaceName,
		Path:        "/opt/cni/bin",
	}
	return args
}
// ipamAdd runs CNI IPAM allocation for the container and returns the result,
// which is guaranteed to carry an IPv4 allocation on success.
func (m *podManager) ipamAdd(netnsPath string, id string) (*cnitypes.Result, error) {
	if len(netnsPath) == 0 {
		return nil, fmt.Errorf("netns required for CNI_ADD")
	}

	args := createIPAMArgs(netnsPath, cniserver.CNI_ADD, id)
	result, err := invoke.ExecPluginWithResult("/opt/cni/bin/host-local", m.ipamConfig, args)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to run CNI IPAM ADD: %v", err)
	case result.IP4 == nil:
		return nil, fmt.Errorf("failed to obtain IP address from CNI IPAM")
	}
	return result, nil
}
// ipamDel runs CNI IPAM release for the given container ID.
func (m *podManager) ipamDel(id string) error {
	args := createIPAMArgs("", cniserver.CNI_DEL, id)
	if err := invoke.ExecPluginWithoutResult("/opt/cni/bin/host-local", m.ipamConfig, args); err != nil {
		return fmt.Errorf("failed to run CNI IPAM DEL: %v", err)
	}
	return nil
}
func isScriptError(err error) bool {
_, ok := err.(*exec.ExitError)
return ok
}
// Get the last command (which is prefixed with "+" because of "set -x") and its output
func getScriptError(output []byte) string {
	full := string(output)
	lines := strings.Split(full, "\n")
	for i := len(lines); i > 0; i-- {
		if strings.HasPrefix(lines[i-1], "+") {
			return strings.Join(lines[i-1:], "\n")
		}
	}
	// No "set -x" marker found; return everything.
	return full
}
// vnidToString renders a VNID as a decimal string for passing to the
// openshift-sdn-ovs script.
func vnidToString(vnid uint32) string {
	return strconv.FormatUint(uint64(vnid), 10)
}
// podIsExited returns true if the pod is exited (all containers inside are exited).
func podIsExited(p *kcontainer.Pod) bool {
	for _, ctr := range p.Containers {
		if ctr.State == kcontainer.ContainerStateExited {
			continue
		}
		// At least one container is not exited, so the pod is live.
		return false
	}
	return true
}
// getNonExitedPods returns a list of pods that have at least one running container.
func (m *podManager) getNonExitedPods() ([]*kcontainer.Pod, error) {
	pods, err := m.host.GetRuntime().GetPods(true)
	if err != nil {
		return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
	}
	// Filter out pods whose containers have all exited.
	alive := make([]*kcontainer.Pod, 0, len(pods))
	for _, pod := range pods {
		if !podIsExited(pod) {
			alive = append(alive, pod)
		}
	}
	return alive, nil
}
// ipamGarbageCollection will release unused IPs from dead containers that
// the CNI plugin was never notified had died. openshift-sdn uses the CNI
// host-local IPAM plugin, which stores allocated IPs in a file in
// /var/lib/cni/network. Each file in this directory has as its name the
// allocated IP address of the container, and as its contents the container ID.
// This routine looks for container IDs that are not reported as running by the
// container runtime, and releases each one's IPAM allocation.
func (m *podManager) ipamGarbageCollection() {
	glog.V(2).Infof("Starting IP garbage collection")

	const ipamDir string = "/var/lib/cni/networks/openshift-sdn"
	files, err := ioutil.ReadDir(ipamDir)
	if err != nil {
		glog.Errorf("Failed to list files in CNI host-local IPAM store %v: %v", ipamDir, err)
		return
	}

	// gather containerIDs for allocated ips
	ipContainerIdMap := make(map[string]string)
	for _, file := range files {
		// skip non checkpoint file
		if ip := net.ParseIP(file.Name()); ip == nil {
			continue
		}

		content, err := ioutil.ReadFile(filepath.Join(ipamDir, file.Name()))
		if err != nil {
			glog.Errorf("Failed to read file %v: %v", file, err)
		}
		// NOTE(review): on a read error the entry is still recorded with an
		// empty container ID, which will be treated as leaked below —
		// confirm this best-effort release is intended.
		ipContainerIdMap[file.Name()] = strings.TrimSpace(string(content))
	}

	// gather infra container IDs of current running Pods
	runningContainerIDs := ksets.String{}
	pods, err := m.getNonExitedPods()
	if err != nil {
		glog.Errorf("Failed to get pods: %v", err)
		return
	}
	for _, pod := range pods {
		containerID, err := m.host.GetRuntime().GetPodContainerID(pod)
		if err != nil {
			glog.Warningf("Failed to get infra containerID of %q/%q: %v", pod.Namespace, pod.Name, err)
			continue
		}

		runningContainerIDs.Insert(strings.TrimSpace(containerID.ID))
	}

	// release leaked ips
	for ip, containerID := range ipContainerIdMap {
		// if the container is not running, release IP
		if runningContainerIDs.Has(containerID) {
			continue
		}

		glog.V(2).Infof("Releasing IP %q allocated to %q.", ip, containerID)
		// Best-effort: ipamDel's error is intentionally ignored here; a
		// failed release will be retried on the next GC pass.
		m.ipamDel(containerID)
	}
}
// Set up all networking (host/container veth, OVS flows, IPAM, loopback, etc)
func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *kubehostport.RunningPod, error) {
	podConfig, pod, err := m.getPodConfig(req)
	if err != nil {
		return nil, nil, err
	}

	ipamResult, err := m.ipamAdd(req.Netns, req.ContainerId)
	if err != nil {
		// TODO: Remove this hack once we've figured out how to retrieve the netns
		// of an exited container. Currently, restarting docker will leak a bunch of
		// ips. This will exhaust available ip space unless we cleanup old ips. At the
		// same time we don't want to try GC'ing them periodically as that could lead
		// to a performance regression in starting pods. So on each setup failure, try
		// GC on the assumption that the kubelet is going to retry pod creation, and
		// when it does, there will be ips.
		m.ipamGarbageCollection()

		return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.ContainerId, err)
	}
	podIP := ipamResult.IP4.IP.IP

	// Release any IPAM allocations and hostports if the setup failed
	// (success is only set to true at the very end, so every early return
	// below triggers this cleanup).
	var success bool
	defer func() {
		if !success {
			m.ipamDel(req.ContainerId)
			if err := m.hostportHandler.SyncHostports(TUN, m.getRunningPods()); err != nil {
				glog.Warningf("failed syncing hostports: %v", err)
			}
		}
	}()

	// Open any hostports the pod wants
	newPod := &kubehostport.RunningPod{Pod: pod, IP: podIP}
	if err := m.hostportHandler.OpenPodHostportsAndSync(newPod, TUN, m.getRunningPods()); err != nil {
		return nil, nil, err
	}

	// Create the veth pair and configure the container side (IP, loopback)
	// from inside the pod's netns.
	var hostVeth, contVeth netlink.Link
	err = ns.WithNetNSPath(req.Netns, func(hostNS ns.NetNS) error {
		hostVeth, contVeth, err = ip.SetupVeth(podInterfaceName, int(m.mtu), hostNS)
		if err != nil {
			return fmt.Errorf("failed to create container veth: %v", err)
		}
		// refetch to get hardware address and other properties
		contVeth, err = netlink.LinkByIndex(contVeth.Attrs().Index)
		if err != nil {
			return fmt.Errorf("failed to fetch container veth: %v", err)
		}

		// Clear out gateway to prevent ConfigureIface from adding the cluster
		// subnet via the gateway
		ipamResult.IP4.Gateway = nil
		if err = ipam.ConfigureIface(podInterfaceName, ipamResult); err != nil {
			return fmt.Errorf("failed to configure container IPAM: %v", err)
		}

		// Bring up the loopback interface inside the container.
		lo, err := netlink.LinkByName("lo")
		if err == nil {
			err = netlink.LinkSetUp(lo)
		}
		if err != nil {
			return fmt.Errorf("failed to configure container loopback: %v", err)
		}
		return nil
	})
	if err != nil {
		return nil, nil, err
	}

	// Optionally add the macvlan interface for the egress router feature.
	if podConfig.wantMacvlan {
		if err := addMacvlan(req.Netns); err != nil {
			return nil, nil, err
		}
	}

	// Wire the host veth into OVS with the pod's VNID and bandwidth limits.
	contVethMac := contVeth.Attrs().HardwareAddr.String()
	vnidStr := vnidToString(podConfig.vnid)
	out, err := exec.Command(sdnScript, setUpCmd, hostVeth.Attrs().Name, contVethMac, podIP.String(), vnidStr, podConfig.ingressBandwidth, podConfig.egressBandwidth).CombinedOutput()
	glog.V(5).Infof("SetUpPod network plugin output: %s, %v", string(out), err)

	if isScriptError(err) {
		return nil, nil, fmt.Errorf("error running network setup script:\nhostVethName %s, contVethMac %s, podIP %s, podConfig %#v\n %s", hostVeth.Attrs().Name, contVethMac, podIP.String(), podConfig, getScriptError(out))
	} else if err != nil {
		return nil, nil, err
	}

	m.policy.RefVNID(podConfig.vnid)

	success = true
	return ipamResult, newPod, nil
}
// getContainerNetnsPath returns the network namespace path of the given
// container ID; only the docker runtime is supported.
func (m *podManager) getContainerNetnsPath(id string) (string, error) {
	dockerRuntime, ok := m.host.GetRuntime().(*dockertools.DockerManager)
	if !ok {
		return "", fmt.Errorf("openshift-sdn execution called on non-docker runtime")
	}
	return dockerRuntime.GetNetNS(kcontainer.DockerID(id).ContainerID())
}
// Update OVS flows when something (like the pod's namespace VNID) changes
func (m *podManager) update(req *cniserver.PodRequest) error {
	// Updates may come at startup and thus we may not have the pod's
	// netns from kubelet (since kubelet doesn't have UPDATE actions).
	// Read the missing netns from the pod's file.
	if req.Netns == "" {
		netns, err := m.getContainerNetnsPath(req.ContainerId)
		if err != nil {
			return err
		}
		req.Netns = netns
	}

	podConfig, _, err := m.getPodConfig(req)
	if err != nil {
		return err
	}

	hostVethName, contVethMac, podIP, err := getVethInfo(req.Netns, podInterfaceName)
	if err != nil {
		return err
	}

	// Re-run the OVS script with the (possibly changed) VNID and limits.
	out, err := exec.Command(sdnScript, updateCmd, hostVethName, contVethMac, podIP, vnidToString(podConfig.vnid), podConfig.ingressBandwidth, podConfig.egressBandwidth).CombinedOutput()
	glog.V(5).Infof("UpdatePod network plugin output: %s, %v", string(out), err)

	if isScriptError(err) {
		return fmt.Errorf("error running network update script: %s", getScriptError(out))
	}
	// nil on success, or any non-script execution error.
	return err
}
// Clean up all pod networking (clear OVS flows, release IPAM lease, remove host/container veth)
func (m *podManager) teardown(req *cniserver.PodRequest) error {
	// If the netns is already gone (e.g. docker was restarted), skip the
	// veth/OVS cleanup and only release the IPAM lease and hostports.
	netnsValid := true
	if err := ns.IsNSorErr(req.Netns); err != nil {
		if _, ok := err.(ns.NSPathNotExistErr); ok {
			glog.V(3).Infof("teardown called on already-destroyed pod %s/%s; only cleaning up IPAM", req.PodNamespace, req.PodName)
			netnsValid = false
		}
	}

	if netnsValid {
		hostVethName, contVethMac, podIP, err := getVethInfo(req.Netns, podInterfaceName)
		if err != nil {
			return err
		}

		// The script's teardown functionality doesn't need the VNID
		out, err := exec.Command(sdnScript, tearDownCmd, hostVethName, contVethMac, podIP, "-1").CombinedOutput()
		glog.V(5).Infof("TearDownPod network plugin output: %s, %v", string(out), err)

		if isScriptError(err) {
			return fmt.Errorf("error running network teardown script: %s", getScriptError(out))
		} else if err != nil {
			return err
		}
	}

	// Drop our reference to the namespace's VNID; a GetVNID failure here
	// (namespace already gone) is deliberately not treated as an error.
	if vnid, err := m.policy.GetVNID(req.PodNamespace); err == nil {
		m.policy.UnrefVNID(vnid)
	}

	if err := m.ipamDel(req.ContainerId); err != nil {
		return err
	}

	// Close any hostports that were held open for this pod.
	if err := m.hostportHandler.SyncHostports(TUN, m.getRunningPods()); err != nil {
		return err
	}

	return nil
}