Browse code

Move libcontainerd/remote to daemon/internal/libcontainerd/remote

Signed-off-by: Derek McGowan <derek@mcg.dev>

Derek McGowan authored on 2025/06/28 06:28:08
Showing 12 changed files
... ...
@@ -29,11 +29,11 @@ import (
29 29
 	"github.com/docker/docker/daemon/config"
30 30
 	"github.com/docker/docker/daemon/container"
31 31
 	"github.com/docker/docker/daemon/initlayer"
32
+	"github.com/docker/docker/daemon/internal/libcontainerd/remote"
32 33
 	"github.com/docker/docker/errdefs"
33 34
 	"github.com/docker/docker/internal/nlwrap"
34 35
 	"github.com/docker/docker/internal/otelutil"
35 36
 	"github.com/docker/docker/internal/usergroup"
36
-	"github.com/docker/docker/libcontainerd/remote"
37 37
 	"github.com/docker/docker/libnetwork"
38 38
 	nwconfig "github.com/docker/docker/libnetwork/config"
39 39
 	"github.com/docker/docker/libnetwork/drivers/bridge"
... ...
@@ -17,8 +17,8 @@ import (
17 17
 	"github.com/docker/docker/daemon/config"
18 18
 	"github.com/docker/docker/daemon/container"
19 19
 	"github.com/docker/docker/daemon/internal/libcontainerd/local"
20
+	"github.com/docker/docker/daemon/internal/libcontainerd/remote"
20 21
 	"github.com/docker/docker/daemon/network"
21
-	"github.com/docker/docker/libcontainerd/remote"
22 22
 	"github.com/docker/docker/libnetwork"
23 23
 	nwconfig "github.com/docker/docker/libnetwork/config"
24 24
 	winlibnetwork "github.com/docker/docker/libnetwork/drivers/windows"
25 25
new file mode 100644
... ...
@@ -0,0 +1,755 @@
0
+package remote
1
+
2
+import (
3
+	"context"
4
+	"encoding/json"
5
+	"io"
6
+	"os"
7
+	"path/filepath"
8
+	"reflect"
9
+	"runtime"
10
+	"strings"
11
+	"sync"
12
+	"syscall"
13
+	"time"
14
+
15
+	apievents "github.com/containerd/containerd/api/events"
16
+	"github.com/containerd/containerd/api/types"
17
+	runcoptions "github.com/containerd/containerd/api/types/runc/options"
18
+	containerd "github.com/containerd/containerd/v2/client"
19
+	"github.com/containerd/containerd/v2/core/content"
20
+	c8dimages "github.com/containerd/containerd/v2/core/images"
21
+	"github.com/containerd/containerd/v2/pkg/archive"
22
+	"github.com/containerd/containerd/v2/pkg/cio"
23
+	"github.com/containerd/containerd/v2/pkg/protobuf"
24
+	cerrdefs "github.com/containerd/errdefs"
25
+	"github.com/containerd/log"
26
+	"github.com/containerd/typeurl/v2"
27
+	"github.com/docker/docker/daemon/internal/libcontainerd/queue"
28
+	"github.com/docker/docker/errdefs"
29
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
30
+	"github.com/docker/docker/pkg/ioutils"
31
+	"github.com/hashicorp/go-multierror"
32
+	"github.com/opencontainers/go-digest"
33
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
34
+	"github.com/opencontainers/runtime-spec/specs-go"
35
+	"github.com/pkg/errors"
36
+	"go.opentelemetry.io/otel"
37
+	"google.golang.org/grpc/codes"
38
+	"google.golang.org/grpc/status"
39
+	"google.golang.org/protobuf/proto"
40
+)
41
+
42
+// DockerContainerBundlePath is the label key pointing to the container's bundle path
43
+const DockerContainerBundlePath = "com.docker/engine.bundle.path"
44
+
45
+type client struct {
46
+	client   *containerd.Client
47
+	stateDir string
48
+	logger   *log.Entry
49
+	ns       string
50
+
51
+	backend libcontainerdtypes.Backend
52
+	eventQ  queue.Queue
53
+}
54
+
55
+type container struct {
56
+	client *client
57
+	c8dCtr containerd.Container
58
+
59
+	v2runcoptions *runcoptions.Options
60
+}
61
+
62
+type task struct {
63
+	containerd.Task
64
+	ctr *container
65
+}
66
+
67
+type process struct {
68
+	containerd.Process
69
+}
70
+
71
+// NewClient creates a new libcontainerd client from a containerd client
72
+func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
73
+	c := &client{
74
+		client:   cli,
75
+		stateDir: stateDir,
76
+		logger:   log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns),
77
+		ns:       ns,
78
+		backend:  b,
79
+	}
80
+
81
+	go c.processEventStream(ctx, ns)
82
+
83
+	return c, nil
84
+}
85
+
86
+func (c *client) Version(ctx context.Context) (containerd.Version, error) {
87
+	return c.client.Version(ctx)
88
+}
89
+
90
+func (c *container) newTask(t containerd.Task) *task {
91
+	return &task{Task: t, ctr: c}
92
+}
93
+
94
+func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, retErr error) {
95
+	var dio *cio.DirectIO
96
+	defer func() {
97
+		if retErr != nil && dio != nil {
98
+			dio.Cancel()
99
+			_ = dio.Close()
100
+		}
101
+	}()
102
+
103
+	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
104
+		// dio must be assigned to the previously defined dio for the defer above
105
+		// to handle cleanup
106
+		var err error
107
+		dio, err = c.client.newDirectIO(ctx, fifos)
108
+		if err != nil {
109
+			return nil, err
110
+		}
111
+		return attachStdio(dio)
112
+	}
113
+	t, err := c.c8dCtr.Task(ctx, attachIO)
114
+	if err != nil {
115
+		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
116
+	}
117
+	return c.newTask(t), nil
118
+}
119
+
120
+func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
121
+	bdir := c.bundleDir(id)
122
+	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
123
+
124
+	newOpts := []containerd.NewContainerOpts{
125
+		containerd.WithSpec(ociSpec),
126
+		containerd.WithRuntime(shim, runtimeOptions),
127
+		WithBundle(bdir, ociSpec),
128
+	}
129
+	opts = append(opts, newOpts...)
130
+
131
+	ctr, err := c.client.NewContainer(ctx, id, opts...)
132
+	if err != nil {
133
+		if cerrdefs.IsAlreadyExists(err) {
134
+			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
135
+		}
136
+		return nil, wrapError(err)
137
+	}
138
+
139
+	created := container{
140
+		client: c,
141
+		c8dCtr: ctr,
142
+	}
143
+	if x, ok := runtimeOptions.(*runcoptions.Options); ok {
144
+		created.v2runcoptions = x
145
+	}
146
+	return &created, nil
147
+}
148
+
149
+// NewTask creates a task for the specified containerd id
150
+func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
151
+	var (
152
+		checkpoint     *types.Descriptor
153
+		t              containerd.Task
154
+		rio            cio.IO
155
+		stdinCloseSync = make(chan containerd.Process, 1)
156
+	)
157
+
158
+	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.NewTask")
159
+	defer span.End()
160
+
161
+	if checkpointDir != "" {
162
+		// write checkpoint to the content store
163
+		tar := archive.Diff(ctx, "", checkpointDir)
164
+		var err error
165
+		checkpoint, err = c.client.writeContent(ctx, c8dimages.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
166
+		// remove the checkpoint when we're done
167
+		defer func() {
168
+			if checkpoint != nil {
169
+				err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
170
+				if err != nil {
171
+					c.client.logger.WithError(err).WithFields(log.Fields{
172
+						"ref":    checkpointDir,
173
+						"digest": checkpoint.Digest,
174
+					}).Warnf("failed to delete temporary checkpoint entry")
175
+				}
176
+			}
177
+		}()
178
+		if err := tar.Close(); err != nil {
179
+			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
180
+		}
181
+		if err != nil {
182
+			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
183
+		}
184
+	}
185
+
186
+	// Optimization: assume the relevant metadata has not changed in the
187
+	// moment since the container was created. Elide redundant RPC requests
188
+	// to refresh the metadata separately for spec and labels.
189
+	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
190
+	if err != nil {
191
+		return nil, errors.Wrap(err, "failed to retrieve metadata")
192
+	}
193
+	bundle := md.Labels[DockerContainerBundlePath]
194
+
195
+	var spec specs.Spec
196
+	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
197
+		return nil, errors.Wrap(err, "failed to retrieve spec")
198
+	}
199
+	uid, gid := getSpecUser(&spec)
200
+
201
+	taskOpts := []containerd.NewTaskOpts{
202
+		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
203
+			info.Checkpoint = checkpoint
204
+			return nil
205
+		},
206
+	}
207
+
208
+	if runtime.GOOS != "windows" {
209
+		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
210
+			if c.v2runcoptions != nil {
211
+				opts := proto.Clone(c.v2runcoptions).(*runcoptions.Options)
212
+				opts.IoUid = uint32(uid)
213
+				opts.IoGid = uint32(gid)
214
+				info.Options = opts
215
+			}
216
+			return nil
217
+		})
218
+	} else {
219
+		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
220
+	}
221
+
222
+	t, err = c.c8dCtr.NewTask(ctx,
223
+		func(id string) (cio.IO, error) {
224
+			fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)
225
+
226
+			rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
227
+			return rio, err
228
+		},
229
+		taskOpts...,
230
+	)
231
+	if err != nil {
232
+		close(stdinCloseSync)
233
+		if rio != nil {
234
+			rio.Cancel()
235
+			rio.Close()
236
+		}
237
+		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
238
+	}
239
+
240
+	// Signal c.createIO that it can call CloseIO
241
+	stdinCloseSync <- t
242
+
243
+	return c.newTask(t), nil
244
+}
245
+
246
+func (t *task) Start(ctx context.Context) error {
247
+	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.task.Start")
248
+	defer span.End()
249
+	return wrapError(t.Task.Start(ctx))
250
+}
251
+
252
+// Exec creates exec process.
253
+//
254
+// The containerd client calls Exec to register the exec config in the shim side.
255
+// When the client calls Start, the shim will create stdin fifo if needs. But
256
+// for the container main process, the stdin fifo will be created in Create not
257
+// the Start call. stdinCloseSync channel should be closed after Start exec
258
+// process.
259
+func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
260
+	var (
261
+		p              containerd.Process
262
+		rio            cio.IO
263
+		stdinCloseSync = make(chan containerd.Process, 1)
264
+	)
265
+
266
+	// Optimization: assume the DockerContainerBundlePath label has not been
267
+	// updated since the container metadata was last loaded/refreshed.
268
+	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
269
+	if err != nil {
270
+		return nil, wrapError(err)
271
+	}
272
+
273
+	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
274
+
275
+	defer func() {
276
+		if err != nil {
277
+			if rio != nil {
278
+				rio.Cancel()
279
+				rio.Close()
280
+			}
281
+		}
282
+	}()
283
+
284
+	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
285
+		rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
286
+		return rio, err
287
+	})
288
+	if err != nil {
289
+		close(stdinCloseSync)
290
+		if cerrdefs.IsAlreadyExists(err) {
291
+			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
292
+		}
293
+		return nil, wrapError(err)
294
+	}
295
+
296
+	// Signal c.createIO that it can call CloseIO
297
+	//
298
+	// the stdin of exec process will be created after p.Start in containerd
299
+	defer func() { stdinCloseSync <- p }()
300
+
301
+	if err = p.Start(ctx); err != nil {
302
+		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
303
+		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
304
+		// older containerd-shim
305
+		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
306
+		defer cancel()
307
+		p.Delete(ctx)
308
+		return nil, wrapError(err)
309
+	}
310
+	return process{p}, nil
311
+}
312
+
313
+func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
314
+	return wrapError(t.Task.Kill(ctx, signal))
315
+}
316
+
317
+func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
318
+	return wrapError(p.Process.Kill(ctx, signal))
319
+}
320
+
321
+func (t *task) Pause(ctx context.Context) error {
322
+	return wrapError(t.Task.Pause(ctx))
323
+}
324
+
325
+func (t *task) Resume(ctx context.Context) error {
326
+	return wrapError(t.Task.Resume(ctx))
327
+}
328
+
329
+func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
330
+	m, err := t.Metrics(ctx)
331
+	if err != nil {
332
+		return nil, err
333
+	}
334
+
335
+	v, err := typeurl.UnmarshalAny(m.Data)
336
+	if err != nil {
337
+		return nil, err
338
+	}
339
+	return libcontainerdtypes.InterfaceToStats(protobuf.FromTimestamp(m.Timestamp), v), nil
340
+}
341
+
342
+func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
343
+	pis, err := t.Pids(ctx)
344
+	if err != nil {
345
+		return nil, err
346
+	}
347
+
348
+	var infos []libcontainerdtypes.Summary
349
+	for _, pi := range pis {
350
+		i, err := typeurl.UnmarshalAny(pi.Info)
351
+		if err != nil {
352
+			return nil, errors.Wrap(err, "unable to decode process details")
353
+		}
354
+		s, err := summaryFromInterface(i)
355
+		if err != nil {
356
+			return nil, err
357
+		}
358
+		infos = append(infos, *s)
359
+	}
360
+
361
+	return infos, nil
362
+}
363
+
364
+func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
365
+	s, err := t.Task.Delete(ctx)
366
+	return s, wrapError(err)
367
+}
368
+
369
+func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
370
+	s, err := p.Process.Delete(ctx)
371
+	return s, wrapError(err)
372
+}
373
+
374
+func (c *container) Delete(ctx context.Context) error {
375
+	// Optimization: assume the DockerContainerBundlePath label has not been
376
+	// updated since the container metadata was last loaded/refreshed.
377
+	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
378
+	if err != nil {
379
+		return err
380
+	}
381
+	bundle := md.Labels[DockerContainerBundlePath]
382
+	if err := c.c8dCtr.Delete(ctx); err != nil {
383
+		return wrapError(err)
384
+	}
385
+	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
386
+		if err := os.RemoveAll(bundle); err != nil {
387
+			c.client.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
388
+				"container": c.c8dCtr.ID(),
389
+				"bundle":    bundle,
390
+			}).Error("failed to remove state dir")
391
+		}
392
+	}
393
+	return nil
394
+}
395
+
396
+func (t *task) ForceDelete(ctx context.Context) error {
397
+	_, err := t.Task.Delete(ctx, containerd.WithProcessKill)
398
+	return wrapError(err)
399
+}
400
+
401
+func (t *task) Status(ctx context.Context) (containerd.Status, error) {
402
+	s, err := t.Task.Status(ctx)
403
+	return s, wrapError(err)
404
+}
405
+
406
+func (p process) Status(ctx context.Context) (containerd.Status, error) {
407
+	s, err := p.Process.Status(ctx)
408
+	return s, wrapError(err)
409
+}
410
+
411
+func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
412
+	return func(r *containerd.CheckpointTaskInfo) error {
413
+		if r.Options == nil && c.v2runcoptions != nil {
414
+			r.Options = &runcoptions.CheckpointOptions{}
415
+		}
416
+
417
+		switch opts := r.Options.(type) {
418
+		case *runcoptions.CheckpointOptions:
419
+			opts.Exit = exit
420
+		}
421
+
422
+		return nil
423
+	}
424
+}
425
+
426
+func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
427
+	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
428
+	if err != nil {
429
+		return wrapError(err)
430
+	}
431
+	// Whatever happens, delete the checkpoint from containerd
432
+	defer func() {
433
+		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
434
+		if err != nil {
435
+			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
436
+				Warnf("failed to delete checkpoint image")
437
+		}
438
+	}()
439
+
440
+	b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
441
+	if err != nil {
442
+		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
443
+	}
444
+	var index ocispec.Index
445
+	if err := json.Unmarshal(b, &index); err != nil {
446
+		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
447
+	}
448
+
449
+	var cpDesc *ocispec.Descriptor
450
+	for _, m := range index.Manifests {
451
+		if m.MediaType == c8dimages.MediaTypeContainerd1Checkpoint {
452
+			cpDesc = &m //nolint:gosec
453
+			break
454
+		}
455
+	}
456
+	if cpDesc == nil {
457
+		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
458
+	}
459
+
460
+	rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
461
+	if err != nil {
462
+		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
463
+	}
464
+	defer rat.Close()
465
+	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
466
+	if err != nil {
467
+		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
468
+	}
469
+
470
+	return err
471
+}
472
+
473
+// LoadContainer loads the containerd container.
474
+func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
475
+	ctr, err := c.client.LoadContainer(ctx, id)
476
+	if err != nil {
477
+		if cerrdefs.IsNotFound(err) {
478
+			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
479
+		}
480
+		return nil, wrapError(err)
481
+	}
482
+	return &container{client: c, c8dCtr: ctr}, nil
483
+}
484
+
485
+func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
486
+	t, err := c.c8dCtr.Task(ctx, nil)
487
+	if err != nil {
488
+		return nil, wrapError(err)
489
+	}
490
+	return c.newTask(t), nil
491
+}
492
+
493
+// createIO creates the io to be used by a process
494
+// This needs to get a pointer to interface as upon closure the process may not have yet been registered
495
+func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
496
+	var (
497
+		io  *cio.DirectIO
498
+		err error
499
+	)
500
+	io, err = c.client.newDirectIO(context.Background(), fifos)
501
+	if err != nil {
502
+		return nil, err
503
+	}
504
+
505
+	if io.Stdin != nil {
506
+		var (
507
+			closeErr  error
508
+			stdinOnce sync.Once
509
+		)
510
+		pipe := io.Stdin
511
+		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
512
+			stdinOnce.Do(func() {
513
+				closeErr = pipe.Close()
514
+
515
+				select {
516
+				case p, ok := <-stdinCloseSync:
517
+					if !ok {
518
+						return
519
+					}
520
+					if err := closeStdin(context.Background(), p); err != nil {
521
+						if closeErr != nil {
522
+							closeErr = multierror.Append(closeErr, err)
523
+						} else {
524
+							// Avoid wrapping a single error in a multierror.
525
+							closeErr = err
526
+						}
527
+					}
528
+				default:
529
+					// The process wasn't ready. Close its stdin asynchronously.
530
+					go func() {
531
+						p, ok := <-stdinCloseSync
532
+						if !ok {
533
+							return
534
+						}
535
+						if err := closeStdin(context.Background(), p); err != nil {
536
+							c.client.logger.WithError(err).
537
+								WithField("container", c.c8dCtr.ID()).
538
+								Error("failed to close container stdin")
539
+						}
540
+					}()
541
+				}
542
+			})
543
+			return closeErr
544
+		})
545
+	}
546
+
547
+	rio, err := attachStdio(io)
548
+	if err != nil {
549
+		io.Cancel()
550
+		io.Close()
551
+	}
552
+	return rio, err
553
+}
554
+
555
+func closeStdin(ctx context.Context, p containerd.Process) error {
556
+	err := p.CloseIO(ctx, containerd.WithStdinCloser)
557
+	if err != nil && strings.Contains(err.Error(), "transport is closing") {
558
+		err = nil
559
+	}
560
+	return err
561
+}
562
+
563
+func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
564
+	c.eventQ.Append(ei.ContainerID, func() {
565
+		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
566
+		if err != nil {
567
+			c.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
568
+				"container":  ei.ContainerID,
569
+				"event":      et,
570
+				"event-info": ei,
571
+			}).Error("failed to process event")
572
+		}
573
+	})
574
+}
575
+
576
+func (c *client) waitServe(ctx context.Context) bool {
577
+	t := 100 * time.Millisecond
578
+	delay := time.NewTimer(t)
579
+	if !delay.Stop() {
580
+		<-delay.C
581
+	}
582
+	defer delay.Stop()
583
+
584
+	// `IsServing` will actually block until the service is ready.
585
+	// However it can return early, so we'll loop with a delay to handle it.
586
+	for {
587
+		serving, err := c.client.IsServing(ctx)
588
+		if err != nil {
589
+			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
590
+				return false
591
+			}
592
+			log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
593
+		}
594
+
595
+		if serving {
596
+			return true
597
+		}
598
+
599
+		delay.Reset(t)
600
+		select {
601
+		case <-ctx.Done():
602
+			return false
603
+		case <-delay.C:
604
+		}
605
+	}
606
+}
607
+
608
+func (c *client) processEventStream(ctx context.Context, ns string) {
609
+	// Create a new context specifically for this subscription.
610
+	// The context must be cancelled to cancel the subscription.
611
+	// In cases where we have to restart event stream processing,
612
+	//   we'll need the original context b/c this one will be cancelled
613
+	subCtx, cancel := context.WithCancel(ctx)
614
+	defer cancel()
615
+
616
+	// Filter on both namespace *and* topic. To create an "and" filter,
617
+	// this must be a single, comma-separated string
618
+	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
619
+
620
+	c.logger.Debug("processing event stream")
621
+
622
+	for {
623
+		select {
624
+		case err := <-errC:
625
+			if err != nil {
626
+				errStatus, ok := status.FromError(err)
627
+				if !ok || errStatus.Code() != codes.Canceled {
628
+					c.logger.WithError(err).Error("Failed to get event")
629
+					c.logger.Info("Waiting for containerd to be ready to restart event processing")
630
+					if c.waitServe(ctx) {
631
+						go c.processEventStream(ctx, ns)
632
+						return
633
+					}
634
+				}
635
+				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
636
+			}
637
+			return
638
+		case ev := <-eventStream:
639
+			if ev.Event == nil {
640
+				c.logger.WithField("event", ev).Warn("invalid event")
641
+				continue
642
+			}
643
+
644
+			v, err := typeurl.UnmarshalAny(ev.Event)
645
+			if err != nil {
646
+				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
647
+				continue
648
+			}
649
+
650
+			c.logger.WithField("topic", ev.Topic).Debug("event")
651
+
652
+			switch t := v.(type) {
653
+			case *apievents.TaskCreate:
654
+				c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
655
+					ContainerID: t.ContainerID,
656
+					ProcessID:   t.ContainerID,
657
+					Pid:         t.Pid,
658
+				})
659
+			case *apievents.TaskStart:
660
+				c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
661
+					ContainerID: t.ContainerID,
662
+					ProcessID:   t.ContainerID,
663
+					Pid:         t.Pid,
664
+				})
665
+			case *apievents.TaskExit:
666
+				c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
667
+					ContainerID: t.ContainerID,
668
+					ProcessID:   t.ID,
669
+					Pid:         t.Pid,
670
+					ExitCode:    t.ExitStatus,
671
+					ExitedAt:    protobuf.FromTimestamp(t.ExitedAt),
672
+				})
673
+			case *apievents.TaskOOM:
674
+				c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
675
+					ContainerID: t.ContainerID,
676
+				})
677
+			case *apievents.TaskExecAdded:
678
+				c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
679
+					ContainerID: t.ContainerID,
680
+					ProcessID:   t.ExecID,
681
+				})
682
+			case *apievents.TaskExecStarted:
683
+				c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
684
+					ContainerID: t.ContainerID,
685
+					ProcessID:   t.ExecID,
686
+					Pid:         t.Pid,
687
+				})
688
+			case *apievents.TaskPaused:
689
+				c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
690
+					ContainerID: t.ContainerID,
691
+				})
692
+			case *apievents.TaskResumed:
693
+				c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
694
+					ContainerID: t.ContainerID,
695
+				})
696
+			case *apievents.TaskDelete:
697
+				c.logger.WithFields(log.Fields{
698
+					"topic":     ev.Topic,
699
+					"type":      reflect.TypeOf(t),
700
+					"container": t.ContainerID,
701
+				}).Info("ignoring event")
702
+			default:
703
+				c.logger.WithFields(log.Fields{
704
+					"topic": ev.Topic,
705
+					"type":  reflect.TypeOf(t),
706
+				}).Info("ignoring event")
707
+			}
708
+		}
709
+	}
710
+}
711
+
712
+func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
713
+	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
714
+	if err != nil {
715
+		return nil, err
716
+	}
717
+	defer writer.Close()
718
+	size, err := io.Copy(writer, r)
719
+	if err != nil {
720
+		return nil, err
721
+	}
722
+	labels := map[string]string{
723
+		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
724
+	}
725
+	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
726
+		return nil, err
727
+	}
728
+	return &types.Descriptor{
729
+		MediaType: mediaType,
730
+		Digest:    writer.Digest().String(),
731
+		Size:      size,
732
+	}, nil
733
+}
734
+
735
+func (c *client) bundleDir(id string) string {
736
+	return filepath.Join(c.stateDir, id)
737
+}
738
+
739
+func wrapError(err error) error {
740
+	switch {
741
+	case err == nil:
742
+		return nil
743
+	case cerrdefs.IsNotFound(err):
744
+		return errdefs.NotFound(err)
745
+	}
746
+
747
+	msg := err.Error()
748
+	for _, s := range []string{"container does not exist", "not found", "no such container"} {
749
+		if strings.Contains(msg, s) {
750
+			return errdefs.NotFound(err)
751
+		}
752
+	}
753
+	return err
754
+}
0 755
new file mode 100644
... ...
@@ -0,0 +1,160 @@
0
+package remote
1
+
2
+import (
3
+	"io"
4
+	"net"
5
+	"sync"
6
+
7
+	"github.com/Microsoft/go-winio"
8
+	"github.com/containerd/containerd/v2/pkg/cio"
9
+	"github.com/containerd/log"
10
+	"github.com/pkg/errors"
11
+)
12
+
13
+type delayedConnection struct {
14
+	l    net.Listener
15
+	con  net.Conn
16
+	wg   sync.WaitGroup
17
+	once sync.Once
18
+}
19
+
20
+func (dc *delayedConnection) Write(p []byte) (int, error) {
21
+	dc.wg.Wait()
22
+	if dc.con != nil {
23
+		return dc.con.Write(p)
24
+	}
25
+	return 0, errors.New("use of closed network connection")
26
+}
27
+
28
+func (dc *delayedConnection) Read(p []byte) (int, error) {
29
+	dc.wg.Wait()
30
+	if dc.con != nil {
31
+		return dc.con.Read(p)
32
+	}
33
+	return 0, errors.New("use of closed network connection")
34
+}
35
+
36
+func (dc *delayedConnection) unblockConnectionWaiters() {
37
+	defer dc.once.Do(func() {
38
+		dc.wg.Done()
39
+	})
40
+}
41
+
42
+func (dc *delayedConnection) Close() error {
43
+	_ = dc.l.Close()
44
+	if dc.con != nil {
45
+		return dc.con.Close()
46
+	}
47
+	dc.unblockConnectionWaiters()
48
+	return nil
49
+}
50
+
51
+type stdioPipes struct {
52
+	stdin  io.WriteCloser
53
+	stdout io.ReadCloser
54
+	stderr io.ReadCloser
55
+}
56
+
57
+// newStdioPipes creates actual fifos for stdio.
58
+func (c *client) newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, retErr error) {
59
+	p := &stdioPipes{}
60
+	if fifos.Stdin != "" {
61
+		c.logger.WithField("stdin", fifos.Stdin).Debug("listen")
62
+		l, err := winio.ListenPipe(fifos.Stdin, nil)
63
+		if err != nil {
64
+			return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin)
65
+		}
66
+		dc := &delayedConnection{
67
+			l: l,
68
+		}
69
+		dc.wg.Add(1)
70
+		defer func() {
71
+			if retErr != nil {
72
+				_ = dc.Close()
73
+			}
74
+		}()
75
+		p.stdin = dc
76
+
77
+		go func() {
78
+			c.logger.WithField("stdin", fifos.Stdin).Debug("accept")
79
+			conn, err := l.Accept()
80
+			if err != nil {
81
+				_ = dc.Close()
82
+				if err != winio.ErrPipeListenerClosed {
83
+					c.logger.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
84
+				}
85
+				return
86
+			}
87
+			c.logger.WithField("stdin", fifos.Stdin).Debug("connected")
88
+			dc.con = conn
89
+			dc.unblockConnectionWaiters()
90
+		}()
91
+	}
92
+
93
+	if fifos.Stdout != "" {
94
+		c.logger.WithField("stdout", fifos.Stdout).Debug("listen")
95
+		l, err := winio.ListenPipe(fifos.Stdout, nil)
96
+		if err != nil {
97
+			return nil, errors.Wrapf(err, "failed to create stdout pipe %s", fifos.Stdout)
98
+		}
99
+		dc := &delayedConnection{
100
+			l: l,
101
+		}
102
+		dc.wg.Add(1)
103
+		defer func() {
104
+			if retErr != nil {
105
+				_ = dc.Close()
106
+			}
107
+		}()
108
+		p.stdout = dc
109
+
110
+		go func() {
111
+			c.logger.WithField("stdout", fifos.Stdout).Debug("accept")
112
+			conn, err := l.Accept()
113
+			if err != nil {
114
+				_ = dc.Close()
115
+				if err != winio.ErrPipeListenerClosed {
116
+					c.logger.WithFields(log.Fields{"error": err, "stdout": fifos.Stdout}).Error("failed to accept stdout connection")
117
+				}
118
+				return
119
+			}
120
+			c.logger.WithField("stdout", fifos.Stdout).Debug("connected")
121
+			dc.con = conn
122
+			dc.unblockConnectionWaiters()
123
+		}()
124
+	}
125
+
126
+	if fifos.Stderr != "" {
127
+		c.logger.WithField("stderr", fifos.Stderr).Debug("listen")
128
+		l, err := winio.ListenPipe(fifos.Stderr, nil)
129
+		if err != nil {
130
+			return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr)
131
+		}
132
+		dc := &delayedConnection{
133
+			l: l,
134
+		}
135
+		dc.wg.Add(1)
136
+		defer func() {
137
+			if retErr != nil {
138
+				_ = dc.Close()
139
+			}
140
+		}()
141
+		p.stderr = dc
142
+
143
+		go func() {
144
+			c.logger.WithField("stderr", fifos.Stderr).Debug("accept")
145
+			conn, err := l.Accept()
146
+			if err != nil {
147
+				_ = dc.Close()
148
+				if err != winio.ErrPipeListenerClosed {
149
+					c.logger.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
150
+				}
151
+				return
152
+			}
153
+			c.logger.WithField("stderr", fifos.Stderr).Debug("connected")
154
+			dc.con = conn
155
+			dc.unblockConnectionWaiters()
156
+		}()
157
+	}
158
+	return p, nil
159
+}
0 160
new file mode 100644
... ...
@@ -0,0 +1,121 @@
0
+package remote
1
+
2
+import (
3
+	"context"
4
+	"fmt"
5
+	"os"
6
+	"path/filepath"
7
+	"strings"
8
+
9
+	containerd "github.com/containerd/containerd/v2/client"
10
+	"github.com/containerd/containerd/v2/core/containers"
11
+	"github.com/containerd/containerd/v2/pkg/cio"
12
+	"github.com/containerd/log"
13
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
14
+	"github.com/moby/sys/user"
15
+	"github.com/opencontainers/runtime-spec/specs-go"
16
+)
17
+
18
+func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
19
+	return &libcontainerdtypes.Summary{}, nil
20
+}
21
+
22
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
23
+	return t.Update(ctx, containerd.WithResources(resources))
24
+}
25
+
26
+func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {
27
+	for _, m := range mp {
28
+		if id >= m.ContainerID && id <= m.ContainerID+m.Size-1 {
29
+			return int(m.HostID + id - m.ContainerID)
30
+		}
31
+	}
32
+	return 0
33
+}
34
+
35
+func getSpecUser(ociSpec *specs.Spec) (int, int) {
36
+	var (
37
+		uid int
38
+		gid int
39
+	)
40
+
41
+	for _, ns := range ociSpec.Linux.Namespaces {
42
+		if ns.Type == specs.UserNamespace {
43
+			uid = hostIDFromMap(0, ociSpec.Linux.UIDMappings)
44
+			gid = hostIDFromMap(0, ociSpec.Linux.GIDMappings)
45
+			break
46
+		}
47
+	}
48
+
49
+	return uid, gid
50
+}
51
+
52
+// WithBundle creates the bundle for the container
53
+func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
54
+	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
55
+		if c.Labels == nil {
56
+			c.Labels = make(map[string]string)
57
+		}
58
+		uid, gid := getSpecUser(ociSpec)
59
+		if uid == 0 && gid == 0 {
60
+			c.Labels[DockerContainerBundlePath] = bundleDir
61
+			return user.MkdirAllAndChown(bundleDir, 0o755, uid, gid, user.WithOnlyNew)
62
+		}
63
+
64
+		p := string(filepath.Separator)
65
+		components := strings.Split(bundleDir, string(filepath.Separator))
66
+		for _, d := range components[1:] {
67
+			p = filepath.Join(p, d)
68
+			fi, err := os.Stat(p)
69
+			if err != nil && !os.IsNotExist(err) {
70
+				return err
71
+			}
72
+			if os.IsNotExist(err) || fi.Mode()&1 == 0 {
73
+				p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
74
+				if err := user.MkdirAndChown(p, 0o700, uid, gid); err != nil && !os.IsExist(err) {
75
+					return err
76
+				}
77
+			}
78
+		}
79
+		if c.Labels == nil {
80
+			c.Labels = make(map[string]string)
81
+		}
82
+		c.Labels[DockerContainerBundlePath] = p
83
+		return nil
84
+	}
85
+}
86
+
87
+func withLogLevel(_ log.Level) containerd.NewTaskOpts {
88
+	panic("Not implemented")
89
+}
90
+
91
+func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
92
+	config := cio.Config{
93
+		Terminal: withTerminal,
94
+		Stdout:   filepath.Join(bundleDir, processID+"-stdout"),
95
+	}
96
+	paths := []string{config.Stdout}
97
+
98
+	if withStdin {
99
+		config.Stdin = filepath.Join(bundleDir, processID+"-stdin")
100
+		paths = append(paths, config.Stdin)
101
+	}
102
+	if !withTerminal {
103
+		config.Stderr = filepath.Join(bundleDir, processID+"-stderr")
104
+		paths = append(paths, config.Stderr)
105
+	}
106
+	closer := func() error {
107
+		for _, path := range paths {
108
+			if err := os.RemoveAll(path); err != nil {
109
+				log.G(context.TODO()).Warnf("libcontainerd: failed to remove fifo %v: %v", path, err)
110
+			}
111
+		}
112
+		return nil
113
+	}
114
+
115
+	return cio.NewFIFOSet(config, closer)
116
+}
117
+
118
+func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.DirectIO, error) {
119
+	return cio.NewDirectIO(ctx, fifos)
120
+}
0 121
new file mode 100644
... ...
@@ -0,0 +1,98 @@
0
+package remote
1
+
2
+import (
3
+	"context"
4
+	"fmt"
5
+	"os"
6
+	"path/filepath"
7
+
8
+	"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
9
+	containerd "github.com/containerd/containerd/v2/client"
10
+	"github.com/containerd/containerd/v2/core/containers"
11
+	"github.com/containerd/containerd/v2/pkg/cio"
12
+	"github.com/containerd/log"
13
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
14
+	"github.com/opencontainers/runtime-spec/specs-go"
15
+	"github.com/pkg/errors"
16
+)
17
+
18
+func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
19
+	switch pd := i.(type) {
20
+	case *options.ProcessDetails:
21
+		return &libcontainerdtypes.Summary{
22
+			ImageName:                    pd.ImageName,
23
+			CreatedAt:                    pd.CreatedAt,
24
+			KernelTime_100Ns:             pd.KernelTime_100Ns,
25
+			MemoryCommitBytes:            pd.MemoryCommitBytes,
26
+			MemoryWorkingSetPrivateBytes: pd.MemoryWorkingSetPrivateBytes,
27
+			MemoryWorkingSetSharedBytes:  pd.MemoryWorkingSetSharedBytes,
28
+			ProcessID:                    pd.ProcessID,
29
+			UserTime_100Ns:               pd.UserTime_100Ns,
30
+			ExecID:                       pd.ExecID,
31
+		}, nil
32
+	default:
33
+		return nil, errors.Errorf("Unknown process details type %T", pd)
34
+	}
35
+}
36
+
37
+// WithBundle creates the bundle for the container
38
+func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
39
+	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
40
+		// TODO: (containerd) Determine if we need to use system.MkdirAllWithACL here
41
+		if c.Labels == nil {
42
+			c.Labels = make(map[string]string)
43
+		}
44
+		c.Labels[DockerContainerBundlePath] = bundleDir
45
+		return os.MkdirAll(bundleDir, 0o755)
46
+	}
47
+}
48
+
49
+func withLogLevel(level log.Level) containerd.NewTaskOpts {
50
+	// Make sure we set the runhcs options to debug if we are at debug level.
51
+	return func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
52
+		if level == log.DebugLevel {
53
+			info.Options = &options.Options{Debug: true}
54
+		}
55
+		return nil
56
+	}
57
+}
58
+
59
+func pipeName(containerID, processID, name string) string {
60
+	return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name)
61
+}
62
+
63
+func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
64
+	containerID := filepath.Base(bundleDir)
65
+	config := cio.Config{
66
+		Terminal: withTerminal,
67
+		Stdout:   pipeName(containerID, processID, "stdout"),
68
+	}
69
+
70
+	if withStdin {
71
+		config.Stdin = pipeName(containerID, processID, "stdin")
72
+	}
73
+
74
+	if !config.Terminal {
75
+		config.Stderr = pipeName(containerID, processID, "stderr")
76
+	}
77
+
78
+	return cio.NewFIFOSet(config, nil)
79
+}
80
+
81
+func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.DirectIO, error) {
82
+	pipes, err := c.newStdioPipes(fifos)
83
+	if err != nil {
84
+		return nil, err
85
+	}
86
+	return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
87
+}
88
+
89
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
90
+	// TODO: (containerd): Not implemented, but don't error.
91
+	return nil
92
+}
93
+
94
+func getSpecUser(ociSpec *specs.Spec) (int, int) {
95
+	// TODO: (containerd): Not implemented, but don't error.
96
+	return 0, 0
97
+}
... ...
@@ -4,7 +4,7 @@ import (
4 4
 	"context"
5 5
 
6 6
 	containerd "github.com/containerd/containerd/v2/client"
7
-	"github.com/docker/docker/libcontainerd/remote"
7
+	"github.com/docker/docker/daemon/internal/libcontainerd/remote"
8 8
 	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
9 9
 )
