Browse code

Add synchronization and closure to IO pipes in userns path

The execdriver pipes setup uses OS pipes with fds so that they can be
chown'ed to the remapped root user for proper access. Recent flakiness
in certain short-lived tests (usually via the "exec" path) reveals that
the copy routines are not completing before exit/tear-down.

This fix adds synchronization and proper closure such that these
routines exit successfully.

Docker-DCO-1.1-Signed-off-by: Phil Estes <estesp@linux.vnet.ibm.com> (github: estesp)

Phil Estes authored on 2016/02/27 02:59:48
Showing 2 changed files
... ...
@@ -152,7 +152,9 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
152 152
 		User: c.ProcessConfig.User,
153 153
 	}
154 154
 
155
-	if err := setupPipes(container, &c.ProcessConfig, p, pipes); err != nil {
155
+	wg := sync.WaitGroup{}
156
+	writers, err := setupPipes(container, &c.ProcessConfig, p, pipes, &wg)
157
+	if err != nil {
156 158
 		return execdriver.ExitStatus{ExitCode: -1}, err
157 159
 	}
158 160
 
... ...
@@ -174,6 +176,10 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
174 174
 		return execdriver.ExitStatus{ExitCode: -1}, err
175 175
 	}
176 176
 
177
+	//close the write end of any opened pipes now that they are dup'ed into the container
178
+	for _, writer := range writers {
179
+		writer.Close()
180
+	}
177 181
 	// 'oom' is used to emit 'oom' events to the eventstream, 'oomKilled' is used
178 182
 	// to set the 'OOMKilled' flag in state
179 183
 	oom := notifyOnOOM(cont)
... ...
@@ -202,6 +208,9 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
202 202
 		}
203 203
 		ps = execErr.ProcessState
204 204
 	}
205
+	// wait for all IO goroutine copiers to finish
206
+	wg.Wait()
207
+
205 208
 	cont.Destroy()
206 209
 	destroyed = true
207 210
 	// oomKilled will have an oom event if any process within the container was
... ...
@@ -480,24 +489,26 @@ func (t *TtyConsole) Close() error {
480 480
 	return t.console.Close()
481 481
 }
482 482
 
483
-func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConfig, p *libcontainer.Process, pipes *execdriver.Pipes) error {
483
+func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConfig, p *libcontainer.Process, pipes *execdriver.Pipes, wg *sync.WaitGroup) ([]io.WriteCloser, error) {
484
+
485
+	writers := []io.WriteCloser{}
484 486
 
485 487
 	rootuid, err := container.HostUID()
486 488
 	if err != nil {
487
-		return err
489
+		return writers, err
488 490
 	}
489 491
 
490 492
 	if processConfig.Tty {
491 493
 		cons, err := p.NewConsole(rootuid)
492 494
 		if err != nil {
493
-			return err
495
+			return writers, err
494 496
 		}
495 497
 		term, err := NewTtyConsole(cons, pipes)
496 498
 		if err != nil {
497
-			return err
499
+			return writers, err
498 500
 		}
499 501
 		processConfig.Terminal = term
500
-		return nil
502
+		return writers, nil
501 503
 	}
502 504
 	// not a tty--set up stdio pipes
503 505
 	term := &execdriver.StdConsole{}
... ...
@@ -512,7 +523,7 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
512 512
 
513 513
 		r, w, err := os.Pipe()
514 514
 		if err != nil {
515
-			return err
515
+			return writers, err
516 516
 		}
517 517
 		if pipes.Stdin != nil {
518 518
 			go func() {
... ...
@@ -521,23 +532,32 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
521 521
 			}()
522 522
 			p.Stdin = r
523 523
 		}
524
-		return nil
524
+		return writers, nil
525 525
 	}
526 526
 
527 527
 	// if we have user namespaces enabled (rootuid != 0), we will set
528 528
 	// up os pipes for stderr, stdout, stdin so we can chown them to
529 529
 	// the proper ownership to allow for proper access to the underlying
530 530
 	// fds
