package remote // import "github.com/docker/docker/libcontainerd/remote"

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
	"github.com/containerd/containerd"
	apievents "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types"
	"github.com/containerd/containerd/archive"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/content"
	containerderrors "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/runtime/linux/runctypes"
	"github.com/containerd/typeurl"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/queue"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"

	"github.com/docker/docker/pkg/ioutils"
	v1 "github.com/opencontainers/image-spec/specs-go/v1"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// DockerContainerBundlePath is the label key pointing to the container's bundle path
const DockerContainerBundlePath = "com.docker/engine.bundle.path"

type client struct {
	client   *containerd.Client
	stateDir string
	logger   *logrus.Entry
	ns       string

	backend libcontainerdtypes.Backend
	eventQ  queue.Queue
	oomMu   sync.Mutex
	oom     map[string]bool
}

// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	c := &client{
		client:   cli,
		stateDir: stateDir,
		logger:   logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
		ns:       ns,
		backend:  b,
		oom:      make(map[string]bool),
	}

	go c.processEventStream(ctx, ns)

	return c, nil
}

func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	return c.client.Version(ctx)
}

// Restore loads the containerd container.
// It should not be called concurrently with any other operation for the given ID.
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
	var dio *cio.DirectIO
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
		err = wrapError(err)
	}()

	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		return false, -1, nil, errors.WithStack(wrapError(err))
	}

	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the previously defined dio for the defer above
		// to handle cleanup
		dio, err = c.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	t, err := ctr.Task(ctx, attachIO)
	if err != nil && !containerderrors.IsNotFound(err) {
		return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}

	if t != nil {
		s, err := t.Status(ctx)
		if err != nil {
			return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
		}
		alive = s.Status != containerd.Stopped
		pid = int(t.Pid())
	}

	c.logger.WithFields(logrus.Fields{
		"container": id,
		"alive":     alive,
		"pid":       pid,
	}).Debug("restored container")

	return alive, pid, &restoredProcess{
		p: t,
	}, nil
}

func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
	bdir := c.bundleDir(id)
	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

	_, err := c.client.NewContainer(ctx, id,
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(runtimeName, runtimeOptions),
		WithBundle(bdir, ociSpec),
	)
	if err != nil {
		if containerderrors.IsAlreadyExists(err) {
			return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return wrapError(err)
	}
	return nil
}

// Start create and start a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, id)
	if err != nil {
		return -1, err
	}
	var (
		cp             *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if cp != nil {
				err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
				if err != nil {
					c.logger.WithError(err).WithFields(logrus.Fields{
						"ref":    checkpointDir,
						"digest": cp.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		if err := tar.Close(); err != nil {
			return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}

	spec, err := ctr.Spec(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve spec")
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retreive labels")
	}
	bundle := labels[DockerContainerBundlePath]
	uid, gid := getSpecUser(spec)
	t, err = ctr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)

			rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
			return rio, err
		},
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = cp
			if runtime.GOOS != "windows" {
				info.Options = &runctypes.CreateOptions{
					IoUid:       uint32(uid),
					IoGid:       uint32(gid),
					NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
				}
			} else {
				// Make sure we set the runhcs options to debug if we are at debug level.
				if c.logger.Level == logrus.DebugLevel {
					info.Options = &options.Options{Debug: true}
				}
			}
			return nil
		})
	if err != nil {
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return -1, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	close(stdinCloseSync)

	if err := t.Start(ctx); err != nil {
		if _, err := t.Delete(ctx); err != nil {
			c.logger.WithError(err).WithField("container", id).
				Error("failed to delete task after fail start")
		}
		return -1, wrapError(err)
	}

	return int(t.Pid()), nil
}

// Exec creates exec process.
//
// The containerd client calls Exec to register the exec config in the shim side.
// When the client calls Start, the shim will create stdin fifo if needs. But
// for the container main process, the stdin fifo will be created in Create not
// the Start call. stdinCloseSync channel should be closed after Start exec
// process.
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return -1, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
		}
		return -1, wrapError(err)
	}

	var (
		p              containerd.Process
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)

	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, wrapError(err)
	}

	fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)

	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()

	p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		close(stdinCloseSync)
		if containerderrors.IsAlreadyExists(err) {
			return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return -1, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of exec process will be created after p.Start in containerd
	defer close(stdinCloseSync)

	if err = p.Start(ctx); err != nil {
		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
		// older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		p.Delete(ctx)
		return -1, wrapError(err)
	}
	return int(p.Pid()), nil
}

