Browse code

Use containerd dist libs for plugin push/pull

This removes the use of the old distribution code in the plugin packages
and replaces it with containerd libraries for plugin pushes and pulls.

Additionally it uses a content store from containerd which seems to be
compatible with the old "basicBlobStore" in the plugin package.
This is being used locally instead of through the containerd client for
now.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2020/02/11 09:31:04
Showing 13 changed files
... ...
@@ -5,12 +5,16 @@ import (
5 5
 	"os"
6 6
 	"testing"
7 7
 
8
+	"github.com/docker/docker/pkg/reexec"
8 9
 	"github.com/docker/docker/testutil/environment"
9 10
 )
10 11
 
11 12
 var testEnv *environment.Execution
12 13
 
13 14
 func TestMain(m *testing.M) {
15
+	if reexec.Init() {
16
+		return
17
+	}
14 18
 	var err error
15 19
 	testEnv, err = environment.New()
16 20
 	if err != nil {
... ...
@@ -1,12 +1,25 @@
1 1
 package common // import "github.com/docker/docker/integration/plugin/common"
2 2
 
3 3
 import (
4
+	"context"
5
+	"encoding/base64"
6
+	"encoding/json"
7
+	"io"
8
+	"io/ioutil"
9
+	"net"
4 10
 	"net/http"
11
+	"path"
12
+	"strings"
5 13
 	"testing"
6 14
 
15
+	"github.com/docker/docker/api/types"
16
+	"github.com/docker/docker/testutil/daemon"
17
+	"github.com/docker/docker/testutil/fixtures/plugin"
18
+	"github.com/docker/docker/testutil/registry"
7 19
 	"github.com/docker/docker/testutil/request"
8 20
 	"gotest.tools/v3/assert"
9 21
 	is "gotest.tools/v3/assert/cmp"
22
+	"gotest.tools/v3/skip"
10 23
 )
11 24
 
12 25
 func TestPluginInvalidJSON(t *testing.T) {
... ...
@@ -36,3 +49,111 @@ func TestPluginInvalidJSON(t *testing.T) {
36 36
 		})
37 37
 	}
38 38
 }
39
+
40
+func TestPluginInstall(t *testing.T) {
41
+	skip.If(t, testEnv.IsRemoteDaemon, "cannot run daemon when remote daemon")
42
+	skip.If(t, testEnv.OSType == "windows")
43
+	skip.If(t, testEnv.IsRootless, "rootless mode has different view of localhost")
44
+
45
+	ctx := context.Background()
46
+	client := testEnv.APIClient()
47
+
48
+	t.Run("no auth", func(t *testing.T) {
49
+		defer setupTest(t)()
50
+
51
+		reg := registry.NewV2(t)
52
+		defer reg.Close()
53
+
54
+		name := "test-" + strings.ToLower(t.Name())
55
+		repo := path.Join(registry.DefaultURL, name+":latest")
56
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil))
57
+
58
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo})
59
+		assert.NilError(t, err)
60
+		defer rdr.Close()
61
+
62
+		_, err = io.Copy(ioutil.Discard, rdr)
63
+		assert.NilError(t, err)
64
+
65
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
66
+		assert.NilError(t, err)
67
+	})
68
+
69
+	t.Run("with htpasswd", func(t *testing.T) {
70
+		defer setupTest(t)()
71
+
72
+		reg := registry.NewV2(t, registry.Htpasswd)
73
+		defer reg.Close()
74
+
75
+		name := "test-" + strings.ToLower(t.Name())
76
+		repo := path.Join(registry.DefaultURL, name+":latest")
77
+		auth := &types.AuthConfig{ServerAddress: registry.DefaultURL, Username: "testuser", Password: "testpassword"}
78
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, auth))
79
+
80
+		authEncoded, err := json.Marshal(auth)
81
+		assert.NilError(t, err)
82
+
83
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{
84
+			RegistryAuth: base64.URLEncoding.EncodeToString(authEncoded),
85
+			Disabled:     true,
86
+			RemoteRef:    repo,
87
+		})
88
+		assert.NilError(t, err)
89
+		defer rdr.Close()
90
+
91
+		_, err = io.Copy(ioutil.Discard, rdr)
92
+		assert.NilError(t, err)
93
+
94
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
95
+		assert.NilError(t, err)
96
+	})
97
+	t.Run("with insecure", func(t *testing.T) {
98
+		skip.If(t, !testEnv.IsLocalDaemon())
99
+
100
+		addrs, err := net.InterfaceAddrs()
101
+		assert.NilError(t, err)
102
+
103
+		var bindTo string
104
+		for _, addr := range addrs {
105
+			ip, ok := addr.(*net.IPNet)
106
+			if !ok {
107
+				continue
108
+			}
109
+			if ip.IP.IsLoopback() || ip.IP.To4() == nil {
110
+				continue
111
+			}
112
+			bindTo = ip.IP.String()
113
+		}
114
+
115
+		if bindTo == "" {
116
+			t.Skip("No suitable interface to bind registry to")
117
+		}
118
+
119
+		regURL := bindTo + ":5000"
120
+
121
+		d := daemon.New(t)
122
+		defer d.Stop(t)
123
+
124
+		d.Start(t, "--insecure-registry="+regURL)
125
+		defer d.Stop(t)
126
+
127
+		reg := registry.NewV2(t, registry.URL(regURL))
128
+		defer reg.Close()
129
+
130
+		name := "test-" + strings.ToLower(t.Name())
131
+		repo := path.Join(regURL, name+":latest")
132
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil, plugin.WithInsecureRegistry(regURL)))
133
+
134
+		client := d.NewClientT(t)
135
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo})
136
+		assert.NilError(t, err)
137
+		defer rdr.Close()
138
+
139
+		_, err = io.Copy(ioutil.Discard, rdr)
140
+		assert.NilError(t, err)
141
+
142
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
143
+		assert.NilError(t, err)
144
+	})
145
+	// TODO: test insecure registry with https
146
+}
... ...
@@ -2,6 +2,7 @@ package plugin // import "github.com/docker/docker/plugin"
2 2
 
