Browse code

Use cio.FIFOSet.Close() to cleanup fifos

Signed-off-by: Daniel Nephin <dnephin@docker.com>

Daniel Nephin authored on 2017/12/08 05:52:14
Showing 5 changed files
... ...
@@ -4,6 +4,10 @@ TOMLV_COMMIT=9baf8a8a9f2ed20a8e54160840c492f937eeaf9a
4 4
 
5 5
 # When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
6 6
 RUNC_COMMIT=b2567b37d7b75eb4cf325b77297b140ea686ce8f
7
+
8
+# containerd is also pinned in vendor.conf. When updating the binary
9
+# version you may also need to update the vendor version to pick up bug
10
+# fixes or new APIs.
7 11
 CONTAINERD_COMMIT=89623f28b87a6004d4b785663257362d1658a729 # v1.0.0
8 12
 TINI_COMMIT=949e6facb77383876aeff8a6944dde66b3089574
9 13
 LIBNETWORK_COMMIT=7b2b1feb1de4817d522cc372af149ff48d25028e
... ...
@@ -121,7 +121,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
121 121
 	c.Lock()
122 122
 	defer c.Unlock()
123 123
 
124
-	var rio *cio.DirectIO
124
+	var rio cio.IO
125 125
 	defer func() {
126 126
 		err = wrapError(err)
127 127
 	}()
... ...
@@ -139,12 +139,13 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
139 139
 	}()
140 140
 
141 141
 	t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) {
142
-		rio, err = cio.NewDirectIO(ctx, fifos)
142
+		io, err := cio.NewDirectIO(ctx, fifos)
143 143
 		if err != nil {
144 144
 			return nil, err
145 145
 		}
146 146
 
147
-		return attachStdio(rio)
147
+		rio, err = attachStdio(io)
148
+		return rio, err
148 149
 	})
149 150
 	if err != nil && !errdefs.IsNotFound(errors.Cause(err)) {
150 151
 		return false, -1, err
... ...
@@ -322,7 +323,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
322 322
 				rio.Cancel()
323 323
 				rio.Close()
324 324
 			}
325
-			rmFIFOSet(fifos)
326 325
 		}
327 326
 	}()
328 327
 
... ...
@@ -332,10 +332,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
332 332
 	})
333 333
 	if err != nil {
334 334
 		close(stdinCloseSync)
335
-		if rio != nil {
336
-			rio.Cancel()
337
-			rio.Close()
338
-		}
339 335
 		return -1, err
340 336
 	}
341 337
 
... ...
@@ -686,7 +682,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
686 686
 					"container": ei.ContainerID,
687 687
 				}).Error("failed to find container")
688 688
 			} else {
689
-				rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false))
689
+				newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
690 690
 			}
691 691
 		}
692 692
 	})
... ...
@@ -81,30 +81,28 @@ func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) {
81 81
 }
82 82
 
83 83
 func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
