The stdcopy package is used to produce and read multiplexed streams for
"attach" and "logs". It is used both by the API server (to produce), and
the client (to read / de-multiplex).
Move it to the api package, so that it can be included in the api module.
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
| 1 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,190 @@ |
| 0 |
+package stdcopy |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "bytes" |
|
| 4 |
+ "encoding/binary" |
|
| 5 |
+ "errors" |
|
| 6 |
+ "fmt" |
|
| 7 |
+ "io" |
|
| 8 |
+ "sync" |
|
| 9 |
+) |
|
| 10 |
+ |
|
| 11 |
+// StdType is the type of standard stream |
|
| 12 |
+// a writer can multiplex to. |
|
| 13 |
+type StdType byte |
|
| 14 |
+ |
|
| 15 |
+const ( |
|
| 16 |
+ // Stdin represents standard input stream type. |
|
| 17 |
+ Stdin StdType = iota |
|
| 18 |
+ // Stdout represents standard output stream type. |
|
| 19 |
+ Stdout |
|
| 20 |
+ // Stderr represents standard error steam type. |
|
| 21 |
+ Stderr |
|
| 22 |
+ // Systemerr represents errors originating from the system that make it |
|
| 23 |
+ // into the multiplexed stream. |
|
| 24 |
+ Systemerr |
|
| 25 |
+ |
|
| 26 |
+ stdWriterPrefixLen = 8 |
|
| 27 |
+ stdWriterFdIndex = 0 |
|
| 28 |
+ stdWriterSizeIndex = 4 |
|
| 29 |
+ |
|
| 30 |
+ startingBufLen = 32*1024 + stdWriterPrefixLen + 1 |
|
| 31 |
+) |
|
| 32 |
+ |
|
| 33 |
+var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
|
|
| 34 |
+ |
|
| 35 |
+// stdWriter is wrapper of io.Writer with extra customized info. |
|
| 36 |
+type stdWriter struct {
|
|
| 37 |
+ io.Writer |
|
| 38 |
+ prefix byte |
|
| 39 |
+} |
|
| 40 |
+ |
|
| 41 |
+// Write sends the buffer to the underneath writer. |
|
| 42 |
+// It inserts the prefix header before the buffer, |
|
| 43 |
+// so stdcopy.StdCopy knows where to multiplex the output. |
|
| 44 |
+// It makes stdWriter to implement io.Writer. |
|
| 45 |
+func (w *stdWriter) Write(p []byte) (int, error) {
|
|
| 46 |
+ if w == nil || w.Writer == nil {
|
|
| 47 |
+ return 0, errors.New("writer not instantiated")
|
|
| 48 |
+ } |
|
| 49 |
+ if p == nil {
|
|
| 50 |
+ return 0, nil |
|
| 51 |
+ } |
|
| 52 |
+ |
|
| 53 |
+ header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
|
|
| 54 |
+ binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) |
|
| 55 |
+ buf := bufPool.Get().(*bytes.Buffer) |
|
| 56 |
+ buf.Write(header[:]) |
|
| 57 |
+ buf.Write(p) |
|
| 58 |
+ |
|
| 59 |
+ n, err := w.Writer.Write(buf.Bytes()) |
|
| 60 |
+ n -= stdWriterPrefixLen |
|
| 61 |
+ if n < 0 {
|
|
| 62 |
+ n = 0 |
|
| 63 |
+ } |
|
| 64 |
+ |
|
| 65 |
+ buf.Reset() |
|
| 66 |
+ bufPool.Put(buf) |
|
| 67 |
+ return n, err |
|
| 68 |
+} |
|
| 69 |
+ |
|
| 70 |
+// NewStdWriter instantiates a new Writer. |
|
| 71 |
+// Everything written to it will be encapsulated using a custom format, |
|
| 72 |
+// and written to the underlying `w` stream. |
|
| 73 |
+// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. |
|
| 74 |
+// `t` indicates the id of the stream to encapsulate. |
|
| 75 |
+// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. |
|
| 76 |
+func NewStdWriter(w io.Writer, t StdType) io.Writer {
|
|
| 77 |
+ return &stdWriter{
|
|
| 78 |
+ Writer: w, |
|
| 79 |
+ prefix: byte(t), |
|
| 80 |
+ } |
|
| 81 |
+} |
|
| 82 |
+ |
|
| 83 |
+// StdCopy is a modified version of io.Copy. |
|
| 84 |
+// |
|
| 85 |
+// StdCopy will demultiplex `src`, assuming that it contains two streams, |
|
| 86 |
+// previously multiplexed together using a StdWriter instance. |
|
| 87 |
+// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. |
|
| 88 |
+// |
|
| 89 |
+// StdCopy will read until it hits EOF on `src`. It will then return a nil error. |
|
| 90 |
+// In other words: if `err` is non nil, it indicates a real underlying error. |
|
| 91 |
+// |
|
| 92 |
+// `written` will hold the total number of bytes written to `dstout` and `dsterr`. |
|
| 93 |
+func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, _ error) {
|
|
| 94 |
+ var ( |
|
| 95 |
+ buf = make([]byte, startingBufLen) |
|
| 96 |
+ bufLen = len(buf) |
|
| 97 |
+ nr, nw int |
|
| 98 |
+ err error |
|
| 99 |
+ out io.Writer |
|
| 100 |
+ frameSize int |
|
| 101 |
+ ) |
|
| 102 |
+ |
|
| 103 |
+ for {
|
|
| 104 |
+ // Make sure we have at least a full header |
|
| 105 |
+ for nr < stdWriterPrefixLen {
|
|
| 106 |
+ var nr2 int |
|
| 107 |
+ nr2, err = src.Read(buf[nr:]) |
|
| 108 |
+ nr += nr2 |
|
| 109 |
+ if errors.Is(err, io.EOF) {
|
|
| 110 |
+ if nr < stdWriterPrefixLen {
|
|
| 111 |
+ return written, nil |
|
| 112 |
+ } |
|
| 113 |
+ break |
|
| 114 |
+ } |
|
| 115 |
+ if err != nil {
|
|
| 116 |
+ return 0, err |
|
| 117 |
+ } |
|
| 118 |
+ } |
|
| 119 |
+ |
|
| 120 |
+ stream := StdType(buf[stdWriterFdIndex]) |
|
| 121 |
+ // Check the first byte to know where to write |
|
| 122 |
+ switch stream {
|
|
| 123 |
+ case Stdin: |
|
| 124 |
+ fallthrough |
|
| 125 |
+ case Stdout: |
|
| 126 |
+ // Write on stdout |
|
| 127 |
+ out = dstout |
|
| 128 |
+ case Stderr: |
|
| 129 |
+ // Write on stderr |
|
| 130 |
+ out = dsterr |
|
| 131 |
+ case Systemerr: |
|
| 132 |
+ // If we're on Systemerr, we won't write anywhere. |
|
| 133 |
+ // NB: if this code changes later, make sure you don't try to write |
|
| 134 |
+ // to outstream if Systemerr is the stream |
|
| 135 |
+ out = nil |
|
| 136 |
+ default: |
|
| 137 |
+ return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
|
|
| 138 |
+ } |
|
| 139 |
+ |
|
| 140 |
+ // Retrieve the size of the frame |
|
| 141 |
+ frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) |
|
| 142 |
+ |
|
| 143 |
+ // Check if the buffer is big enough to read the frame. |
|
| 144 |
+ // Extend it if necessary. |
|
| 145 |
+ if frameSize+stdWriterPrefixLen > bufLen {
|
|
| 146 |
+ buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) |
|
| 147 |
+ bufLen = len(buf) |
|
| 148 |
+ } |
|
| 149 |
+ |
|
| 150 |
+ // While the amount of bytes read is less than the size of the frame + header, we keep reading |
|
| 151 |
+ for nr < frameSize+stdWriterPrefixLen {
|
|
| 152 |
+ var nr2 int |
|
| 153 |
+ nr2, err = src.Read(buf[nr:]) |
|
| 154 |
+ nr += nr2 |
|
| 155 |
+ if errors.Is(err, io.EOF) {
|
|
| 156 |
+ if nr < frameSize+stdWriterPrefixLen {
|
|
| 157 |
+ return written, nil |
|
| 158 |
+ } |
|
| 159 |
+ break |
|
| 160 |
+ } |
|
| 161 |
+ if err != nil {
|
|
| 162 |
+ return 0, err |
|
| 163 |
+ } |
|
| 164 |
+ } |
|
| 165 |
+ |
|
| 166 |
+ // we might have an error from the source mixed up in our multiplexed |
|
| 167 |
+ // stream. if we do, return it. |
|
| 168 |
+ if stream == Systemerr {
|
|
| 169 |
+ return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
|
|
| 170 |
+ } |
|
| 171 |
+ |
|
| 172 |
+ // Write the retrieved frame (without header) |
|
| 173 |
+ nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) |
|
| 174 |
+ if err != nil {
|
|
| 175 |
+ return 0, err |
|
| 176 |
+ } |
|
| 177 |
+ |
|
| 178 |
+ // If the frame has not been fully written: error |
|
| 179 |
+ if nw != frameSize {
|
|
| 180 |
+ return 0, io.ErrShortWrite |
|
| 181 |
+ } |
|
| 182 |
+ written += int64(nw) |
|
| 183 |
+ |
|
| 184 |
+ // Move the rest of the buffer to the beginning |
|
| 185 |
+ copy(buf, buf[frameSize+stdWriterPrefixLen:]) |
|
| 186 |
+ // Move the index |
|
| 187 |
+ nr -= frameSize + stdWriterPrefixLen |
|
| 188 |
+ } |
|
| 189 |
+} |
| 0 | 190 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,292 @@ |
| 0 |
+package stdcopy |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "bytes" |
|
| 4 |
+ "errors" |
|
| 5 |
+ "io" |
|
| 6 |
+ "strings" |
|
| 7 |
+ "testing" |
|
| 8 |
+) |
|
| 9 |
+ |
|
| 10 |
+func TestNewStdWriter(t *testing.T) {
|
|
| 11 |
+ writer := NewStdWriter(io.Discard, Stdout) |
|
| 12 |
+ if writer == nil {
|
|
| 13 |
+ t.Fatalf("NewStdWriter with an invalid StdType should not return nil.")
|
|
| 14 |
+ } |
|
| 15 |
+} |
|
| 16 |
+ |
|
| 17 |
+func TestWriteWithUninitializedStdWriter(t *testing.T) {
|
|
| 18 |
+ writer := stdWriter{
|
|
| 19 |
+ Writer: nil, |
|
| 20 |
+ prefix: byte(Stdout), |
|
| 21 |
+ } |
|
| 22 |
+ n, err := writer.Write([]byte("Something here"))
|
|
| 23 |
+ if n != 0 || err == nil {
|
|
| 24 |
+ t.Fatalf("Should fail when given an incomplete or uninitialized StdWriter")
|
|
| 25 |
+ } |
|
| 26 |
+} |
|
| 27 |
+ |
|
| 28 |
+func TestWriteWithNilBytes(t *testing.T) {
|
|
| 29 |
+ writer := NewStdWriter(io.Discard, Stdout) |
|
| 30 |
+ n, err := writer.Write(nil) |
|
| 31 |
+ if err != nil {
|
|
| 32 |
+ t.Fatalf("Shouldn't have fail when given no data")
|
|
| 33 |
+ } |
|
| 34 |
+ if n > 0 {
|
|
| 35 |
+ t.Fatalf("Write should have written 0 byte, but has written %d", n)
|
|
| 36 |
+ } |
|
| 37 |
+} |
|
| 38 |
+ |
|
| 39 |
+func TestWrite(t *testing.T) {
|
|
| 40 |
+ writer := NewStdWriter(io.Discard, Stdout) |
|
| 41 |
+ data := []byte("Test StdWrite.Write")
|
|
| 42 |
+ n, err := writer.Write(data) |
|
| 43 |
+ if err != nil {
|
|
| 44 |
+ t.Fatalf("Error while writing with StdWrite")
|
|
| 45 |
+ } |
|
| 46 |
+ if n != len(data) {
|
|
| 47 |
+ t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n)
|
|
| 48 |
+ } |
|
| 49 |
+} |
|
| 50 |
+ |
|
| 51 |
+type errWriter struct {
|
|
| 52 |
+ n int |
|
| 53 |
+ err error |
|
| 54 |
+} |
|
| 55 |
+ |
|
| 56 |
+func (f *errWriter) Write(buf []byte) (int, error) {
|
|
| 57 |
+ return f.n, f.err |
|
| 58 |
+} |
|
| 59 |
+ |
|
| 60 |
+func TestWriteWithWriterError(t *testing.T) {
|
|
| 61 |
+ expectedError := errors.New("expected")
|
|
| 62 |
+ expectedReturnedBytes := 10 |
|
| 63 |
+ writer := NewStdWriter(&errWriter{
|
|
| 64 |
+ n: stdWriterPrefixLen + expectedReturnedBytes, |
|
| 65 |
+ err: expectedError, |
|
| 66 |
+ }, Stdout) |
|
| 67 |
+ data := []byte("This won't get written, sigh")
|
|
| 68 |
+ n, err := writer.Write(data) |
|
| 69 |
+ if !errors.Is(err, expectedError) {
|
|
| 70 |
+ t.Fatalf("Didn't get expected error.")
|
|
| 71 |
+ } |
|
| 72 |
+ if n != expectedReturnedBytes {
|
|
| 73 |
+ t.Fatalf("Didn't get expected written bytes %d, got %d.",
|
|
| 74 |
+ expectedReturnedBytes, n) |
|
| 75 |
+ } |
|
| 76 |
+} |
|
| 77 |
+ |
|
| 78 |
+func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) {
|
|
| 79 |
+ writer := NewStdWriter(&errWriter{n: -1}, Stdout)
|
|
| 80 |
+ data := []byte("This won't get written, sigh")
|
|
| 81 |
+ actual, _ := writer.Write(data) |
|
| 82 |
+ if actual != 0 {
|
|
| 83 |
+ t.Fatalf("Expected returned written bytes equal to 0, got %d", actual)
|
|
| 84 |
+ } |
|
| 85 |
+} |
|
| 86 |
+ |
|
| 87 |
+func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (*bytes.Buffer, error) {
|
|
| 88 |
+ buffer := new(bytes.Buffer) |
|
| 89 |
+ dstOut := NewStdWriter(buffer, Stdout) |
|
| 90 |
+ _, err := dstOut.Write(stdOutBytes) |
|
| 91 |
+ if err != nil {
|
|
| 92 |
+ return buffer, err |
|
| 93 |
+ } |
|
| 94 |
+ dstErr := NewStdWriter(buffer, Stderr) |
|
| 95 |
+ _, err = dstErr.Write(stdErrBytes) |
|
| 96 |
+ return buffer, err |
|
| 97 |
+} |
|
| 98 |
+ |
|
| 99 |
+func TestStdCopyWriteAndRead(t *testing.T) {
|
|
| 100 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 101 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 102 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 103 |
+ if err != nil {
|
|
| 104 |
+ t.Fatal(err) |
|
| 105 |
+ } |
|
| 106 |
+ written, err := StdCopy(io.Discard, io.Discard, buffer) |
|
| 107 |
+ if err != nil {
|
|
| 108 |
+ t.Fatal(err) |
|
| 109 |
+ } |
|
| 110 |
+ expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes) |
|
| 111 |
+ if written != int64(expectedTotalWritten) {
|
|
| 112 |
+ t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written)
|
|
| 113 |
+ } |
|
| 114 |
+} |
|
| 115 |
+ |
|
| 116 |
+type customReader struct {
|
|
| 117 |
+ n int |
|
| 118 |
+ err error |
|
| 119 |
+ totalCalls int |
|
| 120 |
+ correctCalls int |
|
| 121 |
+ src *bytes.Buffer |
|
| 122 |
+} |
|
| 123 |
+ |
|
| 124 |
+func (f *customReader) Read(buf []byte) (int, error) {
|
|
| 125 |
+ f.totalCalls++ |
|
| 126 |
+ if f.totalCalls <= f.correctCalls {
|
|
| 127 |
+ return f.src.Read(buf) |
|
| 128 |
+ } |
|
| 129 |
+ return f.n, f.err |
|
| 130 |
+} |
|
| 131 |
+ |
|
| 132 |
+func TestStdCopyReturnsErrorReadingHeader(t *testing.T) {
|
|
| 133 |
+ expectedError := errors.New("error")
|
|
| 134 |
+ reader := &customReader{
|
|
| 135 |
+ err: expectedError, |
|
| 136 |
+ } |
|
| 137 |
+ written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 138 |
+ if written != 0 {
|
|
| 139 |
+ t.Fatalf("Expected 0 bytes read, got %d", written)
|
|
| 140 |
+ } |
|
| 141 |
+ if !errors.Is(err, expectedError) {
|
|
| 142 |
+ t.Fatalf("Didn't get expected error")
|
|
| 143 |
+ } |
|
| 144 |
+} |
|
| 145 |
+ |
|
| 146 |
+func TestStdCopyReturnsErrorReadingFrame(t *testing.T) {
|
|
| 147 |
+ expectedError := errors.New("error")
|
|
| 148 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 149 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 150 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 151 |
+ if err != nil {
|
|
| 152 |
+ t.Fatal(err) |
|
| 153 |
+ } |
|
| 154 |
+ reader := &customReader{
|
|
| 155 |
+ correctCalls: 1, |
|
| 156 |
+ n: stdWriterPrefixLen + 1, |
|
| 157 |
+ err: expectedError, |
|
| 158 |
+ src: buffer, |
|
| 159 |
+ } |
|
| 160 |
+ written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 161 |
+ if written != 0 {
|
|
| 162 |
+ t.Fatalf("Expected 0 bytes read, got %d", written)
|
|
| 163 |
+ } |
|
| 164 |
+ if !errors.Is(err, expectedError) {
|
|
| 165 |
+ t.Fatalf("Didn't get expected error")
|
|
| 166 |
+ } |
|
| 167 |
+} |
|
| 168 |
+ |
|
| 169 |
+func TestStdCopyDetectsCorruptedFrame(t *testing.T) {
|
|
| 170 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 171 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 172 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 173 |
+ if err != nil {
|
|
| 174 |
+ t.Fatal(err) |
|
| 175 |
+ } |
|
| 176 |
+ reader := &customReader{
|
|
| 177 |
+ correctCalls: 1, |
|
| 178 |
+ n: stdWriterPrefixLen + 1, |
|
| 179 |
+ err: io.EOF, |
|
| 180 |
+ src: buffer, |
|
| 181 |
+ } |
|
| 182 |
+ written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 183 |
+ if written != startingBufLen {
|
|
| 184 |
+ t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written)
|
|
| 185 |
+ } |
|
| 186 |
+ if err != nil {
|
|
| 187 |
+ t.Fatal("Didn't get nil error")
|
|
| 188 |
+ } |
|
| 189 |
+} |
|
| 190 |
+ |
|
| 191 |
+func TestStdCopyWithInvalidInputHeader(t *testing.T) {
|
|
| 192 |
+ dstOut := NewStdWriter(io.Discard, Stdout) |
|
| 193 |
+ dstErr := NewStdWriter(io.Discard, Stderr) |
|
| 194 |
+ src := strings.NewReader("Invalid input")
|
|
| 195 |
+ _, err := StdCopy(dstOut, dstErr, src) |
|
| 196 |
+ if err == nil {
|
|
| 197 |
+ t.Fatal("StdCopy with invalid input header should fail.")
|
|
| 198 |
+ } |
|
| 199 |
+} |
|
| 200 |
+ |
|
| 201 |
+func TestStdCopyWithCorruptedPrefix(t *testing.T) {
|
|
| 202 |
+ data := []byte{0x01, 0x02, 0x03}
|
|
| 203 |
+ src := bytes.NewReader(data) |
|
| 204 |
+ written, err := StdCopy(nil, nil, src) |
|
| 205 |
+ if err != nil {
|
|
| 206 |
+ t.Fatalf("StdCopy should not return an error with corrupted prefix.")
|
|
| 207 |
+ } |
|
| 208 |
+ if written != 0 {
|
|
| 209 |
+ t.Fatalf("StdCopy should have written 0, but has written %d", written)
|
|
| 210 |
+ } |
|
| 211 |
+} |
|
| 212 |
+ |
|
| 213 |
+func TestStdCopyReturnsWriteErrors(t *testing.T) {
|
|
| 214 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 215 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 216 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 217 |
+ if err != nil {
|
|
| 218 |
+ t.Fatal(err) |
|
| 219 |
+ } |
|
| 220 |
+ expectedError := errors.New("expected")
|
|
| 221 |
+ |
|
| 222 |
+ dstOut := &errWriter{err: expectedError}
|
|
| 223 |
+ |
|
| 224 |
+ written, err := StdCopy(dstOut, io.Discard, buffer) |
|
| 225 |
+ if written != 0 {
|
|
| 226 |
+ t.Fatalf("StdCopy should have written 0, but has written %d", written)
|
|
| 227 |
+ } |
|
| 228 |
+ if !errors.Is(err, expectedError) {
|
|
| 229 |
+ t.Fatalf("Didn't get expected error, got %v", err)
|
|
| 230 |
+ } |
|
| 231 |
+} |
|
| 232 |
+ |
|
| 233 |
+func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) {
|
|
| 234 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 235 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 236 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 237 |
+ if err != nil {
|
|
| 238 |
+ t.Fatal(err) |
|
| 239 |
+ } |
|
| 240 |
+ dstOut := &errWriter{n: startingBufLen - 10}
|
|
| 241 |
+ |
|
| 242 |
+ written, err := StdCopy(dstOut, io.Discard, buffer) |
|
| 243 |
+ if written != 0 {
|
|
| 244 |
+ t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written)
|
|
| 245 |
+ } |
|
| 246 |
+ if !errors.Is(err, io.ErrShortWrite) {
|
|
| 247 |
+ t.Fatalf("Didn't get expected io.ErrShortWrite error")
|
|
| 248 |
+ } |
|
| 249 |
+} |
|
| 250 |
+ |
|
| 251 |
+// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an |
|
| 252 |
+// error, when that error is muxed into the Systemerr stream. |
|
| 253 |
+func TestStdCopyReturnsErrorFromSystem(t *testing.T) {
|
|
| 254 |
+ // write in the basic messages, just so there's some fluff in there |
|
| 255 |
+ stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 256 |
+ stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 257 |
+ buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 258 |
+ if err != nil {
|
|
| 259 |
+ t.Fatal(err) |
|
| 260 |
+ } |
|
| 261 |
+ // add in an error message on the Systemerr stream |
|
| 262 |
+ systemErrBytes := []byte(strings.Repeat("S", startingBufLen))
|
|
| 263 |
+ systemWriter := NewStdWriter(buffer, Systemerr) |
|
| 264 |
+ _, err = systemWriter.Write(systemErrBytes) |
|
| 265 |
+ if err != nil {
|
|
| 266 |
+ t.Fatal(err) |
|
| 267 |
+ } |
|
| 268 |
+ |
|
| 269 |
+ // now copy and demux. we should expect an error containing the string we |
|
| 270 |
+ // wrote out |
|
| 271 |
+ _, err = StdCopy(io.Discard, io.Discard, buffer) |
|
| 272 |
+ if err == nil {
|
|
| 273 |
+ t.Fatal("expected error, got none")
|
|
| 274 |
+ } |
|
| 275 |
+ if !strings.Contains(err.Error(), string(systemErrBytes)) {
|
|
| 276 |
+ t.Fatal("expected error to contain message")
|
|
| 277 |
+ } |
|
| 278 |
+} |
|
| 279 |
+ |
|
| 280 |
+func BenchmarkWrite(b *testing.B) {
|
|
| 281 |
+ w := NewStdWriter(io.Discard, Stdout) |
|
| 282 |
+ data := []byte("Test line for testing stdwriter performance\n")
|
|
| 283 |
+ data = bytes.Repeat(data, 100) |
|
| 284 |
+ b.SetBytes(int64(len(data))) |
|
| 285 |
+ b.ResetTimer() |
|
| 286 |
+ for i := 0; i < b.N; i++ {
|
|
| 287 |
+ if _, err := w.Write(data); err != nil {
|
|
| 288 |
+ b.Fatal(err) |
|
| 289 |
+ } |
|
| 290 |
+ } |
|
| 291 |
+} |
| ... | ... |
@@ -31,7 +31,7 @@ import ( |
| 31 | 31 |
// SIZE1, SIZE2, SIZE3, and SIZE4 are four bytes of uint32 encoded as big endian. |
| 32 | 32 |
// This is the size of OUTPUT. |
| 33 | 33 |
// |
| 34 |
-// You can use github.com/docker/docker/pkg/stdcopy.StdCopy to demultiplex this |
|
| 34 |
+// You can use github.com/moby/moby/api/stdcopy.StdCopy to demultiplex this |
|
| 35 | 35 |
// stream. |
| 36 | 36 |
func (cli *Client) ContainerAttach(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) {
|
| 37 | 37 |
containerID, err := trimID("container", containerID)
|
| ... | ... |
@@ -68,7 +68,7 @@ func (cli *Client) ContainerExecStart(ctx context.Context, execID string, config |
| 68 | 68 |
// - If the container is *not* using a TTY, streams for stdout and stderr are |
| 69 | 69 |
// multiplexed. |
| 70 | 70 |
// |
| 71 |
-// You can use [github.com/docker/docker/pkg/stdcopy.StdCopy] to demultiplex this |
|
| 71 |
+// You can use [github.com/moby/moby/api/stdcopy.StdCopy] to demultiplex this |
|
| 72 | 72 |
// stream. Refer to [Client.ContainerAttach] for details about the multiplexed |
| 73 | 73 |
// stream. |
| 74 | 74 |
func (cli *Client) ContainerExecAttach(ctx context.Context, execID string, config container.ExecAttachOptions) (types.HijackedResponse, error) {
|
| ... | ... |
@@ -31,7 +31,7 @@ import ( |
| 31 | 31 |
// SIZE1, SIZE2, SIZE3, and SIZE4 are four bytes of uint32 encoded as big endian. |
| 32 | 32 |
// This is the size of OUTPUT. |
| 33 | 33 |
// |
| 34 |
-// You can use github.com/docker/docker/pkg/stdcopy.StdCopy to demultiplex this |
|
| 34 |
+// You can use github.com/moby/moby/api/stdcopy.StdCopy to demultiplex this |
|
| 35 | 35 |
// stream. |
| 36 | 36 |
func (cli *Client) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) {
|
| 37 | 37 |
containerID, err := trimID("container", containerID)
|
| ... | ... |
@@ -10,7 +10,7 @@ import ( |
| 10 | 10 |
"github.com/docker/docker/daemon/internal/stream" |
| 11 | 11 |
"github.com/docker/docker/daemon/logger" |
| 12 | 12 |
"github.com/docker/docker/errdefs" |
| 13 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 13 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 14 | 14 |
"github.com/moby/moby/api/types/backend" |
| 15 | 15 |
containertypes "github.com/moby/moby/api/types/container" |
| 16 | 16 |
"github.com/moby/moby/api/types/events" |
| ... | ... |
@@ -10,7 +10,7 @@ import ( |
| 10 | 10 |
|
| 11 | 11 |
"github.com/docker/docker/pkg/ioutils" |
| 12 | 12 |
"github.com/docker/docker/pkg/jsonmessage" |
| 13 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 13 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 14 | 14 |
"github.com/moby/moby/api/types/backend" |
| 15 | 15 |
"github.com/moby/moby/api/types/container" |
| 16 | 16 |
) |
| ... | ... |
@@ -9,7 +9,7 @@ import ( |
| 9 | 9 |
"github.com/containerd/log" |
| 10 | 10 |
"github.com/docker/docker/daemon/server/httputils" |
| 11 | 11 |
"github.com/docker/docker/errdefs" |
| 12 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 12 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 13 | 13 |
"github.com/moby/moby/api/types" |
| 14 | 14 |
"github.com/moby/moby/api/types/backend" |
| 15 | 15 |
"github.com/moby/moby/api/types/container" |
| ... | ... |
@@ -256,6 +256,15 @@ Function Validate-PkgImports($headCommit, $upstreamCommit) {
|
| 256 | 256 |
$files=@(); $files = Invoke-Expression "git diff $upstreamCommit...$headCommit --diff-filter=ACMR --name-only -- `'pkg\*.go`'" |
| 257 | 257 |
$badFiles=@(); $files | ForEach-Object{
|
| 258 | 258 |
$file=$_ |
| 259 |
+ |
|
| 260 |
+ if ($file -like "pkg\stdcopy\*") {
|
|
| 261 |
+ # Temporarily allow pkg/stdcopy to import "github.com/moby/moby/api/stdcopy", |
|
| 262 |
+ # because it's an alias for backward-compatibility. |
|
| 263 |
+ # |
|
| 264 |
+ # TODO(thaJeztah): remove once "github.com/docker/docker/pkg/stdcopy" is removed. |
|
| 265 |
+ return |
|
| 266 |
+ } |
|
| 267 |
+ |
|
| 259 | 268 |
# For the current changed file, get its list of dependencies, sorted and uniqued. |
| 260 | 269 |
$imports = Invoke-Expression "go list -e -f `'{{ .Deps }}`' $file"
|
| 261 | 270 |
if ($LASTEXITCODE -ne 0) { Throw "Failed go list for dependencies on $file" }
|
| ... | ... |
@@ -10,6 +10,16 @@ unset IFS |
| 10 | 10 |
|
| 11 | 11 |
badFiles=() |
| 12 | 12 |
for f in "${files[@]}"; do
|
| 13 |
+ case "$f" in |
|
| 14 |
+ pkg/stdcopy/*) |
|
| 15 |
+ # Temporarily allow pkg/stdcopy to import "github.com/moby/moby/api/stdcopy", |
|
| 16 |
+ # because it's an alias for backward-compatibility. |
|
| 17 |
+ # |
|
| 18 |
+ # TODO(thaJeztah): remove once "github.com/docker/docker/pkg/stdcopy" is removed. |
|
| 19 |
+ continue |
|
| 20 |
+ ;; |
|
| 21 |
+ esac |
|
| 22 |
+ |
|
| 13 | 23 |
IFS=$'\n' |
| 14 | 24 |
badImports=($(go list -e -f '{{ join .Deps "\n" }}' "$f" | sort -u \
|
| 15 | 25 |
| grep -vE '^github.com/docker/docker/pkg/' \ |
| ... | ... |
@@ -11,10 +11,10 @@ import ( |
| 11 | 11 |
"time" |
| 12 | 12 |
|
| 13 | 13 |
"github.com/docker/docker/integration-cli/cli" |
| 14 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 15 | 14 |
"github.com/docker/docker/testutil" |
| 16 | 15 |
"github.com/docker/docker/testutil/request" |
| 17 | 16 |
"github.com/docker/go-connections/sockets" |
| 17 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 18 | 18 |
"github.com/moby/moby/api/types" |
| 19 | 19 |
"github.com/moby/moby/api/types/container" |
| 20 | 20 |
"github.com/moby/moby/client" |
| ... | ... |
@@ -12,9 +12,9 @@ import ( |
| 12 | 12 |
"time" |
| 13 | 13 |
|
| 14 | 14 |
"github.com/docker/docker/integration-cli/cli" |
| 15 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 16 | 15 |
"github.com/docker/docker/testutil" |
| 17 | 16 |
"github.com/docker/docker/testutil/request" |
| 17 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 18 | 18 |
"github.com/moby/moby/api/types/container" |
| 19 | 19 |
"github.com/moby/moby/client" |
| 20 | 20 |
"gotest.tools/v3/assert" |
| ... | ... |
@@ -7,10 +7,10 @@ import ( |
| 7 | 7 |
"testing" |
| 8 | 8 |
|
| 9 | 9 |
"github.com/docker/docker/integration/internal/container" |
| 10 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 11 | 10 |
"github.com/docker/docker/testutil" |
| 12 | 11 |
"github.com/docker/docker/testutil/daemon" |
| 13 | 12 |
"github.com/docker/docker/testutil/fakecontext" |
| 13 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 14 | 14 |
"github.com/moby/moby/api/types/build" |
| 15 | 15 |
containertypes "github.com/moby/moby/api/types/container" |
| 16 | 16 |
dclient "github.com/moby/moby/client" |
| ... | ... |
@@ -11,11 +11,11 @@ import ( |
| 11 | 11 |
|
| 12 | 12 |
"github.com/docker/docker/integration/internal/container" |
| 13 | 13 |
"github.com/docker/docker/pkg/jsonmessage" |
| 14 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 15 | 14 |
"github.com/docker/docker/testutil" |
| 16 | 15 |
"github.com/docker/docker/testutil/daemon" |
| 17 | 16 |
"github.com/docker/docker/testutil/fakecontext" |
| 18 | 17 |
"github.com/docker/docker/testutil/fixtures/load" |
| 18 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 19 | 19 |
"github.com/moby/moby/api/types/build" |
| 20 | 20 |
containertypes "github.com/moby/moby/api/types/container" |
| 21 | 21 |
"gotest.tools/v3/assert" |
| ... | ... |
@@ -7,12 +7,11 @@ import ( |
| 7 | 7 |
"testing" |
| 8 | 8 |
|
| 9 | 9 |
"github.com/docker/docker/integration/internal/container" |
| 10 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 11 | 10 |
"github.com/docker/docker/testutil" |
| 12 | 11 |
"github.com/docker/docker/testutil/fakecontext" |
| 12 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 13 | 13 |
"github.com/moby/moby/api/types/build" |
| 14 | 14 |
containertypes "github.com/moby/moby/api/types/container" |
| 15 |
- |
|
| 16 | 15 |
"gotest.tools/v3/assert" |
| 17 | 16 |
"gotest.tools/v3/poll" |
| 18 | 17 |
) |
| ... | ... |
@@ -10,8 +10,8 @@ import ( |
| 10 | 10 |
|
| 11 | 11 |
cerrdefs "github.com/containerd/errdefs" |
| 12 | 12 |
"github.com/docker/docker/integration/internal/swarm" |
| 13 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 14 | 13 |
"github.com/docker/docker/testutil" |
| 14 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 15 | 15 |
"github.com/moby/moby/api/types/container" |
| 16 | 16 |
"github.com/moby/moby/api/types/filters" |
| 17 | 17 |
swarmtypes "github.com/moby/moby/api/types/swarm" |
| ... | ... |
@@ -9,9 +9,9 @@ import ( |
| 9 | 9 |
"testing" |
| 10 | 10 |
|
| 11 | 11 |
"github.com/docker/docker/integration/internal/container" |
| 12 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 13 | 12 |
"github.com/docker/docker/testutil" |
| 14 | 13 |
"github.com/docker/docker/testutil/daemon" |
| 14 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 15 | 15 |
containertypes "github.com/moby/moby/api/types/container" |
| 16 | 16 |
"github.com/moby/moby/api/types/system" |
| 17 | 17 |
"gotest.tools/v3/assert" |
| ... | ... |
@@ -179,7 +179,7 @@ func TestCDIInfoDiscoveredDevices(t *testing.T) {
|
| 179 | 179 |
cdiDir := testutil.TempDir(t) |
| 180 | 180 |
specFilePath := filepath.Join(cdiDir, "test-device.json") |
| 181 | 181 |
|
| 182 |
- err := os.WriteFile(specFilePath, []byte(specContent), 0644) |
|
| 182 |
+ err := os.WriteFile(specFilePath, []byte(specContent), 0o644) |
|
| 183 | 183 |
assert.NilError(t, err, "Failed to write sample CDI spec file") |
| 184 | 184 |
|
| 185 | 185 |
d := daemon.New(t) |
| ... | ... |
@@ -11,7 +11,7 @@ import ( |
| 11 | 11 |
"github.com/docker/docker/daemon/logger/local" |
| 12 | 12 |
"github.com/docker/docker/integration/internal/container" |
| 13 | 13 |
"github.com/docker/docker/integration/internal/termtest" |
| 14 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 14 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 15 | 15 |
containertypes "github.com/moby/moby/api/types/container" |
| 16 | 16 |
"gotest.tools/v3/assert" |
| 17 | 17 |
is "gotest.tools/v3/assert/cmp" |
| ... | ... |
@@ -11,9 +11,9 @@ import ( |
| 11 | 11 |
|
| 12 | 12 |
"github.com/docker/docker/integration/internal/container" |
| 13 | 13 |
net "github.com/docker/docker/integration/internal/network" |
| 14 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 15 | 14 |
"github.com/docker/docker/testutil" |
| 16 | 15 |
"github.com/docker/docker/testutil/daemon" |
| 16 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 17 | 17 |
containertypes "github.com/moby/moby/api/types/container" |
| 18 | 18 |
"github.com/moby/moby/api/types/versions" |
| 19 | 19 |
"github.com/moby/moby/client" |
| ... | ... |
@@ -10,7 +10,7 @@ import ( |
| 10 | 10 |
|
| 11 | 11 |
cerrdefs "github.com/containerd/errdefs" |
| 12 | 12 |
"github.com/docker/docker/integration/internal/container" |
| 13 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 13 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 14 | 14 |
containertypes "github.com/moby/moby/api/types/container" |
| 15 | 15 |
"github.com/moby/moby/client" |
| 16 | 16 |
"gotest.tools/v3/assert" |
| ... | ... |
@@ -18,9 +18,9 @@ import ( |
| 18 | 18 |
"github.com/docker/docker/daemon/config" |
| 19 | 19 |
"github.com/docker/docker/integration/internal/container" |
| 20 | 20 |
"github.com/docker/docker/integration/internal/process" |
| 21 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 22 | 21 |
"github.com/docker/docker/testutil" |
| 23 | 22 |
"github.com/docker/docker/testutil/daemon" |
| 23 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 24 | 24 |
containertypes "github.com/moby/moby/api/types/container" |
| 25 | 25 |
"github.com/moby/moby/api/types/image" |
| 26 | 26 |
"github.com/moby/moby/api/types/mount" |
| ... | ... |
@@ -19,10 +19,10 @@ import ( |
| 19 | 19 |
"github.com/docker/docker/integration/internal/container" |
| 20 | 20 |
"github.com/docker/docker/integration/internal/network" |
| 21 | 21 |
"github.com/docker/docker/internal/testutils/networking" |
| 22 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 23 | 22 |
"github.com/docker/docker/testutil" |
| 24 | 23 |
"github.com/docker/docker/testutil/daemon" |
| 25 | 24 |
"github.com/docker/go-connections/nat" |
| 25 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 26 | 26 |
containertypes "github.com/moby/moby/api/types/container" |
| 27 | 27 |
networktypes "github.com/moby/moby/api/types/network" |
| 28 | 28 |
"github.com/moby/moby/client" |
| ... | ... |
@@ -8,9 +8,9 @@ import ( |
| 8 | 8 |
"time" |
| 9 | 9 |
|
| 10 | 10 |
testContainer "github.com/docker/docker/integration/internal/container" |
| 11 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 12 | 11 |
"github.com/docker/docker/testutil" |
| 13 | 12 |
"github.com/docker/docker/testutil/daemon" |
| 13 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 14 | 14 |
"github.com/moby/moby/api/types" |
| 15 | 15 |
"github.com/moby/moby/api/types/container" |
| 16 | 16 |
"gotest.tools/v3/assert" |
| ... | ... |
@@ -10,8 +10,8 @@ import ( |
| 10 | 10 |
|
| 11 | 11 |
cerrdefs "github.com/containerd/errdefs" |
| 12 | 12 |
"github.com/docker/docker/integration/internal/swarm" |
| 13 |
- "github.com/docker/docker/pkg/stdcopy" |
|
| 14 | 13 |
"github.com/docker/docker/testutil" |
| 14 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 15 | 15 |
"github.com/moby/moby/api/types/container" |
| 16 | 16 |
"github.com/moby/moby/api/types/filters" |
| 17 | 17 |
swarmtypes "github.com/moby/moby/api/types/swarm" |
| 18 | 18 |
deleted file mode 100644 |
| ... | ... |
@@ -1,190 +0,0 @@ |
| 1 |
-package stdcopy |
|
| 2 |
- |
|
| 3 |
-import ( |
|
| 4 |
- "bytes" |
|
| 5 |
- "encoding/binary" |
|
| 6 |
- "errors" |
|
| 7 |
- "fmt" |
|
| 8 |
- "io" |
|
| 9 |
- "sync" |
|
| 10 |
-) |
|
| 11 |
- |
|
| 12 |
-// StdType is the type of standard stream |
|
| 13 |
-// a writer can multiplex to. |
|
| 14 |
-type StdType byte |
|
| 15 |
- |
|
| 16 |
-const ( |
|
| 17 |
- // Stdin represents standard input stream type. |
|
| 18 |
- Stdin StdType = iota |
|
| 19 |
- // Stdout represents standard output stream type. |
|
| 20 |
- Stdout |
|
| 21 |
- // Stderr represents standard error steam type. |
|
| 22 |
- Stderr |
|
| 23 |
- // Systemerr represents errors originating from the system that make it |
|
| 24 |
- // into the multiplexed stream. |
|
| 25 |
- Systemerr |
|
| 26 |
- |
|
| 27 |
- stdWriterPrefixLen = 8 |
|
| 28 |
- stdWriterFdIndex = 0 |
|
| 29 |
- stdWriterSizeIndex = 4 |
|
| 30 |
- |
|
| 31 |
- startingBufLen = 32*1024 + stdWriterPrefixLen + 1 |
|
| 32 |
-) |
|
| 33 |
- |
|
| 34 |
-var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
|
|
| 35 |
- |
|
| 36 |
-// stdWriter is wrapper of io.Writer with extra customized info. |
|
| 37 |
-type stdWriter struct {
|
|
| 38 |
- io.Writer |
|
| 39 |
- prefix byte |
|
| 40 |
-} |
|
| 41 |
- |
|
| 42 |
-// Write sends the buffer to the underneath writer. |
|
| 43 |
-// It inserts the prefix header before the buffer, |
|
| 44 |
-// so stdcopy.StdCopy knows where to multiplex the output. |
|
| 45 |
-// It makes stdWriter to implement io.Writer. |
|
| 46 |
-func (w *stdWriter) Write(p []byte) (int, error) {
|
|
| 47 |
- if w == nil || w.Writer == nil {
|
|
| 48 |
- return 0, errors.New("writer not instantiated")
|
|
| 49 |
- } |
|
| 50 |
- if p == nil {
|
|
| 51 |
- return 0, nil |
|
| 52 |
- } |
|
| 53 |
- |
|
| 54 |
- header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
|
|
| 55 |
- binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) |
|
| 56 |
- buf := bufPool.Get().(*bytes.Buffer) |
|
| 57 |
- buf.Write(header[:]) |
|
| 58 |
- buf.Write(p) |
|
| 59 |
- |
|
| 60 |
- n, err := w.Writer.Write(buf.Bytes()) |
|
| 61 |
- n -= stdWriterPrefixLen |
|
| 62 |
- if n < 0 {
|
|
| 63 |
- n = 0 |
|
| 64 |
- } |
|
| 65 |
- |
|
| 66 |
- buf.Reset() |
|
| 67 |
- bufPool.Put(buf) |
|
| 68 |
- return n, err |
|
| 69 |
-} |
|
| 70 |
- |
|
| 71 |
-// NewStdWriter instantiates a new Writer. |
|
| 72 |
-// Everything written to it will be encapsulated using a custom format, |
|
| 73 |
-// and written to the underlying `w` stream. |
|
| 74 |
-// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. |
|
| 75 |
-// `t` indicates the id of the stream to encapsulate. |
|
| 76 |
-// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. |
|
| 77 |
-func NewStdWriter(w io.Writer, t StdType) io.Writer {
|
|
| 78 |
- return &stdWriter{
|
|
| 79 |
- Writer: w, |
|
| 80 |
- prefix: byte(t), |
|
| 81 |
- } |
|
| 82 |
-} |
|
| 83 |
- |
|
| 84 |
-// StdCopy is a modified version of io.Copy. |
|
| 85 |
-// |
|
| 86 |
-// StdCopy will demultiplex `src`, assuming that it contains two streams, |
|
| 87 |
-// previously multiplexed together using a StdWriter instance. |
|
| 88 |
-// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. |
|
| 89 |
-// |
|
| 90 |
-// StdCopy will read until it hits EOF on `src`. It will then return a nil error. |
|
| 91 |
-// In other words: if `err` is non nil, it indicates a real underlying error. |
|
| 92 |
-// |
|
| 93 |
-// `written` will hold the total number of bytes written to `dstout` and `dsterr`. |
|
| 94 |
-func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, _ error) {
|
|
| 95 |
- var ( |
|
| 96 |
- buf = make([]byte, startingBufLen) |
|
| 97 |
- bufLen = len(buf) |
|
| 98 |
- nr, nw int |
|
| 99 |
- err error |
|
| 100 |
- out io.Writer |
|
| 101 |
- frameSize int |
|
| 102 |
- ) |
|
| 103 |
- |
|
| 104 |
- for {
|
|
| 105 |
- // Make sure we have at least a full header |
|
| 106 |
- for nr < stdWriterPrefixLen {
|
|
| 107 |
- var nr2 int |
|
| 108 |
- nr2, err = src.Read(buf[nr:]) |
|
| 109 |
- nr += nr2 |
|
| 110 |
- if errors.Is(err, io.EOF) {
|
|
| 111 |
- if nr < stdWriterPrefixLen {
|
|
| 112 |
- return written, nil |
|
| 113 |
- } |
|
| 114 |
- break |
|
| 115 |
- } |
|
| 116 |
- if err != nil {
|
|
| 117 |
- return 0, err |
|
| 118 |
- } |
|
| 119 |
- } |
|
| 120 |
- |
|
| 121 |
- stream := StdType(buf[stdWriterFdIndex]) |
|
| 122 |
- // Check the first byte to know where to write |
|
| 123 |
- switch stream {
|
|
| 124 |
- case Stdin: |
|
| 125 |
- fallthrough |
|
| 126 |
- case Stdout: |
|
| 127 |
- // Write on stdout |
|
| 128 |
- out = dstout |
|
| 129 |
- case Stderr: |
|
| 130 |
- // Write on stderr |
|
| 131 |
- out = dsterr |
|
| 132 |
- case Systemerr: |
|
| 133 |
- // If we're on Systemerr, we won't write anywhere. |
|
| 134 |
- // NB: if this code changes later, make sure you don't try to write |
|
| 135 |
- // to outstream if Systemerr is the stream |
|
| 136 |
- out = nil |
|
| 137 |
- default: |
|
| 138 |
- return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
|
|
| 139 |
- } |
|
| 140 |
- |
|
| 141 |
- // Retrieve the size of the frame |
|
| 142 |
- frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) |
|
| 143 |
- |
|
| 144 |
- // Check if the buffer is big enough to read the frame. |
|
| 145 |
- // Extend it if necessary. |
|
| 146 |
- if frameSize+stdWriterPrefixLen > bufLen {
|
|
| 147 |
- buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) |
|
| 148 |
- bufLen = len(buf) |
|
| 149 |
- } |
|
| 150 |
- |
|
| 151 |
- // While the amount of bytes read is less than the size of the frame + header, we keep reading |
|
| 152 |
- for nr < frameSize+stdWriterPrefixLen {
|
|
| 153 |
- var nr2 int |
|
| 154 |
- nr2, err = src.Read(buf[nr:]) |
|
| 155 |
- nr += nr2 |
|
| 156 |
- if errors.Is(err, io.EOF) {
|
|
| 157 |
- if nr < frameSize+stdWriterPrefixLen {
|
|
| 158 |
- return written, nil |
|
| 159 |
- } |
|
| 160 |
- break |
|
| 161 |
- } |
|
| 162 |
- if err != nil {
|
|
| 163 |
- return 0, err |
|
| 164 |
- } |
|
| 165 |
- } |
|
| 166 |
- |
|
| 167 |
- // we might have an error from the source mixed up in our multiplexed |
|
| 168 |
- // stream. if we do, return it. |
|
| 169 |
- if stream == Systemerr {
|
|
| 170 |
- return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
|
|
| 171 |
- } |
|
| 172 |
- |
|
| 173 |
- // Write the retrieved frame (without header) |
|
| 174 |
- nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) |
|
| 175 |
- if err != nil {
|
|
| 176 |
- return 0, err |
|
| 177 |
- } |
|
| 178 |
- |
|
| 179 |
- // If the frame has not been fully written: error |
|
| 180 |
- if nw != frameSize {
|
|
| 181 |
- return 0, io.ErrShortWrite |
|
| 182 |
- } |
|
| 183 |
- written += int64(nw) |
|
| 184 |
- |
|
| 185 |
- // Move the rest of the buffer to the beginning |
|
| 186 |
- copy(buf, buf[frameSize+stdWriterPrefixLen:]) |
|
| 187 |
- // Move the index |
|
| 188 |
- nr -= frameSize + stdWriterPrefixLen |
|
| 189 |
- } |
|
| 190 |
-} |
| 191 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,35 @@ |
| 0 |
+package stdcopy // Deprecated: use [github.com/docker/docker/api/stdcopy] instead. |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "io" |
|
| 4 |
+ |
|
| 5 |
+ "github.com/moby/moby/api/stdcopy" |
|
| 6 |
+) |
|
| 7 |
+ |
|
| 8 |
+// TODO(thaJeztah): remove exception in hack/make.ps1 and hack/validate/pkg-imports when removing. |
|
| 9 |
+ |
|
| 10 |
+// StdType is the type of standard stream |
|
| 11 |
+// a writer can multiplex to. |
|
| 12 |
+// |
|
| 13 |
+// Deprecated: use [stdcopy.StdType]. This alias will be removed in the next release. |
|
| 14 |
+type StdType = stdcopy.StdType |
|
| 15 |
+ |
|
| 16 |
+const ( |
|
| 17 |
+ Stdin = stdcopy.Stdin // Deprecated: use [stdcopy.Stderr]. This alias will be removed in the next release. |
|
| 18 |
+ Stdout = stdcopy.Stdout // Deprecated: use [stdcopy.Stdout]. This alias will be removed in the next release. |
|
| 19 |
+ Stderr = stdcopy.Stderr // Deprecated: use [stdcopy.Stderr]. This alias will be removed in the next release. |
|
| 20 |
+) |
|
| 21 |
+ |
|
| 22 |
+// NewStdWriter instantiates a new Writer. |
|
| 23 |
+// |
|
| 24 |
+// Deprecated: use [stdcopy.NewStdWriter]. This alias will be removed in the next release. |
|
| 25 |
+func NewStdWriter(w io.Writer, t stdcopy.StdType) io.Writer {
|
|
| 26 |
+ return stdcopy.NewStdWriter(w, t) |
|
| 27 |
+} |
|
| 28 |
+ |
|
| 29 |
+// StdCopy is a modified version of io.Copy. |
|
| 30 |
+// |
|
| 31 |
+// Deprecated: use [stdcopy.StdCopy]. This alias will be removed in the next release. |
|
| 32 |
+func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, _ error) {
|
|
| 33 |
+ return stdcopy.StdCopy(dstout, dsterr, src) |
|
| 34 |
+} |
| 0 | 35 |
deleted file mode 100644 |
| ... | ... |
@@ -1,292 +0,0 @@ |
| 1 |
-package stdcopy |
|
| 2 |
- |
|
| 3 |
-import ( |
|
| 4 |
- "bytes" |
|
| 5 |
- "errors" |
|
| 6 |
- "io" |
|
| 7 |
- "strings" |
|
| 8 |
- "testing" |
|
| 9 |
-) |
|
| 10 |
- |
|
| 11 |
-func TestNewStdWriter(t *testing.T) {
|
|
| 12 |
- writer := NewStdWriter(io.Discard, Stdout) |
|
| 13 |
- if writer == nil {
|
|
| 14 |
- t.Fatalf("NewStdWriter with an invalid StdType should not return nil.")
|
|
| 15 |
- } |
|
| 16 |
-} |
|
| 17 |
- |
|
| 18 |
-func TestWriteWithUninitializedStdWriter(t *testing.T) {
|
|
| 19 |
- writer := stdWriter{
|
|
| 20 |
- Writer: nil, |
|
| 21 |
- prefix: byte(Stdout), |
|
| 22 |
- } |
|
| 23 |
- n, err := writer.Write([]byte("Something here"))
|
|
| 24 |
- if n != 0 || err == nil {
|
|
| 25 |
- t.Fatalf("Should fail when given an incomplete or uninitialized StdWriter")
|
|
| 26 |
- } |
|
| 27 |
-} |
|
| 28 |
- |
|
| 29 |
-func TestWriteWithNilBytes(t *testing.T) {
|
|
| 30 |
- writer := NewStdWriter(io.Discard, Stdout) |
|
| 31 |
- n, err := writer.Write(nil) |
|
| 32 |
- if err != nil {
|
|
| 33 |
- t.Fatalf("Shouldn't have fail when given no data")
|
|
| 34 |
- } |
|
| 35 |
- if n > 0 {
|
|
| 36 |
- t.Fatalf("Write should have written 0 byte, but has written %d", n)
|
|
| 37 |
- } |
|
| 38 |
-} |
|
| 39 |
- |
|
| 40 |
-func TestWrite(t *testing.T) {
|
|
| 41 |
- writer := NewStdWriter(io.Discard, Stdout) |
|
| 42 |
- data := []byte("Test StdWrite.Write")
|
|
| 43 |
- n, err := writer.Write(data) |
|
| 44 |
- if err != nil {
|
|
| 45 |
- t.Fatalf("Error while writing with StdWrite")
|
|
| 46 |
- } |
|
| 47 |
- if n != len(data) {
|
|
| 48 |
- t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n)
|
|
| 49 |
- } |
|
| 50 |
-} |
|
| 51 |
- |
|
| 52 |
-type errWriter struct {
|
|
| 53 |
- n int |
|
| 54 |
- err error |
|
| 55 |
-} |
|
| 56 |
- |
|
| 57 |
-func (f *errWriter) Write(buf []byte) (int, error) {
|
|
| 58 |
- return f.n, f.err |
|
| 59 |
-} |
|
| 60 |
- |
|
| 61 |
-func TestWriteWithWriterError(t *testing.T) {
|
|
| 62 |
- expectedError := errors.New("expected")
|
|
| 63 |
- expectedReturnedBytes := 10 |
|
| 64 |
- writer := NewStdWriter(&errWriter{
|
|
| 65 |
- n: stdWriterPrefixLen + expectedReturnedBytes, |
|
| 66 |
- err: expectedError, |
|
| 67 |
- }, Stdout) |
|
| 68 |
- data := []byte("This won't get written, sigh")
|
|
| 69 |
- n, err := writer.Write(data) |
|
| 70 |
- if !errors.Is(err, expectedError) {
|
|
| 71 |
- t.Fatalf("Didn't get expected error.")
|
|
| 72 |
- } |
|
| 73 |
- if n != expectedReturnedBytes {
|
|
| 74 |
- t.Fatalf("Didn't get expected written bytes %d, got %d.",
|
|
| 75 |
- expectedReturnedBytes, n) |
|
| 76 |
- } |
|
| 77 |
-} |
|
| 78 |
- |
|
| 79 |
-func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) {
|
|
| 80 |
- writer := NewStdWriter(&errWriter{n: -1}, Stdout)
|
|
| 81 |
- data := []byte("This won't get written, sigh")
|
|
| 82 |
- actual, _ := writer.Write(data) |
|
| 83 |
- if actual != 0 {
|
|
| 84 |
- t.Fatalf("Expected returned written bytes equal to 0, got %d", actual)
|
|
| 85 |
- } |
|
| 86 |
-} |
|
| 87 |
- |
|
| 88 |
-func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (*bytes.Buffer, error) {
|
|
| 89 |
- buffer := new(bytes.Buffer) |
|
| 90 |
- dstOut := NewStdWriter(buffer, Stdout) |
|
| 91 |
- _, err := dstOut.Write(stdOutBytes) |
|
| 92 |
- if err != nil {
|
|
| 93 |
- return buffer, err |
|
| 94 |
- } |
|
| 95 |
- dstErr := NewStdWriter(buffer, Stderr) |
|
| 96 |
- _, err = dstErr.Write(stdErrBytes) |
|
| 97 |
- return buffer, err |
|
| 98 |
-} |
|
| 99 |
- |
|
| 100 |
-func TestStdCopyWriteAndRead(t *testing.T) {
|
|
| 101 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 102 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 103 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 104 |
- if err != nil {
|
|
| 105 |
- t.Fatal(err) |
|
| 106 |
- } |
|
| 107 |
- written, err := StdCopy(io.Discard, io.Discard, buffer) |
|
| 108 |
- if err != nil {
|
|
| 109 |
- t.Fatal(err) |
|
| 110 |
- } |
|
| 111 |
- expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes) |
|
| 112 |
- if written != int64(expectedTotalWritten) {
|
|
| 113 |
- t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written)
|
|
| 114 |
- } |
|
| 115 |
-} |
|
| 116 |
- |
|
| 117 |
-type customReader struct {
|
|
| 118 |
- n int |
|
| 119 |
- err error |
|
| 120 |
- totalCalls int |
|
| 121 |
- correctCalls int |
|
| 122 |
- src *bytes.Buffer |
|
| 123 |
-} |
|
| 124 |
- |
|
| 125 |
-func (f *customReader) Read(buf []byte) (int, error) {
|
|
| 126 |
- f.totalCalls++ |
|
| 127 |
- if f.totalCalls <= f.correctCalls {
|
|
| 128 |
- return f.src.Read(buf) |
|
| 129 |
- } |
|
| 130 |
- return f.n, f.err |
|
| 131 |
-} |
|
| 132 |
- |
|
| 133 |
-func TestStdCopyReturnsErrorReadingHeader(t *testing.T) {
|
|
| 134 |
- expectedError := errors.New("error")
|
|
| 135 |
- reader := &customReader{
|
|
| 136 |
- err: expectedError, |
|
| 137 |
- } |
|
| 138 |
- written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 139 |
- if written != 0 {
|
|
| 140 |
- t.Fatalf("Expected 0 bytes read, got %d", written)
|
|
| 141 |
- } |
|
| 142 |
- if !errors.Is(err, expectedError) {
|
|
| 143 |
- t.Fatalf("Didn't get expected error")
|
|
| 144 |
- } |
|
| 145 |
-} |
|
| 146 |
- |
|
| 147 |
-func TestStdCopyReturnsErrorReadingFrame(t *testing.T) {
|
|
| 148 |
- expectedError := errors.New("error")
|
|
| 149 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 150 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 151 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 152 |
- if err != nil {
|
|
| 153 |
- t.Fatal(err) |
|
| 154 |
- } |
|
| 155 |
- reader := &customReader{
|
|
| 156 |
- correctCalls: 1, |
|
| 157 |
- n: stdWriterPrefixLen + 1, |
|
| 158 |
- err: expectedError, |
|
| 159 |
- src: buffer, |
|
| 160 |
- } |
|
| 161 |
- written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 162 |
- if written != 0 {
|
|
| 163 |
- t.Fatalf("Expected 0 bytes read, got %d", written)
|
|
| 164 |
- } |
|
| 165 |
- if !errors.Is(err, expectedError) {
|
|
| 166 |
- t.Fatalf("Didn't get expected error")
|
|
| 167 |
- } |
|
| 168 |
-} |
|
| 169 |
- |
|
| 170 |
-func TestStdCopyDetectsCorruptedFrame(t *testing.T) {
|
|
| 171 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 172 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 173 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 174 |
- if err != nil {
|
|
| 175 |
- t.Fatal(err) |
|
| 176 |
- } |
|
| 177 |
- reader := &customReader{
|
|
| 178 |
- correctCalls: 1, |
|
| 179 |
- n: stdWriterPrefixLen + 1, |
|
| 180 |
- err: io.EOF, |
|
| 181 |
- src: buffer, |
|
| 182 |
- } |
|
| 183 |
- written, err := StdCopy(io.Discard, io.Discard, reader) |
|
| 184 |
- if written != startingBufLen {
|
|
| 185 |
- t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written)
|
|
| 186 |
- } |
|
| 187 |
- if err != nil {
|
|
| 188 |
- t.Fatal("Didn't get nil error")
|
|
| 189 |
- } |
|
| 190 |
-} |
|
| 191 |
- |
|
| 192 |
-func TestStdCopyWithInvalidInputHeader(t *testing.T) {
|
|
| 193 |
- dstOut := NewStdWriter(io.Discard, Stdout) |
|
| 194 |
- dstErr := NewStdWriter(io.Discard, Stderr) |
|
| 195 |
- src := strings.NewReader("Invalid input")
|
|
| 196 |
- _, err := StdCopy(dstOut, dstErr, src) |
|
| 197 |
- if err == nil {
|
|
| 198 |
- t.Fatal("StdCopy with invalid input header should fail.")
|
|
| 199 |
- } |
|
| 200 |
-} |
|
| 201 |
- |
|
| 202 |
-func TestStdCopyWithCorruptedPrefix(t *testing.T) {
|
|
| 203 |
- data := []byte{0x01, 0x02, 0x03}
|
|
| 204 |
- src := bytes.NewReader(data) |
|
| 205 |
- written, err := StdCopy(nil, nil, src) |
|
| 206 |
- if err != nil {
|
|
| 207 |
- t.Fatalf("StdCopy should not return an error with corrupted prefix.")
|
|
| 208 |
- } |
|
| 209 |
- if written != 0 {
|
|
| 210 |
- t.Fatalf("StdCopy should have written 0, but has written %d", written)
|
|
| 211 |
- } |
|
| 212 |
-} |
|
| 213 |
- |
|
| 214 |
-func TestStdCopyReturnsWriteErrors(t *testing.T) {
|
|
| 215 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 216 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 217 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 218 |
- if err != nil {
|
|
| 219 |
- t.Fatal(err) |
|
| 220 |
- } |
|
| 221 |
- expectedError := errors.New("expected")
|
|
| 222 |
- |
|
| 223 |
- dstOut := &errWriter{err: expectedError}
|
|
| 224 |
- |
|
| 225 |
- written, err := StdCopy(dstOut, io.Discard, buffer) |
|
| 226 |
- if written != 0 {
|
|
| 227 |
- t.Fatalf("StdCopy should have written 0, but has written %d", written)
|
|
| 228 |
- } |
|
| 229 |
- if !errors.Is(err, expectedError) {
|
|
| 230 |
- t.Fatalf("Didn't get expected error, got %v", err)
|
|
| 231 |
- } |
|
| 232 |
-} |
|
| 233 |
- |
|
| 234 |
-func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) {
|
|
| 235 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 236 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 237 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 238 |
- if err != nil {
|
|
| 239 |
- t.Fatal(err) |
|
| 240 |
- } |
|
| 241 |
- dstOut := &errWriter{n: startingBufLen - 10}
|
|
| 242 |
- |
|
| 243 |
- written, err := StdCopy(dstOut, io.Discard, buffer) |
|
| 244 |
- if written != 0 {
|
|
| 245 |
- t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written)
|
|
| 246 |
- } |
|
| 247 |
- if !errors.Is(err, io.ErrShortWrite) {
|
|
| 248 |
- t.Fatalf("Didn't get expected io.ErrShortWrite error")
|
|
| 249 |
- } |
|
| 250 |
-} |
|
| 251 |
- |
|
| 252 |
-// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an |
|
| 253 |
-// error, when that error is muxed into the Systemerr stream. |
|
| 254 |
-func TestStdCopyReturnsErrorFromSystem(t *testing.T) {
|
|
| 255 |
- // write in the basic messages, just so there's some fluff in there |
|
| 256 |
- stdOutBytes := []byte(strings.Repeat("o", startingBufLen))
|
|
| 257 |
- stdErrBytes := []byte(strings.Repeat("e", startingBufLen))
|
|
| 258 |
- buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) |
|
| 259 |
- if err != nil {
|
|
| 260 |
- t.Fatal(err) |
|
| 261 |
- } |
|
| 262 |
- // add in an error message on the Systemerr stream |
|
| 263 |
- systemErrBytes := []byte(strings.Repeat("S", startingBufLen))
|
|
| 264 |
- systemWriter := NewStdWriter(buffer, Systemerr) |
|
| 265 |
- _, err = systemWriter.Write(systemErrBytes) |
|
| 266 |
- if err != nil {
|
|
| 267 |
- t.Fatal(err) |
|
| 268 |
- } |
|
| 269 |
- |
|
| 270 |
- // now copy and demux. we should expect an error containing the string we |
|
| 271 |
- // wrote out |
|
| 272 |
- _, err = StdCopy(io.Discard, io.Discard, buffer) |
|
| 273 |
- if err == nil {
|
|
| 274 |
- t.Fatal("expected error, got none")
|
|
| 275 |
- } |
|
| 276 |
- if !strings.Contains(err.Error(), string(systemErrBytes)) {
|
|
| 277 |
- t.Fatal("expected error to contain message")
|
|
| 278 |
- } |
|
| 279 |
-} |
|
| 280 |
- |
|
| 281 |
-func BenchmarkWrite(b *testing.B) {
|
|
| 282 |
- w := NewStdWriter(io.Discard, Stdout) |
|
| 283 |
- data := []byte("Test line for testing stdwriter performance\n")
|
|
| 284 |
- data = bytes.Repeat(data, 100) |
|
| 285 |
- b.SetBytes(int64(len(data))) |
|
| 286 |
- b.ResetTimer() |
|
| 287 |
- for i := 0; i < b.N; i++ {
|
|
| 288 |
- if _, err := w.Write(data); err != nil {
|
|
| 289 |
- b.Fatal(err) |
|
| 290 |
- } |
|
| 291 |
- } |
|
| 292 |
-} |
| 293 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,190 @@ |
| 0 |
+package stdcopy |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "bytes" |
|
| 4 |
+ "encoding/binary" |
|
| 5 |
+ "errors" |
|
| 6 |
+ "fmt" |
|
| 7 |
+ "io" |
|
| 8 |
+ "sync" |
|
| 9 |
+) |
|
| 10 |
+ |
|
| 11 |
+// StdType is the type of standard stream |
|
| 12 |
+// a writer can multiplex to. |
|
| 13 |
+type StdType byte |
|
| 14 |
+ |
|
| 15 |
+const ( |
|
| 16 |
+ // Stdin represents standard input stream type. |
|
| 17 |
+ Stdin StdType = iota |
|
| 18 |
+ // Stdout represents standard output stream type. |
|
| 19 |
+ Stdout |
|
| 20 |
+ // Stderr represents standard error steam type. |
|
| 21 |
+ Stderr |
|
| 22 |
+ // Systemerr represents errors originating from the system that make it |
|
| 23 |
+ // into the multiplexed stream. |
|
| 24 |
+ Systemerr |
|
| 25 |
+ |
|
| 26 |
+ stdWriterPrefixLen = 8 |
|
| 27 |
+ stdWriterFdIndex = 0 |
|
| 28 |
+ stdWriterSizeIndex = 4 |
|
| 29 |
+ |
|
| 30 |
+ startingBufLen = 32*1024 + stdWriterPrefixLen + 1 |
|
| 31 |
+) |
|
| 32 |
+ |
|
| 33 |
+var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
|
|
| 34 |
+ |
|
| 35 |
+// stdWriter is wrapper of io.Writer with extra customized info. |
|
| 36 |
+type stdWriter struct {
|
|
| 37 |
+ io.Writer |
|
| 38 |
+ prefix byte |
|
| 39 |
+} |
|
| 40 |
+ |
|
| 41 |
+// Write sends the buffer to the underneath writer. |
|
| 42 |
+// It inserts the prefix header before the buffer, |
|
| 43 |
+// so stdcopy.StdCopy knows where to multiplex the output. |
|
| 44 |
+// It makes stdWriter to implement io.Writer. |
|
| 45 |
+func (w *stdWriter) Write(p []byte) (int, error) {
|
|
| 46 |
+ if w == nil || w.Writer == nil {
|
|
| 47 |
+ return 0, errors.New("writer not instantiated")
|
|
| 48 |
+ } |
|
| 49 |
+ if p == nil {
|
|
| 50 |
+ return 0, nil |
|
| 51 |
+ } |
|
| 52 |
+ |
|
| 53 |
+ header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
|
|
| 54 |
+ binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) |
|
| 55 |
+ buf := bufPool.Get().(*bytes.Buffer) |
|
| 56 |
+ buf.Write(header[:]) |
|
| 57 |
+ buf.Write(p) |
|
| 58 |
+ |
|
| 59 |
+ n, err := w.Writer.Write(buf.Bytes()) |
|
| 60 |
+ n -= stdWriterPrefixLen |
|
| 61 |
+ if n < 0 {
|
|
| 62 |
+ n = 0 |
|
| 63 |
+ } |
|
| 64 |
+ |
|
| 65 |
+ buf.Reset() |
|
| 66 |
+ bufPool.Put(buf) |
|
| 67 |
+ return n, err |
|
| 68 |
+} |
|
| 69 |
+ |
|
| 70 |
+// NewStdWriter instantiates a new Writer. |
|
| 71 |
+// Everything written to it will be encapsulated using a custom format, |
|
| 72 |
+// and written to the underlying `w` stream. |
|
| 73 |
+// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. |
|
| 74 |
+// `t` indicates the id of the stream to encapsulate. |
|
| 75 |
+// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. |
|
| 76 |
+func NewStdWriter(w io.Writer, t StdType) io.Writer {
|
|
| 77 |
+ return &stdWriter{
|
|
| 78 |
+ Writer: w, |
|
| 79 |
+ prefix: byte(t), |
|
| 80 |
+ } |
|
| 81 |
+} |
|
| 82 |
+ |
|
| 83 |
+// StdCopy is a modified version of io.Copy. |
|
| 84 |
+// |
|
| 85 |
+// StdCopy will demultiplex `src`, assuming that it contains two streams, |
|
| 86 |
+// previously multiplexed together using a StdWriter instance. |
|
| 87 |
+// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. |
|
| 88 |
+// |
|
| 89 |
+// StdCopy will read until it hits EOF on `src`. It will then return a nil error. |
|
| 90 |
+// In other words: if `err` is non nil, it indicates a real underlying error. |
|
| 91 |
+// |
|
| 92 |
+// `written` will hold the total number of bytes written to `dstout` and `dsterr`. |
|
| 93 |
+func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, _ error) {
|
|
| 94 |
+ var ( |
|
| 95 |
+ buf = make([]byte, startingBufLen) |
|
| 96 |
+ bufLen = len(buf) |
|
| 97 |
+ nr, nw int |
|
| 98 |
+ err error |
|
| 99 |
+ out io.Writer |
|
| 100 |
+ frameSize int |
|
| 101 |
+ ) |
|
| 102 |
+ |
|
| 103 |
+ for {
|
|
| 104 |
+ // Make sure we have at least a full header |
|
| 105 |
+ for nr < stdWriterPrefixLen {
|
|
| 106 |
+ var nr2 int |
|
| 107 |
+ nr2, err = src.Read(buf[nr:]) |
|
| 108 |
+ nr += nr2 |
|
| 109 |
+ if errors.Is(err, io.EOF) {
|
|
| 110 |
+ if nr < stdWriterPrefixLen {
|
|
| 111 |
+ return written, nil |
|
| 112 |
+ } |
|
| 113 |
+ break |
|
| 114 |
+ } |
|
| 115 |
+ if err != nil {
|
|
| 116 |
+ return 0, err |
|
| 117 |
+ } |
|
| 118 |
+ } |
|
| 119 |
+ |
|
| 120 |
+ stream := StdType(buf[stdWriterFdIndex]) |
|
| 121 |
+ // Check the first byte to know where to write |
|
| 122 |
+ switch stream {
|
|
| 123 |
+ case Stdin: |
|
| 124 |
+ fallthrough |
|
| 125 |
+ case Stdout: |
|
| 126 |
+ // Write on stdout |
|
| 127 |
+ out = dstout |
|
| 128 |
+ case Stderr: |
|
| 129 |
+ // Write on stderr |
|
| 130 |
+ out = dsterr |
|
| 131 |
+ case Systemerr: |
|
| 132 |
+ // If we're on Systemerr, we won't write anywhere. |
|
| 133 |
+ // NB: if this code changes later, make sure you don't try to write |
|
| 134 |
+ // to outstream if Systemerr is the stream |
|
| 135 |
+ out = nil |
|
| 136 |
+ default: |
|
| 137 |
+ return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
|
|
| 138 |
+ } |
|
| 139 |
+ |
|
| 140 |
+ // Retrieve the size of the frame |
|
| 141 |
+ frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) |
|
| 142 |
+ |
|
| 143 |
+ // Check if the buffer is big enough to read the frame. |
|
| 144 |
+ // Extend it if necessary. |
|
| 145 |
+ if frameSize+stdWriterPrefixLen > bufLen {
|
|
| 146 |
+ buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) |
|
| 147 |
+ bufLen = len(buf) |
|
| 148 |
+ } |
|
| 149 |
+ |
|
| 150 |
+ // While the amount of bytes read is less than the size of the frame + header, we keep reading |
|
| 151 |
+ for nr < frameSize+stdWriterPrefixLen {
|
|
| 152 |
+ var nr2 int |
|
| 153 |
+ nr2, err = src.Read(buf[nr:]) |
|
| 154 |
+ nr += nr2 |
|
| 155 |
+ if errors.Is(err, io.EOF) {
|
|
| 156 |
+ if nr < frameSize+stdWriterPrefixLen {
|
|
| 157 |
+ return written, nil |
|
| 158 |
+ } |
|
| 159 |
+ break |
|
| 160 |
+ } |
|
| 161 |
+ if err != nil {
|
|
| 162 |
+ return 0, err |
|
| 163 |
+ } |
|
| 164 |
+ } |
|
| 165 |
+ |
|
| 166 |
+ // we might have an error from the source mixed up in our multiplexed |
|
| 167 |
+ // stream. if we do, return it. |
|
| 168 |
+ if stream == Systemerr {
|
|
| 169 |
+ return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
|
|
| 170 |
+ } |
|
| 171 |
+ |
|
| 172 |
+ // Write the retrieved frame (without header) |
|
| 173 |
+ nw, err = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) |
|
| 174 |
+ if err != nil {
|
|
| 175 |
+ return 0, err |
|
| 176 |
+ } |
|
| 177 |
+ |
|
| 178 |
+ // If the frame has not been fully written: error |
|
| 179 |
+ if nw != frameSize {
|
|
| 180 |
+ return 0, io.ErrShortWrite |
|
| 181 |
+ } |
|
| 182 |
+ written += int64(nw) |
|
| 183 |
+ |
|
| 184 |
+ // Move the rest of the buffer to the beginning |
|
| 185 |
+ copy(buf, buf[frameSize+stdWriterPrefixLen:]) |
|
| 186 |
+ // Move the index |
|
| 187 |
+ nr -= frameSize + stdWriterPrefixLen |
|
| 188 |
+ } |
|
| 189 |
+} |
| ... | ... |
@@ -31,7 +31,7 @@ import ( |
| 31 | 31 |
// SIZE1, SIZE2, SIZE3, and SIZE4 are four bytes of uint32 encoded as big endian. |
| 32 | 32 |
// This is the size of OUTPUT. |
| 33 | 33 |
// |
| 34 |
-// You can use github.com/docker/docker/pkg/stdcopy.StdCopy to demultiplex this |
|
| 34 |
+// You can use github.com/moby/moby/api/stdcopy.StdCopy to demultiplex this |
|
| 35 | 35 |
// stream. |
| 36 | 36 |
func (cli *Client) ContainerAttach(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) {
|
| 37 | 37 |
containerID, err := trimID("container", containerID)
|
| ... | ... |
@@ -68,7 +68,7 @@ func (cli *Client) ContainerExecStart(ctx context.Context, execID string, config |
| 68 | 68 |
// - If the container is *not* using a TTY, streams for stdout and stderr are |
| 69 | 69 |
// multiplexed. |
| 70 | 70 |
// |
| 71 |
-// You can use [github.com/docker/docker/pkg/stdcopy.StdCopy] to demultiplex this |
|
| 71 |
+// You can use [github.com/moby/moby/api/stdcopy.StdCopy] to demultiplex this |
|
| 72 | 72 |
// stream. Refer to [Client.ContainerAttach] for details about the multiplexed |
| 73 | 73 |
// stream. |
| 74 | 74 |
func (cli *Client) ContainerExecAttach(ctx context.Context, execID string, config container.ExecAttachOptions) (types.HijackedResponse, error) {
|
| ... | ... |
@@ -31,7 +31,7 @@ import ( |
| 31 | 31 |
// SIZE1, SIZE2, SIZE3, and SIZE4 are four bytes of uint32 encoded as big endian. |
| 32 | 32 |
// This is the size of OUTPUT. |
| 33 | 33 |
// |
| 34 |
-// You can use github.com/docker/docker/pkg/stdcopy.StdCopy to demultiplex this |
|
| 34 |
+// You can use github.com/moby/moby/api/stdcopy.StdCopy to demultiplex this |
|
| 35 | 35 |
// stream. |
| 36 | 36 |
func (cli *Client) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) {
|
| 37 | 37 |
containerID, err := trimID("container", containerID)
|
| ... | ... |
@@ -939,6 +939,7 @@ github.com/moby/locker |
| 939 | 939 |
# github.com/moby/moby/api v0.0.0 => ./api |
| 940 | 940 |
## explicit; go 1.23.0 |
| 941 | 941 |
github.com/moby/moby/api |
| 942 |
+github.com/moby/moby/api/stdcopy |
|
| 942 | 943 |
github.com/moby/moby/api/types |
| 943 | 944 |
github.com/moby/moby/api/types/auxprogress |
| 944 | 945 |
github.com/moby/moby/api/types/backend |