3 3
 import (
4 4
 	"archive/tar"
5
+	"bytes"
5 6
 	"compress/gzip"
6 7
 	"context"
7 8
 	"encoding/json"
... ...
@@ -11,27 +12,27 @@ import (
11 11
 	"os"
12 12
 	"path"
13 13
 	"path/filepath"
14
-	"runtime"
15 14
 	"strings"
15
+	"time"
16 16
 
17
+	"github.com/containerd/containerd/content"
18
+	"github.com/containerd/containerd/images"
19
+	"github.com/containerd/containerd/platforms"
20
+	"github.com/containerd/containerd/remotes"
21
+	"github.com/containerd/containerd/remotes/docker"
17 22
 	"github.com/docker/distribution/manifest/schema2"
18 23
 	"github.com/docker/distribution/reference"
19 24
 	"github.com/docker/docker/api/types"
20 25
 	"github.com/docker/docker/api/types/filters"
21
-	"github.com/docker/docker/distribution"
22
-	progressutils "github.com/docker/docker/distribution/utils"
23
-	"github.com/docker/docker/distribution/xfer"
24 26
 	"github.com/docker/docker/dockerversion"
25 27
 	"github.com/docker/docker/errdefs"
26
-	"github.com/docker/docker/image"
27
-	"github.com/docker/docker/layer"
28 28
 	"github.com/docker/docker/pkg/authorization"
29 29
 	"github.com/docker/docker/pkg/chrootarchive"
30 30
 	"github.com/docker/docker/pkg/pools"
31 31
 	"github.com/docker/docker/pkg/progress"
32
+	"github.com/docker/docker/pkg/stringid"
32 33
 	"github.com/docker/docker/pkg/system"
33 34
 	v2 "github.com/docker/docker/plugin/v2"
34
-	refstore "github.com/docker/docker/reference"
35 35
 	"github.com/moby/sys/mount"
36 36
 	digest "github.com/opencontainers/go-digest"
37 37
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
... ...
@@ -98,64 +99,6 @@ func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
98 98
 	return &p.PluginObj, nil
99 99
 }
100 100
 
101
-func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error {
102
-	if outStream != nil {
103
-		// Include a buffer so that slow client connections don't affect
104
-		// transfer performance.
105
-		progressChan := make(chan progress.Progress, 100)
106
-
107
-		writesDone := make(chan struct{})
108
-
109
-		defer func() {
110
-			close(progressChan)
111
-			<-writesDone
112
-		}()
113
-
114
-		var cancelFunc context.CancelFunc
115
-		ctx, cancelFunc = context.WithCancel(ctx)
116
-
117
-		go func() {
118
-			progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
119
-			close(writesDone)
120
-		}()
121
-
122
-		config.ProgressOutput = progress.ChanOutput(progressChan)
123
-	} else {
124
-		config.ProgressOutput = progress.DiscardOutput()
125
-	}
126
-	return distribution.Pull(ctx, ref, config)
127
-}
128
-
129
-type tempConfigStore struct {
130
-	config       []byte
131
-	configDigest digest.Digest
132
-}
133
-
134
-func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) {
135
-	dgst := digest.FromBytes(c)
136
-
137
-	s.config = c
138
-	s.configDigest = dgst
139
-
140
-	return dgst, nil
141
-}
142
-
143
-func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) {
144
-	if d != s.configDigest {
145
-		return nil, errNotFound("digest not found")
146
-	}
147
-	return s.config, nil
148
-}
149
-
150
-func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
151
-	return configToRootFS(c)
152
-}
153
-
154
-func (s *tempConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) {
155
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
156
-	return &specs.Platform{OS: runtime.GOOS}, nil
157
-}
158
-
159 101
 func computePrivileges(c types.PluginConfig) types.PluginPrivileges {
160 102
 	var privileges types.PluginPrivileges
161 103
 	if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
... ...
@@ -217,37 +160,53 @@ func computePrivileges(c types.PluginConfig) types.PluginPrivileges {
217 217
 
218 218
 // Privileges pulls a plugin config and computes the privileges required to install it.
219 219
 func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
220
-	// create image store instance
221
-	cs := &tempConfigStore{}
222
-
223
-	// DownloadManager not defined because only pulling configuration.
224
-	pluginPullConfig := &distribution.ImagePullConfig{
225
-		Config: distribution.Config{
226
-			MetaHeaders:      metaHeader,
227
-			AuthConfig:       authConfig,
228
-			RegistryService:  pm.config.RegistryService,
229
-			ImageEventLogger: func(string, string, string) {},
230
-			ImageStore:       cs,
231
-		},
232
-		Schema2Types: distribution.PluginTypes,
233
-	}
234
-
235
-	if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil {
236
-		return nil, err
220
+	var (
221
+		config     types.PluginConfig
222
+		configSeen bool
223
+	)
224
+
225
+	h := func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
226
+		switch desc.MediaType {
227
+		case schema2.MediaTypeManifest, specs.MediaTypeImageManifest:
228
+			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
229
+			if err != nil {
230
+				return nil, errors.Wrapf(err, "error reading image manifest from blob store for %s", ref)
231
+			}
232
+
233
+			var m specs.Manifest
234
+			if err := json.Unmarshal(data, &m); err != nil {
235
+				return nil, errors.Wrapf(err, "error unmarshaling image manifest for %s", ref)
236
+			}
237
+			return []specs.Descriptor{m.Config}, nil
238
+		case schema2.MediaTypePluginConfig:
239
+			configSeen = true
240
+			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
241
+			if err != nil {
242
+				return nil, errors.Wrapf(err, "error reading plugin config from blob store for %s", ref)
243
+			}
244
+
245
+			if err := json.Unmarshal(data, &config); err != nil {
246
+				return nil, errors.Wrapf(err, "error unmarshaling plugin config for %s", ref)
247
+			}
248
+		}
249
+
250
+		return nil, nil
237 251
 	}
238 252
 
239
-	if cs.config == nil {
240
-		return nil, errors.New("no configuration pulled")
253
+	if err := pm.fetch(ctx, ref, authConfig, progress.DiscardOutput(), metaHeader, images.HandlerFunc(h)); err != nil {
254
+		return types.PluginPrivileges{}, nil
241 255
 	}
242
-	var config types.PluginConfig
243
-	if err := json.Unmarshal(cs.config, &config); err != nil {
244
-		return nil, errdefs.System(err)
256
+
257
+	if !configSeen {
258
+		return types.PluginPrivileges{}, errors.Errorf("did not find plugin config for specified reference %s", ref)
245 259
 	}
246 260
 
247 261
 	return computePrivileges(config), nil
248 262
 }
249 263
 
250 264
 // Upgrade upgrades a plugin
265
+//
266
+// TODO: replace reference package usage with simpler url.Parse semantics
251 267
 func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
252 268
 	p, err := pm.config.Store.GetV2Plugin(name)
253 269
 	if err != nil {
... ...
@@ -258,44 +217,35 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
258 258
 		return errors.Wrap(enabledError(p.Name()), "plugin must be disabled before upgrading")
259 259
 	}
260 260
 
261
-	pm.muGC.RLock()
262
-	defer pm.muGC.RUnlock()
263
-
264 261
 	// revalidate because Pull is public
265 262
 	if _, err := reference.ParseNormalizedNamed(name); err != nil {
266 263
 		return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name)
267 264
 	}
268 265
 
266
+	pm.muGC.RLock()
267
+	defer pm.muGC.RUnlock()
268
+
269 269
 	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
270 270
 	if err != nil {
271
-		return errors.Wrap(errdefs.System(err), "error preparing upgrade")
271
+		return errors.Wrap(err, "error creating tmp dir for plugin rootfs")
272 272
 	}
273
-	defer os.RemoveAll(tmpRootFSDir)
274 273
 
275
-	dm := &downloadManager{
276
-		tmpDir:    tmpRootFSDir,
277
-		blobStore: pm.blobStore,
278
-	}
274
+	var md fetchMeta
279 275
 
280
-	pluginPullConfig := &distribution.ImagePullConfig{
281
-		Config: distribution.Config{
282
-			MetaHeaders:      metaHeader,
283
-			AuthConfig:       authConfig,
284
-			RegistryService:  pm.config.RegistryService,
285
-			ImageEventLogger: pm.config.LogPluginEvent,
286
-			ImageStore:       dm,
287
-		},
288
-		DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
289
-		Schema2Types:    distribution.PluginTypes,
276
+	ctx, cancel := context.WithCancel(ctx)
277
+	out, waitProgress := setupProgressOutput(outStream, cancel)
278
+	defer waitProgress()
279
+
280
+	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
281
+		return err
290 282
 	}
283
+	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")
291 284
 
292
-	err = pm.pull(ctx, ref, pluginPullConfig, outStream)
293
-	if err != nil {
294
-		go pm.GC()
285
+	if err := validateFetchedMetadata(md); err != nil {
295 286
 		return err
296 287
 	}
297 288
 
298
-	if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil {
289
+	if err := pm.upgradePlugin(p, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges); err != nil {
299 290
 		return err
300 291
 	}
301 292
 	p.PluginObj.PluginReference = ref.String()
... ...
@@ -303,6 +253,8 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
303 303
 }
304 304
 
305 305
 // Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
306
+//
307
+// TODO: replace reference package usage with simpler url.Parse semantics
306 308
 func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) {
307 309
 	pm.muGC.RLock()
308 310
 	defer pm.muGC.RUnlock()
... ...
@@ -320,30 +272,22 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
320 320
 
321 321
 	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
322 322
 	if err != nil {
323
-		return errors.Wrap(errdefs.System(err), "error preparing pull")
323
+		return errors.Wrap(errdefs.System(err), "error preparing upgrade")
324 324
 	}
325 325
 	defer os.RemoveAll(tmpRootFSDir)
326 326
 
327
-	dm := &downloadManager{
328
-		tmpDir:    tmpRootFSDir,
329
-		blobStore: pm.blobStore,
330
-	}
327
+	var md fetchMeta
331 328
 
332
-	pluginPullConfig := &distribution.ImagePullConfig{
333
-		Config: distribution.Config{
334
-			MetaHeaders:      metaHeader,
335
-			AuthConfig:       authConfig,
336
-			RegistryService:  pm.config.RegistryService,
337
-			ImageEventLogger: pm.config.LogPluginEvent,
338
-			ImageStore:       dm,
339
-		},
340
-		DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
341
-		Schema2Types:    distribution.PluginTypes,
329
+	ctx, cancel := context.WithCancel(ctx)
330
+	out, waitProgress := setupProgressOutput(outStream, cancel)
331
+	defer waitProgress()
332
+
333
+	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
334
+		return err
342 335
 	}
336
+	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")
343 337
 
344
-	err = pm.pull(ctx, ref, pluginPullConfig, outStream)
345
-	if err != nil {
346
-		go pm.GC()
338
+	if err := validateFetchedMetadata(md); err != nil {
347 339
 		return err
348 340
 	}
349 341
 
... ...
@@ -354,12 +298,14 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
354 354
 	optsList = append(optsList, opts...)
355 355
 	optsList = append(optsList, refOpt)
356 356
 
357
-	p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...)
357
+	// TODO: tmpRootFSDir is empty but should have layers in it
358
+	p, err := pm.createPlugin(name, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges, optsList...)
358 359
 	if err != nil {
359 360
 		return err
360 361
 	}
361 362
 
362 363
 	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
364
+
363 365
 	return nil
364 366
 }
365 367
 
... ...
@@ -404,7 +350,7 @@ next:
404 404
 	return out, nil
405 405
 }
406 406
 
407
-// Push pushes a plugin to the store.
407
+// Push pushes a plugin to the registry.
408 408
 func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
409 409
 	p, err := pm.config.Store.GetV2Plugin(name)
410 410
 	if err != nil {
... ...
@@ -416,201 +362,197 @@ func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header
416 416
 		return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
417 417
 	}
418 418
 
419
-	var po progress.Output
420
-	if outStream != nil {
421
-		// Include a buffer so that slow client connections don't affect
422
-		// transfer performance.
423
-		progressChan := make(chan progress.Progress, 100)
424
-
425
-		writesDone := make(chan struct{})
419
+	statusTracker := docker.NewInMemoryTracker()
426 420
 
427
-		defer func() {
428
-			close(progressChan)
429
-			<-writesDone
430
-		}()
431
-
432
-		var cancelFunc context.CancelFunc
433
-		ctx, cancelFunc = context.WithCancel(ctx)
421
+	resolver, err := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, false)
422
+	if err != nil {
423
+		return err
424
+	}
434 425
 
435
-		go func() {
436
-			progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
437
-			close(writesDone)
438
-		}()
426
+	pusher, err := resolver.Pusher(ctx, ref.String())
427
+	if err != nil {
439 428
 
440
-		po = progress.ChanOutput(progressChan)
441
-	} else {
442
-		po = progress.DiscardOutput()
429
+		return errors.Wrap(err, "error creating plugin pusher")
443 430
 	}
444 431
 
445
-	// TODO: replace these with manager
446
-	is := &pluginConfigStore{
447
-		pm:     pm,
448
-		plugin: p,
449
-	}
450
-	lss := make(map[string]distribution.PushLayerProvider)
451
-	lss[runtime.GOOS] = &pluginLayerProvider{
452
-		pm:     pm,
453
-		plugin: p,
454
-	}
455
-	rs := &pluginReference{
456
-		name:     ref,
457
-		pluginID: p.Config,
458
-	}
432
+	pj := newPushJobs(statusTracker)
433
+
434
+	ctx, cancel := context.WithCancel(ctx)
435
+	out, waitProgress := setupProgressOutput(outStream, cancel)
436
+	defer waitProgress()
459 437
 
460
-	uploadManager := xfer.NewLayerUploadManager(3)
438
+	progressHandler := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
439
+		logrus.WithField("mediaType", desc.MediaType).WithField("digest", desc.Digest.String()).Debug("Preparing to push plugin layer")
440
+		id := stringid.TruncateID(desc.Digest.String())
441
+		pj.add(remotes.MakeRefKey(ctx, desc), id)
442
+		progress.Update(out, id, "Preparing")
443
+		return nil, nil
444
+	})
461 445
 
