Browse code

client/pkg/jsonmessage: use api message def'ns

Signed-off-by: Cory Snider <csnider@mirantis.com>

Cory Snider authored on 2025/10/10 08:19:45
Showing 17 changed files
... ...
@@ -9,13 +9,13 @@ import (
9 9
 
10 10
 	cerrdefs "github.com/containerd/errdefs"
11 11
 	"github.com/distribution/reference"
12
+	"github.com/moby/moby/api/types/jsonstream"
12 13
 	"github.com/moby/moby/client/internal"
13
-	"github.com/moby/moby/client/pkg/jsonmessage"
14 14
 )
15 15
 
16 16
 type ImagePullResponse interface {
17 17
 	io.ReadCloser
18
-	JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
18
+	JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
19 19
 	Wait(ctx context.Context) error
20 20
 }
21 21
 
... ...
@@ -10,9 +10,9 @@ import (
10 10
 	"time"
11 11
 
12 12
 	cerrdefs "github.com/containerd/errdefs"
13
+	"github.com/moby/moby/api/types/jsonstream"
13 14
 	"github.com/moby/moby/api/types/registry"
14 15
 	"github.com/moby/moby/client/internal"
15
-	"github.com/moby/moby/client/pkg/jsonmessage"
16 16
 	"gotest.tools/v3/assert"
17 17
 	is "gotest.tools/v3/assert/cmp"
18 18
 )
