Browse code

Move api/stdcopy to api/pkg/stdcopy

Signed-off-by: Derek McGowan <derek@mcg.dev>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Derek McGowan authored on 2025/07/30 05:28:00
Showing 25 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,201 @@
0
+package stdcopy
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/binary"
5
+	"errors"
6
+	"fmt"
7
+	"io"
8
+	"sync"
9
+)
10
+
11
+// StdType is the type of standard stream
12
+// a writer can multiplex to.
13
+type StdType byte
14
+
15
+const (
16
+	// Stdin represents standard input stream type.
17
+	Stdin StdType = iota
18
+	// Stdout represents standard output stream type.
19
+	Stdout
20
+	// Stderr represents standard error steam type.
21
+	Stderr
22
+	// Systemerr represents errors originating from the system that make it
23
+	// into the multiplexed stream.
24
+	Systemerr
25
+
26
+	stdWriterPrefixLen = 8
27
+	stdWriterFdIndex   = 0
28
+	stdWriterSizeIndex = 4
29
+
30
+	startingBufLen = 32*1024 + stdWriterPrefixLen + 1
31
+)
32
+
33
+var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
34
+
35
+// stdWriter is wrapper of io.Writer with extra customized info.
36
+type stdWriter struct {
37
+	io.Writer
38
+	prefix byte
39
+}
40
+
41
+// Write sends the buffer to the underlying writer.
42
+// It inserts the prefix header before the buffer,
43
+// so [StdCopy] knows where to multiplex the output.
44
+//
45
+// It implements [io.Writer].
46
+func (w *stdWriter) Write(p []byte) (int, error) {
47
+	if w == nil || w.Writer == nil {
48
+		return 0, errors.New("writer not instantiated")
49
+	}
50
+	if p == nil {
51
+		return 0, nil
52
+	}
53
+
54
+	header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
55
+	binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
56
+	buf := bufPool.Get().(*bytes.Buffer)
57
+	buf.Write(header[:])
58
+	buf.Write(p)
59
+
60
+	n, err := w.Writer.Write(buf.Bytes())
61
+	n -= stdWriterPrefixLen
62
+	if n < 0 {
63
+		n = 0
64
+	}
65
+
66
+	buf.Reset()
67
+	bufPool.Put(buf)
68
+	return n, err
69
+}
70
+
71
+// NewStdWriter instantiates a new writer using a custom format to multiplex
72
+// multiple streams to a single writer. All messages written using this writer
73
+// are encapsulated using a custom format, and written to the underlying
74
+// stream "w".
75
+//
76
+// Writers created through NewStdWriter allow for multiple write streams
77
+// (e.g. stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a
78
+// single connection. "streamType" indicates the type of stream to encapsulate,
79
+// and can be [Stdin], [Stdout], pr [Stderr].
80
+func NewStdWriter(w io.Writer, streamType StdType) io.Writer {
81
+	return &stdWriter{
82
+		Writer: w,
83
+		prefix: byte(streamType),
84
+	}
85
+}
86
+
87
+// StdCopy is a modified version of [io.Copy] to de-multiplex messages
88
+// from "multiplexedSource" and copy them to destination streams
89
+// "destOut" and "destErr".
90
+//
91
+// StdCopy demultiplexes "multiplexedSource", assuming that it contains
92
+// two streams, previously multiplexed using a writer created with
93
+// [NewStdWriter].
94
+//
95
+// As it reads from "multiplexedSource", StdCopy writes [Stdout] messages
96
+// to "destOut", and [Stderr] message to "destErr].
97
+//
98
+// StdCopy it reads until it hits [io.EOF] on "multiplexedSource", after
99
+// which it returns a nil error. In other words: any error returned indicates
100
+// a real underlying error.
101
+//
102
+// The "written" return holds the total number of bytes written to "destOut"
103
+// and "destErr" combined.
104
+func StdCopy(destOut, destErr io.Writer, multiplexedSource io.Reader) (written int64, _ error) {
105
+	var (
106
+		buf       = make([]byte, startingBufLen)
107
+		bufLen    = len(buf)
108
+		nr, nw    int
109
+		err       error
110
+		out       io.Writer
111
+		frameSize int
112
+	)
113
+
114
+	for {
115
+		// Make sure we have at least a full header
116
+		for nr < stdWriterPrefixLen {
117
+			var nr2 int
118
+			nr2, err = multiplexedSource.Read(buf[nr:])
119
+			nr += nr2
120
+			if errors.Is(err, io.EOF) {
121
+				if nr < stdWriterPrefixLen {
122
+					return written, nil
123
+				}
124
+				break
125
+			}
126
+			if err != nil {
127
+				return 0, err
128
+			}
129
+		}
130
+
131
+		stream := StdType(buf[stdWriterFdIndex])
132
+		// Check the first byte to know where to write
133
+		switch stream {
134
+		case Stdin:
135
+			fallthrough
136
+		case Stdout:
137
+			// Write on stdout
138
+			out = destOut
139
+		case Stderr:
140
+			// Write on stderr
141
+			out = destErr
142
+		case Systemerr:
143
+			// If we're on Systemerr, we won't write anywhere.
144
+			// NB: if this code changes later, make sure you don't try to write
145
+			// to outstream if Systemerr is the stream
146
+			out = nil
147
+		default:
148
+			return 0, fmt.Errorf("unrecognized input header: %d", buf[stdWriterFdIndex])
149
+		}
150
+
151
+		// Retrieve the size of the frame
152
+		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
153
+
154
+		// Check if the buffer is big enough to read the frame.
155
+		// Extend it if necessary.
156
+		if frameSize+stdWriterPrefixLen > bufLen {
157
+			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
158
+			bufLen = len(buf)
159
+		}
160
+
161
+		// While the amount of bytes read is less than the size of the frame + header, we keep reading
162
+		for nr < frameSize+stdWriterPrefixLen {
163
+			var nr2 int
164
+			nr2, err = multiplexedSource.Read(buf[nr:])
165
+			nr += nr2
166
+			if errors.Is(err, io.EOF) {
167
+				if nr < frameSize+stdWriterPrefixLen {
168
+					return written, nil
169
+				}
170
+				break
171
+			}
172
+			if err != nil {
173
+				return 0, err
174
+			}
175
+		}
176
+
177
+		// we might have an error from the source mixed up in our multiplexed
178
+		// stream. if we do, return it.
179
+		if stream == Systemerr {
180
+			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
181
+		}
182
+
183
+		// Write the retrieved frame (without header)
184
+		nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
185
+		if err != nil {
186
+			return 0, err
187
+		}
188
+
189
+		// If the frame has not been fully written: error
190
+		if nw != frameSize {
191
+			return 0, io.ErrShortWrite
192
+		}
193
+		written += int64(nw)
194
+
195
+		// Move the rest of the buffer to the beginning
196
+		copy(buf, buf[frameSize+stdWriterPrefixLen:])
197
+		// Move the index
198
+		nr -= frameSize + stdWriterPrefixLen
199
+	}
200
+}
0 201
new file mode 100644
... ...
@@ -0,0 +1,292 @@
0
+package stdcopy
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+	"io"
6
+	"strings"
7
+	"testing"
8
+)
9
+
10
+func TestNewStdWriter(t *testing.T) {
11
+	writer := NewStdWriter(io.Discard, Stdout)
12
+	if writer == nil {
13
+		t.Fatalf("NewStdWriter with an invalid StdType should not return nil.")
14
+	}
15
+}
16
+
17
+func TestWriteWithUninitializedStdWriter(t *testing.T) {
18
+	writer := stdWriter{
19
+		Writer: nil,
20
+		prefix: byte(Stdout),
21
+	}
22
+	n, err := writer.Write([]byte("Something here"))
23
+	if n != 0 || err == nil {
24
+		t.Fatalf("Should fail when given an incomplete or uninitialized StdWriter")
25
+	}
26
+}
27
+
28
+func TestWriteWithNilBytes(t *testing.T) {
29
+	writer := NewStdWriter(io.Discard, Stdout)
30
+	n, err := writer.Write(nil)
31
+	if err != nil {
32
+		t.Fatalf("Shouldn't have fail when given no data")
33
+	}
34
+	if n > 0 {
35
+		t.Fatalf("Write should have written 0 byte, but has written %d", n)
36
+	}
37
+}
38
+
39
+func TestWrite(t *testing.T) {
40
+	writer := NewStdWriter(io.Discard, Stdout)
41
+	data := []byte("Test StdWrite.Write")
42
+	n, err := writer.Write(data)
43
+	if err != nil {
44
+		t.Fatalf("Error while writing with StdWrite")
45
+	}
46
+	if n != len(data) {
47
+		t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n)
48
+	}
49
+}
50
+
51
+type errWriter struct {
52
+	n   int
53
+	err error
54
+}
55
+
56
+func (f *errWriter) Write(buf []byte) (int, error) {
57
+	return f.n, f.err
58
+}
59
+
60
+func TestWriteWithWriterError(t *testing.T) {
61
+	expectedError := errors.New("expected")
62
+	expectedReturnedBytes := 10
63
+	writer := NewStdWriter(&errWriter{
64
+		n:   stdWriterPrefixLen + expectedReturnedBytes,
65
+		err: expectedError,
66
+	}, Stdout)
67
+	data := []byte("This won't get written, sigh")
68
+	n, err := writer.Write(data)
69
+	if !errors.Is(err, expectedError) {
70
+		t.Fatalf("Didn't get expected error.")
71
+	}
72
+	if n != expectedReturnedBytes {
73
+		t.Fatalf("Didn't get expected written bytes %d, got %d.",
74
+			expectedReturnedBytes, n)
75
+	}
76
+}
77
+
78
+func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) {
79
+	writer := NewStdWriter(&errWriter{n: -1}, Stdout)
80
+	data := []byte("This won't get written, sigh")
81
+	actual, _ := writer.Write(data)
82
+	if actual != 0 {
83
+		t.Fatalf("Expected returned written bytes equal to 0, got %d", actual)
84
+	}
85
+}
86
+
87
+func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (*bytes.Buffer, error) {
88
+	buffer := new(bytes.Buffer)
89
+	dstOut := NewStdWriter(buffer, Stdout)
90
+	_, err := dstOut.Write(stdOutBytes)
91
+	if err != nil {
92
+		return buffer, err
93
+	}
94
+	dstErr := NewStdWriter(buffer, Stderr)
95
+	_, err = dstErr.Write(stdErrBytes)
96
+	return buffer, err
97
+}
98
+
99
+func TestStdCopyWriteAndRead(t *testing.T) {
100
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
101
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
102
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
103
+	if err != nil {
104
+		t.Fatal(err)
105
+	}
106
+	written, err := StdCopy(io.Discard, io.Discard, buffer)
107
+	if err != nil {
108
+		t.Fatal(err)
109
+	}
110
+	expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes)
111
+	if written != int64(expectedTotalWritten) {
112
+		t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written)
113
+	}
114
+}
115
+
116
+type customReader struct {
117
+	n            int
118
+	err          error
119
+	totalCalls   int
120
+	correctCalls int
121
+	src          *bytes.Buffer
122
+}
123
+
124
+func (f *customReader) Read(buf []byte) (int, error) {
125
+	f.totalCalls++
126
+	if f.totalCalls <= f.correctCalls {
127
+		return f.src.Read(buf)
128
+	}
129
+	return f.n, f.err
130
+}
131
+
132
+func TestStdCopyReturnsErrorReadingHeader(t *testing.T) {
133
+	expectedError := errors.New("error")
134
+	reader := &customReader{
135
+		err: expectedError,
136
+	}
137
+	written, err := StdCopy(io.Discard, io.Discard, reader)
138
+	if written != 0 {
139
+		t.Fatalf("Expected 0 bytes read, got %d", written)
140
+	}
141
+	if !errors.Is(err, expectedError) {
142
+		t.Fatalf("Didn't get expected error")
143
+	}
144
+}
145
+
146
+func TestStdCopyReturnsErrorReadingFrame(t *testing.T) {
147
+	expectedError := errors.New("error")
148
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
149
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
150
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
151
+	if err != nil {
152
+		t.Fatal(err)
153
+	}
154
+	reader := &customReader{
155
+		correctCalls: 1,
156
+		n:            stdWriterPrefixLen + 1,
157
+		err:          expectedError,
158
+		src:          buffer,
159
+	}
160
+	written, err := StdCopy(io.Discard, io.Discard, reader)
161
+	if written != 0 {
162
+		t.Fatalf("Expected 0 bytes read, got %d", written)
163
+	}
164
+	if !errors.Is(err, expectedError) {
165
+		t.Fatalf("Didn't get expected error")
166
+	}
167
+}
168
+
169
+func TestStdCopyDetectsCorruptedFrame(t *testing.T) {
170
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
171
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
172
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
173
+	if err != nil {
174
+		t.Fatal(err)
175
+	}
176
+	reader := &customReader{
177
+		correctCalls: 1,
178
+		n:            stdWriterPrefixLen + 1,
179
+		err:          io.EOF,
180
+		src:          buffer,
181
+	}
182
+	written, err := StdCopy(io.Discard, io.Discard, reader)
183
+	if written != startingBufLen {
184
+		t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written)
185
+	}
186
+	if err != nil {
187
+		t.Fatal("Didn't get nil error")
188
+	}
189
+}
190
+
191
+func TestStdCopyWithInvalidInputHeader(t *testing.T) {
192
+	dstOut := NewStdWriter(io.Discard, Stdout)
193
+	dstErr := NewStdWriter(io.Discard, Stderr)
194
+	src := strings.NewReader("Invalid input")
195
+	_, err := StdCopy(dstOut, dstErr, src)
196
+	if err == nil {
197
+		t.Fatal("StdCopy with invalid input header should fail.")
198
+	}
199
+}
200
+
201
+func TestStdCopyWithCorruptedPrefix(t *testing.T) {
202
+	data := []byte{0x01, 0x02, 0x03}
203
+	src := bytes.NewReader(data)
204
+	written, err := StdCopy(nil, nil, src)
205
+	if err != nil {
206
+		t.Fatalf("StdCopy should not return an error with corrupted prefix.")
207
+	}
208
+	if written != 0 {
209
+		t.Fatalf("StdCopy should have written 0, but has written %d", written)
210
+	}
211
+}
212
+
213
+func TestStdCopyReturnsWriteErrors(t *testing.T) {
214
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
215
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
216
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
217
+	if err != nil {
218
+		t.Fatal(err)
219
+	}
220
+	expectedError := errors.New("expected")
221
+
222
+	dstOut := &errWriter{err: expectedError}
223
+
224
+	written, err := StdCopy(dstOut, io.Discard, buffer)
225
+	if written != 0 {
226
+		t.Fatalf("StdCopy should have written 0, but has written %d", written)
227
+	}
228
+	if !errors.Is(err, expectedError) {
229
+		t.Fatalf("Didn't get expected error, got %v", err)
230
+	}
231
+}
232
+
233
+func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) {
234
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
235
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
236
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
237
+	if err != nil {
238
+		t.Fatal(err)
239
+	}
240
+	dstOut := &errWriter{n: startingBufLen - 10}
241
+
242
+	written, err := StdCopy(dstOut, io.Discard, buffer)
243
+	if written != 0 {
244
+		t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written)
245
+	}
246
+	if !errors.Is(err, io.ErrShortWrite) {
247
+		t.Fatalf("Didn't get expected io.ErrShortWrite error")
248
+	}
249
+}
250
+
251
+// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an
252
+// error, when that error is muxed into the Systemerr stream.
253
+func TestStdCopyReturnsErrorFromSystem(t *testing.T) {
254
+	// write in the basic messages, just so there's some fluff in there
255
+	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
256
+	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
257
+	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
258
+	if err != nil {
259
+		t.Fatal(err)
260
+	}
261
+	// add in an error message on the Systemerr stream
262
+	systemErrBytes := []byte(strings.Repeat("S", startingBufLen))
263
+	systemWriter := NewStdWriter(buffer, Systemerr)
264
+	_, err = systemWriter.Write(systemErrBytes)
265
+	if err != nil {
266
+		t.Fatal(err)
267
+	}
268
+
269
+	// now copy and demux. we should expect an error containing the string we
270
+	// wrote out
271
+	_, err = StdCopy(io.Discard, io.Discard, buffer)
272
+	if err == nil {
273
+		t.Fatal("expected error, got none")
274
+	}
275
+	if !strings.Contains(err.Error(), string(systemErrBytes)) {
276
+		t.Fatal("expected error to contain message")
277
+	}
278
+}
279
+
280
+func BenchmarkWrite(b *testing.B) {
281
+	w := NewStdWriter(io.Discard, Stdout)
282
+	data := []byte("Test line for testing stdwriter performance\n")
283
+	data = bytes.Repeat(data, 100)
284
+	b.SetBytes(int64(len(data)))
285
+	b.ResetTimer()
286
+	for i := 0; i < b.N; i++ {
287
+		if _, err := w.Write(data); err != nil {
288
+			b.Fatal(err)
289
+		}
290
+	}
291
+}
0 292
deleted file mode 100644
... ...
@@ -1,201 +0,0 @@
1
-package stdcopy
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/binary"
6
-	"errors"
7
-	"fmt"
8
-	"io"
9
-	"sync"
10
-)
11
-
12
-// StdType is the type of standard stream
13
-// a writer can multiplex to.
14
-type StdType byte
15
-
16
-const (
17
-	// Stdin represents standard input stream type.
18
-	Stdin StdType = iota
19
-	// Stdout represents standard output stream type.
20
-	Stdout
21
-	// Stderr represents standard error steam type.
22
-	Stderr
23
-	// Systemerr represents errors originating from the system that make it
24
-	// into the multiplexed stream.
25
-	Systemerr
26
-
27
-	stdWriterPrefixLen = 8
28
-	stdWriterFdIndex   = 0
29
-	stdWriterSizeIndex = 4
30
-
31
-	startingBufLen = 32*1024 + stdWriterPrefixLen + 1
32
-)
33
-
34
-var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
35
-
36
-// stdWriter is wrapper of io.Writer with extra customized info.
37
-type stdWriter struct {
38
-	io.Writer
39
-	prefix byte
40
-}
41
-
42
-// Write sends the buffer to the underlying writer.
43
-// It inserts the prefix header before the buffer,
44
-// so [StdCopy] knows where to multiplex the output.
45
-//
46
-// It implements [io.Writer].
47
-func (w *stdWriter) Write(p []byte) (int, error) {
48
-	if w == nil || w.Writer == nil {
49
-		return 0, errors.New("writer not instantiated")
50
-	}
51
-	if p == nil {
52
-		return 0, nil
53
-	}
54
-
55
-	header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
56
-	binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
57
-	buf := bufPool.Get().(*bytes.Buffer)
58
-	buf.Write(header[:])
59
-	buf.Write(p)
60
-
61
-	n, err := w.Writer.Write(buf.Bytes())
62
-	n -= stdWriterPrefixLen
63
-	if n < 0 {
64
-		n = 0
65
-	}
66
-
67
-	buf.Reset()
68
-	bufPool.Put(buf)
69
-	return n, err
70
-}
71
-
72
-// NewStdWriter instantiates a new writer using a custom format to multiplex
73
-// multiple streams to a single writer. All messages written using this writer
74
-// are encapsulated using a custom format, and written to the underlying
75
-// stream "w".
76
-//
77
-// Writers created through NewStdWriter allow for multiple write streams
78
-// (e.g. stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a
79
-// single connection. "streamType" indicates the type of stream to encapsulate,
80
-// and can be [Stdin], [Stdout], pr [Stderr].
81
-func NewStdWriter(w io.Writer, streamType StdType) io.Writer {
82
-	return &stdWriter{
83
-		Writer: w,
84
-		prefix: byte(streamType),
85
-	}
86
-}
87
-
88
-// StdCopy is a modified version of [io.Copy] to de-multiplex messages
89
-// from "multiplexedSource" and copy them to destination streams
90
-// "destOut" and "destErr".
91
-//
92
-// StdCopy demultiplexes "multiplexedSource", assuming that it contains
93
-// two streams, previously multiplexed using a writer created with
94
-// [NewStdWriter].
95
-//
96
-// As it reads from "multiplexedSource", StdCopy writes [Stdout] messages
97
-// to "destOut", and [Stderr] message to "destErr].
98
-//
99
-// StdCopy it reads until it hits [io.EOF] on "multiplexedSource", after
100
-// which it returns a nil error. In other words: any error returned indicates
101
-// a real underlying error.
102
-//
103
-// The "written" return holds the total number of bytes written to "destOut"
104
-// and "destErr" combined.
105
-func StdCopy(destOut, destErr io.Writer, multiplexedSource io.Reader) (written int64, _ error) {
106
-	var (
107
-		buf       = make([]byte, startingBufLen)
108
-		bufLen    = len(buf)
109
-		nr, nw    int
110
-		err       error
111
-		out       io.Writer
112
-		frameSize int
113
-	)
114
-
115
-	for {
116
-		// Make sure we have at least a full header
117
-		for nr < stdWriterPrefixLen {
118
-			var nr2 int
119
-			nr2, err = multiplexedSource.Read(buf[nr:])
120
-			nr += nr2
121
-			if errors.Is(err, io.EOF) {
122
-				if nr < stdWriterPrefixLen {
123
-					return written, nil
124
-				}
125
-				break
126
-			}
127
-			if err != nil {
128
-				return 0, err
129
-			}
130
-		}
131
-
132
-		stream := StdType(buf[stdWriterFdIndex])
133
-		// Check the first byte to know where to write
134
-		switch stream {
135
-		case Stdin:
136
-			fallthrough
137
-		case Stdout:
138
-			// Write on stdout
139
-			out = destOut
140
-		case Stderr:
141
-			// Write on stderr
142
-			out = destErr
143
-		case Systemerr:
144
-			// If we're on Systemerr, we won't write anywhere.
145
-			// NB: if this code changes later, make sure you don't try to write
146
-			// to outstream if Systemerr is the stream
147
-			out = nil
148
-		default:
149
-			return 0, fmt.Errorf("unrecognized input header: %d", buf[stdWriterFdIndex])
150
-		}
151
-
152
-		// Retrieve the size of the frame
153
-		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
154
-
155
-		// Check if the buffer is big enough to read the frame.
156
-		// Extend it if necessary.
157
-		if frameSize+stdWriterPrefixLen > bufLen {
158
-			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
159
-			bufLen = len(buf)
160
-		}
161
-
162
-		// While the amount of bytes read is less than the size of the frame + header, we keep reading
163
-		for nr < frameSize+stdWriterPrefixLen {
164
-			var nr2 int
165
-			nr2, err = multiplexedSource.Read(buf[nr:])
166
-			nr += nr2
167
-			if errors.Is(err, io.EOF) {
168
-				if nr < frameSize+stdWriterPrefixLen {
169
-					return written, nil
170
-				}
171
-				break
172
-			}
173
-			if err != nil {
174
-				return 0, err
175
-			}
176
-		}
177
-
178
-		// we might have an error from the source mixed up in our multiplexed
179
-		// stream. if we do, return it.
180
-		if stream == Systemerr {
181
-			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
182
-		}
183
-
184
-		// Write the retrieved frame (without header)
185
-		nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
186
-		if err != nil {
187
-			return 0, err
188
-		}
189
-
190
-		// If the frame has not been fully written: error
191
-		if nw != frameSize {
192
-			return 0, io.ErrShortWrite
193
-		}
194
-		written += int64(nw)
195
-
196
-		// Move the rest of the buffer to the beginning
197
-		copy(buf, buf[frameSize+stdWriterPrefixLen:])
198
-		// Move the index
199
-		nr -= frameSize + stdWriterPrefixLen
200
-	}
201
-}
202 1
deleted file mode 100644
... ...
@@ -1,292 +0,0 @@
1
-package stdcopy
2
-
3
-import (
4
-	"bytes"
5
-	"errors"
6
-	"io"
7
-	"strings"
8
-	"testing"
9
-)
10
-
11
-func TestNewStdWriter(t *testing.T) {
12
-	writer := NewStdWriter(io.Discard, Stdout)
13
-	if writer == nil {
14
-		t.Fatalf("NewStdWriter with an invalid StdType should not return nil.")
15
-	}
16
-}
17
-
18
-func TestWriteWithUninitializedStdWriter(t *testing.T) {
19
-	writer := stdWriter{
20
-		Writer: nil,
21
-		prefix: byte(Stdout),
22
-	}
23
-	n, err := writer.Write([]byte("Something here"))
24
-	if n != 0 || err == nil {
25
-		t.Fatalf("Should fail when given an incomplete or uninitialized StdWriter")
26
-	}
27
-}
28
-
29
-func TestWriteWithNilBytes(t *testing.T) {
30
-	writer := NewStdWriter(io.Discard, Stdout)
31
-	n, err := writer.Write(nil)
32
-	if err != nil {
33
-		t.Fatalf("Shouldn't have fail when given no data")
34
-	}
35
-	if n > 0 {
36
-		t.Fatalf("Write should have written 0 byte, but has written %d", n)
37
-	}
38
-}
39
-
40
-func TestWrite(t *testing.T) {
41
-	writer := NewStdWriter(io.Discard, Stdout)
42
-	data := []byte("Test StdWrite.Write")
43
-	n, err := writer.Write(data)
44
-	if err != nil {
45
-		t.Fatalf("Error while writing with StdWrite")
46
-	}
47
-	if n != len(data) {
48
-		t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n)
49
-	}
50
-}
51
-
52
-type errWriter struct {
53
-	n   int
54
-	err error
55
-}
56
-
57
-func (f *errWriter) Write(buf []byte) (int, error) {
58
-	return f.n, f.err
59
-}
60
-
61
-func TestWriteWithWriterError(t *testing.T) {
62
-	expectedError := errors.New("expected")
63
-	expectedReturnedBytes := 10
64
-	writer := NewStdWriter(&errWriter{
65
-		n:   stdWriterPrefixLen + expectedReturnedBytes,
66
-		err: expectedError,
67
-	}, Stdout)
68
-	data := []byte("This won't get written, sigh")
69
-	n, err := writer.Write(data)
70
-	if !errors.Is(err, expectedError) {
71
-		t.Fatalf("Didn't get expected error.")
72
-	}
73
-	if n != expectedReturnedBytes {
74
-		t.Fatalf("Didn't get expected written bytes %d, got %d.",
75
-			expectedReturnedBytes, n)
76
-	}
77
-}
78
-
79
-func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) {
80
-	writer := NewStdWriter(&errWriter{n: -1}, Stdout)
81
-	data := []byte("This won't get written, sigh")
82
-	actual, _ := writer.Write(data)
83
-	if actual != 0 {
84
-		t.Fatalf("Expected returned written bytes equal to 0, got %d", actual)
85
-	}
86
-}
87
-
88
-func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (*bytes.Buffer, error) {
89
-	buffer := new(bytes.Buffer)
90
-	dstOut := NewStdWriter(buffer, Stdout)
91
-	_, err := dstOut.Write(stdOutBytes)
92
-	if err != nil {
93
-		return buffer, err
94
-	}
95
-	dstErr := NewStdWriter(buffer, Stderr)
96
-	_, err = dstErr.Write(stdErrBytes)
97
-	return buffer, err
98
-}
99
-
100
-func TestStdCopyWriteAndRead(t *testing.T) {
101
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
102
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
103
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
104
-	if err != nil {
105
-		t.Fatal(err)
106
-	}
107
-	written, err := StdCopy(io.Discard, io.Discard, buffer)
108
-	if err != nil {
109
-		t.Fatal(err)
110
-	}
111
-	expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes)
112
-	if written != int64(expectedTotalWritten) {
113
-		t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written)
114
-	}
115
-}
116
-
117
-type customReader struct {
118
-	n            int
119
-	err          error
120
-	totalCalls   int
121
-	correctCalls int
122
-	src          *bytes.Buffer
123
-}
124
-
125
-func (f *customReader) Read(buf []byte) (int, error) {
126
-	f.totalCalls++
127
-	if f.totalCalls <= f.correctCalls {
128
-		return f.src.Read(buf)
129
-	}
130
-	return f.n, f.err
131
-}
132
-
133
-func TestStdCopyReturnsErrorReadingHeader(t *testing.T) {
134
-	expectedError := errors.New("error")
135
-	reader := &customReader{
136
-		err: expectedError,
137
-	}
138
-	written, err := StdCopy(io.Discard, io.Discard, reader)
139
-	if written != 0 {
140
-		t.Fatalf("Expected 0 bytes read, got %d", written)
141
-	}
142
-	if !errors.Is(err, expectedError) {
143
-		t.Fatalf("Didn't get expected error")
144
-	}
145
-}
146
-
147
-func TestStdCopyReturnsErrorReadingFrame(t *testing.T) {
148
-	expectedError := errors.New("error")
149
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
150
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
151
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
152
-	if err != nil {
153
-		t.Fatal(err)
154
-	}
155
-	reader := &customReader{
156
-		correctCalls: 1,
157
-		n:            stdWriterPrefixLen + 1,
158
-		err:          expectedError,
159
-		src:          buffer,
160
-	}
161
-	written, err := StdCopy(io.Discard, io.Discard, reader)
162
-	if written != 0 {
163
-		t.Fatalf("Expected 0 bytes read, got %d", written)
164
-	}
165
-	if !errors.Is(err, expectedError) {
166
-		t.Fatalf("Didn't get expected error")
167
-	}
168
-}
169
-
170
-func TestStdCopyDetectsCorruptedFrame(t *testing.T) {
171
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
172
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
173
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
174
-	if err != nil {
175
-		t.Fatal(err)
176
-	}
177
-	reader := &customReader{
178
-		correctCalls: 1,
179
-		n:            stdWriterPrefixLen + 1,
180
-		err:          io.EOF,
181
-		src:          buffer,
182
-	}
183
-	written, err := StdCopy(io.Discard, io.Discard, reader)
184
-	if written != startingBufLen {
185
-		t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written)
186
-	}
187
-	if err != nil {
188
-		t.Fatal("Didn't get nil error")
189
-	}
190
-}
191
-
192
-func TestStdCopyWithInvalidInputHeader(t *testing.T) {
193
-	dstOut := NewStdWriter(io.Discard, Stdout)
194
-	dstErr := NewStdWriter(io.Discard, Stderr)
195
-	src := strings.NewReader("Invalid input")
196
-	_, err := StdCopy(dstOut, dstErr, src)
197
-	if err == nil {
198
-		t.Fatal("StdCopy with invalid input header should fail.")
199
-	}
200
-}
201
-
202
-func TestStdCopyWithCorruptedPrefix(t *testing.T) {
203
-	data := []byte{0x01, 0x02, 0x03}
204
-	src := bytes.NewReader(data)
205
-	written, err := StdCopy(nil, nil, src)
206
-	if err != nil {
207
-		t.Fatalf("StdCopy should not return an error with corrupted prefix.")
208
-	}
209
-	if written != 0 {
210
-		t.Fatalf("StdCopy should have written 0, but has written %d", written)
211
-	}
212
-}
213
-
214
-func TestStdCopyReturnsWriteErrors(t *testing.T) {
215
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
216
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
217
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
218
-	if err != nil {
219
-		t.Fatal(err)
220
-	}
221
-	expectedError := errors.New("expected")
222
-
223
-	dstOut := &errWriter{err: expectedError}
224
-
225
-	written, err := StdCopy(dstOut, io.Discard, buffer)
226
-	if written != 0 {
227
-		t.Fatalf("StdCopy should have written 0, but has written %d", written)
228
-	}
229
-	if !errors.Is(err, expectedError) {
230
-		t.Fatalf("Didn't get expected error, got %v", err)
231
-	}
232
-}
233
-
234
-func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) {
235
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
236
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
237
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
238
-	if err != nil {
239
-		t.Fatal(err)
240
-	}
241
-	dstOut := &errWriter{n: startingBufLen - 10}
242
-
243
-	written, err := StdCopy(dstOut, io.Discard, buffer)
244
-	if written != 0 {
245
-		t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written)
246
-	}
247
-	if !errors.Is(err, io.ErrShortWrite) {
248
-		t.Fatalf("Didn't get expected io.ErrShortWrite error")
249
-	}
250
-}
251
-
252
-// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an
253
-// error, when that error is muxed into the Systemerr stream.
254
-func TestStdCopyReturnsErrorFromSystem(t *testing.T) {
255
-	// write in the basic messages, just so there's some fluff in there
256
-	stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
257
-	stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
258
-	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes)
259
-	if err != nil {
260
-		t.Fatal(err)
261
-	}
262
-	// add in an error message on the Systemerr stream
263
-	systemErrBytes := []byte(strings.Repeat("S", startingBufLen))
264
-	systemWriter := NewStdWriter(buffer, Systemerr)
265
-	_, err = systemWriter.Write(systemErrBytes)
266
-	if err != nil {
267
-		t.Fatal(err)
268
-	}
269
-
270
-	// now copy and demux. we should expect an error containing the string we
271
-	// wrote out
272
-	_, err = StdCopy(io.Discard, io.Discard, buffer)
273
-	if err == nil {
274
-		t.Fatal("expected error, got none")
275
-	}
276
-	if !strings.Contains(err.Error(), string(systemErrBytes)) {
277
-		t.Fatal("expected error to contain message")
278
-	}
279
-}
280
-
281
-func BenchmarkWrite(b *testing.B) {
282
-	w := NewStdWriter(io.Discard, Stdout)
283
-	data := []byte("Test line for testing stdwriter performance\n")
284
-	data = bytes.Repeat(data, 100)
285
-	b.SetBytes(int64(len(data)))
286
-	b.ResetTimer()
287
-	for i := 0; i < b.N; i++ {
288
-		if _, err := w.Write(data); err != nil {
289
-			b.Fatal(err)
290
-		}
291
-	}
292
-}
... ...
@@ -11,7 +11,7 @@ import (
11 11
 	"github.com/docker/docker/daemon/logger"
12 12
 	"github.com/docker/docker/daemon/server/backend"
13 13
 	"github.com/docker/docker/errdefs"
14
-	"github.com/moby/moby/api/stdcopy"
14
+	"github.com/moby/moby/api/pkg/stdcopy"
15 15
 	containertypes "github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/api/types/events"
17 17
 	"github.com/moby/term"
... ...
@@ -10,7 +10,7 @@ import (
10 10
 
11 11
 	"github.com/docker/docker/daemon/server/backend"
12 12
 	"github.com/docker/docker/pkg/ioutils"
13
-	"github.com/moby/moby/api/stdcopy"
13
+	"github.com/moby/moby/api/pkg/stdcopy"
14 14
 	"github.com/moby/moby/api/types/container"
15 15
 )
16 16
 
... ...
@@ -10,7 +10,7 @@ import (
10 10
 	"github.com/docker/docker/daemon/server/backend"
11 11
 	"github.com/docker/docker/daemon/server/httputils"
12 12
 	"github.com/docker/docker/errdefs"
13
-	"github.com/moby/moby/api/stdcopy"
13
+	"github.com/moby/moby/api/pkg/stdcopy"
14 14
 	"github.com/moby/moby/api/types"
15 15
 	"github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/api/types/versions"
... ...
@@ -14,7 +14,7 @@ import (
14 14
 	"github.com/docker/docker/testutil"
15 15
 	"github.com/docker/docker/testutil/request"
16 16
 	"github.com/docker/go-connections/sockets"
17
-	"github.com/moby/moby/api/stdcopy"
17
+	"github.com/moby/moby/api/pkg/stdcopy"
18 18
 	"github.com/moby/moby/api/types"
19 19
 	"github.com/moby/moby/api/types/container"
20 20
 	"github.com/moby/moby/client"
... ...
@@ -14,7 +14,7 @@ import (
14 14
 	"github.com/docker/docker/integration-cli/cli"
15 15
 	"github.com/docker/docker/testutil"
16 16
 	"github.com/docker/docker/testutil/request"
17
-	"github.com/moby/moby/api/stdcopy"
17
+	"github.com/moby/moby/api/pkg/stdcopy"
18 18
 	"github.com/moby/moby/api/types/container"
19 19
 	"github.com/moby/moby/client"
20 20
 	"gotest.tools/v3/assert"
... ...
@@ -10,7 +10,7 @@ import (
10 10
 	"github.com/docker/docker/testutil"
11 11
 	"github.com/docker/docker/testutil/daemon"
12 12
 	"github.com/docker/docker/testutil/fakecontext"
13
-	"github.com/moby/moby/api/stdcopy"
13
+	"github.com/moby/moby/api/pkg/stdcopy"
14 14
 	"github.com/moby/moby/api/types/build"
15 15
 	containertypes "github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/client"
... ...
@@ -15,7 +15,7 @@ import (
15 15
 	"github.com/docker/docker/testutil/daemon"
16 16
 	"github.com/docker/docker/testutil/fakecontext"
17 17
 	"github.com/docker/docker/testutil/fixtures/load"
18
-	"github.com/moby/moby/api/stdcopy"
18
+	"github.com/moby/moby/api/pkg/stdcopy"
19 19
 	"github.com/moby/moby/api/types/build"
20 20
 	containertypes "github.com/moby/moby/api/types/container"
21 21
 	"gotest.tools/v3/assert"
... ...
@@ -9,7 +9,7 @@ import (
9 9
 	"github.com/docker/docker/integration/internal/container"
10 10
 	"github.com/docker/docker/testutil"
11 11
 	"github.com/docker/docker/testutil/fakecontext"
12
-	"github.com/moby/moby/api/stdcopy"
12
+	"github.com/moby/moby/api/pkg/stdcopy"
13 13
 	"github.com/moby/moby/api/types/build"
14 14
 	containertypes "github.com/moby/moby/api/types/container"
15 15
 	"gotest.tools/v3/assert"
... ...
@@ -11,7 +11,7 @@ import (
11 11
 	cerrdefs "github.com/containerd/errdefs"
12 12
 	"github.com/docker/docker/integration/internal/swarm"
13 13
 	"github.com/docker/docker/testutil"
14
-	"github.com/moby/moby/api/stdcopy"
14
+	"github.com/moby/moby/api/pkg/stdcopy"
15 15
 	"github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/api/types/filters"
17 17
 	swarmtypes "github.com/moby/moby/api/types/swarm"
... ...
@@ -11,7 +11,7 @@ import (
11 11
 	"github.com/docker/docker/integration/internal/container"
12 12
 	"github.com/docker/docker/testutil"
13 13
 	"github.com/docker/docker/testutil/daemon"
14
-	"github.com/moby/moby/api/stdcopy"
14
+	"github.com/moby/moby/api/pkg/stdcopy"
15 15
 	containertypes "github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/api/types/system"
17 17
 	"gotest.tools/v3/assert"
... ...
@@ -11,7 +11,7 @@ import (
11 11
 	"github.com/docker/docker/daemon/logger/local"
12 12
 	"github.com/docker/docker/integration/internal/container"
13 13
 	"github.com/docker/docker/integration/internal/termtest"
14
-	"github.com/moby/moby/api/stdcopy"
14
+	"github.com/moby/moby/api/pkg/stdcopy"
15 15
 	containertypes "github.com/moby/moby/api/types/container"
16 16
 	"gotest.tools/v3/assert"
17 17
 	is "gotest.tools/v3/assert/cmp"
... ...
@@ -15,7 +15,7 @@ import (
15 15
 	"github.com/docker/docker/testutil"
16 16
 	"github.com/docker/docker/testutil/daemon"
17 17
 	"github.com/docker/go-units"
18
-	"github.com/moby/moby/api/stdcopy"
18
+	"github.com/moby/moby/api/pkg/stdcopy"
19 19
 	containertypes "github.com/moby/moby/api/types/container"
20 20
 	"github.com/moby/moby/api/types/versions"
21 21
 	"github.com/moby/moby/client"
... ...
@@ -10,7 +10,7 @@ import (
10 10
 
11 11
 	cerrdefs "github.com/containerd/errdefs"
12 12
 	"github.com/docker/docker/integration/internal/container"
13
-	"github.com/moby/moby/api/stdcopy"
13
+	"github.com/moby/moby/api/pkg/stdcopy"
14 14
 	containertypes "github.com/moby/moby/api/types/container"
15 15
 	"github.com/moby/moby/client"
16 16
 	"gotest.tools/v3/assert"
... ...
@@ -20,7 +20,7 @@ import (
20 20
 	"github.com/docker/docker/integration/internal/process"
21 21
 	"github.com/docker/docker/testutil"
22 22
 	"github.com/docker/docker/testutil/daemon"
23
-	"github.com/moby/moby/api/stdcopy"
23
+	"github.com/moby/moby/api/pkg/stdcopy"
24 24
 	containertypes "github.com/moby/moby/api/types/container"
25 25
 	"github.com/moby/moby/api/types/image"
26 26
 	"github.com/moby/moby/api/types/mount"
... ...
@@ -8,7 +8,7 @@ import (
8 8
 	"sync"
9 9
 	"testing"
10 10
 
11
-	"github.com/moby/moby/api/stdcopy"
11
+	"github.com/moby/moby/api/pkg/stdcopy"
12 12
 	"github.com/moby/moby/api/types/container"
13 13
 	"github.com/moby/moby/api/types/network"
14 14
 	"github.com/moby/moby/client"
... ...
@@ -22,7 +22,7 @@ import (
22 22
 	"github.com/docker/docker/testutil"
23 23
 	"github.com/docker/docker/testutil/daemon"
24 24
 	"github.com/docker/go-connections/nat"
25
-	"github.com/moby/moby/api/stdcopy"
25
+	"github.com/moby/moby/api/pkg/stdcopy"
26 26
 	containertypes "github.com/moby/moby/api/types/container"
27 27
 	networktypes "github.com/moby/moby/api/types/network"
28 28
 	"github.com/moby/moby/client"
... ...
@@ -10,7 +10,7 @@ import (
10 10
 	testContainer "github.com/docker/docker/integration/internal/container"
11 11
 	"github.com/docker/docker/testutil"
12 12
 	"github.com/docker/docker/testutil/daemon"
13
-	"github.com/moby/moby/api/stdcopy"
13
+	"github.com/moby/moby/api/pkg/stdcopy"
14 14
 	"github.com/moby/moby/api/types/container"
15 15
 	"github.com/moby/moby/client"
16 16
 	"gotest.tools/v3/assert"
... ...
@@ -11,7 +11,7 @@ import (
11 11
 	cerrdefs "github.com/containerd/errdefs"
12 12
 	"github.com/docker/docker/integration/internal/swarm"
13 13
 	"github.com/docker/docker/testutil"
14
-	"github.com/moby/moby/api/stdcopy"
14
+	"github.com/moby/moby/api/pkg/stdcopy"
15 15
 	"github.com/moby/moby/api/types/container"
16 16
 	"github.com/moby/moby/api/types/filters"
17 17
 	swarmtypes "github.com/moby/moby/api/types/swarm"
18 18
new file mode 100644
... ...
@@ -0,0 +1,201 @@
0
+package stdcopy
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/binary"
5
+	"errors"
6
+	"fmt"
7
+	"io"
8
+	"sync"
9
+)
10
+
11
+// StdType is the type of standard stream
12
+// a writer can multiplex to.
13
+type StdType byte
14
+
15
+const (
16
+	// Stdin represents standard input stream type.
17
+	Stdin StdType = iota
18
+	// Stdout represents standard output stream type.
19
+	Stdout
20
+	// Stderr represents standard error steam type.
21
+	Stderr
22
+	// Systemerr represents errors originating from the system that make it
23
+	// into the multiplexed stream.
24
+	Systemerr
25
+
26
+	stdWriterPrefixLen = 8
27
+	stdWriterFdIndex   = 0
28
+	stdWriterSizeIndex = 4
29
+
30
+	startingBufLen = 32*1024 + stdWriterPrefixLen + 1
31
+)
32
+
33
+var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
34
+
35
+// stdWriter is wrapper of io.Writer with extra customized info.
36
+type stdWriter struct {
37
+	io.Writer
38
+	prefix byte
39
+}
40
+
41
+// Write sends the buffer to the underlying writer.
42
+// It inserts the prefix header before the buffer,
43
+// so [StdCopy] knows where to multiplex the output.
44
+//
45
+// It implements [io.Writer].
46
+func (w *stdWriter) Write(p []byte) (int, error) {
47
+	if w == nil || w.Writer == nil {
48
+		return 0, errors.New("writer not instantiated")
49
+	}
50
+	if p == nil {
51
+		return 0, nil
52
+	}
53
+
54
+	header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
55
+	binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
56
+	buf := bufPool.Get().(*bytes.Buffer)
57
+	buf.Write(header[:])
58
+	buf.Write(p)
59
+
60
+	n, err := w.Writer.Write(buf.Bytes())
61
+	n -= stdWriterPrefixLen
62
+	if n < 0 {
63
+		n = 0
64
+	}
65
+
66
+	buf.Reset()
67
+	bufPool.Put(buf)
68
+	return n, err
69
+}
70
+
71
+// NewStdWriter instantiates a new writer using a custom format to multiplex
72
+// multiple streams to a single writer. All messages written using this writer
73
+// are encapsulated using a custom format, and written to the underlying
74
+// stream "w".
75
+//
76
+// Writers created through NewStdWriter allow for multiple write streams
77
+// (e.g. stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a
78
+// single connection. "streamType" indicates the type of stream to encapsulate,
79
+// and can be [Stdin], [Stdout], pr [Stderr].
80
+func NewStdWriter(w io.Writer, streamType StdType) io.Writer {
81
+	return &stdWriter{
82
+		Writer: w,
83
+		prefix: byte(streamType),
84
+	}
85
+}
86
+
87
+// StdCopy is a modified version of [io.Copy] to de-multiplex messages
88
+// from "multiplexedSource" and copy them to destination streams
89
+// "destOut" and "destErr".
90
+//
91
+// StdCopy demultiplexes "multiplexedSource", assuming that it contains
92
+// two streams, previously multiplexed using a writer created with
93
+// [NewStdWriter].
94
+//
95
+// As it reads from "multiplexedSource", StdCopy writes [Stdout] messages
96
+// to "destOut", and [Stderr] message to "destErr].
97
+//
98
+// StdCopy it reads until it hits [io.EOF] on "multiplexedSource", after
99
+// which it returns a nil error. In other words: any error returned indicates
100
+// a real underlying error.
101
+//
102
+// The "written" return holds the total number of bytes written to "destOut"
103
+// and "destErr" combined.
104
+func StdCopy(destOut, destErr io.Writer, multiplexedSource io.Reader) (written int64, _ error) {
105
+	var (
106
+		buf       = make([]byte, startingBufLen)
107
+		bufLen    = len(buf)
108
+		nr, nw    int
109
+		err       error
110
+		out       io.Writer
111
+		frameSize int
112
+	)
113
+
114
+	for {
115
+		// Make sure we have at least a full header
116
+		for nr < stdWriterPrefixLen {
117
+			var nr2 int
118
+			nr2, err = multiplexedSource.Read(buf[nr:])
119
+			nr += nr2
120
+			if errors.Is(err, io.EOF) {
121
+				if nr < stdWriterPrefixLen {
122
+					return written, nil
123
+				}
124
+				break
125
+			}
126
+			if err != nil {
127
+				return 0, err
128
+			}
129
+		}
130
+
131
+		stream := StdType(buf[stdWriterFdIndex])
132
+		// Check the first byte to know where to write
133
+		switch stream {
134
+		case Stdin:
135
+			fallthrough
136
+		case Stdout:
137
+			// Write on stdout
138
+			out = destOut
139
+		case Stderr:
140
+			// Write on stderr
141
+			out = destErr
142
+		case Systemerr:
143
+			// If we're on Systemerr, we won't write anywhere.
144
+			// NB: if this code changes later, make sure you don't try to write
145
+			// to outstream if Systemerr is the stream
146
+			out = nil
147
+		default:
148
+			return 0, fmt.Errorf("unrecognized input header: %d", buf[stdWriterFdIndex])
149
+		}
150
+
151
+		// Retrieve the size of the frame
152
+		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
153
+
154
+		// Check if the buffer is big enough to read the frame.
155
+		// Extend it if necessary.
156
+		if frameSize+stdWriterPrefixLen > bufLen {
157
+			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
158
+			bufLen = len(buf)
159
+		}
160
+
161
+		// While the amount of bytes read is less than the size of the frame + header, we keep reading
162
+		for nr < frameSize+stdWriterPrefixLen {
163
+			var nr2 int
164
+			nr2, err = multiplexedSource.Read(buf[nr:])
165
+			nr += nr2
166
+			if errors.Is(err, io.EOF) {
167
+				if nr < frameSize+stdWriterPrefixLen {
168
+					return written, nil
169
+				}
170
+				break
171
+			}
172
+			if err != nil {
173
+				return 0, err
174
+			}
175
+		}
176
+
177
+		// we might have an error from the source mixed up in our multiplexed
178
+		// stream. if we do, return it.
179
+		if stream == Systemerr {
180
+			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
181
+		}
182
+
183
+		// Write the retrieved frame (without header)
184
+		nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
185
+		if err != nil {
186
+			return 0, err
187
+		}
188
+
189
+		// If the frame has not been fully written: error
190
+		if nw != frameSize {
191
+			return 0, io.ErrShortWrite
192
+		}
193
+		written += int64(nw)
194
+
195
+		// Move the rest of the buffer to the beginning
196
+		copy(buf, buf[frameSize+stdWriterPrefixLen:])
197
+		// Move the index
198
+		nr -= frameSize + stdWriterPrefixLen
199
+	}
200
+}
0 201
deleted file mode 100644
... ...
@@ -1,201 +0,0 @@
1
-package stdcopy
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/binary"
6
-	"errors"
7
-	"fmt"
8
-	"io"
9
-	"sync"
10
-)
11
-
12
-// StdType is the type of standard stream
13
-// a writer can multiplex to.
14
-type StdType byte
15
-
16
-const (
17
-	// Stdin represents standard input stream type.
18
-	Stdin StdType = iota
19
-	// Stdout represents standard output stream type.
20
-	Stdout
21
-	// Stderr represents standard error steam type.
22
-	Stderr
23
-	// Systemerr represents errors originating from the system that make it
24
-	// into the multiplexed stream.
25
-	Systemerr
26
-
27
-	stdWriterPrefixLen = 8
28
-	stdWriterFdIndex   = 0
29
-	stdWriterSizeIndex = 4
30
-
31
-	startingBufLen = 32*1024 + stdWriterPrefixLen + 1
32
-)
33
-
34
-var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
35
-
36
-// stdWriter is wrapper of io.Writer with extra customized info.
37
-type stdWriter struct {
38
-	io.Writer
39
-	prefix byte
40
-}
41
-
42
-// Write sends the buffer to the underlying writer.
43
-// It inserts the prefix header before the buffer,
44
-// so [StdCopy] knows where to multiplex the output.
45
-//
46
-// It implements [io.Writer].
47
-func (w *stdWriter) Write(p []byte) (int, error) {
48
-	if w == nil || w.Writer == nil {
49
-		return 0, errors.New("writer not instantiated")
50
-	}
51
-	if p == nil {
52
-		return 0, nil
53
-	}
54
-
55
-	header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
56
-	binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
57
-	buf := bufPool.Get().(*bytes.Buffer)
58
-	buf.Write(header[:])
59
-	buf.Write(p)
60
-
61
-	n, err := w.Writer.Write(buf.Bytes())
62
-	n -= stdWriterPrefixLen
63
-	if n < 0 {
64
-		n = 0
65
-	}
66
-
67
-	buf.Reset()
68
-	bufPool.Put(buf)
69
-	return n, err
70
-}
71
-
72
-// NewStdWriter instantiates a new writer using a custom format to multiplex
73
-// multiple streams to a single writer. All messages written using this writer
74
-// are encapsulated using a custom format, and written to the underlying
75
-// stream "w".
76
-//
77
-// Writers created through NewStdWriter allow for multiple write streams
78
-// (e.g. stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a
79
-// single connection. "streamType" indicates the type of stream to encapsulate,
80
-// and can be [Stdin], [Stdout], pr [Stderr].
81
-func NewStdWriter(w io.Writer, streamType StdType) io.Writer {
82
-	return &stdWriter{
83
-		Writer: w,
84
-		prefix: byte(streamType),
85
-	}
86
-}
87
-
88
-// StdCopy is a modified version of [io.Copy] to de-multiplex messages
89
-// from "multiplexedSource" and copy them to destination streams
90
-// "destOut" and "destErr".
91
-//
92
-// StdCopy demultiplexes "multiplexedSource", assuming that it contains
93
-// two streams, previously multiplexed using a writer created with
94
-// [NewStdWriter].
95
-//
96
-// As it reads from "multiplexedSource", StdCopy writes [Stdout] messages
97
-// to "destOut", and [Stderr] message to "destErr].
98
-//
99
-// StdCopy it reads until it hits [io.EOF] on "multiplexedSource", after
100
-// which it returns a nil error. In other words: any error returned indicates
101
-// a real underlying error.
102
-//
103
-// The "written" return holds the total number of bytes written to "destOut"
104
-// and "destErr" combined.
105
-func StdCopy(destOut, destErr io.Writer, multiplexedSource io.Reader) (written int64, _ error) {
106
-	var (
107
-		buf       = make([]byte, startingBufLen)
108
-		bufLen    = len(buf)
109
-		nr, nw    int
110
-		err       error
111
-		out       io.Writer
112
-		frameSize int
113
-	)
114
-
115
-	for {
116
-		// Make sure we have at least a full header
117
-		for nr < stdWriterPrefixLen {
118
-			var nr2 int
119
-			nr2, err = multiplexedSource.Read(buf[nr:])
120
-			nr += nr2
121
-			if errors.Is(err, io.EOF) {
122
-				if nr < stdWriterPrefixLen {
123
-					return written, nil
124
-				}
125
-				break
126
-			}
127
-			if err != nil {
128
-				return 0, err
129
-			}
130
-		}
131
-
132
-		stream := StdType(buf[stdWriterFdIndex])
133
-		// Check the first byte to know where to write
134
-		switch stream {
135
-		case Stdin:
136
-			fallthrough
137
-		case Stdout:
138
-			// Write on stdout
139
-			out = destOut
140
-		case Stderr:
141
-			// Write on stderr
142
-			out = destErr
143
-		case Systemerr:
144
-			// If we're on Systemerr, we won't write anywhere.
145
-			// NB: if this code changes later, make sure you don't try to write
146
-			// to outstream if Systemerr is the stream
147
-			out = nil
148
-		default:
149
-			return 0, fmt.Errorf("unrecognized input header: %d", buf[stdWriterFdIndex])
150
-		}
151
-
152
-		// Retrieve the size of the frame
153
-		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
154
-
155
-		// Check if the buffer is big enough to read the frame.
156
-		// Extend it if necessary.
157
-		if frameSize+stdWriterPrefixLen > bufLen {
158
-			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
159
-			bufLen = len(buf)
160
-		}
161
-
162
-		// While the amount of bytes read is less than the size of the frame + header, we keep reading
163
-		for nr < frameSize+stdWriterPrefixLen {
164
-			var nr2 int
165
-			nr2, err = multiplexedSource.Read(buf[nr:])
166
-			nr += nr2
167
-			if errors.Is(err, io.EOF) {
168
-				if nr < frameSize+stdWriterPrefixLen {
169
-					return written, nil
170
-				}
171
-				break
172
-			}
173
-			if err != nil {
174
-				return 0, err
175
-			}
176
-		}
177
-
178
-		// we might have an error from the source mixed up in our multiplexed
179
-		// stream. if we do, return it.
180
-		if stream == Systemerr {
181
-			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
182
-		}
183
-
184
-		// Write the retrieved frame (without header)
185
-		nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
186
-		if err != nil {
187
-			return 0, err
188
-		}
189
-
190
-		// If the frame has not been fully written: error
191
-		if nw != frameSize {
192
-			return 0, io.ErrShortWrite
193
-		}
194
-		written += int64(nw)
195
-
196
-		// Move the rest of the buffer to the beginning
197
-		copy(buf, buf[frameSize+stdWriterPrefixLen:])
198
-		// Move the index
199
-		nr -= frameSize + stdWriterPrefixLen
200
-	}
201
-}
... ...
@@ -939,7 +939,7 @@ github.com/moby/locker
939 939
 # github.com/moby/moby/api v0.0.0 => ./api
940 940
 ## explicit; go 1.23.0
941 941
 github.com/moby/moby/api
942
-github.com/moby/moby/api/stdcopy
942
+github.com/moby/moby/api/pkg/stdcopy
943 943
 github.com/moby/moby/api/types
944 944
 github.com/moby/moby/api/types/auxprogress
945 945
 github.com/moby/moby/api/types/blkiodev