462
-	imagePushConfig := &distribution.ImagePushConfig{
463
-		Config: distribution.Config{
464
-			MetaHeaders:      metaHeader,
465
-			AuthConfig:       authConfig,
466
-			ProgressOutput:   po,
467
-			RegistryService:  pm.config.RegistryService,
468
-			ReferenceStore:   rs,
469
-			ImageEventLogger: pm.config.LogPluginEvent,
470
-			ImageStore:       is,
471
-			RequireSchema2:   true,
472
-		},
473
-		ConfigMediaType: schema2.MediaTypePluginConfig,
474
-		LayerStores:     lss,
475
-		UploadManager:   uploadManager,
446
+	desc, err := pm.getManifestDescriptor(ctx, p)
447
+	if err != nil {
448
+		return errors.Wrap(err, "error reading plugin manifest")
476 449
 	}
477 450
 
478
-	return distribution.Push(ctx, ref, imagePushConfig)
479
-}
451
+	progress.Messagef(out, "", "The push refers to repository [%s]", reference.FamiliarName(ref))
480 452
 
481
-type pluginReference struct {
482
-	name     reference.Named
483
-	pluginID digest.Digest
484
-}
453
+	// TODO: If a layer already exists on the registry, the progress output just says "Preparing"
454
+	go func() {
455
+		timer := time.NewTimer(100 * time.Millisecond)
456
+		defer timer.Stop()
457
+		if !timer.Stop() {
458
+			<-timer.C
459
+		}
460
+		var statuses []contentStatus
461
+		for {
462
+			timer.Reset(100 * time.Millisecond)
463
+			select {
464
+			case <-ctx.Done():
465
+				return
466
+			case <-timer.C:
467
+				statuses = pj.status()
468
+			}
485 469
 
486
-func (r *pluginReference) References(id digest.Digest) []reference.Named {
487
-	if r.pluginID != id {
488
-		return nil
489
-	}
490
-	return []reference.Named{r.name}
491
-}
470
+			for _, s := range statuses {
471
+				out.WriteProgress(progress.Progress{ID: s.Ref, Current: s.Offset, Total: s.Total, Action: s.Status, LastUpdate: s.Offset == s.Total})
472
+			}
473
+		}
474
+	}()
492 475
 
493
-func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association {
494
-	return []refstore.Association{
495
-		{
496
-			Ref: r.name,
497
-			ID:  r.pluginID,
498
-		},
476
+	// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
477
+	ctx = docker.WithScope(ctx, scope(ref, true))
478
+	if err := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
479
+		return images.Handlers(progressHandler, h)
480
+	}); err != nil {
481
+		// Try fallback to http.
482
+		// This is needed because the containerd pusher will only attempt the first registry config we pass, which would
483
+		// typically be https.
484
+		// If there are no http-only host configs found we'll error out anyway.
485
+		resolver, _ := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, true)
486
+		if resolver != nil {
487
+			pusher, _ := resolver.Pusher(ctx, ref.String())
488
+			if pusher != nil {
489
+				logrus.WithField("ref", ref).Debug("Re-attmpting push with http-fallback")
490
+				err2 := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
491
+					return images.Handlers(progressHandler, h)
492
+				})
493
+				if err2 == nil {
494
+					err = nil
495
+				} else {
496
+					logrus.WithError(err2).WithField("ref", ref).Debug("Error while attempting push with http-fallback")
497
+				}
498
+			}
499
+		}
500
+		if err != nil {
501
+			return errors.Wrap(err, "error pushing plugin")
502
+		}
499 503
 	}
500
-}
501 504
 
