package queue

import (
	"container/list"
	"fmt"
	"sync"

	"github.com/docker/go-events"
	"github.com/moby/swarmkit/v2/log"
)

// ErrQueueFull is returned by a Write operation when that Write causes the
// queue to reach its size limit.
var ErrQueueFull = fmt.Errorf("queue closed due to size limit")

// LimitQueue accepts all messages into a queue for asynchronous consumption by
// a sink until an upper limit of messages is reached. When that limit is
// reached, the entire Queue is Closed. It is thread safe but the
// sink must be reliable or events will be dropped.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
	dst        events.Sink
	events     *list.List
	limit      uint64
	cond       *sync.Cond
	mu         sync.Mutex
	closed     bool
	full       chan struct{}
	fullClosed bool
}

// NewLimitQueue returns a queue to the provided Sink dst.
func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
	eq := LimitQueue{
		dst:    dst,
		events: list.New(),
		limit:  limit,
		full:   make(chan struct{}),
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// been closed or has reached its size limit.
func (eq *LimitQueue) Write(event events.Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return events.ErrSinkClosed
	}

	if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
		// If the limit has been reached, don't write the event to the queue,
		// and close the Full channel. This notifies listeners that the queue
		// is now full, but the sink is still permitted to consume events. It's
		// the responsibility of the listener to decide whether they want to
		// live with dropped events or whether they want to Close() the
		// LimitQueue
		if !eq.fullClosed {
			eq.fullClosed = true
			close(eq.full)
		}
		return ErrQueueFull
	}

	eq.events.PushBack(event)
	eq.cond.Signal() // signal waiters

	return nil
}

// Full returns a channel that is closed when the queue becomes full for the
// first time.
func (eq *LimitQueue) Full() chan struct{} {
	return eq.full
}

// Close shuts down the event queue, flushing all events
func (eq *LimitQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return nil
	}

	// set the closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush
	return eq.dst.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *LimitQueue) run() {
	for {
		event := eq.next()

		if event == nil {
			return // nil block means event queue is closed.
		}

		if err := eq.dst.Write(event); err != nil {
			// TODO(aaronl): Dropping events could be bad depending
			// on the application. We should have a way of
			// communicating this condition. However, logging
			// at a log level above debug may not be appropriate.
			// Eventually, go-events should not use logrus at all,
			// and should bubble up conditions like this through
			// error values.
			log.L.WithFields(log.Fields{
				"event": event,
				"sink":  eq.dst,
			}).WithError(err).Debug("eventqueue: dropped event")
		}
	}
}

// Len returns the number of items that are currently stored in the queue and
// not consumed by its sink.
func (eq *LimitQueue) Len() int {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return eq.events.Len()
}

func (eq *LimitQueue) String() string {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return fmt.Sprintf("%v", eq.events)
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *LimitQueue) next() events.Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	block := front.Value.(events.Event)
	eq.events.Remove(front)

	return block
}