package plugin
import (
"fmt"
"net"
osexec "os/exec"
"strings"
"time"
log "github.com/golang/glog"
"github.com/openshift/origin/pkg/sdn/plugin/cniserver"
osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/util/ipcmd"
"github.com/openshift/origin/pkg/util/netutils"
"github.com/openshift/origin/pkg/util/ovs"
docker "github.com/fsouza/go-dockerclient"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/fields"
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/labels"
kexec "k8s.io/kubernetes/pkg/util/exec"
kubeutilnet "k8s.io/kubernetes/pkg/util/net"
kwait "k8s.io/kubernetes/pkg/util/wait"
)
// osdnPolicy is the per-plugin policy hook used by OsdnNode. NewNodePlugin
// selects the implementation (the value returned by NewSingleTenantPlugin or
// NewMultiTenantPlugin) based on the configured plugin name.
type osdnPolicy interface {
	// Name returns the policy's name.
	// NOTE(review): presumably the plugin name it was selected by — confirm
	// against the implementations.
	Name() string
	// Start is called once from OsdnNode.Start, after the SDN has been set up
	// and SubnetStartNode has returned; a non-nil error aborts node startup.
	Start(node *OsdnNode) error

	// AddNetNamespace, UpdateNetNamespace and DeleteNetNamespace are not called
	// from this file.
	// NOTE(review): presumably NetNamespace watch callbacks, with oldNetID
	// being the namespace's previous network ID — confirm against the callers.
	AddNetNamespace(netns *osapi.NetNamespace)
	UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32)
	DeleteNetNamespace(netns *osapi.NetNamespace)

	// GetVNID returns the network ID for the named namespace, or an error.
	// Callers in this file pass the namespace of a pod or of a service.
	GetVNID(namespace string) (uint32, error)
	// GetNamespaces returns namespace names for the given network ID.
	// NOTE(review): presumably every namespace currently mapped to vnid — confirm.
	GetNamespaces(vnid uint32) []string

	// RefVNID and UnrefVNID are called in pairs by users of a network ID: in
	// this file, RefVNID is called for each local pod successfully updated in
	// OsdnNode.Start and for each service first seen by watchServices, and
	// UnrefVNID when a service is deleted.
	RefVNID(vnid uint32)
	UnrefVNID(vnid uint32)
}
// OsdnNode holds the state of the SDN plugin on a single node. Instances are
// created by NewNodePlugin and brought up by Start.
type OsdnNode struct {
	// policy is the single- or multi-tenant policy chosen in NewNodePlugin.
	policy osdnPolicy
	// kClient and osClient are the API clients passed to NewNodePlugin.
	kClient  *kclientset.Clientset
	osClient *osclient.Client
	// ovs is the OVS interface for the SDN bridge, created in NewNodePlugin.
	ovs *ovs.Interface
	// networkInfo is populated by Start from getNetworkInfo.
	networkInfo *NetworkInfo
	// podManager is created and started by Start once kubelet has initialized.
	podManager *podManager
	// localSubnetCIDR is populated by Start from getLocalSubnet.
	localSubnetCIDR string
	// localIP and hostName are the node address and hostname, either as given
	// to NewNodePlugin or as resolved there when passed empty.
	localIP  string
	hostName string
	// podNetworkReady is closed by markPodNetworkReady at the end of Start;
	// IsPodNetworkReady reports an error until then.
	podNetworkReady chan struct{}
	// kubeletInitReady is waited on by Start before host is used.
	// NOTE(review): presumably closed by the kubelet plugin Init hook — confirm.
	kubeletInitReady chan struct{}
	// iptablesSyncPeriod is passed to newNodeIPTables in Start.
	iptablesSyncPeriod time.Duration
	// mtu is passed through to newPodManager.
	mtu uint32
	// egressPolicies is initialized empty in NewNodePlugin.
	// NOTE(review): presumably keyed by VNID — confirm against its users.
	egressPolicies map[uint32][]osapi.EgressNetworkPolicy

	// host is obtained when kubelet initializes the plugin; Start polls
	// host.GetRuntime() until it is non-nil before creating the pod manager.
	host knetwork.Host
	// kubeletCniPlugin is not referenced in this part of the file.
	kubeletCniPlugin knetwork.NetworkPlugin

	// clearLbr0IptablesRule is set by dockerPreCNICleanup when a pre-CNI
	// "lbr0" bridge was found on the node.
	clearLbr0IptablesRule bool
}
// NewNodePlugin is called by higher layers to create the plugin SDN node
// instance.
//
// pluginName is matched case-insensitively against the single-tenant and
// multi-tenant plugin names; for any other name it returns (nil, nil), meaning
// "not an OpenShift plugin". An empty hostname is resolved by running
// "uname -n". An empty selfIP is resolved from the hostname, falling back to
// the default host interface when that lookup fails. Before returning, any
// left-over pre-CNI docker networking is cleaned up via dockerPreCNICleanup.
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclientset.Clientset, hostname string, selfIP string, iptablesSyncPeriod time.Duration, mtu uint32) (*OsdnNode, error) {
	var policy osdnPolicy
	switch strings.ToLower(pluginName) {
	case osapi.MultiTenantPluginName:
		policy = NewMultiTenantPlugin()
	case osapi.SingleTenantPluginName:
		policy = NewSingleTenantPlugin()
	default:
		// Not an OpenShift plugin
		return nil, nil
	}

	log.Infof("Initializing SDN node of type %q with configured hostname %q (IP %q), iptables sync period %q", pluginName, hostname, selfIP, iptablesSyncPeriod.String())

	if len(hostname) == 0 {
		out, unameErr := kexec.New().Command("uname", "-n").CombinedOutput()
		if unameErr != nil {
			return nil, unameErr
		}
		hostname = strings.TrimSpace(string(out))
		log.Infof("Resolved hostname to %q", hostname)
	}

	if len(selfIP) == 0 {
		resolved, lookupErr := netutils.GetNodeIP(hostname)
		if lookupErr != nil {
			log.V(5).Infof("Failed to determine node address from hostname %s; using default interface (%v)", hostname, lookupErr)
			var defaultIP net.IP
			defaultIP, lookupErr = kubeutilnet.ChooseHostInterface()
			if lookupErr != nil {
				return nil, lookupErr
			}
			resolved = defaultIP.String()
			log.Infof("Resolved IP address to %q", resolved)
		}
		selfIP = resolved
	}

	bridge, err := ovs.New(kexec.New(), BR)
	if err != nil {
		return nil, err
	}

	node := &OsdnNode{
		policy:             policy,
		kClient:            kClient,
		osClient:           osClient,
		ovs:                bridge,
		localIP:            selfIP,
		hostName:           hostname,
		podNetworkReady:    make(chan struct{}),
		kubeletInitReady:   make(chan struct{}),
		iptablesSyncPeriod: iptablesSyncPeriod,
		mtu:                mtu,
		egressPolicies:     make(map[uint32][]osapi.EgressNetworkPolicy),
	}

	if err = node.dockerPreCNICleanup(); err != nil {
		return nil, err
	}
	return node, nil
}
// dockerPreCNICleanup detects whether we are upgrading from a pre-CNI
// openshift and cleans up interfaces that are no longer required.
//
// Detection is done by trying to bring the old "lbr0" bridge down: if that
// fails, nothing is cleaned up and nil is returned. Otherwise
// clearLbr0IptablesRule is set, systemd is asked to reload and to restart
// docker.service, the pre-CNI interfaces are deleted, and the function waits
// for docker to answer a ping again. It returns an error only if a docker
// client cannot be created or docker does not come back within the backoff
// window; failures of the systemd requests are logged but not returned.
func (node *OsdnNode) dockerPreCNICleanup() error {
	exec := kexec.New()
	itx := ipcmd.NewTransaction(exec, "lbr0")
	itx.SetLink("down")
	if err := itx.EndTransaction(); err != nil {
		// no cleanup required
		return nil
	}

	node.clearLbr0IptablesRule = true

	// Restart docker to kill old pods and make it use docker0 again.
	// "systemctl restart" will bail out (unnecessarily) in the
	// OpenShift-in-a-container case, so we work around that by sending
	// the messages by hand.
	//
	// Every method parameter must be its own argv element: os/exec does not
	// go through a shell, so nothing splits on spaces or strips quotes.
	systemdCall := func(method string, params ...string) {
		args := []string{
			"--system", "--print-reply", "--reply-timeout=2000", "--type=method_call",
			"--dest=org.freedesktop.systemd1", "/org/freedesktop/systemd1",
			"org.freedesktop.systemd1.Manager." + method,
		}
		args = append(args, params...)
		if out, err := osexec.Command("dbus-send", args...).CombinedOutput(); err != nil {
			// Best effort: log (including dbus-send's own diagnostics) and carry on.
			log.Errorf("systemd %s request failed: %v: %s", method, err, strings.TrimSpace(string(out)))
		}
	}
	systemdCall("Reload")
	systemdCall("RestartUnit", "string:docker.service", "string:replace")

	// Delete pre-CNI interfaces
	for _, intf := range []string{"lbr0", "vovsbr", "vlinuxbr"} {
		itx := ipcmd.NewTransaction(exec, intf)
		itx.DeleteLink()
		// Any of these may legitimately be absent, so errors are ignored.
		itx.IgnoreError()
		itx.EndTransaction()
	}

	// Wait until docker has restarted since kubelet will exit if docker isn't running
	dockerClient, err := docker.NewClientFromEnv()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %v", err)
	}
	err = kwait.ExponentialBackoff(
		kwait.Backoff{
			Duration: 100 * time.Millisecond,
			Factor:   1.2,
			Steps:    6,
		},
		func() (bool, error) {
			if err := dockerClient.Ping(); err != nil {
				// wait longer
				return false, nil
			}
			return true, nil
		})
	if err != nil {
		return fmt.Errorf("failed to connect to docker after SDN cleanup restart: %v", err)
	}

	log.Infof("Cleaned up left-over openshift-sdn docker bridge and interfaces")
	return nil
}
// Start brings up the SDN on this node. In order, it:
//
//  1. fetches the cluster network information and sets up iptables,
//  2. determines the local subnet and sets up the SDN,
//  3. starts the subnet handling and the tenant policy,
//  4. starts the service watch in the background,
//  5. blocks until kubelet has initialized the plugin and its runtime,
//  6. creates and starts the pod manager on the CNI server socket,
//  7. if SetupSDN reported a network change, re-applies every running local
//     pod (failures are logged and skipped) and references its VNID,
//  8. marks the pod network ready.
//
// The first error from any step other than the per-pod updates is returned.
func (node *OsdnNode) Start() error {
	var err error
	if node.networkInfo, err = getNetworkInfo(node.osClient); err != nil {
		return fmt.Errorf("Failed to get network information: %v", err)
	}

	clusterCIDR := node.networkInfo.ClusterNetwork.String()
	ipt := newNodeIPTables(clusterCIDR, node.iptablesSyncPeriod)
	if err = ipt.Setup(); err != nil {
		return fmt.Errorf("Failed to set up iptables: %v", err)
	}

	if node.localSubnetCIDR, err = node.getLocalSubnet(); err != nil {
		return err
	}

	networkChanged, err := node.SetupSDN()
	if err != nil {
		return err
	}

	if err = node.SubnetStartNode(); err != nil {
		return err
	}

	if err = node.policy.Start(node); err != nil {
		return err
	}

	go kwait.Forever(node.watchServices, 0)

	// Wait for kubelet to init the plugin so we get a knetwork.Host
	log.V(5).Infof("Waiting for kubelet network plugin initialization")
	<-node.kubeletInitReady

	// Wait for kubelet itself to finish initializing
	runtimeReady := func() (bool, error) {
		return node.host.GetRuntime() != nil, nil
	}
	kwait.PollInfinite(100*time.Millisecond, runtimeReady)

	log.V(5).Infof("Creating and initializing openshift-sdn pod manager")
	node.podManager, err = newPodManager(node.host, node.localSubnetCIDR, node.networkInfo, node.kClient, node.policy, node.mtu)
	if err != nil {
		return err
	}
	if err = node.podManager.Start(cniserver.CNIServerSocketPath); err != nil {
		return err
	}

	if networkChanged {
		var localPods []kapi.Pod
		if localPods, err = node.GetLocalPods(kapi.NamespaceAll); err != nil {
			return err
		}
		for _, pod := range localPods {
			if err = node.UpdatePod(pod); err != nil {
				// A single pod failing must not prevent the node from starting.
				log.Warningf("Could not update pod %q: %s", pod.Name, err)
				continue
			}
			vnid, vnidErr := node.policy.GetVNID(pod.Namespace)
			if vnidErr == nil {
				node.policy.RefVNID(vnid)
			}
		}
	}

	log.V(5).Infof("openshift-sdn network plugin ready")
	node.markPodNetworkReady()

	return nil
}
// UpdatePod submits a CNI_UPDATE request for the given pod to the pod manager
// and waits for it to be handled, returning the handler's error (if any).
//
// FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action
// See https://github.com/containernetworking/cni/issues/89
func (node *OsdnNode) UpdatePod(pod kapi.Pod) error {
	request := &cniserver.PodRequest{
		Command:      cniserver.CNI_UPDATE,
		PodNamespace: pod.Namespace,
		PodName:      pod.Name,
		ContainerId:  getPodContainerID(&pod),
		// netns is read from docker if needed, since we don't get it from kubelet
		Result: make(chan *cniserver.PodResult),
	}

	// Send request and wait for the result; only the error is of interest.
	if _, err := node.podManager.handleCNIRequest(request); err != nil {
		return err
	}
	return nil
}
// GetLocalPods lists the pods in the given namespace that are scheduled on
// this node (spec.nodeName equal to the node's hostname) and returns those
// whose phase is Running. A list failure is returned unchanged.
func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) {
	podList, err := node.kClient.Core().Pods(namespace).List(kapi.ListOptions{
		LabelSelector: labels.Everything(),
		FieldSelector: fields.Set{"spec.nodeName": node.hostName}.AsSelector(),
	})
	if err != nil {
		return nil, err
	}

	// Keep only running pods
	running := make([]kapi.Pod, 0, len(podList.Items))
	for i := range podList.Items {
		if podList.Items[i].Status.Phase != kapi.PodRunning {
			continue
		}
		running = append(running, podList.Items[i])
	}
	return running, nil
}
// markPodNetworkReady signals that the pod network is usable by closing
// podNetworkReady; from then on IsPodNetworkReady returns nil. It is called
// at the end of Start and must be called at most once per node, since closing
// an already-closed channel panics.
func (node *OsdnNode) markPodNetworkReady() {
	close(node.podNetworkReady)
}
// IsPodNetworkReady reports, without blocking, whether the pod network has
// been marked ready. It returns nil once podNetworkReady has been closed and
// an error before that.
func (node *OsdnNode) IsPodNetworkReady() error {
	select {
	case <-node.podNetworkReady:
		// Channel closed: the network is up.
		return nil
	default:
	}
	return fmt.Errorf("SDN pod network is not ready")
}
// isServiceChanged reports whether the two services differ in a way that
// requires their rules to be regenerated: a different number of ports, or a
// different protocol or port number at any position. Ports are compared
// pairwise by index; no other service fields are considered.
func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
	oldPorts, newPorts := oldsvc.Spec.Ports, newsvc.Spec.Ports
	if len(oldPorts) != len(newPorts) {
		return true
	}
	for i, before := range oldPorts {
		after := newPorts[i]
		if before.Protocol != after.Protocol || before.Port != after.Port {
			return true
		}
	}
	return false
}
// watchServices runs the Services event queue and keeps the node's service
// rules in step with it. Services without a cluster IP are ignored.
//
// On Sync/Added/Updated the rules for a service are (re)created unless a
// previously seen copy has the same ports; the VNID is referenced the first
// time a service is seen. On Deleted the rules are removed and the VNID is
// unreferenced if it can still be looked up. Seen services are tracked by UID
// in a map local to this call.
func (node *OsdnNode) watchServices() {
	known := make(map[string]*kapi.Service)

	handle := func(delta cache.Delta) error {
		svc := delta.Object.(*kapi.Service)

		// Ignore headless services
		if !kapi.IsServiceIPSet(svc) {
			return nil
		}

		log.V(5).Infof("Watch %s event for Service %q", delta.Type, svc.ObjectMeta.Name)
		uid := string(svc.UID)

		switch delta.Type {
		case cache.Deleted:
			delete(known, uid)
			node.DeleteServiceRules(svc)

			if netid, err := node.policy.GetVNID(svc.Namespace); err == nil {
				node.policy.UnrefVNID(netid)
			}

		case cache.Sync, cache.Added, cache.Updated:
			previous, seen := known[uid]
			if seen && !isServiceChanged(previous, svc) {
				// Nothing relevant changed; keep the existing rules.
				return nil
			}
			if seen {
				node.DeleteServiceRules(previous)
			}

			netid, err := node.policy.GetVNID(svc.Namespace)
			if err != nil {
				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
			}

			node.AddServiceRules(svc, netid)
			known[uid] = svc
			if !seen {
				node.policy.RefVNID(netid)
			}
		}
		return nil
	}

	RunEventQueue(node.kClient.CoreClient, Services, handle)
}