package logger

import (
	"errors"
	"sync"
	"sync/atomic"

	"github.com/Sirupsen/logrus"
)

const (
	// defaultRingMaxSize is the buffer limit, in bytes, that NewRingLogger
	// falls back to when it is given a negative maxSize.
	defaultRingMaxSize = 1e6 // 1MB
)

// RingLogger is a ring buffer that implements the Logger interface.
// This is used when lossy logging is OK.
//
// Messages passed to Log are queued in memory and forwarded to the wrapped
// logger by the goroutine started in newRingLogger.
type RingLogger struct {
	buffer    *messageRing // in-memory queue of messages waiting to be forwarded
	l         Logger       // wrapped logger that queued messages are written to
	logInfo   Info         // supplies the container ID reported with write errors
	closeFlag int32        // 1 once the logger is closed; read and written atomically
}

// ringWithReader adds ReadLogs on top of an embedded RingLogger.
// NewRingLogger returns it when the wrapped logger implements LogReader, so
// the returned value exposes ReadLogs as well.
type ringWithReader struct {
	*RingLogger
}

// ReadLogs forwards the read request to the wrapped logger and returns its
// LogWatcher. It panics if the wrapped logger does not implement LogReader.
func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
	if src, isReader := r.l.(LogReader); isReader {
		return src.ReadLogs(cfg)
	}
	// NewRingLogger only wraps a logger in ringWithReader after checking that
	// it is a LogReader, so getting here means something is wrong.
	panic("expected log reader")
}

// newRingLogger builds a RingLogger around driver with a buffer limited to
// maxSize bytes and starts the goroutine that forwards queued messages to
// driver.
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
	ring := new(RingLogger)
	ring.buffer = newRing(maxSize)
	ring.l = driver
	ring.logInfo = logInfo

	// The forwarding goroutine exits once the logger or its buffer is closed.
	go ring.run()
	return ring
}

// NewRingLogger wraps driver in a ring-buffered Logger.
//
// maxSize is the buffer limit in bytes; a negative value selects
// defaultRingMaxSize. When driver also implements LogReader, the returned
// Logger implements LogReader too.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
	size := maxSize
	if size < 0 {
		size = defaultRingMaxSize
	}

	ring := newRingLogger(driver, logInfo, size)
	if _, canRead := driver.(LogReader); !canRead {
		return ring
	}
	return &ringWithReader{ring}
}

// Log places msg on the ring buffer for later delivery to the wrapped logger.
// It returns errClosed once the logger has been closed; otherwise it returns
// the result of enqueueing the message.
func (r *RingLogger) Log(msg *Message) error {
	if !r.closed() {
		return r.buffer.Enqueue(msg)
	}
	return errClosed
}

// Name reports the name of the wrapped logger.
func (r *RingLogger) Name() string {
	wrapped := r.l
	return wrapped.Name()
}

// closed reports whether the close flag has been set. The flag is read
// atomically.
func (r *RingLogger) closed() bool {
	flag := atomic.LoadInt32(&r.closeFlag)
	return flag == 1
}

// setClosed atomically sets the close flag so that closed reports true.
func (r *RingLogger) setClosed() {
	const closedValue int32 = 1
	atomic.StoreInt32(&r.closeFlag, closedValue)
}

// Close closes the logger.
//
// It sets the close flag so Log rejects new messages, closes the ring buffer
// (waking anything blocked in Dequeue), and then writes the messages still
// queued to the wrapped logger. After the first write error the remaining
// messages are handed to PutMessage instead of being written. The returned
// error is the result of closing the wrapped logger.
//
// NOTE(review): nothing here waits for the run goroutine, which may still be
// inside r.l.Log when the drain below starts — confirm the wrapped logger
// tolerates that.
func (r *RingLogger) Close() error {
	r.setClosed()
	r.buffer.Close()
	// empty out the queue
	var logErr bool
	for _, msg := range r.buffer.Drain() {
		if logErr {
			// some error logging a previous message, so re-insert to message pool
			// and assume log driver is hosed
			PutMessage(msg)
			continue
		}

		if err := r.l.Log(msg); err != nil {
			// Report the error returned by the driver, not the driver value itself.
			logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", err)
			logErr = true
		}
	}
	return r.l.Close()
}

