`Upload` already closes the reader returned by `compress` and the
progressreader passed into it, before returning. But even so, the
io.Copy inside compress' goroutine needs to attempt a read from the
progressreader to notice that it's closed, and this read has a side
effect of outputting a progress message. If this happens after `Upload`
returns, it can result in a write to a closed channel. Change `compress`
to return a channel that allows the caller to wait for its goroutine to
finish before freeing any resources connected to the reader that was
passed to it.
Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
... | ... |
@@ -170,7 +170,14 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo |
170 | 170 |
// argument so that it can be used with httpBlobWriter's ReadFrom method. |
171 | 171 |
// Using httpBlobWriter's Write method would send a PATCH request for every |
172 | 172 |
// Write call. |
173 |
-func compress(in io.Reader) io.ReadCloser { |
|
173 |
+// |
|
174 |
+// The second return value is a channel that gets closed when the goroutine |
|
175 |
+// is finished. This allows the caller to make sure the goroutine finishes |
|
176 |
+// before it releases any resources connected with the reader that was |
|
177 |
+// passed in. |
|
178 |
+func compress(in io.Reader) (io.ReadCloser, chan struct{}) { |
|
179 |
+ compressionDone := make(chan struct{}) |
|
180 |
+ |
|
174 | 181 |
pipeReader, pipeWriter := io.Pipe() |
175 | 182 |
// Use a bufio.Writer to avoid excessive chunking in HTTP request. |
176 | 183 |
bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize) |
... | ... |
@@ -189,7 +196,8 @@ func compress(in io.Reader) io.ReadCloser { |
189 | 189 |
} else { |
190 | 190 |
pipeWriter.Close() |
191 | 191 |
} |
192 |
+ close(compressionDone) |
|
192 | 193 |
}() |
193 | 194 |
|
194 |
- return pipeReader |
|
195 |
+ return pipeReader, compressionDone |
|
195 | 196 |
} |
... | ... |
@@ -345,8 +345,11 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. |
345 | 345 |
size, _ := pd.layer.DiffSize() |
346 | 346 |
|
347 | 347 |
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing") |
348 |
- defer reader.Close() |
|
349 |
- compressedReader := compress(reader) |
|
348 |
+ compressedReader, compressionDone := compress(reader) |
|
349 |
+ defer func() { |
|
350 |
+ reader.Close() |
|
351 |
+ <-compressionDone |
|
352 |
+ }() |
|
350 | 353 |
|
351 | 354 |
digester := digest.Canonical.New() |
352 | 355 |
tee := io.TeeReader(compressedReader, digester.Hash()) |