502
-func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) {
503
-	if r.name.String() != ref.String() {
504
-		return digest.Digest(""), refstore.ErrDoesNotExist
505
+	// For blobs that already exist in the registry we need to make sure to update the progress otherwise it will just say "pending"
506
+	// TODO: How to check if the layer already exists? Is it worth it?
507
+	for _, j := range pj.jobs {
508
+		progress.Update(out, pj.names[j], "Upload complete")
505 509
 	}
506
-	return r.pluginID, nil
507
-}
508 510
 
509
-func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error {
510
-	// Read only, ignore
511
-	return nil
512
-}
513
-func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error {
514
-	// Read only, ignore
511
+	// Signal the client for content trust verification
512
+	progress.Aux(out, types.PushResult{Tag: ref.(reference.Tagged).Tag(), Digest: desc.Digest.String(), Size: int(desc.Size)})
513
+
515 514
 	return nil
516 515
 }
517
-func (r *pluginReference) Delete(ref reference.Named) (bool, error) {
518
-	// Read only, ignore
519
-	return false, nil
520
-}
521 516
 
522
-type pluginConfigStore struct {
523
-	pm     *Manager
524
-	plugin *v2.Plugin
517
+// manifest wraps an OCI manifest, because...
518
+// Historically the registry does not support plugins unless the media type on the manifest is specifically schema2.MediaTypeManifest
519
+// So the OCI manifest media type is not supported.
520
+// Additionally, there is extra validation for the docker schema2 manifest than there is a mediatype set on the manifest itself
521
+// even though this is set on the descriptor
522
+// The OCI types do not have this field.
523
+type manifest struct {
524
+	specs.Manifest
525
+	MediaType string `json:"mediaType,omitempty"`
525 526
 }
526 527
 
527
-func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) {
528
-	return digest.Digest(""), errors.New("cannot store config on push")
529
-}
528
+func buildManifest(ctx context.Context, s content.Manager, config digest.Digest, layers []digest.Digest) (manifest, error) {
529
+	var m manifest
530
+	m.MediaType = images.MediaTypeDockerSchema2Manifest
531
+	m.SchemaVersion = 2
530 532
 
531
-func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) {
532
-	if s.plugin.Config != d {
533
-		return nil, errors.New("plugin not found")
534
-	}
535
-	rwc, err := s.pm.blobStore.Get(d)
533
+	configInfo, err := s.Info(ctx, config)
536 534
 	if err != nil {
537
-		return nil, err
535
+		return m, errors.Wrapf(err, "error reading plugin config content for digest %s", config)
536
+	}
537
+	m.Config = specs.Descriptor{
538
+		MediaType: mediaTypePluginConfig,
539
+		Size:      configInfo.Size,
540
+		Digest:    configInfo.Digest,
538 541
 	}
539
-	defer rwc.Close()
540
-	return ioutil.ReadAll(rwc)
541
-}
542
-
543
-func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
544
-	return configToRootFS(c)
545
-}
546
-
547
-func (s *pluginConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) {
548
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
549
-	return &specs.Platform{OS: runtime.GOOS}, nil
550
-}
551 542
 
552
-type pluginLayerProvider struct {
553
-	pm     *Manager
554
-	plugin *v2.Plugin
543
+	for _, l := range layers {
544
+		info, err := s.Info(ctx, l)
545
+		if err != nil {
546
+			return m, errors.Wrapf(err, "error fetching info for content digest %s", l)
547
+		}
548
+		m.Layers = append(m.Layers, specs.Descriptor{
549
+			MediaType: specs.MediaTypeImageLayerGzip, // TODO: This is assuming everything is a gzip compressed layer, but that may not be true.
550
+			Digest:    l,
551
+			Size:      info.Size,
552
+		})
553
+	}
554
+	return m, nil
555 555
 }
556 556
 
557
-func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) {
558
-	rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs)
559
-	var i int
560
-	for i = 1; i <= len(rootFS.DiffIDs); i++ {
561
-		if layer.CreateChainID(rootFS.DiffIDs[:i]) == id {
562
-			break
557
+// getManifestDescriptor gets the OCI descriptor for a manifest
558
+// It will generate a manifest if one does not exist
559
+func (pm *Manager) getManifestDescriptor(ctx context.Context, p *v2.Plugin) (specs.Descriptor, error) {
560
+	logger := logrus.WithField("plugin", p.Name()).WithField("digest", p.Manifest)
561
+	if p.Manifest != "" {
562
+		info, err := pm.blobStore.Info(ctx, p.Manifest)
563
+		if err == nil {
564
+			desc := specs.Descriptor{
565
+				Size:      info.Size,
566
+				Digest:    info.Digest,
567
+				MediaType: images.MediaTypeDockerSchema2Manifest,
568
+			}
569
+			return desc, nil
563 570
 		}
571
+		logger.WithError(err).Debug("Could not find plugin manifest in content store")
572
+	} else {
573
+		logger.Info("Plugin does not have manifest digest")
564 574
 	}
565
-	if i > len(rootFS.DiffIDs) {
566
-		return nil, errors.New("layer not found")
567
-	}
568
-	return &pluginLayer{
569
-		pm:      p.pm,
570
-		diffIDs: rootFS.DiffIDs[:i],
571
-		blobs:   p.plugin.Blobsums[:i],
572
-	}, nil
573
-}
575
+	logger.Info("Building a new plugin manifest")
574 576
 
575
-type pluginLayer struct {
576
-	pm      *Manager
577
-	diffIDs []layer.DiffID
578
-	blobs   []digest.Digest
579
-}
577
+	manifest, err := buildManifest(ctx, pm.blobStore, p.Config, p.Blobsums)
578
+	if err != nil {
579
+		return specs.Descriptor{}, err
580
+	}
580 581
 
581
-func (l *pluginLayer) ChainID() layer.ChainID {
582
-	return layer.CreateChainID(l.diffIDs)
583
-}
582
+	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
583
+	if err != nil {
584
+		return desc, err
585
+	}
584 586
 
585
-func (l *pluginLayer) DiffID() layer.DiffID {
586
-	return l.diffIDs[len(l.diffIDs)-1]
587
+	if err := pm.save(p); err != nil {
588
+		logger.WithError(err).Error("Could not save plugin with manifest digest")
589
+	}
590
+	return desc, nil
587 591
 }
588 592
 
589
-func (l *pluginLayer) Parent() distribution.PushLayer {
590
-	if len(l.diffIDs) == 1 {
591
-		return nil
593
+func writeManifest(ctx context.Context, cs content.Store, m *manifest) (specs.Descriptor, error) {
594
+	platform := platforms.DefaultSpec()
595
+	desc := specs.Descriptor{
596
+		MediaType: images.MediaTypeDockerSchema2Manifest,
597
+		Platform:  &platform,
592 598
 	}
593
-	return &pluginLayer{
594
-		pm:      l.pm,
595
-		diffIDs: l.diffIDs[:len(l.diffIDs)-1],
596
-		blobs:   l.blobs[:len(l.diffIDs)-1],
599
+	data, err := json.Marshal(m)
600
+	if err != nil {
601
+		return desc, errors.Wrap(err, "error encoding manifest")
597 602
 	}
598
-}
599
-
600
-func (l *pluginLayer) Open() (io.ReadCloser, error) {
601
-	return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1])
602
-}
603
-
604
-func (l *pluginLayer) Size() (int64, error) {
605
-	return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1])
606
-}
607
-
608
-func (l *pluginLayer) MediaType() string {
609
-	return schema2.MediaTypeLayer
610
-}
603
+	desc.Digest = digest.FromBytes(data)
604
+	desc.Size = int64(len(data))
611 605
 
612
-func (l *pluginLayer) Release() {
613
-	// Nothing needs to be release, no references held
606
+	if err := content.WriteBlob(ctx, cs, remotes.MakeRefKey(ctx, desc), bytes.NewReader(data), desc); err != nil {
607
+		return desc, errors.Wrap(err, "error writing plugin manifest")
608
+	}
609
+	return desc, nil
614 610
 }
615 611
 
616 612
 // Remove deletes plugin's root directory.
... ...
@@ -700,14 +642,14 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
700 700
 	var configJSON []byte
701 701
 	rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)
702 702
 
703
-	rootFSBlob, err := pm.blobStore.New()
703
+	rootFSBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name))
704 704
 	if err != nil {
705 705
 		return err
706 706
 	}
707 707
 	defer rootFSBlob.Close()
708
+
708 709
 	gzw := gzip.NewWriter(rootFSBlob)
709
-	layerDigester := digest.Canonical.Digester()
710
-	rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash()))
710
+	rootFSReader := io.TeeReader(rootFS, gzw)
711 711
 
712 712
 	if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
713 713
 		return err
... ...
@@ -736,8 +678,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
736 736
 	pm.mu.Lock()
737 737
 	defer pm.mu.Unlock()
738 738
 
739
-	rootFSBlobsum, err := rootFSBlob.Commit()
740
-	if err != nil {
739
+	if err := rootFSBlob.Commit(ctx, 0, ""); err != nil {
741 740
 		return err
742 741
 	}
743 742
 	defer func() {
... ...
@@ -748,12 +689,12 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
748 748
 
749 749
 	config.Rootfs = &types.PluginConfigRootfs{
750 750
 		Type:    "layers",
751
-		DiffIds: []string{layerDigester.Digest().String()},
751
+		DiffIds: []string{rootFSBlob.Digest().String()},
752 752
 	}
753 753
 
754 754
 	config.DockerVersion = dockerversion.Version
755 755
 
756
-	configBlob, err := pm.blobStore.New()
756
+	configBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name+"-config.json"))
757 757
 	if err != nil {
758 758
 		return err
759 759
 	}
... ...
@@ -761,12 +702,23 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
761 761
 	if err := json.NewEncoder(configBlob).Encode(config); err != nil {
762 762
 		return errors.Wrap(err, "error encoding json config")
763 763
 	}
