Browse code

Implement incremental file sync using client session

Also exposes shared cache and garbage collection/prune
for the source data.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>

Tonis Tiigi authored on 2017/05/16 06:54:27
Showing 42 changed files
... ...
@@ -4,11 +4,11 @@ import (
4 4
 	"fmt"
5 5
 
6 6
 	"github.com/docker/distribution/reference"
7
+	"github.com/docker/docker/api/types"
7 8
 	"github.com/docker/docker/api/types/backend"
8 9
 	"github.com/docker/docker/builder"
9
-	"github.com/docker/docker/builder/dockerfile"
10
+	"github.com/docker/docker/builder/fscache"
10 11
 	"github.com/docker/docker/image"
11
-	"github.com/docker/docker/pkg/idtools"
12 12
 	"github.com/docker/docker/pkg/stringid"
13 13
 	"github.com/pkg/errors"
14 14
 	"golang.org/x/net/context"
... ...
@@ -20,16 +20,21 @@ type ImageComponent interface {
20 20
 	TagImageWithReference(image.ID, string, reference.Named) error
21 21
 }
22 22
 
23
+// Builder defines interface for running a build
24
+type Builder interface {
25
+	Build(context.Context, backend.BuildConfig) (*builder.Result, error)
26
+}
27
+
23 28
 // Backend provides build functionality to the API router
24 29
 type Backend struct {
25
-	manager        *dockerfile.BuildManager
30
+	builder        Builder
31
+	fsCache        *fscache.FSCache
26 32
 	imageComponent ImageComponent
27 33
 }
28 34
 
29 35
 // NewBackend creates a new build backend from components
30
-func NewBackend(components ImageComponent, builderBackend builder.Backend, sg dockerfile.SessionGetter, idMappings *idtools.IDMappings) (*Backend, error) {
31
-	manager := dockerfile.NewBuildManager(builderBackend, sg, idMappings)
32
-	return &Backend{imageComponent: components, manager: manager}, nil
36
+func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache) (*Backend, error) {
37
+	return &Backend{imageComponent: components, builder: builder, fsCache: fsCache}, nil
33 38
 }
34 39
 
35 40
 // Build builds an image from a Source
... ...
@@ -40,7 +45,7 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string
40 40
 		return "", err
41 41
 	}
42 42
 
43
-	build, err := b.manager.Build(ctx, config)
43
+	build, err := b.builder.Build(ctx, config)
44 44
 	if err != nil {
45 45
 		return "", err
46 46
 	}
... ...
@@ -58,6 +63,15 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string
58 58
 	return imageID, err
59 59
 }
60 60
 
