Browse code

Avoid buffering to tempfile when pushing with V2

The practice of buffering to a tempfile during a push contributes massively
to the perception of slow V2 push performance. The protocol was actually designed to
avoid precalculation, supporting cut-through data push. This means we can
assemble the layer, calculate its digest and push to the remote endpoint, all
at the same time.

This should increase performance massively on systems with slow disks or IO
bottlenecks.

Signed-off-by: Stephen J Day <stephen.day@docker.com>

Stephen J Day authored on 2015/08/12 05:47:08
Showing 5 changed files
... ...
@@ -2,7 +2,6 @@ package graph
2 2
 
3 3
 import (
4 4
 	"compress/gzip"
5
-	"crypto/sha256"
6 5
 	"encoding/json"
7 6
 	"errors"
8 7
 	"fmt"
... ...
@@ -340,26 +339,6 @@ func (graph *Graph) newTempFile() (*os.File, error) {
340 340
 	return ioutil.TempFile(tmp, "")
341 341
 }
342 342
 
343
-func bufferToFile(f *os.File, src io.Reader) (int64, digest.Digest, error) {
344
-	var (
345
-		h = sha256.New()
346
-		w = gzip.NewWriter(io.MultiWriter(f, h))
347
-	)
348
-	_, err := io.Copy(w, src)
349
-	w.Close()
350
-	if err != nil {
351
-		return 0, "", err
352
-	}
353
-	n, err := f.Seek(0, os.SEEK_CUR)
354
-	if err != nil {
355
-		return 0, "", err
356
-	}
357
-	if _, err := f.Seek(0, 0); err != nil {
358
-		return 0, "", err
359
-	}
360
-	return n, digest.NewDigest("sha256", h), nil
361
-}
362
-
363 343
 // Delete atomically removes an image from the graph.
364 344
 func (graph *Graph) Delete(name string) error {
365 345
 	id, err := graph.idIndex.Get(name)
... ...
@@ -2,8 +2,8 @@ package graph
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"io"
5 6
 	"io/ioutil"
6
-	"os"
7 7
 
8 8
 	"github.com/Sirupsen/logrus"
9 9
 	"github.com/docker/distribution"
... ...
@@ -199,7 +199,7 @@ func (p *v2Pusher) pushV2Tag(tag string) error {
199 199
 func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
200 200
 	out := p.config.OutStream
201 201
 
202
-	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil))
202
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))
203 203
 
204 204
 	image, err := p.graph.Get(img.ID)
205 205
 	if err != nil {
... ...
@@ -209,52 +209,46 @@ func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (d
209 209
 	if err != nil {
210 210
 		return "", err
211 211
 	}
212
-
213
-	tf, err := p.graph.newTempFile()
214
-	if err != nil {
215
-		return "", err
216
-	}
217
-	defer func() {
218
-		tf.Close()
219
-		os.Remove(tf.Name())
220
-	}()
221
-
222
-	size, dgst, err := bufferToFile(tf, arch)
223
-	if err != nil {
224
-		return "", err
225
-	}
212
+	defer arch.Close()
226 213
 
227 214
 	// Send the layer
228
-	logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
229 215
 	layerUpload, err := bs.Create(context.Background())
230 216
 	if err != nil {
231 217
 		return "", err
232 218
 	}
233 219
 	defer layerUpload.Close()
234 220
 
221
+	digester := digest.Canonical.New()
222
+	tee := io.TeeReader(arch, digester.Hash())
223
+
235 224
 	reader := progressreader.New(progressreader.Config{
236
-		In:        ioutil.NopCloser(tf),
225
+		In:        ioutil.NopCloser(tee), // we'll take care of close here.
237 226
 		Out:       out,
238 227
 		Formatter: p.sf,
239
-		Size:      size,
240
-		NewLines:  false,
241
-		ID:        stringid.TruncateID(img.ID),
242
-		Action:    "Pushing",
228
+
229
+		// TODO(stevvooe): This may cause a size reporting error. Try to get
230
+		// this from tar-split or elsewhere. The main issue here is that we
231
+		// don't want to buffer to disk *just* to calculate the size.
232
+		Size: img.Size,
233
+
234
+		NewLines: false,
235
+		ID:       stringid.TruncateID(img.ID),
236
+		Action:   "Pushing",
243 237
 	})
244
-	n, err := layerUpload.ReadFrom(reader)
238
+
239
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
240
+	nn, err := io.Copy(layerUpload, reader)
245 241
 	if err != nil {
246 242
 		return "", err
247 243
 	}
248
-	if n != size {
249
-		return "", fmt.Errorf("short upload: only wrote %d of %d", n, size)
250
-	}
251 244
 
252
-	desc := distribution.Descriptor{Digest: dgst}
253
-	if _, err := layerUpload.Commit(context.Background(), desc); err != nil {
245
+	dgst := digester.Digest()
246
+	if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
254 247
 		return "", err
255 248
 	}
256 249
 
257
-	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil))
250
+	logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
251
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))
258 252
 
259 253
 	return dgst, nil
260 254
 }
... ...
@@ -108,7 +108,7 @@ func (s *DockerRegistrySuite) TestPushInterrupt(c *check.C) {
108 108
 	}
109 109
 
110 110
 	// Interrupt push (yes, we have no idea at what point it will get killed).
111
-	time.Sleep(200 * time.Millisecond)
111
+	time.Sleep(50 * time.Millisecond) // dependent on race condition.
112 112
 	if err := pushCmd.Process.Kill(); err != nil {
113 113
 		c.Fatalf("Failed to kill push process: %v", err)
114 114
 	}
... ...
@@ -67,8 +67,14 @@ func (p *JSONProgress) String() string {
67 67
 		}
68 68
 		pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces))
69 69
 	}
70
+
70 71
 	numbersBox = fmt.Sprintf("%8v/%v", current, total)
71 72
 
73
+	if p.Current > p.Total {
74
+		// remove total display if the reported current is wonky.
75
+		numbersBox = fmt.Sprintf("%8v", current)
76
+	}
77
+
72 78
 	if p.Current > 0 && p.Start > 0 && percentage < 50 {
73 79
 		fromStart := time.Now().UTC().Sub(time.Unix(p.Start, 0))
74 80
 		perEntry := fromStart / time.Duration(p.Current)
... ...
@@ -3,12 +3,12 @@ package jsonmessage
3 3
 import (
4 4
 	"bytes"
5 5
 	"fmt"
6
+	"strings"
6 7
 	"testing"
7 8
 	"time"
8 9
 
9 10
 	"github.com/docker/docker/pkg/term"
10 11
 	"github.com/docker/docker/pkg/timeutils"
11
-	"strings"
12 12
 )
13 13
 
14 14
 func TestError(t *testing.T) {
... ...
@@ -45,7 +45,7 @@ func TestProgress(t *testing.T) {
45 45
 	}
46 46
 
47 47
 	// this number can't be negative gh#7136
48
-	expected = "[==================================================>]     50 B/40 B"
48
+	expected = "[==================================================>]     50 B"
49 49
 	jp5 := JSONProgress{Current: 50, Total: 40}
50 50
 	if jp5.String() != expected {
51 51
 		t.Fatalf("Expected %q, got %q", expected, jp5.String())