84
-	fifos := &cio.FIFOSet{
85
-		Config: cio.Config{
86
-			Terminal: withTerminal,
87
-			Stdout:   filepath.Join(bundleDir, processID+"-stdout"),
88
-		},
84
+	config := cio.Config{
85
+		Terminal: withTerminal,
86
+		Stdout:   filepath.Join(bundleDir, processID+"-stdout"),
89 87
 	}
88
+	paths := []string{config.Stdout}
90 89
 
91 90
 	if withStdin {
92
-		fifos.Stdin = filepath.Join(bundleDir, processID+"-stdin")
91
+		config.Stdin = filepath.Join(bundleDir, processID+"-stdin")
92
+		paths = append(paths, config.Stdin)
93 93
 	}
94
-
95
-	if !fifos.Terminal {
96
-		fifos.Stderr = filepath.Join(bundleDir, processID+"-stderr")
94
+	if !withTerminal {
95
+		config.Stderr = filepath.Join(bundleDir, processID+"-stderr")
96
+		paths = append(paths, config.Stderr)
97 97
 	}
98
-
99
-	return fifos
100
-}
101
-
102
-func rmFIFOSet(fset *cio.FIFOSet) {
103
-	for _, fn := range []string{fset.Stdout, fset.Stdin, fset.Stderr} {
104
-		if fn != "" {
105
-			if err := os.RemoveAll(fn); err != nil {
106
-				logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", fn, err)
98
+	closer := func() error {
99
+		for _, path := range paths {
100
+			if err := os.RemoveAll(path); err != nil {
101
+				logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", path, err)
107 102
 			}
108 103
 		}
104
+		return nil
109 105
 	}
106
+
107
+	return cio.NewFIFOSet(config, closer)
110 108
 }
... ...
@@ -2,6 +2,7 @@ package libcontainerd
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"path/filepath"
5 6
 
6 7
 	"github.com/containerd/containerd/cio"
7 8
 	"github.com/containerd/containerd/windows/hcsshimtypes"
... ...
@@ -35,19 +36,20 @@ func pipeName(containerID, processID, name string) string {
35 35
 	return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name)
36 36
 }
37 37
 
38
-func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
39
-	fifos := &cio.FIFOSet{
38
+func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
39
+	containerID := filepath.Base(bundleDir)
40
+	config := cio.Config{
40 41
 		Terminal: withTerminal,
41
-		Out:      pipeName(containerID, processID, "stdout"),
42
+		Stdout:   pipeName(containerID, processID, "stdout"),
42 43
 	}
43 44
 
44 45
 	if withStdin {
45
-		fifos.In = pipeName(containerID, processID, "stdin")
46
+		config.Stdin = pipeName(containerID, processID, "stdin")
46 47
 	}
47 48
 
48
-	if !fifos.Terminal {
49
-		fifos.Err = pipeName(containerID, processID, "stderr")
49
+	if !config.Terminal {
50
+		config.Stderr = pipeName(containerID, processID, "stderr")
50 51
 	}
51 52
 
52
-	return fifos
53
+	return cio.NewFIFOSet(config, nil)
53 54
 }
... ...
@@ -4,7 +4,6 @@ import (
4 4
 	"context"
5 5
 	"encoding/json"
6 6
 	"fmt"
7
-	"io"
8 7
 	"io/ioutil"
9 8
 	"os"
10 9
 	"path"
... ...
@@ -671,7 +670,7 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
671 671
 		return p.pid, nil
672 672
 	}
673 673
 
674
-	dio, err := newIOFromProcess(newProcess)
674
+	dio, err := newIOFromProcess(newProcess, ctr.ociSpec.Process.Terminal)
675 675
 	if err != nil {
676 676
 		logger.WithError(err).Error("failed to get stdio pipes")
677 677
 		return -1, err
... ...
@@ -712,16 +711,14 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
712 712
 	return p.pid, nil
713 713
 }
714 714
 
715
-func newIOFromProcess(newProcess process) (*cio.DirectIO, error) {
715
+func newIOFromProcess(newProcess hcsshim.Process, terminal bool) (*cio.DirectIO, error) {
716 716
 	stdin, stdout, stderr, err := newProcess.Stdio()
717 717
 	if err != nil {
718 718
 		return nil, err
719 719
 	}
720 720
 
721
-	dio := cio.DirectIO{
722
-		Terminal: ctr.ociSpec.Process.Terminal,
723
-		Stdin:    createStdInCloser(stdin, newProcess),
724
-	}
721
+	dio := cio.NewDirectIO(createStdInCloser(stdin, newProcess), nil, nil, terminal)
722
+
725 723
 	// Convert io.ReadClosers to io.Readers
726 724
 	if stdout != nil {
727 725
 		dio.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
... ...
@@ -786,10 +783,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
786 786
 	logger.Debugf("exec commandLine: %s", createProcessParms.CommandLine)
787 787
 
788 788
 	// Start the command running in the container.
789
-	var (
790
-		stdout, stderr io.ReadCloser
791
-		stdin          io.WriteCloser
792
-	)
793 789
 	newProcess, err := ctr.hcsContainer.CreateProcess(&createProcessParms)
794 790
 	if err != nil {
795 791
 		logger.WithError(err).Errorf("exec's CreateProcess() failed")
... ...
@@ -812,12 +805,11 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
812 812
 		}
813 813
 	}()
814 814
 
815
-	dio, err := newIOFromProcess(newProcess)
815
+	dio, err := newIOFromProcess(newProcess, spec.Terminal)
816 816
 	if err != nil {
817 817
 		logger.WithError(err).Error("failed to get stdio pipes")
818 818
 		return -1, err
819 819
 	}
820
-	dio.Termainl = spec.Terminal
821 820
 	// Tell the engine to attach streams back to the client
822 821
 	_, err = attachStdio(dio)
823 822
 	if err != nil {