package local
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/contenthash"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
srctypes "github.com/moby/buildkit/source/types"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/cachedigest"
"github.com/moby/buildkit/util/progress"
"github.com/moby/patternmatcher"
"github.com/moby/sys/user"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/time/rate"
)
type Opt struct {
CacheAccessor cache.Accessor
}
func NewSource(opt Opt) (source.Source, error) {
ls := &localSource{
cm: opt.CacheAccessor,
}
return ls, nil
}
type localSource struct {
cm cache.Accessor
}
func (ls *localSource) Schemes() []string {
return []string{srctypes.LocalScheme}
}
func (ls *localSource) Identifier(scheme, ref string, attrs map[string]string, platform *pb.Platform) (source.Identifier, error) {
id, err := NewLocalIdentifier(ref)
if err != nil {
return nil, err
}
for k, v := range attrs {
switch k {
case pb.AttrLocalSessionID:
id.SessionID = v
if p := strings.SplitN(v, ":", 2); len(p) == 2 {
id.Name = p[0] + "-" + id.Name
id.SessionID = p[1]
}
case pb.AttrIncludePatterns:
var patterns []string
if err := json.Unmarshal([]byte(v), &patterns); err != nil {
return nil, err
}
id.IncludePatterns = patterns
case pb.AttrExcludePatterns:
var patterns []string
if err := json.Unmarshal([]byte(v), &patterns); err != nil {
return nil, err
}
id.ExcludePatterns = patterns
case pb.AttrFollowPaths:
var paths []string
if err := json.Unmarshal([]byte(v), &paths); err != nil {
return nil, err
}
id.FollowPaths = paths
case pb.AttrSharedKeyHint:
id.SharedKeyHint = v
case pb.AttrLocalDiffer:
switch v {
case pb.AttrLocalDifferMetadata, "":
id.Differ = fsutil.DiffMetadata
case pb.AttrLocalDifferNone:
id.Differ = fsutil.DiffNone
}
case pb.AttrMetadataTransfer:
b, err := strconv.ParseBool(v)
if err != nil {
return nil, errors.Wrapf(err, "invalid value for local.metadatatransfer %q", v)
}
id.MetadataOnly = b
case pb.AttrMetadataTransferExclude:
var exceptions []string
if err := json.Unmarshal([]byte(v), &exceptions); err != nil {
return nil, err
}
id.MetadataExceptions = exceptions
}
}
return id, nil
}
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, _ solver.Vertex) (source.SourceInstance, error) {
localIdentifier, ok := id.(*LocalIdentifier)
if !ok {
return nil, errors.Errorf("invalid local identifier %v", id)
}
return &localSourceHandler{
src: *localIdentifier,
sm: sm,
localSource: ls,
}, nil
}
type localSourceHandler struct {
src LocalIdentifier
sm *session.Manager
*localSource
}
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, string, solver.CacheOpts, bool, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
id := g.SessionIterator().NextSession()
if id == "" {
return "", "", nil, false, errors.New("could not access local files without session")
}
sessionID = id
}
dt, err := json.Marshal(struct {
SessionID string
IncludePatterns []string
ExcludePatterns []string
FollowPaths []string
MetadataTransfer bool `json:",omitempty"`
MetadataExceptions []string `json:",omitempty"`
}{
SessionID: sessionID,
IncludePatterns: ls.src.IncludePatterns,
ExcludePatterns: ls.src.ExcludePatterns,
FollowPaths: ls.src.FollowPaths,
MetadataTransfer: ls.src.MetadataOnly,
MetadataExceptions: ls.src.MetadataExceptions,
})
if err != nil {
return "", "", nil, false, err
}
dgst, err := cachedigest.FromBytes(dt, cachedigest.TypeJSON)
if err != nil {
return "", "", nil, false, err
}
return "session:" + ls.src.Name + ":" + dgst.String(), dgst.String(), nil, true, nil
}
func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
return ls.snapshotWithAnySession(ctx, g)
}
timeoutCtx, cancel := context.WithCancelCause(ctx)
timeoutCtx, _ = context.WithTimeoutCause(timeoutCtx, 5*time.Second, errors.WithStack(context.DeadlineExceeded)) //nolint:govet
defer func() { cancel(errors.WithStack(context.Canceled)) }()
caller, err := ls.sm.Get(timeoutCtx, sessionID, false)
if err != nil {
return ls.snapshotWithAnySession(ctx, g)
}
ref, err := ls.snapshot(ctx, caller)
if err != nil {
var serr filesync.InvalidSessionError
if errors.As(err, &serr) {
return ls.snapshotWithAnySession(ctx, g)
}
return nil, err
}
return ref, nil
}
func (ls *localSourceHandler) snapshotWithAnySession(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
var ref cache.ImmutableRef
err := ls.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
r, err := ls.snapshot(ctx, c)
if err != nil {
return err
}
ref = r
return nil
})
return ref, err
}
func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) {
metaSfx := ""
if ls.src.MetadataOnly {
metaSfx = ":metadata"
}
sharedKey := ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() + metaSfx // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid)
var mutable cache.MutableRef
sis, err := searchSharedKey(ctx, ls.cm, sharedKey)
if err != nil {
return nil, err
}
for _, si := range sis {
if m, err := ls.cm.GetMutable(ctx, si.ID()); err == nil {
bklog.G(ctx).Debugf("reusing ref for local: %s", m.ID())
mutable = m
break
} else {
bklog.G(ctx).Debugf("not reusing ref %s for local: %v", si.ID(), err)
}
}
if mutable == nil {
m, err := ls.cm.New(ctx, nil, nil, cache.CachePolicyRetain, cache.WithRecordType(client.UsageRecordTypeLocalSource), cache.WithDescription(fmt.Sprintf("local source for %s", ls.src.Name)))
if err != nil {
return nil, err
}
mutable = m
bklog.G(ctx).Debugf("new ref for local: %s", mutable.ID())
}
defer func() {
if retErr != nil && mutable != nil {
// on error remove the record as checksum update is in undefined state
if err := mutable.SetCachePolicyDefault(); err != nil {
bklog.G(ctx).Errorf("failed to reset mutable cachepolicy: %v", err)
}
contenthash.ClearCacheContext(mutable)
go mutable.Release(context.WithoutCancel(ctx))
}
}()
mount, err := mutable.Mount(ctx, false, nil)
if err != nil {
return nil, err
}
lm := snapshot.LocalMounter(mount)
dest, err := lm.Mount()
if err != nil {
return nil, err
}
defer func() {
if retErr != nil && lm != nil {
lm.Unmount()
}
}()
cc, err := contenthash.GetCacheContext(ctx, mutable)
if err != nil {
return nil, err
}
opt := filesync.FSSendRequestOpt{
Name: ls.src.Name,
IncludePatterns: ls.src.IncludePatterns,
ExcludePatterns: ls.src.ExcludePatterns,
FollowPaths: ls.src.FollowPaths,
DestDir: dest,
CacheUpdater: &cacheUpdater{cc, mount.IdentityMapping()},
ProgressCb: newProgressHandler(ctx, "transferring "+ls.src.Name+":"),
Differ: ls.src.Differ,
MetadataOnly: ls.src.MetadataOnly,
}
if opt.MetadataOnly && len(ls.src.MetadataExceptions) > 0 {
matcher, err := patternmatcher.New(ls.src.MetadataExceptions)
if err != nil {
return nil, errors.WithStack(err)
}
opt.MetadataOnlyFilter = func(p string, _ *fstypes.Stat) bool {
v, err := matcher.MatchesOrParentMatches(p)
return err == nil && v
}
}
if idmap := mount.IdentityMapping(); idmap != nil {
opt.Filter = func(p string, stat *fstypes.Stat) bool {
uid, gid, err := idmap.ToHost(int(stat.Uid), int(stat.Gid))
if err != nil {
return false
}
stat.Uid = uint32(uid)
stat.Gid = uint32(gid)
return true
}
}
if err := filesync.FSSync(ctx, caller, opt); err != nil {
return nil, err
}
if err := lm.Unmount(); err != nil {
return nil, err
}
lm = nil
if err := contenthash.SetCacheContext(ctx, mutable, cc); err != nil {
return nil, err
}
// skip storing snapshot by the shared key if it already exists
md := cacheRefMetadata{mutable}
if md.getSharedKey() != sharedKey {
if err := md.setSharedKey(sharedKey); err != nil {
return nil, err
}
bklog.G(ctx).Debugf("saved %s as %s", mutable.ID(), sharedKey)
}
snap, err := mutable.Commit(ctx)
if err != nil {
return nil, err
}
mutable = nil // avoid deferred cleanup
return snap, nil
}
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
pw, _, _ := progress.NewFromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
Action: "transferring",
}
pw.Write(id, st)
return func(s int, last bool) {
if last || limiter.Allow() {
st.Current = s
if last {
now := time.Now()
st.Completed = &now
}
pw.Write(id, st)
if last {
pw.Close()
}
}
}
}
type cacheUpdater struct {
contenthash.CacheContext
idmap *user.IdentityMapping
}
func (cu *cacheUpdater) MarkSupported(bool) {
}
func (cu *cacheUpdater) ContentHasher() fsutil.ContentHasher {
return contenthash.NewFromStat
}
const (
keySharedKey = "local.sharedKey"
sharedKeyIndex = keySharedKey + ":"
)
func searchSharedKey(ctx context.Context, store cache.MetadataStore, k string) ([]cacheRefMetadata, error) {
var results []cacheRefMetadata
mds, err := store.Search(ctx, sharedKeyIndex+k, false)
if err != nil {
return nil, err
}
for _, md := range mds {
results = append(results, cacheRefMetadata{md})
}
return results, nil
}
type cacheRefMetadata struct {
cache.RefMetadata
}
func (md cacheRefMetadata) getSharedKey() string {
return md.GetString(keySharedKey)
}
func (md cacheRefMetadata) setSharedKey(key string) error {
return md.SetString(keySharedKey, key, sharedKeyIndex+key)
}