Browse code

Fix concurrent uploads that share layers

Concurrent uploads which share layers worked correctly as of #18353,
but unfortunately #18785 caused a regression. This PR removed the logic
that shares digests between different push sessions. This overlooked the
case where one session was waiting for another session to upload a
layer.

This commit adds back the ability to propagate this digest information,
using the distribution.Descriptor type because this is what is received
from stats and uploads, and also what is ultimately needed for building
the manifest.

Surprisingly, there was no test covering this case. This commit adds
one. It fails without the fix.

See recent comments on #9132.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>

Aaron Lehmann authored on 2016/03/02 03:56:05
Showing 4 changed files
... ...
@@ -42,7 +42,7 @@ type v2Pusher struct {
42 42
 	config            *ImagePushConfig
43 43
 	repo              distribution.Repository
44 44
 
45
-	// pushState is state built by the Download functions.
45
+	// pushState is state built by the Upload functions.
46 46
 	pushState pushState
47 47
 }
48 48
 
... ...
@@ -224,6 +224,7 @@ type v2PushDescriptor struct {
224 224
 	repoInfo          reference.Named
225 225
 	repo              distribution.Repository
226 226
 	pushState         *pushState
227
+	remoteDescriptor  distribution.Descriptor
227 228
 }
228 229
 
229 230
 func (pd *v2PushDescriptor) Key() string {
... ...
@@ -238,16 +239,16 @@ func (pd *v2PushDescriptor) DiffID() layer.DiffID {
238 238
 	return pd.layer.DiffID()
239 239
 }
240 240
 
241
-func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
241
+func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
242 242
 	diffID := pd.DiffID()
243 243
 
244 244
 	pd.pushState.Lock()
245
-	if _, ok := pd.pushState.remoteLayers[diffID]; ok {
245
+	if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
246 246
 		// it is already known that the push is not needed and
247 247
 		// therefore doing a stat is unnecessary
248 248
 		pd.pushState.Unlock()
249 249
 		progress.Update(progressOutput, pd.ID(), "Layer already exists")
250
-		return nil
250
+		return descriptor, nil
251 251
 	}
252 252
 	pd.pushState.Unlock()
253 253
 
... ...
@@ -257,14 +258,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
257 257
 		descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
258 258
 		if err != nil {
259 259
 			progress.Update(progressOutput, pd.ID(), "Image push failed")
260
-			return retryOnError(err)
260
+			return distribution.Descriptor{}, retryOnError(err)
261 261
 		}
262 262
 		if exists {
263 263
 			progress.Update(progressOutput, pd.ID(), "Layer already exists")
264 264
 			pd.pushState.Lock()
265 265
 			pd.pushState.remoteLayers[diffID] = descriptor
266 266
 			pd.pushState.Unlock()
267
-			return nil
267
+			return descriptor, nil
268 268
 		}
269 269
 	}
270 270
 
... ...
@@ -328,9 +329,9 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
328 328
 
329 329
 			// Cache mapping from this layer's DiffID to the blobsum
330 330
 			if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
331
-				return xfer.DoNotRetry{Err: err}
331
+				return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
332 332
 			}
333
-			return nil
333
+			return err.Descriptor, nil
334 334
 		case nil:
335 335
 			// blob upload session created successfully, so begin the upload
336 336
 			mountAttemptsRemaining = 0
... ...
@@ -345,14 +346,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
345 345
 	if layerUpload == nil {
346 346
 		layerUpload, err = bs.Create(ctx)
347 347
 		if err != nil {
348
-			return retryOnError(err)
348
+			return distribution.Descriptor{}, retryOnError(err)
349 349
 		}
350 350
 	}
351 351
 	defer layerUpload.Close()
352 352
 
353 353
 	arch, err := pd.layer.TarStream()
354 354
 	if err != nil {
355
-		return xfer.DoNotRetry{Err: err}
355
+		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
356 356
 	}
357 357
 
358 358
 	// don't care if this fails; best effort
... ...
@@ -371,12 +372,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
371 371
 	nn, err := layerUpload.ReadFrom(tee)
372 372
 	compressedReader.Close()
373 373
 	if err != nil {
374
-		return retryOnError(err)
374
+		return distribution.Descriptor{}, retryOnError(err)
375 375
 	}
376 376
 
377 377
 	pushDigest := digester.Digest()
378 378
 	if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
379
-		return retryOnError(err)
379
+		return distribution.Descriptor{}, retryOnError(err)
380 380
 	}
381 381
 
382 382
 	logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
