package ioutils // import "github.com/docker/docker/pkg/ioutils"

import (
	"errors"
	"io"
	"sync"
)

// Sizing limits for the byte slices that back a BytesPipe.
const (
	// maxCap is the largest capacity a single buffering byte slice may have.
	maxCap = 1e6

	// minCap is the smallest capacity a single buffering byte slice may have.
	minCap = 64

	// blockThreshold is the number of buffered bytes at or above which a
	// write to BytesPipe blocks instead of allocating another slice.
	blockThreshold = 1e6
)

// ErrClosed is the error Write returns once the BytesPipe has been closed.
var ErrClosed = errors.New("write to closed BytesPipe")

// bufPools holds one sync.Pool per buffer capacity, keyed by that capacity.
// bufPoolsLock must be held while reading or writing the map.
var (
	bufPoolsLock sync.Mutex
	bufPools     = map[int]*sync.Pool{}
)

// BytesPipe is an io.ReadWriteCloser that works like an in-memory pipe (a
// FIFO queue of bytes). Written data can be read at most once. BytesPipe
// allocates and releases byte slices to adjust to current needs, so the
// buffer won't stay overgrown after peak loads.
type BytesPipe struct {
	mu       sync.Mutex     // guards every field below
	wait     *sync.Cond     // built on mu; broadcast by Write, Read and CloseWithError
	buf      []*fixedBuffer // queued buffers: Read drains buf[0], Write fills the last one
	bufLen   int            // number of unread bytes currently held across buf
	closeErr error          // error Read returns once the pipe is empty; nil while the pipe is open
}

// NewBytesPipe creates a new, open BytesPipe. It takes no arguments: the
// pipe starts out holding a single buffer of capacity minCap obtained from
// getBuffer, and its condition variable is bound to the pipe's own mutex.
func NewBytesPipe() *BytesPipe {
	pipe := new(BytesPipe)
	pipe.wait = sync.NewCond(&pipe.mu)
	pipe.buf = append(pipe.buf, getBuffer(minCap))
	return pipe
}

// Write appends p to the BytesPipe, taking additional buffers from the pool
// as needed. Each new buffer has twice the capacity of the one before it,
// capped at maxCap.
//
// When another buffer is needed while blockThreshold or more unread bytes
// are already queued, Write blocks until the pipe drains below that
// threshold or is closed. It returns the number of bytes stored, together
// with ErrClosed if the pipe is (or becomes) closed before all of p has been
// stored. Unless the pipe was closed, waiters are woken before returning.
func (bp *BytesPipe) Write(p []byte) (int, error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()

	total := 0
	for {
		if bp.closeErr != nil {
			return total, ErrClosed
		}

		if len(bp.buf) == 0 {
			// Readers have handed every buffer back to the pool; start over
			// with the smallest size (64 is the value of minCap).
			bp.buf = append(bp.buf, getBuffer(64))
		}
		tail := bp.buf[len(bp.buf)-1]

		n, err := tail.Write(p)
		total += n
		bp.bufLen += n

		// errBufferFull just means tail has no room left; any other error
		// ends the write.
		if err != nil && err != errBufferFull {
			bp.wait.Broadcast()
			return total, err
		}

		p = p[n:]
		if len(p) == 0 {
			// Everything fit: wake up readers and finish.
			bp.wait.Broadcast()
			return total, nil
		}

		// More data is pending. Before growing, make sure the pipe does not
		// become too large because of this write.
		for bp.bufLen >= blockThreshold {
			bp.wait.Wait()
			if bp.closeErr != nil {
				return total, ErrClosed
			}
		}

		// Queue a bigger buffer and keep writing into it on the next pass.
		grown := tail.Cap() * 2
		if grown > maxCap {
			grown = maxCap
		}
		bp.buf = append(bp.buf, getBuffer(grown))
	}
}

// CloseWithError closes the BytesPipe and wakes every goroutine blocked on
// it. Subsequent Writes fail with ErrClosed. Reads keep returning data that
// is still buffered and, once the pipe is empty, return err, or io.EOF when
// err is nil. CloseWithError itself always returns nil.
func (bp *BytesPipe) CloseWithError(err error) error {
	reason := err
	if reason == nil {
		reason = io.EOF
	}

	bp.mu.Lock()
	defer bp.mu.Unlock()

	bp.closeErr = reason
	bp.wait.Broadcast()
	return nil
}

// Close closes the BytesPipe without a specific error. It is equivalent to
// CloseWithError(nil): Reads return io.EOF once the data still buffered has
// been consumed. It always returns nil.
func (bp *BytesPipe) Close() error {
	return bp.CloseWithError(nil)
}

// Read copies up to len(p) bytes from the BytesPipe into p and consumes
// them: every byte written to the pipe is returned by at most one Read.
//
// If the pipe is empty, Read blocks until data is written or the pipe is
// closed. Data buffered before the close is still delivered; once the pipe
// is both closed and empty, Read returns 0 and the close error (io.EOF
// after a plain Close). On a successful read it wakes goroutines waiting on
// the pipe, such as a Write blocked on a full pipe.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()

	// Wait in a loop: the condition is also broadcast by zero-length writes
	// and by other readers, so being woken does not guarantee that data is
	// available. Re-checking avoids returning (0, nil) for a non-empty p.
	for bp.bufLen == 0 {
		if bp.closeErr != nil {
			return 0, bp.closeErr
		}
		bp.wait.Wait()
	}

	for bp.bufLen > 0 {
		b := bp.buf[0]
		read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
		n += read
		bp.bufLen -= read

		if b.Len() == 0 {
			// The front buffer is drained: return it to the pool, drop the
			// reference so it is not retained, and move to the next one.
			returnBuffer(b)
			bp.buf[0] = nil
			bp.buf = bp.buf[1:]
		}

		// p is full (or was empty to begin with); nothing more to copy.
		if len(p) == read {
			break
		}

		p = p[read:]
	}

	bp.wait.Broadcast()
	return n, nil
}

// returnBuffer resets b and puts it back into the pool registered for its
// capacity. If no pool exists for that capacity, b is simply dropped.
func returnBuffer(b *fixedBuffer) {
	b.Reset()

	bufPoolsLock.Lock()
	owner := bufPools[b.Cap()]
	bufPoolsLock.Unlock()

	if owner == nil {
		return
	}
	owner.Put(b)
}

// getBuffer returns a fixedBuffer from the pool registered for size,
// creating and registering that pool on first use. Buffers freshly made by
// the pool have length 0 and capacity size.
func getBuffer(size int) *fixedBuffer {
	bufPoolsLock.Lock()
	pool, found := bufPools[size]
	if !found {
		pool = new(sync.Pool)
		pool.New = func() interface{} {
			return &fixedBuffer{buf: make([]byte, 0, size)}
		}
		bufPools[size] = pool
	}
	bufPoolsLock.Unlock()

	// Get runs outside bufPoolsLock; sync.Pool does its own synchronization.
	return pool.Get().(*fixedBuffer)
}