Browse code

c8d/pull: Show `Extracting` layer status

Before this patch, pull progress wouldn't show the `Extracting` layer
status which made the pull look like it got stuck when extracting a big
layer.

Use the `containerd.io/snapshot/cri.layer-digest` snapshot labels to
find a corresponding snapshot and check whether it's `active` or
`committed` to set the layer status accordingly.

Despite the `cri.` component in the label name, it's not CRI specific -
it only depends on the `snapshotters.AppendInfoHandlerWrapper`.

We _could_ also use the `Usage` snapshot method to query the exact
progress of the unpack, but it would be too expensive as the
implementation time complexity will be proportional to the snapshot size.

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>

Paweł Gronowski authored on 2024/12/11 19:32:28
Showing 3 changed files
... ...
@@ -125,7 +125,11 @@ func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platfor
125 125
 	})
126 126
 	opts = append(opts, containerd.WithImageHandler(h))
127 127
 
128
-	pp := pullProgress{store: i.content, showExists: true}
128
+	pp := &pullProgress{
129
+		store:       i.content,
130
+		snapshotter: i.snapshotterService(i.snapshotter),
131
+		showExists:  true,
132
+	}
129 133
 	finishProgress := jobs.showProgress(ctx, out, pp)
130 134
 
131 135
 	defer func() {
... ...
@@ -195,6 +199,7 @@ func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platfor
195 195
 
196 196
 	// AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
197 197
 	// this information is used to enable remote snapshotters like nydus and stargz to query a registry.
198
+	// This is also needed for the pull progress to detect the `Extracting` status.
198 199
 	infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
199 200
 	opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
200 201
 
... ...
@@ -121,7 +121,7 @@ func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, p
121 121
 	jobsQueue := newJobs()
122 122
 	finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
123 123
 		&pp,
124
-		pullProgress{showExists: false, store: store},
124
+		&pullProgress{showExists: false, store: store},
125 125
 	}))
126 126
 	defer func() {
127 127
 		finishProgress()
... ...
@@ -3,14 +3,17 @@ package containerd
3 3
 import (
4 4
 	"context"
5 5
 	"errors"
6
+	"sort"
6 7
 	"sync"
7 8
 	"sync/atomic"
8 9
 	"time"
9 10
 
10 11
 	"github.com/containerd/containerd/content"
11 12
 	"github.com/containerd/containerd/images"
13
+	"github.com/containerd/containerd/pkg/snapshotters"
12 14
 	"github.com/containerd/containerd/remotes"
13 15
 	"github.com/containerd/containerd/remotes/docker"
16
+	"github.com/containerd/containerd/snapshots"
14 17
 	cerrdefs "github.com/containerd/errdefs"
15 18
 	"github.com/containerd/log"
16 19
 	"github.com/distribution/reference"
... ...
@@ -107,12 +110,15 @@ func (j *jobs) Jobs() []ocispec.Descriptor {
107 107
 }
108 108
 
109 109
 type pullProgress struct {
110
-	store      content.Store
111
-	showExists bool
112
-	hideLayers bool
110
+	store       content.Store
111
+	showExists  bool
112
+	hideLayers  bool
113
+	snapshotter snapshots.Snapshotter
114
+	layers      []ocispec.Descriptor
115
+	unpackStart map[digest.Digest]time.Time
113 116
 }
114 117
 
115
-func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
118
+func (p *pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
116 119
 	actives, err := p.store.ListStatuses(ctx, "")
117 120
 	if err != nil {
118 121
 		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
... ...
@@ -157,22 +163,85 @@ func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out pro
157 157
 				ID:         stringid.TruncateID(j.Digest.Encoded()),
158 158
 				Action:     "Download complete",
159 159
 				HideCounts: true,
160
-				LastUpdate: true,
161 160
 			})
161
+			p.finished(ctx, out, j)
162 162
 			ongoing.Remove(j)
163 163
 		} else if p.showExists {
164 164
 			out.WriteProgress(progress.Progress{
165 165
 				ID:         stringid.TruncateID(j.Digest.Encoded()),
166 166
 				Action:     "Already exists",
167 167
 				HideCounts: true,
168
-				LastUpdate: true,
169 168
 			})
169
+			p.finished(ctx, out, j)
170 170
 			ongoing.Remove(j)
171 171
 		}
172 172
 	}
173
+
174
+	var committedIdx []int
175
+	for idx, desc := range p.layers {
176
+		// Find the snapshot corresponding to this layer
177
+		walkFilter := "labels.\"" + snapshotters.TargetLayerDigestLabel + "\"==" + p.layers[idx].Digest.String()
178
+
179
+		err := p.snapshotter.Walk(ctx, func(ctx context.Context, sn snapshots.Info) error {
180
+			if sn.Kind == snapshots.KindActive {
181
+				if p.unpackStart == nil {
182
+					p.unpackStart = make(map[digest.Digest]time.Time)
183
+				}
184
+				var seconds int64
185
+				if began, ok := p.unpackStart[desc.Digest]; !ok {
186
+					p.unpackStart[desc.Digest] = time.Now()
187
+				} else {
188
+					seconds = int64(time.Since(began).Seconds())
189
+				}
190
+
191
+				// We _could_ get the current size of snapshot by calling Usage, but this is too expensive
192
+				// and could impact performance. So we just show the "Extracting" message with the elapsed time as progress.
193
+				out.WriteProgress(
194
+					progress.Progress{
195
+						ID:     stringid.TruncateID(desc.Digest.Encoded()),
196
+						Action: "Extracting",
197
+						// Start from 1s, because without Total, 0 won't be shown at all.
198
+						Current: 1 + seconds,
199
+						Units:   "s",
200
+					})
201
+				return nil
202
+			}
203
+
204
+			if sn.Kind == snapshots.KindCommitted {
205
+				out.WriteProgress(progress.Progress{
206
+					ID:         stringid.TruncateID(desc.Digest.Encoded()),
207
+					Action:     "Pull complete",
208
+					HideCounts: true,
209
+					LastUpdate: true,
210
+				})
211
+
212
+				committedIdx = append(committedIdx, idx)
213
+				return nil
214
+			}
215
+			return nil
216
+		}, walkFilter)
217
+		if err != nil {
218
+			return err
219
+		}
220
+	}
221
+
222
+	// Remove finished/committed layers from p.layers
223
+	if len(committedIdx) > 0 {
224
+		sort.Ints(committedIdx)
225
+		for i := len(committedIdx) - 1; i >= 0; i-- {
226
+			p.layers = append(p.layers[:committedIdx[i]], p.layers[committedIdx[i]+1:]...)
227
+		}
228
+	}
229
+
173 230
 	return nil
174 231
 }
175 232
 
233
+func (p *pullProgress) finished(ctx context.Context, out progress.Output, desc ocispec.Descriptor) {
234
+	if images.IsLayerType(desc.MediaType) {
235
+		p.layers = append(p.layers, desc)
236
+	}
237
+}
238
+
176 239
 type pushProgress struct {
177 240
 	Tracker                         docker.StatusTracker
178 241
 	notStartedWaitingAreUnavailable atomic.Bool