764
-	configBlobsum, err := configBlob.Commit()
764
+	if err := configBlob.Commit(ctx, 0, ""); err != nil {
765
+		return err
766
+	}
767
+
768
+	configDigest := configBlob.Digest()
769
+	layers := []digest.Digest{rootFSBlob.Digest()}
770
+
771
+	manifest, err := buildManifest(ctx, pm.blobStore, configDigest, layers)
765 772
 	if err != nil {
766 773
 		return err
767 774
 	}
775
+	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
776
+	if err != nil {
777
+		return
778
+	}
768 779
 
769
-	p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil)
780
+	p, err := pm.createPlugin(name, configDigest, desc.Digest, layers, tmpRootFSDir, nil)
770 781
 	if err != nil {
771 782
 		return err
772 783
 	}
773 784
deleted file mode 100644
... ...
@@ -1,190 +0,0 @@
1
-package plugin // import "github.com/docker/docker/plugin"
2
-
3
-import (
4
-	"context"
5
-	"fmt"
6
-	"io"
7
-	"io/ioutil"
8
-	"os"
9
-	"path/filepath"
10
-	"runtime"
11
-
12
-	"github.com/docker/docker/distribution/xfer"
13
-	"github.com/docker/docker/image"
14
-	"github.com/docker/docker/layer"
15
-	"github.com/docker/docker/pkg/archive"
16
-	"github.com/docker/docker/pkg/chrootarchive"
17
-	"github.com/docker/docker/pkg/progress"
18
-	digest "github.com/opencontainers/go-digest"
19
-	specs "github.com/opencontainers/image-spec/specs-go/v1"
20
-	"github.com/pkg/errors"
21
-	"github.com/sirupsen/logrus"
22
-)
23
-
24
-type blobstore interface {
25
-	New() (WriteCommitCloser, error)
26
-	Get(dgst digest.Digest) (io.ReadCloser, error)
27
-	Size(dgst digest.Digest) (int64, error)
28
-}
29
-
30
-type basicBlobStore struct {
31
-	path string
32
-}
33
-
34
-func newBasicBlobStore(p string) (*basicBlobStore, error) {
35
-	tmpdir := filepath.Join(p, "tmp")
36
-	if err := os.MkdirAll(tmpdir, 0700); err != nil {
37
-		return nil, errors.Wrapf(err, "failed to mkdir %v", p)
38
-	}
39
-	return &basicBlobStore{path: p}, nil
40
-}
41
-
42
-func (b *basicBlobStore) New() (WriteCommitCloser, error) {
43
-	f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
44
-	if err != nil {
45
-		return nil, errors.Wrap(err, "failed to create temp file")
46
-	}
47
-	return newInsertion(f), nil
48
-}
49
-
50
-func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
51
-	return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
52
-}
53
-
54
-func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
55
-	stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
56
-	if err != nil {
57
-		return 0, err
58
-	}
59
-	return stat.Size(), nil
60
-}
61
-
62
-func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
63
-	for _, alg := range []string{string(digest.Canonical)} {
64
-		items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
65
-		if err != nil {
66
-			continue
67
-		}
68
-		for _, fi := range items {
69
-			if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
70
-				p := filepath.Join(b.path, alg, fi.Name())
71
-				err := os.RemoveAll(p)
72
-				logrus.Debugf("cleaned up blob %v: %v", p, err)
73
-			}
74
-		}
75
-	}
76
-
77
-}
78
-
79
-// WriteCommitCloser defines object that can be committed to blobstore.
80
-type WriteCommitCloser interface {
81
-	io.WriteCloser
82
-	Commit() (digest.Digest, error)
83
-}
84
-
85
-type insertion struct {
86
-	io.Writer
87
-	f        *os.File
88
-	digester digest.Digester
89
-	closed   bool
90
-}
91
-
92
-func newInsertion(tempFile *os.File) *insertion {
93
-	digester := digest.Canonical.Digester()
94
-	return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
95
-}
96
-
97
-func (i *insertion) Commit() (digest.Digest, error) {
98
-	p := i.f.Name()
99
-	d := filepath.Join(filepath.Join(p, "../../"))
100
-	i.f.Sync()
101
-	defer os.RemoveAll(p)
102
-	if err := i.f.Close(); err != nil {
103
-		return "", err
104
-	}
105
-	i.closed = true
106
-	dgst := i.digester.Digest()
107
-	if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
108
-		return "", errors.Wrapf(err, "failed to mkdir %v", d)
109
-	}
110
-	if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
111
-		return "", errors.Wrapf(err, "failed to rename %v", p)
112
-	}
113
-	return dgst, nil
114
-}
115
-
116
-func (i *insertion) Close() error {
117
-	if i.closed {
118
-		return nil
119
-	}
120
-	defer os.RemoveAll(i.f.Name())
121
-	return i.f.Close()
122
-}
123
-
124
-type downloadManager struct {
125
-	blobStore    blobstore
126
-	tmpDir       string
127
-	blobs        []digest.Digest
128
-	configDigest digest.Digest
129
-}
130
-
131
-func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
132
-	for _, l := range layers {
133
-		b, err := dm.blobStore.New()
134
-		if err != nil {
135
-			return initialRootFS, nil, err
136
-		}
137
-		defer b.Close()
138
-		rc, _, err := l.Download(ctx, progressOutput)
139
-		if err != nil {
140
-			return initialRootFS, nil, errors.Wrap(err, "failed to download")
141
-		}
142
-		defer rc.Close()
143
-		r := io.TeeReader(rc, b)
144
-		inflatedLayerData, err := archive.DecompressStream(r)
145
-		if err != nil {
146
-			return initialRootFS, nil, err
147
-		}
148
-		defer inflatedLayerData.Close()
149
-		digester := digest.Canonical.Digester()
150
-		if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
151
-			return initialRootFS, nil, err
152
-		}
153
-		initialRootFS.Append(layer.DiffID(digester.Digest()))
154
-		d, err := b.Commit()
155
-		if err != nil {
156
-			return initialRootFS, nil, err
157
-		}
158
-		dm.blobs = append(dm.blobs, d)
159
-	}
160
-	return initialRootFS, nil, nil
161
-}
162
-
163
-func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
164
-	b, err := dm.blobStore.New()
165
-	if err != nil {
166
-		return "", err
167
-	}
168
-	defer b.Close()
169
-	n, err := b.Write(dt)
170
-	if err != nil {
171
-		return "", err
172
-	}
173
-	if n != len(dt) {
174
-		return "", io.ErrShortWrite
175
-	}
176
-	d, err := b.Commit()
177
-	dm.configDigest = d
178
-	return d, err
179
-}
180
-
181
-func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
182
-	return nil, fmt.Errorf("digest not found")
183
-}
184
-func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
185
-	return configToRootFS(c)
186
-}
187
-func (dm *downloadManager) PlatformFromConfig(c []byte) (*specs.Platform, error) {
188
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
189
-	return &specs.Platform{OS: runtime.GOOS}, nil
190
-}
191 1
new file mode 100644
... ...
@@ -0,0 +1,288 @@
0
+package plugin
1
+
2
+import (
3
+	"context"
4
+	"io"
5
+	"net/http"
6
+	"time"
7
+
8
+	"github.com/containerd/containerd/content"
9
+	c8derrdefs "github.com/containerd/containerd/errdefs"
10
+	"github.com/containerd/containerd/images"
11
+	"github.com/containerd/containerd/remotes"
12
+	"github.com/containerd/containerd/remotes/docker"
13
+	"github.com/docker/distribution/reference"
14
+	"github.com/docker/docker/api/types"
15
+	progressutils "github.com/docker/docker/distribution/utils"
16
+	"github.com/docker/docker/pkg/chrootarchive"
17
+	"github.com/docker/docker/pkg/ioutils"
18
+	"github.com/docker/docker/pkg/progress"
19
+	"github.com/docker/docker/pkg/stringid"
20
+	digest "github.com/opencontainers/go-digest"
21
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
22
+	"github.com/pkg/errors"
23
+	"github.com/sirupsen/logrus"
24
+)
25
+
26
+const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
27
+
28
+// setupProgressOutput sets up the passed in writer to stream progress.
29
+//
30
+// The passed in cancel function is used by the progress writer to signal callers that there
31
+// is an issue writing to the stream.
32
+//
33
+// The returned function is used to wait for the progress writer to be finished.
34
+// Call it to make sure the progress writer is done before returning from your function as needed.
35
+func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) {
36
+	var out progress.Output
37
+	f := func() {}
38
+
39
+	if outStream != nil {
40
+		ch := make(chan progress.Progress, 100)
41
+		out = progress.ChanOutput(ch)
42
+
43
+		ctx, retCancel := context.WithCancel(context.Background())
44
+		go func() {
45
+			progressutils.WriteDistributionProgress(cancel, outStream, ch)
46
+			retCancel()
47
+		}()
48
+
49
+		f = func() {
50
+			close(ch)
51
+			<-ctx.Done()
52
+		}
53
+	} else {
54
+		out = progress.DiscardOutput()
55
+	}
56
+	return out, f
57
+}
58
+
59
// fetch pulls the content for the passed in reference into the blob store and
// dispatches the provided images.Handlers over every fetched descriptor.
// Callers do not need to add a remotes.FetchHandler; one is set up here.
func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *types.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) {
	// We need to make sure we have a domain on the reference
	withDomain, err := reference.ParseNormalizedNamed(ref.String())
	if err != nil {
		return errors.Wrap(err, "error parsing plugin image reference")
	}

	// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
	ctx = docker.WithScope(ctx, scope(ref, false))

	// Make sure the fetch handler knows how to set a ref key for the plugin media type.
	// Without this the ref key is "unknown" and we see a nasty warning message in the logs
	ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin")

	resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false)
	if err != nil {
		return err
	}
	resolved, desc, err := resolver.Resolve(ctx, withDomain.String())
	if err != nil {
		// This is backwards compatible with older versions of the distribution registry.
		// The containerd client will add its own accept header as a comma separated list of supported manifests.
		// This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list,
		//   so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the
		//   fallback does not support plugin configs...
		logrus.WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format")
		headers := http.Header{}
		headers.Add("Accept", images.MediaTypeDockerSchema2Manifest)
		headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList)
		headers.Add("Accept", specs.MediaTypeImageManifest)
		headers.Add("Accept", specs.MediaTypeImageIndex)
		// Retry with explicit single-value Accept headers; a resolver error here
		// is deliberately discarded — the original resolve error is reported below.
		resolver, _ = pm.newResolver(ctx, nil, auth, headers, false)
		if resolver != nil {
			resolved, desc, err = resolver.Resolve(ctx, withDomain.String())
			if err != nil {
				logrus.WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format")
			}
		}
		if err != nil {
			return errors.Wrap(err, "error resolving plugin reference")
		}
	}

	fetcher, err := resolver.Fetcher(ctx, resolved)
	if err != nil {
		return errors.Wrap(err, "error creating plugin image fetcher")
	}

	// Progress and fetch handlers run first so content is in the store before
	// the caller-supplied handlers (e.g. metadata collection) see a descriptor.
	fp := withFetchProgress(pm.blobStore, out, ref)
	handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...)
	if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
		return err
	}
	return nil
}
116
+
117
+// applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory.
118
+//
119
+// TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however
120
+// if there are multiple layers to fetch we may end up extracting layers in the wrong
121
+// order.
122
+func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc {
123
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
124
+		switch desc.MediaType {
125
+		case
126
+			specs.MediaTypeImageLayer,
127
+			images.MediaTypeDockerSchema2Layer,
128
+			specs.MediaTypeImageLayerGzip,
129
+			images.MediaTypeDockerSchema2LayerGzip:
130
+		default:
131
+			return nil, nil
132
+		}
133
+
134
+		ra, err := cs.ReaderAt(ctx, desc)
135
+		if err != nil {
136
+			return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest)
137
+		}
138
+
139
+		id := stringid.TruncateID(desc.Digest.String())
140
+
141
+		rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close)
142
+		pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting")
143
+		defer pr.Close()
144
+
145
+		if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil {
146
+			return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest)
147
+		}
148
+		progress.Update(out, id, "Complete")
149
+		return nil, nil
150
+	}
151
+}
152
+
153
+func childrenHandler(cs content.Store) images.HandlerFunc {
154
+	ch := images.ChildrenHandler(cs)
155
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
156
+		switch desc.MediaType {
157
+		case mediaTypePluginConfig:
158
+			return nil, nil
159
+		default:
160
+			return ch(ctx, desc)
161
+		}
162
+	}
163
+}
164
+
165
+type fetchMeta struct {
166
+	blobs    []digest.Digest
167
+	config   digest.Digest
168
+	manifest digest.Digest
169
+}
170
+
171
+func storeFetchMetadata(m *fetchMeta) images.HandlerFunc {
172
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
173
+		switch desc.MediaType {
174
+		case
175
+			images.MediaTypeDockerSchema2LayerForeignGzip,
176
+			images.MediaTypeDockerSchema2Layer,
177
+			specs.MediaTypeImageLayer,
178
+			specs.MediaTypeImageLayerGzip:
179
+			m.blobs = append(m.blobs, desc.Digest)
180
+		case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
181
+			m.manifest = desc.Digest
182
+		case mediaTypePluginConfig:
183
+			m.config = desc.Digest
184
+		}
185
+		return nil, nil
186
+	}
187
+}
188
+
189
+func validateFetchedMetadata(md fetchMeta) error {
190
+	if md.config == "" {
191
+		return errors.New("fetched plugin image but plugin config is missing")
192
+	}
193
+	if md.manifest == "" {
194
+		return errors.New("fetched plugin image but manifest is missing")
195
+	}
196
+	return nil
197
+}
198
+
199
// withFetchProgress is a fetch handler which registers a descriptor with a progress
// reporter, streaming Waiting/Downloading/Download-complete updates for layer blobs
// and a "Pulling from" banner for the manifest.
func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc {
	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
		switch desc.MediaType {
		case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
			// Manifest: announce the pull once and stop.
			tn := reference.TagNameOnly(ref)
			tagged := tn.(reference.Tagged)
			progress.Messagef(out, tagged.Tag(), "Pulling from %s", reference.FamiliarName(ref))
			progress.Messagef(out, "", "Digest: %s", desc.Digest.String())
			return nil, nil
		case
			images.MediaTypeDockerSchema2LayerGzip,
			images.MediaTypeDockerSchema2Layer,
			specs.MediaTypeImageLayer,
			specs.MediaTypeImageLayerGzip:
			// Layer blobs fall through to per-blob progress polling below.
		default:
			return nil, nil
		}

		id := stringid.TruncateID(desc.Digest.String())

		// Blob already in the content store: nothing to download.
		if _, err := cs.Info(ctx, desc.Digest); err == nil {
			out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true})
			return nil, nil
		}

		progress.Update(out, id, "Waiting")

		key := remotes.MakeRefKey(ctx, desc)

		// Poll the content store's ingest status every 100ms in the background,
		// translating it into progress updates until the download completes,
		// errors, or the context is cancelled.
		go func() {
			timer := time.NewTimer(100 * time.Millisecond)
			if !timer.Stop() {
				<-timer.C
			}
			defer timer.Stop()

			var pulling bool
			var ctxErr error

			for {
				timer.Reset(100 * time.Millisecond)

				select {
				case <-ctx.Done():
					ctxErr = ctx.Err()
					// make sure we can still fetch from the content store
					// TODO: Might need to add some sort of timeout
					ctx = context.Background()
				case <-timer.C:
				}

				s, err := cs.Status(ctx, key)
				if err != nil {
					if !c8derrdefs.IsNotFound(err) {
						logrus.WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull")
						progress.Update(out, id, err.Error())
						return
					}

					// No active ingest: either the blob already landed in the store...
					if _, err := cs.Info(ctx, desc.Digest); err == nil {
						progress.Update(out, id, "Download complete")
						return
					}

					// ...or the context was cancelled before the fetch started.
					if ctxErr != nil {
						progress.Update(out, id, ctxErr.Error())
						return
					}

					// Ingest has not started yet; keep polling.
					continue
				}

				if !pulling {
					progress.Update(out, id, "Pulling fs layer")
					pulling = true
				}

				if s.Offset == s.Total {
					out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true})
					return
				}

				out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total})
			}
		}()
		return nil, nil
	}
}
... ...
@@ -1,6 +1,7 @@
1 1
 package plugin // import "github.com/docker/docker/plugin"
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"encoding/json"
5 6
 	"io"
