plugin/manager.go
f3711704
 package plugin
 
 import (
 	"encoding/json"
 	"io"
3d86b0c7
 	"io/ioutil"
f3711704
 	"os"
 	"path/filepath"
3d86b0c7
 	"reflect"
 	"regexp"
42c5c1a9
 	"runtime"
dafeeac4
 	"sort"
c54b717c
 	"strings"
f3711704
 	"sync"
 
0421f517
 	"github.com/docker/distribution/reference"
3d86b0c7
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
38de272b
 	"github.com/docker/docker/pkg/authorization"
3d86b0c7
 	"github.com/docker/docker/pkg/ioutils"
c54b717c
 	"github.com/docker/docker/pkg/mount"
72c3bcf2
 	"github.com/docker/docker/pkg/pubsub"
42c5c1a9
 	"github.com/docker/docker/pkg/system"
27a55fba
 	"github.com/docker/docker/plugin/v2"
f3711704
 	"github.com/docker/docker/registry"
7a855799
 	"github.com/opencontainers/go-digest"
c85e8622
 	specs "github.com/opencontainers/runtime-spec/specs-go"
3d86b0c7
 	"github.com/pkg/errors"
1009e6a4
 	"github.com/sirupsen/logrus"
f3711704
 )
 
3d86b0c7
 // File names used inside a plugin's directory under the manager root.
 const (
 	// configFileName is the per-plugin file that holds the serialized plugin state.
 	configFileName = "config.json"
 	// rootFSFileName is the name of the plugin's root filesystem directory.
 	rootFSFileName = "rootfs"
 )
 
 // validFullID matches a complete plugin ID: exactly 64 lowercase hex characters.
 var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
f3711704
 
c85e8622
 // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
 //
 // The manager obtains its Executor from ManagerConfig.CreateExecutor in
 // NewManager. All methods identify the plugin by its id string.
 type Executor interface {
 	// Create presumably starts the plugin process described by spec, with its
 	// output wired to stdout/stderr — confirm against the implementation.
 	Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
 	// Restore presumably re-attaches to an already running plugin process,
 	// wiring its output to stdout/stderr — confirm against the implementation.
 	Restore(id string, stdout, stderr io.WriteCloser) error
 	// IsRunning reports whether the plugin process is running (semantics are
 	// defined by the implementation).
 	IsRunning(id string) (bool, error)
 	// Signal sends the numeric signal to the plugin process (semantics are
 	// defined by the implementation).
 	Signal(id string, signal int) error
 }
 
27a55fba
 // restorePlugin restores p through pm.restore when the plugin is enabled.
 // Disabled plugins need no restore and yield a nil error.
 func (pm *Manager) restorePlugin(p *v2.Plugin) error {
 	if !p.IsEnabled() {
 		return nil
 	}
 	return pm.restore(p)
 }
 
42abccb8
 // eventLogger is the callback type of ManagerConfig.LogPluginEvent. It receives
 // a plugin id, a plugin name and an action string.
 type eventLogger func(id, name, action string)
f3711704
 
3d86b0c7
 // ManagerConfig defines configuration needed to start new manager.
 type ManagerConfig struct {
 	// Store is the plugin store the manager looks plugins up in and populates
 	// on reload.
 	Store              *Store // remove
 	// RegistryService, when non-nil, is wrapped by NewManager so that resolved
 	// repositories are marked with the "plugin" class.
 	RegistryService    registry.Service
 	// LiveRestoreEnabled: when false, reload re-enables every plugin that is
 	// recorded as enabled.
 	LiveRestoreEnabled bool // TODO: remove
 	// LogPluginEvent is a callback for plugin events; it is not invoked in
 	// this part of the file.
 	LogPluginEvent     eventLogger
 	// Root is the directory holding one sub-directory per plugin ID, plus the
 	// "tmp" directory and the "storage/blobs" blob store.
 	Root               string
 	// ExecRoot is created by NewManager; the per-plugin directory ExecRoot/<id>
 	// is removed when the plugin exits.
 	ExecRoot           string
c85e8622
 	// CreateExecutor is called by NewManager to build the manager's Executor.
 	CreateExecutor     ExecutorCreator
38de272b
 	// AuthzMiddleware is not used in this part of the file; presumably the
 	// authorization middleware that authz plugins are registered with — confirm.
 	AuthzMiddleware    *authorization.Middleware
3d86b0c7
 }
 
c85e8622
 // ExecutorCreator is used in the manager config to pass in an `Executor`
 //
 // NewManager calls it with the manager under construction; a non-nil error
 // aborts manager creation.
 type ExecutorCreator func(*Manager) (Executor, error)
 
