package events

import (
	"sync"
	"time"

	eventtypes "github.com/docker/docker/api/types/events"
	"github.com/docker/docker/pkg/pubsub"
)

const (
	// eventsLimit is the capacity of the in-memory buffer of recent events
	// allocated by New; once the buffer is full the oldest event is dropped
	// for every new one published.
	eventsLimit = 256
	// bufferSize is handed to pubsub.NewPublisher as its buffer argument.
	bufferSize  = 1024
)

// Events is a pubsub channel for events generated by the engine. It keeps a
// bounded buffer of the most recently published messages next to the
// publisher that broadcasts them to listeners.
type Events struct {
	// mu guards events.
	mu     sync.Mutex
	// events holds the most recently published messages, oldest first.
	events []eventtypes.Message
	// pub broadcasts published messages to the subscribed listeners.
	pub    *pubsub.Publisher
}

// New builds a ready-to-use *Events: an empty event buffer with room for
// eventsLimit messages and a publisher created with a 100 millisecond
// timeout and a buffer of bufferSize.
func New() *Events {
	e := new(Events)
	e.events = make([]eventtypes.Message, 0, eventsLimit)
	e.pub = pubsub.NewPublisher(100*time.Millisecond, bufferSize)
	return e
}

// Subscribe registers a new listener for all events. It returns a copy of
// the currently buffered events (at most eventsLimit of them), a channel on
// which new events are delivered (as interface{} values, so a type assertion
// is needed), and a function that evicts the listener when called.
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
	eventSubscribers.Inc()

	// Snapshot the buffer and register the listener under the same lock.
	e.mu.Lock()
	snapshot := make([]eventtypes.Message, len(e.events))
	copy(snapshot, e.events)
	listener := e.pub.Subscribe()
	e.mu.Unlock()

	return snapshot, listener, func() { e.Evict(listener) }
}

// SubscribeTopic registers a new listener for events and returns the
// buffered events selected by loadBufferedEvents for the since/until window,
// together with a channel on which new events are delivered (as interface{}
// values, so a type assertion is needed).
//
// When ef is non-nil and contains at least one filter, both the buffered
// events and the channel are restricted to messages accepted by ef.Include;
// otherwise the listener receives every event. Pass the returned channel to
// Evict to stop listening.
func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
	eventSubscribers.Inc()

	// Building the topic only touches ef, so it does not need the lock.
	var topic func(m interface{}) bool
	if ef != nil && ef.filter.Len() > 0 {
		topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
	}

	// The buffered events are read and the listener is registered under the
	// same lock. The unlock is deferred so that a panic raised while
	// evaluating the filter cannot leave the mutex held.
	e.mu.Lock()
	defer e.mu.Unlock()

	buffered := e.loadBufferedEvents(since, until, topic)

	if topic != nil {
		return buffered, e.pub.SubscribeTopic(topic)
	}
	// Subscribe to all events if there are no filters.
	return buffered, e.pub.Subscribe()
}

// Evict decrements the subscriber metric and removes the listener l from
// the publisher.
func (e *Events) Evict(l chan interface{}) {
	eventSubscribers.Dec()

	publisher := e.pub
	publisher.Evict(l)
}

// Log builds a "local" scope message for the given action, event type and
// actor, stamped with the current UTC time, and publishes it.
func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
	ts := time.Now().UTC()

	var msg eventtypes.Message
	msg.Action = action
	msg.Type = eventType
	msg.Actor = actor
	msg.Scope = "local"
	msg.Time = ts.Unix()
	msg.TimeNano = ts.UnixNano()

	// Container and image events also carry the deprecated top-level fields;
	// only container events record the image they were created from.
	if eventType == eventtypes.ContainerEventType || eventType == eventtypes.ImageEventType {
		msg.ID = actor.ID
		msg.Status = action
	}
	if eventType == eventtypes.ContainerEventType {
		msg.From = actor.Attributes["image"]
	}

	e.PublishMessage(msg)
}

// PublishMessage records jm in the buffer of recent events and broadcasts it
// to the listeners. Each listener has 100 milliseconds to receive the event
// or it will be skipped.
func (e *Events) PublishMessage(jm eventtypes.Message) {
	eventsCounter.Inc()

	e.mu.Lock()
	if len(e.events) != cap(e.events) {
		// Still room in the buffer.
		e.events = append(e.events, jm)
	} else {
		// Buffer is full: shift everything one slot towards the front,
		// dropping the oldest event, and store the new one in the last slot.
		last := len(e.events) - 1
		copy(e.events, e.events[1:])
		e.events[last] = jm
	}
	e.mu.Unlock()

	// Broadcasting happens after the lock has been released.
	e.pub.Publish(jm)
}

// SubscribersCount reports how many listeners the publisher currently has.
func (e *Events) SubscribersCount() int {
	count := e.pub.Len()
	return count
}

// loadBufferedEvents walks the cached events and returns, oldest first,
// those emitted between since and until.
//
// A zero since means no lower bound and a zero until means no upper bound;
// when both are zero nothing is returned. Bounds are compared against each
// message's TimeNano. If topic is non-nil only messages it accepts are
// returned, otherwise every message in the window is included.
//
// The method reads e.events without locking; SubscribeTopic calls it while
// holding e.mu.
func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
	if since.IsZero() && until.IsZero() {
		return nil
	}

	var sinceNanoUnix int64
	if !since.IsZero() {
		sinceNanoUnix = since.UnixNano()
	}

	var untilNanoUnix int64
	if !until.IsZero() {
		untilNanoUnix = until.UnixNano()
	}

	// Walk from newest to oldest so the scan can stop at the first event
	// older than since (the buffer is kept in publication order). Matches are
	// appended in that reverse order and flipped once at the end, instead of
	// reallocating the whole result to prepend each match.
	var buffered []eventtypes.Message
	for i := len(e.events) - 1; i >= 0; i-- {
		ev := e.events[i]

		if ev.TimeNano < sinceNanoUnix {
			break
		}

		if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
			continue
		}

		if topic == nil || topic(ev) {
			buffered = append(buffered, ev)
		}
	}

	// Restore chronological (oldest first) order.
	for lo, hi := 0, len(buffered)-1; lo < hi; lo, hi = lo+1, hi-1 {
		buffered[lo], buffered[hi] = buffered[hi], buffered[lo]
	}
	return buffered
}