func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}
	return wrapError(p.Kill(ctx, syscall.Signal(signal)))
}

func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}

	return p.Resize(ctx, uint32(width), uint32(height))
}

func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}

	return p.CloseIO(ctx, containerd.WithStdinCloser)
}

func (c *client) Pause(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	return wrapError(p.(containerd.Task).Pause(ctx))
}

func (c *client) Resume(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	return p.(containerd.Task).Resume(ctx)
}

func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	m, err := p.(containerd.Task).Metrics(ctx)
	if err != nil {
		return nil, err
	}

	v, err := typeurl.UnmarshalAny(m.Data)
	if err != nil {
		return nil, err
	}
	return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
}

func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}

	var pids []uint32
	for _, i := range pis {
		pids = append(pids, i.Pid)
	}

	return pids, nil
}

func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}

	var infos []libcontainerdtypes.Summary
	for _, pi := range pis {
		i, err := typeurl.UnmarshalAny(pi.Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}
		s, err := summaryFromInterface(i)
		if err != nil {
			return nil, err
		}
		infos = append(infos, *s)
	}

	return infos, nil
}

type restoredProcess struct {
	p containerd.Process
}

func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
	if p.p == nil {
		return 255, time.Now(), nil
	}
	status, err := p.p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}

func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return 255, time.Now(), nil
	}

	status, err := p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}

func (c *client) Delete(ctx context.Context, containerID string) error {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return err
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return err
	}
	bundle := labels[DockerContainerBundlePath]
	if err := ctr.Delete(ctx); err != nil {
		return wrapError(err)
	}
	c.oomMu.Lock()
	delete(c.oom, containerID)
	c.oomMu.Unlock()
	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
		if err := os.RemoveAll(bundle); err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container": containerID,
				"bundle":    bundle,
			}).Error("failed to remove state dir")
		}
	}
	return nil
}

func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
	t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return containerd.Unknown, err
	}
	s, err := t.Status(ctx)
	if err != nil {
		return containerd.Unknown, wrapError(err)
	}
	return s.Status, nil
}

func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	opts := []containerd.CheckpointTaskOpts{}
	if exit {
		opts = append(opts, func(r *containerd.CheckpointTaskInfo) error {
			if r.Options == nil {
				r.Options = &runctypes.CheckpointOptions{
					Exit: true,
				}
			} else {
				opts, _ := r.Options.(*runctypes.CheckpointOptions)
				opts.Exit = true
			}
			return nil
		})
	}
	img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := c.client.ImageService().Delete(context.Background(), img.Name())
		if err != nil {
			c.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()

	b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
	}
	var index v1.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
	}

	var cpDesc *v1.Descriptor
	for _, m := range index.Manifests {
		if m.MediaType == images.MediaTypeContainerd1Checkpoint {
			cpDesc = &m
			break
		}
	}
	if cpDesc == nil {
		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
	}

	rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()
	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
	}

	return err
}

func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
		}
		return nil, wrapError(err)
	}
	return ctr, nil
}

func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return nil, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
		}
		return nil, wrapError(err)
	}
	if processID == libcontainerdtypes.InitProcessName {
		return t, nil
	}
	p, err := t.LoadProcess(ctx, processID, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
		}
		return nil, wrapError(err)
	}
	return p, nil
}

// createIO creates the io to be used by a process
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		io  *cio.DirectIO
		err error
	)
	io, err = c.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}

	if io.Stdin != nil {
		var (
			err       error
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				err = pipe.Close()
				// Do the rest in a new routine to avoid a deadlock if the
				// Exec/Start call failed.
				go func() {
					<-stdinCloseSync
					p, err := c.getProcess(context.Background(), containerID, processID)
					if err == nil {
						err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
						if err != nil && strings.Contains(err.Error(), "transport is closing") {
							err = nil
						}
					}
				}()
			})
			return err
		})
	}

	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}

