Browse code

Make image (layer) downloads faster by using pigz

The Golang built-in gzip library is serialized, and fairly slow
at decompressing. It also only decompresses on demand, versus
pipelining decompression.

This change switches to using the pigz external command
for gzip decompression, as opposed to using the built-in
golang one. This code is not vendored, but will be used
if it autodetected as part of the OS.

This also switches to using context, versus a manually
managed channel to manage cancellations, and synchronization.
There is a little bit of weirdness around manually having
to cancel in the error cases.

Signed-off-by: Sargun Dhillon <sargun@sargun.me>

Sargun Dhillon authored on 2018/01/17 03:49:18
Showing 10 changed files
... ...
@@ -62,6 +62,7 @@ RUN apt-get update && apt-get install -y \
62 62
 	libudev-dev \
63 63
 	mercurial \
64 64
 	net-tools \
65
+	pigz \
65 66
 	pkg-config \
66 67
 	protobuf-compiler \
67 68
 	protobuf-c-compiler \
... ...
@@ -52,6 +52,7 @@ RUN apt-get update && apt-get install -y \
52 52
 	libudev-dev \
53 53
 	mercurial \
54 54
 	net-tools \
55
+	pigz \
55 56
 	pkg-config \
56 57
 	protobuf-compiler \
57 58
 	protobuf-c-compiler \
... ...
@@ -45,6 +45,7 @@ RUN apt-get update && apt-get install -y \
45 45
 	libtool \
46 46
 	libudev-dev \
47 47
 	mercurial \
48
+	pigz \
48 49
 	pkg-config \
49 50
 	python-backports.ssl-match-hostname \
50 51
 	python-dev \
... ...
@@ -47,6 +47,7 @@ RUN apk add --update \
47 47
     g++ \
48 48
     git \
49 49
     iptables \
50
+    pigz \
50 51
     tar \
51 52
     xz \
