package controller

import (
	"fmt"
	"testing"
	"time"

	kapi "k8s.io/kubernetes/pkg/api"
	apierrs "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/util/diff"

	client "github.com/openshift/origin/pkg/client/testclient"
	"github.com/openshift/origin/pkg/dockerregistry"
	"github.com/openshift/origin/pkg/image/api"

	_ "github.com/openshift/origin/pkg/api/install"
)

type expectedImage struct {
	Tag   string
	ID    string
	Image *dockerregistry.Image
	Err   error
}

type fakeDockerRegistryClient struct {
	Registry                 string
	Namespace, Name, Tag, ID string
	Insecure                 bool

	Tags    map[string]string
	Err     error
	ConnErr error

	Images []expectedImage

	Called bool
}

func (f *fakeDockerRegistryClient) Connect(registry string, insecure bool) (dockerregistry.Connection, error) {
	f.Called = true
	f.Registry = registry
	f.Insecure = insecure
	return f, f.ConnErr
}

func (f *fakeDockerRegistryClient) ImageTags(namespace, name string) (map[string]string, error) {
	f.Called = true
	f.Namespace, f.Name = namespace, name
	return f.Tags, f.Err
}

func (f *fakeDockerRegistryClient) ImageByTag(namespace, name, tag string) (*dockerregistry.Image, error) {
	f.Called = true
	if len(tag) == 0 {
		tag = api.DefaultImageTag
	}
	f.Namespace, f.Name, f.Tag = namespace, name, tag
	for _, t := range f.Images {
		if t.Tag == tag {
			return t.Image, t.Err
		}
	}
	return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), tag, tag)
}

func (f *fakeDockerRegistryClient) ImageByID(namespace, name, id string) (*dockerregistry.Image, error) {
	f.Called = true
	f.Namespace, f.Name, f.ID = namespace, name, id
	for _, t := range f.Images {
		if t.ID == id {
			return t.Image, t.Err
		}
	}
	return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), id, "")
}

func TestControllerStart(t *testing.T) {
	two := int64(2)
	testCases := []struct {
		stream *api.ImageStream
		run    bool
	}{
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: unversioned.Now().UTC().Format(time.RFC3339)},
					Name:        "test",
					Namespace:   "other",
				},
			},
		},
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: unversioned.Now().UTC().Format(time.RFC3339)},
					Name:        "test",
					Namespace:   "other",
				},
				Spec: api.ImageStreamSpec{
					DockerImageRepository: "test/other",
				},
			},
		},
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: "a random error"},
					Name:        "test",
					Namespace:   "other",
				},
				Spec: api.ImageStreamSpec{
					DockerImageRepository: "test/other",
				},
			},
		},

		// references are ignored
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:      &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Reference: true,
						},
					},
				},
			},
		},
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:      &kapi.ObjectReference{Kind: "AnotherImage", Name: "test/other:latest"},
							Reference: true,
						},
					},
				},
			},
		},

		// spec tag will be imported
		{
			run: true,
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From: &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
						},
					},
				},
			},
		},
		// spec tag with generation with no pending status will be imported
		{
			run: true,
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:       &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Generation: &two,
						},
					},
				},
			},
		},
		// spec tag with generation with older status generation will be imported
		{
			run: true,
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:       &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Generation: &two,
						},
					},
				},
				Status: api.ImageStreamStatus{
					Tags: map[string]api.TagEventList{"latest": {Items: []api.TagEvent{{Generation: 1}}}},
				},
			},
		},
		// spec tag with generation with status condition error and equal generation will not be imported
		{
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: unversioned.Now().UTC().Format(time.RFC3339)},
					Name:        "test",
					Namespace:   "other",
				},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:       &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Generation: &two,
						},
					},
				},
				Status: api.ImageStreamStatus{
					Tags: map[string]api.TagEventList{"latest": {Conditions: []api.TagEventCondition{
						{
							Type:       api.ImportSuccess,
							Status:     kapi.ConditionFalse,
							Generation: 2,
						},
					}}},
				},
			},
		},
		// spec tag with generation with status condition error and older generation will be imported
		{
			run: true,
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: unversioned.Now().UTC().Format(time.RFC3339)},
					Name:        "test",
					Namespace:   "other",
				},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:       &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Generation: &two,
						},
					},
				},
				Status: api.ImageStreamStatus{
					Tags: map[string]api.TagEventList{"latest": {Conditions: []api.TagEventCondition{
						{
							Type:       api.ImportSuccess,
							Status:     kapi.ConditionFalse,
							Generation: 1,
						},
					}}},
				},
			},
		},
		// spec tag with generation with older status generation will be imported
		{
			run: true,
			stream: &api.ImageStream{
				ObjectMeta: kapi.ObjectMeta{
					Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: unversioned.Now().UTC().Format(time.RFC3339)},
					Name:        "test",
					Namespace:   "other",
				},
				Spec: api.ImageStreamSpec{
					Tags: map[string]api.TagReference{
						"latest": {
							From:       &kapi.ObjectReference{Kind: "DockerImage", Name: "test/other:latest"},
							Generation: &two,
						},
					},
				},
				Status: api.ImageStreamStatus{
					Tags: map[string]api.TagEventList{"latest": {Items: []api.TagEvent{{Generation: 1}}}},
				},
			},
		},
	}

	for i, test := range testCases {
		fake := &client.Fake{}
		c := ImportController{streams: fake}
		other, err := kapi.Scheme.DeepCopy(test.stream)
		if err != nil {
			t.Fatal(err)
		}

		if err := c.Next(test.stream, nil); err != nil {
			t.Errorf("%d: unexpected error: %v", i, err)
		}
		if test.run {
			if len(fake.Actions()) == 0 {
				t.Errorf("%d: expected remote calls: %#v", i, fake)
			}
		} else {
			if !kapi.Semantic.DeepEqual(test.stream, other) {
				t.Errorf("%d: did not expect change to stream: %s", i, diff.ObjectGoPrintDiff(test.stream, other))
			}
			if len(fake.Actions()) != 0 {
				t.Errorf("%d: did not expect remote calls", i)
			}
		}
	}
}

