
Handle blocked I/O of exec'd processes

This is the second part to
https://github.com/containerd/containerd/pull/3361 and helps ensure that
process delete does not block forever when the process has exited but its
I/O was inherited by a subprocess that lives on.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
(cherry picked from commit b5f28865efebb14c66d5580dfa7bf0634b5e3241)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Michael Crosby authored on 2019/06/21 05:21:42
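
For orientation before the hunks below: the core change is a context-bounded wait on the stream copy goroutines, with a forced cancel of the containerd direct I/O when the timeout fires. The following is a minimal, standalone sketch of that pattern only; the helper name waitOrCancel and the simulated cleanup are illustrative and not part of the patch.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrCancel blocks until wg finishes or ctx expires; on expiry it runs
// cleanup so whatever is keeping the wait open can be released. This mirrors
// the new stream.Config.Wait added in the diff below.
func waitOrCancel(ctx context.Context, wg *sync.WaitGroup, cleanup func()) {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-ctx.Done():
		cleanup()
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // simulate a stream copy goroutine kept alive by an inherited fd

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	waitOrCancel(ctx, &wg, func() {
		fmt.Println("timed out waiting for stream copies; cancelling I/O")
	})
}

In the actual patch the cleanup step is c.dio.Cancel(), c.dio.Wait(), c.dio.Close(), which unblocks the fifo copies even though a subprocess still holds the other end of the pipes.
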
Showing 6 changed files
... ...
@@ -730,7 +730,7 @@ func (i *rio) Close() error {
 }
 
 func (i *rio) Wait() {
-	i.sc.Wait()
+	i.sc.Wait(context.Background())
 
 	i.IO.Wait()
 }
... ...
@@ -1,6 +1,7 @@
 package stream // import "github.com/docker/docker/container/stream"
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
... ...
@@ -24,11 +25,12 @@ import (
 // copied and delivered to all StdoutPipe and StderrPipe consumers, using
 // a kind of "broadcaster".
 type Config struct {
-	sync.WaitGroup
+	wg        sync.WaitGroup
 	stdout    *broadcaster.Unbuffered
 	stderr    *broadcaster.Unbuffered
 	stdin     io.ReadCloser
 	stdinPipe io.WriteCloser
+	dio       *cio.DirectIO
 }
 
 // NewConfig creates a stream config and initializes
... ...
@@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {
 
 // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
 func (c *Config) CopyToPipe(iop *cio.DirectIO) {
+	c.dio = iop
 	copyFunc := func(w io.Writer, r io.ReadCloser) {
-		c.Add(1)
+		c.wg.Add(1)
 		go func() {
 			if _, err := pools.Copy(w, r); err != nil {
 				logrus.Errorf("stream copy error: %v", err)
 			}
 			r.Close()
-			c.Done()
+			c.wg.Done()
 		}()
 	}
 
... ...
@@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
 		}
 	}
 }
+
+// Wait for the stream to close
+// Wait supports timeouts via the context to unblock and forcefully
+// close the io streams
+func (c *Config) Wait(ctx context.Context) {
+	done := make(chan struct{}, 1)
+	go func() {
+		c.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-ctx.Done():
+		if c.dio != nil {
+			c.dio.Cancel()
+			c.dio.Wait()
+			c.dio.Close()
+		}
+	}
+}
... ...
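
A hedged usage sketch of the new stream.Config.Wait signature, mirroring what the daemon monitor changes further below do. The two-second timeout comes from the diff; everything else here is illustrative scaffolding and assumes the post-patch moby tree is on the import path.

package main

import (
	"context"
	"time"

	"github.com/docker/docker/container/stream"
)

func main() {
	cfg := stream.NewConfig()

	// Bound the wait so an exec'd process whose stdio was inherited by a
	// lingering subprocess cannot block cleanup forever; on timeout Wait
	// cancels and closes the attached direct I/O.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	cfg.Wait(ctx)

	if err := cfg.CloseStreams(); err != nil {
		_ = err // handle or log the stream cleanup error
	}
}
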
@@ -1,6 +1,7 @@
 package exec // import "github.com/docker/docker/daemon/exec"
 
 import (
+	"context"
 	"runtime"
 	"sync"
 
... ...
@@ -58,7 +59,7 @@ func (i *rio) Close() error {
 }
 
 func (i *rio) Wait() {
-	i.sc.Wait()
+	i.sc.Wait(context.Background())
 
 	i.IO.Wait()
 }
... ...
@@ -55,8 +55,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 			if err != nil {
 				logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
 			}
+			ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
 
-			c.StreamConfig.Wait()
+			c.StreamConfig.Wait(ctx)
 			c.Reset(false)
 
 			exitStatus := container.ExitStatus{
... ...
@@ -124,7 +125,10 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 			defer execConfig.Unlock()
 			execConfig.ExitCode = &ec
 			execConfig.Running = false
-			execConfig.StreamConfig.Wait()
+
+			ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
+			execConfig.StreamConfig.Wait(ctx)
+
 			if err := execConfig.CloseStreams(); err != nil {
 				logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 			}
... ...
@@ -11,7 +11,6 @@ import (
 	"reflect"
 	"runtime"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
... ...
@@ -19,7 +18,6 @@ import (
 	"github.com/docker/docker/client"
 	"github.com/docker/docker/integration-cli/cli"
 	"github.com/docker/docker/integration-cli/cli/build"
-	"github.com/docker/docker/pkg/parsers/kernel"
 	"github.com/go-check/check"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
... ...
@@ -534,100 +532,3 @@ func (s *DockerSuite) TestExecEnvLinksHost(c *check.C) {
 	assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
 	assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
 }
-
-func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
-	testRequires(c, DaemonIsWindows)
-
-	if runtime.GOOS == "windows" {
-		v, err := kernel.GetKernelVersion()
-		assert.NilError(c, err)
-		build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
-		if build >= 17743 {
-			c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")
-
-			// This is being tracked internally. @jhowardmsft. Summary of failure
-			// from an email in early July 2018 below:
-			//
-			// Platform regression. In cmd.exe by the look of it. I can repro
-			// it outside of CI.  It fails the same on 17681, 17676 and even as
-			// far back as 17663, over a month old. From investigating, I can see
-			// what's happening in the container, but not the reason. The test
-			// starts a long-running container based on the Windows busybox image.
-			// It then adds another process (docker exec) to that container to
-			// sleep. It loops waiting for two instances of busybox.exe running,
-			// and cmd.exe to quit. What's actually happening is that the second
-			// exec hangs indefinitely, and from docker top, I can see
-			// "OpenWith.exe" running.
-
-			//Manual repro would be
-			//# Start the first long-running container
-			//docker run --rm -d --name test busybox sleep 300
-
-			//# In another window, docker top test. There should be a single instance of busybox.exe running
-			//# In a third window, docker exec test cmd /c start sleep 10  NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
-			//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
-		}
-	}
-
-	runSleepingContainer(c, "-d", "--name", "test")
-	exec := make(chan bool)
-	go func() {
-		dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
-		exec <- true
-	}()
-
-	count := 0
-	for {
-		top := make(chan string)
-		var out string
-		go func() {
-			out, _ := dockerCmd(c, "top", "test")
-			top <- out
-		}()
-
-		select {
-		case <-time.After(time.Second * 5):
-			c.Fatal("timed out waiting for top while exec is exiting")
-		case out = <-top:
-			break
-		}
-
-		if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
-			// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
-			break
-		}
-		count++
-		if count >= 30 {
-			c.Fatal("too many retries")
-		}
-		time.Sleep(1 * time.Second)
-	}
-
-	inspect := make(chan bool)
-	go func() {
-		dockerCmd(c, "inspect", "test")
-		inspect <- true
-	}()
-
-	select {
-	case <-time.After(time.Second * 5):
-		c.Fatal("timed out waiting for inspect while exec is exiting")
-	case <-inspect:
-		break
-	}
-
-	// Ensure the background sleep is still running
-	out, _ := dockerCmd(c, "top", "test")
-	assert.Equal(c, strings.Count(out, "busybox.exe"), 2)
-
-	// The exec should exit when the background sleep exits
-	select {
-	case <-time.After(time.Second * 15):
-		c.Fatal("timed out waiting for async exec to exit")
-	case <-exec:
-		// Ensure the background sleep has actually exited
-		out, _ := dockerCmd(c, "top", "test")
-		assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
-		break
-	}
-}
... ...
@@ -652,13 +652,6 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 					}).Error("exit event")
 				return
 			}
-			_, err = p.Delete(context.Background())
-			if err != nil {
-				c.logger.WithError(err).WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"process":   ei.ProcessID,
-				}).Warn("failed to delete process")
-			}
 
 			ctr, err := c.getContainer(ctx, ei.ContainerID)
 			if err != nil {
... ...
@@ -672,11 +665,18 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 					c.logger.WithFields(logrus.Fields{
 						"container": ei.ContainerID,
 						"error":     err,
-					}).Error("failed to find container")
+					}).Error("failed to get container labels")
 					return
 				}
 				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
 			}
+			_, err = p.Delete(context.Background())
+			if err != nil {
+				c.logger.WithError(err).WithFields(logrus.Fields{
+					"container": ei.ContainerID,
+					"process":   ei.ProcessID,
+				}).Warn("failed to delete process")
+			}
 		}
 	})
 }