package plugin
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"github.com/openshift/origin/pkg/sdn/plugin/cniserver"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kapi "k8s.io/kubernetes/pkg/api"
kunversioned "k8s.io/kubernetes/pkg/api/unversioned"
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
kcontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
khostport "k8s.io/kubernetes/pkg/kubelet/network/hostport"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
cnitypes "github.com/containernetworking/cni/pkg/types"
)
// operation describes one CNI request issued by a test case together with
// the outcome the test expects from it.
type operation struct {
	// command is the CNI command carried by the request.
	command cniserver.CNICommand
	// namespace and name identify the pod the request targets.
	namespace, name string
	// cidr is the pod CIDR for an add operation.
	cidr string
	// failStr is the error string for failing the operation; when it is
	// non-empty the operation is expected to fail with an error that
	// starts with this text.
	failStr string
	// request is filled in automatically from the other fields.
	request *cniserver.PodRequest
	// result is not referenced by the code visible in this file section.
	// NOTE(review): presumably reserved for the CNI result; confirm.
	result *cnitypes.Result
}
// expectedPod records what the tests expect for one pod and what the
// podTester has observed happening to it so far.
type expectedPod struct {
	// cidr is the IP address, in CIDR form, to return for the pod's ADD
	// operation.
	cidr string
	// added is set once setup has run for the pod.
	added bool
	// updated counts the successful update calls for the pod.
	updated uint
	// deleted is set once teardown has run for the pod.
	deleted bool
	// errors maps a CNI command to the error string that command must
	// fail with for this pod.
	errors map[cniserver.CNICommand]string
}
// podTester is the fake pod handler the tests install on the pod manager.
// It tracks the pods a test case expects and what was done to each.
type podTester struct {
	// t is the test the tester was created for.
	t *testing.T
	// testname is the name of the test case being run.
	testname string
	// client is an HTTP client whose transport dials the test's Unix socket.
	client *http.Client
	// pods holds the expected pods, keyed by "namespace/name", along with
	// the IP address each one returns for its ADD operation.
	pods map[string]*expectedPod
}
// newPodTester builds a podTester for the named test case with an empty
// expected-pod map. Its HTTP client ignores the network and address it is
// asked to dial and always connects to the Unix socket at socketPath.
func newPodTester(t *testing.T, testname string, socketPath string) *podTester {
	dialSocket := func(proto, addr string) (net.Conn, error) {
		return net.Dial("unix", socketPath)
	}

	tester := &podTester{
		t:        t,
		testname: testname,
		pods:     make(map[string]*expectedPod),
	}
	tester.client = &http.Client{
		Transport: &http.Transport{Dial: dialSocket},
	}
	return tester
}
// ptPodKey returns the "namespace/name" key under which a pod is stored in
// the podTester's expected-pod map.
func ptPodKey(namespace, name string) string {
	// All operands are strings, so Sprint concatenates them without
	// inserting any spaces.
	return fmt.Sprint(namespace, "/", name)
}
// getExpectedPod looks up the expected pod registered under namespace/name.
//
// It returns an error when no such pod has been registered, or when an error
// string has been registered for command on that pod; in the latter case the
// registered string is the error text. Otherwise the pod is returned.
func (pt *podTester) getExpectedPod(namespace, name string, command cniserver.CNICommand) (*expectedPod, error) {
	pod := pt.pods[ptPodKey(namespace, name)]
	if pod == nil {
		return nil, fmt.Errorf("pod not found!")
	}
	if failStr, ok := pod.errors[command]; ok {
		// Pass the failure text as an argument rather than as the format
		// string so that any '%' it contains is reported verbatim.
		return nil, fmt.Errorf("%s", failStr)
	}
	return pod, nil
}
// addExpectedPod registers the pod targeted by op in the expected-pod map,
// creating the entry on first use. If op carries a failure string, it is
// recorded as the error that op's command must return for the pod.
//
// The t parameter is currently unused.
func (pt *podTester) addExpectedPod(t *testing.T, op *operation) {
	pk := ptPodKey(op.namespace, op.name)
	pod, ok := pt.pods[pk]
	if !ok {
		pod = &expectedPod{
			cidr:   op.cidr,
			errors: make(map[cniserver.CNICommand]string),
		}
		pt.pods[pk] = pod
	} else if pod.cidr == "" && op.cidr != "" {
		// The pod was first registered by an operation that carried no
		// CIDR; pick it up from this one instead of silently dropping it.
		pod.cidr = op.cidr
	}
	if op.failStr != "" {
		pod.errors[op.command] = op.failStr
	}
}
// setup marks the requested pod as added and returns a CNI result and a
// running-pod description built from the pod's expected CIDR.
// NOTE(review): presumably invoked by the pod manager for CNI ADD requests;
// confirm against the pod handler interface.
//
// It returns an error when the pod is not expected, when a failure was
// registered for the request's command, when the pod was already added, or
// when the pod's expected CIDR cannot be parsed.
func (pt *podTester) setup(req *cniserver.PodRequest) (*cnitypes.Result, *khostport.RunningPod, error) {
	pod, err := pt.getExpectedPod(req.PodNamespace, req.PodName, req.Command)
	if err != nil {
		return nil, nil, err
	}
	if pod.added {
		return nil, nil, fmt.Errorf("pod already added!")
	}

	// Validate the CIDR before touching any state: a parse failure leaves
	// ipnet nil, and dereferencing it below would panic.
	ip, ipnet, err := net.ParseCIDR(pod.cidr)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid CIDR %q for pod %s: %v", pod.cidr, ptPodKey(req.PodNamespace, req.PodName), err)
	}
	pod.added = true

	// Report the pod's own address (not the network address) together
	// with the network mask.
	result := &cnitypes.Result{
		IP4: &cnitypes.IPConfig{
			IP: net.IPNet{
				IP:   ip,
				Mask: ipnet.Mask,
			},
		},
	}

	runningPod := &khostport.RunningPod{
		Pod: &kapi.Pod{
			TypeMeta: kunversioned.TypeMeta{
				Kind: "Pod",
			},
			ObjectMeta: kapi.ObjectMeta{
				Name:      req.PodName,
				Namespace: req.PodNamespace,
			},
			Spec: kapi.PodSpec{
				Containers: []kapi.Container{
					{
						Name:  "foobareasadfa",
						Image: "awesome-image",
					},
				},
			},
		},
		IP: ip,
	}

	return result, runningPod, nil
}
// update bumps the update counter of the requested pod. It returns the
// lookup error, leaving the counter untouched, when the pod is not expected
// or a failure was registered for the request's command.
func (pt *podTester) update(req *cniserver.PodRequest) error {
	expected, err := pt.getExpectedPod(req.PodNamespace, req.PodName, req.Command)
	if err != nil {
		return err
	}
	expected.updated++
	return nil
}
// teardown marks the requested pod as deleted. It returns the lookup error,
// leaving the pod untouched, when the pod is not expected or a failure was
// registered for the request's command.
func (pt *podTester) teardown(req *cniserver.PodRequest) error {
	expected, err := pt.getExpectedPod(req.PodNamespace, req.PodName, req.Command)
	if err != nil {
		return err
	}
	expected.deleted = true
	return nil
}
// fakeHost is the stub host the tests pass to newDefaultPodManager. Only
// its container runtime is populated.
type fakeHost struct {
	runtime kcontainer.Runtime
}

