Browse code

Move container/stream to daemon/internal/stream

Signed-off-by: Derek McGowan <derek@mcg.dev>

Derek McGowan authored on 2025/06/28 06:27:05
Showing 14 changed files
... ...
@@ -21,7 +21,7 @@ import (
21 21
 	"github.com/docker/docker/api/types/events"
22 22
 	mounttypes "github.com/docker/docker/api/types/mount"
23 23
 	swarmtypes "github.com/docker/docker/api/types/swarm"
24
-	"github.com/docker/docker/container/stream"
24
+	"github.com/docker/docker/daemon/internal/stream"
25 25
 	"github.com/docker/docker/daemon/logger"
26 26
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
27 27
 	"github.com/docker/docker/daemon/logger/local"
... ...
@@ -7,7 +7,7 @@ import (
7 7
 
8 8
 	"github.com/containerd/containerd/v2/pkg/cio"
9 9
 	"github.com/containerd/log"
10
-	"github.com/docker/docker/container/stream"
10
+	"github.com/docker/docker/daemon/internal/stream"
11 11
 	"github.com/docker/docker/libcontainerd/types"
12 12
 	"github.com/docker/docker/pkg/stringid"
13 13
 )
14 14
deleted file mode 100644
... ...
@@ -1,183 +0,0 @@
1
-package stream
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-
7
-	"github.com/containerd/log"
8
-	"github.com/docker/docker/pkg/pools"
9
-	"github.com/moby/term"
10
-	"github.com/pkg/errors"
11
-	"golang.org/x/sync/errgroup"
12
-)
13
-
14
-var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
15
-
16
-// AttachConfig is the config struct used to attach a client to a stream's stdio
17
-type AttachConfig struct {
18
-	// Tells the attach copier that the stream's stdin is a TTY and to look for
19
-	// escape sequences in stdin to detach from the stream.
20
-	// When true the escape sequence is not passed to the underlying stream
21
-	TTY bool
22
-	// Specifies the detach keys the client will be using
23
-	// Only useful when `TTY` is true
24
-	DetachKeys []byte
25
-
26
-	// CloseStdin signals that once done, stdin for the attached stream should be closed
27
-	// For example, this would close the attached container's stdin.
28
-	CloseStdin bool
29
-
30
-	// UseStd* indicate whether the client has requested to be connected to the
31
-	// given stream or not.  These flags are used instead of checking Std* != nil
32
-	// at points before the client streams Std* are wired up.
33
-	UseStdin, UseStdout, UseStderr bool
34
-
35
-	// CStd* are the streams directly connected to the container
36
-	CStdin           io.WriteCloser
37
-	CStdout, CStderr io.ReadCloser
38
-
39
-	// Provide client streams to wire up to
40
-	Stdin          io.ReadCloser
41
-	Stdout, Stderr io.Writer
42
-}
43
-
44
-// AttachStreams attaches the container's streams to the AttachConfig
45
-func (c *Config) AttachStreams(cfg *AttachConfig) {
46
-	if cfg.UseStdin {
47
-		cfg.CStdin = c.StdinPipe()
48
-	}
49
-
50
-	if cfg.UseStdout {
51
-		cfg.CStdout = c.StdoutPipe()
52
-	}
53
-
54
-	if cfg.UseStderr {
55
-		cfg.CStderr = c.StderrPipe()
56
-	}
57
-}
58
-
59
-// CopyStreams starts goroutines to copy data in and out to/from the container
60
-func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
61
-	var group errgroup.Group
62
-
63
-	// Connect stdin of container to the attach stdin stream.
64
-	if cfg.Stdin != nil {
65
-		group.Go(func() error {
66
-			log.G(ctx).Debug("attach: stdin: begin")
67
-			defer log.G(ctx).Debug("attach: stdin: end")
68
-
69
-			defer func() {
70
-				if cfg.CloseStdin && !cfg.TTY {
71
-					cfg.CStdin.Close()
72
-				} else {
73
-					// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
74
-					if cfg.CStdout != nil {
75
-						cfg.CStdout.Close()
76
-					}
77
-					if cfg.CStderr != nil {
78
-						cfg.CStderr.Close()
79
-					}
80
-				}
81
-			}()
82
-
83
-			var err error
84
-			if cfg.TTY {
85
-				_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
86
-			} else {
87
-				_, err = pools.Copy(cfg.CStdin, cfg.Stdin)
88
-			}
89
-			if errors.Is(err, io.ErrClosedPipe) {
90
-				err = nil
91
-			}
92
-			if err != nil {
93
-				log.G(ctx).WithError(err).Debug("error on attach stdin")
94
-				return errors.Wrap(err, "error on attach stdin")
95
-			}
96
-			return nil
97
-		})
98
-	}
99
-
100
-	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) error {
101
-		log.G(ctx).Debugf("attach: %s: begin", name)
102
-		defer log.G(ctx).Debugf("attach: %s: end", name)
103
-		defer func() {
104
-			// Make sure stdin gets closed
105
-			if cfg.Stdin != nil {
106
-				cfg.Stdin.Close()
107
-			}
108
-			streamPipe.Close()
109
-		}()
110
-
111
-		_, err := pools.Copy(stream, streamPipe)
112
-		if errors.Is(err, io.ErrClosedPipe) {
113
-			err = nil
114
-		}
115
-		if err != nil {
116
-			log.G(ctx).WithError(err).Debugf("attach: %s", name)
117
-			return errors.Wrapf(err, "error attaching %s stream", name)
118
-		}
119
-		return nil
120
-	}
121
-
122
-	if cfg.Stdout != nil {
123
-		group.Go(func() error {
124
-			return attachStream("stdout", cfg.Stdout, cfg.CStdout)
125
-		})
126
-	}
127
-	if cfg.Stderr != nil {
128
-		group.Go(func() error {
129
-			return attachStream("stderr", cfg.Stderr, cfg.CStderr)
130
-		})
131
-	}
132
-
133
-	errs := make(chan error, 1)
134
-	go func() {
135
-		defer log.G(ctx).Debug("attach done")
136
-		groupErr := make(chan error, 1)
137
-		go func() {
138
-			groupErr <- group.Wait()
139
-		}()
140
-		select {
141
-		case <-ctx.Done():
142
-			// close all pipes
143
-			if cfg.CStdin != nil {
144
-				cfg.CStdin.Close()
145
-			}
146
-			if cfg.CStdout != nil {
147
-				cfg.CStdout.Close()
148
-			}
149
-			if cfg.CStderr != nil {
150
-				cfg.CStderr.Close()
151
-			}
152
-
153
-			if cfg.Stdin != nil {
154
-				// In this case, `cfg.Stdin` is a stream from the client.
155
-				// The way `io.Copy` works we may get stuck waiting to read from `cfg.Stdin` even if the container has exited.
156
-				// This will cause the `io.Copy` to never return and the `group.Wait()` to never return.
157
-				// By closing cfg.Stdin we will cause the `io.Copy` to return and the `group.Wait()` to return.
158
-				cfg.Stdin.Close()
159
-			}
160
-
161
-			// Now with these closed, wait should return.
162
-			if err := group.Wait(); err != nil {
163
-				errs <- err
164
-				return
165
-			}
166
-			errs <- ctx.Err()
167
-		case err := <-groupErr:
168
-			errs <- err
169
-		}
170
-	}()
171
-
172
-	return errs
173
-}
174
-
175
-func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, _ error) {
176
-	if len(keys) == 0 {
177
-		keys = defaultEscapeSequence
178
-	}
179
-	pr := term.NewEscapeProxy(src, keys)
180
-	defer src.Close()
181
-
182
-	return pools.Copy(dst, pr)
183
-}
184 1
deleted file mode 100644
... ...
@@ -1,102 +0,0 @@
1
-package stream
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"testing"
7
-	"time"
8
-
9
-	"gotest.tools/v3/assert"
10
-)
11
-
12
-// Make sure when there is no I/O on a stream that the goroutines do not get blocked after the container exits.
13
-func TestAttachNoIO(t *testing.T) {
14
-	t.Run("stdin only", func(t *testing.T) {
15
-		stdinR, _ := io.Pipe()
16
-		defer stdinR.Close()
17
-		testStreamCopy(t, stdinR, nil, nil)
18
-	})
19
-
20
-	t.Run("stdout only", func(t *testing.T) {
21
-		_, w := io.Pipe()
22
-		defer w.Close()
23
-		testStreamCopy(t, nil, w, nil)
24
-	})
25
-
26
-	t.Run("stderr only", func(t *testing.T) {
27
-		_, w := io.Pipe()
28
-		defer w.Close()
29
-		testStreamCopy(t, nil, nil, w)
30
-	})
31
-
32
-	t.Run("stdout+stderr", func(t *testing.T) {
33
-		_, stdoutW := io.Pipe()
34
-		defer stdoutW.Close()
35
-		_, stderrW := io.Pipe()
36
-		defer stderrW.Close()
37
-
38
-		testStreamCopy(t, nil, stdoutW, stderrW)
39
-	})
40
-
41
-	t.Run("stdin+stdout", func(t *testing.T) {
42
-		stdin, _ := io.Pipe()
43
-		defer stdin.Close()
44
-		_, stdout := io.Pipe()
45
-		defer stdout.Close()
46
-
47
-		testStreamCopy(t, stdin, stdout, nil)
48
-	})
49
-
50
-	t.Run("stdin+stderr", func(t *testing.T) {
51
-		stdin, _ := io.Pipe()
52
-		defer stdin.Close()
53
-		_, stderr := io.Pipe()
54
-		defer stderr.Close()
55
-
56
-		testStreamCopy(t, stdin, nil, stderr)
57
-	})
58
-
59
-	t.Run("stdin+stdout+stderr", func(t *testing.T) {
60
-		stdinR, _ := io.Pipe()
61
-		defer stdinR.Close()
62
-		stdoutR, stdoutW := io.Pipe()
63
-		defer stdoutR.Close()
64
-		stderrR, stderrW := io.Pipe()
65
-		defer stderrR.Close()
66
-		testStreamCopy(t, stdinR, stdoutW, stderrW)
67
-	})
68
-}
69
-
70
-func testStreamCopy(t *testing.T, stdin io.ReadCloser, stdout, stderr io.WriteCloser) {
71
-	cfg := AttachConfig{
72
-		UseStdin:  stdin != nil,
73
-		UseStdout: stdout != nil,
74
-		UseStderr: stderr != nil,
75
-		Stdin:     stdin,
76
-		Stdout:    stdout,
77
-		Stderr:    stderr,
78
-	}
79
-
80
-	sc := NewConfig()
81
-	sc.AttachStreams(&cfg)
82
-	defer sc.CloseStreams()
83
-
84
-	ctx, cancel := context.WithCancel(context.Background())
85
-	defer cancel()
86
-	chErr := sc.CopyStreams(ctx, &cfg)
87
-
88
-	select {
89
-	case err := <-chErr:
90
-		assert.NilError(t, err)
91
-	default:
92
-	}
93
-
94
-	cancel()
95
-
96
-	select {
97
-	case err := <-chErr:
98
-		assert.ErrorIs(t, err, context.Canceled)
99
-	case <-time.After(10 * time.Second):
100
-		t.Fatal("timeout waiting for CopyStreams to exit")
101
-	}
102
-}
103 1
deleted file mode 100644
... ...
@@ -1,187 +0,0 @@
1
-package stream
2
-
3
-import (
4
-	"context"
5
-	"errors"
6
-	"fmt"
7
-	"io"
8
-	"sync"
9
-	"sync/atomic"
10
-
11
-	"github.com/containerd/containerd/v2/pkg/cio"
12
-	"github.com/containerd/log"
13
-	"github.com/docker/docker/daemon/internal/stream/bytespipe"
14
-	"github.com/docker/docker/pkg/pools"
15
-)
16
-
17
-// Config holds information about I/O streams managed together.
18
-//
19
-// config.StdinPipe returns a WriteCloser which can be used to feed data
20
-// to the standard input of the streamConfig's active process.
21
-// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
22
-// which can be used to retrieve the standard output (and error) generated
23
-// by the container's active process. The output (and error) are actually
24
-// copied and delivered to all StdoutPipe and StderrPipe consumers, using
25
-// a kind of "broadcaster".
26
-type Config struct {
27
-	wg        sync.WaitGroup
28
-	stdout    *unbuffered
29
-	stderr    *unbuffered
30
-	stdin     io.ReadCloser
31
-	stdinPipe io.WriteCloser
32
-	dio       *cio.DirectIO
33
-	// closed is set to true when CloseStreams is called
34
-	closed atomic.Bool
35
-}
36
-
37
-// NewConfig creates a stream config and initializes
38
-// the standard err and standard out to new unbuffered broadcasters.
39
-func NewConfig() *Config {
40
-	return &Config{
41
-		stderr: new(unbuffered),
42
-		stdout: new(unbuffered),
43
-	}
44
-}
45
-
46
-// Stdout returns the standard output in the configuration.
47
-func (c *Config) Stdout() io.Writer {
48
-	return c.stdout
49
-}
50
-
51
-// Stderr returns the standard error in the configuration.
52
-func (c *Config) Stderr() io.Writer {
53
-	return c.stderr
54
-}
55
-
56
-// Stdin returns the standard input in the configuration.
57
-func (c *Config) Stdin() io.ReadCloser {
58
-	return c.stdin
59
-}
60
-
61
-// StdinPipe returns an input writer pipe as an io.WriteCloser.
62
-func (c *Config) StdinPipe() io.WriteCloser {
63
-	return c.stdinPipe
64
-}
65
-
66
-// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
67
-// It adds this new out pipe to the Stdout broadcaster.
68
-// This will block stdout if unconsumed.
69
-func (c *Config) StdoutPipe() io.ReadCloser {
70
-	bytesPipe := bytespipe.New()
71
-	c.stdout.Add(bytesPipe)
72
-	return bytesPipe
73
-}
74
-
75
-// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
76
-// It adds this new err pipe to the Stderr broadcaster.
77
-// This will block stderr if unconsumed.
78
-func (c *Config) StderrPipe() io.ReadCloser {
79
-	bytesPipe := bytespipe.New()
80
-	c.stderr.Add(bytesPipe)
81
-	return bytesPipe
82
-}
83
-
84
-// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
85
-func (c *Config) NewInputPipes() {
86
-	c.stdin, c.stdinPipe = io.Pipe()
87
-}
88
-
89
-// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
90
-func (c *Config) NewNopInputPipe() {
91
-	c.stdinPipe = &nopWriteCloser{io.Discard}
92
-}
93
-
94
-type nopWriteCloser struct {
95
-	io.Writer
96
-}
97
-
98
-func (w *nopWriteCloser) Close() error { return nil }
99
-
100
-// CloseStreams ensures that the configured streams are properly closed.
101
-func (c *Config) CloseStreams() error {
102
-	var errs error
103
-
104
-	c.closed.Store(true)
105
-
106
-	if c.stdin != nil {
107
-		if err := c.stdin.Close(); err != nil {
108
-			errs = errors.Join(errs, fmt.Errorf("error close stdin: %w", err))
109
-		}
110
-	}
111
-
112
-	if err := c.stdout.Clean(); err != nil {
113
-		errs = errors.Join(errs, fmt.Errorf("error close stdout: %w", err))
114
-	}
115
-
116
-	if err := c.stderr.Clean(); err != nil {
117
-		errs = errors.Join(errs, fmt.Errorf("error close stderr: %w", err))
118
-	}
119
-
120
-	return errs
121
-}
122
-
123
-// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
124
-func (c *Config) CopyToPipe(iop *cio.DirectIO) {
125
-	ctx := context.TODO()
126
-
127
-	c.dio = iop
128
-	copyFunc := func(name string, w io.Writer, r io.ReadCloser) {
129
-		c.wg.Add(1)
130
-		go func() {
131
-			defer c.wg.Done()
132
-			if _, err := pools.Copy(w, r); err != nil {
133
-				if c.closed.Load() {
134
-					return
135
-				}
136
-				log.G(ctx).WithFields(log.Fields{"stream": name, "error": err}).Error("copy stream failed")
137
-			}
138
-			if err := r.Close(); err != nil && !c.closed.Load() {
139
-				log.G(ctx).WithFields(log.Fields{"stream": name, "error": err}).Warn("close stream failed")
140
-			}
141
-		}()
142
-	}
143
-
144
-	if iop.Stdout != nil {
145
-		copyFunc("stdout", c.Stdout(), iop.Stdout)
146
-	}
147
-	if iop.Stderr != nil {
148
-		copyFunc("stderr", c.Stderr(), iop.Stderr)
149
-	}
150
-
151
-	if stdin := c.Stdin(); stdin != nil {
152
-		if iop.Stdin != nil {
153
-			go func() {
154
-				_, err := pools.Copy(iop.Stdin, stdin)
155
-				if err != nil {
156
-					if c.closed.Load() {
157
-						return
158
-					}
159
-					log.G(ctx).WithFields(log.Fields{"stream": "stdin", "error": err}).Error("copy stream failed")
160
-				}
161
-				if err := iop.Stdin.Close(); err != nil && !c.closed.Load() {
162
-					log.G(ctx).WithFields(log.Fields{"stream": "stdin", "error": err}).Warn("close stream failed")
163
-				}
164
-			}()
165
-		}
166
-	}
167
-}
168
-
169
-// Wait for the stream to close
170
-// Wait supports timeouts via the context to unblock and forcefully
171
-// close the io streams
172
-func (c *Config) Wait(ctx context.Context) {
173
-	done := make(chan struct{}, 1)
174
-	go func() {
175
-		c.wg.Wait()
176
-		close(done)
177
-	}()
178
-	select {
179
-	case <-done:
180
-	case <-ctx.Done():
181
-		if c.dio != nil {
182
-			c.dio.Cancel()
183
-			c.dio.Wait()
184
-			c.dio.Close()
185
-		}
186
-	}
187
-}
188 1
deleted file mode 100644
... ...
@@ -1,49 +0,0 @@
1
-package stream
2
-
3
-import (
4
-	"io"
5
-	"sync"
6
-)
7
-
8
-// unbuffered accumulates multiple io.WriteCloser by stream.
9
-type unbuffered struct {
10
-	mu      sync.Mutex
11
-	writers []io.WriteCloser
12
-}
13
-
14
-// Add adds new io.WriteCloser.
15
-func (w *unbuffered) Add(writer io.WriteCloser) {
16
-	w.mu.Lock()
17
-	w.writers = append(w.writers, writer)
18
-	w.mu.Unlock()
19
-}
20
-
21
-// Write writes bytes to all writers. Failed writers will be evicted during
22
-// this call.
23
-func (w *unbuffered) Write(p []byte) (int, error) {
24
-	w.mu.Lock()
25
-	var evict []int
26
-	for i, sw := range w.writers {
27
-		if n, err := sw.Write(p); err != nil || n != len(p) {
28
-			// On error, evict the writer
29
-			evict = append(evict, i)
30
-		}
31
-	}
32
-	for n, i := range evict {
33
-		w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
34
-	}
35
-	w.mu.Unlock()
36
-	return len(p), nil
37
-}
38
-
39
-// Clean closes and removes all writers. Last non-eol-terminated part of data
40
-// will be saved.
41
-func (w *unbuffered) Clean() error {
42
-	w.mu.Lock()
43
-	for _, sw := range w.writers {
44
-		sw.Close()
45
-	}
46
-	w.writers = nil
47
-	w.mu.Unlock()
48
-	return nil
49
-}
50 1
deleted file mode 100644
... ...
@@ -1,164 +0,0 @@
1
-package stream
2
-
3
-import (
4
-	"bytes"
5
-	"errors"
6
-	"strings"
7
-	"testing"
8
-)
9
-
10
-type dummyWriter struct {
11
-	buffer      bytes.Buffer
12
-	failOnWrite bool
13
-}
14
-
15
-func (dw *dummyWriter) Write(p []byte) (int, error) {
16
-	if dw.failOnWrite {
17
-		return 0, errors.New("Fake fail")
18
-	}
19
-	return dw.buffer.Write(p)
20
-}
21
-
22
-func (dw *dummyWriter) String() string {
23
-	return dw.buffer.String()
24
-}
25
-
26
-func (dw *dummyWriter) Close() error {
27
-	return nil
28
-}
29
-
30
-func TestUnbuffered(t *testing.T) {
31
-	writer := new(unbuffered)
32
-
33
-	// Test 1: Both bufferA and bufferB should contain "foo"
34
-	bufferA := &dummyWriter{}
35
-	writer.Add(bufferA)
36
-	bufferB := &dummyWriter{}
37
-	writer.Add(bufferB)
38
-	writer.Write([]byte("foo"))
39
-
40
-	if bufferA.String() != "foo" {
41
-		t.Errorf("Buffer contains %v", bufferA.String())
42
-	}
43
-
44
-	if bufferB.String() != "foo" {
45
-		t.Errorf("Buffer contains %v", bufferB.String())
46
-	}
47
-
48
-	// Test2: bufferA and bufferB should contain "foobar",
49
-	// while bufferC should only contain "bar"
50
-	bufferC := &dummyWriter{}
51
-	writer.Add(bufferC)
52
-	writer.Write([]byte("bar"))
53
-
54
-	if bufferA.String() != "foobar" {
55
-		t.Errorf("Buffer contains %v", bufferA.String())
56
-	}
57
-
58
-	if bufferB.String() != "foobar" {
59
-		t.Errorf("Buffer contains %v", bufferB.String())
60
-	}
61
-
62
-	if bufferC.String() != "bar" {
63
-		t.Errorf("Buffer contains %v", bufferC.String())
64
-	}
65
-
66
-	// Test3: Test eviction on failure
67
-	bufferA.failOnWrite = true
68
-	writer.Write([]byte("fail"))
69
-	if bufferA.String() != "foobar" {
70
-		t.Errorf("Buffer contains %v", bufferA.String())
71
-	}
72
-	if bufferC.String() != "barfail" {
73
-		t.Errorf("Buffer contains %v", bufferC.String())
74
-	}
75
-	// Even though we reset the flag, no more writes should go in there
76
-	bufferA.failOnWrite = false
77
-	writer.Write([]byte("test"))
78
-	if bufferA.String() != "foobar" {
79
-		t.Errorf("Buffer contains %v", bufferA.String())
80
-	}
81
-	if bufferC.String() != "barfailtest" {
82
-		t.Errorf("Buffer contains %v", bufferC.String())
83
-	}
84
-
85
-	// Test4: Test eviction on multiple simultaneous failures
86
-	bufferB.failOnWrite = true
87
-	bufferC.failOnWrite = true
88
-	bufferD := &dummyWriter{}
89
-	writer.Add(bufferD)
90
-	writer.Write([]byte("yo"))
91
-	writer.Write([]byte("ink"))
92
-	if strings.Contains(bufferB.String(), "yoink") {
93
-		t.Errorf("bufferB received write. contents: %q", bufferB)
94
-	}
95
-	if strings.Contains(bufferC.String(), "yoink") {
96
-		t.Errorf("bufferC received write. contents: %q", bufferC)
97
-	}
98
-	if g, w := bufferD.String(), "yoink"; g != w {
99
-		t.Errorf("bufferD = %q, want %q", g, w)
100
-	}
101
-
102
-	writer.Clean()
103
-}
104
-
105
-type devNullCloser int
106
-
107
-func (d devNullCloser) Close() error {
108
-	return nil
109
-}
110
-
111
-func (d devNullCloser) Write(buf []byte) (int, error) {
112
-	return len(buf), nil
113
-}
114
-
115
-// This test checks for races. It is only useful when run with the race detector.
116
-func TestRaceUnbuffered(t *testing.T) {
117
-	writer := new(unbuffered)
118
-	c := make(chan bool)
119
-	go func() {
120
-		writer.Add(devNullCloser(0))
121
-		c <- true
122
-	}()
123
-	_, err := writer.Write([]byte("hello"))
124
-	if err != nil {
125
-		t.Error(err)
126
-	}
127
-	<-c
128
-}
129
-
130
-func BenchmarkUnbuffered(b *testing.B) {
131
-	writer := new(unbuffered)
132
-	setUpWriter := func() {
133
-		for i := 0; i < 100; i++ {
134
-			writer.Add(devNullCloser(0))
135
-			writer.Add(devNullCloser(0))
136
-			writer.Add(devNullCloser(0))
137
-		}
138
-	}
139
-	testLine := "Line that thinks that it is log line from docker"
140
-	var buf bytes.Buffer
141
-	for i := 0; i < 100; i++ {
142
-		buf.WriteString(testLine + "\n")
143
-	}
144
-	// line without eol
145
-	buf.WriteString(testLine)
146
-	testText := buf.Bytes()
147
-	b.SetBytes(int64(5 * len(testText)))
148
-	b.ResetTimer()
149
-	for i := 0; i < b.N; i++ {
150
-		b.StopTimer()
151
-		setUpWriter()
152
-		b.StartTimer()
153
-
154
-		for j := 0; j < 5; j++ {
155
-			if _, err := writer.Write(testText); err != nil {
156
-				b.Fatal(err)
157
-			}
158
-		}
159
-
160
-		b.StopTimer()
161
-		writer.Clean()
162
-		b.StartTimer()
163
-	}
164
-}
... ...
@@ -10,7 +10,7 @@ import (
10 10
 	containertypes "github.com/docker/docker/api/types/container"
11 11
 	"github.com/docker/docker/api/types/events"
12 12
 	"github.com/docker/docker/container"
13
-	"github.com/docker/docker/container/stream"
13
+	"github.com/docker/docker/daemon/internal/stream"
14 14
 	"github.com/docker/docker/daemon/logger"
15 15
 	"github.com/docker/docker/errdefs"
16 16
 	"github.com/docker/docker/pkg/stdcopy"
... ...
@@ -15,7 +15,7 @@ import (
15 15
 	containertypes "github.com/docker/docker/api/types/container"
16 16
 	"github.com/docker/docker/api/types/events"
17 17
 	"github.com/docker/docker/container"
18
-	"github.com/docker/docker/container/stream"
18
+	"github.com/docker/docker/daemon/internal/stream"
19 19
 	"github.com/docker/docker/errdefs"
20 20
 	"github.com/docker/docker/pkg/pools"
21 21
 	"github.com/moby/sys/signal"
22 22
new file mode 100644
... ...
@@ -0,0 +1,183 @@
0
+package stream
1
+
2
+import (
3
+	"context"
4
+	"io"
5
+
6
+	"github.com/containerd/log"
7
+	"github.com/docker/docker/pkg/pools"
8
+	"github.com/moby/term"
9
+	"github.com/pkg/errors"
10
+	"golang.org/x/sync/errgroup"
11
+)
12
+
13
+var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
14
+
15
+// AttachConfig is the config struct used to attach a client to a stream's stdio
16
+type AttachConfig struct {
17
+	// Tells the attach copier that the stream's stdin is a TTY and to look for
18
+	// escape sequences in stdin to detach from the stream.
19
+	// When true the escape sequence is not passed to the underlying stream
20
+	TTY bool
21
+	// Specifies the detach keys the client will be using
22
+	// Only useful when `TTY` is true
23
+	DetachKeys []byte
24
+
25
+	// CloseStdin signals that once done, stdin for the attached stream should be closed
26
+	// For example, this would close the attached container's stdin.
27
+	CloseStdin bool
28
+
29
+	// UseStd* indicate whether the client has requested to be connected to the
30
+	// given stream or not.  These flags are used instead of checking Std* != nil
31
+	// at points before the client streams Std* are wired up.
32
+	UseStdin, UseStdout, UseStderr bool
33
+
34
+	// CStd* are the streams directly connected to the container
35
+	CStdin           io.WriteCloser
36
+	CStdout, CStderr io.ReadCloser
37
+
38
+	// Provide client streams to wire up to
39
+	Stdin          io.ReadCloser
40
+	Stdout, Stderr io.Writer
41
+}
42
+
43
+// AttachStreams attaches the container's streams to the AttachConfig
44
+func (c *Config) AttachStreams(cfg *AttachConfig) {
45
+	if cfg.UseStdin {
46
+		cfg.CStdin = c.StdinPipe()
47
+	}
48
+
49
+	if cfg.UseStdout {
50
+		cfg.CStdout = c.StdoutPipe()
51
+	}
52
+
53
+	if cfg.UseStderr {
54
+		cfg.CStderr = c.StderrPipe()
55
+	}
56
+}
57
+
58
+// CopyStreams starts goroutines to copy data in and out to/from the container
59
+func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
60
+	var group errgroup.Group
61
+
62
+	// Connect stdin of container to the attach stdin stream.
63
+	if cfg.Stdin != nil {
64
+		group.Go(func() error {
65
+			log.G(ctx).Debug("attach: stdin: begin")
66
+			defer log.G(ctx).Debug("attach: stdin: end")
67
+
68
+			defer func() {
69
+				if cfg.CloseStdin && !cfg.TTY {
70
+					cfg.CStdin.Close()
71
+				} else {
72
+					// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
73
+					if cfg.CStdout != nil {
74
+						cfg.CStdout.Close()
75
+					}
76
+					if cfg.CStderr != nil {
77
+						cfg.CStderr.Close()
78
+					}
79
+				}
80
+			}()
81
+
82
+			var err error
83
+			if cfg.TTY {
84
+				_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
85
+			} else {
86
+				_, err = pools.Copy(cfg.CStdin, cfg.Stdin)
87
+			}
88
+			if errors.Is(err, io.ErrClosedPipe) {
89
+				err = nil
90
+			}
91
+			if err != nil {
92
+				log.G(ctx).WithError(err).Debug("error on attach stdin")
93
+				return errors.Wrap(err, "error on attach stdin")
94
+			}
95
+			return nil
96
+		})
97
+	}
98
+
99
+	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) error {
100
+		log.G(ctx).Debugf("attach: %s: begin", name)
101
+		defer log.G(ctx).Debugf("attach: %s: end", name)
102
+		defer func() {
103
+			// Make sure stdin gets closed
104
+			if cfg.Stdin != nil {
105
+				cfg.Stdin.Close()
106
+			}
107
+			streamPipe.Close()
108
+		}()
109
+
110
+		_, err := pools.Copy(stream, streamPipe)
111
+		if errors.Is(err, io.ErrClosedPipe) {
112
+			err = nil
113
+		}
114
+		if err != nil {
115
+			log.G(ctx).WithError(err).Debugf("attach: %s", name)
116
+			return errors.Wrapf(err, "error attaching %s stream", name)
117
+		}
118
+		return nil
119
+	}
120
+
121
+	if cfg.Stdout != nil {
122
+		group.Go(func() error {
123
+			return attachStream("stdout", cfg.Stdout, cfg.CStdout)
124
+		})
125
+	}
126
+	if cfg.Stderr != nil {
127
+		group.Go(func() error {
128
+			return attachStream("stderr", cfg.Stderr, cfg.CStderr)
129
+		})
130
+	}
131
+
132
+	errs := make(chan error, 1)
133
+	go func() {
134
+		defer log.G(ctx).Debug("attach done")
135
+		groupErr := make(chan error, 1)
136
+		go func() {
137
+			groupErr <- group.Wait()
138
+		}()
139
+		select {
140
+		case <-ctx.Done():
141
+			// close all pipes
142
+			if cfg.CStdin != nil {
143
+				cfg.CStdin.Close()
144
+			}
145
+			if cfg.CStdout != nil {
146
+				cfg.CStdout.Close()
147
+			}
148
+			if cfg.CStderr != nil {
149
+				cfg.CStderr.Close()
150
+			}
151
+
152
+			if cfg.Stdin != nil {
153
+				// In this case, `cfg.Stdin` is a stream from the client.
154
+				// The way `io.Copy` works we may get stuck waiting to read from `cfg.Stdin` even if the container has exited.
155
+				// This will cause the `io.Copy` to never return and the `group.Wait()` to never return.
156
+				// By closing cfg.Stdin we will cause the `io.Copy` to return and the `group.Wait()` to return.
157
+				cfg.Stdin.Close()
158
+			}
159
+
160
+			// Now with these closed, wait should return.
161
+			if err := group.Wait(); err != nil {
162
+				errs <- err
163
+				return
164
+			}
165
+			errs <- ctx.Err()
166
+		case err := <-groupErr:
167
+			errs <- err
168
+		}
169
+	}()
170
+
171
+	return errs
172
+}
173
+
174
+func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, _ error) {
175
+	if len(keys) == 0 {
176
+		keys = defaultEscapeSequence
177
+	}
178
+	pr := term.NewEscapeProxy(src, keys)
179
+	defer src.Close()
180
+
181
+	return pools.Copy(dst, pr)
182
+}
0 183
new file mode 100644
... ...
@@ -0,0 +1,102 @@
0
+package stream
1
+
2
+import (
3
+	"context"
4
+	"io"
5
+	"testing"
6
+	"time"
7
+
8
+	"gotest.tools/v3/assert"
9
+)
10
+
11
+// Make sure when there is no I/O on a stream that the goroutines do not get blocked after the container exits.
12
+func TestAttachNoIO(t *testing.T) {
13
+	t.Run("stdin only", func(t *testing.T) {
14
+		stdinR, _ := io.Pipe()
15
+		defer stdinR.Close()
16
+		testStreamCopy(t, stdinR, nil, nil)
17
+	})
18
+
19
+	t.Run("stdout only", func(t *testing.T) {
20
+		_, w := io.Pipe()
21
+		defer w.Close()
22
+		testStreamCopy(t, nil, w, nil)
23
+	})
24
+
25
+	t.Run("stderr only", func(t *testing.T) {
26
+		_, w := io.Pipe()
27
+		defer w.Close()
28
+		testStreamCopy(t, nil, nil, w)
29
+	})
30
+
31
+	t.Run("stdout+stderr", func(t *testing.T) {
32
+		_, stdoutW := io.Pipe()
33
+		defer stdoutW.Close()
34
+		_, stderrW := io.Pipe()
35
+		defer stderrW.Close()
36
+
37
+		testStreamCopy(t, nil, stdoutW, stderrW)
38
+	})
39
+
40
+	t.Run("stdin+stdout", func(t *testing.T) {
41
+		stdin, _ := io.Pipe()
42
+		defer stdin.Close()
43
+		_, stdout := io.Pipe()
44
+		defer stdout.Close()
45
+
46
+		testStreamCopy(t, stdin, stdout, nil)
47
+	})
48
+
49
+	t.Run("stdin+stderr", func(t *testing.T) {
50
+		stdin, _ := io.Pipe()
51
+		defer stdin.Close()
52
+		_, stderr := io.Pipe()
53
+		defer stderr.Close()
54
+
55
+		testStreamCopy(t, stdin, nil, stderr)
56
+	})
57
+
58
+	t.Run("stdin+stdout+stderr", func(t *testing.T) {
59
+		stdinR, _ := io.Pipe()
60
+		defer stdinR.Close()
61
+		stdoutR, stdoutW := io.Pipe()
62
+		defer stdoutR.Close()
63
+		stderrR, stderrW := io.Pipe()
64
+		defer stderrR.Close()
65
+		testStreamCopy(t, stdinR, stdoutW, stderrW)
66
+	})
67
+}
68
+
69
+func testStreamCopy(t *testing.T, stdin io.ReadCloser, stdout, stderr io.WriteCloser) {
70
+	cfg := AttachConfig{
71
+		UseStdin:  stdin != nil,
72
+		UseStdout: stdout != nil,
73
+		UseStderr: stderr != nil,
74
+		Stdin:     stdin,
75
+		Stdout:    stdout,
76
+		Stderr:    stderr,
77
+	}
78
+
79
+	sc := NewConfig()
80
+	sc.AttachStreams(&cfg)
81
+	defer sc.CloseStreams()
82
+
83
+	ctx, cancel := context.WithCancel(context.Background())
84
+	defer cancel()
85
+	chErr := sc.CopyStreams(ctx, &cfg)
86
+
87
+	select {
88
+	case err := <-chErr:
89
+		assert.NilError(t, err)
90
+	default:
91
+	}
92
+
93
+	cancel()
94
+
95
+	select {
96
+	case err := <-chErr:
97
+		assert.ErrorIs(t, err, context.Canceled)
98
+	case <-time.After(10 * time.Second):
99
+		t.Fatal("timeout waiting for CopyStreams to exit")
100
+	}
101
+}
0 102
new file mode 100644
... ...
@@ -0,0 +1,187 @@
0
+package stream
1
+
2
+import (
3
+	"context"
4
+	"errors"
5
+	"fmt"
6
+	"io"
7
+	"sync"
8
+	"sync/atomic"
9
+
10
+	"github.com/containerd/containerd/v2/pkg/cio"
11
+	"github.com/containerd/log"
12
+	"github.com/docker/docker/daemon/internal/stream/bytespipe"
13
+	"github.com/docker/docker/pkg/pools"
14
+)
15
+
16
+// Config holds information about I/O streams managed together.
17
+//
18
+// config.StdinPipe returns a WriteCloser which can be used to feed data
19
+// to the standard input of the streamConfig's active process.
20
+// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
21
+// which can be used to retrieve the standard output (and error) generated
22
+// by the container's active process. The output (and error) are actually
23
+// copied and delivered to all StdoutPipe and StderrPipe consumers, using
24
+// a kind of "broadcaster".
25
+type Config struct {
26
+	wg        sync.WaitGroup
27
+	stdout    *unbuffered
28
+	stderr    *unbuffered
29
+	stdin     io.ReadCloser
30
+	stdinPipe io.WriteCloser
31
+	dio       *cio.DirectIO
32
+	// closed is set to true when CloseStreams is called
33
+	closed atomic.Bool
34
+}
35
+
36
+// NewConfig creates a stream config and initializes
37
+// the standard err and standard out to new unbuffered broadcasters.
38
+func NewConfig() *Config {
39
+	return &Config{
40
+		stderr: new(unbuffered),
41
+		stdout: new(unbuffered),
42
+	}
43
+}
44
+
45
+// Stdout returns the standard output in the configuration.
46
+func (c *Config) Stdout() io.Writer {
47
+	return c.stdout
48
+}
49
+
50
+// Stderr returns the standard error in the configuration.
51
+func (c *Config) Stderr() io.Writer {
52
+	return c.stderr
53
+}
54
+
55
+// Stdin returns the standard input in the configuration.
56
+func (c *Config) Stdin() io.ReadCloser {
57
+	return c.stdin
58
+}
59
+
60
+// StdinPipe returns an input writer pipe as an io.WriteCloser.
61
+func (c *Config) StdinPipe() io.WriteCloser {
62
+	return c.stdinPipe
63
+}
64
+
65
+// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
66
+// It adds this new out pipe to the Stdout broadcaster.
67
+// This will block stdout if unconsumed.
68
+func (c *Config) StdoutPipe() io.ReadCloser {
69
+	bytesPipe := bytespipe.New()
70
+	c.stdout.Add(bytesPipe)
71
+	return bytesPipe
72
+}
73
+
74
+// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
75
+// It adds this new err pipe to the Stderr broadcaster.
76
+// This will block stderr if unconsumed.
77
+func (c *Config) StderrPipe() io.ReadCloser {
78
+	bytesPipe := bytespipe.New()
79
+	c.stderr.Add(bytesPipe)
80
+	return bytesPipe
81
+}
82
+
83
+// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
84
+func (c *Config) NewInputPipes() {
85
+	c.stdin, c.stdinPipe = io.Pipe()
86
+}
87
+
88
+// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
89
+func (c *Config) NewNopInputPipe() {
90
+	c.stdinPipe = &nopWriteCloser{io.Discard}
91
+}
92
+
93
+type nopWriteCloser struct {
94
+	io.Writer
95
+}
96
+
97
+func (w *nopWriteCloser) Close() error { return nil }
98
+
99
+// CloseStreams ensures that the configured streams are properly closed.
100
+func (c *Config) CloseStreams() error {
101
+	var errs error
102
+
103
+	c.closed.Store(true)
104
+
105
+	if c.stdin != nil {
106
+		if err := c.stdin.Close(); err != nil {
107
+			errs = errors.Join(errs, fmt.Errorf("error close stdin: %w", err))
108
+		}
109
+	}
110
+
111
+	if err := c.stdout.Clean(); err != nil {
112
+		errs = errors.Join(errs, fmt.Errorf("error close stdout: %w", err))
113
+	}
114
+
115
+	if err := c.stderr.Clean(); err != nil {
116
+		errs = errors.Join(errs, fmt.Errorf("error close stderr: %w", err))
117
+	}
118
+
119
+	return errs
120
+}
121
+
122
+// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
123
+func (c *Config) CopyToPipe(iop *cio.DirectIO) {
124
+	ctx := context.TODO()
125
+
126
+	c.dio = iop
127
+	copyFunc := func(name string, w io.Writer, r io.ReadCloser) {
128
+		c.wg.Add(1)
129
+		go func() {
130
+			defer c.wg.Done()
131
+			if _, err := pools.Copy(w, r); err != nil {
132
+				if c.closed.Load() {
133
+					return
134
+				}
135
+				log.G(ctx).WithFields(log.Fields{"stream": name, "error": err}).Error("copy stream failed")
136
+			}
137
+			if err := r.Close(); err != nil && !c.closed.Load() {
138
+				log.G(ctx).WithFields(log.Fields{"stream": name, "error": err}).Warn("close stream failed")
139
+			}
140
+		}()
141
+	}
142
+
143
+	if iop.Stdout != nil {
144
+		copyFunc("stdout", c.Stdout(), iop.Stdout)
145
+	}
146
+	if iop.Stderr != nil {
147
+		copyFunc("stderr", c.Stderr(), iop.Stderr)
148
+	}
149
+
150
+	if stdin := c.Stdin(); stdin != nil {
151
+		if iop.Stdin != nil {
152
+			go func() {
153
+				_, err := pools.Copy(iop.Stdin, stdin)
154
+				if err != nil {
155
+					if c.closed.Load() {
156
+						return
157
+					}
158
+					log.G(ctx).WithFields(log.Fields{"stream": "stdin", "error": err}).Error("copy stream failed")
159
+				}
160
+				if err := iop.Stdin.Close(); err != nil && !c.closed.Load() {
161
+					log.G(ctx).WithFields(log.Fields{"stream": "stdin", "error": err}).Warn("close stream failed")
162
+				}
163
+			}()
164
+		}
165
+	}
166
+}
167
+
168
+// Wait for the stream to close
169
+// Wait supports timeouts via the context to unblock and forcefully
170
+// close the io streams
171
+func (c *Config) Wait(ctx context.Context) {
172
+	done := make(chan struct{}, 1)
173
+	go func() {
174
+		c.wg.Wait()
175
+		close(done)
176
+	}()
177
+	select {
178
+	case <-done:
179
+	case <-ctx.Done():
180
+		if c.dio != nil {
181
+			c.dio.Cancel()
182
+			c.dio.Wait()
183
+			c.dio.Close()
184
+		}
185
+	}
186
+}
0 187
new file mode 100644
... ...
@@ -0,0 +1,49 @@
0
+package stream
1
+
2
+import (
3
+	"io"
4
+	"sync"
5
+)
6
+
7
+// unbuffered accumulates multiple io.WriteCloser by stream.
8
+type unbuffered struct {
9
+	mu      sync.Mutex
10
+	writers []io.WriteCloser
11
+}
12
+
13
+// Add adds new io.WriteCloser.
14
+func (w *unbuffered) Add(writer io.WriteCloser) {
15
+	w.mu.Lock()
16
+	w.writers = append(w.writers, writer)
17
+	w.mu.Unlock()
18
+}
19
+
20
+// Write writes bytes to all writers. Failed writers will be evicted during
21
+// this call.
22
+func (w *unbuffered) Write(p []byte) (int, error) {
23
+	w.mu.Lock()
24
+	var evict []int
25
+	for i, sw := range w.writers {
26
+		if n, err := sw.Write(p); err != nil || n != len(p) {
27
+			// On error, evict the writer
28
+			evict = append(evict, i)
29
+		}
30
+	}
31
+	for n, i := range evict {
32
+		w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
33
+	}
34
+	w.mu.Unlock()
35
+	return len(p), nil
36
+}
37
+
38
+// Clean closes and removes all writers. Last non-eol-terminated part of data
39
+// will be saved.
40
+func (w *unbuffered) Clean() error {
41
+	w.mu.Lock()
42
+	for _, sw := range w.writers {
43
+		sw.Close()
44
+	}
45
+	w.writers = nil
46
+	w.mu.Unlock()
47
+	return nil
48
+}
0 49
new file mode 100644
... ...
@@ -0,0 +1,164 @@
0
+package stream
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+	"strings"
6
+	"testing"
7
+)
8
+
9
+type dummyWriter struct {
10
+	buffer      bytes.Buffer
11
+	failOnWrite bool
12
+}
13
+
14
+func (dw *dummyWriter) Write(p []byte) (int, error) {
15
+	if dw.failOnWrite {
16
+		return 0, errors.New("Fake fail")
17
+	}
18
+	return dw.buffer.Write(p)
19
+}
20
+
21
+func (dw *dummyWriter) String() string {
22
+	return dw.buffer.String()
23
+}
24
+
25
+func (dw *dummyWriter) Close() error {
26
+	return nil
27
+}
28
+
29
+func TestUnbuffered(t *testing.T) {
30
+	writer := new(unbuffered)
31
+
32
+	// Test 1: Both bufferA and bufferB should contain "foo"
33
+	bufferA := &dummyWriter{}
34
+	writer.Add(bufferA)
35
+	bufferB := &dummyWriter{}
36
+	writer.Add(bufferB)
37
+	writer.Write([]byte("foo"))
38
+
39
+	if bufferA.String() != "foo" {
40
+		t.Errorf("Buffer contains %v", bufferA.String())
41
+	}
42
+
43
+	if bufferB.String() != "foo" {
44
+		t.Errorf("Buffer contains %v", bufferB.String())
45
+	}
46
+
47
+	// Test2: bufferA and bufferB should contain "foobar",
48
+	// while bufferC should only contain "bar"
49
+	bufferC := &dummyWriter{}
50
+	writer.Add(bufferC)
51
+	writer.Write([]byte("bar"))
52
+
53
+	if bufferA.String() != "foobar" {
54
+		t.Errorf("Buffer contains %v", bufferA.String())
55
+	}
56
+
57
+	if bufferB.String() != "foobar" {
58
+		t.Errorf("Buffer contains %v", bufferB.String())
59
+	}
60
+
61
+	if bufferC.String() != "bar" {
62
+		t.Errorf("Buffer contains %v", bufferC.String())
63
+	}
64
+
65
+	// Test3: Test eviction on failure
66
+	bufferA.failOnWrite = true
67
+	writer.Write([]byte("fail"))
68
+	if bufferA.String() != "foobar" {
69
+		t.Errorf("Buffer contains %v", bufferA.String())
70
+	}
71
+	if bufferC.String() != "barfail" {
72
+		t.Errorf("Buffer contains %v", bufferC.String())
73
+	}
74
+	// Even though we reset the flag, no more writes should go in there
75
+	bufferA.failOnWrite = false
76
+	writer.Write([]byte("test"))
77
+	if bufferA.String() != "foobar" {
78
+		t.Errorf("Buffer contains %v", bufferA.String())
79
+	}
80
+	if bufferC.String() != "barfailtest" {
81
+		t.Errorf("Buffer contains %v", bufferC.String())
82
+	}
83
+
84
+	// Test4: Test eviction on multiple simultaneous failures
85
+	bufferB.failOnWrite = true
86
+	bufferC.failOnWrite = true
87
+	bufferD := &dummyWriter{}
88
+	writer.Add(bufferD)
89
+	writer.Write([]byte("yo"))
90
+	writer.Write([]byte("ink"))
91
+	if strings.Contains(bufferB.String(), "yoink") {
92
+		t.Errorf("bufferB received write. contents: %q", bufferB)
93
+	}
94
+	if strings.Contains(bufferC.String(), "yoink") {
95
+		t.Errorf("bufferC received write. contents: %q", bufferC)
96
+	}
97
+	if g, w := bufferD.String(), "yoink"; g != w {
98
+		t.Errorf("bufferD = %q, want %q", g, w)
99
+	}
100
+
101
+	writer.Clean()
102
+}
103
+
104
+type devNullCloser int
105
+
106
+func (d devNullCloser) Close() error {
107
+	return nil
108
+}
109
+
110
+func (d devNullCloser) Write(buf []byte) (int, error) {
111
+	return len(buf), nil
112
+}
113
+
114
+// This test checks for races. It is only useful when run with the race detector.
115
+func TestRaceUnbuffered(t *testing.T) {
116
+	writer := new(unbuffered)
117
+	c := make(chan bool)
118
+	go func() {
119
+		writer.Add(devNullCloser(0))
120
+		c <- true
121
+	}()
122
+	_, err := writer.Write([]byte("hello"))
123
+	if err != nil {
124
+		t.Error(err)
125
+	}
126
+	<-c
127
+}
128
+
129
+func BenchmarkUnbuffered(b *testing.B) {
130
+	writer := new(unbuffered)
131
+	setUpWriter := func() {
132
+		for i := 0; i < 100; i++ {
133
+			writer.Add(devNullCloser(0))
134
+			writer.Add(devNullCloser(0))
135
+			writer.Add(devNullCloser(0))
136
+		}
137
+	}
138
+	testLine := "Line that thinks that it is log line from docker"
139
+	var buf bytes.Buffer
140
+	for i := 0; i < 100; i++ {
141
+		buf.WriteString(testLine + "\n")
142
+	}
143
+	// line without eol
144
+	buf.WriteString(testLine)
145
+	testText := buf.Bytes()
146
+	b.SetBytes(int64(5 * len(testText)))
147
+	b.ResetTimer()
148
+	for i := 0; i < b.N; i++ {
149
+		b.StopTimer()
150
+		setUpWriter()
151
+		b.StartTimer()
152
+
153
+		for j := 0; j < 5; j++ {
154
+			if _, err := writer.Write(testText); err != nil {
155
+				b.Fatal(err)
156
+			}
157
+		}
158
+
159
+		b.StopTimer()
160
+		writer.Clean()
161
+		b.StartTimer()
162
+	}
163
+}