package control
import (
"context"
stderrors "errors"
"fmt"
"runtime/trace"
"strconv"
"sync"
"sync/atomic"
"time"
contentapi "github.com/containerd/containerd/api/services/content/v1"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/plugins/services/content/contentserver"
"github.com/distribution/reference"
"github.com/mitchellh/hashstructure/v2"
controlapi "github.com/moby/buildkit/api/services/control"
apitypes "github.com/moby/buildkit/api/types"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/cmd/buildkitd/config"
controlgateway "github.com/moby/buildkit/control/gateway"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/exporter/util/epoch"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/attestations"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/bboltcachestorage"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/llbsolver/cdidevices"
"github.com/moby/buildkit/solver/llbsolver/proc"
provenancetypes "github.com/moby/buildkit/solver/llbsolver/provenance/types"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/db"
"github.com/moby/buildkit/util/entitlements"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/version"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
type Opt struct {
SessionManager *session.Manager
WorkerController *worker.Controller
Frontends map[string]frontend.Frontend
CacheManager solver.CacheManager
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
TraceCollector sdktrace.SpanExporter
HistoryDB db.DB
CacheStore *bboltcachestorage.Store
LeaseManager *leaseutil.Manager
ContentStore *containerdsnapshot.Store
HistoryConfig *config.HistoryConfig
GarbageCollect func(context.Context) error
GracefulStop <-chan struct{}
ProvenanceEnv map[string]any
}
type Controller struct { // TODO: ControlService
// buildCount needs to be 64bit aligned
buildCount int64
opt Opt
solver *llbsolver.Solver
history *llbsolver.HistoryQueue
cache solver.CacheManager
gatewayForwarder *controlgateway.GatewayForwarder
throttledGC func()
throttledReleaseUnreferenced func()
gcmu sync.Mutex
tracev1.UnimplementedTraceServiceServer
}
func NewController(opt Opt) (*Controller, error) {
gatewayForwarder := controlgateway.NewGatewayForwarder()
hq, err := llbsolver.NewHistoryQueue(llbsolver.HistoryQueueOpt{
DB: opt.HistoryDB,
LeaseManager: opt.LeaseManager,
ContentStore: opt.ContentStore,
CleanConfig: opt.HistoryConfig,
GarbageCollect: opt.GarbageCollect,
GracefulStop: opt.GracefulStop,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create history queue")
}
s, err := llbsolver.New(llbsolver.Opt{
WorkerController: opt.WorkerController,
Frontends: opt.Frontends,
CacheManager: opt.CacheManager,
CacheResolvers: opt.ResolveCacheImporterFuncs,
GatewayForwarder: gatewayForwarder,
SessionManager: opt.SessionManager,
Entitlements: opt.Entitlements,
HistoryQueue: hq,
ProvenanceEnv: opt.ProvenanceEnv,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create solver")
}
c := &Controller{
opt: opt,
solver: s,
history: hq,
cache: opt.CacheManager,
gatewayForwarder: gatewayForwarder,
}
c.throttledGC = throttle.After(time.Minute, c.gc)
// use longer interval for releaseUnreferencedCache deleting links quickly is less important
c.throttledReleaseUnreferenced = throttle.After(5*time.Minute, func() { c.releaseUnreferencedCache(context.TODO()) })
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
return c, nil
}
func (c *Controller) Close() error {
var errs []error
if err := c.opt.HistoryDB.Close(); err != nil {
errs = append(errs, err)
}
if err := c.opt.WorkerController.Close(); err != nil {
errs = append(errs, err)
}
if err := c.opt.CacheStore.Close(); err != nil {
errs = append(errs, err)
}
if err := c.solver.Close(); err != nil {
errs = append(errs, err)
}
return stderrors.Join(errs...)
}
func (c *Controller) Register(server *grpc.Server) {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
tracev1.RegisterTraceServiceServer(server, c)
store := &roContentStore{c.opt.ContentStore.WithFallbackNS(c.opt.ContentStore.Namespace() + "_history")}
contentapi.RegisterContentServer(server, contentserver.New(store))
}
func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
resp := &controlapi.DiskUsageResponse{}
workers, err := c.opt.WorkerController.List()
if err != nil {
return nil, err
}
for _, w := range workers {
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
Filter: r.Filter,
AgeLimit: time.Duration(r.AgeLimit),
})
if err != nil {
return nil, err
}
for _, r := range du {
resp.Record = append(resp.Record, &controlapi.UsageRecord{
// TODO: add worker info
ID: r.ID,
Mutable: r.Mutable,
InUse: r.InUse,
Size: r.Size,
Parents: r.Parents,
UsageCount: int64(r.UsageCount),
Description: r.Description,
CreatedAt: timestamppb.New(r.CreatedAt),
LastUsedAt: func() *timestamppb.Timestamp {
if r.LastUsedAt != nil {
return timestamppb.New(*r.LastUsedAt)
}
return nil
}(),
RecordType: string(r.RecordType),
Shared: r.Shared,
})
}
}
return resp, nil
}
func (c *Controller) releaseUnreferencedCache(ctx context.Context) error {
return c.cache.ReleaseUnreferenced(ctx)
}
func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Control_PruneServer) error {
if atomic.LoadInt64(&c.buildCount) == 0 {
imageutil.CancelCacheLeases()
}
ch := make(chan client.UsageInfo, 32)
eg, ctx := errgroup.WithContext(stream.Context())
workers, err := c.opt.WorkerController.List()
if err != nil {
return errors.Wrap(err, "failed to list workers for prune")
}
didPrune := false
defer func() {
if didPrune {
if c, ok := c.cache.(interface {
ReleaseUnreferenced(context.Context) error
}); ok {
if err := c.ReleaseUnreferenced(ctx); err != nil {
bklog.G(ctx).Errorf("failed to release cache metadata: %+v", err)
}
}
}
}()
for _, w := range workers {
func(w worker.Worker) {
eg.Go(func() error {
return w.Prune(ctx, ch, client.PruneInfo{
Filter: req.Filter,
All: req.All,
KeepDuration: time.Duration(req.KeepDuration),
ReservedSpace: req.ReservedSpace,
MaxUsedSpace: req.MaxUsedSpace,
MinFreeSpace: req.MinFreeSpace,
})
})
}(w)
}
eg2, _ := errgroup.WithContext(stream.Context())
eg2.Go(func() error {
defer close(ch)
return eg.Wait()
})
eg2.Go(func() error {
defer func() {
// drain channel on error
for range ch {
}
}()
for r := range ch {
didPrune = true
if err := stream.Send(&controlapi.UsageRecord{
// TODO: add worker info
ID: r.ID,
Mutable: r.Mutable,
InUse: r.InUse,
Size: r.Size,
Parents: r.Parents,
UsageCount: int64(r.UsageCount),
Description: r.Description,
CreatedAt: timestamppb.New(r.CreatedAt),
LastUsedAt: func() *timestamppb.Timestamp {
if r.LastUsedAt != nil {
return timestamppb.New(*r.LastUsedAt)
}
return nil
}(),
RecordType: string(r.RecordType),
Shared: r.Shared,
}); err != nil {
return err
}
}
return nil
})
return eg2.Wait()
}
func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
if c.opt.TraceCollector == nil {
return nil, status.Errorf(codes.Unavailable, "trace collector not configured")
}
err := c.opt.TraceCollector.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &tracev1.ExportTraceServiceResponse{}, nil
}
func (c *Controller) ListenBuildHistory(req *controlapi.BuildHistoryRequest, srv controlapi.Control_ListenBuildHistoryServer) error {
if err := sendTimestampHeader(srv); err != nil {
return err
}
return c.history.Listen(srv.Context(), req, func(h *controlapi.BuildHistoryEvent) error {
if err := srv.Send(h); err != nil {
return err
}
return nil
})
}
func (c *Controller) UpdateBuildHistory(ctx context.Context, req *controlapi.UpdateBuildHistoryRequest) (*controlapi.UpdateBuildHistoryResponse, error) {
if req.Delete {
c.history.Finalize(ctx, req.Ref) // ignore error
err := c.history.Delete(ctx, req.Ref)
return &controlapi.UpdateBuildHistoryResponse{}, err
}
if req.Finalize {
err := c.history.Finalize(ctx, req.Ref)
return &controlapi.UpdateBuildHistoryResponse{}, err
}
err := c.history.UpdateRef(ctx, req.Ref, func(r *controlapi.BuildHistoryRecord) error {
if req.Pinned == r.Pinned {
return nil
}
r.Pinned = req.Pinned
return nil
})
return &controlapi.UpdateBuildHistoryResponse{}, err
}
func translateLegacySolveRequest(req *controlapi.SolveRequest) {
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {
ex := &controlapi.CacheOptionsEntry{
Type: "registry",
Attrs: req.Cache.ExportAttrsDeprecated,
}
if ex.Attrs == nil {
ex.Attrs = make(map[string]string)
}
ex.Attrs["ref"] = legacyExportRef
// FIXME(AkihiroSuda): skip append if already exists
req.Cache.Exports = append(req.Cache.Exports, ex)
req.Cache.ExportRefDeprecated = ""
req.Cache.ExportAttrsDeprecated = nil
}
// translates ImportRefs to new Imports (v0.4.0)
for _, legacyImportRef := range req.Cache.ImportRefsDeprecated {
im := &controlapi.CacheOptionsEntry{
Type: "registry",
Attrs: map[string]string{"ref": legacyImportRef},
}
// FIXME(AkihiroSuda): skip append if already exists
req.Cache.Imports = append(req.Cache.Imports, im)
}
req.Cache.ImportRefsDeprecated = nil
// translate single exporter to a slice (v0.13.0)
if len(req.Exporters) == 0 && req.ExporterDeprecated != "" {
req.Exporters = append(req.Exporters, &controlapi.Exporter{
Type: req.ExporterDeprecated,
Attrs: req.ExporterAttrsDeprecated,
})
req.ExporterDeprecated = ""
req.ExporterAttrsDeprecated = nil
}
}
func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) {
defer trace.StartRegion(ctx, "Solve").End()
trace.Logf(ctx, "Request", "solve request: %v", req.Ref)
atomic.AddInt64(&c.buildCount, 1)
defer atomic.AddInt64(&c.buildCount, -1)
if req.Cache == nil {
req.Cache = &controlapi.CacheOptions{} // make sure cache options are initialized
}
translateLegacySolveRequest(req)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
// TODO: multiworker
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
w, err := c.opt.WorkerController.GetDefault()
if err != nil {
return nil, err
}
// if SOURCE_DATE_EPOCH is set, enable it for the exporters
if v, ok := epoch.ParseBuildArgs(req.FrontendAttrs); ok {
for _, ex := range req.Exporters {
if _, ok := ex.Attrs[string(exptypes.OptKeySourceDateEpoch)]; !ok {
if ex.Attrs == nil {
ex.Attrs = make(map[string]string)
}
ex.Attrs[string(exptypes.OptKeySourceDateEpoch)] = v
}
}
}
var expis []exporter.ExporterInstance
for i, ex := range req.Exporters {
exp, err := w.Exporter(ex.Type, c.opt.SessionManager)
if err != nil {
return nil, err
}
bklog.G(ctx).Debugf("resolve exporter %s with %v", ex.Type, ex.Attrs)
expi, err := exp.Resolve(ctx, i, ex.Attrs)
if err != nil {
return nil, err
}
expis = append(expis, expi)
}
if c, err := findDuplicateCacheOptions(req.Cache.Exports); err != nil {
return nil, err
} else if c != nil {
types := []string{}
for _, c := range c {
types = append(types, c.Type)
}
return nil, errors.Errorf("duplicate cache exports %s", types)
}
var cacheExporters []llbsolver.RemoteCacheExporter
for _, e := range req.Cache.Exports {
cacheExporterFunc, ok := c.opt.ResolveCacheExporterFuncs[e.Type]
if !ok {
return nil, errors.Errorf("unknown cache exporter: %q", e.Type)
}
var exp llbsolver.RemoteCacheExporter
exp.Exporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs)
if err != nil {
return nil, errors.Wrapf(err, "failed to configure %v cache exporter", e.Type)
}
if exp.Exporter == nil {
bklog.G(ctx).Debugf("cache exporter resolver for %v returned nil, skipping exporter", e.Type)
continue
}
if exportMode, supported := parseCacheExportMode(e.Attrs["mode"]); !supported {
bklog.G(ctx).Debugf("skipping invalid cache export mode: %s", e.Attrs["mode"])
} else {
exp.CacheExportMode = exportMode
}
if ignoreErrorStr, ok := e.Attrs["ignore-error"]; ok {
if ignoreError, supported := parseCacheExportIgnoreError(ignoreErrorStr); !supported {
bklog.G(ctx).Debugf("skipping invalid cache export ignore-error: %s", e.Attrs["ignore-error"])
} else {
exp.IgnoreError = ignoreError
}
}
cacheExporters = append(cacheExporters, exp)
}
var cacheImports []frontend.CacheOptionsEntry
for _, im := range req.Cache.Imports {
if im == nil {
continue
}
cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
Type: im.Type,
Attrs: im.Attrs,
})
}
attests, err := attestations.Parse(req.FrontendAttrs)
if err != nil {
return nil, err
}
var procs []llbsolver.Processor
if attrs, ok := attests["sbom"]; ok {
var ref reference.Named
params := make(map[string]string)
for k, v := range attrs {
if k == "generator" {
if v == "" {
return nil, errors.Errorf("sbom generator cannot be empty")
}
ref, err = reference.ParseNormalizedNamed(v)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse sbom generator %s", v)
}
ref = reference.TagNameOnly(ref)
} else {
params[k] = v
}
}
useCache := true
if v, ok := req.FrontendAttrs["no-cache"]; ok && v == "" {
// disable cache if cache is disabled for all stages
useCache = false
}
resolveMode := llb.ResolveModeDefault.String()
if v, ok := req.FrontendAttrs["image-resolve-mode"]; ok {
resolveMode = v
}
procs = append(procs, proc.SBOMProcessor(ref.String(), useCache, resolveMode, params))
}
if attrs, ok := attests["provenance"]; ok {
var slsaVersion provenancetypes.ProvenanceSLSA
params := make(map[string]string)
for k, v := range attrs {
if k == "version" {
slsaVersion = provenancetypes.ProvenanceSLSA(v)
if err := slsaVersion.Validate(); err != nil {
return nil, err
}
} else {
params[k] = v
}
}
procs = append(procs, proc.ProvenanceProcessor(slsaVersion, params, c.opt.ProvenanceEnv))
}
resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{
Frontend: req.Frontend,
Definition: req.Definition,
FrontendOpt: req.FrontendAttrs,
FrontendInputs: req.FrontendInputs,
CacheImports: cacheImports,
}, llbsolver.ExporterRequest{
Exporters: expis,
CacheExporters: cacheExporters,
EnableSessionExporter: req.EnableSessionExporter,
}, entitlementsFromPB(req.Entitlements), procs, req.Internal, req.SourcePolicy)
if err != nil {
return nil, err
}
return &controlapi.SolveResponse{
ExporterResponse: resp.ExporterResponse,
}, nil
}
func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Control_StatusServer) error {
if err := sendTimestampHeader(stream); err != nil {
return err
}
ch := make(chan *client.SolveStatus, 8)
eg, ctx := errgroup.WithContext(stream.Context())
eg.Go(func() error {
return c.solver.Status(ctx, req.Ref, ch)
})
eg.Go(func() error {
defer func() {
// drain channel on error
for range ch {
}
}()
for {
ss, ok := <-ch
if !ok {
return nil
}
for _, sr := range ss.Marshal() {
if err := stream.SendMsg(sr); err != nil {
return err
}
}
}
})
return eg.Wait()
}
func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
bklog.G(stream.Context()).Debugf("session started")
conn, closeCh, opts := grpchijack.Hijack(stream)
defer conn.Close()
ctx, cancel := context.WithCancelCause(stream.Context())
go func() {
<-closeCh
cancel(errors.WithStack(context.Canceled))
}()
err := c.opt.SessionManager.HandleConn(ctx, conn, opts)
bklog.G(ctx).Debugf("session finished: %v", err)
return err
}
func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersRequest) (*controlapi.ListWorkersResponse, error) {
resp := &controlapi.ListWorkersResponse{}
workers, err := c.opt.WorkerController.List(r.Filter...)
if err != nil {
return nil, err
}
for _, w := range workers {
resp.Record = append(resp.Record, &apitypes.WorkerRecord{
ID: w.ID(),
Labels: w.Labels(),
Platforms: pb.PlatformsFromSpec(w.Platforms(true)),
GCPolicy: toPBGCPolicy(w.GCPolicy()),
BuildkitVersion: toPBBuildkitVersion(w.BuildkitVersion()),
CDIDevices: toPBCDIDevices(w.CDIManager()),
})
}
return resp, nil
}
func (c *Controller) Info(ctx context.Context, r *controlapi.InfoRequest) (*controlapi.InfoResponse, error) {
return &controlapi.InfoResponse{
BuildkitVersion: &apitypes.BuildkitVersion{
Package: version.Package,
Version: version.Version,
Revision: version.Revision,
},
}, nil
}
func (c *Controller) gc() {
c.gcmu.Lock()
defer c.gcmu.Unlock()
workers, err := c.opt.WorkerController.List()
if err != nil {
return
}
eg, ctx := errgroup.WithContext(context.TODO())
var size int64
ch := make(chan client.UsageInfo)
done := make(chan struct{})
go func() {
for ui := range ch {
size += ui.Size
}
close(done)
}()
for _, w := range workers {
eg.Go(func() error {
if policy := w.GCPolicy(); len(policy) > 0 {
return w.Prune(ctx, ch, policy...)
}
return nil
})
}
err = eg.Wait()
close(ch)
if err != nil {
bklog.G(ctx).Errorf("gc error: %+v", err)
}
<-done
if size > 0 {
bklog.G(ctx).Debugf("gc cleaned up %d bytes", size)
go c.throttledReleaseUnreferenced()
}
}
func parseCacheExportMode(mode string) (solver.CacheExportMode, bool) {
switch mode {
case "min":
return solver.CacheExportModeMin, true
case "max":
return solver.CacheExportModeMax, true
}
return solver.CacheExportModeMin, false
}
func parseCacheExportIgnoreError(ignoreErrorStr string) (bool, bool) {
ignoreError, err := strconv.ParseBool(ignoreErrorStr)
if err != nil {
return false, false
}
return ignoreError, true
}
func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy {
policy := make([]*apitypes.GCPolicy, 0, len(in))
for _, p := range in {
policy = append(policy, &apitypes.GCPolicy{
All: p.All,
Filters: p.Filter,
KeepDuration: int64(p.KeepDuration),
ReservedSpace: p.ReservedSpace,
MaxUsedSpace: p.MaxUsedSpace,
MinFreeSpace: p.MinFreeSpace,
})
}
return policy
}
func toPBBuildkitVersion(in client.BuildkitVersion) *apitypes.BuildkitVersion {
return &apitypes.BuildkitVersion{
Package: in.Package,
Version: in.Version,
Revision: in.Revision,
}
}
func toPBCDIDevices(manager *cdidevices.Manager) []*apitypes.CDIDevice {
if manager == nil {
return nil
}
devs := manager.ListDevices()
out := make([]*apitypes.CDIDevice, 0, len(devs))
for _, dev := range devs {
out = append(out, &apitypes.CDIDevice{
Name: dev.Name,
AutoAllow: dev.AutoAllow,
Annotations: dev.Annotations,
OnDemand: dev.OnDemand,
})
}
return out
}
func findDuplicateCacheOptions(cacheOpts []*controlapi.CacheOptionsEntry) ([]*controlapi.CacheOptionsEntry, error) {
seen := map[string]*controlapi.CacheOptionsEntry{}
duplicate := map[string]struct{}{}
for _, opt := range cacheOpts {
k, err := cacheOptKey(opt)
if err != nil {
return nil, err
}
if _, ok := seen[k]; ok {
duplicate[k] = struct{}{}
}
seen[k] = opt
}
var duplicates []*controlapi.CacheOptionsEntry
for k := range duplicate {
duplicates = append(duplicates, seen[k])
}
return duplicates, nil
}
func cacheOptKey(opt *controlapi.CacheOptionsEntry) (string, error) {
if opt.Type == "registry" && opt.Attrs["ref"] != "" {
return opt.Attrs["ref"], nil
}
rawOpt := struct {
Type string
Attrs map[string]string
}{
Type: opt.Type,
Attrs: opt.Attrs,
}
hash, err := hashstructure.Hash(rawOpt, hashstructure.FormatV2, nil)
if err != nil {
return "", err
}
return fmt.Sprint(opt.Type, ":", hash), nil
}
type roContentStore struct {
content.Store
}
func (cs *roContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
return nil, errors.Errorf("read-only content store")
}
func (cs *roContentStore) Delete(ctx context.Context, dgst digest.Digest) error {
return errors.Errorf("read-only content store")
}
func (cs *roContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, errors.Errorf("read-only content store")
}
func (cs *roContentStore) Abort(ctx context.Context, ref string) error {
return errors.Errorf("read-only content store")
}
const timestampKey = "buildkit-current-timestamp"
func sendTimestampHeader(srv grpc.ServerStream) error {
return srv.SendHeader(metadata.Pairs(timestampKey, time.Now().Format(time.RFC3339Nano)))
}
func entitlementsFromPB(elems []string) []entitlements.Entitlement {
clone := make([]entitlements.Entitlement, len(elems))
for i, e := range elems {
clone[i] = entitlements.Entitlement(e)
}
return clone
}