// newFakeHost returns a fakeHost backed by a FakeRuntime whose pod list is
// empty.
func newFakeHost() *fakeHost {
	fakeRuntime := &kcontainertest.FakeRuntime{
		AllPodList: []*kcontainertest.FakePod{},
	}
	return &fakeHost{runtime: fakeRuntime}
}

// GetPodByName always reports that no pod was found.
func (fh *fakeHost) GetPodByName(name, namespace string) (*kapi.Pod, bool) {
	var none *kapi.Pod
	return none, false
}

// GetKubeClient always returns a nil client.
func (fh *fakeHost) GetKubeClient() clientset.Interface {
	var none clientset.Interface
	return none
}

// GetRuntime returns the fake container runtime the host was created with.
func (fh *fakeHost) GetRuntime() kcontainer.Runtime {
	rt := fh.runtime
	return rt
}
// podcheck names a pod whose final state a test case verifies once all of
// its operations have completed.
type podcheck struct {
	// namespace and name identify the pod to check.
	namespace, name string
	// updateCount is the number of updates the pod must have received.
	updateCount uint
}
// TestPodManager drives the pod manager through table-driven sequences of
// CNI operations, using a podTester as the pod handler.
//
// For every test case it registers the expected pods, queues all requests,
// waits for each result in order, and checks it: operations with a failStr
// must fail with an error that starts with that string, and successful ADD
// operations must return a CNI result whose IPv4 address equals the
// operation's CIDR. Finally, each entry in the case's checks must have been
// added, updated the expected number of times, deleted, and removed from the
// pod manager.
func TestPodManager(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("cniserver")
	if err != nil {
		t.Fatalf("failed to create temp directory: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	socketPath := filepath.Join(tmpDir, "cni-server.sock")

	testcases := map[string]struct {
		operations []*operation
		checks     []*podcheck
	}{
		"ADD+DEL one pod": {
			operations: []*operation{
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace1",
					name:      "pod1",
					cidr:      "10.1.2.4/24",
				},
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace1",
					name:      "pod1",
				},
			},
			checks: []*podcheck{
				{
					namespace:   "namespace1",
					name:        "pod1",
					updateCount: 0,
				},
			},
		},
		"ADD+UPDATE+DEL many pod": {
			operations: []*operation{
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace1",
					name:      "pod1",
					cidr:      "10.1.2.4/24",
				},
				{
					command:   cniserver.CNI_UPDATE,
					namespace: "namespace1",
					name:      "pod1",
				},
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace2",
					name:      "pod2",
					cidr:      "10.1.2.3/24",
				},
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace3",
					name:      "pod3",
					cidr:      "10.1.2.2/24",
				},
				{
					command:   cniserver.CNI_UPDATE,
					namespace: "namespace2",
					name:      "pod2",
				},
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace1",
					name:      "pod1",
				},
				{
					command:   cniserver.CNI_UPDATE,
					namespace: "namespace2",
					name:      "pod2",
				},
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace3",
					name:      "pod3",
				},
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace2",
					name:      "pod2",
				},
				// pod2 is deliberately deleted a second time; the
				// operation is expected to succeed.
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace2",
					name:      "pod2",
				},
			},
			checks: []*podcheck{
				{
					namespace:   "namespace1",
					name:        "pod1",
					updateCount: 1,
				},
				{
					namespace:   "namespace2",
					name:        "pod2",
					updateCount: 2,
				},
				{
					namespace:   "namespace3",
					name:        "pod3",
					updateCount: 0,
				},
			},
		},
		"ADD error": {
			operations: []*operation{
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace1",
					name:      "pod1",
					cidr:      "10.1.2.5/24",
					failStr:   "fail hard",
				},
			},
		},
		"UPDATE error": {
			operations: []*operation{
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace2",
					name:      "pod2",
					cidr:      "10.1.2.5/24",
				},
				{
					command:   cniserver.CNI_UPDATE,
					namespace: "namespace2",
					name:      "pod2",
					failStr:   "fail harder",
				},
			},
		},
		"DEL error": {
			operations: []*operation{
				{
					command:   cniserver.CNI_ADD,
					namespace: "namespace3",
					name:      "pod3",
					cidr:      "10.1.2.5/24",
				},
				{
					command:   cniserver.CNI_DEL,
					namespace: "namespace3",
					name:      "pod3",
					failStr:   "fail like a rock",
				},
			},
		},
		"unknown command": {
			operations: []*operation{
				{
					command:   cniserver.CNICommand("foobar!"),
					namespace: "namespace3",
					name:      "pod3",
					failStr:   "unhandled CNI request foobar!",
				},
			},
		},
	}

	for k, tc := range testcases {
		// Every test case gets its own tester and pod manager, started
		// with the shared socket path.
		podTester := newPodTester(t, k, socketPath)
		podManager := newDefaultPodManager(newFakeHost())
		podManager.podHandler = podTester
		podManager.Start(socketPath)

		// Add pods to our expected pod list before kicking off the
		// actual pod setup to ensure we don't concurrently access
		// our pod map from different goroutines
		for _, op := range tc.operations {
			podTester.addExpectedPod(t, op)
		}

		// Queue every request up front; results are collected afterwards.
		for _, op := range tc.operations {
			op.request = &cniserver.PodRequest{
				Command:      op.command,
				PodNamespace: op.namespace,
				PodName:      op.name,
				ContainerId:  "asd;lfkajsdflkajfs",
				Netns:        "/some/network/namespace",
				Result:       make(chan *cniserver.PodResult),
			}
			podManager.addRequest(op.request)
		}

		// Wait for each result in submission order and validate it.
		for _, op := range tc.operations {
			result := podManager.waitRequest(op.request)
			if op.failStr != "" {
				if result.Err == nil {
					t.Fatalf("[%s] unexpected %v result success", k, op)
				}
				if !strings.HasPrefix(fmt.Sprintf("%v", result.Err), op.failStr) {
					t.Fatalf("[%s] unexpected %v error: %v", k, op, result.Err)
				}
			} else {
				if result.Err != nil {
					t.Fatalf("[%s] unexpected %v result error %v", k, op, result.Err)
				}
				if op.command == cniserver.CNI_ADD {
					if result.Response == nil {
						t.Fatalf("[%s] unexpected %v nil result response", k, op)
					}
					var cniResult *cnitypes.Result
					if err := json.Unmarshal(result.Response, &cniResult); err != nil {
						t.Fatalf("[%s] unexpected error unmarshalling CNI result '%s': %v", k, string(result.Response), err)
					}
					// A JSON null leaves cniResult nil and a result
					// without an IPv4 section leaves IP4 nil; fail
					// with a message instead of panicking below.
					if cniResult == nil || cniResult.IP4 == nil {
						t.Fatalf("[%s] CNI result '%s' has no IPv4 configuration", k, string(result.Response))
					}
					if cniResult.IP4.IP.String() != op.cidr {
						t.Fatalf("[%s] expected ADD IP %s but got %s", k, op.cidr, cniResult.IP4.IP.String())
					}
				}
			}
		}

		// Verify pod operations performed as requested
		for _, check := range tc.checks {
			// Look the pod up with an empty command, for which none of
			// the operations above register a failure.
			pod, err := podTester.getExpectedPod(check.namespace, check.name, "")
			if err != nil {
				t.Fatalf("[%s] expected pod %v: %v", k, check, err)
			}
			if len(pod.errors) > 0 {
				// expected error; don't check operations
				continue
			}
			if !pod.added {
				t.Fatalf("[%s] added pod %v not marked added", k, check)
			}
			if pod.updated != check.updateCount {
				t.Fatalf("[%s] pod %v update count wrong, got %v", k, check, pod.updated)
			}
			if !pod.deleted {
				t.Fatalf("[%s] expected pod %v to be deleted", k, check)
			}
			// Make sure it's gone from the podManager too
			if podManager.getPod(&cniserver.PodRequest{
				PodNamespace: check.namespace,
				PodName:      check.name,
			}) != nil {
				t.Fatalf("[%s] expected pod %v to be deleted from podManager", k, check)
			}
		}
	}
}
// TestDirectPodUpdate exercises a pod update handed straight to the pod
// manager's request handler rather than going through the CNIServer, which
// is what the node process currently does because there is no standard CNI
// update command.
func TestDirectPodUpdate(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("cniserver")
	if err != nil {
		t.Fatalf("failed to create temp directory: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	socketPath := filepath.Join(tmpDir, "cni-server.sock")
	tester := newPodTester(t, "update", socketPath)
	manager := newDefaultPodManager(newFakeHost())
	manager.podHandler = tester
	manager.Start(socketPath)

	// Register the pod so the tester accepts the update request.
	updateOp := &operation{
		command:   cniserver.CNI_UPDATE,
		namespace: "foobarnamespace",
		name:      "foobarname",
	}
	tester.addExpectedPod(t, updateOp)

	req := &cniserver.PodRequest{
		Command:      updateOp.command,
		PodNamespace: updateOp.namespace,
		PodName:      updateOp.name,
		ContainerId:  "asdfasdfasdfaf",
		Result:       make(chan *cniserver.PodResult),
	}

	// Send request and wait for the result
	_, err = manager.handleCNIRequest(req)
	if err != nil {
		t.Fatalf("failed to update pod: %v", err)
	}
}