package plugin
import (
"fmt"
"net"
"strconv"
log "github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/types"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
utilwait "k8s.io/kubernetes/pkg/util/wait"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/util/netutils"
)
// SubnetStartMaster seeds the master's subnet allocator with the subnets of
// all HostSubnets that already exist and then starts the node and subnet
// watchers in background goroutines.
//
// clusterNetwork and hostSubnetLength are handed to the allocator unchanged.
// An error is returned when the existing HostSubnets cannot be listed or the
// allocator cannot be constructed. A HostSubnet whose host IP fails
// validation is only logged; its subnet is still reported as in use.
func (master *OsdnMaster) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubnetLength uint32) error {
	existing, err := master.osClient.HostSubnets().List(kapi.ListOptions{})
	if err != nil {
		log.Errorf("Error in initializing/fetching subnets: %v", err)
		return err
	}

	inUse := make([]string, 0, len(existing.Items))
	for _, hs := range existing.Items {
		inUse = append(inUse, hs.Subnet)
		if verr := master.networkInfo.validateNodeIP(hs.HostIP); verr != nil {
			// Don't error out; just warn so the error can be corrected with 'oc'
			log.Errorf("Failed to validate HostSubnet %s: %v", hostSubnetToString(&hs), verr)
			continue
		}
		log.Infof("Found existing HostSubnet %s", hostSubnetToString(&hs))
	}

	master.subnetAllocator, err = netutils.NewSubnetAllocator(clusterNetwork.String(), hostSubnetLength, inUse)
	if err != nil {
		return err
	}

	go utilwait.Forever(master.watchNodes, 0)
	go utilwait.Forever(master.watchSubnets, 0)
	return nil
}
// addNode makes sure a HostSubnet exists for node nodeName with host IP nodeIP.
//
// The IP is validated first. If a HostSubnet named nodeName can be fetched, it
// is left alone when its HostIP already equals nodeIP and is otherwise updated
// in place with the new IP. If the fetch fails, a subnet is taken from the
// master's subnet allocator and a new HostSubnet carrying hsAnnotations is
// created; the allocation is released again when the create call fails.
func (master *OsdnMaster) addNode(nodeName string, nodeIP string, hsAnnotations map[string]string) error {
	// Validate node IP before proceeding
	if err := master.networkInfo.validateNodeIP(nodeIP); err != nil {
		return err
	}

	// Check if subnet needs to be created or updated
	sub, err := master.osClient.HostSubnets().Get(nodeName)
	if err == nil {
		if sub.HostIP == nodeIP {
			return nil
		}
		// Node IP changed, update old subnet
		sub.HostIP = nodeIP
		// Keep the result separate from sub: when Update fails its return
		// value is not meaningful, and the error message below still needs
		// the subnet of the object we tried to write.
		updated, err := master.osClient.HostSubnets().Update(sub)
		if err != nil {
			return fmt.Errorf("Error updating subnet %s for node %s: %v", sub.Subnet, nodeName, err)
		}
		log.Infof("Updated HostSubnet %s", hostSubnetToString(updated))
		return nil
	}

	// Create new subnet
	sn, err := master.subnetAllocator.GetNetwork()
	if err != nil {
		return fmt.Errorf("Error allocating network for node %s: %v", nodeName, err)
	}

	sub = &osapi.HostSubnet{
		TypeMeta:   kapiunversioned.TypeMeta{Kind: "HostSubnet"},
		ObjectMeta: kapi.ObjectMeta{Name: nodeName, Annotations: hsAnnotations},
		Host:       nodeName,
		HostIP:     nodeIP,
		Subnet:     sn.String(),
	}
	sub, err = master.osClient.HostSubnets().Create(sub)
	if err != nil {
		// Hand the subnet back so it can be allocated again.
		master.subnetAllocator.ReleaseNetwork(sn)
		return fmt.Errorf("Error creating subnet %s for node %s: %v", sn.String(), nodeName, err)
	}

	log.Infof("Created HostSubnet %s", hostSubnetToString(sub))
	return nil
}
// deleteNode removes the HostSubnet named after nodeName.
//
// The HostSubnet is fetched first so it can be described in the log and error
// messages; a failure of either the fetch or the delete is returned to the
// caller.
func (master *OsdnMaster) deleteNode(nodeName string) error {
	existing, getErr := master.osClient.HostSubnets().Get(nodeName)
	if getErr != nil {
		return fmt.Errorf("Error fetching subnet for node %q for deletion: %v", nodeName, getErr)
	}

	if delErr := master.osClient.HostSubnets().Delete(nodeName); delErr != nil {
		return fmt.Errorf("Error deleting subnet %v for node %q: %v", existing, nodeName, delErr)
	}

	log.Infof("Deleted HostSubnet %s", hostSubnetToString(existing))
	return nil
}
// isValidNodeIP reports whether nodeIP is one of the addresses listed in the
// node's status.
func isValidNodeIP(node *kapi.Node, nodeIP string) bool {
	addrs := node.Status.Addresses
	for i := range addrs {
		if addrs[i].Address == nodeIP {
			return true
		}
	}
	return false
}
// getNodeIP returns the first address from the node's status when it is
// non-empty, and otherwise defers to netutils.GetNodeIP with the node's name.
func getNodeIP(node *kapi.Node) (string, error) {
	if addrs := node.Status.Addresses; len(addrs) > 0 {
		if first := addrs[0].Address; first != "" {
			return first, nil
		}
	}
	return netutils.GetNodeIP(node.Name)
}
// Because openshift-sdn uses an overlay and doesn't need GCE Routes, we need to
// clear the NetworkUnavailable condition that kubelet adds to initial node
// status when using GCE.
//
// The condition is only touched when it is present, not already False, and has
// the reason "NoRouteCreated". The first attempt works on the node passed in
// (and therefore modifies it); after an update attempt knode no longer refers
// to that object, so a retry re-reads the node from the API server and
// re-evaluates the condition on the fresh copy. Failures are reported through
// utilruntime.HandleError rather than returned.
//
// TODO: make upstream kubelet more flexible with overlays and GCE so this
// condition doesn't get added for network plugins that don't want it, and then
// we can remove this function.
func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi.Node) {
	knode := node
	cleared := false
	resultErr := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
		var err error

		if knode != node {
			knode, err = master.kClient.Nodes().Get(node.ObjectMeta.Name)
			if err != nil {
				return err
			}
		}

		// Inspect and modify the object that is about to be pushed. Looking
		// at node.Status here would leave a refetched knode unmodified and
		// turn every retry into a silent no-op.
		_, condition := kapi.GetNodeCondition(&knode.Status, kapi.NodeNetworkUnavailable)
		if condition != nil && condition.Status != kapi.ConditionFalse && condition.Reason == "NoRouteCreated" {
			condition.Status = kapi.ConditionFalse
			condition.Reason = "RouteCreated"
			condition.Message = "openshift-sdn cleared kubelet-set NoRouteCreated"
			condition.LastTransitionTime = kapiunversioned.Now()
			knode, err = master.kClient.Nodes().UpdateStatus(knode)
			if err == nil {
				cleared = true
			}
		}
		return err
	})
	if resultErr != nil {
		utilruntime.HandleError(fmt.Errorf("Status update failed for local node: %v", resultErr))
	} else if cleared {
		log.Infof("Cleared node NetworkUnavailable/NoRouteCreated condition for %s", node.ObjectMeta.Name)
	}
}
// watchNodes consumes Node events from the event queue and keeps HostSubnets
// in step with them.
//
// For Sync/Added/Updated events the node's IP is resolved, the kubelet-set
// NetworkUnavailable condition is cleared, and addNode is called unless the IP
// recorded for the node's UID is unchanged or still among the node's
// addresses. For Deleted events the node's HostSubnet is deleted. The last IP
// handled for each node is remembered in nodeAddressMap, keyed by node UID.
func (master *OsdnMaster) watchNodes() {
	nodeAddressMap := map[types.UID]string{}
	RunEventQueue(master.kClient.CoreClient, Nodes, func(delta cache.Delta) error {
		node := delta.Object.(*kapi.Node)
		name := node.ObjectMeta.Name
		uid := node.ObjectMeta.UID

		switch delta.Type {
		case cache.Sync, cache.Added, cache.Updated:
			// Only add/update handling needs the node IP; resolving it here
			// keeps an unresolvable IP from blocking deletions below.
			nodeIP, err := getNodeIP(node)
			if err != nil {
				return fmt.Errorf("failed to get node IP for %s, skipping event: %v, node: %v, error: %v", name, delta.Type, node, err)
			}

			master.clearInitialNodeNetworkUnavailableCondition(node)

			if oldNodeIP, ok := nodeAddressMap[uid]; ok && ((nodeIP == oldNodeIP) || isValidNodeIP(node, oldNodeIP)) {
				break
			}
			// Node status is frequently updated by kubelet, so log only if the above condition is not met
			log.V(5).Infof("Watch %s event for Node %q", delta.Type, name)

			err = master.addNode(name, nodeIP, nil)
			if err != nil {
				return fmt.Errorf("error creating subnet for node %s, ip %s: %v", name, nodeIP, err)
			}
			nodeAddressMap[uid] = nodeIP
		case cache.Deleted:
			log.V(5).Infof("Watch %s event for Node %q", delta.Type, name)
			delete(nodeAddressMap, uid)

			if err := master.deleteNode(name); err != nil {
				return fmt.Errorf("Error deleting node %s: %v", name, err)
			}
		}
		return nil
	})
}
// SubnetStartNode launches the node-side HostSubnet watcher in a background
// goroutine under utilwait.Forever with a zero period. It always returns nil.
func (node *OsdnNode) SubnetStartNode() error {
	watcher := node.watchSubnets
	go utilwait.Forever(watcher, 0)
	return nil
}
// Only run on the master
// Watch for all hostsubnet events and if one is found with the right annotation, use the SubnetAllocator to dole a real subnet
//
// A Sync/Added/Updated HostSubnet carrying osapi.AssignHostSubnetAnnotation is
// deleted and recreated through addNode; a valid osapi.FixedVnidHost
// annotation is carried over to the new object. A Deleted HostSubnet without
// the assign annotation has its subnet released back to the allocator.
// Failures while replacing an annotated HostSubnet are logged and the event is
// treated as handled (nil is returned).
func (master *OsdnMaster) watchSubnets() {
	RunEventQueue(master.osClient, HostSubnets, func(delta cache.Delta) error {
		hs := delta.Object.(*osapi.HostSubnet)
		name := hs.ObjectMeta.Name
		hostIP := hs.HostIP
		subnet := hs.Subnet

		log.V(5).Infof("Watch %s event for HostSubnet %q", delta.Type, hs.ObjectMeta.Name)
		switch delta.Type {
		case cache.Sync, cache.Added, cache.Updated:
			if _, ok := hs.Annotations[osapi.AssignHostSubnetAnnotation]; ok {
				// Delete the annotated hostsubnet and create a new one with an assigned subnet
				// We do not update (instead of delete+create) because the watchSubnets on the nodes
				// will skip the event if it finds that the hostsubnet has the same host
				// And we cannot fix the watchSubnets code for node because it will break migration if
				// nodes are upgraded after the master
				err := master.osClient.HostSubnets().Delete(name)
				if err != nil {
					log.Errorf("Error in deleting annotated subnet from master, name: %s, ip %s: %v", name, hostIP, err)
					return nil
				}

				var hsAnnotations map[string]string
				if vnid, ok := hs.Annotations[osapi.FixedVnidHost]; ok {
					vnidInt, err := strconv.Atoi(vnid)
					// Compare in 64 bits: converting to uint32 first would
					// wrap values above 2^32-1 and let them slip past the
					// range check.
					if err == nil && vnidInt >= 0 && uint64(vnidInt) <= uint64(osapi.MaxVNID) {
						hsAnnotations = make(map[string]string)
						hsAnnotations[osapi.FixedVnidHost] = strconv.Itoa(vnidInt)
					} else {
						log.Errorf("Vnid %s is an invalid value for annotation %s. Annotation will be ignored.", vnid, osapi.FixedVnidHost)
					}
				}
				err = master.addNode(name, hostIP, hsAnnotations)
				if err != nil {
					log.Errorf("Error creating subnet for node %s, ip %s: %v", name, hostIP, err)
					return nil
				}
			}
		case cache.Deleted:
			if _, ok := hs.Annotations[osapi.AssignHostSubnetAnnotation]; !ok {
				// release the subnet
				_, ipnet, err := net.ParseCIDR(subnet)
				if err != nil {
					return fmt.Errorf("Error parsing subnet %q for node %q for deletion: %v", subnet, name, err)
				}
				master.subnetAllocator.ReleaseNetwork(ipnet)
			}
		}
		return nil
	})
}
// Only run on the nodes
//
// Watches HostSubnet events and keeps this node's rules for the other hosts'
// subnets up to date. Events whose HostIP equals this node's local IP are
// ignored. The HostSubnets whose rules were added are remembered by UID, so an
// event that leaves the HostIP unchanged is skipped and one that changes it
// has the old rules deleted before the new ones are added.
func (node *OsdnNode) watchSubnets() {
	known := make(map[string]*osapi.HostSubnet)
	RunEventQueue(node.osClient, HostSubnets, func(delta cache.Delta) error {
		hs := delta.Object.(*osapi.HostSubnet)
		if hs.HostIP == node.localIP {
			return nil
		}

		log.V(5).Infof("Watch %s event for HostSubnet %q", delta.Type, hs.ObjectMeta.Name)
		key := string(hs.UID)

		switch delta.Type {
		case cache.Deleted:
			delete(known, key)
			node.DeleteHostSubnetRules(hs)

		case cache.Sync, cache.Added, cache.Updated:
			if prev, seen := known[key]; seen {
				if prev.HostIP == hs.HostIP {
					// Nothing changed that affects the rules.
					return nil
				}
				// Delete old subnet rules
				node.DeleteHostSubnetRules(prev)
			}
			if err := node.networkInfo.validateNodeIP(hs.HostIP); err != nil {
				log.Warningf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err)
				return nil
			}
			node.AddHostSubnetRules(hs)
			known[key] = hs
		}
		return nil
	})
}