6 7
 	"io/ioutil"
... ...
@@ -12,10 +13,10 @@ import (
12 12
 	"strings"
13 13
 	"sync"
14 14
 
15
+	"github.com/containerd/containerd/content"
16
+	"github.com/containerd/containerd/content/local"
15 17
 	"github.com/docker/distribution/reference"
16 18
 	"github.com/docker/docker/api/types"
17
-	"github.com/docker/docker/image"
18
-	"github.com/docker/docker/layer"
19 19
 	"github.com/docker/docker/pkg/authorization"
20 20
 	"github.com/docker/docker/pkg/ioutils"
21 21
 	"github.com/docker/docker/pkg/pubsub"
... ...
@@ -72,7 +73,7 @@ type Manager struct {
72 72
 	mu        sync.RWMutex // protects cMap
73 73
 	muGC      sync.RWMutex // protects blobstore deletions
74 74
 	cMap      map[*v2.Plugin]*controller
75
-	blobStore *basicBlobStore
75
+	blobStore content.Store
76 76
 	publisher *pubsub.Publisher
77 77
 	executor  Executor
78 78
 }
... ...
@@ -117,9 +118,9 @@ func NewManager(config ManagerConfig) (*Manager, error) {
117 117
 		return nil, err
118 118
 	}
119 119
 
120
-	manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
120
+	manager.blobStore, err = local.NewStore(filepath.Join(manager.config.Root, "storage"))
121 121
 	if err != nil {
122
-		return nil, err
122
+		return nil, errors.Wrap(err, "error creating plugin blob store")
123 123
 	}
124 124
 
125 125
 	manager.cMap = make(map[*v2.Plugin]*controller)
... ...
@@ -305,7 +306,15 @@ func (pm *Manager) GC() {
305 305
 		}
306 306
 	}
