package ioutils import ( "bytes" "crypto/rand" "io" "math/big" "sync" "time" ) type readCloserWrapper struct { io.Reader closer func() error } func (r *readCloserWrapper) Close() error { return r.closer() } func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { return &readCloserWrapper{ Reader: r, closer: closer, } } type readerErrWrapper struct { reader io.Reader closer func() } func (r *readerErrWrapper) Read(p []byte) (int, error) { n, err := r.reader.Read(p) if err != nil { r.closer() } return n, err } func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { return &readerErrWrapper{ reader: r, closer: closer, } } // bufReader allows the underlying reader to continue to produce // output by pre-emptively reading from the wrapped reader. // This is achieved by buffering this data in bufReader's // expanding buffer. type bufReader struct { sync.Mutex buf *bytes.Buffer reader io.Reader err error wait sync.Cond drainBuf []byte reuseBuf []byte maxReuse int64 resetTimeout time.Duration bufLenResetThreshold int64 maxReadDataReset int64 } func NewBufReader(r io.Reader) *bufReader { var timeout int if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil { timeout = int(randVal.Int64()) + 180 } else { timeout = 300 } reader := &bufReader{ buf: &bytes.Buffer{}, drainBuf: make([]byte, 1024), reuseBuf: make([]byte, 4096), maxReuse: 1000, resetTimeout: time.Second * time.Duration(timeout), bufLenResetThreshold: 100 * 1024, maxReadDataReset: 10 * 1024 * 1024, reader: r, } reader.wait.L = &reader.Mutex go reader.drain() return reader } func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) *bufReader { reader := &bufReader{ buf: buffer, drainBuf: drainBuffer, reader: r, } reader.wait.L = &reader.Mutex go reader.drain() return reader } func (r *bufReader) drain() { var ( duration time.Duration lastReset time.Time now time.Time reset bool bufLen int64 dataSinceReset int64 maxBufLen int64 reuseBufLen int64 reuseCount int64 ) reuseBufLen = int64(len(r.reuseBuf)) lastReset = time.Now() for { n, err := r.reader.Read(r.drainBuf) dataSinceReset += int64(n) r.Lock() bufLen = int64(r.buf.Len()) if bufLen > maxBufLen { maxBufLen = bufLen } // Avoid unbounded growth of the buffer over time. // This has been discovered to be the only non-intrusive // solution to the unbounded growth of the buffer. // Alternative solutions such as compression, multiple // buffers, channels and other similar pieces of code // were reducing throughput, overall Docker performance // or simply crashed Docker. // This solution releases the buffer when specific // conditions are met to avoid the continuous resizing // of the buffer for long lived containers. // // Move data to the front of the buffer if it's // smaller than what reuseBuf can store if bufLen > 0 && reuseBufLen >= bufLen { n, _ := r.buf.Read(r.reuseBuf) r.buf.Write(r.reuseBuf[0:n]) // Take action if the buffer has been reused too many // times and if there's data in the buffer. // The timeout is also used as means to avoid doing // these operations more often or less often than // required. // The various conditions try to detect heavy activity // in the buffer which might be indicators of heavy // growth of the buffer. } else if reuseCount >= r.maxReuse && bufLen > 0 { now = time.Now() duration = now.Sub(lastReset) timeoutReached := duration >= r.resetTimeout // The timeout has been reached and the // buffered data couldn't be moved to the front // of the buffer, so the buffer gets reset. if timeoutReached && bufLen > reuseBufLen { reset = true } // The amount of buffered data is too high now, // reset the buffer. if timeoutReached && maxBufLen >= r.bufLenResetThreshold { reset = true } // Reset the buffer if a certain amount of // data has gone through the buffer since the // last reset. if timeoutReached && dataSinceReset >= r.maxReadDataReset { reset = true } // The buffered data is moved to a fresh buffer, // swap the old buffer with the new one and // reset all counters. if reset { newbuf := &bytes.Buffer{} newbuf.ReadFrom(r.buf) r.buf = newbuf lastReset = now reset = false dataSinceReset = 0 maxBufLen = 0 reuseCount = 0 } } if err != nil { r.err = err } else { r.buf.Write(r.drainBuf[0:n]) } reuseCount++ r.wait.Signal() r.Unlock() if err != nil { break } } } func (r *bufReader) Read(p []byte) (n int, err error) { r.Lock() defer r.Unlock() for { n, err = r.buf.Read(p) if n > 0 { return n, err } if r.err != nil { return 0, r.err } r.wait.Wait() } } func (r *bufReader) Close() error { closer, ok := r.reader.(io.ReadCloser) if !ok { return nil } return closer.Close() }