f3711704
 // Manager controls the plugin subsystem.
 //
 // Instances are built by NewManager, which creates the on-disk directories,
 // the executor and the blob store, and then reloads plugins from disk.
 type Manager struct {
c85e8622
 	// config is the configuration passed to NewManager (with RegistryService
 	// wrapped when it was non-nil).
 	config    ManagerConfig
 	mu        sync.RWMutex // protects cMap
 	muGC      sync.RWMutex // protects blobstore deletions
 	// cMap maps each loaded plugin to its controller.
 	cMap      map[*v2.Plugin]*controller
 	// blobStore is rooted at <Root>/storage/blobs and is pruned by GC.
 	blobStore *basicBlobStore
 	// publisher is created at the end of NewManager; its use is outside this
 	// part of the file.
 	publisher *pubsub.Publisher
 	// executor is the Executor returned by config.CreateExecutor.
 	executor  Executor
b35490a8
 }
 
 // controller represents the manager's control on a plugin.
 type controller struct {
 	// restart: when true, HandleExitEvent re-enables the plugin after it exits.
 	restart       bool
 	// exitChan is closed by HandleExitEvent when it is non-nil.
 	exitChan      chan bool
 	// timeoutInSecs is not read in this part of the file; presumably a timeout
 	// in seconds applied when talking to the plugin — confirm against its users.
 	timeoutInSecs int
f3711704
 }
 
3d86b0c7
 // pluginRegistryService ensures that all resolved repositories
 // are of the plugin class.
 //
 // It embeds the wrapped registry.Service, so every method other than
 // ResolveRepository is delegated unchanged.
 type pluginRegistryService struct {
 	registry.Service
f3711704
 }
 
3d86b0c7
 func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
 	repoInfo, err = s.Service.ResolveRepository(name)
 	if repoInfo != nil {
 		repoInfo.Class = "plugin"
f3711704
 	}
3d86b0c7
 	return
 }
f3711704
 
3d86b0c7
 // NewManager returns a new plugin manager.
 func NewManager(config ManagerConfig) (*Manager, error) {
 	if config.RegistryService != nil {
 		config.RegistryService = pluginRegistryService{config.RegistryService}
f3711704
 	}
3d86b0c7
 	manager := &Manager{
 		config: config,
f3711704
 	}
d75f1d84
 	for _, dirName := range []string{manager.config.Root, manager.config.ExecRoot, manager.tmpDir()} {
 		if err := os.MkdirAll(dirName, 0700); err != nil {
 			return nil, errors.Wrapf(err, "failed to mkdir %v", dirName)
 		}
3d86b0c7
 	}
0c2821d6
 
 	if err := setupRoot(manager.config.Root); err != nil {
 		return nil, err
 	}
 
3d86b0c7
 	var err error
c85e8622
 	manager.executor, err = config.CreateExecutor(manager)
f3711704
 	if err != nil {
c85e8622
 		return nil, err
f3711704
 	}
c85e8622
 
3d86b0c7
 	manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
 	if err != nil {
 		return nil, err
 	}
 
b35490a8
 	manager.cMap = make(map[*v2.Plugin]*controller)
3d86b0c7
 	if err := manager.reload(); err != nil {
 		return nil, errors.Wrap(err, "failed to restore plugins")
 	}
72c3bcf2
 
 	manager.publisher = pubsub.NewPublisher(0, 0)
3d86b0c7
 	return manager, nil
 }
 
 // tmpDir returns the path of the "tmp" directory directly under the manager root.
 func (pm *Manager) tmpDir() string {
 	root := pm.config.Root
 	return filepath.Join(root, "tmp")
 }
 
c85e8622
 // HandleExitEvent is called when the executor receives the exit event
 // In the future we may change this, but for now all we care about is the exit event.
 func (pm *Manager) HandleExitEvent(id string) error {
 	p, err := pm.config.Store.GetV2Plugin(id)
 	if err != nil {
 		return err
 	}
b35490a8
 
c85e8622
 	os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
c54b717c
 
c85e8622
 	if p.PropagatedMount != "" {
 		if err := mount.Unmount(p.PropagatedMount); err != nil {
 			logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
c54b717c
 		}
c85e8622
 		propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
 		if err := mount.Unmount(propRoot); err != nil {
 			logrus.Warn("Could not unmount %s: %v", propRoot, err)
70b76266
 		}
c85e8622
 	}
70b76266
 
c85e8622
 	pm.mu.RLock()
 	c := pm.cMap[p]
 	if c.exitChan != nil {
 		close(c.exitChan)
863ab9ab
 	}
c85e8622
 	restart := c.restart
 	pm.mu.RUnlock()
863ab9ab
 
c85e8622
 	if restart {
 		pm.enable(p, c, true)
 	}
f3711704
 	return nil
 }
 
