package cache import ( "context" "fmt" "maps" "net/url" "slices" "strings" "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/pkg/reference" cerrdefs "github.com/containerd/errdefs" "github.com/moby/buildkit/cache/config" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/pull/pullprogress" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) type Unlazier interface { Unlazy(ctx context.Context) error } // GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily). // Compressionopt can be used to specify the compression type of blobs. If Force is true, the compression // type is applied to all blobs in the chain. If Force is false, it's applied only to the newly created // layers. If all is true, all available chains that has the specified compression type of topmost blob are // appended to the result. // Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation. func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, refCfg config.RefConfig, all bool, s session.Group) ([]*solver.Remote, error) { ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } defer done(context.WithoutCancel(ctx)) // fast path if compression variants aren't required // NOTE: compressionopt is applied only to *newly created layers* if Force != true. remote, err := sr.getRemote(ctx, createIfNeeded, refCfg, s) if err != nil { return nil, err } if !all || refCfg.Compression.Force || len(remote.Descriptors) == 0 { return []*solver.Remote{remote}, nil // early return if compression variants aren't required } // Search all available remotes that has the topmost blob with the specified // compression with all combination of compressions res := []*solver.Remote{remote} topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1] vDesc, err := getBlobWithCompression(ctx, sr.cm.ContentStore, topmost, refCfg.Compression.Type) if err != nil { return res, nil // compression variant doesn't exist. return the main blob only. } var variants []*solver.Remote if len(parentChain) == 0 { variants = append(variants, &solver.Remote{ Descriptors: []ocispecs.Descriptor{vDesc}, Provider: sr.cm.ContentStore, }) } else { // get parents with all combination of all available compressions. parents, err := getAvailableBlobs(ctx, sr.cm.ContentStore, &solver.Remote{ Descriptors: parentChain, Provider: remote.Provider, }) if err != nil { return nil, err } variants = appendRemote(parents, vDesc, sr.cm.ContentStore) } // Return the main remote and all its compression variants. // NOTE: Because compressionopt is applied only to *newly created layers* in the main remote (i.e. res[0]), // it's possible that the main remote doesn't contain any blobs of the compressionopt.Type. // The topmost blob of the variants (res[1:]) is guaranteed to be the compressionopt.Type. res = append(res, variants...) return res, nil } func appendRemote(parents []*solver.Remote, desc ocispecs.Descriptor, p content.InfoReaderProvider) (res []*solver.Remote) { for _, pRemote := range parents { provider := contentutil.NewMultiProvider(pRemote.Provider) provider.Add(desc.Digest, p) res = append(res, &solver.Remote{ Descriptors: append(pRemote.Descriptors, desc), Provider: provider, }) } return } func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remote) ([]*solver.Remote, error) { if len(chain.Descriptors) == 0 { return nil, nil } target, parentChain := chain.Descriptors[len(chain.Descriptors)-1], chain.Descriptors[:len(chain.Descriptors)-1] parents, err := getAvailableBlobs(ctx, cs, &solver.Remote{ Descriptors: parentChain, Provider: chain.Provider, }) if err != nil { return nil, err } var descs []ocispecs.Descriptor if err := walkBlob(ctx, cs, target, func(desc ocispecs.Descriptor) bool { // Nothing prevents this function from being called multiple times for the same descriptor. // So we need to make sure we don't add the same descriptor again. // Looping over the list is preferable: // 1. to avoid using a map, which don't preserve the order of descriptors, // 2. descs will have a length the number of compression variants for a blob, which is usually very small if !slices.ContainsFunc(descs, func(d ocispecs.Descriptor) bool { return d.Digest == desc.Digest }) { descs = append(descs, desc) } return true }); err != nil { bklog.G(ctx).WithError(err).Warn("failed to walk variant blob") // is not a critical error at this moment. } var res []*solver.Remote for _, desc := range descs { if len(parents) == 0 { // bottommost ref res = append(res, &solver.Remote{ Descriptors: []ocispecs.Descriptor{desc}, Provider: cs, }) continue } res = append(res, appendRemote(parents, desc, cs)...) } if len(res) == 0 { // no available compression blobs for this blob. return the original blob. if len(parents) == 0 { // bottommost ref return []*solver.Remote{chain}, nil } return appendRemote(parents, target, chain.Provider), nil } return res, nil } func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refCfg config.RefConfig, s session.Group) (*solver.Remote, error) { err := sr.computeBlobChain(ctx, createIfNeeded, refCfg.Compression, s) if err != nil { return nil, err } chain := sr.layerChain() mproviderBase := contentutil.NewMultiProvider(nil) mprovider := &lazyMultiProvider{mprovider: mproviderBase} remote := &solver.Remote{ Provider: mprovider, } for _, ref := range chain { desc, err := ref.ociDesc(ctx, sr.descHandlers, refCfg.PreferNonDistributable) if err != nil { return nil, err } // NOTE: The media type might be missing for some migrated ones // from before lease based storage. If so, we should detect // the media type from blob data. // // Discussion: https://github.com/moby/buildkit/pull/1277#discussion_r352795429 if desc.MediaType == "" { desc.MediaType, err = compression.DetectLayerMediaType(ctx, sr.cm.ContentStore, desc.Digest, false) if err != nil { return nil, err } } // update distribution source annotation for lazy-refs (non-lazy refs // will already have their dsl stored in the content store, which is // used by the push handlers) var addAnnotations []string isLazy, err := ref.isLazy(ctx) if err != nil { return nil, err } else if isLazy { imageRefs := ref.getImageRefs() for _, imageRef := range imageRefs { refspec, err := reference.Parse(imageRef) if err != nil { return nil, err } u, err := url.Parse("dummy://" + refspec.Locator) if err != nil { return nil, err } source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/") if desc.Annotations == nil { desc.Annotations = make(map[string]string) } dslKey := fmt.Sprintf("%s.%s", "containerd.io/distribution.source", source) var existingRepos []string if existings, ok := desc.Annotations[dslKey]; ok { existingRepos = strings.Split(existings, ",") } if !slices.Contains(existingRepos, repo) { existingRepos = append(existingRepos, repo) } desc.Annotations[dslKey] = strings.Join(existingRepos, ",") addAnnotations = append(addAnnotations, dslKey) } } if refCfg.Compression.Force { if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil { return nil, err } else if needs { // ensure the compression type. // compressed blob must be created and stored in the content store. blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, refCfg.Compression, s) if err != nil { return nil, errors.Wrapf(err, "failed to get compression blob %q", refCfg.Compression.Type) } newDesc := desc newDesc.MediaType = blobDesc.MediaType newDesc.Digest = blobDesc.Digest newDesc.Size = blobDesc.Size newDesc.URLs = blobDesc.URLs newDesc.Annotations = nil if len(addAnnotations) > 0 || len(blobDesc.Annotations) > 0 { newDesc.Annotations = make(map[string]string) } for _, k := range addAnnotations { newDesc.Annotations[k] = desc.Annotations[k] } maps.Copy(newDesc.Annotations, blobDesc.Annotations) desc = newDesc } } remote.Descriptors = append(remote.Descriptors, desc) mprovider.Add(lazyRefProvider{ ref: ref, desc: desc, dh: sr.descHandlers[desc.Digest], session: s, }) } return remote, nil } func getBlobWithCompressionWithRetry(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) (ocispecs.Descriptor, error) { if blobDesc, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil { return blobDesc, nil } if err := ensureCompression(ctx, ref, comp, s); err != nil { return ocispecs.Descriptor{}, errors.Wrapf(err, "failed to get and ensure compression type of %q", comp.Type) } return ref.getBlobWithCompression(ctx, comp.Type) } type lazyMultiProvider struct { mprovider *contentutil.MultiProvider plist []lazyRefProvider } func (mp *lazyMultiProvider) Add(p lazyRefProvider) { mp.mprovider.Add(p.desc.Digest, p) mp.plist = append(mp.plist, p) } func (mp *lazyMultiProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { return mp.mprovider.ReaderAt(ctx, desc) } func (mp *lazyMultiProvider) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { return mp.mprovider.Info(ctx, dgst) } func (mp *lazyMultiProvider) Unlazy(ctx context.Context) error { eg, egctx := errgroup.WithContext(ctx) for _, p := range mp.plist { eg.Go(func() error { return p.Unlazy(egctx) }) } return eg.Wait() } type lazyRefProvider struct { ref *immutableRef desc ocispecs.Descriptor dh *DescHandler session session.Group } func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { if desc.Digest != p.desc.Digest { return nil, cerrdefs.ErrNotFound } if err := p.Unlazy(ctx); err != nil { return nil, err } return p.ref.cm.ContentStore.ReaderAt(ctx, desc) } func (p lazyRefProvider) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { if dgst != p.desc.Digest { return content.Info{}, cerrdefs.ErrNotFound } info, err := p.ref.cm.ContentStore.Info(ctx, dgst) if err == nil { return info, nil } if isLazy, err1 := p.ref.isLazy(ctx); err1 != nil { return content.Info{}, err1 } else if !isLazy { return content.Info{}, err } // for lazy records don't unlazy without read request return content.Info{ Digest: p.desc.Digest, Size: p.desc.Size, }, nil } func (p lazyRefProvider) Unlazy(ctx context.Context) error { _, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ struct{}, rerr error) { if isLazy, err := p.ref.isLazy(ctx); err != nil { return struct{}{}, err } else if !isLazy { return struct{}{}, nil } defer func() { if rerr == nil { rerr = p.ref.linkBlob(ctx, p.desc) } }() if p.dh == nil { // shouldn't happen, if you have a lazy immutable ref it already should be validated // that descriptor handlers exist for it return struct{}{}, errors.New("unexpected nil descriptor handler") } if p.dh.Progress != nil { var stopProgress func(error) ctx, stopProgress = p.dh.Progress.Start(ctx) defer stopProgress(rerr) } // For now, just pull down the whole content and then return a ReaderAt from the local content // store. If efficient partial reads are desired in the future, something more like a "tee" // that caches remote partial reads to a local store may need to replace this. err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{ Provider: p.dh.Provider(p.session), Manager: p.ref.cm.ContentStore, }, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx)) if err != nil { return struct{}{}, err } if imageRefs := p.ref.getImageRefs(); len(imageRefs) > 0 { // just use the first image ref, it's arbitrary imageRef := imageRefs[0] if p.ref.GetDescription() == "" { if err := p.ref.SetDescription("pulled from " + imageRef); err != nil { return struct{}{}, err } } } return struct{}{}, nil }) return err }