func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	c.eventQ.Append(ei.ContainerID, func() {
		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
		if err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container":  ei.ContainerID,
				"event":      et,
				"event-info": ei,
			}).Error("failed to process event")
		}

		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
			if err != nil {

				c.logger.WithError(errors.New("no such process")).
					WithFields(logrus.Fields{
						"error":     err,
						"container": ei.ContainerID,
						"process":   ei.ProcessID,
					}).Error("exit event")
				return
			}

			ctr, err := c.getContainer(ctx, ei.ContainerID)
			if err != nil {
				c.logger.WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"error":     err,
				}).Error("failed to find container")
			} else {
				labels, err := ctr.Labels(ctx)
				if err != nil {
					c.logger.WithFields(logrus.Fields{
						"container": ei.ContainerID,
						"error":     err,
					}).Error("failed to get container labels")
					return
				}
				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
			}
			_, err = p.Delete(context.Background())
			if err != nil {
				c.logger.WithError(err).WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"process":   ei.ProcessID,
				}).Warn("failed to delete process")
			}
		}
	})
}

func (c *client) processEventStream(ctx context.Context, ns string) {
	var (
		err error
		ev  *events.Envelope
		et  libcontainerdtypes.EventType
		ei  libcontainerdtypes.EventInfo
	)

	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")

	c.logger.Debug("processing event stream")

	for {
		var oomKilled bool
		select {
		case err = <-errC:
			if err != nil {
				errStatus, ok := status.FromError(err)
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("failed to get event")

					// rate limit
					select {
					case <-time.After(time.Second):
						go c.processEventStream(ctx, ns)
						return
					case <-ctx.Done():
					}
				}
				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
			}
			return
		case ev = <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}

			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}

			c.logger.WithField("topic", ev.Topic).Debug("event")

			switch t := v.(type) {
			case *apievents.TaskCreate:
				et = libcontainerdtypes.EventCreate
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskStart:
				et = libcontainerdtypes.EventStart
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskExit:
				et = libcontainerdtypes.EventExit
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    t.ExitedAt,
				}
			case *apievents.TaskOOM:
				et = libcontainerdtypes.EventOOM
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					OOMKilled:   true,
				}
				oomKilled = true
			case *apievents.TaskExecAdded:
				et = libcontainerdtypes.EventExecAdded
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
				}
			case *apievents.TaskExecStarted:
				et = libcontainerdtypes.EventExecStarted
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
					Pid:         t.Pid,
				}
			case *apievents.TaskPaused:
				et = libcontainerdtypes.EventPaused
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			case *apievents.TaskResumed:
				et = libcontainerdtypes.EventResumed
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			default:
				c.logger.WithFields(logrus.Fields{
					"topic": ev.Topic,
					"type":  reflect.TypeOf(t)},
				).Info("ignoring event")
				continue
			}

			c.oomMu.Lock()
			if oomKilled {
				c.oom[ei.ContainerID] = true
			}
			ei.OOMKilled = c.oom[ei.ContainerID]
			c.oomMu.Unlock()

			c.processEvent(ctx, et, ei)
		}
	}
}

func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer writer.Close()
	size, err := io.Copy(writer, r)
	if err != nil {
		return nil, err
	}
	labels := map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	}
	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
		return nil, err
	}
	return &types.Descriptor{
		MediaType: mediaType,
		Digest:    writer.Digest(),
		Size_:     size,
	}, nil
}

func (c *client) bundleDir(id string) string {
	return filepath.Join(c.stateDir, id)
}

func wrapError(err error) error {
	switch {
	case err == nil:
		return nil
	case containerderrors.IsNotFound(err):
		return errdefs.NotFound(err)
	}

	msg := err.Error()
	for _, s := range []string{"container does not exist", "not found", "no such container"} {
		if strings.Contains(msg, s) {
			return errdefs.NotFound(err)
		}
	}
	return err
}