// +build linux package linux import ( "context" "fmt" "io/ioutil" "os" "path/filepath" "time" "github.com/boltdb/bolt" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/linux/proc" "github.com/containerd/containerd/linux/runctypes" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/sys" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) var ( pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux") empty = &ptypes.Empty{} ) const ( configFilename = "config.json" defaultRuntime = "runc" defaultShim = "containerd-shim" ) func init() { plugin.Register(&plugin.Registration{ Type: plugin.RuntimePlugin, ID: "linux", InitFn: New, Requires: []plugin.Type{ plugin.TaskMonitorPlugin, plugin.MetadataPlugin, }, Config: &Config{ Shim: defaultShim, Runtime: defaultRuntime, }, }) } var _ = (runtime.Runtime)(&Runtime{}) // Config options for the runtime type Config struct { // Shim is a path or name of binary implementing the Shim GRPC API Shim string `toml:"shim"` // Runtime is a path or name of an OCI runtime used by the shim Runtime string `toml:"runtime"` // RuntimeRoot is the path that shall be used by the OCI runtime for its data RuntimeRoot string `toml:"runtime_root"` // NoShim calls runc directly from within the pkg NoShim bool `toml:"no_shim"` // Debug enable debug on the shim ShimDebug bool `toml:"shim_debug"` } // New returns a configured runtime func New(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()} if err := os.MkdirAll(ic.Root, 0711); err != nil { return nil, err } if err := os.MkdirAll(ic.State, 0711); err != nil { return nil, err } monitor, err := ic.Get(plugin.TaskMonitorPlugin) if err != nil { return nil, err } m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } cfg := ic.Config.(*Config) r := &Runtime{ root: ic.Root, state: ic.State, monitor: monitor.(runtime.TaskMonitor), tasks: runtime.NewTaskList(), db: m.(*metadata.DB), address: ic.Address, events: ic.Events, config: cfg, } tasks, err := r.restoreTasks(ic.Context) if err != nil { return nil, err } // TODO: need to add the tasks to the monitor for _, t := range tasks { if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil { return nil, err } } return r, nil } // Runtime for a linux based system type Runtime struct { root string state string address string monitor runtime.TaskMonitor tasks *runtime.TaskList db *metadata.DB events *exchange.Exchange config *Config } // ID of the runtime func (r *Runtime) ID() string { return pluginID } // Create a new task func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) { namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid task id") } ropts, err := r.getRuncOptions(ctx, id) if err != nil { return nil, err } ec := reaper.Default.Subscribe() defer reaper.Default.Unsubscribe(ec) bundle, err := newBundle(id, filepath.Join(r.state, namespace), filepath.Join(r.root, namespace), opts.Spec.Value) if err != nil { return nil, err } defer func() { if err != nil { bundle.Delete() } }() shimopt := ShimLocal(r.events) if !r.config.NoShim { var cgroup string if opts.Options != nil { v, err := typeurl.UnmarshalAny(opts.Options) if err != nil { return nil, err } cgroup = v.(*runctypes.CreateOptions).ShimCgroup } exitHandler := func() { log.G(ctx).WithField("id", id).Info("shim reaped") t, err := r.tasks.Get(ctx, id) if err != nil { // Task was never started or was already sucessfully deleted return } lc := t.(*Task) // Stop the monitor if err := r.monitor.Stop(lc); err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "id": id, "namespace": namespace, }).Warn("failed to stop monitor") } log.G(ctx).WithFields(logrus.Fields{ "id": id, "namespace": namespace, }).Warn("cleaning up after killed shim") err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec) if err == nil { r.tasks.Delete(ctx, lc) } else { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "id": id, "namespace": namespace, }).Warn("failed to clen up after killed shim") } } shimopt = ShimRemote(r.config.Shim, r.address, cgroup, r.config.ShimDebug, exitHandler) } s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts) if err != nil { return nil, err } defer func() { if err != nil { if kerr := s.KillShim(ctx); kerr != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } } }() rt := r.config.Runtime if ropts != nil && ropts.Runtime != "" { rt = ropts.Runtime } sopts := &shim.CreateTaskRequest{ ID: id, Bundle: bundle.path, Runtime: rt, Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, Terminal: opts.IO.Terminal, Checkpoint: opts.Checkpoint, Options: opts.Options, } for _, m := range opts.Rootfs { sopts.Rootfs = append(sopts.Rootfs, &types.Mount{ Type: m.Type, Source: m.Source, Options: m.Options, }) } cr, err := s.Create(ctx, sopts) if err != nil { return nil, errdefs.FromGRPC(err) } t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { return nil, err } if err := r.tasks.Add(ctx, t); err != nil { return nil, err } // after the task is created, add it to the monitor if it has a cgroup // this can be different on a checkpoint/restore if t.cg != nil { if err = r.monitor.Monitor(t); err != nil { if _, err := r.Delete(ctx, t); err != nil { log.G(ctx).WithError(err).Error("deleting task after failed monitor") } return nil, err } } r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{ ContainerID: sopts.ID, Bundle: sopts.Bundle, Rootfs: sopts.Rootfs, IO: &eventstypes.TaskIO{ Stdin: sopts.Stdin, Stdout: sopts.Stdout, Stderr: sopts.Stderr, Terminal: sopts.Terminal, }, Checkpoint: sopts.Checkpoint, Pid: uint32(t.pid), }) return t, nil } // Delete a task removing all on disk state func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) { namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } lc, ok := c.(*Task) if !ok { return nil, fmt.Errorf("task cannot be cast as *linux.Task") } if err := r.monitor.Stop(lc); err != nil { return nil, err } bundle := loadBundle( lc.id, filepath.Join(r.state, namespace, lc.id), filepath.Join(r.root, namespace, lc.id), ) rsp, err := lc.shim.Delete(ctx, empty) if err != nil { if cerr := r.cleanupAfterDeadShim(ctx, bundle, namespace, c.ID(), lc.pid, nil); cerr != nil { log.G(ctx).WithError(err).Error("unable to cleanup task") } return nil, errdefs.FromGRPC(err) } r.tasks.Delete(ctx, lc) if err := lc.shim.KillShim(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } if err := bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("failed to delete bundle") } r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ ContainerID: lc.id, ExitStatus: rsp.ExitStatus, ExitedAt: rsp.ExitedAt, Pid: rsp.Pid, }) return &runtime.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, Pid: rsp.Pid, }, nil } // Tasks returns all tasks known to the runtime func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) { return r.tasks.GetAll(ctx) } func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) { dir, err := ioutil.ReadDir(r.state) if err != nil { return nil, err } var o []*Task for _, namespace := range dir { if !namespace.IsDir() { continue } name := namespace.Name() log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace") tasks, err := r.loadTasks(ctx, name) if err != nil { return nil, err } o = append(o, tasks...) } return o, nil } // Get a specific task by task id func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) { return r.tasks.Get(ctx, id) } func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { dir, err := ioutil.ReadDir(filepath.Join(r.state, ns)) if err != nil { return nil, err } var o []*Task for _, path := range dir { if !path.IsDir() { continue } id := path.Name() bundle := loadBundle( id, filepath.Join(r.state, ns, id), filepath.Join(r.root, ns, id), ) ctx = namespaces.WithNamespace(ctx, ns) pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil) if err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "id": id, "namespace": ns, }).Error("connecting to shim") err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil) if err != nil { log.G(ctx).WithError(err).WithField("bundle", bundle.path). Error("cleaning up after dead shim") } continue } ropts, err := r.getRuncOptions(ctx, id) if err != nil { log.G(ctx).WithError(err).WithField("id", id). Error("get runtime options") continue } t, err := newTask(id, ns, pid, s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { log.G(ctx).WithError(err).Error("loading task type") continue } o = append(o, t) } return o, nil } func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error { ctx = namespaces.WithNamespace(ctx, ns) if err := r.terminate(ctx, bundle, ns, id); err != nil { if r.config.ShimDebug { return errors.Wrap(err, "failed to terminate task, leaving bundle for debugging") } log.G(ctx).WithError(err).Warn("failed to terminate task") } if ec != nil { // if sub-reaper is set, reap our new child if v, err := sys.GetSubreaper(); err == nil && v == 1 { for e := range ec { if e.Pid == pid { break } } } } // Notify Client exitedAt := time.Now().UTC() r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{ ContainerID: id, ID: id, Pid: uint32(pid), ExitStatus: 128 + uint32(unix.SIGKILL), ExitedAt: exitedAt, }) if err := bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("delete bundle") } r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ ContainerID: id, Pid: uint32(pid), ExitStatus: 128 + uint32(unix.SIGKILL), ExitedAt: exitedAt, }) return nil } func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error { ctx = namespaces.WithNamespace(ctx, ns) rt, err := r.getRuntime(ctx, ns, id) if err != nil { return err } if err := rt.Delete(ctx, id, &runc.DeleteOpts{ Force: true, }); err != nil { log.G(ctx).WithError(err).Warnf("delete runtime state %s", id) } if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "path": bundle.path, "id": id, }).Warnf("unmount task rootfs") } return nil } func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) { ropts, err := r.getRuncOptions(ctx, id) if err != nil { return nil, err } var ( cmd = r.config.Runtime root = proc.RuncRoot ) if ropts != nil { if ropts.Runtime != "" { cmd = ropts.Runtime } if ropts.RuntimeRoot != "" { root = ropts.RuntimeRoot } } return &runc.Runc{ Command: cmd, LogFormat: runc.JSON, PdeathSignal: unix.SIGKILL, Root: filepath.Join(root, ns), }, nil } func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) { var container containers.Container if err := r.db.View(func(tx *bolt.Tx) error { store := metadata.NewContainerStore(tx) var err error container, err = store.Get(ctx, id) return err }); err != nil { return nil, err } if container.Runtime.Options != nil { v, err := typeurl.UnmarshalAny(container.Runtime.Options) if err != nil { return nil, err } ropts, ok := v.(*runctypes.RuncOptions) if !ok { return nil, errors.New("invalid runtime options format") } return ropts, nil } return &runctypes.RuncOptions{}, nil }