... ...
@@ -193,7 +193,7 @@ func TestImagePullResponse(t *testing.T) {
193 193
 	response := internal.NewJSONMessageStream(r)
194 194
 	ctx, cancel := context.WithCancel(t.Context())
195 195
 	messages := response.JSONMessages(ctx)
196
-	c := make(chan jsonmessage.JSONMessage)
196
+	c := make(chan jsonstream.Message)
197 197
 	go func() {
198 198
 		for message, err := range messages {
199 199
 			if err != nil {
... ...
@@ -12,14 +12,14 @@ import (
12 12
 
13 13
 	cerrdefs "github.com/containerd/errdefs"
14 14
 	"github.com/distribution/reference"
15
+	"github.com/moby/moby/api/types/jsonstream"
15 16
 	"github.com/moby/moby/api/types/registry"
16 17
 	"github.com/moby/moby/client/internal"
17
-	"github.com/moby/moby/client/pkg/jsonmessage"
18 18
 )
19 19
 
20 20
 type ImagePushResponse interface {
21 21
 	io.ReadCloser
22
-	JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
22
+	JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
23 23
 	Wait(ctx context.Context) error
24 24
 }
25 25
 
... ...
@@ -8,7 +8,7 @@ import (
8 8
 	"iter"
9 9
 	"sync"
10 10
 
11
-	"github.com/moby/moby/client/pkg/jsonmessage"
11
+	"github.com/moby/moby/api/types/jsonstream"
12 12
 )
13 13
 
14 14
 func NewJSONMessageStream(rc io.ReadCloser) stream {
... ...
@@ -44,15 +44,15 @@ func (r stream) Close() error {
44 44
 
45 45
 // JSONMessages decodes the response stream as a sequence of JSONMessages.
46 46
 // if stream ends or context is cancelled, the underlying [io.Reader] is closed.
47
-func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
47
+func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
48 48
 	context.AfterFunc(ctx, func() {
49 49
 		_ = r.Close()
50 50
 	})
51 51
 	dec := json.NewDecoder(r)
52
-	return func(yield func(jsonmessage.JSONMessage, error) bool) {
52
+	return func(yield func(jsonstream.Message, error) bool) {
53 53
 		defer r.Close()
54 54
 		for {
55
-			var jm jsonmessage.JSONMessage
55
+			var jm jsonstream.Message
56 56
 			err := dec.Decode(&jm)
57 57
 			if errors.Is(err, io.EOF) {
58 58
 				break
... ...
@@ -14,28 +14,14 @@ import (
14 14
 	"github.com/moby/term"
15 15
 )
16 16
 
17
+var timeNow = time.Now // For overriding in tests.
18
+
17 19
 // RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
18 20
 // ensure the formatted time isalways the same number of characters.
19 21
 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
20 22
 
21
-// JSONProgress describes a progress message in a JSON stream.
22
-type JSONProgress struct {
23
-	jsonstream.Progress
24
-
25
-	// terminalFd is the fd of the current terminal, if any. It is used
26
-	// to get the terminal width.
27
-	terminalFd uintptr
28
-
29
-	// nowFunc is used to override the current time in tests.
30
-	nowFunc func() time.Time
31
-
32
-	// winSize is used to override the terminal width in tests.
33
-	winSize int
34
-}
35
-
36
-func (p *JSONProgress) String() string {
23
+func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
37 24
 	var (
38
-		width      = p.width()
39 25
 		pbBox      string
40 26
 		numbersBox string
41 27
 	)
... ...
@@ -89,7 +75,7 @@ func (p *JSONProgress) String() string {
89 89
 	var timeLeftBox string
90 90
 	if width > 50 {
91 91
 		if p.Current > 0 && p.Start > 0 && percentage < 50 {
92
-			fromStart := p.now().Sub(time.Unix(p.Start, 0))
92
+			fromStart := timeNow().UTC().Sub(time.Unix(p.Start, 0))
93 93
 			perEntry := fromStart / time.Duration(p.Current)
94 94
 			left := time.Duration(p.Total-p.Current) * perEntry
95 95
 			timeLeftBox = " " + left.Round(time.Second).String()
... ...
@@ -98,40 +84,6 @@ func (p *JSONProgress) String() string {
98 98
 	return pbBox + numbersBox + timeLeftBox
99 99
 }
100 100
 
101
-// now returns the current time in UTC, but can be overridden in tests
102
-// by setting JSONProgress.nowFunc to a custom function.
103
-func (p *JSONProgress) now() time.Time {
104
-	if p.nowFunc != nil {
105
-		return p.nowFunc()
106
-	}
107
-	return time.Now().UTC()
108
-}
109
-
110
-// width returns the current terminal's width, but can be overridden
111
-// in tests by setting JSONProgress.winSize to a non-zero value.
112
-func (p *JSONProgress) width() int {
113
-	if p.winSize != 0 {
114
-		return p.winSize
115
-	}
116
-	ws, err := term.GetWinsize(p.terminalFd)
117
-	if err == nil {
118
-		return int(ws.Width)
119
-	}
120
-	return 200
121
-}
122
-
123
-// JSONMessage defines a message struct. It describes
124
-// the created time, where it from, status, ID of the
125
-// message. It's used for docker events.
126
-type JSONMessage struct {
127
-	Stream   string            `json:"stream,omitempty"`
128
-	Status   string            `json:"status,omitempty"`
129
-	Progress *JSONProgress     `json:"progressDetail,omitempty"`
130
-	ID       string            `json:"id,omitempty"`
131
-	Error    *jsonstream.Error `json:"errorDetail,omitempty"`
132
-	Aux      *json.RawMessage  `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
133
-}
134
-
135 101
 // We can probably use [aec.EmptyBuilder] for managing the output, but
136 102
 // currently we're doing it all manually, so defining some consts for
137 103
 // the basics we use.
... ...
@@ -164,7 +116,7 @@ func cursorDown(out io.Writer, l uint) {
164 164
 // Display prints the JSONMessage to out. If isTerminal is true, it erases
165 165
 // the entire current line when displaying the progressbar. It returns an
166 166
 // error if the [JSONMessage.Error] field is non-nil.
167
-func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
167
+func Display(jm jsonstream.Message, out io.Writer, isTerminal bool, width uint16) error {
168 168
 	if jm.Error != nil {
169 169
 		return jm.Error
170 170
 	}
... ...
@@ -173,14 +125,17 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
173 173
 		clearLine(out)
174 174
 		endl = "\r"
175 175
 		_, _ = fmt.Fprint(out, endl)
176
-	} else if jm.Progress != nil && jm.Progress.String() != "" { // disable progressbar in non-terminal
176
+	} else if jm.Progress != nil && (jm.Progress.Current > 0 || jm.Progress.Total > 0) { // disable progressbar in non-terminal
177 177
 		return nil
178 178
 	}
179 179
 	if jm.ID != "" {
180 180
 		_, _ = fmt.Fprintf(out, "%s: ", jm.ID)
181 181
 	}
182 182
 	if jm.Progress != nil && isTerminal {
183
-		_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
183
+		if width == 0 {
184
+			width = 200
185
+		}
186
+		_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, RenderTUIProgress(*jm.Progress, width), endl)
184 187
 	} else if jm.Stream != "" {
185 188
 		_, _ = fmt.Fprintf(out, "%s%s", jm.Stream, endl)
186 189
 	} else {
... ...
@@ -189,16 +144,16 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
189 189
 	return nil
190 190
 }
191 191
 
192
-type JSONMessagesStream iter.Seq2[JSONMessage, error]
192
+type JSONMessagesStream iter.Seq2[jsonstream.Message, error]
193 193
 
194 194
 // DisplayJSONMessagesStream reads a JSON message stream from in, and writes
195 195
 // each [JSONMessage] to out.
196 196
 // see DisplayJSONMessages for details
197
-func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
197
+func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
198 198
 	dec := json.NewDecoder(in)
199
-	var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) {
199
+	var f JSONMessagesStream = func(yield func(jsonstream.Message, error) bool) {
200 200
 		for {
201
-			var jm JSONMessage
201
+			var jm jsonstream.Message
202 202
 			err := dec.Decode(&jm)
203 203
 			if errors.Is(err, io.EOF) {
204 204
 				break
... ...
@@ -228,8 +183,15 @@ func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
228 228
 //   - auxCallback allows handling the [JSONMessage.Aux] field. It is
229 229
 //     called if a JSONMessage contains an Aux field, in which case
230 230
 //     DisplayJSONMessagesStream does not present the JSONMessage.
231
-func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
231
+func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
232 232
 	ids := make(map[string]uint)
233
+	var width uint16 = 200
234
+	if isTerminal {
235
+		ws, err := term.GetWinsize(terminalFd)
236
+		if err == nil {
237
+			width = ws.Width
238
+		}
239
+	}
233 240
 
234 241
 	for jm, err := range messages {
235 242
 		var diff uint
... ...
@@ -244,9 +206,6 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
244 244
 			continue
245 245
 		}
246 246
 
247
-		if jm.Progress != nil {
248
-			jm.Progress.terminalFd = terminalFd
249
-		}
250 247
 		if jm.ID != "" && jm.Progress != nil {
251 248
 			line, ok := ids[jm.ID]
252 249
 			if !ok {
... ...
@@ -274,7 +233,7 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
274 274
 			// with multiple tags).
275 275
 			ids = make(map[string]uint)
276 276
 		}
277
-		err := jm.Display(out, isTerminal)
277
+		err := Display(jm, out, isTerminal, width)
278 278
 		if jm.ID != "" && isTerminal {
279 279
 			cursorDown(out, diff)
280 280
 		}
... ...
@@ -32,27 +32,26 @@ func TestProgressString(t *testing.T) {
32 32
 
33 33
 	testcases := []struct {
34 34
 		name     string
35
-		progress JSONProgress
35
+		progress jsonstream.Progress
36 36
 		expected expected
37
+		nowFunc  func() time.Time
37 38
 	}{
38 39
 		{
39 40
 			name: "no progress",
40 41
 		},
41 42
 		{
42 43
 			name:     "progress 1",
43
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 1}},
44
+			progress: jsonstream.Progress{Current: 1},
44 45
 			expected: shortAndLong("      1B", "      1B"),
45 46
 		},
46 47
 		{
47 48
 			name: "some progress with a start time",
48
-			progress: JSONProgress{
49
-				Progress: jsonstream.Progress{
50
-					Current: 20,
51
-					Total:   100,
52
-					Start:   start.Unix(),
53
-				},
54
-				nowFunc: timeAfter(time.Second),
49
+			progress: jsonstream.Progress{
50
+				Current: 20,
51
+				Total:   100,
52
+				Start:   start.Unix(),
55 53
 			},
54
+			nowFunc: timeAfter(time.Second),
56 55
 			expected: shortAndLong(
57 56
 				"     20B/100B 4s",
58 57
 				"[==========>                                        ]      20B/100B 4s",
... ...
@@ -60,7 +59,7 @@ func TestProgressString(t *testing.T) {
60 60
 		},
61 61
 		{
62 62
 			name:     "some progress without a start time",
63
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100}},
63
+			progress: jsonstream.Progress{Current: 50, Total: 100},
64 64
 			expected: shortAndLong(
65 65
 				"     50B/100B",
66 66
 				"[=========================>                         ]      50B/100B",
... ...
@@ -68,7 +67,7 @@ func TestProgressString(t *testing.T) {
68 68
 		},
69 69
 		{
70 70
 			name:     "current more than total is not negative gh#7136",
71
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 40}},
71
+			progress: jsonstream.Progress{Current: 50, Total: 40},
72 72
 			expected: shortAndLong(
73 73
 				"     50B",
74 74
 				"[==================================================>]      50B",
... ...
@@ -76,7 +75,7 @@ func TestProgressString(t *testing.T) {
76 76
 		},
77 77
 		{
78 78
 			name:     "with units",
79
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100, Units: "units"}},
79
+			progress: jsonstream.Progress{Current: 50, Total: 100, Units: "units"},
80 80
 			expected: shortAndLong(
81 81
 				"50/100 units",
82 82
 				"[=========================>                         ] 50/100 units",
... ...
@@ -84,7 +83,7 @@ func TestProgressString(t *testing.T) {
84 84
 		},
85 85
 		{
86 86
 			name:     "current more than total with units is not negative ",
87
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 40, Units: "units"}},
87
+			progress: jsonstream.Progress{Current: 50, Total: 40, Units: "units"},
88 88
 			expected: shortAndLong(
89 89
 				"50 units",
90 90
 				"[==================================================>] 50 units",
... ...
@@ -92,7 +91,7 @@ func TestProgressString(t *testing.T) {
92 92
 		},
93 93
 		{
94 94
 			name:     "hide counts",
95
-			progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100, HideCounts: true}},
95
+			progress: jsonstream.Progress{Current: 50, Total: 100, HideCounts: true},
96 96
 			expected: shortAndLong(
97 97
 				"",
98 98
 				"[=========================>                         ] ",
... ...
@@ -102,17 +101,19 @@ func TestProgressString(t *testing.T) {
102 102
 
103 103
 	for _, testcase := range testcases {
104 104
 		t.Run(testcase.name, func(t *testing.T) {
105
-			testcase.progress.winSize = 100
106
-			assert.Equal(t, testcase.progress.String(), testcase.expected.short)
107
-
108
-			testcase.progress.winSize = 200
109
-			assert.Equal(t, testcase.progress.String(), testcase.expected.long)
105
+			if testcase.nowFunc != nil {
106
+				originalTimeNow := timeNow
107
+				timeNow = testcase.nowFunc
108
+				defer func() { timeNow = originalTimeNow }()
109
+			}
110
+			assert.Equal(t, RenderTUIProgress(testcase.progress, 100), testcase.expected.short)
111
+			assert.Equal(t, RenderTUIProgress(testcase.progress, 200), testcase.expected.long)
110 112
 		})
111 113
 	}
112 114
 }
113 115
 
114 116
 func TestJSONMessageDisplay(t *testing.T) {
115
-	messages := map[JSONMessage][]string{
117
+	messages := map[jsonstream.Message][]string{
116 118
 		// Empty
117 119
 		{}: {"\n", "\n"},
118 120
 		// Status
... ...
@@ -142,7 +143,7 @@ func TestJSONMessageDisplay(t *testing.T) {
142 142
 		{
143 143
 			Status:   "status",
144 144
 			Stream:   "",
145
-			Progress: &JSONProgress{Progress: jsonstream.Progress{Current: 1}},
145
+			Progress: &jsonstream.Progress{Current: 1},
146 146
 		}: {
147 147
 			"",
148 148
 			fmt.Sprintf("%c[2K\rstatus       1B\r", 27),
... ...
@@ -153,7 +154,7 @@ func TestJSONMessageDisplay(t *testing.T) {
153 153
 	for jsonMessage, expectedMessages := range messages {
154 154
 		// Without terminal
155 155
 		data := bytes.NewBuffer([]byte{})
156
-		if err := jsonMessage.Display(data, false); err != nil {
156
+		if err := Display(jsonMessage, data, false, 0); err != nil {
157 157
 			t.Fatal(err)
158 158
 		}
159 159
 		if data.String() != expectedMessages[0] {
... ...
@@ -161,7 +162,7 @@ func TestJSONMessageDisplay(t *testing.T) {
161 161
 		}
162 162
 		// With terminal
163 163
 		data = bytes.NewBuffer([]byte{})
164
-		if err := jsonMessage.Display(data, true); err != nil {
164
+		if err := Display(jsonMessage, data, true, 0); err != nil {
165 165
 			t.Fatal(err)
166 166
 		}
167 167
 		if data.String() != expectedMessages[1] {
... ...
@@ -173,15 +174,15 @@ func TestJSONMessageDisplay(t *testing.T) {
173 173
 // Test JSONMessage with an Error. It returns an error with the given text, not the meaning of the HTTP code.
174 174
 func TestJSONMessageDisplayWithJSONError(t *testing.T) {
175 175
 	data := bytes.NewBuffer([]byte{})
176
-	jsonMessage := JSONMessage{Error: &jsonstream.Error{Code: 404, Message: "Can't find it"}}
176
+	jsonMessage := jsonstream.Message{Error: &jsonstream.Error{Code: 404, Message: "Can't find it"}}
177 177
 
178
-	err := jsonMessage.Display(data, true)
178
+	err := Display(jsonMessage, data, true, 0)
179 179
 	if err == nil || err.Error() != "Can't find it" {
180 180
 		t.Fatalf("Expected a jsonstream.Error 404, got %q", err)
181 181
 	}
182 182
 
183
-	jsonMessage = JSONMessage{Error: &jsonstream.Error{Code: 401, Message: "Anything"}}
184
-	err = jsonMessage.Display(data, true)
183
+	jsonMessage = jsonstream.Message{Error: &jsonstream.Error{Code: 401, Message: "Anything"}}
184
+	err = Display(jsonMessage, data, true, 0)
185 185
 	assert.Check(t, is.Error(err, "Anything"))
186 186
 }
187 187
 
... ...
@@ -7,8 +7,8 @@ import (
7 7
 	"strings"
8 8
 	"testing"
9 9
 
10
+	"github.com/moby/moby/api/types/jsonstream"
10 11
 	"github.com/moby/moby/client"
11
-	"github.com/moby/moby/client/pkg/jsonmessage"
12 12
 	"github.com/moby/moby/v2/integration/internal/requirement"
13 13
 	"github.com/moby/moby/v2/internal/testutil"
14 14
 	"github.com/moby/moby/v2/internal/testutil/daemon"
... ...
@@ -23,7 +23,7 @@ func getCgroupFromBuildOutput(buildOutput io.Reader) (string, error) {
23 23
 
24 24
 	dec := json.NewDecoder(buildOutput)
25 25
 	for {
26
-		m := jsonmessage.JSONMessage{}
26
+		m := jsonstream.Message{}
27 27
 		err := dec.Decode(&m)
28 28
 		if err == io.EOF {
29 29
 			return "", nil
... ...
@@ -18,8 +18,8 @@ import (
18 18
 	"github.com/moby/moby/api/types/build"
19 19
 	"github.com/moby/moby/api/types/events"
20 20
 	"github.com/moby/moby/api/types/image"
21
+	"github.com/moby/moby/api/types/jsonstream"
21 22
 	"github.com/moby/moby/client"
22
-	"github.com/moby/moby/client/pkg/jsonmessage"
23 23
 	"github.com/moby/moby/v2/internal/testutil"
24 24
 	"github.com/moby/moby/v2/internal/testutil/fakecontext"
25 25
 	"gotest.tools/v3/assert"
... ...
@@ -127,7 +127,7 @@ func buildContainerIdsFilter(buildOutput io.Reader) (client.Filters, error) {
127 127
 
128 128
 	dec := json.NewDecoder(buildOutput)
129 129
 	for {
130
-		m := jsonmessage.JSONMessage{}
130
+		m := jsonstream.Message{}
131 131
 		err := dec.Decode(&m)
132 132
 		if err == io.EOF {
133 133
 			return filter, nil
... ...
@@ -811,7 +811,7 @@ func readBuildImageIDs(t *testing.T, rd io.Reader) string {
811 811
 	t.Helper()
812 812
 	decoder := json.NewDecoder(rd)
813 813
 	for {
814
-		var jm jsonmessage.JSONMessage
814
+		var jm jsonstream.Message
815 815
 		if err := decoder.Decode(&jm); err != nil {
816 816
 			if err == io.EOF {
817 817
 				break
... ...
@@ -15,6 +15,7 @@ import (
15 15
 	cerrdefs "github.com/containerd/errdefs"
16 16
 	"github.com/moby/go-archive"
17 17
 	"github.com/moby/moby/api/types/build"
18
+	"github.com/moby/moby/api/types/jsonstream"
18 19
 	"github.com/moby/moby/client"
19 20
 	"github.com/moby/moby/client/pkg/jsonmessage"
20 21
 	"github.com/moby/moby/v2/integration/internal/container"
... ...
@@ -217,7 +218,7 @@ func makeTestImage(ctx context.Context, t *testing.T) (imageID string) {
217 217
 	assert.NilError(t, err)
218 218
 	defer resp.Body.Close()
219 219
 
220
-	err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonmessage.JSONMessage) {
220
+	err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonstream.Message) {
221 221
 		var r build.Result
222 222
 		assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
223 223
 		imageID = r.ID
... ...
@@ -292,7 +293,7 @@ func TestCopyFromContainer(t *testing.T) {
292 292
 	defer resp.Body.Close()
293 293
 
294 294
 	var imageID string
295
-	err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonmessage.JSONMessage) {
295
+	err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonstream.Message) {
296 296
 		var r build.Result
297 297
 		assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
298 298
 		imageID = r.ID
... ...
@@ -5,8 +5,8 @@ import (
5 5
 	"strings"
6 6
 	"testing"
7 7
 
8
+	"github.com/moby/moby/api/types/jsonstream"
8 9
 	"github.com/moby/moby/client"
9
-	"github.com/moby/moby/client/pkg/jsonmessage"
10 10
 	"github.com/moby/moby/v2/integration/internal/container"
11 11
 	"github.com/moby/moby/v2/internal/testutil"
12 12
 	"github.com/moby/moby/v2/internal/testutil/daemon"
... ...
@@ -39,7 +39,7 @@ func TestExportContainerAndImportImage(t *testing.T) {
39 39
 	// the image ID and match with the output from `docker images`.
40 40
 
41 41
 	dec := json.NewDecoder(importRes)
42
-	var jm jsonmessage.JSONMessage
42
+	var jm jsonstream.Message
43 43
 	err = dec.Decode(&jm)
44 44
 	assert.NilError(t, err)
45 45
 
... ...
@@ -10,6 +10,7 @@ import (
10 10
 	"github.com/containerd/containerd/v2/pkg/protobuf/proto"
11 11
 	controlapi "github.com/moby/buildkit/api/services/control"
12 12
 	"github.com/moby/moby/api/types/build"
13
+	"github.com/moby/moby/api/types/jsonstream"
13 14
 	"github.com/moby/moby/client"
14 15
 	"github.com/moby/moby/client/pkg/jsonmessage"
15 16
 	"github.com/moby/moby/v2/internal/testutil/fakecontext"
... ...
@@ -36,7 +37,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
36 36
 	buf := bytes.NewBuffer(nil)
37 37
 	dec := json.NewDecoder(body)
38 38
 	for {
39
-		var jm jsonmessage.JSONMessage
39
+		var jm jsonstream.Message
40 40
 		err := dec.Decode(&jm)
41 41
 		if err == io.EOF {
42 42
 			break
... ...
@@ -48,7 +49,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
48 48
 		}
49 49
 
50 50
 		buf.Reset()
51
-		jm.Display(buf, false)
51
+		jsonmessage.Display(jm, buf, false, 0)
52 52
 		if buf.Len() == 0 {
53 53
 			continue
54 54
 		}
... ...
@@ -76,7 +77,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
76 76
 	return id
77 77
 }
78 78
 
79
-func processBuildkitAux(t *testing.T, jm *jsonmessage.JSONMessage, id *string) bool {
79
+func processBuildkitAux(t *testing.T, jm *jsonstream.Message, id *string) bool {
80 80
 	if jm.ID == "moby.buildkit.trace" {
81 81
 		var dt []byte
82 82
 		if err := json.Unmarshal(*jm.Aux, &dt); err != nil {
... ...
@@ -10,8 +10,8 @@ import (
10 10
 	"testing"
11 11
 
12 12
 	"github.com/moby/go-archive"
13
+	"github.com/moby/moby/api/types/jsonstream"
13 14
 	"github.com/moby/moby/client"
14
-	"github.com/moby/moby/client/pkg/jsonmessage"
15 15
 	"github.com/moby/moby/v2/internal/testutil/specialimage"
16 16
 	"gotest.tools/v3/assert"
17 17
 )
... ...
@@ -46,7 +46,7 @@ func Load(ctx context.Context, t *testing.T, apiClient client.APIClient, imageFu
46 46
 
47 47
 	decoder := json.NewDecoder(bytes.NewReader(all))
48 48
 	for {
49
-		var msg jsonmessage.JSONMessage
49
+		var msg jsonstream.Message
50 50
 		err := decoder.Decode(&msg)
51 51
 		if errors.Is(err, io.EOF) {
52 52
 			break
... ...
@@ -16,6 +16,7 @@ import (
16 16
 	c8dimages "github.com/containerd/containerd/v2/core/images"
17 17
 	"github.com/containerd/containerd/v2/core/remotes/docker"
18 18
 	"github.com/moby/moby/api/types"
19
+	"github.com/moby/moby/api/types/jsonstream"
19 20
 	registrytypes "github.com/moby/moby/api/types/registry"
20 21
 	"github.com/moby/moby/api/types/system"
21 22
 	"github.com/moby/moby/client"
... ...
@@ -142,7 +143,7 @@ func TestPluginInstall(t *testing.T) {
142 142
 		buf := &strings.Builder{}
143 143
 		assert.NilError(t, err)
144 144
 		var digest string
145
-		assert.NilError(t, jsonmessage.DisplayJSONMessagesStream(pushResult, buf, 0, false, func(j jsonmessage.JSONMessage) {
145
+		assert.NilError(t, jsonmessage.DisplayJSONMessagesStream(pushResult, buf, 0, false, func(j jsonstream.Message) {
146 146
 			if j.Aux != nil {
147 147
 				var r types.PushResult
148 148
 				assert.NilError(t, json.Unmarshal(*j.Aux, &r))
... ...
@@ -9,13 +9,13 @@ import (
9 9
 
10 10
 	cerrdefs "github.com/containerd/errdefs"
11 11
 	"github.com/distribution/reference"
12
+	"github.com/moby/moby/api/types/jsonstream"
12 13
 	"github.com/moby/moby/client/internal"
13
-	"github.com/moby/moby/client/pkg/jsonmessage"
14 14
 )
15 15
 
16 16
 type ImagePullResponse interface {
17 17
 	io.ReadCloser
18
-	JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
18
+	JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
19 19
 	Wait(ctx context.Context) error
20 20
 }
21 21
 
... ...
@@ -12,14 +12,14 @@ import (
12 12
 
13 13
 	cerrdefs "github.com/containerd/errdefs"
14 14
 	"github.com/distribution/reference"
15
+	"github.com/moby/moby/api/types/jsonstream"
15 16
 	"github.com/moby/moby/api/types/registry"
16 17
 	"github.com/moby/moby/client/internal"
17
-	"github.com/moby/moby/client/pkg/jsonmessage"
18 18
 )
19 19
 
20 20
 type ImagePushResponse interface {
21 21
 	io.ReadCloser
22
-	JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
22
+	JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
23 23
 	Wait(ctx context.Context) error
24 24
 }
25 25
 
... ...
@@ -8,7 +8,7 @@ import (
8 8
 	"iter"
9 9
 	"sync"
10 10
 
11
-	"github.com/moby/moby/client/pkg/jsonmessage"
11
+	"github.com/moby/moby/api/types/jsonstream"
12 12
 )
13 13
 
14 14
 func NewJSONMessageStream(rc io.ReadCloser) stream {
... ...
@@ -44,15 +44,15 @@ func (r stream) Close() error {
44 44
 
45 45
 // JSONMessages decodes the response stream as a sequence of JSONMessages.
46 46
 // if stream ends or context is cancelled, the underlying [io.Reader] is closed.
47
-func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
47
+func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
48 48
 	context.AfterFunc(ctx, func() {
49 49
 		_ = r.Close()
50 50
 	})
51 51
 	dec := json.NewDecoder(r)
52
-	return func(yield func(jsonmessage.JSONMessage, error) bool) {
52
+	return func(yield func(jsonstream.Message, error) bool) {
53 53
 		defer r.Close()
54 54
 		for {
55
-			var jm jsonmessage.JSONMessage
55
+			var jm jsonstream.Message
56 56
 			err := dec.Decode(&jm)
57 57
 			if errors.Is(err, io.EOF) {
58 58
 				break
... ...
@@ -14,28 +14,14 @@ import (
14 14
 	"github.com/moby/term"
15 15
 )
16 16
 
17
+var timeNow = time.Now // For overriding in tests.
18
+
17 19
 // RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
18 20
 // ensure the formatted time isalways the same number of characters.
19 21
 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
20 22
 
21
-// JSONProgress describes a progress message in a JSON stream.
22
-type JSONProgress struct {
23
-	jsonstream.Progress
24
-
25
-	// terminalFd is the fd of the current terminal, if any. It is used
26
-	// to get the terminal width.
27
-	terminalFd uintptr
28
-
29
-	// nowFunc is used to override the current time in tests.
30
-	nowFunc func() time.Time
31
-
32
-	// winSize is used to override the terminal width in tests.
33
-	winSize int
34
-}
35
-
36
-func (p *JSONProgress) String() string {
23
+func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
37 24
 	var (
38
-		width      = p.width()
39 25
 		pbBox      string
40 26
 		numbersBox string
41 27
 	)
... ...
@@ -89,7 +75,7 @@ func (p *JSONProgress) String() string {
89 89
 	var timeLeftBox string
90 90
 	if width > 50 {
91 91
 		if p.Current > 0 && p.Start > 0 && percentage < 50 {
92
-			fromStart := p.now().Sub(time.Unix(p.Start, 0))
92
+			fromStart := timeNow().UTC().Sub(time.Unix(p.Start, 0))
93 93
 			perEntry := fromStart / time.Duration(p.Current)
94 94
 			left := time.Duration(p.Total-p.Current) * perEntry
95 95
 			timeLeftBox = " " + left.Round(time.Second).String()
... ...
@@ -98,40 +84,6 @@ func (p *JSONProgress) String() string {
98 98
 	return pbBox + numbersBox + timeLeftBox
99 99
 }
100 100
 
101
-// now returns the current time in UTC, but can be overridden in tests
102
-// by setting JSONProgress.nowFunc to a custom function.
103
-func (p *JSONProgress) now() time.Time {
104
-	if p.nowFunc != nil {
105
-		return p.nowFunc()
106
-	}
107
-	return time.Now().UTC()
108
-}
109
-
110
-// width returns the current terminal's width, but can be overridden
111
-// in tests by setting JSONProgress.winSize to a non-zero value.
112
-func (p *JSONProgress) width() int {
113
-	if p.winSize != 0 {
114
-		return p.winSize
115
-	}
116
-	ws, err := term.GetWinsize(p.terminalFd)
117
-	if err == nil {
118
-		return int(ws.Width)
119
-	}
120
-	return 200
121
-}
122
-
123
-// JSONMessage defines a message struct. It describes
124
-// the created time, where it from, status, ID of the
125
-// message. It's used for docker events.
126
-type JSONMessage struct {
127
-	Stream   string            `json:"stream,omitempty"`
128
-	Status   string            `json:"status,omitempty"`
129
-	Progress *JSONProgress     `json:"progressDetail,omitempty"`
130
-	ID       string            `json:"id,omitempty"`
131
-	Error    *jsonstream.Error `json:"errorDetail,omitempty"`
132
-	Aux      *json.RawMessage  `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
133
-}
134
-
135 101
 // We can probably use [aec.EmptyBuilder] for managing the output, but
136 102
 // currently we're doing it all manually, so defining some consts for
137 103
 // the basics we use.
... ...
@@ -164,7 +116,7 @@ func cursorDown(out io.Writer, l uint) {
164 164
 // Display prints the JSONMessage to out. If isTerminal is true, it erases
165 165
 // the entire current line when displaying the progressbar. It returns an
166 166
 // error if the [JSONMessage.Error] field is non-nil.
167
-func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
167
+func Display(jm jsonstream.Message, out io.Writer, isTerminal bool, width uint16) error {
168 168
 	if jm.Error != nil {
169 169
 		return jm.Error
170 170
 	}
... ...
@@ -173,14 +125,17 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
173 173
 		clearLine(out)
174 174
 		endl = "\r"
175 175
 		_, _ = fmt.Fprint(out, endl)
176
-	} else if jm.Progress != nil && jm.Progress.String() != "" { // disable progressbar in non-terminal
176
+	} else if jm.Progress != nil && (jm.Progress.Current > 0 || jm.Progress.Total > 0) { // disable progressbar in non-terminal
177 177
 		return nil
178 178
 	}
179 179
 	if jm.ID != "" {
180 180
 		_, _ = fmt.Fprintf(out, "%s: ", jm.ID)
181 181
 	}
182 182
 	if jm.Progress != nil && isTerminal {
183
-		_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
183
+		if width == 0 {
184
+			width = 200
185
+		}
186
+		_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, RenderTUIProgress(*jm.Progress, width), endl)
184 187
 	} else if jm.Stream != "" {
185 188
 		_, _ = fmt.Fprintf(out, "%s%s", jm.Stream, endl)
186 189
 	} else {
... ...
@@ -189,16 +144,16 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
189 189
 	return nil
190 190
 }
191 191
 
192
-type JSONMessagesStream iter.Seq2[JSONMessage, error]
192
+type JSONMessagesStream iter.Seq2[jsonstream.Message, error]
193 193
 
194 194
 // DisplayJSONMessagesStream reads a JSON message stream from in, and writes
195 195
 // each [JSONMessage] to out.
196 196
 // see DisplayJSONMessages for details
197
-func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
197
+func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
198 198
 	dec := json.NewDecoder(in)
199
-	var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) {
199
+	var f JSONMessagesStream = func(yield func(jsonstream.Message, error) bool) {
200 200
 		for {
201
-			var jm JSONMessage
201
+			var jm jsonstream.Message
202 202
 			err := dec.Decode(&jm)
203 203
 			if errors.Is(err, io.EOF) {
204 204
 				break
... ...
@@ -228,8 +183,15 @@ func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
228 228
 //   - auxCallback allows handling the [JSONMessage.Aux] field. It is
229 229
 //     called if a JSONMessage contains an Aux field, in which case
230 230
 //     DisplayJSONMessagesStream does not present the JSONMessage.
231
-func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
231
+func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
232 232
 	ids := make(map[string]uint)
233
+	var width uint16 = 200
234
+	if isTerminal {
235
+		ws, err := term.GetWinsize(terminalFd)
236
+		if err == nil {
237
+			width = ws.Width
238
+		}
239
+	}
233 240
 
234 241
 	for jm, err := range messages {
235 242
 		var diff uint
... ...
@@ -244,9 +206,6 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
244 244
 			continue
245 245
 		}
246 246
 
247
-		if jm.Progress != nil {
248
-			jm.Progress.terminalFd = terminalFd
249
-		}
250 247
 		if jm.ID != "" && jm.Progress != nil {
251 248
 			line, ok := ids[jm.ID]
252 249
 			if !ok {
... ...
@@ -274,7 +233,7 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
274 274
 			// with multiple tags).
275 275
 			ids = make(map[string]uint)
276 276
 		}
277
-		err := jm.Display(out, isTerminal)
277
+		err := Display(jm, out, isTerminal, width)
278 278
 		if jm.ID != "" && isTerminal {
279 279
 			cursorDown(out, diff)
280 280
 		}