52 53
     && rm -rf /var/cache/apk/*
... ...
@@ -46,6 +46,7 @@ RUN apt-get update && apt-get install -y \
46 46
 	libtool \
47 47
 	libudev-dev \
48 48
 	mercurial \
49
+	pigz \
49 50
 	pkg-config \
50 51
 	python-backports.ssl-match-hostname \
51 52
 	python-dev \
... ...
@@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y \
42 42
 	libtool \
43 43
 	libudev-dev \
44 44
 	mercurial \
45
+	pigz \
45 46
 	pkg-config \
46 47
 	python-backports.ssl-match-hostname \
47 48
 	python-dev \
... ...
@@ -28,6 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
28 28
 		e2fsprogs \
29 29
 		iptables \
30 30
 		pkg-config \
31
+		pigz \
31 32
 		procps \
32 33
 		xfsprogs \
33 34
 		xz-utils \
... ...
@@ -6,6 +6,7 @@ import (
6 6
 	"bytes"
7 7
 	"compress/bzip2"
8 8
 	"compress/gzip"
9
+	"context"
9 10
 	"fmt"
10 11
 	"io"
11 12
 	"io/ioutil"
... ...
@@ -13,6 +14,7 @@ import (
13 13
 	"os/exec"
14 14
 	"path/filepath"
15 15
 	"runtime"
16
+	"strconv"
16 17
 	"strings"
17 18
 	"syscall"
18 19
 
... ...
@@ -24,6 +26,17 @@ import (
24 24
 	"github.com/sirupsen/logrus"
25 25
 )
26 26
 
27
+var unpigzPath string
28
+
29
+func init() {
30
+	if path, err := exec.LookPath("unpigz"); err != nil {
31
+		logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library")
32
+	} else {
33
+		logrus.Debugf("Using unpigz binary found at path %s", path)
34
+		unpigzPath = path
35
+	}
36
+}
37
+
27 38
 type (
28 39
 	// Compression is the state represents if compressed or not.
29 40
 	Compression int
... ...
@@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression {
136 136
 	return Uncompressed
137 137
 }
138 138
 
139
-func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) {
139
+func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) {
140 140
 	args := []string{"xz", "-d", "-c", "-q"}
141 141
 
142
-	return cmdStream(exec.Command(args[0], args[1:]...), archive)
142
+	return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive)
143
+}
144
+
145
+func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
146
+	if unpigzPath == "" {
147
+		return gzip.NewReader(buf)
148
+	}
149
+
150
+	disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ")
151
+	if disablePigzEnv != "" {
152
+		if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil {
153
+			return nil, err
154
+		} else if disablePigz {
155
+			return gzip.NewReader(buf)
156
+		}
157
+	}
158
+
159
+	return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
160
+}
161
+
162
+func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
163
+	return ioutils.NewReadCloserWrapper(readBuf, func() error {
164
+		cancel()
165
+		return readBuf.Close()
166
+	})
143 167
 }
144 168
 
145 169
 // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
... ...
@@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
163 163
 		readBufWrapper := p.NewReadCloserWrapper(buf, buf)
164 164
 		return readBufWrapper, nil
165 165
 	case Gzip:
166
-		gzReader, err := gzip.NewReader(buf)
166
+		ctx, cancel := context.WithCancel(context.Background())
167
+
168
+		gzReader, err := gzDecompress(ctx, buf)
167 169
 		if err != nil {
170
+			cancel()
168 171
 			return nil, err
169 172
 		}
170 173
 		readBufWrapper := p.NewReadCloserWrapper(buf, gzReader)
171
-		return readBufWrapper, nil
174
+		return wrapReadCloser(readBufWrapper, cancel), nil
172 175
 	case Bzip2:
173 176
 		bz2Reader := bzip2.NewReader(buf)
174 177
 		readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
175 178
 		return readBufWrapper, nil
176 179
 	case Xz:
177
-		xzReader, chdone, err := xzDecompress(buf)
180
+		ctx, cancel := context.WithCancel(context.Background())
181
+
182
+		xzReader, err := xzDecompress(ctx, buf)
178 183
 		if err != nil {
184
+			cancel()
179 185
 			return nil, err
180 186
 		}
181 187
 		readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
182
-		return ioutils.NewReadCloserWrapper(readBufWrapper, func() error {
183
-			<-chdone
184
-			return readBufWrapper.Close()
185
-		}), nil
188
+		return wrapReadCloser(readBufWrapper, cancel), nil
186 189
 	default:
187 190
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
188 191
 	}
... ...
@@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error {
1163 1163
 // cmdStream executes a command, and returns its stdout as a stream.
1164 1164
 // If the command fails to run or doesn't complete successfully, an error
1165 1165
 // will be returned, including anything written on stderr.
1166
-func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) {
1167
-	chdone := make(chan struct{})
1166
+func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
1168 1167
 	cmd.Stdin = input
1169 1168
 	pipeR, pipeW := io.Pipe()
1170 1169
 	cmd.Stdout = pipeW
... ...
@@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
1173 1173
 
1174 1174
 	// Run the command and return the pipe
1175 1175
 	if err := cmd.Start(); err != nil {
1176
-		return nil, nil, err
1176
+		return nil, err
1177 1177
 	}
1178 1178
 
1179 1179
 	// Copy stdout to the returned pipe
... ...
@@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
1183 1183
 		} else {
1184 1184
 			pipeW.Close()
1185 1185
 		}
1186
-		close(chdone)
1187 1186
 	}()
1188 1187
 
1189
-	return pipeR, chdone, nil
1188
+	return pipeR, nil
1190 1189
 }
1191 1190
 
1192 1191
 // NewTempArchive reads the content of src into a temporary file, and returns the contents
... ...
@@ -3,6 +3,7 @@ package archive
3 3
 import (
4 4
 	"archive/tar"
5 5
 	"bytes"
6
+	"compress/gzip"
6 7
 	"fmt"
7 8
 	"io"
8 9
 	"io/ioutil"
... ...
@@ -15,6 +16,7 @@ import (
15 15
 	"time"
16 16
 
17 17
 	"github.com/docker/docker/pkg/idtools"
18
+	"github.com/docker/docker/pkg/ioutils"
18 19
 	"github.com/stretchr/testify/assert"
19 20
 	"github.com/stretchr/testify/require"
20 21
 )
... ...
@@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) {
87 87
 	}
88 88
 }
89 89
 
90
-func testDecompressStream(t *testing.T, ext, compressCommand string) {
90
+func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader {
91 91
 	cmd := exec.Command("sh", "-c",
92 92
 		fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand))
93 93
 	output, err := cmd.CombinedOutput()
... ...
@@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) {
111 111
 	if err = r.Close(); err != nil {
112 112
 		t.Fatalf("Failed to close the decompressed stream: %v ", err)
113 113
 	}
114
+
115
+	return r
114 116
 }
115 117
 
116 118
 func TestDecompressStreamGzip(t *testing.T) {
... ...
@@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) {
206 206
 
207 207
 func TestCmdStreamLargeStderr(t *testing.T) {
208 208
 	cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
209
-	out, _, err := cmdStream(cmd, nil)
209
+	out, err := cmdStream(cmd, nil)
210 210
 	if err != nil {
211 211
 		t.Fatalf("Failed to start command: %s", err)
212 212
 	}
... ...
@@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) {
231 231
 		t.Skip("Failing on Windows CI machines")
232 232
 	}
233 233
 	badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
234
-	out, _, err := cmdStream(badCmd, nil)
234
+	out, err := cmdStream(badCmd, nil)
235 235
 	if err != nil {
236 236
 		t.Fatalf("Failed to start command: %s", err)
237 237
 	}
... ...
@@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) {
246 246
 
247 247
 func TestCmdStreamGood(t *testing.T) {
248 248
 	cmd := exec.Command("sh", "-c", "echo hello; exit 0")
249
-	out, _, err := cmdStream(cmd, nil)
249
+	out, err := cmdStream(cmd, nil)
250 250
 	if err != nil {
251 251
 		t.Fatal(err)
252 252
 	}
... ...
@@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec
1318 1318
 	assert.NoError(t, err)
1319 1319
 	return string(content)
1320 1320
 }
1321
+
1322
+func TestDisablePigz(t *testing.T) {
1323
+	_, err := exec.LookPath("unpigz")
1324
+	if err != nil {
1325
+		t.Log("Test will not check full path when Pigz not installed")
1326
+	}
1327
+
1328
+	os.Setenv("MOBY_DISABLE_PIGZ", "true")
1329
+	defer os.Unsetenv("MOBY_DISABLE_PIGZ")
1330
+
1331
+	r := testDecompressStream(t, "gz", "gzip -f")
1332
+	// For the bufio pool
1333
+	outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1334
+	// For the context canceller
1335
+	contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1336
+
1337
+	assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader)
1338
+}
1339
+
1340
+func TestPigz(t *testing.T) {
1341
+	r := testDecompressStream(t, "gz", "gzip -f")
1342
+	// For the bufio pool
1343
+	outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1344
+	// For the context canceller
1345
+	contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1346
+
1347
+	_, err := exec.LookPath("unpigz")
1348
+	if err == nil {
1349
+		t.Log("Tested whether Pigz is used, as it installed")
1350
+		assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader)
1351
+	} else {
1352
+		t.Log("Tested whether Pigz is not used, as it not installed")
1353
+		assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader)
1354
+	}
1355
+}
... ...
@@ -8,18 +8,22 @@ import (
8 8
 	"golang.org/x/net/context"
9 9
 )
10 10
 
11
-type readCloserWrapper struct {
11
+// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
12
+// It calls the given callback function when closed. It should be constructed
13
+// with NewReadCloserWrapper
14
+type ReadCloserWrapper struct {
12 15
 	io.Reader
13 16
 	closer func() error
14 17
 }
15 18
 
16
-func (r *readCloserWrapper) Close() error {
19
+// Close calls back the passed closer function
20
+func (r *ReadCloserWrapper) Close() error {
17 21
 	return r.closer()
18 22
 }
19 23
 
20 24
 // NewReadCloserWrapper returns a new io.ReadCloser.
21 25
 func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
22
-	return &readCloserWrapper{
26
+	return &ReadCloserWrapper{
23 27
 		Reader: r,
24 28
 		closer: closer,
25 29
 	}