package plugin
import (
"fmt"
"net"
"sync"
"github.com/golang/glog"
osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
utilwait "k8s.io/kubernetes/pkg/util/wait"
)
type proxyFirewallItem struct {
policy osapi.EgressNetworkPolicyRuleType
net *net.IPNet
}
type OsdnProxy struct {
kClient *kclientset.Clientset
osClient *osclient.Client
networkInfo *NetworkInfo
baseEndpointsHandler pconfig.EndpointsConfigHandler
lock sync.Mutex
firewall map[string][]proxyFirewallItem
allEndpoints []kapi.Endpoints
}
// Called by higher layers to create the proxy plugin instance; only used by nodes
func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclientset.Clientset) (*OsdnProxy, error) {
if !osapi.IsOpenShiftMultitenantNetworkPlugin(pluginName) {
return nil, nil
}
return &OsdnProxy{
kClient: kClient,
osClient: osClient,
firewall: make(map[string][]proxyFirewallItem),
}, nil
}
func (proxy *OsdnProxy) Start(baseHandler pconfig.EndpointsConfigHandler) error {
glog.Infof("Starting multitenant SDN proxy endpoint filter")
var err error
proxy.networkInfo, err = getNetworkInfo(proxy.osClient)
if err != nil {
return fmt.Errorf("could not get network info: %s", err)
}
proxy.baseEndpointsHandler = baseHandler
policies, err := proxy.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
return fmt.Errorf("Could not get EgressNetworkPolicies: %s", err)
}
for _, policy := range policies.Items {
proxy.updateNetworkPolicy(policy)
}
go utilwait.Forever(proxy.watchEgressNetworkPolicies, 0)
return nil
}
func (proxy *OsdnProxy) watchEgressNetworkPolicies() {
RunEventQueue(proxy.osClient, EgressNetworkPolicies, func(delta cache.Delta) error {
policy := delta.Object.(*osapi.EgressNetworkPolicy)
if delta.Type == cache.Deleted {
policy.Spec.Egress = nil
}
func() {
proxy.lock.Lock()
defer proxy.lock.Unlock()
proxy.updateNetworkPolicy(*policy)
if proxy.allEndpoints != nil {
proxy.updateEndpoints()
}
}()
return nil
})
}
func (proxy *OsdnProxy) updateNetworkPolicy(policy osapi.EgressNetworkPolicy) {
firewall := make([]proxyFirewallItem, len(policy.Spec.Egress))
for i, rule := range policy.Spec.Egress {
_, cidr, err := net.ParseCIDR(rule.To.CIDRSelector)
if err != nil {
// should have been caught by validation
glog.Errorf("Illegal CIDR value %q in EgressNetworkPolicy rule", rule.To.CIDRSelector)
return
}
firewall[i] = proxyFirewallItem{rule.Type, cidr}
}
if len(firewall) > 0 {
proxy.firewall[policy.Namespace] = firewall
} else {
delete(proxy.firewall, policy.Namespace)
}
}
func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
for _, item := range proxy.firewall[namespace] {
if item.net.Contains(ip) {
return item.policy == osapi.EgressNetworkPolicyRuleDeny
}
}
return false
}
func (proxy *OsdnProxy) OnEndpointsUpdate(allEndpoints []kapi.Endpoints) {
proxy.lock.Lock()
defer proxy.lock.Unlock()
proxy.allEndpoints = allEndpoints
proxy.updateEndpoints()
}
func (proxy *OsdnProxy) updateEndpoints() {
if len(proxy.firewall) == 0 {
proxy.baseEndpointsHandler.OnEndpointsUpdate(proxy.allEndpoints)
return
}
filteredEndpoints := make([]kapi.Endpoints, 0, len(proxy.allEndpoints))
EndpointLoop:
for _, ep := range proxy.allEndpoints {
ns := ep.ObjectMeta.Namespace
for _, ss := range ep.Subsets {
for _, addr := range ss.Addresses {
IP := net.ParseIP(addr.IP)
if !proxy.networkInfo.ClusterNetwork.Contains(IP) && !proxy.networkInfo.ServiceNetwork.Contains(IP) {
if proxy.firewallBlocksIP(ns, IP) {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP)
continue EndpointLoop
}
}
}
}
filteredEndpoints = append(filteredEndpoints, ep)
}
proxy.baseEndpointsHandler.OnEndpointsUpdate(filteredEndpoints)
}