... ...
@@ -384,7 +385,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
384 384
 
385 385
 	// Cache mapping from this layer's DiffID to the blobsum
386 386
 	if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
387
-		return xfer.DoNotRetry{Err: err}
387
+		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
388 388
 	}
389 389
 
390 390
 	pd.pushState.Lock()
... ...
@@ -393,23 +394,24 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
393 393
 	// speaks the v2 protocol.
394 394
 	pd.pushState.confirmedV2 = true
395 395
 
396
-	pd.pushState.remoteLayers[diffID] = distribution.Descriptor{
396
+	descriptor := distribution.Descriptor{
397 397
 		Digest:    pushDigest,
398 398
 		MediaType: schema2.MediaTypeLayer,
399 399
 		Size:      nn,
400 400
 	}
401
+	pd.pushState.remoteLayers[diffID] = descriptor
401 402
 
402 403
 	pd.pushState.Unlock()
403 404
 
404
-	return nil
405
+	return descriptor, nil
406
+}
407
+
408
+func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
409
+	pd.remoteDescriptor = descriptor
405 410
 }
406 411
 
407 412
 func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
408
-	// Not necessary to lock pushStatus because this is always
409
-	// called after all the mutation in pushStatus.
410
-	// By the time this function is called, every layer will have
411
-	// an entry in remoteLayers.
412
-	return pd.pushState.remoteLayers[pd.DiffID()]
413
+	return pd.remoteDescriptor
413 414
 }
414 415
 
415 416
 // layerAlreadyExists checks if the registry already know about any of the
... ...
@@ -5,6 +5,7 @@ import (
5 5
 	"time"
6 6
 
7 7
 	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/distribution"
8 9
 	"github.com/docker/docker/layer"
9 10
 	"github.com/docker/docker/pkg/progress"
10 11
 	"golang.org/x/net/context"
... ...
@@ -28,8 +29,8 @@ func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
28 28
 type uploadTransfer struct {
29 29
 	Transfer
30 30
 
31
-	diffID layer.DiffID
32
-	err    error
31
+	remoteDescriptor distribution.Descriptor
32
+	err              error
33 33
 }
34 34
 
35 35
 // An UploadDescriptor references a layer that may need to be uploaded.
... ...
@@ -41,7 +42,12 @@ type UploadDescriptor interface {
41 41
 	// DiffID should return the DiffID for this layer.
42 42
 	DiffID() layer.DiffID
43 43
 	// Upload is called to perform the Upload.
44
-	Upload(ctx context.Context, progressOutput progress.Output) error
44
+	Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
45
+	// SetRemoteDescriptor provides the distribution.Descriptor that was
46
+	// returned by Upload. This descriptor is not to be confused with
47
+	// the UploadDescriptor interface, which is used for internally
48
+	// identifying layers that are being uploaded.
49
+	SetRemoteDescriptor(descriptor distribution.Descriptor)
45 50
 }
46 51
 
47 52
 // Upload is a blocking function which ensures the listed layers are present on
... ...
@@ -50,7 +56,7 @@ type UploadDescriptor interface {
50 50
 func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
51 51
 	var (
52 52
 		uploads          []*uploadTransfer
53
-		dedupDescriptors = make(map[string]struct{})
53
+		dedupDescriptors = make(map[string]*uploadTransfer)
54 54
 	)
55 55
 
56 56
 	for _, descriptor := range layers {
... ...
@@ -60,12 +66,12 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
60 60
 		if _, present := dedupDescriptors[key]; present {
61 61
 			continue
62 62
 		}
63
-		dedupDescriptors[key] = struct{}{}
64 63
 
65 64
 		xferFunc := lum.makeUploadFunc(descriptor)
66 65
 		upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
67 66
 		defer upload.Release(watcher)
68 67
 		uploads = append(uploads, upload.(*uploadTransfer))
68
+		dedupDescriptors[key] = upload.(*uploadTransfer)
69 69
 	}
70 70
 
71 71
 	for _, upload := range uploads {
... ...
@@ -78,6 +84,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
78 78
 			}
79 79
 		}
80 80
 	}
81
+	for _, l := range layers {
82
+		l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
83
+	}
81 84
 
82 85
 	return nil
83 86
 }
... ...
@@ -86,7 +95,6 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
86 86
 	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
87 87
 		u := &uploadTransfer{
88 88
 			Transfer: NewTransfer(),
89
-			diffID:   descriptor.DiffID(),
90 89
 		}
91 90
 
