Browse code

daemon/server/httputils: move WriteLogStream to its own package

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Sebastiaan van Stijn authored on 2026/02/22 05:58:08
Showing 4 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,92 @@
0
+package logstream
1
+
2
+import (
3
+	"context"
4
+	"fmt"
5
+	"io"
6
+	"net/http"
7
+	"net/url"
8
+	"sort"
9
+
10
+	"github.com/moby/moby/api/pkg/stdcopy"
11
+	"github.com/moby/moby/v2/daemon/internal/stdcopymux"
12
+	"github.com/moby/moby/v2/daemon/server/backend"
13
+	"github.com/moby/moby/v2/pkg/ioutils"
14
+)
15
+
16
+// rfc3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
17
+// ensure the formatted time isalways the same number of characters.
18
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
19
+
20
+// Write writes an encoded byte stream of log messages from the
21
+// messages channel, multiplexing them with a stdcopy.Writer if mux is true
22
+func Write(_ context.Context, w http.ResponseWriter, msgs <-chan *backend.LogMessage, config *backend.ContainerLogsOptions, mux bool) {
23
+	// See https://github.com/moby/moby/issues/47448
24
+	// Trigger headers to be written immediately.
25
+	w.WriteHeader(http.StatusOK)
26
+
27
+	wf := ioutils.NewWriteFlusher(w)
28
+	defer wf.Close()
29
+
30
+	wf.Flush()
31
+
32
+	outStream := io.Writer(wf)
33
+	errStream := outStream
34
+	sysErrStream := errStream
35
+	if mux {
36
+		sysErrStream = stdcopymux.NewStdWriter(outStream, stdcopy.Systemerr)
37
+		errStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stderr)
38
+		outStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stdout)
39
+	}
40
+
41
+	for {
42
+		msg, ok := <-msgs
43
+		if !ok {
44
+			return
45
+		}
46
+		// check if the message contains an error. if so, write that error
47
+		// and exit
48
+		if msg.Err != nil {
49
+			fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err)
50
+			continue
51
+		}
52
+		logLine := msg.Line
53
+		if config.Details {
54
+			logLine = append(attrsByteSlice(msg.Attrs), ' ')
55
+			logLine = append(logLine, msg.Line...)
56
+		}
57
+		if config.Timestamps {
58
+			logLine = append([]byte(msg.Timestamp.Format(rfc3339NanoFixed)+" "), logLine...)
59
+		}
60
+		if msg.Source == "stdout" && config.ShowStdout {
61
+			_, _ = outStream.Write(logLine)
62
+		}
63
+		if msg.Source == "stderr" && config.ShowStderr {
64
+			_, _ = errStream.Write(logLine)
65
+		}
66
+	}
67
+}
68
+
69
+type byKey []backend.LogAttr
70
+
71
+func (b byKey) Len() int           { return len(b) }
72
+func (b byKey) Less(i, j int) bool { return b[i].Key < b[j].Key }
73
+func (b byKey) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
74
+
75
+func attrsByteSlice(a []backend.LogAttr) []byte {
76
+	// Note this sorts "a" in-place. That is fine here - nothing else is
77
+	// going to use Attrs or care about the order.
78
+	sort.Sort(byKey(a))
79
+
80
+	var ret []byte
81
+	for i, pair := range a {
82
+		k, v := url.QueryEscape(pair.Key), url.QueryEscape(pair.Value)
83
+		ret = append(ret, []byte(k)...)
84
+		ret = append(ret, '=')
85
+		ret = append(ret, []byte(v)...)
86
+		if i != len(a)-1 {
87
+			ret = append(ret, ',')
88
+		}
89
+	}
90
+	return ret
91
+}
0 92
deleted file mode 100644
... ...
@@ -1,92 +0,0 @@
1
-package httputils
2
-
3
-import (
4
-	"context"
5
-	"fmt"
6
-	"io"
7
-	"net/http"
8
-	"net/url"
9
-	"sort"
10
-
11
-	"github.com/moby/moby/api/pkg/stdcopy"
12
-	"github.com/moby/moby/v2/daemon/internal/stdcopymux"
13
-	"github.com/moby/moby/v2/daemon/server/backend"
14
-	"github.com/moby/moby/v2/pkg/ioutils"
15
-)
16
-
17
-// rfc3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
18
-// ensure the formatted time isalways the same number of characters.
19
-const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
20
-
21
-// WriteLogStream writes an encoded byte stream of log messages from the
22
-// messages channel, multiplexing them with a stdcopy.Writer if mux is true
23
-func WriteLogStream(_ context.Context, w http.ResponseWriter, msgs <-chan *backend.LogMessage, config *backend.ContainerLogsOptions, mux bool) {
24
-	// See https://github.com/moby/moby/issues/47448
25
-	// Trigger headers to be written immediately.
26
-	w.WriteHeader(http.StatusOK)
27
-
28
-	wf := ioutils.NewWriteFlusher(w)
29
-	defer wf.Close()
30
-
31
-	wf.Flush()
32
-
33
-	outStream := io.Writer(wf)
34
-	errStream := outStream
35
-	sysErrStream := errStream
36
-	if mux {
37
-		sysErrStream = stdcopymux.NewStdWriter(outStream, stdcopy.Systemerr)
38
-		errStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stderr)
39
-		outStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stdout)
40
-	}
41
-
42
-	for {
43
-		msg, ok := <-msgs
44
-		if !ok {
45
-			return
46
-		}
47
-		// check if the message contains an error. if so, write that error
48
-		// and exit
49
-		if msg.Err != nil {
50
-			fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err)
51
-			continue
52
-		}
53
-		logLine := msg.Line
54
-		if config.Details {
55
-			logLine = append(attrsByteSlice(msg.Attrs), ' ')
56
-			logLine = append(logLine, msg.Line...)
57
-		}
58
-		if config.Timestamps {
59
-			logLine = append([]byte(msg.Timestamp.Format(rfc3339NanoFixed)+" "), logLine...)
60
-		}
61
-		if msg.Source == "stdout" && config.ShowStdout {
62
-			_, _ = outStream.Write(logLine)
63
-		}
64
-		if msg.Source == "stderr" && config.ShowStderr {
65
-			_, _ = errStream.Write(logLine)
66
-		}
67
-	}
68
-}
69
-
70
-type byKey []backend.LogAttr
71
-
72
-func (b byKey) Len() int           { return len(b) }
73
-func (b byKey) Less(i, j int) bool { return b[i].Key < b[j].Key }
74
-func (b byKey) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
75
-
76
-func attrsByteSlice(a []backend.LogAttr) []byte {
77
-	// Note this sorts "a" in-place. That is fine here - nothing else is
78
-	// going to use Attrs or care about the order.
79
-	sort.Sort(byKey(a))
80
-
81
-	var ret []byte
82
-	for i, pair := range a {
83
-		k, v := url.QueryEscape(pair.Key), url.QueryEscape(pair.Value)
84
-		ret = append(ret, []byte(k)...)
85
-		ret = append(ret, '=')
86
-		ret = append(ret, []byte(v)...)
87
-		if i != len(a)-1 {
88
-			ret = append(ret, ',')
89
-		}
90
-	}
91
-	return ret
92
-}
... ...
@@ -26,6 +26,7 @@ import (
26 26
 	"github.com/moby/moby/v2/daemon/server/backend"
27 27
 	"github.com/moby/moby/v2/daemon/server/httpstatus"
28 28
 	"github.com/moby/moby/v2/daemon/server/httputils"
29
+	"github.com/moby/moby/v2/daemon/server/httputils/logstream"
29 30
 	"github.com/moby/moby/v2/errdefs"
30 31
 	"github.com/moby/moby/v2/pkg/ioutils"
31 32
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
... ...
@@ -217,7 +218,7 @@ func (c *containerRouter) getContainersLogs(ctx context.Context, w http.Response
217 217
 	// this is the point of no return for writing a response. once we call
218 218
 	// WriteLogStream, the response has been started and errors will be
219 219
 	// returned in band by WriteLogStream
220
-	httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty)
220
+	logstream.Write(ctx, w, msgs, logsConfig, !tty)
221 221
 	return nil
222 222
 }
223 223
 
... ...
@@ -10,6 +10,7 @@ import (
10 10
 	"github.com/moby/moby/v2/daemon/internal/versions"
11 11
 	"github.com/moby/moby/v2/daemon/server/backend"
12 12
 	"github.com/moby/moby/v2/daemon/server/httputils"
13
+	"github.com/moby/moby/v2/daemon/server/httputils/logstream"
13 14
 )
14 15
 
15 16
 // swarmLogs takes an http response, request, and selector, and writes the logs
... ...
@@ -67,7 +68,7 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *
67 67
 		contentType = basictypes.MediaTypeMultiplexedStream
68 68
 	}
69 69
 	w.Header().Set("Content-Type", contentType)
70
-	httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty)
70
+	logstream.Write(ctx, w, msgs, logsConfig, !tty)
71 71
 	return nil
72 72
 }
73 73