10 10
 
... ...
@@ -5,7 +5,7 @@ import (
5 5
 
6 6
 	containerd "github.com/containerd/containerd/v2/client"
7 7
 	"github.com/docker/docker/daemon/internal/libcontainerd/local"
8
-	"github.com/docker/docker/libcontainerd/remote"
8
+	"github.com/docker/docker/daemon/internal/libcontainerd/remote"
9 9
 	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
10 10
 	"github.com/docker/docker/pkg/system"
11 11
 )
12 12
deleted file mode 100644
... ...
@@ -1,755 +0,0 @@
1
-package remote
2
-
3
-import (
4
-	"context"
5
-	"encoding/json"
6
-	"io"
7
-	"os"
8
-	"path/filepath"
9
-	"reflect"
10
-	"runtime"
11
-	"strings"
12
-	"sync"
13
-	"syscall"
14
-	"time"
15
-
16
-	apievents "github.com/containerd/containerd/api/events"
17
-	"github.com/containerd/containerd/api/types"
18
-	runcoptions "github.com/containerd/containerd/api/types/runc/options"
19
-	containerd "github.com/containerd/containerd/v2/client"
20
-	"github.com/containerd/containerd/v2/core/content"
21
-	c8dimages "github.com/containerd/containerd/v2/core/images"
22
-	"github.com/containerd/containerd/v2/pkg/archive"
23
-	"github.com/containerd/containerd/v2/pkg/cio"
24
-	"github.com/containerd/containerd/v2/pkg/protobuf"
25
-	cerrdefs "github.com/containerd/errdefs"
26
-	"github.com/containerd/log"
27
-	"github.com/containerd/typeurl/v2"
28
-	"github.com/docker/docker/daemon/internal/libcontainerd/queue"
29
-	"github.com/docker/docker/errdefs"
30
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
31
-	"github.com/docker/docker/pkg/ioutils"
32
-	"github.com/hashicorp/go-multierror"
33
-	"github.com/opencontainers/go-digest"
34
-	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
35
-	"github.com/opencontainers/runtime-spec/specs-go"
36
-	"github.com/pkg/errors"
37
-	"go.opentelemetry.io/otel"
38
-	"google.golang.org/grpc/codes"
39
-	"google.golang.org/grpc/status"
40
-	"google.golang.org/protobuf/proto"
41
-)
42
-
43
-// DockerContainerBundlePath is the label key pointing to the container's bundle path
44
-const DockerContainerBundlePath = "com.docker/engine.bundle.path"
45
-
46
-type client struct {
47
-	client   *containerd.Client
48
-	stateDir string
49
-	logger   *log.Entry
50
-	ns       string
51
-
52
-	backend libcontainerdtypes.Backend
53
-	eventQ  queue.Queue
54
-}
55
-
56
-type container struct {
57
-	client *client
58
-	c8dCtr containerd.Container
59
-
60
-	v2runcoptions *runcoptions.Options
61
-}
62
-
63
-type task struct {
64
-	containerd.Task
65
-	ctr *container
66
-}
67
-
68
-type process struct {
69
-	containerd.Process
70
-}
71
-
72
-// NewClient creates a new libcontainerd client from a containerd client
73
-func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
74
-	c := &client{
75
-		client:   cli,
76
-		stateDir: stateDir,
77
-		logger:   log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns),
78
-		ns:       ns,
79
-		backend:  b,
80
-	}
81
-
82
-	go c.processEventStream(ctx, ns)
83
-
84
-	return c, nil
85
-}
86
-
87
-func (c *client) Version(ctx context.Context) (containerd.Version, error) {
88
-	return c.client.Version(ctx)
89
-}
90
-
91
-func (c *container) newTask(t containerd.Task) *task {
92
-	return &task{Task: t, ctr: c}
93
-}
94
-
95
-func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, retErr error) {
96
-	var dio *cio.DirectIO
97
-	defer func() {
98
-		if retErr != nil && dio != nil {
99
-			dio.Cancel()
100
-			_ = dio.Close()
101
-		}
102
-	}()
103
-
104
-	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
105
-		// dio must be assigned to the previously defined dio for the defer above
106
-		// to handle cleanup
107
-		var err error
108
-		dio, err = c.client.newDirectIO(ctx, fifos)
109
-		if err != nil {
110
-			return nil, err
111
-		}
112
-		return attachStdio(dio)
113
-	}
114
-	t, err := c.c8dCtr.Task(ctx, attachIO)
115
-	if err != nil {
116
-		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
117
-	}
118
-	return c.newTask(t), nil
119
-}
120
-
121
-func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
122
-	bdir := c.bundleDir(id)
123
-	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
124
-
125
-	newOpts := []containerd.NewContainerOpts{
126
-		containerd.WithSpec(ociSpec),
127
-		containerd.WithRuntime(shim, runtimeOptions),
128
-		WithBundle(bdir, ociSpec),
129
-	}
130
-	opts = append(opts, newOpts...)
131
-
132
-	ctr, err := c.client.NewContainer(ctx, id, opts...)
133
-	if err != nil {
134
-		if cerrdefs.IsAlreadyExists(err) {
135
-			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
136
-		}
137
-		return nil, wrapError(err)
138
-	}
139
-
140
-	created := container{
141
-		client: c,
142
-		c8dCtr: ctr,
143
-	}
144
-	if x, ok := runtimeOptions.(*runcoptions.Options); ok {
145
-		created.v2runcoptions = x
146
-	}
147
-	return &created, nil
148
-}
149
-
150
-// NewTask creates a task for the specified containerd id
151
-func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
152
-	var (
153
-		checkpoint     *types.Descriptor
154
-		t              containerd.Task
155
-		rio            cio.IO
156
-		stdinCloseSync = make(chan containerd.Process, 1)
157
-	)
158
-
159
-	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.NewTask")
160
-	defer span.End()
161
-
162
-	if checkpointDir != "" {
163
-		// write checkpoint to the content store
164
-		tar := archive.Diff(ctx, "", checkpointDir)
165
-		var err error
166
-		checkpoint, err = c.client.writeContent(ctx, c8dimages.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
167
-		// remove the checkpoint when we're done
168
-		defer func() {
169
-			if checkpoint != nil {
170
-				err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
171
-				if err != nil {
172
-					c.client.logger.WithError(err).WithFields(log.Fields{
173
-						"ref":    checkpointDir,
174
-						"digest": checkpoint.Digest,
175
-					}).Warnf("failed to delete temporary checkpoint entry")
176
-				}
177
-			}
178
-		}()
179
-		if err := tar.Close(); err != nil {
180
-			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
181
-		}
182
-		if err != nil {
183
-			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
184
-		}
185
-	}
186
-
187
-	// Optimization: assume the relevant metadata has not changed in the
188
-	// moment since the container was created. Elide redundant RPC requests
189
-	// to refresh the metadata separately for spec and labels.
190
-	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
191
-	if err != nil {
192
-		return nil, errors.Wrap(err, "failed to retrieve metadata")
193
-	}
194
-	bundle := md.Labels[DockerContainerBundlePath]
195
-
196
-	var spec specs.Spec
197
-	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
198
-		return nil, errors.Wrap(err, "failed to retrieve spec")
199
-	}
200
-	uid, gid := getSpecUser(&spec)
201
-
202
-	taskOpts := []containerd.NewTaskOpts{
203
-		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
204
-			info.Checkpoint = checkpoint
205
-			return nil
206
-		},
207
-	}
208
-
209
-	if runtime.GOOS != "windows" {
210
-		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
211
-			if c.v2runcoptions != nil {
212
-				opts := proto.Clone(c.v2runcoptions).(*runcoptions.Options)
213
-				opts.IoUid = uint32(uid)
214
-				opts.IoGid = uint32(gid)
215
-				info.Options = opts
216
-			}
217
-			return nil
218
-		})
219
-	} else {
220
-		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
221
-	}
222
-
223
-	t, err = c.c8dCtr.NewTask(ctx,
224
-		func(id string) (cio.IO, error) {
225
-			fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)
226
-
227
-			rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
228
-			return rio, err
229
-		},
230
-		taskOpts...,
231
-	)
232
-	if err != nil {
233
-		close(stdinCloseSync)
234
-		if rio != nil {
235
-			rio.Cancel()
236
-			rio.Close()
237
-		}
238
-		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
239
-	}
240
-
241
-	// Signal c.createIO that it can call CloseIO
242
-	stdinCloseSync <- t
243
-
244
-	return c.newTask(t), nil
245
-}
246
-
247
-func (t *task) Start(ctx context.Context) error {
248
-	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.task.Start")
249
-	defer span.End()
250
-	return wrapError(t.Task.Start(ctx))
251
-}
252
-
253
-// Exec creates exec process.
254
-//
255
-// The containerd client calls Exec to register the exec config in the shim side.
256
-// When the client calls Start, the shim will create stdin fifo if needs. But
257
-// for the container main process, the stdin fifo will be created in Create not
258
-// the Start call. stdinCloseSync channel should be closed after Start exec
259
-// process.
260
-func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
261
-	var (
262
-		p              containerd.Process
263
-		rio            cio.IO
264
-		stdinCloseSync = make(chan containerd.Process, 1)
265
-	)
266
-
267
-	// Optimization: assume the DockerContainerBundlePath label has not been
268
-	// updated since the container metadata was last loaded/refreshed.
269
-	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
270
-	if err != nil {
271
-		return nil, wrapError(err)
272
-	}
273
-
274
-	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
275
-
276
-	defer func() {
277
-		if err != nil {
278
-			if rio != nil {
279
-				rio.Cancel()
280
-				rio.Close()
281
-			}
282
-		}
283
-	}()
284
-
285
-	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
286
-		rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
287
-		return rio, err
288
-	})
289
-	if err != nil {
290
-		close(stdinCloseSync)
291
-		if cerrdefs.IsAlreadyExists(err) {
292
-			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
293
-		}
294
-		return nil, wrapError(err)
295
-	}
296
-
297
-	// Signal c.createIO that it can call CloseIO
298
-	//
299
-	// the stdin of exec process will be created after p.Start in containerd
300
-	defer func() { stdinCloseSync <- p }()
301
-
302
-	if err = p.Start(ctx); err != nil {
303
-		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
304
-		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
305
-		// older containerd-shim
306
-		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
307
-		defer cancel()
308
-		p.Delete(ctx)
309
-		return nil, wrapError(err)
310
-	}
311
-	return process{p}, nil
312
-}
313
-
314
-func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
315
-	return wrapError(t.Task.Kill(ctx, signal))
316
-}
317
-
318
-func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
319
-	return wrapError(p.Process.Kill(ctx, signal))
320
-}
321
-
322
-func (t *task) Pause(ctx context.Context) error {
323
-	return wrapError(t.Task.Pause(ctx))
324
-}
325
-
326
-func (t *task) Resume(ctx context.Context) error {
327
-	return wrapError(t.Task.Resume(ctx))
328
-}
329
-
330
-func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
331
-	m, err := t.Metrics(ctx)
332
-	if err != nil {
333
-		return nil, err
334
-	}
335
-
336
-	v, err := typeurl.UnmarshalAny(m.Data)
337
-	if err != nil {
338
-		return nil, err
339
-	}
340
-	return libcontainerdtypes.InterfaceToStats(protobuf.FromTimestamp(m.Timestamp), v), nil
341
-}
342
-
343
-func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
344
-	pis, err := t.Pids(ctx)
345
-	if err != nil {
346
-		return nil, err
347
-	}
348
-
349
-	var infos []libcontainerdtypes.Summary
350
-	for _, pi := range pis {
351
-		i, err := typeurl.UnmarshalAny(pi.Info)
352
-		if err != nil {
353
-			return nil, errors.Wrap(err, "unable to decode process details")
354
-		}
355
-		s, err := summaryFromInterface(i)
356
-		if err != nil {
357
-			return nil, err
358
-		}
359
-		infos = append(infos, *s)
360
-	}
361
-
362
-	return infos, nil
363
-}
364
-
365
-func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
366
-	s, err := t.Task.Delete(ctx)
367
-	return s, wrapError(err)
368
-}
369
-
370
-func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
371
-	s, err := p.Process.Delete(ctx)
372
-	return s, wrapError(err)
373
-}
374
-
375
-func (c *container) Delete(ctx context.Context) error {
376
-	// Optimization: assume the DockerContainerBundlePath label has not been
377
-	// updated since the container metadata was last loaded/refreshed.
378
-	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
379
-	if err != nil {
380
-		return err
381
-	}
382
-	bundle := md.Labels[DockerContainerBundlePath]
383
-	if err := c.c8dCtr.Delete(ctx); err != nil {
384
-		return wrapError(err)
385
-	}
386
-	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
387
-		if err := os.RemoveAll(bundle); err != nil {
388
-			c.client.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
389
-				"container": c.c8dCtr.ID(),
390
-				"bundle":    bundle,
391
-			}).Error("failed to remove state dir")
392
-		}
393
-	}
394
-	return nil
395
-}
396
-
397
-func (t *task) ForceDelete(ctx context.Context) error {
398
-	_, err := t.Task.Delete(ctx, containerd.WithProcessKill)
399
-	return wrapError(err)
400
-}
401
-
402
-func (t *task) Status(ctx context.Context) (containerd.Status, error) {
403
-	s, err := t.Task.Status(ctx)
404
-	return s, wrapError(err)
405
-}
406
-
407
-func (p process) Status(ctx context.Context) (containerd.Status, error) {
408
-	s, err := p.Process.Status(ctx)
409
-	return s, wrapError(err)
410
-}
411
-
412
-func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
413
-	return func(r *containerd.CheckpointTaskInfo) error {
414
-		if r.Options == nil && c.v2runcoptions != nil {
415
-			r.Options = &runcoptions.CheckpointOptions{}
416
-		}
417
-
418
-		switch opts := r.Options.(type) {
419
-		case *runcoptions.CheckpointOptions:
420
-			opts.Exit = exit
421
-		}
422
-
423
-		return nil
424
-	}
425
-}
426
-
427
-func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
428
-	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
429
-	if err != nil {
430
-		return wrapError(err)
431
-	}
432
-	// Whatever happens, delete the checkpoint from containerd
433
-	defer func() {
434
-		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
435
-		if err != nil {
436
-			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
437
-				Warnf("failed to delete checkpoint image")
438
-		}
439
-	}()
440
-
441
-	b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
442
-	if err != nil {
443
-		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
444
-	}
445
-	var index ocispec.Index
446
-	if err := json.Unmarshal(b, &index); err != nil {
447
-		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
448
-	}
449
-
450
-	var cpDesc *ocispec.Descriptor
451
-	for _, m := range index.Manifests {
452
-		if m.MediaType == c8dimages.MediaTypeContainerd1Checkpoint {
453
-			cpDesc = &m //nolint:gosec
454
-			break
455
-		}
456
-	}
457
-	if cpDesc == nil {
458
-		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
459
-	}
460
-
461
-	rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
462
-	if err != nil {
463
-		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
464
-	}
465
-	defer rat.Close()
466
-	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
467
-	if err != nil {
468
-		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
469
-	}
470
-
471
-	return err
472
-}
473
-
474
-// LoadContainer loads the containerd container.
475
-func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
476
-	ctr, err := c.client.LoadContainer(ctx, id)
477
-	if err != nil {
478
-		if cerrdefs.IsNotFound(err) {
479
-			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
480
-		}
481
-		return nil, wrapError(err)
482
-	}
483
-	return &container{client: c, c8dCtr: ctr}, nil
484
-}
485
-
486
-func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
487
-	t, err := c.c8dCtr.Task(ctx, nil)
488
-	if err != nil {
489
-		return nil, wrapError(err)
490
-	}
491
-	return c.newTask(t), nil
492
-}
493
-
494
-// createIO creates the io to be used by a process
495
-// This needs to get a pointer to interface as upon closure the process may not have yet been registered
496
-func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
497
-	var (
498
-		io  *cio.DirectIO
499
-		err error
500
-	)
501
-	io, err = c.client.newDirectIO(context.Background(), fifos)
502
-	if err != nil {
503
-		return nil, err
504
-	}
505
-
506
-	if io.Stdin != nil {
507
-		var (
508
-			closeErr  error
509
-			stdinOnce sync.Once
510
-		)
511
-		pipe := io.Stdin
512
-		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
513
-			stdinOnce.Do(func() {
514
-				closeErr = pipe.Close()
515
-
516
-				select {
517
-				case p, ok := <-stdinCloseSync:
518
-					if !ok {
519
-						return
520
-					}
521
-					if err := closeStdin(context.Background(), p); err != nil {
522
-						if closeErr != nil {
523
-							closeErr = multierror.Append(closeErr, err)
524
-						} else {
525
-							// Avoid wrapping a single error in a multierror.
526
-							closeErr = err
527
-						}
528
-					}
529
-				default:
530
-					// The process wasn't ready. Close its stdin asynchronously.
531
-					go func() {
532
-						p, ok := <-stdinCloseSync
533
-						if !ok {
534
-							return
535
-						}
536
-						if err := closeStdin(context.Background(), p); err != nil {
537
-							c.client.logger.WithError(err).
538
-								WithField("container", c.c8dCtr.ID()).
539
-								Error("failed to close container stdin")
540
-						}
541
-					}()
542
-				}
543
-			})
544
-			return closeErr
545
-		})
546
-	}
547
-
548
-	rio, err := attachStdio(io)
549
-	if err != nil {
550
-		io.Cancel()
551
-		io.Close()
552
-	}
553
-	return rio, err
554
-}
555
-
556
-func closeStdin(ctx context.Context, p containerd.Process) error {
557
-	err := p.CloseIO(ctx, containerd.WithStdinCloser)
558
-	if err != nil && strings.Contains(err.Error(), "transport is closing") {
559
-		err = nil
560
-	}
561
-	return err
562
-}
563
-
564
-func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
565
-	c.eventQ.Append(ei.ContainerID, func() {
566
-		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
567
-		if err != nil {
568
-			c.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
569
-				"container":  ei.ContainerID,
570
-				"event":      et,
571
-				"event-info": ei,
572
-			}).Error("failed to process event")
573
-		}
574
-	})
575
-}
576
-
577
-func (c *client) waitServe(ctx context.Context) bool {
578
-	t := 100 * time.Millisecond
579
-	delay := time.NewTimer(t)
580
-	if !delay.Stop() {
581
-		<-delay.C
582
-	}
583
-	defer delay.Stop()
584
-
585
-	// `IsServing` will actually block until the service is ready.
586
-	// However it can return early, so we'll loop with a delay to handle it.
587
-	for {
588
-		serving, err := c.client.IsServing(ctx)
589
-		if err != nil {
590
-			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
591
-				return false
592
-			}
593
-			log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
594
-		}
595
-
596
-		if serving {
597
-			return true
598
-		}
599
-
600
-		delay.Reset(t)
601
-		select {
602
-		case <-ctx.Done():
603
-			return false
604
-		case <-delay.C:
605
-		}
606
-	}
607
-}
608
-
609
-func (c *client) processEventStream(ctx context.Context, ns string) {
610
-	// Create a new context specifically for this subscription.
611
-	// The context must be cancelled to cancel the subscription.
612
-	// In cases where we have to restart event stream processing,
613
-	//   we'll need the original context b/c this one will be cancelled
614
-	subCtx, cancel := context.WithCancel(ctx)
615
-	defer cancel()
616
-
617
-	// Filter on both namespace *and* topic. To create an "and" filter,
618
-	// this must be a single, comma-separated string
619
-	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
620
-
621
-	c.logger.Debug("processing event stream")
622
-
623
-	for {
624
-		select {
625
-		case err := <-errC:
626
-			if err != nil {
627
-				errStatus, ok := status.FromError(err)
628
-				if !ok || errStatus.Code() != codes.Canceled {
629
-					c.logger.WithError(err).Error("Failed to get event")
630
-					c.logger.Info("Waiting for containerd to be ready to restart event processing")
631
-					if c.waitServe(ctx) {
632
-						go c.processEventStream(ctx, ns)
633
-						return
634
-					}
635
-				}
636
-				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
637
-			}
638
-			return
639
-		case ev := <-eventStream:
640
-			if ev.Event == nil {
641
-				c.logger.WithField("event", ev).Warn("invalid event")
642
-				continue
643
-			}
644
-
645
-			v, err := typeurl.UnmarshalAny(ev.Event)
646
-			if err != nil {
647
-				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
648
-				continue
649
-			}
650
-
651
-			c.logger.WithField("topic", ev.Topic).Debug("event")
652
-
653
-			switch t := v.(type) {
654
-			case *apievents.TaskCreate:
655
-				c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
656
-					ContainerID: t.ContainerID,
657
-					ProcessID:   t.ContainerID,
658
-					Pid:         t.Pid,
659
-				})
660
-			case *apievents.TaskStart:
661
-				c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
662
-					ContainerID: t.ContainerID,
663
-					ProcessID:   t.ContainerID,
664
-					Pid:         t.Pid,
665
-				})
666
-			case *apievents.TaskExit:
667
-				c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
668
-					ContainerID: t.ContainerID,
669
-					ProcessID:   t.ID,
670
-					Pid:         t.Pid,
671
-					ExitCode:    t.ExitStatus,
672
-					ExitedAt:    protobuf.FromTimestamp(t.ExitedAt),
673
-				})
674
-			case *apievents.TaskOOM:
675
-				c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
676
-					ContainerID: t.ContainerID,
677
-				})
678
-			case *apievents.TaskExecAdded:
679
-				c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
680
-					ContainerID: t.ContainerID,
681
-					ProcessID:   t.ExecID,
682
-				})
683
-			case *apievents.TaskExecStarted:
684
-				c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
685
-					ContainerID: t.ContainerID,
686
-					ProcessID:   t.ExecID,
687
-					Pid:         t.Pid,
688
-				})
689
-			case *apievents.TaskPaused:
690
-				c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
691
-					ContainerID: t.ContainerID,
692
-				})
693
-			case *apievents.TaskResumed:
694
-				c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
695
-					ContainerID: t.ContainerID,
696
-				})
697
-			case *apievents.TaskDelete:
698
-				c.logger.WithFields(log.Fields{
699
-					"topic":     ev.Topic,
700
-					"type":      reflect.TypeOf(t),
701
-					"container": t.ContainerID,
702
-				}).Info("ignoring event")
703
-			default:
704
-				c.logger.WithFields(log.Fields{
705
-					"topic": ev.Topic,
706
-					"type":  reflect.TypeOf(t),
707
-				}).Info("ignoring event")
708
-			}
709
-		}
710
-	}
711
-}
712
-
713
-func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
714
-	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
715
-	if err != nil {
716
-		return nil, err
717
-	}
718
-	defer writer.Close()
719
-	size, err := io.Copy(writer, r)
720
-	if err != nil {
721
-		return nil, err
722
-	}
723
-	labels := map[string]string{
724
-		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
725
-	}
726
-	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
727
-		return nil, err
728
-	}
729
-	return &types.Descriptor{
730
-		MediaType: mediaType,
731
-		Digest:    writer.Digest().String(),
732
-		Size:      size,
733
-	}, nil
734
-}
735
-
736
-func (c *client) bundleDir(id string) string {
737
-	return filepath.Join(c.stateDir, id)
738
-}
739
-
740
-func wrapError(err error) error {
741
-	switch {
742
-	case err == nil:
743
-		return nil
744
-	case cerrdefs.IsNotFound(err):
745
-		return errdefs.NotFound(err)
746
-	}
747
-
748
-	msg := err.Error()
749
-	for _, s := range []string{"container does not exist", "not found", "no such container"} {
750
-		if strings.Contains(msg, s) {
751
-			return errdefs.NotFound(err)
752
-		}
753
-	}
754
-	return err
755
-}
756 1
deleted file mode 100644
... ...
@@ -1,160 +0,0 @@
1
-package remote
2
-
3
-import (
4
-	"io"
5
-	"net"
6
-	"sync"
7
-
8
-	"github.com/Microsoft/go-winio"
9
-	"github.com/containerd/containerd/v2/pkg/cio"
10
-	"github.com/containerd/log"
11
-	"github.com/pkg/errors"
12
-)
13
-
14
-type delayedConnection struct {
15
-	l    net.Listener
16
-	con  net.Conn
17
-	wg   sync.WaitGroup
18
-	once sync.Once
19
-}
20
-
21
-func (dc *delayedConnection) Write(p []byte) (int, error) {
22
-	dc.wg.Wait()
23
-	if dc.con != nil {
24
-		return dc.con.Write(p)
25
-	}
26
-	return 0, errors.New("use of closed network connection")
27
-}
28
-
29
-func (dc *delayedConnection) Read(p []byte) (int, error) {
30
-	dc.wg.Wait()
31
-	if dc.con != nil {
32
-		return dc.con.Read(p)
33
-	}
34
-	return 0, errors.New("use of closed network connection")
35
-}
36
-
37
-func (dc *delayedConnection) unblockConnectionWaiters() {
38
-	defer dc.once.Do(func() {
39
-		dc.wg.Done()
40
-	})
41
-}
42
-
43
-func (dc *delayedConnection) Close() error {
44
-	_ = dc.l.Close()
45
-	if dc.con != nil {
46
-		return dc.con.Close()
47
-	}
48
-	dc.unblockConnectionWaiters()
49
-	return nil
50
-}
51
-
52
-type stdioPipes struct {
53
-	stdin  io.WriteCloser
54
-	stdout io.ReadCloser
55
-	stderr io.ReadCloser
56
-}
57
-
58
-// newStdioPipes creates actual fifos for stdio.
59
-func (c *client) newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, retErr error) {
60
-	p := &stdioPipes{}
61
-	if fifos.Stdin != "" {
62
-		c.logger.WithField("stdin", fifos.Stdin).Debug("listen")
63
-		l, err := winio.ListenPipe(fifos.Stdin, nil)
64
-		if err != nil {
65
-			return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin)
66
-		}
67
-		dc := &delayedConnection{
68
-			l: l,
69
-		}
70
-		dc.wg.Add(1)
71
-		defer func() {
72
-			if retErr != nil {
73
-				_ = dc.Close()
74
-			}
75
-		}()
76
-		p.stdin = dc
77
-
78
-		go func() {
79
-			c.logger.WithField("stdin", fifos.Stdin).Debug("accept")
80
-			conn, err := l.Accept()
81
-			if err != nil {
82
-				_ = dc.Close()
83
-				if err != winio.ErrPipeListenerClosed {
84
-					c.logger.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
85
-				}
86
-				return
87
-			}
88
-			c.logger.WithField("stdin", fifos.Stdin).Debug("connected")
89
-			dc.con = conn
90
-			dc.unblockConnectionWaiters()
91
-		}()
92
-	}
93
-
94
-	if fifos.Stdout != "" {
95
-		c.logger.WithField("stdout", fifos.Stdout).Debug("listen")
96
-		l, err := winio.ListenPipe(fifos.Stdout, nil)
97
-		if err != nil {
98
-			return nil, errors.Wrapf(err, "failed to create stdout pipe %s", fifos.Stdout)
99
-		}
100
-		dc := &delayedConnection{
101
-			l: l,
102
-		}
103
-		dc.wg.Add(1)
104
-		defer func() {
105
-			if retErr != nil {
106
-				_ = dc.Close()
107
-			}
108
-		}()
109
-		p.stdout = dc
110
-
111
-		go func() {
112
-			c.logger.WithField("stdout", fifos.Stdout).Debug("accept")
113
-			conn, err := l.Accept()
114
-			if err != nil {
115
-				_ = dc.Close()
116
-				if err != winio.ErrPipeListenerClosed {
117
-					c.logger.WithFields(log.Fields{"error": err, "stdout": fifos.Stdout}).Error("failed to accept stdout connection")
118
-				}
119
-				return
120
-			}
121
-			c.logger.WithField("stdout", fifos.Stdout).Debug("connected")
122
-			dc.con = conn
123
-			dc.unblockConnectionWaiters()
124
-		}()
125
-	}
126
-
127
-	if fifos.Stderr != "" {
128
-		c.logger.WithField("stderr", fifos.Stderr).Debug("listen")
129
-		l, err := winio.ListenPipe(fifos.Stderr, nil)
130
-		if err != nil {
131
-			return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr)
132
-		}
133
-		dc := &delayedConnection{
134
-			l: l,
135
-		}
136
-		dc.wg.Add(1)
137
-		defer func() {
138
-			if retErr != nil {
139
-				_ = dc.Close()
140
-			}
141
-		}()
142
-		p.stderr = dc
143
-
144
-		go func() {
145
-			c.logger.WithField("stderr", fifos.Stderr).Debug("accept")
146
-			conn, err := l.Accept()
147
-			if err != nil {
148
-				_ = dc.Close()
149
-				if err != winio.ErrPipeListenerClosed {
150
-					c.logger.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
151
-				}
152
-				return
153
-			}
154
-			c.logger.WithField("stderr", fifos.Stderr).Debug("connected")
155
-			dc.con = conn
156
-			dc.unblockConnectionWaiters()
157
-		}()
158
-	}
159
-	return p, nil
160
-}
161 1
deleted file mode 100644
... ...
@@ -1,121 +0,0 @@
1
-package remote
2
-
3
-import (
4
-	"context"
5
-	"fmt"
6
-	"os"
7
-	"path/filepath"
8
-	"strings"
9
-
10
-	containerd "github.com/containerd/containerd/v2/client"
11
-	"github.com/containerd/containerd/v2/core/containers"
12
-	"github.com/containerd/containerd/v2/pkg/cio"
13
-	"github.com/containerd/log"
14
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
15
-	"github.com/moby/sys/user"
16
-	"github.com/opencontainers/runtime-spec/specs-go"
17
-)
18
-
19
-func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
20
-	return &libcontainerdtypes.Summary{}, nil
21
-}
22
-
23
-func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
24
-	return t.Update(ctx, containerd.WithResources(resources))
25
-}
26
-
27
-func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {
28
-	for _, m := range mp {
29
-		if id >= m.ContainerID && id <= m.ContainerID+m.Size-1 {
30
-			return int(m.HostID + id - m.ContainerID)
31
-		}
32
-	}
33
-	return 0
34
-}
35
-
36
-func getSpecUser(ociSpec *specs.Spec) (int, int) {
37
-	var (
38
-		uid int
39
-		gid int
40
-	)
41
-
42
-	for _, ns := range ociSpec.Linux.Namespaces {
43
-		if ns.Type == specs.UserNamespace {
44
-			uid = hostIDFromMap(0, ociSpec.Linux.UIDMappings)
45
-			gid = hostIDFromMap(0, ociSpec.Linux.GIDMappings)
46
-			break
47
-		}
48
-	}
49
-
50
-	return uid, gid
51
-}
52
-
53
-// WithBundle creates the bundle for the container
54
-func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
55
-	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
56
-		if c.Labels == nil {
57
-			c.Labels = make(map[string]string)
58
-		}
59
-		uid, gid := getSpecUser(ociSpec)
60
-		if uid == 0 && gid == 0 {
61
-			c.Labels[DockerContainerBundlePath] = bundleDir
62
-			return user.MkdirAllAndChown(bundleDir, 0o755, uid, gid, user.WithOnlyNew)
63
-		}
64
-
65
-		p := string(filepath.Separator)
66
-		components := strings.Split(bundleDir, string(filepath.Separator))
67
-		for _, d := range components[1:] {
68
-			p = filepath.Join(p, d)
69
-			fi, err := os.Stat(p)
70
-			if err != nil && !os.IsNotExist(err) {
71
-				return err
72
-			}
73
-			if os.IsNotExist(err) || fi.Mode()&1 == 0 {
74
-				p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
75
-				if err := user.MkdirAndChown(p, 0o700, uid, gid); err != nil && !os.IsExist(err) {
76
-					return err
77
-				}
78
-			}
79
-		}
80
-		if c.Labels == nil {
81
-			c.Labels = make(map[string]string)
82
-		}
83
-		c.Labels[DockerContainerBundlePath] = p
84
-		return nil
85
-	}
86
-}
87
-
88
-func withLogLevel(_ log.Level) containerd.NewTaskOpts {
89
-	panic("Not implemented")
90
-}
91
-
92
-func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
93
-	config := cio.Config{
94
-		Terminal: withTerminal,
95
-		Stdout:   filepath.Join(bundleDir, processID+"-stdout"),
96
-	}
97
-	paths := []string{config.Stdout}
98
-
99
-	if withStdin {
100
-		config.Stdin = filepath.Join(bundleDir, processID+"-stdin")
101
-		paths = append(paths, config.Stdin)
102
-	}
103
-	if !withTerminal {
104
-		config.Stderr = filepath.Join(bundleDir, processID+"-stderr")
105
-		paths = append(paths, config.Stderr)
106
-	}
107
-	closer := func() error {
108
-		for _, path := range paths {
109
-			if err := os.RemoveAll(path); err != nil {
110
-				log.G(context.TODO()).Warnf("libcontainerd: failed to remove fifo %v: %v", path, err)
111
-			}
112
-		}
113
-		return nil
114
-	}
115
-
116
-	return cio.NewFIFOSet(config, closer)
117
-}
118
-
119
-func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.DirectIO, error) {
120
-	return cio.NewDirectIO(ctx, fifos)
121
-}
122 1
deleted file mode 100644
... ...
@@ -1,98 +0,0 @@
1
-package remote
2
-
3
-import (
4
-	"context"
5
-	"fmt"
6
-	"os"
7
-	"path/filepath"
8
-
9
-	"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
10
-	containerd "github.com/containerd/containerd/v2/client"
11
-	"github.com/containerd/containerd/v2/core/containers"
12
-	"github.com/containerd/containerd/v2/pkg/cio"
13
-	"github.com/containerd/log"
14
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
15
-	"github.com/opencontainers/runtime-spec/specs-go"
16
-	"github.com/pkg/errors"
17
-)
18
-
19
-func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
20
-	switch pd := i.(type) {
21
-	case *options.ProcessDetails:
22
-		return &libcontainerdtypes.Summary{
23
-			ImageName:                    pd.ImageName,
24
-			CreatedAt:                    pd.CreatedAt,
25
-			KernelTime_100Ns:             pd.KernelTime_100Ns,
26
-			MemoryCommitBytes:            pd.MemoryCommitBytes,
27
-			MemoryWorkingSetPrivateBytes: pd.MemoryWorkingSetPrivateBytes,
28
-			MemoryWorkingSetSharedBytes:  pd.MemoryWorkingSetSharedBytes,
29
-			ProcessID:                    pd.ProcessID,
30
-			UserTime_100Ns:               pd.UserTime_100Ns,
31
-			ExecID:                       pd.ExecID,
32
-		}, nil
33
-	default:
34
-		return nil, errors.Errorf("Unknown process details type %T", pd)
35
-	}
36
-}
37
-
38
-// WithBundle creates the bundle for the container
39
-func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
40
-	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
41
-		// TODO: (containerd) Determine if we need to use system.MkdirAllWithACL here
42
-		if c.Labels == nil {
43
-			c.Labels = make(map[string]string)
44
-		}
45
-		c.Labels[DockerContainerBundlePath] = bundleDir
46
-		return os.MkdirAll(bundleDir, 0o755)
47
-	}
48
-}
49
-
50
-func withLogLevel(level log.Level) containerd.NewTaskOpts {
51
-	// Make sure we set the runhcs options to debug if we are at debug level.
52
-	return func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
53
-		if level == log.DebugLevel {
54
-			info.Options = &options.Options{Debug: true}
55
-		}
56
-		return nil
57
-	}
58
-}
59
-
60
-func pipeName(containerID, processID, name string) string {
61
-	return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name)
62
-}
63
-
64
-func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
65
-	containerID := filepath.Base(bundleDir)
66
-	config := cio.Config{
67
-		Terminal: withTerminal,
68
-		Stdout:   pipeName(containerID, processID, "stdout"),
69
-	}
70
-
71
-	if withStdin {
72
-		config.Stdin = pipeName(containerID, processID, "stdin")
73
-	}
74
-
75
-	if !config.Terminal {
76
-		config.Stderr = pipeName(containerID, processID, "stderr")
77
-	}
78
-
79
-	return cio.NewFIFOSet(config, nil)
80
-}
81
-
82
-func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.DirectIO, error) {
83
-	pipes, err := c.newStdioPipes(fifos)
84
-	if err != nil {
85
-		return nil, err
86
-	}
87
-	return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
88
-}
89
-
90
-func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
91
-	// TODO: (containerd): Not implemented, but don't error.
92
-	return nil
93
-}
94
-
95
-func getSpecUser(ociSpec *specs.Spec) (int, int) {
96
-	// TODO: (containerd): Not implemented, but don't error.
97
-	return 0, 0
98
-}