11cf394e
 // handleLoadError logs a failure to load the plugin with the given id; a nil
 // err is a no-op. A missing config file is reported as a warning (it usually
 // points at an interrupted removal), anything else as an error. In both cases
 // the plugin is skipped by the caller.
 func handleLoadError(err error, id string) {
 	if err == nil {
 		return
 	}
 	entry := logrus.WithError(err).WithField("id", id)
 	if !os.IsNotExist(errors.Cause(err)) {
 		entry.Error("error loading plugin, skipping")
 		return
 	}
 	// Likely some error while removing on an older version of docker
 	entry.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
 }
 
3d86b0c7
 func (pm *Manager) reload() error { // todo: restore
 	dir, err := ioutil.ReadDir(pm.config.Root)
f3711704
 	if err != nil {
3d86b0c7
 		return errors.Wrapf(err, "failed to read %v", pm.config.Root)
f3711704
 	}
27a55fba
 	plugins := make(map[string]*v2.Plugin)
3d86b0c7
 	for _, v := range dir {
 		if validFullID.MatchString(v.Name()) {
 			p, err := pm.loadPlugin(v.Name())
 			if err != nil {
11cf394e
 				handleLoadError(err, v.Name())
 				continue
3d86b0c7
 			}
 			plugins[p.GetID()] = p
11cf394e
 		} else {
 			if validFullID.MatchString(strings.TrimSuffix(v.Name(), "-removing")) {
 				// There was likely some error while removing this plugin, let's try to remove again here
 				if err := system.EnsureRemoveAll(v.Name()); err != nil {
 					logrus.WithError(err).WithField("id", v.Name()).Warn("error while attempting to clean up previously removed plugin")
 				}
 			}
3d86b0c7
 		}
f3711704
 	}
 
3d86b0c7
 	pm.config.Store.SetAll(plugins)
 
 	var wg sync.WaitGroup
 	wg.Add(len(plugins))
27a55fba
 	for _, p := range plugins {
3d86b0c7
 		c := &controller{} // todo: remove this
b35490a8
 		pm.cMap[p] = c
27a55fba
 		go func(p *v2.Plugin) {
3d86b0c7
 			defer wg.Done()
dfd91873
 			if err := pm.restorePlugin(p); err != nil {
0016b331
 				logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
dfd91873
 				return
 			}
 
c54b717c
 			if p.Rootfs != "" {
3d86b0c7
 				p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
c54b717c
 			}
 
 			// We should only enable rootfs propagation for certain plugin types that need it.
 			for _, typ := range p.PluginObj.Config.Interface.Types {
50021047
 				if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
c54b717c
 					if p.PluginObj.Config.PropagatedMount != "" {
e8307b86
 						propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
 
 						// check if we need to migrate an older propagated mount from before
 						// these mounts were stored outside the plugin rootfs
 						if _, err := os.Stat(propRoot); os.IsNotExist(err) {
 							if _, err := os.Stat(p.PropagatedMount); err == nil {
 								// make sure nothing is mounted here
 								// don't care about errors
 								mount.Unmount(p.PropagatedMount)
 								if err := os.Rename(p.PropagatedMount, propRoot); err != nil {
 									logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
 								}
 								if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
 									logrus.WithError(err).WithField("dir", p.PropagatedMount).Error("error migrating propagated mount storage")
 								}
 							}
 						}
 
 						if err := os.MkdirAll(propRoot, 0755); err != nil {
 							logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
 						}
c54b717c
 						// TODO: sanitize PropagatedMount and prevent breakout
 						p.PropagatedMount = filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
 						if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
 							logrus.Errorf("failed to create PropagatedMount directory at %s: %v", p.PropagatedMount, err)
 							return
 						}
 					}
 				}
 			}
 
3d86b0c7
 			pm.save(p)
 			requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
dfd91873
 
 			if requiresManualRestore {
 				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
b35490a8
 				if err := pm.enable(p, c, true); err != nil {
0016b331
 					logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
dfd91873
 				}
 			}
 		}(p)
 	}
3d86b0c7
 	wg.Wait()
f3711704
 	return nil
 }
 
72c3bcf2
 // Get returns the plugin registered in the store under the given ID or name,
 // or the store's lookup error.
 func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
 	store := pm.config.Store
 	return store.GetV2Plugin(idOrName)
 }
 
