Browse code

Decouple plugin manager from libcontainerd package

libcontainerd has a bunch of platform dependent code and huge interfaces
that are a pain implement.
To make the plugin manager a bit easier to work with, extract the plugin
executor into an interface and move the containerd implementation to a
separate package.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2017/07/15 05:45:32
Showing 7 changed files
... ...
@@ -48,6 +48,7 @@ import (
48 48
 	"github.com/docker/docker/pkg/system"
49 49
 	"github.com/docker/docker/pkg/truncindex"
50 50
 	"github.com/docker/docker/plugin"
51
+	pluginexec "github.com/docker/docker/plugin/executor/containerd"
51 52
 	refstore "github.com/docker/docker/reference"
52 53
 	"github.com/docker/docker/registry"
53 54
 	"github.com/docker/docker/runconfig"
... ...
@@ -646,12 +647,16 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
646 646
 	}
647 647
 	registerMetricsPluginCallback(d.PluginStore, metricsSockPath)
648 648
 
649
+	createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) {
650
+		return pluginexec.New(containerdRemote, m)
651
+	}
652
+
649 653
 	// Plugin system initialization should happen before restore. Do not change order.
650 654
 	d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{
651 655
 		Root:               filepath.Join(config.Root, "plugins"),
652 656
 		ExecRoot:           getPluginExecRoot(config.Root),
653 657
 		Store:              d.PluginStore,
654
-		Executor:           containerdRemote,
658
+		CreateExecutor:     createPluginExec,
655 659
 		RegistryService:    registryService,
656 660
 		LiveRestoreEnabled: config.LiveRestoreEnabled,
657 661
 		LogPluginEvent:     d.LogPluginEvent, // todo: make private
... ...
@@ -1,9 +1,19 @@
1 1
 package plugin
2 2
 
3 3
 import (
4
+	"encoding/json"
4 5
 	"io"
6
+	"io/ioutil"
7
+	"os"
8
+	"os/exec"
9
+	"path/filepath"
10
+	"time"
5 11
 
6 12
 	"github.com/docker/docker/api/types"
13
+	"github.com/docker/docker/pkg/archive"
14
+	"github.com/docker/docker/plugin"
15
+	"github.com/docker/docker/registry"
16
+	"github.com/pkg/errors"
7 17
 	"golang.org/x/net/context"
8 18
 )
9 19
 
... ...
@@ -32,3 +42,142 @@ func WithBinary(bin string) CreateOpt {
32 32
 type CreateClient interface {
33 33
 	PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error
34 34
 }
35
+
36
+// Create creates a new plugin with the specified name
37
+func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
38
+	tmpDir, err := ioutil.TempDir("", "create-test-plugin")
39
+	if err != nil {
40
+		return err
41
+	}
42
+	defer os.RemoveAll(tmpDir)
43
+
44
+	tar, err := makePluginBundle(tmpDir, opts...)
45
+	if err != nil {
46
+		return err
47
+	}
48
+	defer tar.Close()
49
+
50
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
51
+	defer cancel()
52
+
53
+	return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
54
+}
55
+
56
+// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
57
+// This does not use a dockerd instance to create or push the plugin.
58
+// If you just want to create a plugin in some daemon, use `Create`.
59
+//
60
+// This can be useful when testing plugins on swarm where you don't really want
61
+// the plugin to exist on any of the daemons (immediately) and there needs to be
62
+// some way to distribute the plugin.
63
+func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
64
+	tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
65
+	if err != nil {
66
+		return err
67
+	}
68
+	defer os.RemoveAll(tmpDir)
69
+
70
+	inPath := filepath.Join(tmpDir, "plugin")
71
+	if err := os.MkdirAll(inPath, 0755); err != nil {
72
+		return errors.Wrap(err, "error creating plugin root")
73
+	}
74
+
75
+	tar, err := makePluginBundle(inPath, opts...)
76
+	if err != nil {
77
+		return err
78
+	}
79
+	defer tar.Close()
80
+
81
+	dummyExec := func(m *plugin.Manager) (plugin.Executor, error) {
82
+		return nil, nil
83
+	}
84
+
85
+	regService, err := registry.NewService(registry.ServiceOptions{V2Only: true})
86
+	if err != nil {
87
+		return err
88
+	}
89
+
90
+	managerConfig := plugin.ManagerConfig{
91
+		Store:           plugin.NewStore(),
92
+		RegistryService: regService,
93
+		Root:            filepath.Join(tmpDir, "root"),
94
+		ExecRoot:        "/run/docker", // manager init fails if not set
95
+		CreateExecutor:  dummyExec,
96
+		LogPluginEvent:  func(id, name, action string) {}, // panics when not set
97
+	}
98
+	manager, err := plugin.NewManager(managerConfig)
99
+	if err != nil {
100
+		return errors.Wrap(err, "error creating plugin manager")
101
+	}
102
+
103
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
104
+	defer cancel()
105
+	if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
106
+		return err
107
+	}
108
+
109
+	if auth == nil {
110
+		auth = &types.AuthConfig{}
111
+	}
112
+	err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
113
+	return errors.Wrap(err, "error pushing plugin")
114
+}
115
+
116
+func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
117
+	p := &types.PluginConfig{
118
+		Interface: types.PluginConfigInterface{
119
+			Socket: "basic.sock",
120
+			Types:  []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
121
+		},
122
+		Entrypoint: []string{"/basic"},
123
+	}
124
+	cfg := &Config{
125
+		PluginConfig: p,
126
+	}
127
+	for _, o := range opts {
128
+		o(cfg)
129
+	}
130
+	if cfg.binPath == "" {
131
+		binPath, err := ensureBasicPluginBin()
132
+		if err != nil {
133
+			return nil, err
134
+		}
135
+		cfg.binPath = binPath
136
+	}
137
+
138
+	configJSON, err := json.Marshal(p)
139
+	if err != nil {
140
+		return nil, err
141
+	}
142
+	if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
143
+		return nil, err
144
+	}
145
+	if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
146
+		return nil, errors.Wrap(err, "error creating plugin rootfs dir")
147
+	}
148
+	if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
149
+		return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
150
+	}
151
+	tar, err := archive.Tar(inPath, archive.Uncompressed)
152
+	return tar, errors.Wrap(err, "error making plugin archive")
153
+}
154
+
155
+func ensureBasicPluginBin() (string, error) {
156
+	name := "docker-basic-plugin"
157
+	p, err := exec.LookPath(name)
158
+	if err == nil {
159
+		return p, nil
160
+	}
161
+
162
+	goBin, err := exec.LookPath("go")
163
+	if err != nil {
164
+		return "", err
165
+	}
166
+	installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
167
+	cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
168
+	cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
169
+	if out, err := cmd.CombinedOutput(); err != nil {
170
+		return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
171
+	}
172
+	return installPath, nil
173
+}
35 174
deleted file mode 100644
... ...
@@ -1,162 +0,0 @@
1
-package plugin
2
-
3
-import (
4
-	"encoding/json"
5
-	"io"
6
-	"io/ioutil"
7
-	"os"
8
-	"os/exec"
9
-	"path/filepath"
10
-	"time"
11
-
12
-	"github.com/docker/docker/api/types"
13
-	"github.com/docker/docker/libcontainerd"
14
-	"github.com/docker/docker/pkg/archive"
15
-	"github.com/docker/docker/plugin"
16
-	"github.com/docker/docker/registry"
17
-	"github.com/pkg/errors"
18
-	"golang.org/x/net/context"
19
-)
20
-
21
-// Create creates a new plugin with the specified name
22
-func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
23
-	tmpDir, err := ioutil.TempDir("", "create-test-plugin")
24
-	if err != nil {
25
-		return err
26
-	}
27
-	defer os.RemoveAll(tmpDir)
28
-
29
-	tar, err := makePluginBundle(tmpDir, opts...)
30
-	if err != nil {
31
-		return err
32
-	}
33
-	defer tar.Close()
34
-
35
-	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
36
-	defer cancel()
37
-
38
-	return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
39
-}
40
-
41
-// TODO(@cpuguy83): we really shouldn't have to do this...
42
-// The manager panics on init when `Executor` is not set.
43
-type dummyExecutor struct{}
44
-
45
-func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil }
46
-func (dummyExecutor) Cleanup()                                                   {}
47
-func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error          { return nil }
48
-
49
-// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
50
-// This does not use a dockerd instance to create or push the plugin.
51
-// If you just want to create a plugin in some daemon, use `Create`.
52
-//
53
-// This can be useful when testing plugins on swarm where you don't really want
54
-// the plugin to exist on any of the daemons (immediately) and there needs to be
55
-// some way to distribute the plugin.
56
-func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
57
-	tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
58
-	if err != nil {
59
-		return err
60
-	}
61
-	defer os.RemoveAll(tmpDir)
62
-
63
-	inPath := filepath.Join(tmpDir, "plugin")
64
-	if err := os.MkdirAll(inPath, 0755); err != nil {
65
-		return errors.Wrap(err, "error creating plugin root")
66
-	}
67
-
68
-	tar, err := makePluginBundle(inPath, opts...)
69
-	if err != nil {
70
-		return err
71
-	}
72
-	defer tar.Close()
73
-
74
-	regService, err := registry.NewService(registry.ServiceOptions{V2Only: true})
75
-	if err != nil {
76
-		return err
77
-	}
78
-
79
-	managerConfig := plugin.ManagerConfig{
80
-		Store:           plugin.NewStore(),
81
-		RegistryService: regService,
82
-		Root:            filepath.Join(tmpDir, "root"),
83
-		ExecRoot:        "/run/docker", // manager init fails if not set
84
-		Executor:        dummyExecutor{},
85
-		LogPluginEvent:  func(id, name, action string) {}, // panics when not set
86
-	}
87
-	manager, err := plugin.NewManager(managerConfig)
88
-	if err != nil {
89
-		return errors.Wrap(err, "error creating plugin manager")
90
-	}
91
-
92
-	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
93
-	defer cancel()
94
-	if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
95
-		return err
96
-	}
97
-
98
-	if auth == nil {
99
-		auth = &types.AuthConfig{}
100
-	}
101
-	err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
102
-	return errors.Wrap(err, "error pushing plugin")
103
-}
104
-
105
-func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
106
-	p := &types.PluginConfig{
107
-		Interface: types.PluginConfigInterface{
108
-			Socket: "basic.sock",
109
-			Types:  []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
110
-		},
111
-		Entrypoint: []string{"/basic"},
112
-	}
113
-	cfg := &Config{
114
-		PluginConfig: p,
115
-	}
116
-	for _, o := range opts {
117
-		o(cfg)
118
-	}
119
-	if cfg.binPath == "" {
120
-		binPath, err := ensureBasicPluginBin()
121
-		if err != nil {
122
-			return nil, err
123
-		}
124
-		cfg.binPath = binPath
125
-	}
126
-
127
-	configJSON, err := json.Marshal(p)
128
-	if err != nil {
129
-		return nil, err
130
-	}
131
-	if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
132
-		return nil, err
133
-	}
134
-	if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
135
-		return nil, errors.Wrap(err, "error creating plugin rootfs dir")
136
-	}
137
-	if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
138
-		return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
139
-	}
140
-	tar, err := archive.Tar(inPath, archive.Uncompressed)
141
-	return tar, errors.Wrap(err, "error making plugin archive")
142
-}
143
-
144
-func ensureBasicPluginBin() (string, error) {
145
-	name := "docker-basic-plugin"
146
-	p, err := exec.LookPath(name)
147
-	if err == nil {
148
-		return p, nil
149
-	}
150
-
151
-	goBin, err := exec.LookPath("go")
152
-	if err != nil {
153
-		return "", err
154
-	}
155
-	installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
156
-	cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
157
-	cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
158
-	if out, err := cmd.CombinedOutput(); err != nil {
159
-		return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
160
-	}
161
-	return installPath, nil
162
-}
163 1
deleted file mode 100644
... ...
@@ -1,19 +0,0 @@
1
-// +build !linux
2
-
3
-package plugin
4
-
5
-import (
6
-	"github.com/docker/docker/api/types"
7
-	"github.com/pkg/errors"
8
-	"golang.org/x/net/context"
9
-)
10
-
11
-// Create is not supported on this platform
12
-func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
13
-	return errors.New("not supported on this platform")
14
-}
15
-
16
-// CreateInRegistry is not supported on this platform
17
-func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
18
-	return errors.New("not supported on this platform")
19
-}
20 1
new file mode 100644
... ...
@@ -0,0 +1,77 @@
0
+package containerd
1
+
2
+import (
3
+	"io"
4
+
5
+	"github.com/docker/docker/libcontainerd"
6
+	"github.com/opencontainers/runtime-spec/specs-go"
7
+	"github.com/pkg/errors"
8
+)
9
+
10
+// ExitHandler represents an object that is called when the exit event is received from containerd
11
+type ExitHandler interface {
12
+	HandleExitEvent(id string) error
13
+}
14
+
15
+// New creates a new containerd plugin executor
16
+func New(remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
17
+	e := &Executor{exitHandler: exitHandler}
18
+	client, err := remote.Client(e)
19
+	if err != nil {
20
+		return nil, errors.Wrap(err, "error creating containerd exec client")
21
+	}
22
+	e.client = client
23
+	return e, nil
24
+}
25
+
26
+// Executor is the containerd client implementation of a plugin executor
27
+type Executor struct {
28
+	client      libcontainerd.Client
29
+	exitHandler ExitHandler
30
+}
31
+
32
+// Create creates a new container
33
+func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
34
+	return e.client.Create(id, "", "", spec, attachStreamsFunc(stdout, stderr))
35
+}
36
+
37
+// Restore restores a container
38
+func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
39
+	return e.client.Restore(id, attachStreamsFunc(stdout, stderr))
40
+}
41
+
42
+// IsRunning returns if the container with the given id is running
43
+func (e *Executor) IsRunning(id string) (bool, error) {
44
+	pids, err := e.client.GetPidsForContainer(id)
45
+	return len(pids) > 0, err
46
+}
47
+
48
+// Signal sends the specified signal to the container
49
+func (e *Executor) Signal(id string, signal int) error {
50
+	return e.client.Signal(id, signal)
51
+}
52
+
53
+// StateChanged handles state changes from containerd
54
+// All events are ignored except the exit event, which is sent of to the stored handler
55
+func (e *Executor) StateChanged(id string, event libcontainerd.StateInfo) error {
56
+	switch event.State {
57
+	case libcontainerd.StateExit:
58
+		return e.exitHandler.HandleExitEvent(id)
59
+	}
60
+	return nil
61
+}
62
+
63
+func attachStreamsFunc(stdout, stderr io.WriteCloser) func(libcontainerd.IOPipe) error {
64
+	return func(iop libcontainerd.IOPipe) error {
65
+		iop.Stdin.Close()
66
+		go func() {
67
+			io.Copy(stdout, iop.Stdout)
68
+			stdout.Close()
69
+		}()
70
+		go func() {
71
+			io.Copy(stderr, iop.Stderr)
72
+			stderr.Close()
73
+		}()
74
+		return nil
75
+	}
76
+}
... ...
@@ -17,7 +17,6 @@ import (
17 17
 	"github.com/docker/docker/api/types"
18 18
 	"github.com/docker/docker/image"
19 19
 	"github.com/docker/docker/layer"
20
-	"github.com/docker/docker/libcontainerd"
21 20
 	"github.com/docker/docker/pkg/authorization"
22 21
 	"github.com/docker/docker/pkg/ioutils"
23 22
 	"github.com/docker/docker/pkg/mount"
... ...
@@ -26,6 +25,7 @@ import (
26 26
 	"github.com/docker/docker/plugin/v2"
27 27
 	"github.com/docker/docker/registry"
28 28
 	"github.com/opencontainers/go-digest"
29
+	specs "github.com/opencontainers/runtime-spec/specs-go"
29 30
 	"github.com/pkg/errors"
30 31
 	"github.com/sirupsen/logrus"
31 32
 )
... ...
@@ -35,6 +35,14 @@ const rootFSFileName = "rootfs"
35 35
 
36 36
 var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
37 37
 
38
+// Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
39
+type Executor interface {
40
+	Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
41
+	Restore(id string, stdout, stderr io.WriteCloser) error
42
+	IsRunning(id string) (bool, error)
43
+	Signal(id string, signal int) error
44
+}
45
+
38 46
 func (pm *Manager) restorePlugin(p *v2.Plugin) error {
39 47
 	if p.IsEnabled() {
40 48
 		return pm.restore(p)
... ...
@@ -47,24 +55,27 @@ type eventLogger func(id, name, action string)
47 47
 // ManagerConfig defines configuration needed to start new manager.
48 48
 type ManagerConfig struct {
49 49
 	Store              *Store // remove
50
-	Executor           libcontainerd.Remote
51 50
 	RegistryService    registry.Service
52 51
 	LiveRestoreEnabled bool // TODO: remove
53 52
 	LogPluginEvent     eventLogger
54 53
 	Root               string
55 54
 	ExecRoot           string
55
+	CreateExecutor     ExecutorCreator
56 56
 	AuthzMiddleware    *authorization.Middleware
57 57
 }
58 58
 
59
+// ExecutorCreator is used in the manager config to pass in an `Executor`
60
+type ExecutorCreator func(*Manager) (Executor, error)
61
+
59 62
 // Manager controls the plugin subsystem.
60 63
 type Manager struct {
61
-	config           ManagerConfig
62
-	mu               sync.RWMutex // protects cMap
63
-	muGC             sync.RWMutex // protects blobstore deletions
64
-	cMap             map[*v2.Plugin]*controller
65
-	containerdClient libcontainerd.Client
66
-	blobStore        *basicBlobStore
67
-	publisher        *pubsub.Publisher
64
+	config    ManagerConfig
65
+	mu        sync.RWMutex // protects cMap
66
+	muGC      sync.RWMutex // protects blobstore deletions
67
+	cMap      map[*v2.Plugin]*controller
68
+	blobStore *basicBlobStore
69
+	publisher *pubsub.Publisher
70
+	executor  Executor
68 71
 }
69 72
 
70 73
 // controller represents the manager's control on a plugin.
... ...
@@ -111,10 +122,11 @@ func NewManager(config ManagerConfig) (*Manager, error) {
111 111
 	}
112 112
 
113 113
 	var err error
114
-	manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct
114
+	manager.executor, err = config.CreateExecutor(manager)
115 115
 	if err != nil {
116
-		return nil, errors.Wrap(err, "failed to create containerd client")
116
+		return nil, err
117 117
 	}
118
+
118 119
 	manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
119 120
 	if err != nil {
120 121
 		return nil, err
... ...
@@ -133,42 +145,37 @@ func (pm *Manager) tmpDir() string {
133 133
 	return filepath.Join(pm.config.Root, "tmp")
134 134
 }
135 135
 
136
-// StateChanged updates plugin internals using libcontainerd events.
137
-func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
138
-	logrus.Debugf("plugin state changed %s %#v", id, e)
139
-
140
-	switch e.State {
141
-	case libcontainerd.StateExit:
142
-		p, err := pm.config.Store.GetV2Plugin(id)
143
-		if err != nil {
144
-			return err
145
-		}
136
+// HandleExitEvent is called when the executor receives the exit event
137
+// In the future we may change this, but for now all we care about is the exit event.
138
+func (pm *Manager) HandleExitEvent(id string) error {
139
+	p, err := pm.config.Store.GetV2Plugin(id)
140
+	if err != nil {
141
+		return err
142
+	}
146 143
 
147
-		os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
144
+	os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
148 145
 
149
-		if p.PropagatedMount != "" {
150
-			if err := mount.Unmount(p.PropagatedMount); err != nil {
151
-				logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
152
-			}
153
-			propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
154
-			if err := mount.Unmount(propRoot); err != nil {
155
-				logrus.Warn("Could not unmount %s: %v", propRoot, err)
156
-			}
146
+	if p.PropagatedMount != "" {
147
+		if err := mount.Unmount(p.PropagatedMount); err != nil {
148
+			logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
157 149
 		}
158
-
159
-		pm.mu.RLock()
160
-		c := pm.cMap[p]
161
-		if c.exitChan != nil {
162
-			close(c.exitChan)
150
+		propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
151
+		if err := mount.Unmount(propRoot); err != nil {
152
+			logrus.Warn("Could not unmount %s: %v", propRoot, err)
163 153
 		}
164
-		restart := c.restart
165
-		pm.mu.RUnlock()
154
+	}
166 155
 
167
-		if restart {
168
-			pm.enable(p, c, true)
169
-		}
156
+	pm.mu.RLock()
157
+	c := pm.cMap[p]
158
+	if c.exitChan != nil {
159
+		close(c.exitChan)
170 160
 	}
161
+	restart := c.restart
162
+	pm.mu.RUnlock()
171 163
 
164
+	if restart {
165
+		pm.enable(p, c, true)
166
+	}
172 167
 	return nil
173 168
 }
174 169
 
... ...
@@ -333,23 +340,10 @@ func (l logHook) Fire(entry *logrus.Entry) error {
333 333
 	return nil
334 334
 }
335 335
 
336
-func attachToLog(id string) func(libcontainerd.IOPipe) error {
337
-	return func(iop libcontainerd.IOPipe) error {
338
-		iop.Stdin.Close()
339
-
340
-		logger := logrus.New()
341
-		logger.Hooks.Add(logHook{id})
342
-		// TODO: cache writer per id
343
-		w := logger.Writer()
344
-		go func() {
345
-			io.Copy(w, iop.Stdout)
346
-		}()
347
-		go func() {
348
-			// TODO: update logrus and use logger.WriterLevel
349
-			io.Copy(w, iop.Stderr)
350
-		}()
351
-		return nil
352
-	}
336
+func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
337
+	logger := logrus.New()
338
+	logger.Hooks.Add(logHook{id})
339
+	return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
353 340
 }
354 341
 
355 342
 func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
... ...
@@ -11,7 +11,6 @@ import (
11 11
 
12 12
 	"github.com/docker/docker/api/types"
13 13
 	"github.com/docker/docker/daemon/initlayer"
14
-	"github.com/docker/docker/libcontainerd"
15 14
 	"github.com/docker/docker/pkg/containerfs"
16 15
 	"github.com/docker/docker/pkg/idtools"
17 16
 	"github.com/docker/docker/pkg/mount"
... ...
@@ -63,7 +62,8 @@ func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
63 63
 		return errors.WithStack(err)
64 64
 	}
65 65
 
66
-	if err := pm.containerdClient.Create(p.GetID(), "", "", *spec, attachToLog(p.GetID())); err != nil {
66
+	stdout, stderr := makeLoggerStreams(p.GetID())
67
+	if err := pm.executor.Create(p.GetID(), *spec, stdout, stderr); err != nil {
67 68
 		if p.PropagatedMount != "" {
68 69
 			if err := mount.Unmount(p.PropagatedMount); err != nil {
69 70
 				logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
... ...
@@ -83,7 +83,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
83 83
 	client, err := plugins.NewClientWithTimeout("unix://"+sockAddr, nil, time.Duration(c.timeoutInSecs)*time.Second)
84 84
 	if err != nil {
85 85
 		c.restart = false
86
-		shutdownPlugin(p, c, pm.containerdClient)
86
+		shutdownPlugin(p, c, pm.executor)
87 87
 		return errors.WithStack(err)
88 88
 	}
89 89
 
... ...
@@ -109,7 +109,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
109 109
 			c.restart = false
110 110
 			// While restoring plugins, we need to explicitly set the state to disabled
111 111
 			pm.config.Store.SetState(p, false)
112
-			shutdownPlugin(p, c, pm.containerdClient)
112
+			shutdownPlugin(p, c, pm.executor)
113 113
 			return err
114 114
 		}
115 115
 
... ...
@@ -121,13 +121,14 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
121 121
 }
122 122
 
123 123
 func (pm *Manager) restore(p *v2.Plugin) error {
124
-	if err := pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID())); err != nil {
124
+	stdout, stderr := makeLoggerStreams(p.GetID())
125
+	if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil {
125 126
 		return err
126 127
 	}
127 128
 
128 129
 	if pm.config.LiveRestoreEnabled {
129 130
 		c := &controller{}
130
-		if pids, _ := pm.containerdClient.GetPidsForContainer(p.GetID()); len(pids) == 0 {
131
+		if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning {
131 132
 			// plugin is not running, so follow normal startup procedure
132 133
 			return pm.enable(p, c, true)
133 134
 		}
... ...
@@ -143,10 +144,10 @@ func (pm *Manager) restore(p *v2.Plugin) error {
143 143
 	return nil
144 144
 }
145 145
 
146
-func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) {
146
+func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
147 147
 	pluginID := p.GetID()
148 148
 
149
-	err := containerdClient.Signal(pluginID, int(unix.SIGTERM))
149
+	err := executor.Signal(pluginID, int(unix.SIGTERM))
150 150
 	if err != nil {
151 151
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
152 152
 	} else {
... ...
@@ -155,7 +156,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.
155 155
 			logrus.Debug("Clean shutdown of plugin")
156 156
 		case <-time.After(time.Second * 10):
157 157
 			logrus.Debug("Force shutdown plugin")
158
-			if err := containerdClient.Signal(pluginID, int(unix.SIGKILL)); err != nil {
158
+			if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil {
159 159
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
160 160
 			}
161 161
 		}
... ...
@@ -175,7 +176,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
175 175
 	}
176 176
 
177 177
 	c.restart = false
178
-	shutdownPlugin(p, c, pm.containerdClient)
178
+	shutdownPlugin(p, c, pm.executor)
179 179
 	pm.config.Store.SetState(p, false)
180 180
 	return pm.save(p)
181 181
 }
... ...
@@ -192,9 +193,9 @@ func (pm *Manager) Shutdown() {
192 192
 			logrus.Debug("Plugin active when liveRestore is set, skipping shutdown")
193 193
 			continue
194 194
 		}
195
-		if pm.containerdClient != nil && p.IsEnabled() {
195
+		if pm.executor != nil && p.IsEnabled() {
196 196
 			c.restart = false
197
-			shutdownPlugin(p, c, pm.containerdClient)
197
+			shutdownPlugin(p, c, pm.executor)
198 198
 		}
199 199
 	}
200 200
 	mount.Unmount(pm.config.Root)