307 307
 
308
-	pm.blobStore.gc(whitelist)
308
+	ctx := context.TODO()
309
+	pm.blobStore.Walk(ctx, func(info content.Info) error {
310
+		_, ok := whitelist[info.Digest]
311
+		if ok {
312
+			return nil
313
+		}
314
+
315
+		return pm.blobStore.Delete(ctx, info.Digest)
316
+	})
309 317
 }
310 318
 
311 319
 type logHook struct{ id string }
... ...
@@ -357,28 +366,3 @@ func isEqualPrivilege(a, b types.PluginPrivilege) bool {
357 357
 
358 358
 	return reflect.DeepEqual(a.Value, b.Value)
359 359
 }
360
-
361
-func configToRootFS(c []byte) (*image.RootFS, error) {
362
-	var pluginConfig types.PluginConfig
363
-	if err := json.Unmarshal(c, &pluginConfig); err != nil {
364
-		return nil, err
365
-	}
366
-	// validation for empty rootfs is in distribution code
367
-	if pluginConfig.Rootfs == nil {
368
-		return nil, nil
369
-	}
370
-
371
-	return rootFSFromPlugin(pluginConfig.Rootfs), nil
372
-}
373
-
374
-func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
375
-	rootFS := image.RootFS{
376
-		Type:    pluginfs.Type,
377
-		DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
378
-	}
379
-	for i := range pluginfs.DiffIds {
380
-		rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
381
-	}
382
-
383
-	return &rootFS
384
-}
... ...
@@ -1,12 +1,14 @@
1 1
 package plugin // import "github.com/docker/docker/plugin"
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"encoding/json"
5 6
 	"net"
6 7
 	"os"
7 8
 	"path/filepath"
8 9
 	"time"
9 10
 
11
+	"github.com/containerd/containerd/content"
10 12
 	"github.com/docker/docker/api/types"
11 13
 	"github.com/docker/docker/daemon/initlayer"
12 14
 	"github.com/docker/docker/errdefs"
... ...
@@ -17,6 +19,7 @@ import (
17 17
 	v2 "github.com/docker/docker/plugin/v2"
18 18
 	"github.com/moby/sys/mount"
19 19
 	digest "github.com/opencontainers/go-digest"
20
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
20 21
 	"github.com/pkg/errors"
21 22
 	"github.com/sirupsen/logrus"
22 23
 	"golang.org/x/sys/unix"
... ...
@@ -213,7 +216,7 @@ func (pm *Manager) Shutdown() {
213 213
 	}
214 214
 }
215 215
 
216
-func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) {
216
+func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) {
217 217
 	config, err := pm.setupNewPlugin(configDigest, blobsums, privileges)
218 218
 	if err != nil {
219 219
 		return err
... ...
@@ -261,19 +264,22 @@ func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobs
261 261
 	}
262 262
 
263 263
 	p.PluginObj.Config = config
264
+	p.Manifest = manifestDigest
264 265
 	err = pm.save(p)
265 266
 	return errors.Wrap(err, "error saving upgraded plugin config")
266 267
 }
267 268
 
268 269
 func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.Digest, privileges *types.PluginPrivileges) (types.PluginConfig, error) {
269
-	configRC, err := pm.blobStore.Get(configDigest)
270
+	configRA, err := pm.blobStore.ReaderAt(context.TODO(), specs.Descriptor{Digest: configDigest})
270 271
 	if err != nil {
271 272
 		return types.PluginConfig{}, err
272 273
 	}
273
-	defer configRC.Close()
274
+	defer configRA.Close()
275
+
276
+	configR := content.NewReader(configRA)
274 277
 
275 278
 	var config types.PluginConfig
276
-	dec := json.NewDecoder(configRC)
279
+	dec := json.NewDecoder(configR)
277 280
 	if err := dec.Decode(&config); err != nil {
278 281
 		return types.PluginConfig{}, errors.Wrapf(err, "failed to parse config")
279 282
 	}
... ...
@@ -292,7 +298,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.
292 292
 }
293 293
 
294 294
 // createPlugin creates a new plugin. take lock before calling.
295
-func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
295
+func (pm *Manager) createPlugin(name string, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
296 296
 	if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store
297 297
 		return nil, errdefs.InvalidParameter(err)
298 298
 	}
... ...
@@ -310,6 +316,7 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum
310 310
 		},
311 311
 		Config:   configDigest,
312 312
 		Blobsums: blobsums,
313
+		Manifest: manifestDigest,
313 314
 	}
314 315
 	p.InitEmptySettings()
