Make image (layer) downloads faster by using pigz
unclejack authored on 2018/01/18 04:18:20... | ... |
@@ -6,6 +6,7 @@ import ( |
6 | 6 |
"bytes" |
7 | 7 |
"compress/bzip2" |
8 | 8 |
"compress/gzip" |
9 |
+ "context" |
|
9 | 10 |
"fmt" |
10 | 11 |
"io" |
11 | 12 |
"io/ioutil" |
... | ... |
@@ -13,6 +14,7 @@ import ( |
13 | 13 |
"os/exec" |
14 | 14 |
"path/filepath" |
15 | 15 |
"runtime" |
16 |
+ "strconv" |
|
16 | 17 |
"strings" |
17 | 18 |
"syscall" |
18 | 19 |
|
... | ... |
@@ -24,6 +26,17 @@ import ( |
24 | 24 |
"github.com/sirupsen/logrus" |
25 | 25 |
) |
26 | 26 |
|
27 |
+var unpigzPath string |
|
28 |
+ |
|
29 |
+func init() { |
|
30 |
+ if path, err := exec.LookPath("unpigz"); err != nil { |
|
31 |
+ logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library") |
|
32 |
+ } else { |
|
33 |
+ logrus.Debugf("Using unpigz binary found at path %s", path) |
|
34 |
+ unpigzPath = path |
|
35 |
+ } |
|
36 |
+} |
|
37 |
+ |
|
27 | 38 |
type ( |
28 | 39 |
// Compression is the state represents if compressed or not. |
29 | 40 |
Compression int |
... | ... |
@@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression { |
136 | 136 |
return Uncompressed |
137 | 137 |
} |
138 | 138 |
|
139 |
-func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) { |
|
139 |
+func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) { |
|
140 | 140 |
args := []string{"xz", "-d", "-c", "-q"} |
141 | 141 |
|
142 |
- return cmdStream(exec.Command(args[0], args[1:]...), archive) |
|
142 |
+ return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive) |
|
143 |
+} |
|
144 |
+ |
|
145 |
+func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) { |
|
146 |
+ if unpigzPath == "" { |
|
147 |
+ return gzip.NewReader(buf) |
|
148 |
+ } |
|
149 |
+ |
|
150 |
+ disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ") |
|
151 |
+ if disablePigzEnv != "" { |
|
152 |
+ if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil { |
|
153 |
+ return nil, err |
|
154 |
+ } else if disablePigz { |
|
155 |
+ return gzip.NewReader(buf) |
|
156 |
+ } |
|
157 |
+ } |
|
158 |
+ |
|
159 |
+ return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf) |
|
160 |
+} |
|
161 |
+ |
|
162 |
+func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { |
|
163 |
+ return ioutils.NewReadCloserWrapper(readBuf, func() error { |
|
164 |
+ cancel() |
|
165 |
+ return readBuf.Close() |
|
166 |
+ }) |
|
143 | 167 |
} |
144 | 168 |
|
145 | 169 |
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. |
... | ... |
@@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { |
163 | 163 |
readBufWrapper := p.NewReadCloserWrapper(buf, buf) |
164 | 164 |
return readBufWrapper, nil |
165 | 165 |
case Gzip: |
166 |
- gzReader, err := gzip.NewReader(buf) |
|
166 |
+ ctx, cancel := context.WithCancel(context.Background()) |
|
167 |
+ |
|
168 |
+ gzReader, err := gzDecompress(ctx, buf) |
|
167 | 169 |
if err != nil { |
170 |
+ cancel() |
|
168 | 171 |
return nil, err |
169 | 172 |
} |
170 | 173 |
readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) |
171 |
- return readBufWrapper, nil |
|
174 |
+ return wrapReadCloser(readBufWrapper, cancel), nil |
|
172 | 175 |
case Bzip2: |
173 | 176 |
bz2Reader := bzip2.NewReader(buf) |
174 | 177 |
readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) |
175 | 178 |
return readBufWrapper, nil |
176 | 179 |
case Xz: |
177 |
- xzReader, chdone, err := xzDecompress(buf) |
|
180 |
+ ctx, cancel := context.WithCancel(context.Background()) |
|
181 |
+ |
|
182 |
+ xzReader, err := xzDecompress(ctx, buf) |
|
178 | 183 |
if err != nil { |
184 |
+ cancel() |
|
179 | 185 |
return nil, err |
180 | 186 |
} |
181 | 187 |
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) |
182 |
- return ioutils.NewReadCloserWrapper(readBufWrapper, func() error { |
|
183 |
- <-chdone |
|
184 |
- return readBufWrapper.Close() |
|
185 |
- }), nil |
|
188 |
+ return wrapReadCloser(readBufWrapper, cancel), nil |
|
186 | 189 |
default: |
187 | 190 |
return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) |
188 | 191 |
} |
... | ... |
@@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error { |
1163 | 1163 |
// cmdStream executes a command, and returns its stdout as a stream. |
1164 | 1164 |
// If the command fails to run or doesn't complete successfully, an error |
1165 | 1165 |
// will be returned, including anything written on stderr. |
1166 |
-func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) { |
|
1167 |
- chdone := make(chan struct{}) |
|
1166 |
+func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { |
|
1168 | 1167 |
cmd.Stdin = input |
1169 | 1168 |
pipeR, pipeW := io.Pipe() |
1170 | 1169 |
cmd.Stdout = pipeW |
... | ... |
@@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, |
1173 | 1173 |
|
1174 | 1174 |
// Run the command and return the pipe |
1175 | 1175 |
if err := cmd.Start(); err != nil { |
1176 |
- return nil, nil, err |
|
1176 |
+ return nil, err |
|
1177 | 1177 |
} |
1178 | 1178 |
|
1179 | 1179 |
// Copy stdout to the returned pipe |
... | ... |
@@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, |
1183 | 1183 |
} else { |
1184 | 1184 |
pipeW.Close() |
1185 | 1185 |
} |
1186 |
- close(chdone) |
|
1187 | 1186 |
}() |
1188 | 1187 |
|
1189 |
- return pipeR, chdone, nil |
|
1188 |
+ return pipeR, nil |
|
1190 | 1189 |
} |
1191 | 1190 |
|
1192 | 1191 |
// NewTempArchive reads the content of src into a temporary file, and returns the contents |
... | ... |
@@ -3,6 +3,7 @@ package archive |
3 | 3 |
import ( |
4 | 4 |
"archive/tar" |
5 | 5 |
"bytes" |
6 |
+ "compress/gzip" |
|
6 | 7 |
"fmt" |
7 | 8 |
"io" |
8 | 9 |
"io/ioutil" |
... | ... |
@@ -15,6 +16,7 @@ import ( |
15 | 15 |
"time" |
16 | 16 |
|
17 | 17 |
"github.com/docker/docker/pkg/idtools" |
18 |
+ "github.com/docker/docker/pkg/ioutils" |
|
18 | 19 |
"github.com/stretchr/testify/assert" |
19 | 20 |
"github.com/stretchr/testify/require" |
20 | 21 |
) |
... | ... |
@@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) { |
87 | 87 |
} |
88 | 88 |
} |
89 | 89 |
|
90 |
-func testDecompressStream(t *testing.T, ext, compressCommand string) { |
|
90 |
+func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader { |
|
91 | 91 |
cmd := exec.Command("sh", "-c", |
92 | 92 |
fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand)) |
93 | 93 |
output, err := cmd.CombinedOutput() |
... | ... |
@@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) { |
111 | 111 |
if err = r.Close(); err != nil { |
112 | 112 |
t.Fatalf("Failed to close the decompressed stream: %v ", err) |
113 | 113 |
} |
114 |
+ |
|
115 |
+ return r |
|
114 | 116 |
} |
115 | 117 |
|
116 | 118 |
func TestDecompressStreamGzip(t *testing.T) { |
... | ... |
@@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) { |
206 | 206 |
|
207 | 207 |
func TestCmdStreamLargeStderr(t *testing.T) { |
208 | 208 |
cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello") |
209 |
- out, _, err := cmdStream(cmd, nil) |
|
209 |
+ out, err := cmdStream(cmd, nil) |
|
210 | 210 |
if err != nil { |
211 | 211 |
t.Fatalf("Failed to start command: %s", err) |
212 | 212 |
} |
... | ... |
@@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) { |
231 | 231 |
t.Skip("Failing on Windows CI machines") |
232 | 232 |
} |
233 | 233 |
badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1") |
234 |
- out, _, err := cmdStream(badCmd, nil) |
|
234 |
+ out, err := cmdStream(badCmd, nil) |
|
235 | 235 |
if err != nil { |
236 | 236 |
t.Fatalf("Failed to start command: %s", err) |
237 | 237 |
} |
... | ... |
@@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) { |
246 | 246 |
|
247 | 247 |
func TestCmdStreamGood(t *testing.T) { |
248 | 248 |
cmd := exec.Command("sh", "-c", "echo hello; exit 0") |
249 |
- out, _, err := cmdStream(cmd, nil) |
|
249 |
+ out, err := cmdStream(cmd, nil) |
|
250 | 250 |
if err != nil { |
251 | 251 |
t.Fatal(err) |
252 | 252 |
} |
... | ... |
@@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec |
1318 | 1318 |
assert.NoError(t, err) |
1319 | 1319 |
return string(content) |
1320 | 1320 |
} |
1321 |
+ |
|
1322 |
+func TestDisablePigz(t *testing.T) { |
|
1323 |
+ _, err := exec.LookPath("unpigz") |
|
1324 |
+ if err != nil { |
|
1325 |
+ t.Log("Test will not check full path when Pigz not installed") |
|
1326 |
+ } |
|
1327 |
+ |
|
1328 |
+ os.Setenv("MOBY_DISABLE_PIGZ", "true") |
|
1329 |
+ defer os.Unsetenv("MOBY_DISABLE_PIGZ") |
|
1330 |
+ |
|
1331 |
+ r := testDecompressStream(t, "gz", "gzip -f") |
|
1332 |
+ // For the bufio pool |
|
1333 |
+ outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) |
|
1334 |
+ // For the context canceller |
|
1335 |
+ contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) |
|
1336 |
+ |
|
1337 |
+ assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) |
|
1338 |
+} |
|
1339 |
+ |
|
1340 |
+func TestPigz(t *testing.T) { |
|
1341 |
+ r := testDecompressStream(t, "gz", "gzip -f") |
|
1342 |
+ // For the bufio pool |
|
1343 |
+ outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) |
|
1344 |
+ // For the context canceller |
|
1345 |
+ contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) |
|
1346 |
+ |
|
1347 |
+ _, err := exec.LookPath("unpigz") |
|
1348 |
+ if err == nil { |
|
1349 |
+ t.Log("Tested whether Pigz is used, as it installed") |
|
1350 |
+ assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader) |
|
1351 |
+ } else { |
|
1352 |
+ t.Log("Tested whether Pigz is not used, as it not installed") |
|
1353 |
+ assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) |
|
1354 |
+ } |
|
1355 |
+} |
... | ... |
@@ -8,18 +8,22 @@ import ( |
8 | 8 |
"golang.org/x/net/context" |
9 | 9 |
) |
10 | 10 |
|
11 |
-type readCloserWrapper struct { |
|
11 |
+// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser |
|
12 |
+// It calls the given callback function when closed. It should be constructed |
|
13 |
+// with NewReadCloserWrapper |
|
14 |
+type ReadCloserWrapper struct { |
|
12 | 15 |
io.Reader |
13 | 16 |
closer func() error |
14 | 17 |
} |
15 | 18 |
|
16 |
-func (r *readCloserWrapper) Close() error { |
|
19 |
+// Close calls back the passed closer function |
|
20 |
+func (r *ReadCloserWrapper) Close() error { |
|
17 | 21 |
return r.closer() |
18 | 22 |
} |
19 | 23 |
|
20 | 24 |
// NewReadCloserWrapper returns a new io.ReadCloser. |
21 | 25 |
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { |
22 |
- return &readCloserWrapper{ |
|
26 |
+ return &ReadCloserWrapper{ |
|
23 | 27 |
Reader: r, |
24 | 28 |
closer: closer, |
25 | 29 |
} |