// run consumes messages from the ring buffer and forwards them to the
// underlying logger.
// This is run in a goroutine when the RingLogger is created.
//
// The loop ends when the close flag is set or when Dequeue reports that the
// buffer is closed. A failed write is logged and the loop moves on to the
// next message.
func (r *RingLogger) run() {
	for {
		if r.closed() {
			return
		}
		msg, err := r.buffer.Dequeue()
		if err != nil {
			// buffer is closed
			return
		}
		if err := r.l.Log(msg); err != nil {
			// Report the error returned by the driver, not the driver value itself.
			logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", err)
		}
	}
}

// messageRing is a size-limited first-in, first-out queue of log messages.
// All fields are accessed with mu held.
type messageRing struct {
	mu sync.Mutex
	// signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
	wait *sync.Cond

	sizeBytes int64 // current buffer size
	maxBytes  int64 // max buffer size
	queue     []*Message
	closed    bool
}

// newRing creates an empty, open messageRing whose queued message data is
// limited to maxBytes bytes.
func newRing(maxBytes int64) *messageRing {
	// With a limit of 0 or 1 byte the queue is expected to hold only a single
	// message at a time, so start with one slot; otherwise reserve room for
	// 1000 messages. append still grows the queue if it ever needs more.
	capacity := 1000
	switch maxBytes {
	case 0, 1:
		capacity = 1
	}

	ring := new(messageRing)
	ring.queue = make([]*Message, 0, capacity)
	ring.maxBytes = maxBytes
	ring.wait = sync.NewCond(&ring.mu)
	return ring
}

// Enqueue adds a message to the buffer queue.
// If adding the message would push the buffer past maxBytes while other
// messages are already queued, the new message is dropped and nil is returned.
// If the queue is empty, the message is added even when it exceeds maxBytes.
// errClosed is returned once the buffer has been closed.
func (r *messageRing) Enqueue(m *Message) error {
	lineLen := int64(len(m.Line))

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.closed {
		return errClosed
	}

	fits := len(r.queue) == 0 || lineLen+r.sizeBytes <= r.maxBytes
	if fits {
		r.queue = append(r.queue, m)
		r.sizeBytes += lineLen
	}
	// A waiting Dequeue caller is signalled whether or not the message was kept.
	r.wait.Signal()
	return nil
}

// Dequeue removes and returns the message at the front of the queue.
// It blocks while the queue is empty and the buffer is open.
// Once the buffer is closed it returns errClosed, even if messages are still
// queued; those remain available through Drain.
func (r *messageRing) Dequeue() (*Message, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for !r.closed && len(r.queue) == 0 {
		r.wait.Wait()
	}
	if r.closed {
		return nil, errClosed
	}

	head, rest := r.queue[0], r.queue[1:]
	r.queue = rest
	r.sizeBytes -= int64(len(head.Line))
	return head, nil
}

// errClosed is returned when a message is logged, enqueued or dequeued after
// the logger or its buffer has been closed.
var errClosed = errors.New("closed")

// Close marks the buffer as closed so that no further messages are accepted
// and wakes every caller blocked in Dequeue.
// Calling Close on an already closed buffer does nothing.
func (r *messageRing) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if !r.closed {
		r.closed = true
		r.wait.Broadcast()
	}
}

// Drain removes every queued message and returns them in queue order,
// leaving the buffer empty with a size of zero.
// It does not close the buffer itself; it can be used after `Close()` to
// collect whatever was still queued.
func (r *messageRing) Drain() []*Message {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Copy the messages out so the result does not share the queue's backing array.
	drained := make([]*Message, len(r.queue))
	copy(drained, r.queue)

	r.queue = r.queue[:0]
	r.sizeBytes = 0
	return drained
}