package events

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"testing"
	"time"

	"github.com/docker/docker/engine"
	"github.com/docker/docker/utils"
)

// TestEventsPublish subscribes two listeners, logs a single event from a
// separate goroutine, and checks that each listener receives a message
// carrying the logged status, ID and origin.
func TestEventsPublish(t *testing.T) {
	ev := New()
	first := make(chan *utils.JSONMessage)
	second := make(chan *utils.JSONMessage)
	ev.subscribe(first)
	ev.subscribe(second)
	if n := ev.subscribersCount(); n != 2 {
		t.Fatalf("Must be 2 subscribers, got %d", n)
	}

	// Both listener channels are unbuffered; the event is logged from its own
	// goroutine while this goroutine drains the listeners in order.
	go ev.log("test", "cont", "image")

	for _, listener := range []chan *utils.JSONMessage{first, second} {
		select {
		case got := <-listener:
			if n := len(ev.events); n != 1 {
				t.Fatalf("Must be only one event, got %d", n)
			}
			// Report only the first mismatching field, in a fixed order.
			switch {
			case got.Status != "test":
				t.Fatalf("Status should be test, got %s", got.Status)
			case got.ID != "cont":
				t.Fatalf("ID should be cont, got %s", got.ID)
			case got.From != "image":
				t.Fatalf("From should be image, got %s", got.From)
			}
		case <-time.After(1 * time.Second):
			t.Fatal("Timeout waiting for broadcasted message")
		}
	}
}

// TestEventsPublishTimeout subscribes a listener that is never read from and
// checks that logging an event still returns within one second.
func TestEventsPublishTimeout(t *testing.T) {
	ev := New()
	listener := make(chan *utils.JSONMessage)
	ev.subscribe(listener)

	// published is closed once log has returned.
	published := make(chan struct{})
	go func() {
		ev.log("test", "cont", "image")
		close(published)
	}()

	deadline := time.After(time.Second)
	select {
	case <-deadline:
		t.Fatal("Timeout publishing message")
	case <-published:
		// log returned in time even though nobody reads from listener.
	}
}

// TestLogEvents runs the "log" job more times than eventsLimit and checks
// that only the most recent eventsLimit events are retained, both in the
// in-memory list and in the output of the "events" job.
func TestLogEvents(t *testing.T) {
	// overflow is how many events beyond eventsLimit are logged; the oldest
	// overflow events are therefore expected to be dropped.
	const overflow = 16

	e := New()
	eng := engine.New()
	if err := e.Install(eng); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < eventsLimit+overflow; i++ {
		action := fmt.Sprintf("action_%d", i)
		id := fmt.Sprintf("cont_%d", i)
		from := fmt.Sprintf("image_%d", i)
		job := eng.Job("log", action, id, from)
		if err := job.Run(); err != nil {
			t.Fatal(err)
		}
	}
	// NOTE(review): presumably gives any asynchronous event handling time to
	// settle before the stored events are inspected; confirm against log.
	time.Sleep(50 * time.Millisecond)
	if len(e.events) != eventsLimit {
		t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
	}

	// Ask for every event between timestamp 1 and now, capturing the job's
	// standard output.
	job := eng.Job("events")
	job.SetenvInt64("since", 1)
	job.SetenvInt64("until", time.Now().Unix())
	buf := bytes.NewBuffer(nil)
	job.Stdout.Add(buf)
	if err := job.Run(); err != nil {
		t.Fatal(err)
	}

	// The output is a stream of JSON messages; decode until EOF.
	dec := json.NewDecoder(bytes.NewReader(buf.Bytes()))
	var msgs []utils.JSONMessage
	for {
		var jm utils.JSONMessage
		if err := dec.Decode(&jm); err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		msgs = append(msgs, jm)
	}
	if len(msgs) != eventsLimit {
		t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
	}

	// Events 0..overflow-1 were dropped, so the retained window spans
	// action_<overflow> through action_<eventsLimit+overflow-1>.
	wantFirst := fmt.Sprintf("action_%d", overflow)
	wantLast := fmt.Sprintf("action_%d", eventsLimit+overflow-1)
	if first := msgs[0]; first.Status != wantFirst {
		t.Fatalf("First action is %s, must be %s", first.Status, wantFirst)
	}
	if last := msgs[len(msgs)-1]; last.Status != wantLast {
		t.Fatalf("Last action is %s, must be %s", last.Status, wantLast)
	}
}

// TestEventsCountJob subscribes two listeners and checks that the
// "subscribers_count" job reports a "count" of 2 in its output environment.
func TestEventsCountJob(t *testing.T) {
	e := New()
	eng := engine.New()
	if err := e.Install(eng); err != nil {
		t.Fatal(err)
	}
	l1 := make(chan *utils.JSONMessage)
	l2 := make(chan *utils.JSONMessage)
	e.subscribe(l1)
	e.subscribe(l2)

	job := eng.Job("subscribers_count")
	// Fail on a setup error here instead of discarding it, so a failure is
	// reported where it happens rather than at a later use of env.
	env, err := job.Stdout.AddEnv()
	if err != nil {
		t.Fatal(err)
	}
	if err := job.Run(); err != nil {
		t.Fatal(err)
	}
	if count := env.GetInt("count"); count != 2 {
		t.Fatalf("There must be 2 subscribers, got %d", count)
	}
}