package plugin
import (
"fmt"
"net"
"strings"
"time"
"github.com/golang/glog"
osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
kapi "k8s.io/kubernetes/pkg/api"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func getPodContainerID(pod *kapi.Pod) string {
if len(pod.Status.ContainerStatuses) > 0 {
return kcontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
}
return ""
}
func hostSubnetToString(subnet *osapi.HostSubnet) string {
return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
}
func clusterNetworkToString(n *osapi.ClusterNetwork) string {
return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName)
}
type NetworkInfo struct {
ClusterNetwork *net.IPNet
ServiceNetwork *net.IPNet
}
func parseNetworkInfo(clusterNetwork string, serviceNetwork string) (*NetworkInfo, error) {
_, cn, err := net.ParseCIDR(clusterNetwork)
if err != nil {
return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", clusterNetwork, err)
}
_, sn, err := net.ParseCIDR(serviceNetwork)
if err != nil {
return nil, fmt.Errorf("Failed to parse ServiceNetwork CIDR %s: %v", serviceNetwork, err)
}
return &NetworkInfo{
ClusterNetwork: cn,
ServiceNetwork: sn,
}, nil
}
func (ni *NetworkInfo) validateNodeIP(nodeIP string) error {
if nodeIP == "" || nodeIP == "127.0.0.1" {
return fmt.Errorf("Invalid node IP %q", nodeIP)
}
// Ensure each node's NodeIP is not contained by the cluster network,
// which could cause a routing loop. (rhbz#1295486)
ipaddr := net.ParseIP(nodeIP)
if ipaddr == nil {
return fmt.Errorf("Failed to parse node IP %s", nodeIP)
}
if ni.ClusterNetwork.Contains(ipaddr) {
return fmt.Errorf("Node IP %s conflicts with cluster network %s", nodeIP, ni.ClusterNetwork.String())
}
if ni.ServiceNetwork.Contains(ipaddr) {
return fmt.Errorf("Node IP %s conflicts with service network %s", nodeIP, ni.ServiceNetwork.String())
}
return nil
}
func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) {
cn, err := osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault)
if err != nil {
return nil, err
}
return parseNetworkInfo(cn.Network, cn.ServiceNetwork)
}
type ResourceName string
const (
Nodes ResourceName = "Nodes"
Namespaces ResourceName = "Namespaces"
NetNamespaces ResourceName = "NetNamespaces"
Services ResourceName = "Services"
HostSubnets ResourceName = "HostSubnets"
Pods ResourceName = "Pods"
EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
)
// Run event queue for the given resource. The 'process' function is called
// repeatedly with each available cache.Delta that describes state changes
// to an object. If the process function returns an error queued changes
// for that object are dropped but processing continues with the next available
// object's cache.Deltas. The error is logged with call stack information.
func runEventQueueForResource(client kcache.Getter, resourceName ResourceName, expectedType interface{}, selector fields.Selector, process ProcessEventFunc) {
rn := strings.ToLower(string(resourceName))
lw := kcache.NewListWatchFromClient(client, rn, kapi.NamespaceAll, selector)
eventQueue := NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc)
// Repopulate event queue every 30 mins
// Existing items in the event queue will have watch.Modified event type
kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()
// Run the queue
for {
eventQueue.Pop(process, expectedType)
}
}
// Run event queue for the given resource.
// NOTE: this function will handle DeletedFinalStateUnknown delta objects
// automatically, which may not always be what you want since the now-deleted
// object may be stale.
func RunEventQueue(client kcache.Getter, resourceName ResourceName, process ProcessEventFunc) {
var expectedType interface{}
switch resourceName {
case HostSubnets:
expectedType = &osapi.HostSubnet{}
case NetNamespaces:
expectedType = &osapi.NetNamespace{}
case Nodes:
expectedType = &kapi.Node{}
case Namespaces:
expectedType = &kapi.Namespace{}
case Services:
expectedType = &kapi.Service{}
case Pods:
expectedType = &kapi.Pod{}
case EgressNetworkPolicies:
expectedType = &osapi.EgressNetworkPolicy{}
default:
glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
}
runEventQueueForResource(client, resourceName, expectedType, fields.Everything(), process)
}