315 316
 	for _, o := range opts {
316 317
new file mode 100644
... ...
@@ -0,0 +1,74 @@
0
+package plugin
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+
6
+	"github.com/containerd/containerd/remotes/docker"
7
+)
8
+
9
+func newPushJobs(tracker docker.StatusTracker) *pushJobs {
10
+	return &pushJobs{
11
+		names: make(map[string]string),
12
+		t:     tracker,
13
+	}
14
+}
15
+
16
// pushJobs tracks in-flight push uploads so their status can be reported
// in the order they were registered.
type pushJobs struct {
	// t reports per-ref upload status from the containerd pusher.
	t docker.StatusTracker

	// mu guards jobs and names.
	mu sync.Mutex
	// jobs preserves insertion order of the job refs.
	jobs []string
	// maps job ref to a name
	names map[string]string
}
24
+
25
+func (p *pushJobs) add(id, name string) {
26
+	p.mu.Lock()
27
+	defer p.mu.Unlock()
28
+
29
+	if _, ok := p.names[id]; ok {
30
+		return
31
+	}
32
+	p.jobs = append(p.jobs, id)
33
+	p.names[id] = name
34
+}
35
+
36
+func (p *pushJobs) status() []contentStatus {
37
+	statuses := make([]contentStatus, 0, len(p.jobs))
38
+
39
+	p.mu.Lock()
40
+	defer p.mu.Unlock()
41
+
42
+	for _, j := range p.jobs {
43
+		var s contentStatus
44
+		s.Ref = p.names[j]
45
+
46
+		status, err := p.t.GetStatus(j)
47
+		if err != nil {
48
+			s.Status = "Waiting"
49
+		} else {
50
+			s.Total = status.Total
51
+			s.Offset = status.Offset
52
+			s.StartedAt = status.StartedAt
53
+			s.UpdatedAt = status.UpdatedAt
54
+			if status.UploadUUID == "" {
55
+				s.Status = "Upload complete"
56
+			} else {
57
+				s.Status = "Uploading"
58
+			}
59
+		}
60
+		statuses = append(statuses, s)
61
+	}
62
+
63
+	return statuses
64
+}
65
+
66
// contentStatus is a point-in-time snapshot of one upload's progress.
type contentStatus struct {
	// Status is "Waiting", "Uploading", or "Upload complete".
	Status string
	// Total is the number of bytes to upload.
	Total int64
	// Offset is the number of bytes uploaded so far.
	Offset    int64
	StartedAt time.Time
	UpdatedAt time.Time
	// Ref is the display name registered for the job.
	Ref string
}
0 74
new file mode 100644
... ...
@@ -0,0 +1,111 @@
0
+package plugin
1
+
2
+import (
3
+	"context"
4
+	"crypto/tls"
5
+	"net"
6
+	"net/http"
7
+	"time"
8
+
9
+	"github.com/sirupsen/logrus"
10
+
11
+	"github.com/docker/docker/dockerversion"
12
+
13
+	"github.com/pkg/errors"
14
+
15
+	"github.com/containerd/containerd/remotes"
16
+	"github.com/containerd/containerd/remotes/docker"
17
+	"github.com/docker/distribution/reference"
18
+	"github.com/docker/docker/api/types"
19
+)
20
+
21
+// scope builds the correct auth scope for the registry client to authorize against
22
+// By default the client currently only does a "repository:" scope with out a classifier, e.g. "(plugin)"
23
+// Without this, the client will not be able to authorize the request
24
+func scope(ref reference.Named, push bool) string {
25
+	scope := "repository(plugin):" + reference.Path(reference.TrimNamed(ref)) + ":pull"
26
+	if push {
27
+		scope += ",push"
28
+	}
29
+	return scope
30
+}
31
+
32
+func (pm *Manager) newResolver(ctx context.Context, tracker docker.StatusTracker, auth *types.AuthConfig, headers http.Header, httpFallback bool) (remotes.Resolver, error) {
33
+	if headers == nil {
34
+		headers = http.Header{}
35
+	}
36
+	headers.Add("User-Agent", dockerversion.DockerUserAgent(ctx))
37
+
38
+	return docker.NewResolver(docker.ResolverOptions{
39
+		Tracker: tracker,
40
+		Headers: headers,
41
+		Hosts:   pm.registryHostsFn(auth, httpFallback),
42
+	}), nil
43
+}
44
+
45
+func registryHTTPClient(config *tls.Config) *http.Client {
46
+	return &http.Client{
47
+		Transport: &http.Transport{
48
+			Proxy: http.ProxyFromEnvironment,
49
+			DialContext: (&net.Dialer{
50
+				Timeout:   30 * time.Second,
51
+				KeepAlive: 30 * time.Second,
52
+			}).DialContext,
53
+			TLSClientConfig:     config,
54
+			TLSHandshakeTimeout: 10 * time.Second,
55
+			IdleConnTimeout:     30 * time.Second,
56
+		},
57
+	}
58
+}
59
+
60
// registryHostsFn returns a docker.RegistryHosts lookup that resolves the
// configured registry endpoints (including mirrors) for a hostname, wiring in
// each endpoint's TLS config and the supplied credentials.
func (pm *Manager) registryHostsFn(auth *types.AuthConfig, httpFallback bool) docker.RegistryHosts {
	return func(hostname string) ([]docker.RegistryHost, error) {
		eps, err := pm.config.RegistryService.LookupPullEndpoints(hostname)
		if err != nil {
			return nil, errors.Wrapf(err, "error resolving repository for %s", hostname)
		}

		hosts := make([]docker.RegistryHost, 0, len(eps))

		for _, ep := range eps {
			// forced http fallback is used only for push since the containerd pusher only ever uses the first host we
			// pass to it.
			// So it is the caller's responsibility to retry with this flag set.
			if httpFallback && ep.URL.Scheme != "http" {
				logrus.WithField("registryHost", hostname).WithField("endpoint", ep).Debugf("Skipping non-http endpoint")
				continue
			}

			// Mirrors are pull-only; only non-mirror endpoints get push capability.
			caps := docker.HostCapabilityPull | docker.HostCapabilityResolve
			if !ep.Mirror {
				caps = caps | docker.HostCapabilityPush
			}

			host, err := docker.DefaultHost(ep.URL.Host)
			if err != nil {
				return nil, err
			}

			// The same client is used for the registry and for token auth requests.
			client := registryHTTPClient(ep.TLSConfig)
			hosts = append(hosts, docker.RegistryHost{
				Host:         host,
				Scheme:       ep.URL.Scheme,
				Client:       client,
				Path:         "/v2",
				Capabilities: caps,
				Authorizer: docker.NewDockerAuthorizer(
					docker.WithAuthClient(client),
					docker.WithAuthCreds(func(_ string) (string, string, error) {
						// Prefer an identity (OAuth) token when present, else basic creds.
						if auth.IdentityToken != "" {
							return "", auth.IdentityToken, nil
						}
						return auth.Username, auth.Password, nil
					}),
				),
			})
		}
		logrus.WithField("registryHost", hostname).WithField("hosts", hosts).Debug("Resolved registry hosts")

		return hosts, nil
	}
}
... ...
@@ -25,6 +25,7 @@ type Plugin struct {
25 25
 
26 26
 	Config   digest.Digest
27 27
 	Blobsums []digest.Digest
28
+	Manifest digest.Digest
28 29
 
29 30
 	modifyRuntimeSpec func(*specs.Spec)
30 31
 
... ...
@@ -26,7 +26,15 @@ type CreateOpt func(*Config)
26 26
 // create the plugin with.
27 27
 type Config struct {
28 28
 	*types.PluginConfig
29
-	binPath string
29
+	binPath        string
30
+	RegistryConfig registry.ServiceOptions
31
+}
32
+
33
+// WithInsecureRegistry specifies that the given registry can skip host-key checking as well as fall back to plain http
34
+func WithInsecureRegistry(url string) CreateOpt {
35
+	return func(cfg *Config) {
36
+		cfg.RegistryConfig.InsecureRegistries = append(cfg.RegistryConfig.InsecureRegistries, url)
37
+	}
30 38
 }
31 39
 
32 40
 // WithBinary is a CreateOpt to set an custom binary to create the plugin with.
... ...
@@ -82,6 +90,11 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig,
82 82
 		return errors.Wrap(err, "error creating plugin root")
83 83
 	}
84 84
 
85
+	var cfg Config
86
+	for _, o := range opts {
87
+		o(&cfg)
88
+	}
89
+
85 90
 	tar, err := makePluginBundle(inPath, opts...)
86 91
 	if err != nil {
87 92
 		return err
... ...
@@ -92,7 +105,7 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig,
92 92
 		return nil, nil
93 93
 	}
94 94
 
95
-	regService, err := registry.NewService(registry.ServiceOptions{})
95
+	regService, err := registry.NewService(cfg.RegistryConfig)
96 96
 	if err != nil {
97 97
 		return err
98 98
 	}
... ...
@@ -1,5 +1,7 @@
1 1
 package registry
2 2
 
3
+import "io"
4
+
3 5
 // Schema1 sets the registry to serve v1 api
4 6
 func Schema1(c *Config) {
5 7
 	c.schema1 = true
... ...
@@ -24,3 +26,17 @@ func URL(registryURL string) func(*Config) {
24 24
 		c.registryURL = registryURL
25 25
 	}
26 26
 }
27
+
28
+// WithStdout sets the stdout of the registry command to the passed in writer.
29
+func WithStdout(w io.Writer) func(c *Config) {
30
+	return func(c *Config) {
31
+		c.stdout = w
32
+	}
33
+}
34
+
35
+// WithStderr sets the stderr of the registry command to the passed in writer.
36
+func WithStderr(w io.Writer) func(c *Config) {
37
+	return func(c *Config) {
38
+		c.stderr = w
39
+	}
40
+}
... ...
@@ -2,6 +2,7 @@ package registry // import "github.com/docker/docker/testutil/registry"
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"io"
5 6
 	"io/ioutil"
6 7
 	"net/http"
7 8
 	"os"
... ...
@@ -40,6 +41,8 @@ type Config struct {
40 40
 	auth        string
41 41
 	tokenURL    string
42 42
 	registryURL string
43
+	stdout      io.Writer
44
+	stderr      io.Writer
43 45
 }
44 46
 
45 47
 // NewV2 creates a v2 registry server
... ...
@@ -109,6 +112,8 @@ http:
109 109
 		binary = V2binarySchema1
110 110
 	}
111 111
 	cmd := exec.Command(binary, confPath)
112
+	cmd.Stdout = c.stdout
113
+	cmd.Stderr = c.stderr
112 114
 	if err := cmd.Start(); err != nil {
113 115
 		// FIXME(vdemeester) use a defer/clean func
114 116
 		os.RemoveAll(tmp)