The race is between pools.Put which calls buf.Reset and exec.Cmd
doing io.Copy from the buffer; it caused a runtime crash, as
described in #16924:
``` docker-daemon cat the-tarball.xz | xz -d -c -q | docker-untar /path/to/... (aufs ) ```
When docker-untar side fails (like try to set xattr on aufs, or a broken
tar), invokeUnpack will be responsible to exhaust all input, otherwise
`xz` will be write pending for ever.
this change add a receive only channel to cmdStream, and will close it
to notify it's now safe to close the input stream;
in CmdStream the change to use Stdin / Stdout / Stderr keeps the
code simple, os/exec.Cmd will spawn goroutines and call io.Copy automatically.
the CmdStream is actually called in the same file only, change it
lowercase to mark as private.
[...]
INFO[0000] Docker daemon commit=0a8c2e3 execdriver=native-0.2 graphdriver=aufs version=1.8.2
DEBU[0006] Calling POST /build
INFO[0006] POST /v1.20/build?cgroupparent=&cpuperiod=0&cpuquota=0&cpusetcpus=&cpusetmems=&cpushares=0&dockerfile=Dockerfile&memory=0&memswap=0&rm=1&t=gentoo-x32&ulimits=null
DEBU[0008] [BUILDER] Cache miss
DEBU[0009] Couldn't untar /home/lib-docker-v1.8.2-tmp/tmp/docker-build316710953/stage3-x32-20151004.tar.xz to /home/lib-docker-v1.8.2-tmp/aufs/mnt/d909abb87150463939c13e8a349b889a72d9b14f0cfcab42a8711979be285537: Untar re-exec error: exit status 1: output: operation not supported
DEBU[0009] CopyFileWithTar(/home/lib-docker-v1.8.2-tmp/tmp/docker-build316710953/stage3-x32-20151004.tar.xz, /home/lib-docker-v1.8.2-tmp/aufs/mnt/d909abb87150463939c13e8a349b889a72d9b14f0cfcab42a8711979be285537/)
panic: runtime error: slice bounds out of range
goroutine 42 [running]:
bufio.(*Reader).fill(0xc208187800)
/usr/local/go/src/bufio/bufio.go:86 +0x2db
bufio.(*Reader).WriteTo(0xc208187800, 0x7ff39602d150, 0xc2083f11a0, 0x508000, 0x0, 0x0)
/usr/local/go/src/bufio/bufio.go:449 +0x27e
io.Copy(0x7ff39602d150, 0xc2083f11a0, 0x7ff3960261f8, 0xc208187800, 0x0, 0x0, 0x0)
/usr/local/go/src/io/io.go:354 +0xb2
github.com/docker/docker/pkg/archive.funcĀ·006()
/go/src/github.com/docker/docker/pkg/archive/archive.go:817 +0x71
created by github.com/docker/docker/pkg/archive.CmdStream
/go/src/github.com/docker/docker/pkg/archive/archive.go:819 +0x1ec
goroutine 1 [chan receive]:
main.(*DaemonCli).CmdDaemon(0xc20809da30, 0xc20800a020, 0xd, 0xd, 0x0, 0x0)
/go/src/github.com/docker/docker/docker/daemon.go:289 +0x1781
reflect.callMethod(0xc208140090, 0xc20828fce0)
/usr/local/go/src/reflect/value.go:605 +0x179
reflect.methodValueCall(0xc20800a020, 0xd, 0xd, 0x1, 0xc208140090, 0x0, 0x0, 0xc208140090, 0x0, 0x45343f, ...)
/usr/local/go/src/reflect/asm_amd64.s:29 +0x36
github.com/docker/docker/cli.(*Cli).Run(0xc208129fb0, 0xc20800a010, 0xe, 0xe, 0x0, 0x0)
/go/src/github.com/docker/docker/cli/cli.go:89 +0x38e
main.main()
/go/src/github.com/docker/docker/docker/docker.go:69 +0x428
goroutine 5 [syscall]:
os/signal.loop()
/usr/local/go/src/os/signal/signal_unix.go:21 +0x1f
created by os/signal.initĀ·1
/usr/local/go/src/os/signal/signal_unix.go:27 +0x35
Signed-off-by: Derek Ch <denc716@gmail.com>
| ... | ... |
@@ -20,6 +20,7 @@ import ( |
| 20 | 20 |
"github.com/Sirupsen/logrus" |
| 21 | 21 |
"github.com/docker/docker/pkg/fileutils" |
| 22 | 22 |
"github.com/docker/docker/pkg/idtools" |
| 23 |
+ "github.com/docker/docker/pkg/ioutils" |
|
| 23 | 24 |
"github.com/docker/docker/pkg/pools" |
| 24 | 25 |
"github.com/docker/docker/pkg/promise" |
| 25 | 26 |
"github.com/docker/docker/pkg/system" |
| ... | ... |
@@ -116,10 +117,10 @@ func DetectCompression(source []byte) Compression {
|
| 116 | 116 |
return Uncompressed |
| 117 | 117 |
} |
| 118 | 118 |
|
| 119 |
-func xzDecompress(archive io.Reader) (io.ReadCloser, error) {
|
|
| 119 |
+func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) {
|
|
| 120 | 120 |
args := []string{"xz", "-d", "-c", "-q"}
|
| 121 | 121 |
|
| 122 |
- return CmdStream(exec.Command(args[0], args[1:]...), archive) |
|
| 122 |
+ return cmdStream(exec.Command(args[0], args[1:]...), archive) |
|
| 123 | 123 |
} |
| 124 | 124 |
|
| 125 | 125 |
// DecompressStream decompress the archive and returns a ReaderCloser with the decompressed archive. |
| ... | ... |
@@ -148,12 +149,15 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
|
| 148 | 148 |
readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) |
| 149 | 149 |
return readBufWrapper, nil |
| 150 | 150 |
case Xz: |
| 151 |
- xzReader, err := xzDecompress(buf) |
|
| 151 |
+ xzReader, chdone, err := xzDecompress(buf) |
|
| 152 | 152 |
if err != nil {
|
| 153 | 153 |
return nil, err |
| 154 | 154 |
} |
| 155 | 155 |
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) |
| 156 |
- return readBufWrapper, nil |
|
| 156 |
+ return ioutils.NewReadCloserWrapper(readBufWrapper, func() error {
|
|
| 157 |
+ <-chdone |
|
| 158 |
+ return readBufWrapper.Close() |
|
| 159 |
+ }), nil |
|
| 157 | 160 |
default: |
| 158 | 161 |
return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
|
| 159 | 162 |
} |
| ... | ... |
@@ -925,57 +929,33 @@ func CopyFileWithTar(src, dst string) (err error) {
|
| 925 | 925 |
return defaultArchiver.CopyFileWithTar(src, dst) |
| 926 | 926 |
} |
| 927 | 927 |
|
| 928 |
-// CmdStream executes a command, and returns its stdout as a stream. |
|
| 928 |
+// cmdStream executes a command, and returns its stdout as a stream. |
|
| 929 | 929 |
// If the command fails to run or doesn't complete successfully, an error |
| 930 | 930 |
// will be returned, including anything written on stderr. |
| 931 |
-func CmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
|
|
| 932 |
- if input != nil {
|
|
| 933 |
- stdin, err := cmd.StdinPipe() |
|
| 934 |
- if err != nil {
|
|
| 935 |
- return nil, err |
|
| 936 |
- } |
|
| 937 |
- // Write stdin if any |
|
| 938 |
- go func() {
|
|
| 939 |
- io.Copy(stdin, input) |
|
| 940 |
- stdin.Close() |
|
| 941 |
- }() |
|
| 942 |
- } |
|
| 943 |
- stdout, err := cmd.StdoutPipe() |
|
| 944 |
- if err != nil {
|
|
| 945 |
- return nil, err |
|
| 946 |
- } |
|
| 947 |
- stderr, err := cmd.StderrPipe() |
|
| 948 |
- if err != nil {
|
|
| 949 |
- return nil, err |
|
| 950 |
- } |
|
| 931 |
+func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) {
|
|
| 932 |
+ chdone := make(chan struct{})
|
|
| 933 |
+ cmd.Stdin = input |
|
| 951 | 934 |
pipeR, pipeW := io.Pipe() |
| 952 |
- errChan := make(chan []byte) |
|
| 953 |
- // Collect stderr, we will use it in case of an error |
|
| 954 |
- go func() {
|
|
| 955 |
- errText, e := ioutil.ReadAll(stderr) |
|
| 956 |
- if e != nil {
|
|
| 957 |
- errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
|
|
| 958 |
- } |
|
| 959 |
- errChan <- errText |
|
| 960 |
- }() |
|
| 935 |
+ cmd.Stdout = pipeW |
|
| 936 |
+ var errBuf bytes.Buffer |
|
| 937 |
+ cmd.Stderr = &errBuf |
|
| 938 |
+ |
|
| 939 |
+ // Run the command and return the pipe |
|
| 940 |
+ if err := cmd.Start(); err != nil {
|
|
| 941 |
+ return nil, nil, err |
|
| 942 |
+ } |
|
| 943 |
+ |
|
| 961 | 944 |
// Copy stdout to the returned pipe |
| 962 | 945 |
go func() {
|
| 963 |
- _, err := io.Copy(pipeW, stdout) |
|
| 964 |
- if err != nil {
|
|
| 965 |
- pipeW.CloseWithError(err) |
|
| 966 |
- } |
|
| 967 |
- errText := <-errChan |
|
| 968 | 946 |
if err := cmd.Wait(); err != nil {
|
| 969 |
- pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errText))
|
|
| 947 |
+ pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
|
|
| 970 | 948 |
} else {
|
| 971 | 949 |
pipeW.Close() |
| 972 | 950 |
} |
| 951 |
+ close(chdone) |
|
| 973 | 952 |
}() |
| 974 |
- // Run the command and return the pipe |
|
| 975 |
- if err := cmd.Start(); err != nil {
|
|
| 976 |
- return nil, err |
|
| 977 |
- } |
|
| 978 |
- return pipeR, nil |
|
| 953 |
+ |
|
| 954 |
+ return pipeR, chdone, nil |
|
| 979 | 955 |
} |
| 980 | 956 |
|
| 981 | 957 |
// NewTempArchive reads the content of src into a temporary file, and returns the contents |
| ... | ... |
@@ -160,7 +160,7 @@ func TestExtensionXz(t *testing.T) {
|
| 160 | 160 |
|
| 161 | 161 |
func TestCmdStreamLargeStderr(t *testing.T) {
|
| 162 | 162 |
cmd := exec.Command("/bin/sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
|
| 163 |
- out, err := CmdStream(cmd, nil) |
|
| 163 |
+ out, _, err := cmdStream(cmd, nil) |
|
| 164 | 164 |
if err != nil {
|
| 165 | 165 |
t.Fatalf("Failed to start command: %s", err)
|
| 166 | 166 |
} |
| ... | ... |
@@ -181,7 +181,7 @@ func TestCmdStreamLargeStderr(t *testing.T) {
|
| 181 | 181 |
|
| 182 | 182 |
func TestCmdStreamBad(t *testing.T) {
|
| 183 | 183 |
badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
|
| 184 |
- out, err := CmdStream(badCmd, nil) |
|
| 184 |
+ out, _, err := cmdStream(badCmd, nil) |
|
| 185 | 185 |
if err != nil {
|
| 186 | 186 |
t.Fatalf("Failed to start command: %s", err)
|
| 187 | 187 |
} |
| ... | ... |
@@ -196,7 +196,7 @@ func TestCmdStreamBad(t *testing.T) {
|
| 196 | 196 |
|
| 197 | 197 |
func TestCmdStreamGood(t *testing.T) {
|
| 198 | 198 |
cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0")
|
| 199 |
- out, err := CmdStream(cmd, nil) |
|
| 199 |
+ out, _, err := cmdStream(cmd, nil) |
|
| 200 | 200 |
if err != nil {
|
| 201 | 201 |
t.Fatal(err) |
| 202 | 202 |
} |
| ... | ... |
@@ -8,6 +8,7 @@ import ( |
| 8 | 8 |
"flag" |
| 9 | 9 |
"fmt" |
| 10 | 10 |
"io" |
| 11 |
+ "io/ioutil" |
|
| 11 | 12 |
"os" |
| 12 | 13 |
"runtime" |
| 13 | 14 |
"syscall" |
| ... | ... |
@@ -79,6 +80,11 @@ func invokeUnpack(decompressedArchive io.Reader, dest string, options *archive.T |
| 79 | 79 |
w.Close() |
| 80 | 80 |
|
| 81 | 81 |
if err := cmd.Wait(); err != nil {
|
| 82 |
+ // when `xz -d -c -q | docker-untar ...` failed on docker-untar side, |
|
| 83 |
+ // we need to exhaust `xz`'s output, otherwise the `xz` side will be |
|
| 84 |
+ // pending on write pipe forever |
|
| 85 |
+ io.Copy(ioutil.Discard, decompressedArchive) |
|
| 86 |
+ |
|
| 82 | 87 |
return fmt.Errorf("Untar re-exec error: %v: output: %s", err, output)
|
| 83 | 88 |
} |
| 84 | 89 |
return nil |