61
+// PruneCache removes all cached build sources
62
+func (b *Backend) PruneCache(ctx context.Context) (*types.BuildCachePruneReport, error) {
63
+	size, err := b.fsCache.Prune()
64
+	if err != nil {
65
+		return nil, errors.Wrap(err, "failed to prune build cache")
66
+	}
67
+	return &types.BuildCachePruneReport{SpaceReclaimed: size}, nil
68
+}
69
+
61 70
 func squashBuild(build *builder.Result, imageComponent ImageComponent) (string, error) {
62 71
 	var fromID string
63 72
 	if build.FromImage != nil {
... ...
@@ -1,6 +1,7 @@
1 1
 package build
2 2
 
3 3
 import (
4
+	"github.com/docker/docker/api/types"
4 5
 	"github.com/docker/docker/api/types/backend"
5 6
 	"golang.org/x/net/context"
6 7
 )
... ...
@@ -10,6 +11,9 @@ type Backend interface {
10 10
 	// Build a Docker image returning the id of the image
11 11
 	// TODO: make this return a reference instead of string
12 12
 	Build(context.Context, backend.BuildConfig) (string, error)
13
+
14
+	// Prune build cache
15
+	PruneCache(context.Context) (*types.BuildCachePruneReport, error)
13 16
 }
14 17
 
15 18
 type experimentalProvider interface {
... ...
@@ -24,5 +24,6 @@ func (r *buildRouter) Routes() []router.Route {
24 24
 func (r *buildRouter) initRoutes() {
25 25
 	r.routes = []router.Route{
26 26
 		router.NewPostRoute("/build", r.postBuild, router.WithCancel),
27
+		router.NewPostRoute("/build/prune", r.postPrune, router.WithCancel),
27 28
 	}
28 29
 }
... ...
@@ -132,6 +132,14 @@ func newImageBuildOptions(ctx context.Context, r *http.Request) (*types.ImageBui
132 132
 	return options, nil
133 133
 }
134 134
 
135
+func (br *buildRouter) postPrune(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
136
+	report, err := br.backend.PruneCache(ctx)
137
+	if err != nil {
138
+		return err
139
+	}
140
+	return httputils.WriteJSON(w, http.StatusOK, report)
141
+}
142
+
135 143
 func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
136 144
 	var (
137 145
 		notVerboseBuffer = bytes.NewBuffer(nil)
... ...
@@ -2,6 +2,7 @@ package system
2 2
 
3 3
 import (
4 4
 	"github.com/docker/docker/api/server/router"
5
+	"github.com/docker/docker/builder/fscache"
5 6
 	"github.com/docker/docker/daemon/cluster"
6 7
 )
7 8
 
... ...
@@ -11,13 +12,15 @@ type systemRouter struct {
11 11
 	backend Backend
12 12
 	cluster *cluster.Cluster
13 13
 	routes  []router.Route
14
+	builder *fscache.FSCache
14 15
 }
15 16
 
16 17
 // NewRouter initializes a new system router
17
-func NewRouter(b Backend, c *cluster.Cluster) router.Router {
18
+func NewRouter(b Backend, c *cluster.Cluster, fscache *fscache.FSCache) router.Router {
18 19
 	r := &systemRouter{
19 20
 		backend: b,
20 21
 		cluster: c,
22
+		builder: fscache,
21 23
 	}
22 24
 
23 25
 	r.routes = []router.Route{
... ...
@@ -17,6 +17,7 @@ import (
17 17
 	timetypes "github.com/docker/docker/api/types/time"
18 18
 	"github.com/docker/docker/api/types/versions"
19 19
 	"github.com/docker/docker/pkg/ioutils"
20
+	pkgerrors "github.com/pkg/errors"
20 21
 	"golang.org/x/net/context"
21 22
 )
22 23
 
... ...
@@ -75,6 +76,11 @@ func (s *systemRouter) getDiskUsage(ctx context.Context, w http.ResponseWriter,
75 75
 	if err != nil {
76 76
 		return err
77 77
 	}
78
+	builderSize, err := s.builder.DiskUsage()
79
+	if err != nil {
80
+		return pkgerrors.Wrap(err, "error getting build cache usage")
81
+	}
82
+	du.BuilderSize = builderSize
78 83
 
79 84
 	return httputils.WriteJSON(w, http.StatusOK, du)
80 85
 }
... ...
@@ -4745,6 +4745,27 @@ paths:
4745 4745
           schema:
4746 4746
             $ref: "#/definitions/ErrorResponse"
4747 4747
       tags: ["Image"]
4748
+  /build/prune:
4749
+    post:
4750
+      summary: "Delete builder cache"
4751
+      produces:
4752
+        - "application/json"
4753
+      operationId: "BuildPrune"
4754
+      responses:
4755
+        200:
4756
+          description: "No error"
4757
+          schema:
4758
+            type: "object"
4759
+            properties:
4760
+              SpaceReclaimed:
4761
+                description: "Disk space reclaimed in bytes"
4762
+                type: "integer"
4763
+                format: "int64"
4764
+        500:
4765
+          description: "Server error"
4766
+          schema:
4767
+            $ref: "#/definitions/ErrorResponse"
4768
+      tags: ["Image"]
4748 4769
   /images/create:
4749 4770
     post:
4750 4771
       summary: "Create an image"
... ...
@@ -489,10 +489,11 @@ type Runtime struct {
489 489
 // DiskUsage contains response of Engine API:
490 490
 // GET "/system/df"
491 491
 type DiskUsage struct {
492
-	LayersSize int64
493
-	Images     []*ImageSummary
494
-	Containers []*Container
495
-	Volumes    []*Volume
492
+	LayersSize  int64
493
+	Images      []*ImageSummary
494
+	Containers  []*Container
495
+	Volumes     []*Volume
496
+	BuilderSize int64
496 497
 }
497 498
 
498 499
 // ContainersPruneReport contains the response for Engine API:
... ...
@@ -516,6 +517,12 @@ type ImagesPruneReport struct {
516 516
 	SpaceReclaimed uint64
517 517
 }
518 518
 
519
+// BuildCachePruneReport contains the response for Engine API:
520
+// POST "/build/prune"
521
+type BuildCachePruneReport struct {
522
+	SpaceReclaimed uint64
523
+}
524
+
519 525
 // NetworksPruneReport contains the response for Engine API:
520 526
 // POST "/networks/prune"
521 527
 type NetworksPruneReport struct {
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"io/ioutil"
8 8
 	"runtime"
9 9
 	"strings"
10
+	"time"
10 11
 
11 12
 	"github.com/Sirupsen/logrus"
12 13
 	"github.com/docker/docker/api/types"
... ...
@@ -15,6 +16,7 @@ import (
15 15
 	"github.com/docker/docker/builder"
16 16
 	"github.com/docker/docker/builder/dockerfile/command"
17 17
 	"github.com/docker/docker/builder/dockerfile/parser"
18
+	"github.com/docker/docker/builder/fscache"
18 19
 	"github.com/docker/docker/builder/remotecontext"
19 20
 	"github.com/docker/docker/client/session"
20 21
 	"github.com/docker/docker/pkg/archive"
... ...
@@ -52,16 +54,22 @@ type BuildManager struct {
52 52
 	backend   builder.Backend
53 53
 	pathCache pathCache // TODO: make this persistent
54 54
 	sg        SessionGetter
55
+	fsCache   *fscache.FSCache
55 56
 }
56 57
 
57 58
 // NewBuildManager creates a BuildManager
58
-func NewBuildManager(b builder.Backend, sg SessionGetter, idMappings *idtools.IDMappings) *BuildManager {
59
-	return &BuildManager{
59
+func NewBuildManager(b builder.Backend, sg SessionGetter, fsCache *fscache.FSCache, idMappings *idtools.IDMappings) (*BuildManager, error) {
60
+	bm := &BuildManager{
60 61
 		backend:   b,
61 62
 		pathCache: &syncmap.Map{},
62 63
 		sg:        sg,
63 64
 		archiver:  chrootarchive.NewArchiver(idMappings),
65
+		fsCache:   fsCache,
64 66
 	}
67
+	if err := fsCache.RegisterTransport(remotecontext.ClientSessionRemote, NewClientSessionTransport()); err != nil {
68
+		return nil, err
69
+	}
70
+	return bm, nil
65 71
 }
66 72
 
67 73
 // Build starts a new build from a BuildConfig
... ...
@@ -75,13 +83,13 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) (
75 75
 	if err != nil {
76 76
 		return nil, err
77 77
 	}
78
-	if source != nil {
79
-		defer func() {
78
+	defer func() {
79
+		if source != nil {
80 80
 			if err := source.Close(); err != nil {
81 81
 				logrus.Debugf("[BUILDER] failed to remove temporary context: %v", err)
82 82
 			}
83
-		}()
84
-	}
83
+		}
84
+	}()
85 85
 
86 86
 	// TODO @jhowardmsft LCOW support - this will require rework to allow both linux and Windows simultaneously.
87 87
 	// This is an interim solution to hardcode to linux if LCOW is turned on.
... ...
@@ -95,8 +103,10 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) (
95 95
 	ctx, cancel := context.WithCancel(ctx)
96 96
 	defer cancel()
97 97
 
98
-	if err := bm.initializeClientSession(ctx, cancel, config.Options); err != nil {
98
+	if src, err := bm.initializeClientSession(ctx, cancel, config.Options); err != nil {
99 99
 		return nil, err
100
+	} else if src != nil {
101
+		source = src
100 102
 	}
101 103
 
102 104
 	builderOptions := builderOptions{
... ...
@@ -111,20 +121,38 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) (
111 111
 	return newBuilder(ctx, builderOptions).build(source, dockerfile)
112 112
 }
113 113
 
114
-func (bm *BuildManager) initializeClientSession(ctx context.Context, cancel func(), options *types.ImageBuildOptions) error {
114
+func (bm *BuildManager) initializeClientSession(ctx context.Context, cancel func(), options *types.ImageBuildOptions) (builder.Source, error) {
115 115
 	if options.SessionID == "" || bm.sg == nil {
116
-		return nil
116
+		return nil, nil
117 117
 	}
118 118
 	logrus.Debug("client is session enabled")
119
+
120
+	ctx, cancelCtx := context.WithTimeout(ctx, sessionConnectTimeout)
121
+	defer cancelCtx()
122
+
119 123
 	c, err := bm.sg.Get(ctx, options.SessionID)
120 124
 	if err != nil {
121
-		return err
125
+		return nil, err
122 126
 	}
123 127
 	go func() {
124 128
 		<-c.Context().Done()
125 129
 		cancel()
126 130
 	}()
127
-	return nil
131
+	if options.RemoteContext == remotecontext.ClientSessionRemote {
132
+		st := time.Now()
133
+		csi, err := NewClientSessionSourceIdentifier(ctx, bm.sg,
134
+			options.SessionID, []string{"/"})
135
+		if err != nil {
136
+			return nil, err
137
+		}
138
+		src, err := bm.fsCache.SyncFrom(ctx, csi)
139
+		if err != nil {
140
+			return nil, err
141
+		}
142
+		logrus.Debugf("sync-time: %v", time.Since(st))
143
+		return src, nil
144
+	}
145
+	return nil, nil
128 146
 }
129 147
 
130 148
 // builderOptions are the dependencies required by the builder
131 149
new file mode 100644
... ...
@@ -0,0 +1,78 @@
0
+package dockerfile
1
+
2
+import (
3
+	"time"
4
+
5
+	"github.com/docker/docker/builder/fscache"
6
+	"github.com/docker/docker/builder/remotecontext"
7
+	"github.com/docker/docker/client/session"
8
+	"github.com/docker/docker/client/session/filesync"
9
+	"github.com/pkg/errors"
10
+	"golang.org/x/net/context"
11
+)
12
+
13
+const sessionConnectTimeout = 5 * time.Second
14
+
15
+// ClientSessionTransport is a transport for copying files from docker client
16
+// to the daemon.
17
+type ClientSessionTransport struct{}
18
+
19
+// NewClientSessionTransport returns new ClientSessionTransport instance
20
+func NewClientSessionTransport() *ClientSessionTransport {
21
+	return &ClientSessionTransport{}
22
+}
23
+
24
+// Copy data from a remote to a destination directory.
25
+func (cst *ClientSessionTransport) Copy(ctx context.Context, id fscache.RemoteIdentifier, dest string, cu filesync.CacheUpdater) error {
26
+	csi, ok := id.(*ClientSessionSourceIdentifier)
27
+	if !ok {
28
+		return errors.New("invalid identifier for client session")
29
+	}
30
+
31
+	return filesync.FSSync(ctx, csi.caller, filesync.FSSendRequestOpt{
32
+		SrcPaths:     csi.srcPaths,
33
+		DestDir:      dest,
34
+		CacheUpdater: cu,
35
+	})
36
+}
37
+
38
+// ClientSessionSourceIdentifier is an identifier that can be used for requesting
39
+// files from remote client
40
+type ClientSessionSourceIdentifier struct {
41
+	srcPaths  []string
42
+	caller    session.Caller
43
+	sharedKey string
44
+	uuid      string
45
+}
46
+
47
+// NewClientSessionSourceIdentifier returns new ClientSessionSourceIdentifier instance
48
+func NewClientSessionSourceIdentifier(ctx context.Context, sg SessionGetter, uuid string, sources []string) (*ClientSessionSourceIdentifier, error) {
49
+	csi := &ClientSessionSourceIdentifier{
50
+		uuid:     uuid,
51
+		srcPaths: sources,
52
+	}
53
+	caller, err := sg.Get(ctx, uuid)
54
+	if err != nil {
55
+		return nil, errors.Wrapf(err, "failed to get session for %s", uuid)
56
+	}
57
+
58
+	csi.caller = caller
59
+	return csi, nil
60
+}
61
+
62
+// Transport returns transport identifier for remote identifier
63
+func (csi *ClientSessionSourceIdentifier) Transport() string {
64
+	return remotecontext.ClientSessionRemote
65
+}
66
+
67
+// SharedKey returns shared key for remote identifier. Shared key is used
68
+// for finding the base for a repeated transfer.
69
+func (csi *ClientSessionSourceIdentifier) SharedKey() string {
70
+	return csi.caller.SharedKey()
71
+}
72
+
73
+// Key returns unique key for remote identifier. Requests with same key return
74
+// same data.
75
+func (csi *ClientSessionSourceIdentifier) Key() string {
76
+	return csi.uuid
77
+}
... ...
@@ -158,7 +158,7 @@ func executeTestCase(t *testing.T, testCase dispatchTestCase) {
158 158
 		}
159 159
 	}()
160 160
 
161
-	context, err := remotecontext.MakeTarSumContext(tarStream)
161
+	context, err := remotecontext.FromArchive(tarStream)
162 162
 
163 163
 	if err != nil {
164 164
 		t.Fatalf("Error when creating tar context: %s", err)
165 165
new file mode 100644
... ...
@@ -0,0 +1,602 @@
0
+package fscache
1
+
2
+import (
3
+	"encoding/json"
4
+	"os"
5
+	"path/filepath"
6
+	"sort"
7
+	"sync"
8
+	"time"
9
+
10
+	"github.com/Sirupsen/logrus"
11
+	"github.com/boltdb/bolt"
12
+	"github.com/docker/docker/builder"
13
+	"github.com/docker/docker/builder/remotecontext"
14
+	"github.com/docker/docker/client/session/filesync"
15
+	"github.com/docker/docker/pkg/directory"
16
+	"github.com/docker/docker/pkg/stringid"
17
+	"github.com/pkg/errors"
18
+	"github.com/tonistiigi/fsutil"
19
+	"golang.org/x/net/context"
20
+	"golang.org/x/sync/singleflight"
21
+)
22
+
23
+const dbFile = "fscache.db"
24
+const cacheKey = "cache"
25
+const metaKey = "meta"
26
+
27
+// Backend is a backing implementation for FSCache
28
+type Backend interface {
29
+	Get(id string) (string, error)
30
+	Remove(id string) error
31
+}
32
+
33
+// FSCache allows syncing remote resources to cached snapshots
34
+type FSCache struct {
35
+	opt        Opt
36
+	transports map[string]Transport
37
+	mu         sync.Mutex
38
+	g          singleflight.Group
39
+	store      *fsCacheStore
40
+}
41
+
42
+// Opt defines options for initializing FSCache
43
+type Opt struct {
44
+	Backend  Backend
45
+	Root     string // for storing local metadata
46
+	GCPolicy GCPolicy
47
+}
48
+
49
+// GCPolicy defines policy for garbage collection
50
+type GCPolicy struct {
51
+	MaxSize         uint64
52
+	MaxKeepDuration time.Duration
53
+}
54
+
55
+// NewFSCache returns new FSCache object
56
+func NewFSCache(opt Opt) (*FSCache, error) {
57
+	store, err := newFSCacheStore(opt)
58
+	if err != nil {
59
+		return nil, err
60
+	}
61
+	return &FSCache{
62
+		store:      store,
63
+		opt:        opt,
64
+		transports: make(map[string]Transport),
65
+	}, nil
66
+}
67
+
68
+// Transport defines a method for syncing remote data to FSCache
69
+type Transport interface {
70
+	Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error
71
+}
72
+
73
+// RemoteIdentifier identifies a transfer request
74
+type RemoteIdentifier interface {
75
+	Key() string
76
+	SharedKey() string
77
+	Transport() string
78
+}
79
+
80
+// RegisterTransport registers a new transport method
81
+func (fsc *FSCache) RegisterTransport(id string, transport Transport) error {
82
+	fsc.mu.Lock()
83
+	defer fsc.mu.Unlock()
84
+	if _, ok := fsc.transports[id]; ok {
85
+		return errors.Errorf("transport %v already exists", id)
86
+	}
87
+	fsc.transports[id] = transport
88
+	return nil
89
+}
90
+
91
+// SyncFrom returns a source based on a remote identifier
92
+func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt
93
+	trasportID := id.Transport()
94
+	fsc.mu.Lock()
95
+	transport, ok := fsc.transports[id.Transport()]
96
+	if !ok {
97
+		fsc.mu.Unlock()
98
+		return nil, errors.Errorf("invalid transport %s", trasportID)
99
+	}
100
+
101
+	logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey())
102
+	fsc.mu.Unlock()
103
+	sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) {
104
+		var sourceRef *cachedSourceRef
105
+		sourceRef, err := fsc.store.Get(id.Key())
106
+		if err == nil {
107
+			return sourceRef, nil
108
+		}
109
+
110
+		// check for unused shared cache
111
+		sharedKey := id.SharedKey()
112
+		if sharedKey != "" {
113
+			r, err := fsc.store.Rebase(sharedKey, id.Key())
114
+			if err == nil {
115
+				sourceRef = r
116
+			}
117
+		}
118
+
119
+		if sourceRef == nil {
120
+			var err error
121
+			sourceRef, err = fsc.store.New(id.Key(), sharedKey)
122
+			if err != nil {
123
+				return nil, errors.Wrap(err, "failed to create remote context")
124
+			}
125
+		}
126
+
127
+		if err := syncFrom(ctx, sourceRef, transport, id); err != nil {
128
+			sourceRef.Release()
129
+			return nil, err
130
+		}
131
+		if err := sourceRef.resetSize(-1); err != nil {
132
+			return nil, err
133
+		}
134
+		return sourceRef, nil
135
+	})
136
+	if err != nil {
137
+		return nil, err
138
+	}
139
+	ref := sourceRef.(*cachedSourceRef)
140
+	if ref.src == nil { // failsafe
141
+		return nil, errors.Errorf("invalid empty pull")
142
+	}
143
+	wc := &wrappedContext{Source: ref.src, closer: func() error {
144
+		ref.Release()
145
+		return nil
146
+	}}
147
+	return wc, nil
148
+}
149
+
150
+// DiskUsage reports how much data is allocated by the cache
151
+func (fsc *FSCache) DiskUsage() (int64, error) {
152
+	return fsc.store.DiskUsage()
153
+}
154
+
155
+// Prune allows manually cleaning up the cache
156
+func (fsc *FSCache) Prune() (uint64, error) {
157
+	return fsc.store.Prune()
158
+}
159
+
160
+// Close stops the gc and closes the persistent db
161
+func (fsc *FSCache) Close() error {
162
+	return fsc.store.Close()
163
+}
164
+
165
+func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) {
166
+	src := cs.src
167
+	if src == nil {
168
+		src = remotecontext.NewCachableSource(cs.Dir())
169
+	}
170
+
171
+	if !cs.cached {
172
+		if err := cs.storage.db.View(func(tx *bolt.Tx) error {
173
+			b := tx.Bucket([]byte(id.Key()))
174
+			dt := b.Get([]byte(cacheKey))
175
+			if dt != nil {
176
+				if err := src.UnmarshalBinary(dt); err != nil {
177
+					return err
178
+				}
179
+			} else {
180
+				return errors.Wrap(src.Scan(), "failed to scan cache records")
181
+			}
182
+			return nil
183
+		}); err != nil {
184
+			return err
185
+		}
186
+	}
187
+
188
+	dc := &detectChanges{f: src.HandleChange}
189
+
190
+	// todo: probably send a bucket to `Copy` and let it return source
191
+	// but need to make sure that tx is safe
192
+	if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil {
193
+		return errors.Wrapf(err, "failed to copy to %s", cs.Dir())
194
+	}
195
+
196
+	if !dc.supported {
197
+		if err := src.Scan(); err != nil {
198
+			return errors.Wrap(err, "failed to scan cache records after transfer")
199
+		}
200
+	}
201
+	cs.cached = true
202
+	cs.src = src
203
+	return cs.storage.db.Update(func(tx *bolt.Tx) error {
204
+		dt, err := src.MarshalBinary()
205
+		if err != nil {
206
+			return err
207
+		}
208
+		b := tx.Bucket([]byte(id.Key()))
209
+		return b.Put([]byte(cacheKey), dt)
210
+	})
211
+}
212
+
213
+type fsCacheStore struct {
214
+	root     string
215
+	mu       sync.Mutex
216
+	sources  map[string]*cachedSource
217
+	db       *bolt.DB
218
+	fs       Backend
219
+	gcTimer  *time.Timer
220
+	gcPolicy GCPolicy
221
+}
222
+
223
+// CachePolicy defines policy for keeping a resource in cache
224
+type CachePolicy struct {
225
+	Priority int
226
+	LastUsed time.Time
227
+}
228
+
229
+func defaultCachePolicy() CachePolicy {
230
+	return CachePolicy{Priority: 10, LastUsed: time.Now()}
231
+}
232
+
233
+func newFSCacheStore(opt Opt) (*fsCacheStore, error) {
234
+	if err := os.MkdirAll(opt.Root, 0700); err != nil {
235
+		return nil, err
236
+	}
237
+	p := filepath.Join(opt.Root, dbFile)
238
+	db, err := bolt.Open(p, 0600, nil)
239
+	if err != nil {
240
+		return nil, errors.Wrap(err, "failed to open database file %s")
241
+	}
242
+	s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy}
243
+	db.View(func(tx *bolt.Tx) error {
244
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
245
+			dt := b.Get([]byte(metaKey))
246
+			if dt == nil {
247
+				return nil
248
+			}
249
+			var sm sourceMeta
250
+			if err := json.Unmarshal(dt, &sm); err != nil {
251
+				return err
252
+			}
253
+			dir, err := s.fs.Get(sm.BackendID)
254
+			if err != nil {
255
+				return err // TODO: handle gracefully
256
+			}
257
+			source := &cachedSource{
258
+				refs:       make(map[*cachedSourceRef]struct{}),
259
+				id:         string(name),
260
+				dir:        dir,
261
+				sourceMeta: sm,
262
+				storage:    s,
263
+			}
264
+			s.sources[string(name)] = source
265
+			return nil
266
+		})
267
+	})
268
+
269
+	s.gcTimer = s.startPeriodicGC(5 * time.Minute)
270
+	return s, nil
271
+}
272
+
273
+func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer {
274
+	var t *time.Timer
275
+	t = time.AfterFunc(interval, func() {
276
+		if err := s.GC(); err != nil {
277
+			logrus.Errorf("build gc error: %v", err)
278
+		}
279
+		t.Reset(interval)
280
+	})
281
+	return t
282
+}
283
+
284
+func (s *fsCacheStore) Close() error {
285
+	s.gcTimer.Stop()
286
+	return s.db.Close()
287
+}
288
+
289
+func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) {
290
+	s.mu.Lock()
291
+	defer s.mu.Unlock()
292
+	var ret *cachedSource
293
+	if err := s.db.Update(func(tx *bolt.Tx) error {
294
+		b, err := tx.CreateBucket([]byte(id))
295
+		if err != nil {
296
+			return err
297
+		}
298
+		backendID := stringid.GenerateRandomID()
299
+		dir, err := s.fs.Get(backendID)
300
+		if err != nil {
301
+			return err
302
+		}
303
+		source := &cachedSource{
304
+			refs: make(map[*cachedSourceRef]struct{}),
305
+			id:   id,
306
+			dir:  dir,
307
+			sourceMeta: sourceMeta{
308
+				BackendID:   backendID,
309
+				SharedKey:   sharedKey,
310
+				CachePolicy: defaultCachePolicy(),
311
+			},
312
+			storage: s,
313
+		}
314
+		dt, err := json.Marshal(source.sourceMeta)
315
+		if err != nil {
316
+			return err
317
+		}
318
+		if err := b.Put([]byte(metaKey), dt); err != nil {
319
+			return err
320
+		}
321
+		s.sources[id] = source
322
+		ret = source
323
+		return nil
324
+	}); err != nil {
325
+		return nil, err
326
+	}
327
+	return ret.getRef(), nil
328
+}
329
+
330
+func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) {
331
+	s.mu.Lock()
332
+	defer s.mu.Unlock()
333
+	var ret *cachedSource
334
+	for id, snap := range s.sources {
335
+		if snap.SharedKey == sharedKey && len(snap.refs) == 0 {
336
+			if err := s.db.Update(func(tx *bolt.Tx) error {
337
+				if err := tx.DeleteBucket([]byte(id)); err != nil {
338
+					return err
339
+				}
340
+				b, err := tx.CreateBucket([]byte(newid))
341
+				if err != nil {
342
+					return err
343
+				}
344
+				snap.id = newid
345
+				snap.CachePolicy = defaultCachePolicy()
346
+				dt, err := json.Marshal(snap.sourceMeta)
347
+				if err != nil {
348
+					return err
349
+				}
350
+				if err := b.Put([]byte(metaKey), dt); err != nil {
351
+					return err
352
+				}
353
+				delete(s.sources, id)
354
+				s.sources[newid] = snap
355
+				return nil
356
+			}); err != nil {
357
+				return nil, err
358
+			}
359
+			ret = snap
360
+			break
361
+		}
362
+	}
363
+	if ret == nil {
364
+		return nil, errors.Errorf("no candidate for rebase")
365
+	}
366
+	return ret.getRef(), nil
367
+}
368
+
369
+func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) {
370
+	s.mu.Lock()
371
+	defer s.mu.Unlock()
372
+	src, ok := s.sources[id]
373
+	if !ok {
374
+		return nil, errors.Errorf("not found")
375
+	}
376
+	return src.getRef(), nil
377
+}
378
+
379
+// DiskUsage reports how much data is allocated by the cache
380
+func (s *fsCacheStore) DiskUsage() (int64, error) {
381
+	s.mu.Lock()
382
+	defer s.mu.Unlock()
383
+	var size int64
384
+
385
+	for _, snap := range s.sources {
386
+		if len(snap.refs) == 0 {
387
+			ss, err := snap.getSize()
388
+			if err != nil {
389
+				return 0, err
390
+			}
391
+			size += ss
392
+		}
393
+	}
394
+	return size, nil
395
+}
396
+
397
+// Prune allows manually cleaning up the cache
398
+func (s *fsCacheStore) Prune() (uint64, error) {
399
+	s.mu.Lock()
400
+	defer s.mu.Unlock()
401
+	var size uint64
402
+
403
+	for id, snap := range s.sources {
404
+		if len(snap.refs) == 0 {
405
+			ss, err := snap.getSize()
406
+			if err != nil {
407
+				return size, err
408
+			}
409
+			if err := s.delete(id); err != nil {
410
+				return size, errors.Wrapf(err, "failed to delete %s", id)
411
+			}
412
+			size += uint64(ss)
413
+		}
414
+	}
415
+	return size, nil
416
+}
417
+
418
+// GC runs a garbage collector on FSCache
419
+func (s *fsCacheStore) GC() error {
420
+	s.mu.Lock()
421
+	defer s.mu.Unlock()
422
+	var size uint64
423
+
424
+	cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration)
425
+	var blacklist []*cachedSource
426
+
427
+	for id, snap := range s.sources {
428
+		if len(snap.refs) == 0 {
429
+			if cutoff.After(snap.CachePolicy.LastUsed) {
430
+				if err := s.delete(id); err != nil {
431
+					return errors.Wrapf(err, "failed to delete %s", id)
432
+				}
433
+			} else {
434
+				ss, err := snap.getSize()
435
+				if err != nil {
436
+					return err
437
+				}
438
+				size += uint64(ss)
439
+				blacklist = append(blacklist, snap)
440
+			}
441
+		}
442
+	}
443
+
444
+	sort.Sort(sortableCacheSources(blacklist))
445
+	for _, snap := range blacklist {
446
+		if size <= s.gcPolicy.MaxSize {
447
+			break
448
+		}
449
+		ss, err := snap.getSize()
450
+		if err != nil {
451
+			return err
452
+		}
453
+		if err := s.delete(snap.id); err != nil {
454
+			return errors.Wrapf(err, "failed to delete %s", snap.id)
455
+		}
456
+		size -= uint64(ss)
457
+	}
458
+	return nil
459
+}
460
+
461
+// keep mu while calling this
462
+func (s *fsCacheStore) delete(id string) error {
463
+	src, ok := s.sources[id]
464
+	if !ok {
465
+		return nil
466
+	}
467
+	if len(src.refs) > 0 {
468
+		return errors.Errorf("can't delete %s because it has active references", id)
469
+	}
470
+	delete(s.sources, id)
471
+	if err := s.db.Update(func(tx *bolt.Tx) error {
472
+		return tx.DeleteBucket([]byte(id))
473
+	}); err != nil {
474
+		return err
475
+	}
476
+	if err := s.fs.Remove(src.BackendID); err != nil {
477
+		return err
478
+	}
479
+	return nil
480
+}
481
+
482
+type sourceMeta struct {
483
+	SharedKey   string
484
+	BackendID   string
485
+	CachePolicy CachePolicy
486
+	Size        int64
487
+}
488
+
489
+type cachedSource struct {
490
+	sourceMeta
491
+	refs    map[*cachedSourceRef]struct{}
492
+	id      string
493
+	dir     string
494
+	src     *remotecontext.CachableSource
495
+	storage *fsCacheStore
496
+	cached  bool // keep track if cache is up to date
497
+}
498
+
499
+type cachedSourceRef struct {
500
+	*cachedSource
501
+}
502
+
503
+func (cs *cachedSource) Dir() string {
504
+	return cs.dir
505
+}
506
+
507
+// hold storage lock before calling
508
+func (cs *cachedSource) getRef() *cachedSourceRef {
509
+	ref := &cachedSourceRef{cachedSource: cs}
510
+	cs.refs[ref] = struct{}{}
511
+	return ref
512
+}
513
+
514
+// hold storage lock before calling
515
+func (cs *cachedSource) getSize() (int64, error) {
516
+	if cs.sourceMeta.Size < 0 {
517
+		ss, err := directory.Size(cs.dir)
518
+		if err != nil {
519
+			return 0, err
520
+		}
521
+		if err := cs.resetSize(ss); err != nil {
522
+			return 0, err
523
+		}
524
+		return ss, nil
525
+	}
526
+	return cs.sourceMeta.Size, nil
527
+}
528
+
529
+func (cs *cachedSource) resetSize(val int64) error {
530
+	cs.sourceMeta.Size = val
531
+	return cs.saveMeta()
532
+}
533
+func (cs *cachedSource) saveMeta() error {
534
+	return cs.storage.db.Update(func(tx *bolt.Tx) error {
535
+		b := tx.Bucket([]byte(cs.id))
536
+		dt, err := json.Marshal(cs.sourceMeta)
537
+		if err != nil {
538
+			return err
539
+		}
540
+		return b.Put([]byte(metaKey), dt)
541
+	})
542
+}
543
+
544
+func (csr *cachedSourceRef) Release() error {
545
+	csr.cachedSource.storage.mu.Lock()
546
+	defer csr.cachedSource.storage.mu.Unlock()
547
+	delete(csr.cachedSource.refs, csr)
548
+	if len(csr.cachedSource.refs) == 0 {
549
+		go csr.cachedSource.storage.GC()
550
+	}
551
+	return nil
552
+}
553
+
554
+type detectChanges struct {
555
+	f         fsutil.ChangeFunc
556
+	supported bool
557
+}
558
+
559
+func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error {
560
+	if dc == nil {
561
+		return nil
562
+	}
563
+	return dc.f(kind, path, fi, err)
564
+}
565
+
566
+func (dc *detectChanges) MarkSupported(v bool) {
567
+	if dc == nil {
568
+		return
569
+	}
570
+	dc.supported = v
571
+}
572
+
573
+type wrappedContext struct {
574
+	builder.Source
575
+	closer func() error
576
+}
577
+
578
+func (wc *wrappedContext) Close() error {
579
+	if err := wc.Source.Close(); err != nil {
580
+		return err
581
+	}
582
+	return wc.closer()
583
+}
584
+
585
+type sortableCacheSources []*cachedSource
586
+
587
+// Len is the number of elements in the collection.
588
+func (s sortableCacheSources) Len() int {
589
+	return len(s)
590
+}
591
+
592
+// Less reports whether the element with
593
+// index i should sort before the element with index j.
594
+func (s sortableCacheSources) Less(i, j int) bool {
595
+	return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed)
596
+}
597
+
598
+// Swap swaps the elements with indexes i and j.
599
+func (s sortableCacheSources) Swap(i, j int) {
600
+	s[i], s[j] = s[j], s[i]
601
+}
0 602
new file mode 100644
... ...
@@ -0,0 +1,131 @@
0
+package fscache
1
+
2
+import (
3
+	"io/ioutil"
4
+	"os"
5
+	"path/filepath"
6
+	"testing"
7
+	"time"
8
+
9
+	"github.com/docker/docker/client/session/filesync"
10
+	"github.com/stretchr/testify/assert"
11
+	"golang.org/x/net/context"
12
+)
13
+
14
+func TestFSCache(t *testing.T) {
15
+	tmpDir, err := ioutil.TempDir("", "fscache")
16
+	assert.Nil(t, err)
17
+	defer os.RemoveAll(tmpDir)
18
+
19
+	backend := NewNaiveCacheBackend(filepath.Join(tmpDir, "backend"))
20
+
21
+	opt := Opt{
22
+		Root:     tmpDir,
23
+		Backend:  backend,
24
+		GCPolicy: GCPolicy{MaxSize: 15, MaxKeepDuration: time.Hour},
25
+	}
26
+
27
+	fscache, err := NewFSCache(opt)
28
+	assert.Nil(t, err)
29
+
30
+	defer fscache.Close()
31
+
32
+	err = fscache.RegisterTransport("test", &testTransport{})
33
+	assert.Nil(t, err)
34
+
35
+	src1, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo", "data", "bar"})
36
+	assert.Nil(t, err)
37
+
38
+	dt, err := ioutil.ReadFile(filepath.Join(src1.Root(), "foo"))
39
+	assert.Nil(t, err)
40
+	assert.Equal(t, string(dt), "data")
41
+
42
+	// same id doesn't recalculate anything
43
+	src2, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo", "data2", "bar"})
44
+	assert.Nil(t, err)
45
+	assert.Equal(t, src1.Root(), src2.Root())
46
+
47
+	dt, err = ioutil.ReadFile(filepath.Join(src1.Root(), "foo"))
48
+	assert.Nil(t, err)
49
+	assert.Equal(t, string(dt), "data")
50
+	assert.Nil(t, src2.Close())
51
+
52
+	src3, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo2", "data2", "bar"})
53
+	assert.Nil(t, err)
54
+	assert.NotEqual(t, src1.Root(), src3.Root())
55
+
56
+	dt, err = ioutil.ReadFile(filepath.Join(src3.Root(), "foo2"))
57
+	assert.Nil(t, err)
58
+	assert.Equal(t, string(dt), "data2")
59
+
60
+	s, err := fscache.DiskUsage()
61
+	assert.Nil(t, err)
62
+	assert.Equal(t, s, int64(0))
63
+
64
+	assert.Nil(t, src3.Close())
65
+
66
+	s, err = fscache.DiskUsage()
67
+	assert.Nil(t, err)
68
+	assert.Equal(t, s, int64(5))
69
+
70
+	// new upload with the same shared key shoutl overwrite
71
+	src4, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo3", "data3", "bar"})
72
+	assert.Nil(t, err)
73
+	assert.NotEqual(t, src1.Root(), src3.Root())
74
+
75
+	dt, err = ioutil.ReadFile(filepath.Join(src3.Root(), "foo3"))
76
+	assert.Nil(t, err)
77
+	assert.Equal(t, string(dt), "data3")
78
+	assert.Equal(t, src4.Root(), src3.Root())
79
+	assert.Nil(t, src4.Close())
80
+
81
+	s, err = fscache.DiskUsage()
82
+	assert.Nil(t, err)
83
+	assert.Equal(t, s, int64(10))
84
+
85
+	// this one goes over the GC limit
86
+	src5, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo4", "datadata", "baz"})
87
+	assert.Nil(t, err)
88
+	assert.Nil(t, src5.Close())
89
+
90
+	// GC happens async
91
+	time.Sleep(100 * time.Millisecond)
92
+
93
+	// only last insertion after GC
94
+	s, err = fscache.DiskUsage()
95
+	assert.Nil(t, err)
96
+	assert.Equal(t, s, int64(8))
97
+
98
+	// prune deletes everything
99
+	released, err := fscache.Prune()
100
+	assert.Nil(t, err)
101
+	assert.Equal(t, released, uint64(8))
102
+
103
+	s, err = fscache.DiskUsage()
104
+	assert.Nil(t, err)
105
+	assert.Equal(t, s, int64(0))
106
+}
107
+
108
+type testTransport struct {
109
+}
110
+
111
+func (t *testTransport) Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error {
112
+	testid := id.(*testIdentifier)
113
+	return ioutil.WriteFile(filepath.Join(dest, testid.filename), []byte(testid.data), 0600)
114
+}
115
+
116
+type testIdentifier struct {
117
+	filename  string
118
+	data      string
119
+	sharedKey string
120
+}
121
+
122
+func (t *testIdentifier) Key() string {
123
+	return t.filename
124
+}
125
+func (t *testIdentifier) SharedKey() string {
126
+	return t.sharedKey
127
+}
128
+func (t *testIdentifier) Transport() string {
129
+	return "test"
130
+}
0 131
new file mode 100644
... ...
@@ -0,0 +1,28 @@
0
+package fscache
1
+
2
+import (
3
+	"os"
4
+	"path/filepath"
5
+
6
+	"github.com/pkg/errors"
7
+)
8
+
9
+// NewNaiveCacheBackend is a basic backend implementation for fscache
10
+func NewNaiveCacheBackend(root string) Backend {
11
+	return &naiveCacheBackend{root: root}
12
+}
13
+
14
+type naiveCacheBackend struct {
15
+	root string
16
+}
17
+
18
+func (tcb *naiveCacheBackend) Get(id string) (string, error) {
19
+	d := filepath.Join(tcb.root, id)
20
+	if err := os.MkdirAll(d, 0700); err != nil {
21
+		return "", errors.Wrapf(err, "failed to create tmp dir for %s", d)
22
+	}
23
+	return d, nil
24
+}
25
+func (tcb *naiveCacheBackend) Remove(id string) error {
26
+	return errors.WithStack(os.RemoveAll(filepath.Join(tcb.root, id)))
27
+}
0 28
new file mode 100644
... ...
@@ -0,0 +1,128 @@
0
+package remotecontext
1
+
2
+import (
3
+	"io"
4
+	"os"
5
+	"path/filepath"
6
+
7
+	"github.com/docker/docker/builder"
8
+	"github.com/docker/docker/pkg/archive"
9
+	"github.com/docker/docker/pkg/chrootarchive"
10
+	"github.com/docker/docker/pkg/ioutils"
11
+	"github.com/docker/docker/pkg/symlink"
12
+	"github.com/docker/docker/pkg/tarsum"
13
+	"github.com/pkg/errors"
14
+)
15
+
16
+type archiveContext struct {
17
+	root string
18
+	sums tarsum.FileInfoSums
19
+}
20
+
21
+func (c *archiveContext) Close() error {
22
+	return os.RemoveAll(c.root)
23
+}
24
+
25
+func convertPathError(err error, cleanpath string) error {
26
+	if err, ok := err.(*os.PathError); ok {
27
+		err.Path = cleanpath
28
+		return err
29
+	}
30
+	return err
31
+}
32
+
33
+type modifiableContext interface {
34
+	builder.Source
35
+	// Remove deletes the entry specified by `path`.
36
+	// It is usual for directory entries to delete all its subentries.
37
+	Remove(path string) error
38
+}
39
+
40
+// FromArchive returns a build source from a tar stream.
41
+//
42
+// It extracts the tar stream to a temporary folder that is deleted as soon as
43
+// the Context is closed.
44
+// As the extraction happens, a tarsum is calculated for every file, and the set of
45
+// all those sums then becomes the source of truth for all operations on this Context.
46
+//
47
+// Closing tarStream has to be done by the caller.
48
+func FromArchive(tarStream io.Reader) (builder.Source, error) {
49
+	root, err := ioutils.TempDir("", "docker-builder")
50
+	if err != nil {
51
+		return nil, err
52
+	}
53
+
54
+	tsc := &archiveContext{root: root}
55
+
56
+	// Make sure we clean-up upon error.  In the happy case the caller
57
+	// is expected to manage the clean-up
58
+	defer func() {
59
+		if err != nil {
60
+			tsc.Close()
61
+		}
62
+	}()
63
+
64
+	decompressedStream, err := archive.DecompressStream(tarStream)
65
+	if err != nil {
66
+		return nil, err
67
+	}
68
+
69
+	sum, err := tarsum.NewTarSum(decompressedStream, true, tarsum.Version1)
70
+	if err != nil {
71
+		return nil, err
72
+	}
73
+
74
+	err = chrootarchive.Untar(sum, root, nil)
75
+	if err != nil {
76
+		return nil, err
77
+	}
78
+
79
+	tsc.sums = sum.GetSums()
80
+
81
+	return tsc, nil
82
+}
83
+
84
+func (c *archiveContext) Root() string {
85
+	return c.root
86
+}
87
+
88
+func (c *archiveContext) Remove(path string) error {
89
+	_, fullpath, err := normalize(path, c.root)
90
+	if err != nil {
91
+		return err
92
+	}
93
+	return os.RemoveAll(fullpath)
94
+}
95
+
96
+func (c *archiveContext) Hash(path string) (string, error) {
97
+	cleanpath, fullpath, err := normalize(path, c.root)
98
+	if err != nil {
99
+		return "", err
100
+	}
101
+
102
+	rel, err := filepath.Rel(c.root, fullpath)
103
+	if err != nil {
104
+		return "", convertPathError(err, cleanpath)
105
+	}
106
+
107
+	// Use the checksum of the followed path(not the possible symlink) because
108
+	// this is the file that is actually copied.
109
+	if tsInfo := c.sums.GetFile(filepath.ToSlash(rel)); tsInfo != nil {
110
+		return tsInfo.Sum(), nil
111
+	}
112
+	// We set sum to path by default for the case where GetFile returns nil.
113
+	// The usual case is if relative path is empty.
114
+	return path, nil // backwards compat TODO: see if really needed
115
+}
116
+
117
+func normalize(path, root string) (cleanPath, fullPath string, err error) {
118
+	cleanPath = filepath.Clean(string(os.PathSeparator) + path)[1:]
119
+	fullPath, err = symlink.FollowSymlinkInScope(filepath.Join(root, path), root)
120
+	if err != nil {
121
+		return "", "", errors.Wrapf(err, "forbidden path outside the build context: %s (%s)", path, cleanPath)
122
+	}
123
+	if _, err := os.Lstat(fullPath); err != nil {
124
+		return "", "", errors.WithStack(convertPathError(err, path))
125
+	}
126
+	return
127
+}
... ...
@@ -19,6 +19,9 @@ import (
19 19
 	"github.com/pkg/errors"
20 20
 )
21 21
 
22
+// ClientSessionRemote is identifier for client-session context transport
23
+const ClientSessionRemote = "client-session"
24
+
22 25
 // Detect returns a context and dockerfile from remote location or local
23 26
 // archive. progressReader is only used if remoteURL is actually a URL
24 27
 // (not empty, and not a Git endpoint).
... ...
@@ -29,6 +32,12 @@ func Detect(config backend.BuildConfig) (remote builder.Source, dockerfile *pars
29 29
 	switch {
30 30
 	case remoteURL == "":
31 31
 		remote, dockerfile, err = newArchiveRemote(config.Source, dockerfilePath)
32
+	case remoteURL == ClientSessionRemote:
33
+		res, err := parser.Parse(config.Source)
34
+		if err != nil {
35
+			return nil, nil, err
36
+		}
37
+		return nil, res, nil
32 38
 	case urlutil.IsGitURL(remoteURL):
33 39
 		remote, dockerfile, err = newGitRemote(remoteURL, dockerfilePath)
34 40
 	case urlutil.IsURL(remoteURL):
... ...
@@ -41,7 +50,7 @@ func Detect(config backend.BuildConfig) (remote builder.Source, dockerfile *pars
41 41
 
42 42
 func newArchiveRemote(rc io.ReadCloser, dockerfilePath string) (builder.Source, *parser.Result, error) {
43 43
 	defer rc.Close()
44
-	c, err := MakeTarSumContext(rc)
44
+	c, err := FromArchive(rc)
45 45
 	if err != nil {
46 46
 		return nil, nil, err
47 47
 	}
... ...
@@ -12,10 +12,21 @@ import (
12 12
 
13 13
 // NewFileHash returns new hash that is used for the builder cache keys
14 14
 func NewFileHash(path, name string, fi os.FileInfo) (hash.Hash, error) {
15
-	hdr, err := archive.FileInfoHeader(path, name, fi)
15
+	var link string
16
+	if fi.Mode()&os.ModeSymlink != 0 {
17
+		var err error
18
+		link, err = os.Readlink(path)
19
+		if err != nil {
20
+			return nil, err
21
+		}
22
+	}
23
+	hdr, err := archive.FileInfoHeader(name, fi, link)
16 24
 	if err != nil {
17 25
 		return nil, err
18 26
 	}
27
+	if err := archive.ReadSecurityXattrToTarHeader(path, hdr); err != nil {
28
+		return nil, err
29
+	}
19 30
 	tsh := &tarsumHash{hdr: hdr, Hash: sha256.New()}
20 31
 	tsh.Reset() // initialize header
21 32
 	return tsh, nil
22 33
new file mode 100644
... ...
@@ -0,0 +1,3 @@
0
+package remotecontext
1
+
2
+//go:generate protoc --gogoslick_out=. tarsum.proto
... ...
@@ -25,5 +25,5 @@ func MakeGitContext(gitURL string) (builder.Source, error) {
25 25
 		c.Close()
26 26
 		os.RemoveAll(root)
27 27
 	}()
28
-	return MakeTarSumContext(c)
28
+	return FromArchive(c)
29 29
 }
... ...
@@ -43,7 +43,7 @@ func (c *lazySource) Hash(path string) (string, error) {
43 43
 
44 44
 	fi, err := os.Lstat(fullPath)
45 45
 	if err != nil {
46
-		return "", err
46
+		return "", errors.WithStack(err)
47 47
 	}
48 48
 
49 49
 	relPath, err := Rel(c.root, fullPath)
... ...
@@ -28,7 +28,7 @@ var mimeRe = regexp.MustCompile(acceptableRemoteMIME)
28 28
 //
29 29
 // If a match is found, then the body is sent to the contentType handler and a (potentially compressed) tar stream is expected
30 30
 // to be returned. If no match is found, it is assumed the body is a tar stream (compressed or not).
31
-// In either case, an (assumed) tar stream is passed to MakeTarSumContext whose result is returned.
31
+// In either case, an (assumed) tar stream is passed to FromArchive whose result is returned.
32 32
 func MakeRemoteContext(remoteURL string, contentTypeHandlers map[string]func(io.ReadCloser) (io.ReadCloser, error)) (builder.Source, error) {
33 33
 	f, err := GetWithStatusError(remoteURL)
34 34
 	if err != nil {
... ...
@@ -63,7 +63,7 @@ func MakeRemoteContext(remoteURL string, contentTypeHandlers map[string]func(io.
63 63
 
64 64
 	// Pass through - this is a pre-packaged context, presumably
65 65
 	// with a Dockerfile with the right name inside it.
66
-	return MakeTarSumContext(contextReader)
66
+	return FromArchive(contextReader)
67 67
 }
68 68
 
69 69
 // GetWithStatusError does an http.Get() and returns an error if the
... ...
@@ -212,26 +212,13 @@ func TestMakeRemoteContext(t *testing.T) {
212 212
 		t.Fatal("Remote context should not be nil")
213 213
 	}
214 214
 
215
-	tarSumCtx, ok := remoteContext.(*tarSumContext)
216
-
217
-	if !ok {
218
-		t.Fatal("Cast error, remote context should be casted to tarSumContext")
219
-	}
220
-
221
-	fileInfoSums := tarSumCtx.sums
222
-
223
-	if fileInfoSums.Len() != 1 {
224
-		t.Fatalf("Size of file info sums should be 1, got: %d", fileInfoSums.Len())
225
-	}
226
-
227
-	fileInfo := fileInfoSums.GetFile(builder.DefaultDockerfileName)
228
-
229
-	if fileInfo == nil {
230
-		t.Fatalf("There should be file named %s in fileInfoSums", builder.DefaultDockerfileName)
215
+	h, err := remoteContext.Hash(builder.DefaultDockerfileName)
216
+	if err != nil {
217
+		t.Fatalf("failed to compute hash %s", err)
231 218
 	}
232 219
 
233
-	if fileInfo.Pos() != 0 {
234
-		t.Fatalf("File %s should have position 0, got %d", builder.DefaultDockerfileName, fileInfo.Pos())
220
+	if expected, actual := "7b6b6b66bee9e2102fbdc2228be6c980a2a23adf371962a37286a49f7de0f7cc", h; expected != actual {
221
+		t.Fatalf("There should be file named %s %s in fileInfoSums", expected, actual)
235 222
 	}
236 223
 }
237 224
 
... ...
@@ -1,128 +1,174 @@
1 1
 package remotecontext
2 2
 
3 3
 import (
4
-	"io"
4
+	"fmt"
5 5
 	"os"
6 6
 	"path/filepath"
7
+	"sync"
7 8
 
8
-	"github.com/docker/docker/builder"
9
-	"github.com/docker/docker/pkg/archive"
10
-	"github.com/docker/docker/pkg/chrootarchive"
11
-	"github.com/docker/docker/pkg/ioutils"
12 9
 	"github.com/docker/docker/pkg/symlink"
13
-	"github.com/docker/docker/pkg/tarsum"
10
+	iradix "github.com/hashicorp/go-immutable-radix"
14 11
 	"github.com/pkg/errors"
12
+	"github.com/tonistiigi/fsutil"
15 13
 )
16 14
 
17
-type tarSumContext struct {
18
-	root string
19
-	sums tarsum.FileInfoSums
15
+type hashed interface {
16
+	Hash() string
20 17
 }
21 18
 
22
-func (c *tarSumContext) Close() error {
23
-	return os.RemoveAll(c.root)
19
+// CachableSource is a source that contains cache records for its contents
20
+type CachableSource struct {
21
+	mu   sync.Mutex
22
+	root string
23
+	tree *iradix.Tree
24
+	txn  *iradix.Txn
24 25
 }
25 26
 
26
-func convertPathError(err error, cleanpath string) error {
27
-	if err, ok := err.(*os.PathError); ok {
28
-		err.Path = cleanpath
29
-		return err
27
+// NewCachableSource creates new CachableSource
28
+func NewCachableSource(root string) *CachableSource {
29
+	ts := &CachableSource{
30
+		tree: iradix.New(),
31
+		root: root,
30 32
 	}
31
-	return err
33
+	return ts
32 34
 }
33 35
 
34
-type modifiableContext interface {
35
-	builder.Source
36
-	// Remove deletes the entry specified by `path`.
37
-	// It is usual for directory entries to delete all its subentries.
38
-	Remove(path string) error
36
+// MarshalBinary marshals current cache information to a byte array
37
+func (cs *CachableSource) MarshalBinary() ([]byte, error) {
38
+	b := TarsumBackup{Hashes: make(map[string]string)}
39
+	root := cs.getRoot()
40
+	root.Walk(func(k []byte, v interface{}) bool {
41
+		b.Hashes[string(k)] = v.(*fileInfo).sum
42
+		return false
43
+	})
44
+	return b.Marshal()
39 45
 }
40 46
 
41
-// MakeTarSumContext returns a build Context from a tar stream.
42
-//
43
-// It extracts the tar stream to a temporary folder that is deleted as soon as
44
-// the Context is closed.
45
-// As the extraction happens, a tarsum is calculated for every file, and the set of
46
-// all those sums then becomes the source of truth for all operations on this Context.
47
-//
48
-// Closing tarStream has to be done by the caller.
49
-func MakeTarSumContext(tarStream io.Reader) (builder.Source, error) {
50
-	root, err := ioutils.TempDir("", "docker-builder")
51
-	if err != nil {
52
-		return nil, err
47
+// UnmarshalBinary decodes cache information for presented byte array
48
+func (cs *CachableSource) UnmarshalBinary(data []byte) error {
49
+	var b TarsumBackup
50
+	if err := b.Unmarshal(data); err != nil {
51
+		return err
53 52
 	}
53
+	txn := iradix.New().Txn()
54
+	for p, v := range b.Hashes {
55
+		txn.Insert([]byte(p), &fileInfo{sum: v})
56
+	}
57
+	cs.mu.Lock()
58
+	defer cs.mu.Unlock()
59
+	cs.tree = txn.Commit()
60
+	return nil
61
+}
54 62
 
55
-	tsc := &tarSumContext{root: root}
56
-
57
-	// Make sure we clean-up upon error.  In the happy case the caller
58
-	// is expected to manage the clean-up
59
-	defer func() {
63
+// Scan rescans the cache information from the file system
64
+func (cs *CachableSource) Scan() error {
65
+	lc, err := NewLazySource(cs.root)
66
+	if err != nil {
67
+		return err
68
+	}
69
+	txn := iradix.New().Txn()
70
+	err = filepath.Walk(cs.root, func(path string, info os.FileInfo, err error) error {
60 71
 		if err != nil {
61
-			tsc.Close()
72
+			return errors.Wrapf(err, "failed to walk %s", path)
62 73
 		}
63
-	}()
64
-
65
-	decompressedStream, err := archive.DecompressStream(tarStream)
74
+		rel, err := Rel(cs.root, path)
75
+		if err != nil {
76
+			return err
77
+		}
78
+		h, err := lc.Hash(rel)
79
+		if err != nil {
80
+			return err
81
+		}
82
+		txn.Insert([]byte(rel), &fileInfo{sum: h})
83
+		return nil
84
+	})
66 85
 	if err != nil {
67
-		return nil, err
86
+		return err
68 87
 	}
88
+	cs.mu.Lock()
89
+	defer cs.mu.Unlock()
90
+	cs.tree = txn.Commit()
91
+	return nil
92
+}
69 93
 
70
-	sum, err := tarsum.NewTarSum(decompressedStream, true, tarsum.Version1)
71
-	if err != nil {
72
-		return nil, err
94
+// HandleChange notifies the source about a modification operation
95
+func (cs *CachableSource) HandleChange(kind fsutil.ChangeKind, p string, fi os.FileInfo, err error) (retErr error) {
96
+	cs.mu.Lock()
97
+	if cs.txn == nil {
98
+		cs.txn = cs.tree.Txn()
73 99
 	}
74
-
75
-	err = chrootarchive.Untar(sum, root, nil)
76
-	if err != nil {
77
-		return nil, err
100
+	if kind == fsutil.ChangeKindDelete {
101
+		cs.txn.Delete([]byte(p))
102
+		cs.mu.Unlock()
103
+		return
78 104
 	}
79 105
 
80
-	tsc.sums = sum.GetSums()
106
+	h, ok := fi.(hashed)
107
+	if !ok {
108
+		cs.mu.Unlock()
109
+		return errors.Errorf("invalid fileinfo: %s", p)
110
+	}
81 111
 
82
-	return tsc, nil
112
+	hfi := &fileInfo{
113
+		sum: h.Hash(),
114
+	}
115
+	cs.txn.Insert([]byte(p), hfi)
116
+	cs.mu.Unlock()
117
+	return nil
83 118
 }
84 119
 
85
-func (c *tarSumContext) Root() string {
86
-	return c.root
120
+func (cs *CachableSource) getRoot() *iradix.Node {
121
+	cs.mu.Lock()
122
+	if cs.txn != nil {
123
+		cs.tree = cs.txn.Commit()
124
+		cs.txn = nil
125
+	}
126
+	t := cs.tree
127
+	cs.mu.Unlock()
128
+	return t.Root()
87 129
 }
88 130
 
89
-func (c *tarSumContext) Remove(path string) error {
90
-	_, fullpath, err := normalize(path, c.root)
91
-	if err != nil {
92
-		return err
93
-	}
94
-	return os.RemoveAll(fullpath)
131
+// Close closes the source
132
+func (cs *CachableSource) Close() error {
133
+	return nil
95 134
 }
96 135
 
97
-func (c *tarSumContext) Hash(path string) (string, error) {
98
-	cleanpath, fullpath, err := normalize(path, c.root)
136
+func (cs *CachableSource) normalize(path string) (cleanpath, fullpath string, err error) {
137
+	cleanpath = filepath.Clean(string(os.PathSeparator) + path)[1:]
138
+	fullpath, err = symlink.FollowSymlinkInScope(filepath.Join(cs.root, path), cs.root)
99 139
 	if err != nil {
100
-		return "", err
140
+		return "", "", fmt.Errorf("Forbidden path outside the context: %s (%s)", path, fullpath)
101 141
 	}
102
-
103
-	rel, err := filepath.Rel(c.root, fullpath)
142
+	_, err = os.Lstat(fullpath)
104 143
 	if err != nil {
105
-		return "", convertPathError(err, cleanpath)
144
+		return "", "", convertPathError(err, path)
106 145
 	}
146
+	return
147
+}
107 148
 
108
-	// Use the checksum of the followed path(not the possible symlink) because
109
-	// this is the file that is actually copied.
110
-	if tsInfo := c.sums.GetFile(filepath.ToSlash(rel)); tsInfo != nil {
111
-		return tsInfo.Sum(), nil
149
+// Hash returns a hash for a single file in the source
150
+func (cs *CachableSource) Hash(path string) (string, error) {
151
+	n := cs.getRoot()
152
+	sum := ""
153
+	// TODO: check this for symlinks
154
+	v, ok := n.Get([]byte(path))
155
+	if !ok {
156
+		sum = path
157
+	} else {
158
+		sum = v.(*fileInfo).sum
112 159
 	}
113
-	// We set sum to path by default for the case where GetFile returns nil.
114
-	// The usual case is if relative path is empty.
115
-	return path, nil // backwards compat TODO: see if really needed
160
+	return sum, nil
116 161
 }
117 162
 
118
-func normalize(path, root string) (cleanPath, fullPath string, err error) {
119
-	cleanPath = filepath.Clean(string(os.PathSeparator) + path)[1:]
120
-	fullPath, err = symlink.FollowSymlinkInScope(filepath.Join(root, path), root)
121
-	if err != nil {
122
-		return "", "", errors.Wrapf(err, "forbidden path outside the build context: %s (%s)", path, cleanPath)
123
-	}
124
-	if _, err := os.Lstat(fullPath); err != nil {
125
-		return "", "", convertPathError(err, path)
126
-	}
127
-	return
163
+// Root returns a root directory for the source
164
+func (cs *CachableSource) Root() string {
165
+	return cs.root
166
+}
167
+
168
+type fileInfo struct {
169
+	sum string
170
+}
171
+
172
+func (fi *fileInfo) Hash() string {
173
+	return fi.sum
128 174
 }
129 175
new file mode 100644
... ...
@@ -0,0 +1,525 @@
0
+// Code generated by protoc-gen-gogo.
1
+// source: tarsum.proto
2
+// DO NOT EDIT!
3
+
4
+/*
5
+Package remotecontext is a generated protocol buffer package.
6
+
7
+It is generated from these files:
8
+	tarsum.proto
9
+
10
+It has these top-level messages:
11
+	TarsumBackup
12
+*/
13
+package remotecontext
14
+
15
+import proto "github.com/gogo/protobuf/proto"
16
+import fmt "fmt"
17
+import math "math"
18
+
19
+import strings "strings"
20
+import reflect "reflect"
21
+import github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys"
22
+
23
+import io "io"
24
+
25
+// Reference imports to suppress errors if they are not otherwise used.
26
+var _ = proto.Marshal
27
+var _ = fmt.Errorf
28
+var _ = math.Inf
29
+
30
+// This is a compile-time assertion to ensure that this generated file
31
+// is compatible with the proto package it is being compiled against.
32
+// A compilation error at this line likely means your copy of the
33
+// proto package needs to be updated.
34
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
35
+
36
+type TarsumBackup struct {
37
+	Hashes map[string]string `protobuf:"bytes,1,rep,name=Hashes" json:"Hashes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
38
+}
39
+
40
+func (m *TarsumBackup) Reset()                    { *m = TarsumBackup{} }
41
+func (*TarsumBackup) ProtoMessage()               {}
42
+func (*TarsumBackup) Descriptor() ([]byte, []int) { return fileDescriptorTarsum, []int{0} }
43
+
44
+func (m *TarsumBackup) GetHashes() map[string]string {
45
+	if m != nil {
46
+		return m.Hashes
47
+	}
48
+	return nil
49
+}
50
+
51
+func init() {
52
+	proto.RegisterType((*TarsumBackup)(nil), "remotecontext.TarsumBackup")
53
+}
54
+func (this *TarsumBackup) Equal(that interface{}) bool {
55
+	if that == nil {
56
+		if this == nil {
57
+			return true
58
+		}
59
+		return false
60
+	}
61
+
62
+	that1, ok := that.(*TarsumBackup)
63
+	if !ok {
64
+		that2, ok := that.(TarsumBackup)
65
+		if ok {
66
+			that1 = &that2
67
+		} else {
68
+			return false
69
+		}
70
+	}
71
+	if that1 == nil {
72
+		if this == nil {
73
+			return true
74
+		}
75
+		return false
76
+	} else if this == nil {
77
+		return false
78
+	}
79
+	if len(this.Hashes) != len(that1.Hashes) {
80
+		return false
81
+	}
82
+	for i := range this.Hashes {
83
+		if this.Hashes[i] != that1.Hashes[i] {
84
+			return false
85
+		}
86
+	}
87
+	return true
88
+}
89
+func (this *TarsumBackup) GoString() string {
90
+	if this == nil {
91
+		return "nil"
92
+	}
93
+	s := make([]string, 0, 5)
94
+	s = append(s, "&remotecontext.TarsumBackup{")
95
+	keysForHashes := make([]string, 0, len(this.Hashes))
96
+	for k, _ := range this.Hashes {
97
+		keysForHashes = append(keysForHashes, k)
98
+	}
99
+	github_com_gogo_protobuf_sortkeys.Strings(keysForHashes)
100
+	mapStringForHashes := "map[string]string{"
101
+	for _, k := range keysForHashes {
102
+		mapStringForHashes += fmt.Sprintf("%#v: %#v,", k, this.Hashes[k])
103
+	}
104
+	mapStringForHashes += "}"
105
+	if this.Hashes != nil {
106
+		s = append(s, "Hashes: "+mapStringForHashes+",\n")
107
+	}
108
+	s = append(s, "}")
109
+	return strings.Join(s, "")
110
+}
111
+func valueToGoStringTarsum(v interface{}, typ string) string {
112
+	rv := reflect.ValueOf(v)
113
+	if rv.IsNil() {
114
+		return "nil"
115
+	}
116
+	pv := reflect.Indirect(rv).Interface()
117
+	return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
118
+}
119
+func (m *TarsumBackup) Marshal() (dAtA []byte, err error) {
120
+	size := m.Size()
121
+	dAtA = make([]byte, size)
122
+	n, err := m.MarshalTo(dAtA)
123
+	if err != nil {
124
+		return nil, err
125
+	}
126
+	return dAtA[:n], nil
127
+}
128
+
129
+func (m *TarsumBackup) MarshalTo(dAtA []byte) (int, error) {
130
+	var i int
131
+	_ = i
132
+	var l int
133
+	_ = l
134
+	if len(m.Hashes) > 0 {
135
+		for k, _ := range m.Hashes {
136
+			dAtA[i] = 0xa
137
+			i++
138
+			v := m.Hashes[k]
139
+			mapSize := 1 + len(k) + sovTarsum(uint64(len(k))) + 1 + len(v) + sovTarsum(uint64(len(v)))
140
+			i = encodeVarintTarsum(dAtA, i, uint64(mapSize))
141
+			dAtA[i] = 0xa
142
+			i++
143
+			i = encodeVarintTarsum(dAtA, i, uint64(len(k)))
144
+			i += copy(dAtA[i:], k)
145
+			dAtA[i] = 0x12
146
+			i++
147
+			i = encodeVarintTarsum(dAtA, i, uint64(len(v)))
148
+			i += copy(dAtA[i:], v)
149
+		}
150
+	}
151
+	return i, nil
152
+}
153
+
154
+func encodeFixed64Tarsum(dAtA []byte, offset int, v uint64) int {
155
+	dAtA[offset] = uint8(v)
156
+	dAtA[offset+1] = uint8(v >> 8)
157
+	dAtA[offset+2] = uint8(v >> 16)
158
+	dAtA[offset+3] = uint8(v >> 24)
159
+	dAtA[offset+4] = uint8(v >> 32)
160
+	dAtA[offset+5] = uint8(v >> 40)
161
+	dAtA[offset+6] = uint8(v >> 48)
162
+	dAtA[offset+7] = uint8(v >> 56)
163
+	return offset + 8
164
+}
165
+func encodeFixed32Tarsum(dAtA []byte, offset int, v uint32) int {
166
+	dAtA[offset] = uint8(v)
167
+	dAtA[offset+1] = uint8(v >> 8)
168
+	dAtA[offset+2] = uint8(v >> 16)
169
+	dAtA[offset+3] = uint8(v >> 24)
170
+	return offset + 4
171
+}
172
+func encodeVarintTarsum(dAtA []byte, offset int, v uint64) int {
173
+	for v >= 1<<7 {
174
+		dAtA[offset] = uint8(v&0x7f | 0x80)
175
+		v >>= 7
176
+		offset++
177
+	}
178
+	dAtA[offset] = uint8(v)
179
+	return offset + 1
180
+}
181
+func (m *TarsumBackup) Size() (n int) {
182
+	var l int
183
+	_ = l
184
+	if len(m.Hashes) > 0 {
185
+		for k, v := range m.Hashes {
186
+			_ = k
187
+			_ = v
188
+			mapEntrySize := 1 + len(k) + sovTarsum(uint64(len(k))) + 1 + len(v) + sovTarsum(uint64(len(v)))
189
+			n += mapEntrySize + 1 + sovTarsum(uint64(mapEntrySize))
190
+		}
191
+	}
192
+	return n
193
+}
194
+
195
+func sovTarsum(x uint64) (n int) {
196
+	for {
197
+		n++
198
+		x >>= 7
199
+		if x == 0 {
200
+			break
201
+		}
202
+	}
203
+	return n
204
+}
205
+func sozTarsum(x uint64) (n int) {
206
+	return sovTarsum(uint64((x << 1) ^ uint64((int64(x) >> 63))))
207
+}
208
+func (this *TarsumBackup) String() string {
209
+	if this == nil {
210
+		return "nil"
211
+	}
212
+	keysForHashes := make([]string, 0, len(this.Hashes))
213
+	for k, _ := range this.Hashes {
214
+		keysForHashes = append(keysForHashes, k)
215
+	}
216
+	github_com_gogo_protobuf_sortkeys.Strings(keysForHashes)
217
+	mapStringForHashes := "map[string]string{"
218
+	for _, k := range keysForHashes {
219
+		mapStringForHashes += fmt.Sprintf("%v: %v,", k, this.Hashes[k])
220
+	}
221
+	mapStringForHashes += "}"
222
+	s := strings.Join([]string{`&TarsumBackup{`,
223
+		`Hashes:` + mapStringForHashes + `,`,
224
+		`}`,
225
+	}, "")
226
+	return s
227
+}
228
+func valueToStringTarsum(v interface{}) string {
229
+	rv := reflect.ValueOf(v)
230
+	if rv.IsNil() {
231
+		return "nil"
232
+	}
233
+	pv := reflect.Indirect(rv).Interface()
234
+	return fmt.Sprintf("*%v", pv)
235
+}
236
+func (m *TarsumBackup) Unmarshal(dAtA []byte) error {
237
+	l := len(dAtA)
238
+	iNdEx := 0
239
+	for iNdEx < l {
240
+		preIndex := iNdEx
241
+		var wire uint64
242
+		for shift := uint(0); ; shift += 7 {
243
+			if shift >= 64 {
244
+				return ErrIntOverflowTarsum
245
+			}
246
+			if iNdEx >= l {
247
+				return io.ErrUnexpectedEOF
248
+			}
249
+			b := dAtA[iNdEx]
250
+			iNdEx++
251
+			wire |= (uint64(b) & 0x7F) << shift
252
+			if b < 0x80 {
253
+				break
254
+			}
255
+		}
256
+		fieldNum := int32(wire >> 3)
257
+		wireType := int(wire & 0x7)
258
+		if wireType == 4 {
259
+			return fmt.Errorf("proto: TarsumBackup: wiretype end group for non-group")
260
+		}
261
+		if fieldNum <= 0 {
262
+			return fmt.Errorf("proto: TarsumBackup: illegal tag %d (wire type %d)", fieldNum, wire)
263
+		}
264
+		switch fieldNum {
265
+		case 1:
266
+			if wireType != 2 {
267
+				return fmt.Errorf("proto: wrong wireType = %d for field Hashes", wireType)
268
+			}
269
+			var msglen int
270
+			for shift := uint(0); ; shift += 7 {
271
+				if shift >= 64 {
272
+					return ErrIntOverflowTarsum
273
+				}
274
+				if iNdEx >= l {
275
+					return io.ErrUnexpectedEOF
276
+				}
277
+				b := dAtA[iNdEx]
278
+				iNdEx++
279
+				msglen |= (int(b) & 0x7F) << shift
280
+				if b < 0x80 {
281
+					break
282
+				}
283
+			}
284
+			if msglen < 0 {
285
+				return ErrInvalidLengthTarsum
286
+			}
287
+			postIndex := iNdEx + msglen
288
+			if postIndex > l {
289
+				return io.ErrUnexpectedEOF
290
+			}
291
+			var keykey uint64
292
+			for shift := uint(0); ; shift += 7 {
293
+				if shift >= 64 {
294
+					return ErrIntOverflowTarsum
295
+				}
296
+				if iNdEx >= l {
297
+					return io.ErrUnexpectedEOF
298
+				}
299
+				b := dAtA[iNdEx]
300
+				iNdEx++
301
+				keykey |= (uint64(b) & 0x7F) << shift
302
+				if b < 0x80 {
303
+					break
304
+				}
305
+			}
306
+			var stringLenmapkey uint64
307
+			for shift := uint(0); ; shift += 7 {
308
+				if shift >= 64 {
309
+					return ErrIntOverflowTarsum
310
+				}
311
+				if iNdEx >= l {
312
+					return io.ErrUnexpectedEOF
313
+				}
314
+				b := dAtA[iNdEx]
315
+				iNdEx++
316
+				stringLenmapkey |= (uint64(b) & 0x7F) << shift
317
+				if b < 0x80 {
318
+					break
319
+				}
320
+			}
321
+			intStringLenmapkey := int(stringLenmapkey)
322
+			if intStringLenmapkey < 0 {
323
+				return ErrInvalidLengthTarsum
324
+			}
325
+			postStringIndexmapkey := iNdEx + intStringLenmapkey
326
+			if postStringIndexmapkey > l {
327
+				return io.ErrUnexpectedEOF
328
+			}
329
+			mapkey := string(dAtA[iNdEx:postStringIndexmapkey])
330
+			iNdEx = postStringIndexmapkey
331
+			if m.Hashes == nil {
332
+				m.Hashes = make(map[string]string)
333
+			}
334
+			if iNdEx < postIndex {
335
+				var valuekey uint64
336
+				for shift := uint(0); ; shift += 7 {
337
+					if shift >= 64 {
338
+						return ErrIntOverflowTarsum
339
+					}
340
+					if iNdEx >= l {
341
+						return io.ErrUnexpectedEOF
342
+					}
343
+					b := dAtA[iNdEx]
344
+					iNdEx++
345
+					valuekey |= (uint64(b) & 0x7F) << shift
346
+					if b < 0x80 {
347
+						break
348
+					}
349
+				}
350
+				var stringLenmapvalue uint64
351
+				for shift := uint(0); ; shift += 7 {
352
+					if shift >= 64 {
353
+						return ErrIntOverflowTarsum
354
+					}
355
+					if iNdEx >= l {
356
+						return io.ErrUnexpectedEOF
357
+					}
358
+					b := dAtA[iNdEx]
359
+					iNdEx++
360
+					stringLenmapvalue |= (uint64(b) & 0x7F) << shift
361
+					if b < 0x80 {
362
+						break
363
+					}
364
+				}
365
+				intStringLenmapvalue := int(stringLenmapvalue)
366
+				if intStringLenmapvalue < 0 {
367
+					return ErrInvalidLengthTarsum
368
+				}
369
+				postStringIndexmapvalue := iNdEx + intStringLenmapvalue
370
+				if postStringIndexmapvalue > l {
371
+					return io.ErrUnexpectedEOF
372
+				}
373
+				mapvalue := string(dAtA[iNdEx:postStringIndexmapvalue])
374
+				iNdEx = postStringIndexmapvalue
375
+				m.Hashes[mapkey] = mapvalue
376
+			} else {
377
+				var mapvalue string
378
+				m.Hashes[mapkey] = mapvalue
379
+			}
380
+			iNdEx = postIndex
381
+		default:
382
+			iNdEx = preIndex
383
+			skippy, err := skipTarsum(dAtA[iNdEx:])
384
+			if err != nil {
385
+				return err
386
+			}
387
+			if skippy < 0 {
388
+				return ErrInvalidLengthTarsum
389
+			}
390
+			if (iNdEx + skippy) > l {
391
+				return io.ErrUnexpectedEOF
392
+			}
393
+			iNdEx += skippy
394
+		}
395
+	}
396
+
397
+	if iNdEx > l {
398
+		return io.ErrUnexpectedEOF
399
+	}
400
+	return nil
401
+}
402
+func skipTarsum(dAtA []byte) (n int, err error) {
403
+	l := len(dAtA)
404
+	iNdEx := 0
405
+	for iNdEx < l {
406
+		var wire uint64
407
+		for shift := uint(0); ; shift += 7 {
408
+			if shift >= 64 {
409
+				return 0, ErrIntOverflowTarsum
410
+			}
411
+			if iNdEx >= l {
412
+				return 0, io.ErrUnexpectedEOF
413
+			}
414
+			b := dAtA[iNdEx]
415
+			iNdEx++
416
+			wire |= (uint64(b) & 0x7F) << shift
417
+			if b < 0x80 {
418
+				break
419
+			}
420
+		}
421
+		wireType := int(wire & 0x7)
422
+		switch wireType {
423
+		case 0:
424
+			for shift := uint(0); ; shift += 7 {
425
+				if shift >= 64 {
426
+					return 0, ErrIntOverflowTarsum
427
+				}
428
+				if iNdEx >= l {
429
+					return 0, io.ErrUnexpectedEOF
430
+				}
431
+				iNdEx++
432
+				if dAtA[iNdEx-1] < 0x80 {
433
+					break
434
+				}
435
+			}
436
+			return iNdEx, nil
437
+		case 1:
438
+			iNdEx += 8
439
+			return iNdEx, nil
440
+		case 2:
441
+			var length int
442
+			for shift := uint(0); ; shift += 7 {
443
+				if shift >= 64 {
444
+					return 0, ErrIntOverflowTarsum
445
+				}
446
+				if iNdEx >= l {
447
+					return 0, io.ErrUnexpectedEOF
448
+				}
449
+				b := dAtA[iNdEx]
450
+				iNdEx++
451
+				length |= (int(b) & 0x7F) << shift
452
+				if b < 0x80 {
453
+					break
454
+				}
455
+			}
456
+			iNdEx += length
457
+			if length < 0 {
458
+				return 0, ErrInvalidLengthTarsum
459
+			}
460
+			return iNdEx, nil
461
+		case 3:
462
+			for {
463
+				var innerWire uint64
464
+				var start int = iNdEx
465
+				for shift := uint(0); ; shift += 7 {
466
+					if shift >= 64 {
467
+						return 0, ErrIntOverflowTarsum
468
+					}
469
+					if iNdEx >= l {
470
+						return 0, io.ErrUnexpectedEOF
471
+					}
472
+					b := dAtA[iNdEx]
473
+					iNdEx++
474
+					innerWire |= (uint64(b) & 0x7F) << shift
475
+					if b < 0x80 {
476
+						break
477
+					}
478
+				}
479
+				innerWireType := int(innerWire & 0x7)
480
+				if innerWireType == 4 {
481
+					break
482
+				}
483
+				next, err := skipTarsum(dAtA[start:])
484
+				if err != nil {
485
+					return 0, err
486
+				}
487
+				iNdEx = start + next
488
+			}
489
+			return iNdEx, nil
490
+		case 4:
491
+			return iNdEx, nil
492
+		case 5:
493
+			iNdEx += 4
494
+			return iNdEx, nil
495
+		default:
496
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
497
+		}
498
+	}
499
+	panic("unreachable")
500
+}
501
+
502
+var (
503
+	ErrInvalidLengthTarsum = fmt.Errorf("proto: negative length found during unmarshaling")
504
+	ErrIntOverflowTarsum   = fmt.Errorf("proto: integer overflow")
505
+)
506
+
507
+func init() { proto.RegisterFile("tarsum.proto", fileDescriptorTarsum) }
508
+
509
+var fileDescriptorTarsum = []byte{
510
+	// 196 bytes of a gzipped FileDescriptorProto
511
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x49, 0x2c, 0x2a,
512
+	0x2e, 0xcd, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x2d, 0x4a, 0xcd, 0xcd, 0x2f, 0x49,
513
+	0x4d, 0xce, 0xcf, 0x2b, 0x49, 0xad, 0x28, 0x51, 0xea, 0x62, 0xe4, 0xe2, 0x09, 0x01, 0xcb, 0x3b,
514
+	0x25, 0x26, 0x67, 0x97, 0x16, 0x08, 0xd9, 0x73, 0xb1, 0x79, 0x24, 0x16, 0x67, 0xa4, 0x16, 0x4b,
515
+	0x30, 0x2a, 0x30, 0x6b, 0x70, 0x1b, 0xa9, 0xeb, 0xa1, 0x68, 0xd0, 0x43, 0x56, 0xac, 0x07, 0x51,
516
+	0xe9, 0x9a, 0x57, 0x52, 0x54, 0x19, 0x04, 0xd5, 0x26, 0x65, 0xc9, 0xc5, 0x8d, 0x24, 0x2c, 0x24,
517
+	0xc0, 0xc5, 0x9c, 0x9d, 0x5a, 0x29, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x89,
518
+	0x70, 0xb1, 0x96, 0x25, 0xe6, 0x94, 0xa6, 0x4a, 0x30, 0x81, 0xc5, 0x20, 0x1c, 0x2b, 0x26, 0x0b,
519
+	0x46, 0x27, 0x9d, 0x0b, 0x0f, 0xe5, 0x18, 0x6e, 0x3c, 0x94, 0x63, 0xf8, 0xf0, 0x50, 0x8e, 0xb1,
520
+	0xe1, 0x91, 0x1c, 0xe3, 0x8a, 0x47, 0x72, 0x8c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7,
521
+	0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8b, 0x47, 0x72, 0x0c, 0x1f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c,
522
+	0xc7, 0x90, 0xc4, 0x06, 0xf6, 0x90, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x89, 0x57, 0x7d, 0x3f,
523
+	0xe0, 0x00, 0x00, 0x00,
524
+}
0 525
new file mode 100644
... ...
@@ -0,0 +1,7 @@
0
+syntax = "proto3";
1
+
2
+package remotecontext; // no namespace because only used internally
3
+
4
+message TarsumBackup {
5
+  map<string, string> Hashes = 1;
6
+}
0 7
\ No newline at end of file
... ...
@@ -9,6 +9,7 @@ import (
9 9
 	"github.com/docker/docker/builder"
10 10
 	"github.com/docker/docker/pkg/archive"
11 11
 	"github.com/docker/docker/pkg/reexec"
12
+	"github.com/pkg/errors"
12 13
 )
13 14
 
14 15
 const (
... ...
@@ -22,24 +23,22 @@ func init() {
22 22
 
23 23
 func TestCloseRootDirectory(t *testing.T) {
24 24
 	contextDir, err := ioutil.TempDir("", "builder-tarsum-test")
25
-
25
+	defer os.RemoveAll(contextDir)
26 26
 	if err != nil {
27 27
 		t.Fatalf("Error with creating temporary directory: %s", err)
28 28
 	}
29 29
 
30
-	tarsum := &tarSumContext{root: contextDir}
31
-
32
-	err = tarsum.Close()
30
+	src := makeTestArchiveContext(t, contextDir)
31
+	err = src.Close()
33 32
 
34 33
 	if err != nil {
35 34
 		t.Fatalf("Error while executing Close: %s", err)
36 35
 	}
37 36
 
38
-	_, err = os.Stat(contextDir)
37
+	_, err = os.Stat(src.Root())
39 38
 
40 39
 	if !os.IsNotExist(err) {
41 40
 		t.Fatal("Directory should not exist at this point")
42
-		defer os.RemoveAll(contextDir)
43 41
 	}
44 42
 }
45 43
 
... ...
@@ -49,7 +48,7 @@ func TestHashFile(t *testing.T) {
49 49
 
50 50
 	createTestTempFile(t, contextDir, filename, contents, 0755)
51 51
 
52
-	tarSum := makeTestTarsumContext(t, contextDir)
52
+	tarSum := makeTestArchiveContext(t, contextDir)
53 53
 
54 54
 	sum, err := tarSum.Hash(filename)
55 55
 
... ...
@@ -80,7 +79,7 @@ func TestHashSubdir(t *testing.T) {
80 80
 
81 81
 	testFilename := createTestTempFile(t, contextSubdir, filename, contents, 0755)
82 82
 
83
-	tarSum := makeTestTarsumContext(t, contextDir)
83
+	tarSum := makeTestArchiveContext(t, contextDir)
84 84
 
85 85
 	relativePath, err := filepath.Rel(contextDir, testFilename)
86 86
 
... ...
@@ -109,11 +108,9 @@ func TestStatNotExisting(t *testing.T) {
109 109
 	contextDir, cleanup := createTestTempDir(t, "", "builder-tarsum-test")
110 110
 	defer cleanup()
111 111
 
112
-	tarSum := &tarSumContext{root: contextDir}
113
-
114
-	_, err := tarSum.Hash("not-existing")
115
-
116
-	if !os.IsNotExist(err) {
112
+	src := makeTestArchiveContext(t, contextDir)
113
+	_, err := src.Hash("not-existing")
114
+	if !os.IsNotExist(errors.Cause(err)) {
117 115
 		t.Fatalf("This file should not exist: %s", err)
118 116
 	}
119 117
 }
... ...
@@ -130,30 +127,31 @@ func TestRemoveDirectory(t *testing.T) {
130 130
 		t.Fatalf("Error when getting relative path: %s", err)
131 131
 	}
132 132
 
133
-	tarSum := &tarSumContext{root: contextDir}
133
+	src := makeTestArchiveContext(t, contextDir)
134 134
 
135
-	err = tarSum.Remove(relativePath)
135
+	tarSum := src.(modifiableContext)
136 136
 
137
+	err = tarSum.Remove(relativePath)
137 138
 	if err != nil {
138 139
 		t.Fatalf("Error when executing Remove: %s", err)
139 140
 	}
140 141
 
141
-	_, err = os.Stat(contextSubdir)
142
+	_, err = src.Hash(contextSubdir)
142 143
 
143
-	if !os.IsNotExist(err) {
144
+	if !os.IsNotExist(errors.Cause(err)) {
144 145
 		t.Fatal("Directory should not exist at this point")
145 146
 	}
146 147
 }
147 148
 
148
-func makeTestTarsumContext(t *testing.T, dir string) builder.Source {
149
+func makeTestArchiveContext(t *testing.T, dir string) builder.Source {
149 150
 	tarStream, err := archive.Tar(dir, archive.Uncompressed)
150 151
 	if err != nil {
151 152
 		t.Fatalf("error: %s", err)
152 153
 	}
153 154
 	defer tarStream.Close()
154
-	tarSum, err := MakeTarSumContext(tarStream)
155
+	tarSum, err := FromArchive(tarStream)
155 156
 	if err != nil {
156
-		t.Fatalf("Error when executing MakeTarSumContext: %s", err)
157
+		t.Fatalf("Error when executing FromArchive: %s", err)
157 158
 	}
158 159
 	return tarSum
159 160
 }
160 161
new file mode 100644
... ...
@@ -0,0 +1,30 @@
0
+package client
1
+
2
+import (
3
+	"encoding/json"
4
+	"fmt"
5
+
6
+	"github.com/docker/docker/api/types"
7
+	"golang.org/x/net/context"
8
+)
9
+
10
+// BuildCachePrune requests the daemon to delete unused cache data
11
+func (cli *Client) BuildCachePrune(ctx context.Context) (*types.BuildCachePruneReport, error) {
12
+	if err := cli.NewVersionError("1.31", "build prune"); err != nil {
13
+		return nil, err
14
+	}
15
+
16
+	report := types.BuildCachePruneReport{}
17
+
18
+	serverResp, err := cli.post(ctx, "/build/prune", nil, nil, nil)
19
+	if err != nil {
20
+		return nil, err
21
+	}
22
+	defer ensureReaderClosed(serverResp)
23
+
24
+	if err := json.NewDecoder(serverResp.body).Decode(&report); err != nil {
25
+		return nil, fmt.Errorf("Error retrieving disk usage: %v", err)
26
+	}
27
+
28
+	return &report, nil
29
+}
... ...
@@ -82,6 +82,7 @@ type DistributionAPIClient interface {
82 82
 // ImageAPIClient defines API client methods for the images
83 83
 type ImageAPIClient interface {
84 84
 	ImageBuild(ctx context.Context, context io.Reader, options types.ImageBuildOptions) (types.ImageBuildResponse, error)
85
+	BuildCachePrune(ctx context.Context) (*types.BuildCachePruneReport, error)
85 86
 	ImageCreate(ctx context.Context, parentReference string, options types.ImageCreateOptions) (io.ReadCloser, error)
86 87
 	ImageHistory(ctx context.Context, image string) ([]image.HistoryResponseItem, error)
87 88
 	ImageImport(ctx context.Context, source types.ImageImportSource, ref string, options types.ImageImportOptions) (io.ReadCloser, error)
88 89
new file mode 100644
... ...
@@ -0,0 +1,30 @@
0
+package filesync
1
+
2
+import (
3
+	"time"
4
+
5
+	"google.golang.org/grpc"
6
+
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/tonistiigi/fsutil"
9
+)
10
+
11
+func sendDiffCopy(stream grpc.Stream, dir string, excludes []string, progress progressCb) error {
12
+	return fsutil.Send(stream.Context(), stream, dir, &fsutil.WalkOpt{
13
+		ExcludePatterns: excludes,
14
+	}, progress)
15
+}
16
+
17
+func recvDiffCopy(ds grpc.Stream, dest string, cu CacheUpdater) error {
18
+	st := time.Now()
19
+	defer func() {
20
+		logrus.Debugf("diffcopy took: %v", time.Since(st))
21
+	}()
22
+	var cf fsutil.ChangeFunc
23
+	if cu != nil {
24
+		cu.MarkSupported(true)
25
+		cf = cu.HandleChange
26
+	}
27
+
28
+	return fsutil.Receive(ds.Context(), ds, dest, cf)
29
+}
0 30
new file mode 100644
... ...
@@ -0,0 +1,173 @@
0
+package filesync
1
+
2
+import (
3
+	"os"
4
+	"strings"
5
+
6
+	"github.com/docker/docker/client/session"
7
+	"github.com/pkg/errors"
8
+	"github.com/tonistiigi/fsutil"
9
+	"golang.org/x/net/context"
10
+	"google.golang.org/grpc"
11
+	"google.golang.org/grpc/metadata"
12
+)
13
+
14
+type fsSyncProvider struct {
15
+	root     string
16
+	excludes []string
17
+	p        progressCb
18
+	doneCh   chan error
19
+}
20
+
21
+// NewFSSyncProvider creates a new provider for sending files from client
22
+func NewFSSyncProvider(root string, excludes []string) session.Attachable {
23
+	p := &fsSyncProvider{
24
+		root:     root,
25
+		excludes: excludes,
26
+	}
27
+	return p
28
+}
29
+
30
+func (sp *fsSyncProvider) Register(server *grpc.Server) {
31
+	RegisterFileSyncServer(server, sp)
32
+}
33
+
34
+func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error {
35
+	return sp.handle("diffcopy", stream)
36
+}
37
+func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
38
+	return sp.handle("tarstream", stream)
39
+}
40
+
41
+func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error {
42
+	var pr *protocol
43
+	for _, p := range supportedProtocols {
44
+		if method == p.name && isProtoSupported(p.name) {
45
+			pr = &p
46
+			break
47
+		}
48
+	}
49
+	if pr == nil {
50
+		return errors.New("failed to negotiate protocol")
51
+	}
52
+
53
+	opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object
54
+
55
+	var excludes []string
56
+	if len(opts["Override-Excludes"]) == 0 || opts["Override-Excludes"][0] != "true" {
57
+		excludes = sp.excludes
58
+	}
59
+
60
+	var progress progressCb
61
+	if sp.p != nil {
62
+		progress = sp.p
63
+		sp.p = nil
64
+	}
65
+
66
+	var doneCh chan error
67
+	if sp.doneCh != nil {
68
+		doneCh = sp.doneCh
69
+		sp.doneCh = nil
70
+	}
71
+	err := pr.sendFn(stream, sp.root, excludes, progress)
72
+	if doneCh != nil {
73
+		if err != nil {
74
+			doneCh <- err
75
+		}
76
+		close(doneCh)
77
+	}
78
+	return err
79
+}
80
+
81
+func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
82
+	sp.p = f
83
+	sp.doneCh = doneCh
84
+}
85
+
86
+type progressCb func(int, bool)
87
+
88
+type protocol struct {
89
+	name   string
90
+	sendFn func(stream grpc.Stream, srcDir string, excludes []string, progress progressCb) error
91
+	recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error
92
+}
93
+
94
+func isProtoSupported(p string) bool {
95
+	// TODO: this should be removed after testing if stability is confirmed
96
+	if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
97
+		return strings.EqualFold(p, override)
98
+	}
99
+	return true
100
+}
101
+
102
+var supportedProtocols = []protocol{
103
+	{
104
+		name:   "diffcopy",
105
+		sendFn: sendDiffCopy,
106
+		recvFn: recvDiffCopy,
107
+	},
108
+	{
109
+		name:   "tarstream",
110
+		sendFn: sendTarStream,
111
+		recvFn: recvTarStream,
112
+	},
113
+}
114
+
115
+// FSSendRequestOpt defines options for FSSend request
116
+type FSSendRequestOpt struct {
117
+	SrcPaths         []string
118
+	OverrideExcludes bool
119
+	DestDir          string
120
+	CacheUpdater     CacheUpdater
121
+}
122
+
123
+// CacheUpdater is an object capable of sending notifications for the cache hash changes
124
+type CacheUpdater interface {
125
+	MarkSupported(bool)
126
+	HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error
127
+}
128
+
129
+// FSSync initializes a transfer of files
130
+func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
131
+	var pr *protocol
132
+	for _, p := range supportedProtocols {
133
+		if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) {
134
+			pr = &p
135
+			break
136
+		}
137
+	}
138
+	if pr == nil {
139
+		return errors.New("no fssync handlers")
140
+	}
141
+
142
+	opts := make(map[string][]string)
143
+	if opt.OverrideExcludes {
144
+		opts["Override-Excludes"] = []string{"true"}
145
+	}
146
+
147
+	ctx, cancel := context.WithCancel(ctx)
148
+	defer cancel()
149
+
150
+	client := NewFileSyncClient(c.Conn())
151
+
152
+	var stream grpc.ClientStream
153
+
154
+	ctx = metadata.NewContext(ctx, opts)
155
+
156
+	switch pr.name {
157
+	case "tarstream":
158
+		cc, err := client.TarStream(ctx)
159
+		if err != nil {
160
+			return err
161
+		}
162
+		stream = cc
163
+	case "diffcopy":
164
+		cc, err := client.DiffCopy(ctx)
165
+		if err != nil {
166
+			return err
167
+		}
168
+		stream = cc
169
+	}
170
+
171
+	return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater)
172
+}
0 173
new file mode 100644
... ...
@@ -0,0 +1,575 @@
0
+// Code generated by protoc-gen-gogo.
1
+// source: filesync.proto
2
+// DO NOT EDIT!
3
+
4
+/*
5
+Package filesync is a generated protocol buffer package.
6
+
7
+It is generated from these files:
8
+	filesync.proto
9
+
10
+It has these top-level messages:
11
+	BytesMessage
12
+*/
13
+package filesync
14
+
15
+import proto "github.com/gogo/protobuf/proto"
16
+import fmt "fmt"
17
+import math "math"
18
+
19
+import bytes "bytes"
20
+
21
+import strings "strings"
22
+import reflect "reflect"
23
+
24
+import (
25
+	context "golang.org/x/net/context"
26
+	grpc "google.golang.org/grpc"
27
+)
28
+
29
+import io "io"
30
+
31
+// Reference imports to suppress errors if they are not otherwise used.
32
+var _ = proto.Marshal
33
+var _ = fmt.Errorf
34
+var _ = math.Inf
35
+
36
+// This is a compile-time assertion to ensure that this generated file
37
+// is compatible with the proto package it is being compiled against.
38
+// A compilation error at this line likely means your copy of the
39
+// proto package needs to be updated.
40
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
41
+
42
+// BytesMessage contains a chunk of byte data
43
+type BytesMessage struct {
44
+	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
45
+}
46
+
47
+func (m *BytesMessage) Reset()                    { *m = BytesMessage{} }
48
+func (*BytesMessage) ProtoMessage()               {}
49
+func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorFilesync, []int{0} }
50
+
51
+func (m *BytesMessage) GetData() []byte {
52
+	if m != nil {
53
+		return m.Data
54
+	}
55
+	return nil
56
+}
57
+
58
+func init() {
59
+	proto.RegisterType((*BytesMessage)(nil), "moby.filesync.v1.BytesMessage")
60
+}
61
+func (this *BytesMessage) Equal(that interface{}) bool {
62
+	if that == nil {
63
+		if this == nil {
64
+			return true
65
+		}
66
+		return false
67
+	}
68
+
69
+	that1, ok := that.(*BytesMessage)
70
+	if !ok {
71
+		that2, ok := that.(BytesMessage)
72
+		if ok {
73
+			that1 = &that2
74
+		} else {
75
+			return false
76
+		}
77
+	}
78
+	if that1 == nil {
79
+		if this == nil {
80
+			return true
81
+		}
82
+		return false
83
+	} else if this == nil {
84
+		return false
85
+	}
86
+	if !bytes.Equal(this.Data, that1.Data) {
87
+		return false
88
+	}
89
+	return true
90
+}
91
+func (this *BytesMessage) GoString() string {
92
+	if this == nil {
93
+		return "nil"
94
+	}
95
+	s := make([]string, 0, 5)
96
+	s = append(s, "&filesync.BytesMessage{")
97
+	s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n")
98
+	s = append(s, "}")
99
+	return strings.Join(s, "")
100
+}
101
+func valueToGoStringFilesync(v interface{}, typ string) string {
102
+	rv := reflect.ValueOf(v)
103
+	if rv.IsNil() {
104
+		return "nil"
105
+	}
106
+	pv := reflect.Indirect(rv).Interface()
107
+	return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
108
+}
109
+
110
+// Reference imports to suppress errors if they are not otherwise used.
111
+var _ context.Context
112
+var _ grpc.ClientConn
113
+
114
+// This is a compile-time assertion to ensure that this generated file
115
+// is compatible with the grpc package it is being compiled against.
116
+const _ = grpc.SupportPackageIsVersion4
117
+
118
+// Client API for FileSync service
119
+
120
+type FileSyncClient interface {
121
+	DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error)
122
+	TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error)
123
+}
124
+
125
+type fileSyncClient struct {
126
+	cc *grpc.ClientConn
127
+}
128
+
129
+func NewFileSyncClient(cc *grpc.ClientConn) FileSyncClient {
130
+	return &fileSyncClient{cc}
131
+}
132
+
133
+func (c *fileSyncClient) DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) {
134
+	stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[0], c.cc, "/moby.filesync.v1.FileSync/DiffCopy", opts...)
135
+	if err != nil {
136
+		return nil, err
137
+	}
138
+	x := &fileSyncDiffCopyClient{stream}
139
+	return x, nil
140
+}
141
+
142
+type FileSync_DiffCopyClient interface {
143
+	Send(*BytesMessage) error
144
+	Recv() (*BytesMessage, error)
145
+	grpc.ClientStream
146
+}
147
+
148
+type fileSyncDiffCopyClient struct {
149
+	grpc.ClientStream
150
+}
151
+
152
+func (x *fileSyncDiffCopyClient) Send(m *BytesMessage) error {
153
+	return x.ClientStream.SendMsg(m)
154
+}
155
+
156
+func (x *fileSyncDiffCopyClient) Recv() (*BytesMessage, error) {
157
+	m := new(BytesMessage)
158
+	if err := x.ClientStream.RecvMsg(m); err != nil {
159
+		return nil, err
160
+	}
161
+	return m, nil
162
+}
163
+
164
+func (c *fileSyncClient) TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) {
165
+	stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[1], c.cc, "/moby.filesync.v1.FileSync/TarStream", opts...)
166
+	if err != nil {
167
+		return nil, err
168
+	}
169
+	x := &fileSyncTarStreamClient{stream}
170
+	return x, nil
171
+}
172
+
173
+type FileSync_TarStreamClient interface {
174
+	Send(*BytesMessage) error
175
+	Recv() (*BytesMessage, error)
176
+	grpc.ClientStream
177
+}
178
+
179
+type fileSyncTarStreamClient struct {
180
+	grpc.ClientStream
181
+}
182
+
183
+func (x *fileSyncTarStreamClient) Send(m *BytesMessage) error {
184
+	return x.ClientStream.SendMsg(m)
185
+}
186
+
187
+func (x *fileSyncTarStreamClient) Recv() (*BytesMessage, error) {
188
+	m := new(BytesMessage)
189
+	if err := x.ClientStream.RecvMsg(m); err != nil {
190
+		return nil, err
191
+	}
192
+	return m, nil
193
+}
194
+
195
+// Server API for FileSync service
196
+
197
+type FileSyncServer interface {
198
+	DiffCopy(FileSync_DiffCopyServer) error
199
+	TarStream(FileSync_TarStreamServer) error
200
+}
201
+
202
+func RegisterFileSyncServer(s *grpc.Server, srv FileSyncServer) {
203
+	s.RegisterService(&_FileSync_serviceDesc, srv)
204
+}
205
+
206
+func _FileSync_DiffCopy_Handler(srv interface{}, stream grpc.ServerStream) error {
207
+	return srv.(FileSyncServer).DiffCopy(&fileSyncDiffCopyServer{stream})
208
+}
209
+
210
+type FileSync_DiffCopyServer interface {
211
+	Send(*BytesMessage) error
212
+	Recv() (*BytesMessage, error)
213
+	grpc.ServerStream
214
+}
215
+
216
+type fileSyncDiffCopyServer struct {
217
+	grpc.ServerStream
218
+}
219
+
220
+func (x *fileSyncDiffCopyServer) Send(m *BytesMessage) error {
221
+	return x.ServerStream.SendMsg(m)
222
+}
223
+
224
+func (x *fileSyncDiffCopyServer) Recv() (*BytesMessage, error) {
225
+	m := new(BytesMessage)
226
+	if err := x.ServerStream.RecvMsg(m); err != nil {
227
+		return nil, err
228
+	}
229
+	return m, nil
230
+}
231
+
232
+func _FileSync_TarStream_Handler(srv interface{}, stream grpc.ServerStream) error {
233
+	return srv.(FileSyncServer).TarStream(&fileSyncTarStreamServer{stream})
234
+}
235
+
236
+type FileSync_TarStreamServer interface {
237
+	Send(*BytesMessage) error
238
+	Recv() (*BytesMessage, error)
239
+	grpc.ServerStream
240
+}
241
+
242
+type fileSyncTarStreamServer struct {
243
+	grpc.ServerStream
244
+}
245
+
246
+func (x *fileSyncTarStreamServer) Send(m *BytesMessage) error {
247
+	return x.ServerStream.SendMsg(m)
248
+}
249
+
250
+func (x *fileSyncTarStreamServer) Recv() (*BytesMessage, error) {
251
+	m := new(BytesMessage)
252
+	if err := x.ServerStream.RecvMsg(m); err != nil {
253
+		return nil, err
254
+	}
255
+	return m, nil
256
+}
257
+
258
+var _FileSync_serviceDesc = grpc.ServiceDesc{
259
+	ServiceName: "moby.filesync.v1.FileSync",
260
+	HandlerType: (*FileSyncServer)(nil),
261
+	Methods:     []grpc.MethodDesc{},
262
+	Streams: []grpc.StreamDesc{
263
+		{
264
+			StreamName:    "DiffCopy",
265
+			Handler:       _FileSync_DiffCopy_Handler,
266
+			ServerStreams: true,
267
+			ClientStreams: true,
268
+		},
269
+		{
270
+			StreamName:    "TarStream",
271
+			Handler:       _FileSync_TarStream_Handler,
272
+			ServerStreams: true,
273
+			ClientStreams: true,
274
+		},
275
+	},
276
+	Metadata: "filesync.proto",
277
+}
278
+
279
+func (m *BytesMessage) Marshal() (dAtA []byte, err error) {
280
+	size := m.Size()
281
+	dAtA = make([]byte, size)
282
+	n, err := m.MarshalTo(dAtA)
283
+	if err != nil {
284
+		return nil, err
285
+	}
286
+	return dAtA[:n], nil
287
+}
288
+
289
+func (m *BytesMessage) MarshalTo(dAtA []byte) (int, error) {
290
+	var i int
291
+	_ = i
292
+	var l int
293
+	_ = l
294
+	if len(m.Data) > 0 {
295
+		dAtA[i] = 0xa
296
+		i++
297
+		i = encodeVarintFilesync(dAtA, i, uint64(len(m.Data)))
298
+		i += copy(dAtA[i:], m.Data)
299
+	}
300
+	return i, nil
301
+}
302
+
303
+func encodeFixed64Filesync(dAtA []byte, offset int, v uint64) int {
304
+	dAtA[offset] = uint8(v)
305
+	dAtA[offset+1] = uint8(v >> 8)
306
+	dAtA[offset+2] = uint8(v >> 16)
307
+	dAtA[offset+3] = uint8(v >> 24)
308
+	dAtA[offset+4] = uint8(v >> 32)
309
+	dAtA[offset+5] = uint8(v >> 40)
310
+	dAtA[offset+6] = uint8(v >> 48)
311
+	dAtA[offset+7] = uint8(v >> 56)
312
+	return offset + 8
313
+}
314
+func encodeFixed32Filesync(dAtA []byte, offset int, v uint32) int {
315
+	dAtA[offset] = uint8(v)
316
+	dAtA[offset+1] = uint8(v >> 8)
317
+	dAtA[offset+2] = uint8(v >> 16)
318
+	dAtA[offset+3] = uint8(v >> 24)
319
+	return offset + 4
320
+}
321
+func encodeVarintFilesync(dAtA []byte, offset int, v uint64) int {
322
+	for v >= 1<<7 {
323
+		dAtA[offset] = uint8(v&0x7f | 0x80)
324
+		v >>= 7
325
+		offset++
326
+	}
327
+	dAtA[offset] = uint8(v)
328
+	return offset + 1
329
+}
330
+func (m *BytesMessage) Size() (n int) {
331
+	var l int
332
+	_ = l
333
+	l = len(m.Data)
334
+	if l > 0 {
335
+		n += 1 + l + sovFilesync(uint64(l))
336
+	}
337
+	return n
338
+}
339
+
340
+func sovFilesync(x uint64) (n int) {
341
+	for {
342
+		n++
343
+		x >>= 7
344
+		if x == 0 {
345
+			break
346
+		}
347
+	}
348
+	return n
349
+}
350
+func sozFilesync(x uint64) (n int) {
351
+	return sovFilesync(uint64((x << 1) ^ uint64((int64(x) >> 63))))
352
+}
353
+func (this *BytesMessage) String() string {
354
+	if this == nil {
355
+		return "nil"
356
+	}
357
+	s := strings.Join([]string{`&BytesMessage{`,
358
+		`Data:` + fmt.Sprintf("%v", this.Data) + `,`,
359
+		`}`,
360
+	}, "")
361
+	return s
362
+}
363
+func valueToStringFilesync(v interface{}) string {
364
+	rv := reflect.ValueOf(v)
365
+	if rv.IsNil() {
366
+		return "nil"
367
+	}
368
+	pv := reflect.Indirect(rv).Interface()
369
+	return fmt.Sprintf("*%v", pv)
370
+}
371
+func (m *BytesMessage) Unmarshal(dAtA []byte) error {
372
+	l := len(dAtA)
373
+	iNdEx := 0
374
+	for iNdEx < l {
375
+		preIndex := iNdEx
376
+		var wire uint64
377
+		for shift := uint(0); ; shift += 7 {
378
+			if shift >= 64 {
379
+				return ErrIntOverflowFilesync
380
+			}
381
+			if iNdEx >= l {
382
+				return io.ErrUnexpectedEOF
383
+			}
384
+			b := dAtA[iNdEx]
385
+			iNdEx++
386
+			wire |= (uint64(b) & 0x7F) << shift
387
+			if b < 0x80 {
388
+				break
389
+			}
390
+		}
391
+		fieldNum := int32(wire >> 3)
392
+		wireType := int(wire & 0x7)
393
+		if wireType == 4 {
394
+			return fmt.Errorf("proto: BytesMessage: wiretype end group for non-group")
395
+		}
396
+		if fieldNum <= 0 {
397
+			return fmt.Errorf("proto: BytesMessage: illegal tag %d (wire type %d)", fieldNum, wire)
398
+		}
399
+		switch fieldNum {
400
+		case 1:
401
+			if wireType != 2 {
402
+				return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
403
+			}
404
+			var byteLen int
405
+			for shift := uint(0); ; shift += 7 {
406
+				if shift >= 64 {
407
+					return ErrIntOverflowFilesync
408
+				}
409
+				if iNdEx >= l {
410
+					return io.ErrUnexpectedEOF
411
+				}
412
+				b := dAtA[iNdEx]
413
+				iNdEx++
414
+				byteLen |= (int(b) & 0x7F) << shift
415
+				if b < 0x80 {
416
+					break
417
+				}
418
+			}
419
+			if byteLen < 0 {
420
+				return ErrInvalidLengthFilesync
421
+			}
422
+			postIndex := iNdEx + byteLen
423
+			if postIndex > l {
424
+				return io.ErrUnexpectedEOF
425
+			}
426
+			m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
427
+			if m.Data == nil {
428
+				m.Data = []byte{}
429
+			}
430
+			iNdEx = postIndex
431
+		default:
432
+			iNdEx = preIndex
433
+			skippy, err := skipFilesync(dAtA[iNdEx:])
434
+			if err != nil {
435
+				return err
436
+			}
437
+			if skippy < 0 {
438
+				return ErrInvalidLengthFilesync
439
+			}
440
+			if (iNdEx + skippy) > l {
441
+				return io.ErrUnexpectedEOF
442
+			}
443
+			iNdEx += skippy
444
+		}
445
+	}
446
+
447
+	if iNdEx > l {
448
+		return io.ErrUnexpectedEOF
449
+	}
450
+	return nil
451
+}
452
+func skipFilesync(dAtA []byte) (n int, err error) {
453
+	l := len(dAtA)
454
+	iNdEx := 0
455
+	for iNdEx < l {
456
+		var wire uint64
457
+		for shift := uint(0); ; shift += 7 {
458
+			if shift >= 64 {
459
+				return 0, ErrIntOverflowFilesync
460
+			}
461
+			if iNdEx >= l {
462
+				return 0, io.ErrUnexpectedEOF
463
+			}
464
+			b := dAtA[iNdEx]
465
+			iNdEx++
466
+			wire |= (uint64(b) & 0x7F) << shift
467
+			if b < 0x80 {
468
+				break
469
+			}
470
+		}
471
+		wireType := int(wire & 0x7)
472
+		switch wireType {
473
+		case 0:
474
+			for shift := uint(0); ; shift += 7 {
475
+				if shift >= 64 {
476
+					return 0, ErrIntOverflowFilesync
477
+				}
478
+				if iNdEx >= l {
479
+					return 0, io.ErrUnexpectedEOF
480
+				}
481
+				iNdEx++
482
+				if dAtA[iNdEx-1] < 0x80 {
483
+					break
484
+				}
485
+			}
486
+			return iNdEx, nil
487
+		case 1:
488
+			iNdEx += 8
489
+			return iNdEx, nil
490
+		case 2:
491
+			var length int
492
+			for shift := uint(0); ; shift += 7 {
493
+				if shift >= 64 {
494
+					return 0, ErrIntOverflowFilesync
495
+				}
496
+				if iNdEx >= l {
497
+					return 0, io.ErrUnexpectedEOF
498
+				}
499
+				b := dAtA[iNdEx]
500
+				iNdEx++
501
+				length |= (int(b) & 0x7F) << shift
502
+				if b < 0x80 {
503
+					break
504
+				}
505
+			}
506
+			iNdEx += length
507
+			if length < 0 {
508
+				return 0, ErrInvalidLengthFilesync
509
+			}
510
+			return iNdEx, nil
511
+		case 3:
512
+			for {
513
+				var innerWire uint64
514
+				var start int = iNdEx
515
+				for shift := uint(0); ; shift += 7 {
516
+					if shift >= 64 {
517
+						return 0, ErrIntOverflowFilesync
518
+					}
519
+					if iNdEx >= l {
520
+						return 0, io.ErrUnexpectedEOF
521
+					}
522
+					b := dAtA[iNdEx]
523
+					iNdEx++
524
+					innerWire |= (uint64(b) & 0x7F) << shift
525
+					if b < 0x80 {
526
+						break
527
+					}
528
+				}
529
+				innerWireType := int(innerWire & 0x7)
530
+				if innerWireType == 4 {
531
+					break
532
+				}
533
+				next, err := skipFilesync(dAtA[start:])
534
+				if err != nil {
535
+					return 0, err
536
+				}
537
+				iNdEx = start + next
538
+			}
539
+			return iNdEx, nil
540
+		case 4:
541
+			return iNdEx, nil
542
+		case 5:
543
+			iNdEx += 4
544
+			return iNdEx, nil
545
+		default:
546
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
547
+		}
548
+	}
549
+	panic("unreachable")
550
+}
551
+
552
+var (
553
+	ErrInvalidLengthFilesync = fmt.Errorf("proto: negative length found during unmarshaling")
554
+	ErrIntOverflowFilesync   = fmt.Errorf("proto: integer overflow")
555
+)
556
+
557
+func init() { proto.RegisterFile("filesync.proto", fileDescriptorFilesync) }
558
+
559
+var fileDescriptorFilesync = []byte{
560
+	// 198 bytes of a gzipped FileDescriptorProto
561
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xcb, 0xcc, 0x49,
562
+	0x2d, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xc8, 0xcd, 0x4f, 0xaa,
563
+	0xd4, 0x83, 0x0b, 0x96, 0x19, 0x2a, 0x29, 0x71, 0xf1, 0x38, 0x55, 0x96, 0xa4, 0x16, 0xfb, 0xa6,
564
+	0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a,
565
+	0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x46, 0xab, 0x19, 0xb9, 0x38, 0xdc, 0x32, 0x73, 0x52, 0x83,
566
+	0x2b, 0xf3, 0x92, 0x85, 0xfc, 0xb8, 0x38, 0x5c, 0x32, 0xd3, 0xd2, 0x9c, 0xf3, 0x0b, 0x2a, 0x85,
567
+	0xe4, 0xf4, 0xd0, 0xcd, 0xd3, 0x43, 0x36, 0x4c, 0x8a, 0x80, 0xbc, 0x06, 0xa3, 0x01, 0xa3, 0x90,
568
+	0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x74, 0x32,
569
+	0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9,
570
+	0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e,
571
+	0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51,
572
+	0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x0c,
573
+	0x8d, 0xc5, 0x34, 0x01, 0x00, 0x00,
574
+}
0 575
new file mode 100644
... ...
@@ -0,0 +1,15 @@
0
+syntax = "proto3";
1
+
2
+package moby.filesync.v1;
3
+
4
+option go_package = "filesync";
5
+
6
+service FileSync{
7
+  rpc DiffCopy(stream BytesMessage) returns (stream BytesMessage);
8
+  rpc TarStream(stream BytesMessage) returns (stream BytesMessage);
9
+}
10
+
11
+// BytesMessage contains a chunk of byte data
12
+message BytesMessage{
13
+	bytes data = 1;
14
+}
0 15
\ No newline at end of file
1 16
new file mode 100644
... ...
@@ -0,0 +1,3 @@
0
+package filesync
1
+
2
+//go:generate protoc --gogoslick_out=plugins=grpc:. filesync.proto
0 3
new file mode 100644
... ...
@@ -0,0 +1,83 @@
0
+package filesync
1
+
2
+import (
3
+	"io"
4
+
5
+	"github.com/Sirupsen/logrus"
6
+	"github.com/docker/docker/pkg/archive"
7
+	"github.com/docker/docker/pkg/chrootarchive"
8
+	"github.com/pkg/errors"
9
+	"google.golang.org/grpc"
10
+)
11
+
12
+func sendTarStream(stream grpc.Stream, dir string, excludes []string, progress progressCb) error {
13
+	a, err := archive.TarWithOptions(dir, &archive.TarOptions{
14
+		ExcludePatterns: excludes,
15
+	})
16
+	if err != nil {
17
+		return err
18
+	}
19
+
20
+	size := 0
21
+	buf := make([]byte, 1<<15)
22
+	t := new(BytesMessage)
23
+	for {
24
+		n, err := a.Read(buf)
25
+		if err != nil {
26
+			if err == io.EOF {
27
+				break
28
+			}
29
+			return err
30
+		}
31
+		t.Data = buf[:n]
32
+
33
+		if err := stream.SendMsg(t); err != nil {
34
+			return err
35
+		}
36
+		size += n
37
+		if progress != nil {
38
+			progress(size, false)
39
+		}
40
+	}
41
+	if progress != nil {
42
+		progress(size, true)
43
+	}
44
+	return nil
45
+}
46
+
47
+func recvTarStream(ds grpc.Stream, dest string, cs CacheUpdater) error {
48
+
49
+	pr, pw := io.Pipe()
50
+
51
+	go func() {
52
+		var (
53
+			err error
54
+			t   = new(BytesMessage)
55
+		)
56
+		for {
57
+			if err = ds.RecvMsg(t); err != nil {
58
+				if err == io.EOF {
59
+					err = nil
60
+				}
61
+				break
62
+			}
63
+			_, err = pw.Write(t.Data)
64
+			if err != nil {
65
+				break
66
+			}
67
+		}
68
+		if err = pw.CloseWithError(err); err != nil {
69
+			logrus.Errorf("failed to close tar transfer pipe")
70
+		}
71
+	}()
72
+
73
+	decompressedStream, err := archive.DecompressStream(pr)
74
+	if err != nil {
75
+		return errors.Wrap(err, "failed to decompress stream")
76
+	}
77
+
78
+	if err := chrootarchive.Untar(decompressedStream, dest, nil); err != nil {
79
+		return errors.Wrap(err, "failed to untar context")
80
+	}
81
+	return nil
82
+}
... ...
@@ -27,6 +27,8 @@ import (
27 27
 	swarmrouter "github.com/docker/docker/api/server/router/swarm"
28 28
 	systemrouter "github.com/docker/docker/api/server/router/system"
29 29
 	"github.com/docker/docker/api/server/router/volume"
30
+	"github.com/docker/docker/builder/dockerfile"
31
+	"github.com/docker/docker/builder/fscache"
30 32
 	"github.com/docker/docker/cli/debug"
31 33
 	"github.com/docker/docker/client/session"
32 34
 	"github.com/docker/docker/daemon"
... ...
@@ -268,7 +270,26 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
268 268
 		logrus.Fatalf("Error starting cluster component: %v", err)
269 269
 	}
270 270
 
271
-	bb, err := buildbackend.NewBackend(d, d, sm, d.IDMappings())
271
+	builderStateDir := filepath.Join(cli.Config.Root, "builder")
272
+
273
+	fsCache, err := fscache.NewFSCache(fscache.Opt{
274
+		Backend: fscache.NewNaiveCacheBackend(builderStateDir),
275
+		Root:    builderStateDir,
276
+		GCPolicy: fscache.GCPolicy{ // TODO: expose this in config
277
+			MaxSize:         1024 * 1024 * 512,  // 512MB
278
+			MaxKeepDuration: 7 * 24 * time.Hour, // 1 week
279
+		},
280
+	})
281
+	if err != nil {
282
+		return errors.Wrap(err, "failed to create fscache")
283
+	}
284
+
285
+	manager, err := dockerfile.NewBuildManager(d, sm, fsCache, d.IDMappings())
286
+	if err != nil {
287
+		return err
288
+	}
289
+
290
+	bb, err := buildbackend.NewBackend(d, manager, fsCache)
272 291
 	if err != nil {
273 292
 		return errors.Wrap(err, "failed to create buildmanager")
274 293
 	}
... ...
@@ -282,7 +303,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
282 282
 
283 283
 	cli.d = d
284 284
 
285
-	initRouter(api, d, c, sm, bb)
285
+	initRouter(api, d, c, sm, bb, fsCache)
286 286
 
287 287
 	// process cluster change notifications
288 288
 	watchCtx, cancel := context.WithCancel(context.Background())
... ...
@@ -455,7 +476,7 @@ func loadDaemonCliConfig(opts *daemonOptions) (*config.Config, error) {
455 455
 	return conf, nil
456 456
 }
457 457
 
458
-func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *session.Manager, bb *buildbackend.Backend) {
458
+func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *session.Manager, bb *buildbackend.Backend, bc *fscache.FSCache) {
459 459
 	decoder := runconfig.ContainerDecoder{}
460 460
 
461 461
 	routers := []router.Router{
... ...
@@ -463,7 +484,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *s
463 463
 		checkpointrouter.NewRouter(d, decoder),
464 464
 		container.NewRouter(d, decoder),
465 465
 		image.NewRouter(d, decoder),
466
-		systemrouter.NewRouter(d, c),
466
+		systemrouter.NewRouter(d, c, bc),
467 467
 		volume.NewRouter(d),
468 468
 		build.NewRouter(bb, d),
469 469
 		sessionrouter.NewRouter(sm),
... ...
@@ -5,7 +5,8 @@ source "${SCRIPTDIR}/.validate"
5 5
 
6 6
 IFS=$'\n'
7 7
 files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' |
8
-    grep -v '^vendor/' || true) )
8
+	grep -v '^vendor/' |
9
+	grep -v '\.pb\.go$' || true) )
9 10
 unset IFS
10 11
 
11 12
 badFiles=()
... ...
@@ -4,7 +4,7 @@ export SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
4 4
 source "${SCRIPTDIR}/.validate"
5 5
 
6 6
 IFS=$'\n'
7
-files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^api/types/plugins/logdriver/entry.pb.go' || true) )
7
+files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '\.pb\.go$'  || true) )
8 8
 unset IFS
9 9
 
10 10
 errors=()
... ...
@@ -12,6 +12,8 @@ import (
12 12
 	"strings"
13 13
 
14 14
 	"github.com/docker/docker/api/types"
15
+	"github.com/docker/docker/client/session"
16
+	"github.com/docker/docker/client/session/filesync"
15 17
 	"github.com/docker/docker/integration-cli/checker"
16 18
 	"github.com/docker/docker/integration-cli/cli/build/fakecontext"
17 19
 	"github.com/docker/docker/integration-cli/cli/build/fakegit"
... ...
@@ -22,6 +24,7 @@ import (
22 22
 	"github.com/stretchr/testify/assert"
23 23
 	"github.com/stretchr/testify/require"
24 24
 	"golang.org/x/net/context"
25
+	"golang.org/x/sync/errgroup"
25 26
 )
26 27
 
27 28
 func (s *DockerSuite) TestBuildAPIDockerFileRemote(c *check.C) {
... ...
@@ -363,6 +366,108 @@ func (s *DockerRegistrySuite) TestBuildCopyFromForcePull(c *check.C) {
363 363
 	assert.Contains(c, string(out), "Successfully built")
364 364
 }
365 365
 
366
+func (s *DockerSuite) TestBuildWithSession(c *check.C) {
367
+	testRequires(c, ExperimentalDaemon)
368
+
369
+	dockerfile := `
370
+		FROM busybox
371
+		COPY file /
372
+		RUN cat /file
373
+	`
374
+
375
+	fctx := fakecontext.New(c, "",
376
+		fakecontext.WithFile("file", "some content"),
377
+	)
378
+	defer fctx.Close()
379
+
380
+	out := testBuildWithSession(c, fctx.Dir, dockerfile)
381
+	assert.Contains(c, out, "some content")
382
+
383
+	fctx.Add("second", "contentcontent")
384
+
385
+	dockerfile += `
386
+	COPY second /
387
+	RUN cat /second
388
+	`
389
+
390
+	out = testBuildWithSession(c, fctx.Dir, dockerfile)
391
+	assert.Equal(c, strings.Count(out, "Using cache"), 2)
392
+	assert.Contains(c, out, "contentcontent")
393
+
394
+	client, err := request.NewClient()
395
+	require.NoError(c, err)
396
+
397
+	du, err := client.DiskUsage(context.TODO())
398
+	assert.Nil(c, err)
399
+	assert.True(c, du.BuilderSize > 10)
400
+
401
+	out = testBuildWithSession(c, fctx.Dir, dockerfile)
402
+	assert.Equal(c, strings.Count(out, "Using cache"), 4)
403
+
404
+	du2, err := client.DiskUsage(context.TODO())
405
+	assert.Nil(c, err)
406
+	assert.Equal(c, du.BuilderSize, du2.BuilderSize)
407
+
408
+	// rebuild with regular tar, confirm cache still applies
409
+	fctx.Add("Dockerfile", dockerfile)
410
+	res, body, err := request.Post(
411
+		"/build",
412
+		request.RawContent(fctx.AsTarReader(c)),
413
+		request.ContentType("application/x-tar"))
414
+	require.NoError(c, err)
415
+	assert.Equal(c, http.StatusOK, res.StatusCode)
416
+
417
+	outBytes, err := testutil.ReadBody(body)
418
+	require.NoError(c, err)
419
+	assert.Contains(c, string(outBytes), "Successfully built")
420
+	assert.Equal(c, strings.Count(string(outBytes), "Using cache"), 4)
421
+
422
+	_, err = client.BuildCachePrune(context.TODO())
423
+	assert.Nil(c, err)
424
+
425
+	du, err = client.DiskUsage(context.TODO())
426
+	assert.Nil(c, err)
427
+	assert.Equal(c, du.BuilderSize, int64(0))
428
+}
429
+
430
+func testBuildWithSession(c *check.C, dir, dockerfile string) (outStr string) {
431
+	client, err := request.NewClient()
432
+	require.NoError(c, err)
433
+
434
+	sess, err := session.NewSession("foo1", "foo")
435
+	assert.Nil(c, err)
436
+
437
+	fsProvider := filesync.NewFSSyncProvider(dir, nil)
438
+	sess.Allow(fsProvider)
439
+
440
+	g, ctx := errgroup.WithContext(context.Background())
441
+
442
+	g.Go(func() error {
443
+		return sess.Run(ctx, client.DialSession)
444
+	})
445
+
446
+	g.Go(func() error {
447
+		res, body, err := request.Post("/build?remote=client-session&session="+sess.UUID(), func(req *http.Request) error {
448
+			req.Body = ioutil.NopCloser(strings.NewReader(dockerfile))
449
+			return nil
450
+		})
451
+		if err != nil {
452
+			return err
453
+		}
454
+		assert.Equal(c, res.StatusCode, http.StatusOK)
455
+		out, err := testutil.ReadBody(body)
456
+		require.NoError(c, err)
457
+		assert.Contains(c, string(out), "Successfully built")
458
+		sess.Close()
459
+		outStr = string(out)
460
+		return nil
461
+	})
462
+
463
+	err = g.Wait()
464
+	assert.Nil(c, err)
465
+	return
466
+}
467
+
366 468
 type buildLine struct {
367 469
 	Stream string
368 470
 	Aux    struct {
... ...
@@ -1789,7 +1789,7 @@ func (s *DockerDaemonSuite) TestDaemonNoSpaceLeftOnDeviceError(c *check.C) {
1789 1789
 
1790 1790
 	// create a 2MiB image and mount it as graph root
1791 1791
 	// Why in a container? Because `mount` sometimes behaves weirdly and often fails outright on this test in debian:jessie (which is what the test suite runs under if run from the Makefile)
1792
-	dockerCmd(c, "run", "--rm", "-v", testDir+":/test", "busybox", "sh", "-c", "dd of=/test/testfs.img bs=1M seek=2 count=0")
1792
+	dockerCmd(c, "run", "--rm", "-v", testDir+":/test", "busybox", "sh", "-c", "dd of=/test/testfs.img bs=1M seek=3 count=0")
1793 1793
 	icmd.RunCommand("mkfs.ext4", "-F", filepath.Join(testDir, "testfs.img")).Assert(c, icmd.Success)
1794 1794
 
1795 1795
 	result := icmd.RunCommand("losetup", "-f", "--show", filepath.Join(testDir, "testfs.img"))
... ...
@@ -17,6 +17,7 @@ import (
17 17
 	"strings"
18 18
 	"time"
19 19
 
20
+	"github.com/docker/docker/api"
20 21
 	dclient "github.com/docker/docker/client"
21 22
 	"github.com/docker/docker/opts"
22 23
 	"github.com/docker/docker/pkg/ioutils"
... ...
@@ -170,7 +171,7 @@ func NewClient() (dclient.APIClient, error) {
170 170
 	if err != nil {
171 171
 		return nil, err
172 172
 	}
173
-	return dclient.NewClient(host, "", httpClient, nil)
173
+	return dclient.NewClient(host, api.DefaultVersion, httpClient, nil)
174 174
 }
175 175
 
176 176
 // FIXME(vdemeester) httputil.ClientConn is deprecated, use http.Client instead (closer to actual client)
... ...
@@ -305,15 +305,7 @@ func (compression *Compression) Extension() string {
305 305
 
306 306
 // FileInfoHeader creates a populated Header from fi.
307 307
 // Compared to archive pkg this function fills in more information.
308
-func FileInfoHeader(path, name string, fi os.FileInfo) (*tar.Header, error) {
309
-	var link string
310
-	if fi.Mode()&os.ModeSymlink != 0 {
311
-		var err error
312
-		link, err = os.Readlink(path)
313
-		if err != nil {
314
-			return nil, err
315
-		}
316
-	}
308
+func FileInfoHeader(name string, fi os.FileInfo, link string) (*tar.Header, error) {
317 309
 	hdr, err := tar.FileInfoHeader(fi, link)
318 310
 	if err != nil {
319 311
 		return nil, err
... ...
@@ -327,12 +319,18 @@ func FileInfoHeader(path, name string, fi os.FileInfo) (*tar.Header, error) {
327 327
 	if err := setHeaderForSpecialDevice(hdr, name, fi.Sys()); err != nil {
328 328
 		return nil, err
329 329
 	}
330
+	return hdr, nil
331
+}
332
+
333
+// ReadSecurityXattrToTarHeader reads security.capability xattr from filesystem
334
+// to a tar header
335
+func ReadSecurityXattrToTarHeader(path string, hdr *tar.Header) error {
330 336
 	capability, _ := system.Lgetxattr(path, "security.capability")
331 337
 	if capability != nil {
332 338
 		hdr.Xattrs = make(map[string]string)
333 339
 		hdr.Xattrs["security.capability"] = string(capability)
334 340
 	}
335
-	return hdr, nil
341
+	return nil
336 342
 }
337 343
 
338 344
 type tarWhiteoutConverter interface {
... ...
@@ -386,10 +384,22 @@ func (ta *tarAppender) addTarFile(path, name string) error {
386 386
 		return err
387 387
 	}
388 388
 
389
-	hdr, err := FileInfoHeader(path, name, fi)
389
+	var link string
390
+	if fi.Mode()&os.ModeSymlink != 0 {
391
+		var err error
392
+		link, err = os.Readlink(path)
393
+		if err != nil {
394
+			return err
395
+		}
396
+	}
397
+
398
+	hdr, err := FileInfoHeader(name, fi, link)
390 399
 	if err != nil {
391 400
 		return err
392 401
 	}
402
+	if err := ReadSecurityXattrToTarHeader(path, hdr); err != nil {
403
+		return err
404
+	}
393 405
 
394 406
 	// if it's not a directory and has more than 1 link,
395 407
 	// it's hard linked, so set the type flag accordingly
... ...
@@ -45,16 +45,13 @@ func chmodTarEntry(perm os.FileMode) os.FileMode {
45 45
 func setHeaderForSpecialDevice(hdr *tar.Header, name string, stat interface{}) (err error) {
46 46
 	s, ok := stat.(*syscall.Stat_t)
47 47
 
48
-	if !ok {
49
-		err = errors.New("cannot convert stat value to syscall.Stat_t")
50
-		return
51
-	}
52
-
53
-	// Currently go does not fill in the major/minors
54
-	if s.Mode&syscall.S_IFBLK != 0 ||
55
-		s.Mode&syscall.S_IFCHR != 0 {
56
-		hdr.Devmajor = int64(major(uint64(s.Rdev)))
57
-		hdr.Devminor = int64(minor(uint64(s.Rdev)))
48
+	if ok {
49
+		// Currently go does not fill in the major/minors
50
+		if s.Mode&syscall.S_IFBLK != 0 ||
51
+			s.Mode&syscall.S_IFCHR != 0 {
52
+			hdr.Devmajor = int64(major(uint64(s.Rdev)))
53
+			hdr.Devminor = int64(minor(uint64(s.Rdev)))
54
+		}
58 55
 	}
59 56
 
60 57
 	return
... ...
@@ -63,13 +60,10 @@ func setHeaderForSpecialDevice(hdr *tar.Header, name string, stat interface{}) (
63 63
 func getInodeFromStat(stat interface{}) (inode uint64, err error) {
64 64
 	s, ok := stat.(*syscall.Stat_t)
65 65
 
66
-	if !ok {
67
-		err = errors.New("cannot convert stat value to syscall.Stat_t")
68
-		return
66
+	if ok {
67
+		inode = uint64(s.Ino)
69 68
 	}
70 69
 
71
-	inode = uint64(s.Ino)
72
-
73 70
 	return
74 71
 }
75 72