531
-	var fds []int
531
+	var fds []uintptr
532
+
533
+	copyPipes := func(out io.Writer, in io.ReadCloser) {
534
+		defer wg.Done()
535
+		io.Copy(out, in)
536
+		in.Close()
537
+	}
532 538
 
533 539
 	//setup stdout
534 540
 	r, w, err := os.Pipe()
535 541
 	if err != nil {
536
-		return err
542
+		w.Close()
543
+		return writers, err
537 544
 	}
538
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
545
+	writers = append(writers, w)
546
+	fds = append(fds, r.Fd(), w.Fd())
539 547
 	if pipes.Stdout != nil {
540
-		go io.Copy(pipes.Stdout, r)
548
+		wg.Add(1)
549
+		go copyPipes(pipes.Stdout, r)
541 550
 	}
542 551
 	term.Closers = append(term.Closers, r)
543 552
 	p.Stdout = w
... ...
@@ -545,11 +565,14 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
545 545
 	//setup stderr
546 546
 	r, w, err = os.Pipe()
547 547
 	if err != nil {
548
-		return err
548
+		w.Close()
549
+		return writers, err
549 550
 	}
550
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
551
+	writers = append(writers, w)
552
+	fds = append(fds, r.Fd(), w.Fd())
551 553
 	if pipes.Stderr != nil {
552
-		go io.Copy(pipes.Stderr, r)
554
+		wg.Add(1)
555
+		go copyPipes(pipes.Stderr, r)
553 556
 	}
554 557
 	term.Closers = append(term.Closers, r)
555 558
 	p.Stderr = w
... ...
@@ -557,9 +580,10 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
557 557
 	//setup stdin
558 558
 	r, w, err = os.Pipe()
559 559
 	if err != nil {
560
-		return err
560
+		r.Close()
561
+		return writers, err
561 562
 	}
562
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
563
+	fds = append(fds, r.Fd(), w.Fd())
563 564
 	if pipes.Stdin != nil {
564 565
 		go func() {
565 566
 			io.Copy(w, pipes.Stdin)
... ...
@@ -568,11 +592,11 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
568 568
 		p.Stdin = r
569 569
 	}
570 570
 	for _, fd := range fds {
571
-		if err := syscall.Fchown(fd, rootuid, rootuid); err != nil {
572
-			return fmt.Errorf("Failed to chown pipes fd: %v", err)
571
+		if err := syscall.Fchown(int(fd), rootuid, rootuid); err != nil {
572
+			return writers, fmt.Errorf("Failed to chown pipes fd: %v", err)
573 573
 		}
574 574
 	}
575
-	return nil
575
+	return writers, nil
576 576
 }
577 577
 
578 578
 // SupportsHooks implements the execdriver Driver interface.
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"os"
8 8
 	"os/exec"
9 9
 	"strings"
10
+	"sync"
10 11
 	"syscall"
11 12
 
12 13
 	"github.com/docker/docker/daemon/execdriver"
... ...
@@ -52,13 +53,19 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
52 52
 	}
53 53
 
54 54
 	config := active.Config()
55
-	if err := setupPipes(&config, processConfig, p, pipes); err != nil {
55
+	wg := sync.WaitGroup{}
56
+	writers, err := setupPipes(&config, processConfig, p, pipes, &wg)
57
+	if err != nil {
56 58
 		return -1, err
57 59
 	}
58 60
 
59 61
 	if err := active.Start(p); err != nil {
60 62
 		return -1, err
61 63
 	}
64
+	//close the write end of any opened pipes now that they are dup'ed into the container
65
+	for _, writer := range writers {
66
+		writer.Close()
67
+	}
62 68
 
63 69
 	if hooks.Start != nil {
64 70
 		pid, err := p.Pid()
... ...
@@ -83,5 +90,7 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
83 83
 		}
84 84
 		ps = exitErr.ProcessState
85 85
 	}
86
+	// wait for all IO goroutine copiers to finish
87
+	wg.Wait()
86 88
 	return utils.ExitStatus(ps.Sys().(syscall.WaitStatus)), nil
87 89
 }