package integration
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"github.com/pborman/uuid"
kapi "k8s.io/kubernetes/pkg/api"
kapierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/client/restclient"
templatesapi "github.com/openshift/origin/pkg/template/api"
testutil "github.com/openshift/origin/test/util"
testserver "github.com/openshift/origin/test/util/server"
)
func TestPatchConflicts(t *testing.T) {
testutil.RequireEtcd(t)
defer testutil.DumpEtcdOnFailure(t)
_, clusterAdminKubeConfig, err := testserver.StartTestMasterAPI()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterAdminClient, err := testutil.GetClusterAdminClient(clusterAdminKubeConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterAdminKubeClientset, err := testutil.GetClusterAdminKubeClient(clusterAdminKubeConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
objName := "myobj"
ns := "patch-namespace"
if _, err := clusterAdminKubeClientset.Core().Namespaces().Create(&kapi.Namespace{ObjectMeta: kapi.ObjectMeta{Name: ns}}); err != nil {
t.Fatalf("Error creating namespace:%v", err)
}
if _, err := clusterAdminKubeClientset.Core().Secrets(ns).Create(&kapi.Secret{ObjectMeta: kapi.ObjectMeta{Name: objName}}); err != nil {
t.Fatalf("Error creating k8s resource:%v", err)
}
if _, err := clusterAdminClient.Templates(ns).Create(&templatesapi.Template{ObjectMeta: kapi.ObjectMeta{Name: objName}}); err != nil {
t.Fatalf("Error creating origin resource:%v", err)
}
testcases := []struct {
client *restclient.RESTClient
resource string
}{
{
client: clusterAdminKubeClientset.CoreClient.RESTClient,
resource: "secrets",
},
{
client: clusterAdminClient.RESTClient,
resource: "templates",
},
}
for _, tc := range testcases {
successes := int32(0)
// Force patch to deal with resourceVersion conflicts applying non-conflicting patches
// ensure it handles reapplies without internal errors
wg := sync.WaitGroup{}
for i := 0; i < (2 * apiserver.MaxPatchConflicts); i++ {
wg.Add(1)
go func(labelName string) {
defer wg.Done()
labelValue := uuid.NewRandom().String()
obj, err := tc.client.Patch(kapi.StrategicMergePatchType).
Namespace(ns).
Resource(tc.resource).
Name(objName).
Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, labelName, labelValue))).
Do().
Get()
if kapierrs.IsConflict(err) {
t.Logf("tolerated conflict error patching %s: %v", tc.resource, err)
return
}
if err != nil {
t.Errorf("error patching %s: %v", tc.resource, err)
return
}
accessor, err := meta.Accessor(obj)
if err != nil {
t.Errorf("error getting object from %s: %v", tc.resource, err)
return
}
if accessor.GetLabels()[labelName] != labelValue {
t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", tc.resource, labelName, labelValue, accessor.GetLabels())
return
}
atomic.AddInt32(&successes, 1)
}(fmt.Sprintf("label-%d", i))
}
wg.Wait()
if successes < apiserver.MaxPatchConflicts {
t.Errorf("Expected at least %d successful patches for %s, got %d", apiserver.MaxPatchConflicts, tc.resource, successes)
} else {
t.Logf("Got %d successful patches for %s", successes, tc.resource)
}
}
}