Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
| ... | ... |
@@ -110,7 +110,6 @@ type downloadInfo struct {
|
| 110 | 110 |
layer distribution.ReadSeekCloser |
| 111 | 111 |
size int64 |
| 112 | 112 |
err chan error |
| 113 |
- out io.Writer // Download progress is written here. |
|
| 114 | 113 |
poolKey string |
| 115 | 114 |
broadcaster *progressreader.Broadcaster |
| 116 | 115 |
} |
| ... | ... |
@@ -122,22 +121,6 @@ func (errVerification) Error() string { return "verification failed" }
|
| 122 | 122 |
func (p *v2Puller) download(di *downloadInfo) {
|
| 123 | 123 |
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
|
| 124 | 124 |
|
| 125 |
- di.poolKey = "layer:" + di.img.ID |
|
| 126 |
- broadcaster, found := p.poolAdd("pull", di.poolKey)
|
|
| 127 |
- broadcaster.Add(di.out) |
|
| 128 |
- di.broadcaster = broadcaster |
|
| 129 |
- if found {
|
|
| 130 |
- di.err <- nil |
|
| 131 |
- return |
|
| 132 |
- } |
|
| 133 |
- |
|
| 134 |
- tmpFile, err := ioutil.TempFile("", "GetImageBlob")
|
|
| 135 |
- if err != nil {
|
|
| 136 |
- di.err <- err |
|
| 137 |
- return |
|
| 138 |
- } |
|
| 139 |
- di.tmpFile = tmpFile |
|
| 140 |
- |
|
| 141 | 125 |
blobs := p.repo.Blobs(context.Background()) |
| 142 | 126 |
|
| 143 | 127 |
desc, err := blobs.Stat(context.Background(), di.digest) |
| ... | ... |
@@ -164,16 +147,16 @@ func (p *v2Puller) download(di *downloadInfo) {
|
| 164 | 164 |
|
| 165 | 165 |
reader := progressreader.New(progressreader.Config{
|
| 166 | 166 |
In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)), |
| 167 |
- Out: broadcaster, |
|
| 167 |
+ Out: di.broadcaster, |
|
| 168 | 168 |
Formatter: p.sf, |
| 169 | 169 |
Size: di.size, |
| 170 | 170 |
NewLines: false, |
| 171 | 171 |
ID: stringid.TruncateID(di.img.ID), |
| 172 | 172 |
Action: "Downloading", |
| 173 | 173 |
}) |
| 174 |
- io.Copy(tmpFile, reader) |
|
| 174 |
+ io.Copy(di.tmpFile, reader) |
|
| 175 | 175 |
|
| 176 |
- broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) |
|
| 176 |
+ di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) |
|
| 177 | 177 |
|
| 178 | 178 |
if !verifier.Verified() {
|
| 179 | 179 |
err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
|
| ... | ... |
@@ -182,9 +165,9 @@ func (p *v2Puller) download(di *downloadInfo) {
|
| 182 | 182 |
return |
| 183 | 183 |
} |
| 184 | 184 |
|
| 185 |
- broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) |
|
| 185 |
+ di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) |
|
| 186 | 186 |
|
| 187 |
- logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name())
|
|
| 187 |
+ logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, di.tmpFile.Name())
|
|
| 188 | 188 |
di.layer = layerDownload |
| 189 | 189 |
|
| 190 | 190 |
di.err <- nil |
| ... | ... |
@@ -244,6 +227,16 @@ func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (verified bo |
| 244 | 244 |
var layerIDs []string |
| 245 | 245 |
defer func() {
|
| 246 | 246 |
p.graph.Release(p.sessionID, layerIDs...) |
| 247 |
+ |
|
| 248 |
+ for _, d := range downloads {
|
|
| 249 |
+ p.poolRemoveWithError("pull", d.poolKey, err)
|
|
| 250 |
+ if d.tmpFile != nil {
|
|
| 251 |
+ d.tmpFile.Close() |
|
| 252 |
+ if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
|
|
| 253 |
+ logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
|
|
| 254 |
+ } |
|
| 255 |
+ } |
|
| 256 |
+ } |
|
| 247 | 257 |
}() |
| 248 | 258 |
|
| 249 | 259 |
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
|
| ... | ... |
@@ -264,30 +257,31 @@ func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (verified bo |
| 264 | 264 |
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil)) |
| 265 | 265 |
|
| 266 | 266 |
d := &downloadInfo{
|
| 267 |
- img: img, |
|
| 268 |
- digest: manifest.FSLayers[i].BlobSum, |
|
| 267 |
+ img: img, |
|
| 268 |
+ poolKey: "layer:" + img.ID, |
|
| 269 |
+ digest: manifest.FSLayers[i].BlobSum, |
|
| 269 | 270 |
// TODO: seems like this chan buffer solved hanging problem in go1.5, |
| 270 | 271 |
// this can indicate some deeper problem that somehow we never take |
| 271 | 272 |
// error from channel in loop below |
| 272 | 273 |
err: make(chan error, 1), |
| 273 |
- out: pipeWriter, |
|
| 274 | 274 |
} |
| 275 |
- downloads = append(downloads, d) |
|
| 276 | 275 |
|
| 277 |
- go p.download(d) |
|
| 278 |
- } |
|
| 276 |
+ tmpFile, err := ioutil.TempFile("", "GetImageBlob")
|
|
| 277 |
+ if err != nil {
|
|
| 278 |
+ return false, err |
|
| 279 |
+ } |
|
| 280 |
+ d.tmpFile = tmpFile |
|
| 279 | 281 |
|
| 280 |
- // run clean for all downloads to prevent leftovers |
|
| 281 |
- for _, d := range downloads {
|
|
| 282 |
- defer func(d *downloadInfo) {
|
|
| 283 |
- p.poolRemoveWithError("pull", d.poolKey, err)
|
|
| 284 |
- if d.tmpFile != nil {
|
|
| 285 |
- d.tmpFile.Close() |
|
| 286 |
- if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
|
|
| 287 |
- logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
|
|
| 288 |
- } |
|
| 289 |
- } |
|
| 290 |
- }(d) |
|
| 282 |
+ downloads = append(downloads, d) |
|
| 283 |
+ |
|
| 284 |
+ broadcaster, found := p.poolAdd("pull", d.poolKey)
|
|
| 285 |
+ broadcaster.Add(pipeWriter) |
|
| 286 |
+ d.broadcaster = broadcaster |
|
| 287 |
+ if found {
|
|
| 288 |
+ d.err <- nil |
|
| 289 |
+ } else {
|
|
| 290 |
+ go p.download(d) |
|
| 291 |
+ } |
|
| 291 | 292 |
} |
| 292 | 293 |
|
| 293 | 294 |
var tagUpdated bool |