package migration import ( "bytes" "context" "encoding/json" "fmt" "io" "strings" "sync" "time" "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/leases" "github.com/containerd/containerd/v2/core/mount" "github.com/containerd/containerd/v2/core/snapshots" "github.com/containerd/containerd/v2/pkg/archive/compression" "github.com/containerd/continuity/fs" cerrdefs "github.com/containerd/errdefs" "github.com/containerd/log" "github.com/moby/moby/v2/daemon/internal/image" "github.com/moby/moby/v2/daemon/internal/layer" refstore "github.com/moby/moby/v2/daemon/internal/refstore" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "golang.org/x/sync/errgroup" ) type LayerMigrator struct { layers layer.Store refs refstore.Store dis image.Store leases leases.Manager content content.Store cis images.Store } type Config struct { ImageCount int LayerStore layer.Store ReferenceStore refstore.Store DockerImageStore image.Store Leases leases.Manager Content content.Store ImageStore images.Store } func NewLayerMigrator(config Config) *LayerMigrator { return &LayerMigrator{ layers: config.LayerStore, refs: config.ReferenceStore, dis: config.DockerImageStore, leases: config.Leases, content: config.Content, cis: config.ImageStore, } } // MigrateTocontainerd migrates containers from overlay2 to overlayfs or vfs to native func (lm *LayerMigrator) MigrateTocontainerd(ctx context.Context, snKey string, sn snapshots.Snapshotter) error { if sn == nil { return fmt.Errorf("no snapshotter to migrate to: %w", cerrdefs.ErrNotImplemented) } switch driver := lm.layers.DriverName(); driver { case "overlay2": case "vfs": default: return fmt.Errorf("%q not supported for migration: %w", driver, cerrdefs.ErrNotImplemented) } var ( // Zstd makes migration 10x faster // TODO: make configurable layerMediaType = ocispec.MediaTypeImageLayerZstd layerCompression = compression.Zstd ) l, err := lm.leases.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*time.Hour)) if err != nil { return err } defer func() { lm.leases.Delete(ctx, l) }() ctx = leases.WithLease(ctx, l.ID) for imgID, img := range lm.dis.Heads() { diffids := img.RootFS.DiffIDs if len(diffids) == 0 { continue } var ( parent string manifest = ocispec.Manifest{ MediaType: ocispec.MediaTypeImageManifest, Versioned: specs.Versioned{ SchemaVersion: 2, }, Layers: make([]ocispec.Descriptor, len(diffids)), } ml sync.Mutex eg, egctx = errgroup.WithContext(ctx) ) for i := range diffids { chainID := identity.ChainID(diffids[:i+1]) l, err := lm.layers.Get(chainID) if err != nil { return fmt.Errorf("failed to get layer [%d] %q: %w", i, chainID, err) } layerIndex := i eg.Go(func() error { ctx := egctx t1 := time.Now() ts, err := l.TarStream() if err != nil { return err } desc := ocispec.Descriptor{ MediaType: layerMediaType, } cw, err := lm.content.Writer(ctx, content.WithRef(fmt.Sprintf("ingest-%s", chainID)), content.WithDescriptor(desc)) if err != nil { return fmt.Errorf("failed to get content writer: %w", err) } dgstr := digest.Canonical.Digester() cs, _ := compression.CompressStream(io.MultiWriter(cw, dgstr.Hash()), layerCompression) _, err = io.Copy(cs, ts) if err != nil { return fmt.Errorf("failed to copy to compressed stream: %w", err) } cs.Close() status, err := cw.Status() if err != nil { return err } desc.Size = status.Offset desc.Digest = dgstr.Digest() if err := cw.Commit(ctx, desc.Size, desc.Digest); err != nil && !cerrdefs.IsAlreadyExists(err) { return err } log.G(ctx).WithFields(log.Fields{ "t": time.Since(t1), "size": desc.Size, "digest": desc.Digest, }).Debug("Converted layer to content tar") ml.Lock() manifest.Layers[layerIndex] = desc ml.Unlock() return nil }) metadata, err := l.Metadata() if err != nil { return err } src, ok := metadata["UpperDir"] if !ok { src, ok = metadata["SourceDir"] if !ok { log.G(ctx).WithField("metadata", metadata).WithField("driver", lm.layers.DriverName()).Debug("no source directory metadata") return fmt.Errorf("graphdriver not supported: %w", cerrdefs.ErrNotImplemented) } } log.G(ctx).WithField("metadata", metadata).Debugf("migrating %s from %s", chainID, src) active := fmt.Sprintf("migration-%s", chainID) key := chainID.String() snapshotLabels := map[string]string{ "containerd.io/snapshot.ref": key, } mounts, err := sn.Prepare(ctx, active, parent, snapshots.WithLabels(snapshotLabels)) parent = key if err != nil { if cerrdefs.IsAlreadyExists(err) { continue } return err } dst, err := extractSource(mounts) if err != nil { return err } t1 := time.Now() if err := fs.CopyDir(dst, src); err != nil { return err } log.G(ctx).WithFields(log.Fields{ "t": time.Since(t1), "key": key, }).Debug("Copied layer to snapshot") if err := sn.Commit(ctx, key, active); err != nil && !cerrdefs.IsAlreadyExists(err) { return err } } configBytes := img.RawJSON() digest.FromBytes(configBytes) manifest.Config = ocispec.Descriptor{ MediaType: ocispec.MediaTypeImageConfig, Digest: digest.FromBytes(configBytes), Size: int64(len(configBytes)), } configLabels := map[string]string{ fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snKey): parent, } if err = content.WriteBlob(ctx, lm.content, "config"+manifest.Config.Digest.String(), bytes.NewReader(configBytes), manifest.Config, content.WithLabels(configLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) { return err } if err := eg.Wait(); err != nil { return err } manifestBytes, err := json.MarshalIndent(manifest, "", " ") if err != nil { return err } manifestDesc := ocispec.Descriptor{ MediaType: manifest.MediaType, Digest: digest.FromBytes(manifestBytes), Size: int64(len(manifestBytes)), } manifestLabels := map[string]string{ "containerd.io/gc.ref.content.config": manifest.Config.Digest.String(), } for i := range manifest.Layers { manifestLabels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = manifest.Layers[i].Digest.String() } if err = content.WriteBlob(ctx, lm.content, "manifest"+manifestDesc.Digest.String(), bytes.NewReader(manifestBytes), manifestDesc, content.WithLabels(manifestLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) { return err } childrenHandler := images.ChildrenHandler(lm.content) childrenHandler = images.SetChildrenMappedLabels(lm.content, childrenHandler, nil) if err = images.Walk(ctx, childrenHandler, manifestDesc); err != nil { return err } var added bool for _, named := range lm.refs.References(digest.Digest(imgID)) { img := images.Image{ Name: named.String(), Target: manifestDesc, // TODO: Any labels? } img, err = lm.cis.Create(ctx, img) if err != nil && !cerrdefs.IsAlreadyExists(err) { return err } else if err != nil { log.G(ctx).Infof("Tag already exists: %s", named) continue } log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest) added = true } if !added { img := images.Image{ Name: "moby-dangling@" + manifestDesc.Digest.String(), Target: manifestDesc, // TODO: Any labels? } img, err = lm.cis.Create(ctx, img) if err != nil && !cerrdefs.IsAlreadyExists(err) { return err } else if err == nil { log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest) } } } return nil } func extractSource(mounts []mount.Mount) (string, error) { if len(mounts) != 1 { return "", fmt.Errorf("cannot support snapshotters with multiple mount sources: %w", cerrdefs.ErrNotImplemented) } switch mounts[0].Type { case "bind": return mounts[0].Source, nil case "overlay": for _, option := range mounts[0].Options { if strings.HasPrefix(option, "upperdir=") { return option[9:], nil } } default: return "", fmt.Errorf("mount type %q not supported: %w", mounts[0].Type, cerrdefs.ErrNotImplemented) } return "", fmt.Errorf("mount is missing upper option: %w", cerrdefs.ErrNotImplemented) }