3d86b0c7
 // loadPlugin reads <Root>/<id>/config.json and decodes it into a plugin.
 // Read and decode failures are wrapped with the path of the config file.
 func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
 	configPath := filepath.Join(pm.config.Root, id, configFileName)
 
 	raw, err := ioutil.ReadFile(configPath)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error reading %v", configPath)
 	}
 
 	loaded := new(v2.Plugin)
 	if err = json.Unmarshal(raw, loaded); err != nil {
 		return nil, errors.Wrapf(err, "error decoding %v", configPath)
 	}
 	return loaded, nil
 }
 
 func (pm *Manager) save(p *v2.Plugin) error {
 	pluginJSON, err := json.Marshal(p)
 	if err != nil {
 		return errors.Wrap(err, "failed to marshal plugin json")
 	}
 	if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
26d0bac8
 		return errors.Wrap(err, "failed to write atomically plugin json")
3d86b0c7
 	}
 	return nil
 }
 
39bcaee4
 // GC cleans up unreferenced blobs. This is recommended to run in a goroutine
3d86b0c7
 func (pm *Manager) GC() {
 	pm.muGC.Lock()
 	defer pm.muGC.Unlock()
 
 	whitelist := make(map[digest.Digest]struct{})
 	for _, p := range pm.config.Store.GetAll() {
 		whitelist[p.Config] = struct{}{}
 		for _, b := range p.Blobsums {
 			whitelist[b] = struct{}{}
 		}
 	}
 
 	pm.blobStore.gc(whitelist)
 }
 
f3711704
 // logHook is a logrus hook that stamps every log entry with the plugin id it
 // was created for (see Fire).
 type logHook struct{ id string }
 
 // Levels makes the hook fire for every logrus level.
 func (logHook) Levels() []logrus.Level {
 	levels := logrus.AllLevels
 	return levels
 }
 
 // Fire replaces the entry's fields with a single "plugin" field holding the
 // hook's plugin id. It never fails.
 func (l logHook) Fire(entry *logrus.Entry) error {
 	fields := make(logrus.Fields, 1)
 	fields["plugin"] = l.id
 	entry.Data = fields
 	return nil
 }
37a3be24
 
c85e8622
 func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
 	logger := logrus.New()
 	logger.Hooks.Add(logHook{id})
 	return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
37a3be24
 }
3d86b0c7
 
 func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
dafeeac4
 	if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
3d86b0c7
 		return errors.New("incorrect privileges")
 	}
dafeeac4
 
3d86b0c7
 	return nil
 }
 
dafeeac4
 // isEqual reports whether arrOne and arrOther hold the same privileges
 // regardless of order. Lists of different length are never equal; otherwise
 // both are sorted and compared element by element with compare.
 //
 // Note that both arguments are sorted in place, so the caller's slices are
 // reordered.
 func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
 	if len(arrOne) != len(arrOther) {
 		return false
 	}
 
 	sort.Sort(arrOne)
 	sort.Sort(arrOther)
 
 	// Compare every position, including index 0.
 	for i := range arrOne {
 		if !compare(arrOne[i], arrOther[i]) {
 			return false
 		}
 	}
 
 	return true
 }
 
 // isEqualPrivilege reports whether two privileges have the same name and
 // deeply equal values.
 func isEqualPrivilege(a, b types.PluginPrivilege) bool {
 	return a.Name == b.Name && reflect.DeepEqual(a.Value, b.Value)
 }
 
ce8e529e
 func configToRootFS(c []byte) (*image.RootFS, string, error) {
 	// TODO @jhowardmsft LCOW - Will need to revisit this.
 	os := runtime.GOOS
3d86b0c7
 	var pluginConfig types.PluginConfig
 	if err := json.Unmarshal(c, &pluginConfig); err != nil {
42c5c1a9
 		return nil, "", err
3d86b0c7
 	}
6c7cb520
 	// validation for empty rootfs is in distribution code
 	if pluginConfig.Rootfs == nil {
0380fbff
 		return nil, os, nil
6c7cb520
 	}
3d86b0c7
 
0380fbff
 	return rootFSFromPlugin(pluginConfig.Rootfs), os, nil
3d86b0c7
 }
 
 // rootFSFromPlugin converts the rootfs section of a plugin config into an
 // image.RootFS, copying the type and converting each diff ID to a layer.DiffID.
 // pluginfs must not be nil.
 func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
 	diffIDs := make([]layer.DiffID, len(pluginfs.DiffIds))
 	for i, id := range pluginfs.DiffIds {
 		diffIDs[i] = layer.DiffID(id)
 	}
 
 	return &image.RootFS{
 		Type:    pluginfs.Type,
 		DiffIDs: diffIDs,
 	}
 }