package pull import ( "context" "sync" "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" "github.com/containerd/containerd/v2/pkg/labels" "github.com/containerd/containerd/v2/pkg/reference" cerrdefs "github.com/containerd/errdefs" "github.com/containerd/platforms" "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/resolver/limited" "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) type SessionResolver func(g session.Group) remotes.Resolver type Puller struct { ContentStore content.Store Resolver remotes.Resolver Src reference.Spec Platform ocispecs.Platform g flightcontrol.Group[struct{}] resolveErr error resolveDone bool desc ocispecs.Descriptor configDesc ocispecs.Descriptor ref string layers []ocispecs.Descriptor nonlayers []ocispecs.Descriptor } var _ content.Provider = &provider{} type PulledManifests struct { Ref string MainManifestDesc ocispecs.Descriptor ConfigDesc ocispecs.Descriptor Nonlayers []ocispecs.Descriptor Descriptors []ocispecs.Descriptor Provider func(session.Group) content.Provider } func (p *Puller) resolve(ctx context.Context, resolver remotes.Resolver) error { _, err := p.g.Do(ctx, "", func(ctx context.Context) (_ struct{}, err error) { if p.resolveErr != nil || p.resolveDone { return struct{}{}, p.resolveErr } defer func() { if !errors.Is(err, context.Canceled) { p.resolveErr = err } }() if p.tryLocalResolve(ctx) == nil { return } ref, desc, err := resolver.Resolve(ctx, p.Src.String()) if err != nil { return struct{}{}, err } p.desc = desc p.ref = ref p.resolveDone = true return struct{}{}, nil }) return err } func (p *Puller) tryLocalResolve(ctx context.Context) error { desc := ocispecs.Descriptor{ Digest: p.Src.Digest(), } if desc.Digest == "" { return errors.New("empty digest") } info, err := p.ContentStore.Info(ctx, desc.Digest) if err != nil { return err } if ok, err := contentutil.HasSource(info, p.Src); err != nil || !ok { return errors.Errorf("no matching source") } desc.Size = info.Size p.ref = p.Src.String() ra, err := p.ContentStore.ReaderAt(ctx, desc) if err != nil { return err } mt, err := imageutil.DetectManifestMediaType(ra) if err != nil { return err } desc.MediaType = mt p.desc = desc return nil } func (p *Puller) PullManifests(ctx context.Context, getResolver SessionResolver) (*PulledManifests, error) { err := p.resolve(ctx, p.Resolver) if err != nil { return nil, err } platform := platforms.Only(p.Platform) var mu sync.Mutex // images.Dispatch calls handlers in parallel metadata := make(map[digest.Digest]ocispecs.Descriptor) // TODO: need a wrapper snapshot interface that combines content // and snapshots as 1) buildkit shouldn't have a dependency on contentstore // or 2) cachemanager should manage the contentstore var handlers []images.Handler fetcher, err := p.Resolver.Fetcher(ctx, p.ref) if err != nil { return nil, err } if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest { errMsg := "support Docker Image manifest version 2, schema 1 has been removed. " + "More information at https://docs.docker.com/go/deprecated-image-specs/" return nil, errors.WithStack(cerrdefs.ErrConflict.WithMessage(errMsg)) } // Get all the children for a descriptor childrenHandler := images.ChildrenHandler(p.ContentStore) // Filter the children by the platform childrenHandler = images.FilterPlatforms(childrenHandler, platform) // Limit manifests pulled to the best match in an index childrenHandler = images.LimitManifests(childrenHandler, platform, 1) dslHandler, err := docker.AppendDistributionSourceLabel(p.ContentStore, p.ref) if err != nil { return nil, err } handlers = append(handlers, filterLayerBlobs(metadata, &mu), retryhandler.New(limited.FetchHandler(p.ContentStore, fetcher, p.ref), logs.LoggerFromContext(ctx)), childrenHandler, dslHandler, ) if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil { return nil, err } for _, desc := range metadata { p.nonlayers = append(p.nonlayers, desc) switch desc.MediaType { case images.MediaTypeDockerSchema2Config, ocispecs.MediaTypeImageConfig: p.configDesc = desc } } // split all pulled data to layers and rest. layers remain roots and are deleted with snapshots. rest will be linked to layers. p.layers, err = getLayers(ctx, p.ContentStore, p.desc, platform) if err != nil { return nil, err } return &PulledManifests{ Ref: p.ref, MainManifestDesc: p.desc, ConfigDesc: p.configDesc, Nonlayers: p.nonlayers, Descriptors: p.layers, Provider: func(g session.Group) content.Provider { return &provider{puller: p, resolver: getResolver(g)} }, }, nil } type provider struct { puller *Puller resolver remotes.Resolver } func (p *provider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { err := p.puller.resolve(ctx, p.resolver) if err != nil { return nil, err } fetcher, err := p.resolver.Fetcher(ctx, p.puller.ref) if err != nil { return nil, err } return contentutil.FromFetcher(fetcher).ReaderAt(ctx, desc) } // filterLayerBlobs causes layer blobs to be skipped for fetch, which is required to support lazy blobs. // It also stores the non-layer blobs (metadata) it encounters in the provided map. func filterLayerBlobs(metadata map[digest.Digest]ocispecs.Descriptor, mu sync.Locker) images.HandlerFunc { return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) { switch desc.MediaType { case ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerNonDistributable, //nolint:staticcheck // ignore SA1019: Non-distributable layers are deprecated, and not recommended for future use. images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerForeign, ocispecs.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip, ocispecs.MediaTypeImageLayerNonDistributableGzip, //nolint:staticcheck // ignore SA1019: Non-distributable layers are deprecated, and not recommended for future use. images.MediaTypeDockerSchema2LayerForeignGzip, ocispecs.MediaTypeImageLayerZstd, ocispecs.MediaTypeImageLayerNonDistributableZstd: //nolint:staticcheck // ignore SA1019: Non-distributable layers are deprecated, and not recommended for future use. return nil, images.ErrSkipDesc default: if metadata != nil { mu.Lock() metadata[desc.Digest] = desc mu.Unlock() } } return nil, nil } } func getLayers(ctx context.Context, provider content.Provider, desc ocispecs.Descriptor, platform platforms.MatchComparer) ([]ocispecs.Descriptor, error) { manifest, err := images.Manifest(ctx, provider, desc, platform) if err != nil { return nil, errors.WithStack(err) } image := images.Image{Target: desc} diffIDs, err := image.RootFS(ctx, provider, platform) if err != nil { return nil, errors.Wrap(err, "failed to resolve rootfs") } if len(diffIDs) != len(manifest.Layers) { return nil, errors.Errorf("mismatched image rootfs and manifest layers %+v %+v", diffIDs, manifest.Layers) } layers := make([]ocispecs.Descriptor, len(diffIDs)) for i := range diffIDs { desc := manifest.Layers[i] if desc.Annotations == nil { desc.Annotations = map[string]string{} } desc.Annotations[labels.LabelUncompressed] = diffIDs[i].String() layers[i] = desc } return layers, nil }