func TestControllerExternalRepo(t *testing.T) {
	fake := &client.Fake{}
	c := ImportController{streams: fake}

	stream := api.ImageStream{
		ObjectMeta: kapi.ObjectMeta{
			Name:      "test",
			Namespace: "other",
		},
		Spec: api.ImageStreamSpec{
			Tags: map[string]api.TagReference{
				"1.1": {
					From: &kapi.ObjectReference{
						Kind: "DockerImage",
						Name: "some/repo:mytag",
					},
				},
			},
		},
	}
	if err := c.Next(&stream, nil); err != nil {
		t.Errorf("unexpected error: %v", err)
	}
	actions := fake.Actions()
	if len(actions) != 1 {
		t.Fatalf("expected 1 actions, got %#v", actions)
	}
	if !actions[0].Matches("create", "imagestreamimports") {
		t.Errorf("expected a create action: %#v", actions)
	}
}

func TestScheduledImport(t *testing.T) {
	fake := &client.Fake{}
	b := newScheduled(true, fake, 1, nil, nil)

	one := int64(1)
	stream := &api.ImageStream{
		ObjectMeta: kapi.ObjectMeta{
			Name: "test", Namespace: "other", UID: "1", ResourceVersion: "1",
			Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: "done"},
			Generation:  1,
		},
		Spec: api.ImageStreamSpec{
			Tags: map[string]api.TagReference{
				"default": {
					From:         &kapi.ObjectReference{Kind: "DockerImage", Name: "mysql:latest"},
					Generation:   &one,
					ImportPolicy: api.TagImportPolicy{Scheduled: true},
				},
			},
		},
		Status: api.ImageStreamStatus{
			Tags: map[string]api.TagEventList{
				"default": {Items: []api.TagEvent{{Generation: 1}}},
			},
		},
	}
	successfulImport := &api.ImageStreamImport{
		ObjectMeta: kapi.ObjectMeta{Name: "test"},
		Spec: api.ImageStreamImportSpec{
			Import: true,
			Images: []api.ImageImportSpec{{From: kapi.ObjectReference{Kind: "DockerImage", Name: "mysql:latest"}}},
		},
		Status: api.ImageStreamImportStatus{
			Images: []api.ImageImportStatus{{
				Status: unversioned.Status{Status: unversioned.StatusSuccess},
				Image:  &api.Image{},
			}},
		},
	}

	// queue, but don't import the stream
	if err := b.Handle(stream); err != nil {
		t.Fatal(err)
	}
	if b.scheduler.Len() != 1 {
		t.Fatalf("should have scheduled: %#v", b.scheduler)
	}
	if len(fake.Actions()) != 0 {
		t.Fatalf("should have made no calls: %#v", fake)
	}

	// run a background import
	fake = client.NewSimpleFake(stream, successfulImport)
	b.controller.streams = fake
	b.scheduler.RunOnce()
	if b.scheduler.Len() != 1 {
		t.Fatalf("should have left item in scheduler: %#v", b.scheduler)
	}
	if len(fake.Actions()) != 2 || !fake.Actions()[0].Matches("get", "imagestreams") || !fake.Actions()[1].Matches("create", "imagestreamimports") {
		t.Fatalf("invalid actions: %#v", fake.Actions())
	}
	var key, value interface{}
	for k, v := range b.scheduler.Map() {
		key, value = k, v
		break
	}

	// encountering a not found error for image streams should drop the controller
	status := apierrs.NewNotFound(api.Resource("imagestream"), "test").ErrStatus
	fake = client.NewSimpleFake(&status)
	b.controller.streams = fake
	b.scheduler.RunOnce()
	if b.scheduler.Len() != 0 {
		t.Fatalf("should have removed item in scheduler: %#v", b.scheduler)
	}
	if len(fake.Actions()) != 1 || !fake.Actions()[0].Matches("get", "imagestreams") {
		t.Fatalf("invalid actions: %#v", fake.Actions())
	}

	// requeue the stream with a new resource version
	stream.ResourceVersion = "2"
	if err := b.Handle(stream); err != nil {
		t.Fatal(err)
	}
	if b.scheduler.Len() != 1 {
		t.Fatalf("should have scheduled: %#v", b.scheduler)
	}

	// simulate a race where another caller attempts to dequeue the item
	if b.scheduler.Remove(key, value) {
		t.Fatalf("should not have removed %s: %#v", key, b.scheduler)
	}
	if b.scheduler.Len() != 1 {
		t.Fatalf("should have left scheduled: %#v", b.scheduler)
	}
}