package llbsolver
import (
"context"
"fmt"
"sync"
"time"
"github.com/containerd/platforms"
"github.com/mitchellh/hashstructure/v2"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb/sourceresolver"
"github.com/moby/buildkit/executor"
resourcestypes "github.com/moby/buildkit/executor/resources/types"
"github.com/moby/buildkit/frontend"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/errdefs"
llberrdefs "github.com/moby/buildkit/solver/llbsolver/errdefs"
"github.com/moby/buildkit/solver/llbsolver/provenance"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/sourcepolicy"
spb "github.com/moby/buildkit/sourcepolicy/pb"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/entitlements"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
type llbBridge struct {
builder solver.Builder
frontends map[string]frontend.Frontend
resolveWorker func() (worker.Worker, error)
eachWorker func(func(worker.Worker) error) error
resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
cms map[string]solver.CacheManager
cmsMu sync.Mutex
sm *session.Manager
executorOnce sync.Once
executorErr error
executor executor.Executor
}
func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error {
return b.builder.InContext(ctx, func(ctx context.Context, g session.Group) error {
pw, ok, _ := progress.NewFromContext(ctx, progress.WithMetadata("vertex", dgst))
if !ok {
return nil
}
level := opts.Level
if level == 0 {
level = 1
}
pw.Write(identity.NewID(), client.VertexWarning{
Vertex: dgst,
Level: level,
Short: []byte(msg),
SourceInfo: opts.SourceInfo,
Range: opts.Range,
Detail: opts.Detail,
URL: opts.URL,
})
return pw.Close()
})
}
func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImports []gw.CacheOptionsEntry, pol []*spb.Policy) (solver.CachedResultWithProvenance, error) {
w, err := b.resolveWorker()
if err != nil {
return nil, err
}
ent, err := loadEntitlements(b.builder)
if err != nil {
return nil, err
}
srcPol, err := loadSourcePolicy(b.builder)
if err != nil {
return nil, err
}
var polEngine SourcePolicyEvaluator
if srcPol != nil || len(pol) > 0 {
for _, p := range pol {
if p == nil {
return nil, errors.Errorf("invalid nil policy")
}
if err := validateSourcePolicy(p); err != nil {
return nil, err
}
}
if srcPol != nil {
pol = append([]*spb.Policy{srcPol}, pol...)
}
polEngine = sourcepolicy.NewEngine(pol)
}
var cms []solver.CacheManager
for _, im := range cacheImports {
cmID, err := cmKey(im)
if err != nil {
return nil, err
}
b.cmsMu.Lock()
var cm solver.CacheManager
if prevCm, ok := b.cms[cmID]; !ok {
func(cmID string, im gw.CacheOptionsEntry) {
cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) {
var cmNew solver.CacheManager
if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error {
resolveCI, ok := b.resolveCacheImporterFuncs[im.Type]
if !ok {
return errors.Errorf("unknown cache importer: %s", im.Type)
}
ci, desc, err := resolveCI(ctx, g, im.Attrs)
if err != nil {
return errors.Wrapf(err, "failed to configure %v cache importer", im.Type)
}
cmNew, err = ci.Resolve(ctx, desc, cmID, w)
return err
}); err != nil {
bklog.G(ctx).Debugf("error while importing cache manifest from cmId=%s: %v", cmID, err)
return nil, err
}
return cmNew, nil
})
}(cmID, im)
b.cms[cmID] = cm
} else {
cm = prevCm
}
cms = append(cms, cm)
b.cmsMu.Unlock()
}
dpc := &detectPrunedCacheID{}
edge, err := Load(ctx, def, polEngine, dpc.Load, ValidateEntitlements(ent, w.CDIManager()), WithCacheSources(cms), NormalizeRuntimePlatforms(), WithValidateCaps())
if err != nil {
return nil, errors.Wrap(err, "failed to load LLB")
}
if len(dpc.ids) > 0 {
if err := b.eachWorker(func(w worker.Worker) error {
return w.PruneCacheMounts(ctx, dpc.ids)
}); err != nil {
return nil, err
}
}
res, err := b.builder.Build(ctx, edge)
if err != nil {
return nil, err
}
return res, nil
}
func (b *llbBridge) validateEntitlements(p executor.ProcessInfo) error {
ent, err := loadEntitlements(b.builder)
if err != nil {
return err
}
v := entitlements.Values{
NetworkHost: p.Meta.NetMode == pb.NetMode_HOST,
SecurityInsecure: p.Meta.SecurityMode == pb.SecurityMode_INSECURE,
}
return ent.Check(v)
}
func (b *llbBridge) Run(ctx context.Context, id string, rootfs executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (resourcestypes.Recorder, error) {
if err := b.validateEntitlements(process); err != nil {
return nil, err
}
if err := b.loadExecutor(); err != nil {
return nil, err
}
return b.executor.Run(ctx, id, rootfs, mounts, process, started)
}
func (b *llbBridge) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
if err := b.validateEntitlements(process); err != nil {
return err
}
if err := b.loadExecutor(); err != nil {
return err
}
return b.executor.Exec(ctx, id, process)
}
func (b *llbBridge) loadExecutor() error {
b.executorOnce.Do(func() {
w, err := b.resolveWorker()
if err != nil {
b.executorErr = err
return
}
b.executor = w.Executor()
})
return b.executorErr
}
type resultProxy struct {
id string
b *provenanceBridge
req frontend.SolveRequest
g flightcontrol.Group[solver.CachedResult]
mu sync.Mutex
released bool
v solver.CachedResult
err error
errResults []solver.Result
provenance *provenance.Capture
}
func newResultProxy(b *provenanceBridge, req frontend.SolveRequest) *resultProxy {
return &resultProxy{req: req, b: b, id: identity.NewID()}
}
func (rp *resultProxy) ID() string {
return rp.id
}
func (rp *resultProxy) Definition() *pb.Definition {
return rp.req.Definition
}
func (rp *resultProxy) Provenance() any {
if rp.provenance == nil {
return nil
}
return rp.provenance
}
func (rp *resultProxy) Release(ctx context.Context) (err error) {
rp.mu.Lock()
defer rp.mu.Unlock()
for _, res := range rp.errResults {
rerr := res.Release(ctx)
if rerr != nil {
err = rerr
}
}
if rp.v != nil {
if rp.released {
bklog.G(ctx).Warnf("release of already released result")
}
rerr := rp.v.Release(ctx)
if err != nil {
return rerr
}
}
rp.released = true
return
}
func (rp *resultProxy) wrapError(err error) error {
if err == nil {
return nil
}
var ve *errdefs.VertexError
if errors.As(err, &ve) {
if rp.req.Definition.Source != nil {
locs, ok := rp.req.Definition.Source.Locations[ve.Digest]
if ok {
for _, loc := range locs.Locations {
err = errdefs.WithSource(err, &errdefs.Source{
Info: rp.req.Definition.Source.Infos[loc.SourceIndex],
Ranges: loc.Ranges,
})
}
}
}
}
return err
}
func (rp *resultProxy) loadResult(ctx context.Context) (solver.CachedResultWithProvenance, error) {
res, err := rp.b.loadResult(ctx, rp.req.Definition, rp.req.CacheImports, rp.req.SourcePolicies)
var ee *llberrdefs.ExecError
if errors.As(err, &ee) {
ee.EachRef(func(res solver.Result) error {
rp.errResults = append(rp.errResults, res)
return nil
})
// acquire ownership so ExecError finalizer doesn't attempt to release as well
ee.OwnerBorrowed = true
}
return res, err
}
func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err error) {
defer func() {
err = rp.wrapError(err)
}()
return rp.g.Do(ctx, "result", func(ctx context.Context) (solver.CachedResult, error) {
rp.mu.Lock()
if rp.released {
rp.mu.Unlock()
return nil, errors.Errorf("accessing released result")
}
if rp.v != nil || rp.err != nil {
rp.mu.Unlock()
return rp.v, rp.err
}
rp.mu.Unlock()
v, err := rp.loadResult(ctx)
if err != nil {
select {
case <-ctx.Done():
if errdefs.IsCanceled(ctx, err) {
return v, err
}
default:
}
}
rp.mu.Lock()
if rp.released {
if v != nil {
v.Release(context.TODO())
}
rp.mu.Unlock()
return nil, errors.Errorf("evaluating released result")
}
if err == nil {
var capture *provenance.Capture
capture, err = captureProvenance(ctx, v)
if err != nil {
err = errors.Errorf("failed to capture provenance: %v", err)
v.Release(context.TODO())
v = nil
}
rp.provenance = capture
}
rp.v = v
rp.err = err
rp.mu.Unlock()
return v, err
})
}
func (b *llbBridge) ResolveSourceMetadata(ctx context.Context, op *pb.SourceOp, opt sourceresolver.Opt) (resp *sourceresolver.MetaResponse, err error) {
w, err := b.resolveWorker()
if err != nil {
return nil, err
}
if opt.LogName == "" {
// TODO: better name
opt.LogName = fmt.Sprintf("resolve image config for %s", op.Identifier)
}
id := op.Identifier
if opt.Platform != nil {
id += platforms.FormatAll(*opt.Platform)
} else {
id += platforms.FormatAll(platforms.DefaultSpec())
}
pol, err := loadSourcePolicy(b.builder)
if err != nil {
return nil, err
}
if pol != nil {
opt.SourcePolicies = append(opt.SourcePolicies, pol)
}
if _, err := sourcepolicy.NewEngine(opt.SourcePolicies).Evaluate(ctx, op); err != nil {
return nil, errors.Wrap(err, "could not resolve image due to policy")
}
// policy is evaluated, so we can remove it from the options
opt.SourcePolicies = nil
err = inBuilderContext(ctx, b.builder, opt.LogName, id, func(ctx context.Context, g session.Group) error {
resp, err = w.ResolveSourceMetadata(ctx, op, opt, b.sm, g)
return err
})
if err != nil {
return nil, err
}
return resp, nil
}
type lazyCacheManager struct {
id string
main solver.CacheManager
waitCh chan struct{}
err error
}
func (lcm *lazyCacheManager) ID() string {
return lcm.id
}
func (lcm *lazyCacheManager) Query(inp []solver.CacheKeyWithSelector, inputIndex solver.Index, dgst digest.Digest, outputIndex solver.Index) ([]*solver.CacheKey, error) {
lcm.wait()
if lcm.main == nil {
return nil, nil
}
return lcm.main.Query(inp, inputIndex, dgst, outputIndex)
}
func (lcm *lazyCacheManager) Records(ctx context.Context, ck *solver.CacheKey) ([]*solver.CacheRecord, error) {
lcm.wait()
if lcm.main == nil {
return nil, nil
}
return lcm.main.Records(ctx, ck)
}
func (lcm *lazyCacheManager) Load(ctx context.Context, rec *solver.CacheRecord) (solver.Result, error) {
if err := lcm.wait(); err != nil {
return nil, err
}
return lcm.main.Load(ctx, rec)
}
func (lcm *lazyCacheManager) Save(key *solver.CacheKey, s solver.Result, createdAt time.Time) (*solver.ExportableCacheKey, error) {
if err := lcm.wait(); err != nil {
return nil, err
}
return lcm.main.Save(key, s, createdAt)
}
func (lcm *lazyCacheManager) ReleaseUnreferenced(ctx context.Context) error {
if err := lcm.wait(); err != nil {
return err
}
return lcm.main.ReleaseUnreferenced(ctx)
}
func (lcm *lazyCacheManager) wait() error {
<-lcm.waitCh
return lcm.err
}
func newLazyCacheManager(id string, fn func() (solver.CacheManager, error)) solver.CacheManager {
lcm := &lazyCacheManager{id: id, waitCh: make(chan struct{})}
go func() {
defer close(lcm.waitCh)
cm, err := fn()
if err != nil {
lcm.err = err
return
}
lcm.main = cm
}()
return lcm
}
func cmKey(im gw.CacheOptionsEntry) (string, error) {
if im.Type == "registry" && im.Attrs["ref"] != "" {
return im.Attrs["ref"], nil
}
i, err := hashstructure.Hash(im, hashstructure.FormatV2, nil)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%d", im.Type, i), nil
}