package integration

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/pborman/uuid"

	kapi "k8s.io/kubernetes/pkg/api"
	kapierrs "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/meta"
	"k8s.io/kubernetes/pkg/apiserver"
	"k8s.io/kubernetes/pkg/client/restclient"

	templatesapi "github.com/openshift/origin/pkg/template/api"
	testutil "github.com/openshift/origin/test/util"
	testserver "github.com/openshift/origin/test/util/server"
)

func TestPatchConflicts(t *testing.T) {

	testutil.RequireEtcd(t)
	defer testutil.DumpEtcdOnFailure(t)

	_, clusterAdminKubeConfig, err := testserver.StartTestMasterAPI()

	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	clusterAdminClient, err := testutil.GetClusterAdminClient(clusterAdminKubeConfig)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	clusterAdminKubeClientset, err := testutil.GetClusterAdminKubeClient(clusterAdminKubeConfig)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	objName := "myobj"
	ns := "patch-namespace"

	if _, err := clusterAdminKubeClientset.Core().Namespaces().Create(&kapi.Namespace{ObjectMeta: kapi.ObjectMeta{Name: ns}}); err != nil {
		t.Fatalf("Error creating namespace:%v", err)
	}
	if _, err := clusterAdminKubeClientset.Core().Secrets(ns).Create(&kapi.Secret{ObjectMeta: kapi.ObjectMeta{Name: objName}}); err != nil {
		t.Fatalf("Error creating k8s resource:%v", err)
	}
	if _, err := clusterAdminClient.Templates(ns).Create(&templatesapi.Template{ObjectMeta: kapi.ObjectMeta{Name: objName}}); err != nil {
		t.Fatalf("Error creating origin resource:%v", err)
	}

	testcases := []struct {
		client   *restclient.RESTClient
		resource string
	}{
		{
			client:   clusterAdminKubeClientset.CoreClient.RESTClient,
			resource: "secrets",
		},
		{
			client:   clusterAdminClient.RESTClient,
			resource: "templates",
		},
	}

	for _, tc := range testcases {
		successes := int32(0)

		// Force patch to deal with resourceVersion conflicts applying non-conflicting patches
		// ensure it handles reapplies without internal errors
		wg := sync.WaitGroup{}
		for i := 0; i < (2 * apiserver.MaxPatchConflicts); i++ {
			wg.Add(1)
			go func(labelName string) {
				defer wg.Done()
				labelValue := uuid.NewRandom().String()

				obj, err := tc.client.Patch(kapi.StrategicMergePatchType).
					Namespace(ns).
					Resource(tc.resource).
					Name(objName).
					Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, labelName, labelValue))).
					Do().
					Get()

				if kapierrs.IsConflict(err) {
					t.Logf("tolerated conflict error patching %s: %v", tc.resource, err)
					return
				}
				if err != nil {
					t.Errorf("error patching %s: %v", tc.resource, err)
					return
				}

				accessor, err := meta.Accessor(obj)
				if err != nil {
					t.Errorf("error getting object from %s: %v", tc.resource, err)
					return
				}
				if accessor.GetLabels()[labelName] != labelValue {
					t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", tc.resource, labelName, labelValue, accessor.GetLabels())
					return
				}

				atomic.AddInt32(&successes, 1)
			}(fmt.Sprintf("label-%d", i))
		}
		wg.Wait()

		if successes < apiserver.MaxPatchConflicts {
			t.Errorf("Expected at least %d successful patches for %s, got %d", apiserver.MaxPatchConflicts, tc.resource, successes)
		} else {
			t.Logf("Got %d successful patches for %s", successes, tc.resource)
		}
	}
}