92 91
 		go func() {
... ...
@@ -105,8 +113,9 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
105 105
 
106 106
 			retries := 0
107 107
 			for {
108
-				err := descriptor.Upload(u.Transfer.Context(), progressOutput)
108
+				remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
109 109
 				if err == nil {
110
+					u.remoteDescriptor = remoteDescriptor
110 111
 					break
111 112
 				}
112 113
 
... ...
@@ -6,6 +6,7 @@ import (
6 6
 	"testing"
7 7
 	"time"
8 8
 
9
+	"github.com/docker/distribution"
9 10
 	"github.com/docker/distribution/digest"
10 11
 	"github.com/docker/docker/layer"
11 12
 	"github.com/docker/docker/pkg/progress"
... ...
@@ -35,13 +36,17 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID {
35 35
 	return u.diffID
36 36
 }
37 37
 
38
+// SetRemoteDescriptor is not used in the mock.
39
+func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
40
+}
41
+
38 42
 // Upload is called to perform the upload.
39
-func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
43
+func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
40 44
 	if u.currentUploads != nil {
41 45
 		defer atomic.AddInt32(u.currentUploads, -1)
42 46
 
43 47
 		if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
44
-			return errors.New("concurrency limit exceeded")
48
+			return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
45 49
 		}
46 50
 	}
47 51
 
... ...
@@ -49,7 +54,7 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
49 49
 	for i := int64(0); i <= 10; i++ {
50 50
 		select {
51 51
 		case <-ctx.Done():
52
-			return ctx.Err()
52
+			return distribution.Descriptor{}, ctx.Err()
53 53
 		case <-time.After(10 * time.Millisecond):
54 54
 			progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
55 55
 		}
... ...
@@ -57,10 +62,10 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
57 57
 
58 58
 	if u.simulateRetries != 0 {
59 59
 		u.simulateRetries--
60
-		return errors.New("simulating retry")
60
+		return distribution.Descriptor{}, errors.New("simulating retry")
61 61
 	}
62 62
 
63
-	return nil
63
+	return distribution.Descriptor{}, nil
64 64
 }
65 65
 
66 66
 func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
... ...
@@ -148,6 +148,61 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) {
148 148
 	testPushEmptyLayer(c)
149 149
 }
150 150
 
151
+// testConcurrentPush pushes multiple tags to the same repo
152
+// concurrently.
153
+func testConcurrentPush(c *check.C) {
154
+	repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
155
+
156
+	repos := []string{}
157
+	for _, tag := range []string{"push1", "push2", "push3"} {
158
+		repo := fmt.Sprintf("%v:%v", repoName, tag)
159
+		_, err := buildImage(repo, fmt.Sprintf(`
160
+	FROM busybox
161
+	ENTRYPOINT ["/bin/echo"]
162
+	ENV FOO foo
163
+	ENV BAR bar
164
+	CMD echo %s
165
+`, repo), true)
166
+		c.Assert(err, checker.IsNil)
167
+		repos = append(repos, repo)
168
+	}
169
+
170
+	// Push tags, in parallel
171
+	results := make(chan error)
172
+
173
+	for _, repo := range repos {
174
+		go func(repo string) {
175
+			_, _, err := runCommandWithOutput(exec.Command(dockerBinary, "push", repo))
176
+			results <- err
177
+		}(repo)
178
+	}
179
+
180
+	for range repos {
181
+		err := <-results
182
+		c.Assert(err, checker.IsNil, check.Commentf("concurrent push failed with error: %v", err))
183
+	}
184
+
185
+	// Clear local images store.
186
+	args := append([]string{"rmi"}, repos...)
187
+	dockerCmd(c, args...)
188
+
189
+	// Re-pull and run individual tags, to make sure pushes succeeded
190
+	for _, repo := range repos {
191
+		dockerCmd(c, "pull", repo)
192
+		dockerCmd(c, "inspect", repo)
193
+		out, _ := dockerCmd(c, "run", "--rm", repo)
194
+		c.Assert(strings.TrimSpace(out), checker.Equals, "/bin/sh -c echo "+repo)
195
+	}
196
+}
197
+
198
+func (s *DockerRegistrySuite) TestConcurrentPush(c *check.C) {
199
+	testConcurrentPush(c)
200
+}
201
+
202
+func (s *DockerSchema1RegistrySuite) TestConcurrentPush(c *check.C) {
203
+	testConcurrentPush(c)
204
+}
205
+
151 206
 func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) {
152 207
 	sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
153 208
 	// tag the image to upload it to the private registry