Browse code

Implement plugin restore after daemon restart

This ensures that:

- The in-memory plugin store is populated with all the plugins
- Plugins which were active before daemon restart are active after.
This utilizes the liverestore feature when available, otherwise it
manually starts the plugin.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit dfd91873056c172ffc061d882da0cd18204b521a)

Brian Goff authored on 2016/06/16 02:39:33
Showing 5 changed files
... ...
@@ -10,5 +10,5 @@ import (
10 10
 )
11 11
 
12 12
 func pluginInit(config *daemon.Config, remote libcontainerd.Remote, rs registry.Service) error {
13
-	return plugin.Init(config.Root, config.ExecRoot, remote, rs)
13
+	return plugin.Init(config.Root, config.ExecRoot, remote, rs, config.LiveRestore)
14 14
 }
... ...
@@ -40,7 +40,7 @@ func (pm *Manager) Inspect(name string) (tp types.Plugin, err error) {
40 40
 	if err != nil {
41 41
 		return tp, err
42 42
 	}
43
-	return p.p, nil
43
+	return p.P, nil
44 44
 }
45 45
 
46 46
 // Pull pulls a plugin and enables it.
... ...
@@ -86,14 +86,14 @@ func (pm *Manager) Pull(name string, metaHeader http.Header, authConfig *types.A
86 86
 	pm.save()
87 87
 	pm.Unlock()
88 88
 
89
-	return computePrivileges(&p.p.Manifest), nil
89
+	return computePrivileges(&p.P.Manifest), nil
90 90
 }
91 91
 
92 92
 // List displays the list of plugins and associated metadata.
93 93
 func (pm *Manager) List() ([]types.Plugin, error) {
94 94
 	out := make([]types.Plugin, 0, len(pm.plugins))
95 95
 	for _, p := range pm.plugins {
96
-		out = append(out, p.p)
96
+		out = append(out, p.P)
97 97
 	}
98 98
 	return out, nil
99 99
 }
