package ioutils

import (
	"bytes"
	"crypto/rand"
	"io"
	"math/big"
	"sync"
	"time"
)

type readCloserWrapper struct {
	io.Reader
	closer func() error
}

func (r *readCloserWrapper) Close() error {
	return r.closer()
}

func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
	return &readCloserWrapper{
		Reader: r,
		closer: closer,
	}
}

type readerErrWrapper struct {
	reader io.Reader
	closer func()
}

func (r *readerErrWrapper) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	if err != nil {
		r.closer()
	}
	return n, err
}

func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
	return &readerErrWrapper{
		reader: r,
		closer: closer,
	}
}

// bufReader allows the underlying reader to continue to produce
// output by pre-emptively reading from the wrapped reader.
// This is achieved by buffering this data in bufReader's
// expanding buffer.
type bufReader struct {
	sync.Mutex
	buf                  *bytes.Buffer
	reader               io.Reader
	err                  error
	wait                 sync.Cond
	drainBuf             []byte
	reuseBuf             []byte
	maxReuse             int64
	resetTimeout         time.Duration
	bufLenResetThreshold int64
	maxReadDataReset     int64
}

func NewBufReader(r io.Reader) *bufReader {
	var timeout int
	if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil {
		timeout = int(randVal.Int64()) + 180
	} else {
		timeout = 300
	}
	reader := &bufReader{
		buf:                  &bytes.Buffer{},
		drainBuf:             make([]byte, 1024),
		reuseBuf:             make([]byte, 4096),
		maxReuse:             1000,
		resetTimeout:         time.Second * time.Duration(timeout),
		bufLenResetThreshold: 100 * 1024,
		maxReadDataReset:     10 * 1024 * 1024,
		reader:               r,
	}
	reader.wait.L = &reader.Mutex
	go reader.drain()
	return reader
}

func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) *bufReader {
	reader := &bufReader{
		buf:      buffer,
		drainBuf: drainBuffer,
		reader:   r,
	}
	reader.wait.L = &reader.Mutex
	go reader.drain()
	return reader
}

func (r *bufReader) drain() {
	var (
		duration       time.Duration
		lastReset      time.Time
		now            time.Time
		reset          bool
		bufLen         int64
		dataSinceReset int64
		maxBufLen      int64
		reuseBufLen    int64
		reuseCount     int64
	)
	reuseBufLen = int64(len(r.reuseBuf))
	lastReset = time.Now()
	for {
		n, err := r.reader.Read(r.drainBuf)
		dataSinceReset += int64(n)
		r.Lock()
		bufLen = int64(r.buf.Len())
		if bufLen > maxBufLen {
			maxBufLen = bufLen
		}

		// Avoid unbounded growth of the buffer over time.
		// This has been discovered to be the only non-intrusive
		// solution to the unbounded growth of the buffer.
		// Alternative solutions such as compression, multiple
		// buffers, channels and other similar pieces of code
		// were reducing throughput, overall Docker performance
		// or simply crashed Docker.
		// This solution releases the buffer when specific
		// conditions are met to avoid the continuous resizing
		// of the buffer for long lived containers.
		//
		// Move data to the front of the buffer if it's
		// smaller than what reuseBuf can store
		if bufLen > 0 && reuseBufLen >= bufLen {
			n, _ := r.buf.Read(r.reuseBuf)
			r.buf.Write(r.reuseBuf[0:n])
			// Take action if the buffer has been reused too many
			// times and if there's data in the buffer.
			// The timeout is also used as means to avoid doing
			// these operations more often or less often than
			// required.
			// The various conditions try to detect heavy activity
			// in the buffer which might be indicators of heavy
			// growth of the buffer.
		} else if reuseCount >= r.maxReuse && bufLen > 0 {
			now = time.Now()
			duration = now.Sub(lastReset)
			timeoutReached := duration >= r.resetTimeout

			// The timeout has been reached and the
			// buffered data couldn't be moved to the front
			// of the buffer, so the buffer gets reset.
			if timeoutReached && bufLen > reuseBufLen {
				reset = true
			}
			// The amount of buffered data is too high now,
			// reset the buffer.
			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
				reset = true
			}
			// Reset the buffer if a certain amount of
			// data has gone through the buffer since the
			// last reset.
			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
				reset = true
			}
			// The buffered data is moved to a fresh buffer,
			// swap the old buffer with the new one and
			// reset all counters.
			if reset {
				newbuf := &bytes.Buffer{}
				newbuf.ReadFrom(r.buf)
				r.buf = newbuf
				lastReset = now
				reset = false
				dataSinceReset = 0
				maxBufLen = 0
				reuseCount = 0
			}
		}
		if err != nil {
			r.err = err
		} else {
			r.buf.Write(r.drainBuf[0:n])
		}
		reuseCount++
		r.wait.Signal()
		r.Unlock()
		if err != nil {
			break
		}
	}
}

func (r *bufReader) Read(p []byte) (n int, err error) {
	r.Lock()
	defer r.Unlock()
	for {
		n, err = r.buf.Read(p)
		if n > 0 {
			return n, err
		}
		if r.err != nil {
			return 0, r.err
		}
		r.wait.Wait()
	}
}

func (r *bufReader) Close() error {
	closer, ok := r.reader.(io.ReadCloser)
	if !ok {
		return nil
	}
	return closer.Close()
}