... ...
@@ -101,7 +101,7 @@ func (pm *Manager) List() ([]types.Plugin, error) {
101 101
 // Push pushes a plugin to the store.
102 102
 func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.AuthConfig) error {
103 103
 	p, err := pm.get(name)
104
-	dest := filepath.Join(pm.libRoot, p.p.ID)
104
+	dest := filepath.Join(pm.libRoot, p.P.ID)
105 105
 	config, err := os.Open(filepath.Join(dest, "manifest.json"))
106 106
 	if err != nil {
107 107
 		return err
... ...
@@ -46,7 +46,7 @@ func (e ErrInadequateCapability) Error() string {
46 46
 
47 47
 type plugin struct {
48 48
 	//sync.RWMutex TODO
49
-	p                 types.Plugin
49
+	P                 types.Plugin `json:"plugin"`
50 50
 	client            *plugins.Client
51 51
 	restartManager    restartmanager.RestartManager
52 52
 	stateSourcePath   string
... ...
@@ -58,17 +58,17 @@ func (p *plugin) Client() *plugins.Client {
58 58
 }
59 59
 
60 60
 func (p *plugin) Name() string {
61
-	name := p.p.Name
62
-	if len(p.p.Tag) > 0 {
61
+	name := p.P.Name
62
+	if len(p.P.Tag) > 0 {
63 63
 		// TODO: this feels hacky, maybe we should be storing the distribution reference rather than splitting these
64
-		name += ":" + p.p.Tag
64
+		name += ":" + p.P.Tag
65 65
 	}
66 66
 	return name
67 67
 }
68 68
 
69 69
 func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
70 70
 	p := &plugin{
71
-		p: types.Plugin{
71
+		P: types.Plugin{
72 72
 			Name: ref.Name(),
73 73
 			ID:   id,
74 74
 		},
... ...
@@ -76,12 +76,20 @@ func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
76 76
 		runtimeSourcePath: filepath.Join(pm.runRoot, id),
77 77
 	}
78 78
 	if ref, ok := ref.(reference.NamedTagged); ok {
79
-		p.p.Tag = ref.Tag()
79
+		p.P.Tag = ref.Tag()
80 80
 	}
81 81
 	return p
82 82
 }
83 83
 
84
-// TODO: figure out why save() doesn't json encode *plugin object
84
+func (pm *Manager) restorePlugin(p *plugin) error {
85
+	p.stateSourcePath = filepath.Join(pm.libRoot, p.P.ID, "state")
86
+	p.runtimeSourcePath = filepath.Join(pm.runRoot, p.P.ID)
87
+	if p.P.Active {
88
+		return pm.restore(p)
89
+	}
90
+	return nil
91
+}
92
+
85 93
 type pluginMap map[string]*plugin
86 94
 
87 95
 // Manager controls the plugin subsystem.
... ...
@@ -95,6 +103,7 @@ type Manager struct {
95 95
 	containerdClient libcontainerd.Client
96 96
 	registryService  registry.Service
97 97
 	handleLegacy     bool
98
+	liveRestore      bool
98 99
 }
99 100
 
100 101
 // GetManager returns the singleton plugin Manager
... ...
@@ -104,7 +113,7 @@ func GetManager() *Manager {
104 104
 
105 105
 // Init (was NewManager) instantiates the singleton Manager.
106 106
 // TODO: revert this to NewManager once we get rid of all the singletons.
107
-func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service) (err error) {
107
+func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service, liveRestore bool) (err error) {
108 108
 	if manager != nil {
109 109
 		return nil
110 110
 	}
... ...
@@ -125,17 +134,18 @@ func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Servic
125 125
 		handlers:        make(map[string]func(string, *plugins.Client)),
126 126
 		registryService: rs,
127 127
 		handleLegacy:    true,
128
+		liveRestore:     liveRestore,
128 129
 	}
129 130
 	if err := os.MkdirAll(manager.runRoot, 0700); err != nil {
130 131
 		return err
131 132
 	}
132
-	if err := manager.init(); err != nil {
133
-		return err
134
-	}
135 133
 	manager.containerdClient, err = remote.Client(manager)
136 134
 	if err != nil {
137 135
 		return err
138 136
 	}
137
+	if err := manager.init(); err != nil {
138
+		return err
139
+	}
139 140
 	return nil
140 141
 }
141 142
 
... ...
@@ -170,7 +180,7 @@ func FindWithCapability(capability string) ([]Plugin, error) {
170 170
 		defer manager.RUnlock()
171 171
 	pluginLoop:
172 172
 		for _, p := range manager.plugins {
173
-			for _, typ := range p.p.Manifest.Interface.Types {
173
+			for _, typ := range p.P.Manifest.Interface.Types {
174 174
 				if typ.Capability != capability || typ.Prefix != "docker" {
175 175
 					continue pluginLoop
176 176
 				}
... ...
@@ -221,7 +231,7 @@ func LookupWithCapability(name, capability string) (Plugin, error) {
221 221
 	}
222 222
 
223 223
 	capability = strings.ToLower(capability)
224
-	for _, typ := range p.p.Manifest.Interface.Types {
224
+	for _, typ := range p.P.Manifest.Interface.Types {
225 225
 		if typ.Capability == capability && typ.Prefix == "docker" {
226 226
 			return p, nil
227 227
 		}
... ...
@@ -262,55 +272,78 @@ func (pm *Manager) init() error {
262 262
 		}
263 263
 		return err
264 264
 	}
265
-	// TODO: Populate pm.plugins
266
-	if err := json.NewDecoder(dt).Decode(&pm.nameToID); err != nil {
265
+
266
+	if err := json.NewDecoder(dt).Decode(&pm.plugins); err != nil {
267 267
 		return err
268 268
 	}
269
-	// FIXME: validate, restore
270 269
 
271
-	return nil
270
+	var group sync.WaitGroup
271
+	group.Add(len(pm.plugins))
272
+	for _, p := range pm.plugins {
273
+		go func(p *plugin) {
274
+			defer group.Done()
275
+			if err := pm.restorePlugin(p); err != nil {
276
+				logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
277
+				return
278
+			}
279
+
280
+			pm.Lock()
281
+			pm.nameToID[p.Name()] = p.P.ID
282
+			requiresManualRestore := !pm.liveRestore && p.P.Active
283
+			pm.Unlock()
284
+
285
+			if requiresManualRestore {
286
+				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
287
+				if err := pm.enable(p); err != nil {
288
+					logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
289
+				}
290
+			}
291
+		}(p)
292
+		group.Wait()
293
+	}
294
+	return pm.save()
272 295
 }
273 296
 
274 297
 func (pm *Manager) initPlugin(p *plugin) error {
275
-	dt, err := os.Open(filepath.Join(pm.libRoot, p.p.ID, "manifest.json"))
298
+	dt, err := os.Open(filepath.Join(pm.libRoot, p.P.ID, "manifest.json"))
276 299
 	if err != nil {
277 300
 		return err
278 301
 	}
279
-	err = json.NewDecoder(dt).Decode(&p.p.Manifest)
302
+	err = json.NewDecoder(dt).Decode(&p.P.Manifest)
280 303
 	dt.Close()
281 304
 	if err != nil {
282 305
 		return err
283 306
 	}
284 307
 
285
-	p.p.Config.Mounts = make([]types.PluginMount, len(p.p.Manifest.Mounts))
286
-	for i, mount := range p.p.Manifest.Mounts {
287
-		p.p.Config.Mounts[i] = mount
308
+	p.P.Config.Mounts = make([]types.PluginMount, len(p.P.Manifest.Mounts))
309
+	for i, mount := range p.P.Manifest.Mounts {
310
+		p.P.Config.Mounts[i] = mount
288 311
 	}
289
-	p.p.Config.Env = make([]string, 0, len(p.p.Manifest.Env))
290
-	for _, env := range p.p.Manifest.Env {
312
+	p.P.Config.Env = make([]string, 0, len(p.P.Manifest.Env))
313
+	for _, env := range p.P.Manifest.Env {
291 314
 		if env.Value != nil {
292
-			p.p.Config.Env = append(p.p.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
315
+			p.P.Config.Env = append(p.P.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
293 316
 		}
294 317
 	}
295
-	copy(p.p.Config.Args, p.p.Manifest.Args.Value)
318
+	copy(p.P.Config.Args, p.P.Manifest.Args.Value)
296 319
 
297
-	f, err := os.Create(filepath.Join(pm.libRoot, p.p.ID, "plugin-config.json"))
320
+	f, err := os.Create(filepath.Join(pm.libRoot, p.P.ID, "plugin-config.json"))
298 321
 	if err != nil {
299 322
 		return err
300 323
 	}
301
-	err = json.NewEncoder(f).Encode(&p.p.Config)
324
+	err = json.NewEncoder(f).Encode(&p.P.Config)
302 325
 	f.Close()
303 326
 	return err
304 327
 }
305 328
 
306 329
 func (pm *Manager) remove(p *plugin) error {
307
-	if p.p.Active {
330
+	if p.P.Active {
308 331
 		return fmt.Errorf("plugin %s is active", p.Name())
309 332
 	}
310 333
 	pm.Lock() // fixme: lock single record
311 334
 	defer pm.Unlock()
312 335
 	os.RemoveAll(p.stateSourcePath)
313
-	delete(pm.plugins, p.p.ID)
336
+	delete(pm.plugins, p.P.ID)
314 337
 	delete(pm.nameToID, p.Name())
315 338
 	pm.save()
316 339
 	return nil
... ...
@@ -332,7 +365,7 @@ func (pm *Manager) set(p *plugin, args []string) error {
332 332
 func (pm *Manager) save() error {
333 333
 	filePath := filepath.Join(pm.libRoot, "plugins.json")
334 334
 
335
-	jsonData, err := json.Marshal(pm.nameToID)
335
+	jsonData, err := json.Marshal(pm.plugins)
336 336
 	if err != nil {
337 337
 		logrus.Debugf("Error in json.Marshal: %v", err)
338 338
 		return err
... ...
@@ -25,11 +25,11 @@ func (pm *Manager) enable(p *plugin) error {
25 25
 	}
26 26
 
27 27
 	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
28
-	if err := pm.containerdClient.Create(p.p.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
28
+	if err := pm.containerdClient.Create(p.P.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
29 29
 		return err
30 30
 	}
31 31
 
32
-	socket := p.p.Manifest.Interface.Socket
32
+	socket := p.P.Manifest.Interface.Socket
33 33
 	p.client, err = plugins.NewClient("unix://"+filepath.Join(p.runtimeSourcePath, socket), nil)
34 34
 	if err != nil {
35 35
 		return err
... ...
@@ -38,11 +38,11 @@ func (pm *Manager) enable(p *plugin) error {
38 38
 	//TODO: check net.Dial
39 39
 
40 40
 	pm.Lock() // fixme: lock single record
41
-	p.p.Active = true
41
+	p.P.Active = true
42 42
 	pm.save()
43 43
 	pm.Unlock()
44 44
 
45
-	for _, typ := range p.p.Manifest.Interface.Types {
45
+	for _, typ := range p.P.Manifest.Interface.Types {
46 46
 		if handler := pm.handlers[typ.String()]; handler != nil {
47 47
 			handler(p.Name(), p.Client())
48 48
 		}
... ...
@@ -51,16 +51,21 @@ func (pm *Manager) enable(p *plugin) error {
51 51
 	return nil
52 52
 }
53 53
 
54
+func (pm *Manager) restore(p *plugin) error {
55
+	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
56
+	return pm.containerdClient.Restore(p.P.ID, libcontainerd.WithRestartManager(p.restartManager))
57
+}
58
+
54 59
 func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
55 60
 	s := oci.DefaultSpec()
56 61
 
57
-	rootfs := filepath.Join(pm.libRoot, p.p.ID, "rootfs")
62
+	rootfs := filepath.Join(pm.libRoot, p.P.ID, "rootfs")
58 63
 	s.Root = specs.Root{
59 64
 		Path:     rootfs,
60 65
 		Readonly: false, // TODO: all plugins should be readonly? settable in manifest?
61 66
 	}
62 67
 
63
-	mounts := append(p.p.Config.Mounts, types.PluginMount{
68
+	mounts := append(p.P.Config.Mounts, types.PluginMount{
64 69
 		Source:      &p.runtimeSourcePath,
65 70
 		Destination: defaultPluginRuntimeDestination,
66 71
 		Type:        "bind",
... ...
@@ -95,11 +100,11 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
95 95
 		s.Mounts = append(s.Mounts, m)
96 96
 	}
97 97
 
98
-	envs := make([]string, 1, len(p.p.Config.Env)+1)
98
+	envs := make([]string, 1, len(p.P.Config.Env)+1)
99 99
 	envs[0] = "PATH=" + system.DefaultPathEnv
100
-	envs = append(envs, p.p.Config.Env...)
100
+	envs = append(envs, p.P.Config.Env...)
101 101
 
102
-	args := append(p.p.Manifest.Entrypoint, p.p.Config.Args...)
102
+	args := append(p.P.Manifest.Entrypoint, p.P.Config.Args...)
103 103
 	s.Process = specs.Process{
104 104
 		Terminal: false,
105 105
 		Args:     args,
... ...
@@ -114,13 +119,13 @@ func (pm *Manager) disable(p *plugin) error {
114 114
 	if err := p.restartManager.Cancel(); err != nil {
115 115
 		logrus.Error(err)
116 116
 	}
117
-	if err := pm.containerdClient.Signal(p.p.ID, int(syscall.SIGKILL)); err != nil {
117
+	if err := pm.containerdClient.Signal(p.P.ID, int(syscall.SIGKILL)); err != nil {
118 118
 		logrus.Error(err)
119 119
 	}
120 120
 	os.RemoveAll(p.runtimeSourcePath)
121 121
 	pm.Lock() // fixme: lock single record
122 122
 	defer pm.Unlock()
123
-	p.p.Active = false
123
+	p.P.Active = false
124 124
 	pm.save()
125 125
 	return nil
126 126
 }
... ...
@@ -19,3 +19,7 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
19 19
 func (pm *Manager) disable(p *plugin) error {
20 20
 	return fmt.Errorf("Not implemented")
21 21
 }
22
+
23
+func (pm *Manager) restore(p *plugin) error {
24
+	return fmt.Errorf("Not implemented")
25
+}