Browse code

Store image manifests in containerd content store

This allows us to cache manifests and avoid extra round trips to the
registry for content we already know about.

dockerd currently does not support containerd on Windows, so this does
not store manifests on Windows, yet.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2020/10/31 04:47:06
Showing 13 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,30 @@
0
+package daemon
1
+
2
+import (
3
+	"os"
4
+	"path/filepath"
5
+
6
+	"github.com/containerd/containerd/content"
7
+	"github.com/containerd/containerd/content/local"
8
+	"github.com/containerd/containerd/leases"
9
+	"github.com/containerd/containerd/metadata"
10
+	"github.com/pkg/errors"
11
+	"go.etcd.io/bbolt"
12
+)
13
+
14
+func (d *Daemon) configureLocalContentStore() (content.Store, leases.Manager, error) {
15
+	if err := os.MkdirAll(filepath.Join(d.root, "content"), 0700); err != nil {
16
+		return nil, nil, errors.Wrap(err, "error creating dir for content store")
17
+	}
18
+	db, err := bbolt.Open(filepath.Join(d.root, "content", "metadata.db"), 0600, nil)
19
+	if err != nil {
20
+		return nil, nil, errors.Wrap(err, "error opening bolt db for content metadata store")
21
+	}
22
+	cs, err := local.NewStore(filepath.Join(d.root, "content", "data"))
23
+	if err != nil {
24
+		return nil, nil, errors.Wrap(err, "error setting up content store")
25
+	}
26
+	md := metadata.NewDB(db, cs, nil)
27
+	d.mdDB = db
28
+	return md.ContentStore(), metadata.NewLeaseManager(md), nil
29
+}
... ...
@@ -20,6 +20,7 @@ import (
20 20
 	"time"
21 21
 
22 22
 	"github.com/docker/docker/pkg/fileutils"
23
+	"go.etcd.io/bbolt"
23 24
 	"google.golang.org/grpc"
24 25
 	"google.golang.org/grpc/backoff"
25 26
 
... ...
@@ -129,6 +130,11 @@ type Daemon struct {
129 129
 
130 130
 	attachmentStore       network.AttachmentStore
131 131
 	attachableNetworkLock *locker.Locker
132
+
133
+	// This is used for Windows which doesn't currently support running on containerd
134
+	// It stores metadata for the content store (used for manifest caching)
135
+	// This needs to be closed on daemon exit
136
+	mdDB *bbolt.DB
132 137
 }
133 138
 
134 139
 // StoreHosts stores the addresses the daemon is listening on
... ...
@@ -1066,10 +1072,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
1066 1066
 
1067 1067
 	d.linkIndex = newLinkIndex()
1068 1068
 
1069
-	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
1070
-	// used above to run migration. They could be initialized in ImageService
1071
-	// if migration is called from daemon/images. layerStore might move as well.
1072
-	d.imageService = images.NewImageService(images.ImageServiceConfig{
1069
+	imgSvcConfig := images.ImageServiceConfig{
1073 1070
 		ContainerStore:            d.containers,
1074 1071
 		DistributionMetadataStore: distributionMetadataStore,
1075 1072
 		EventsService:             d.EventsService,
... ...
@@ -1081,7 +1084,28 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
1081 1081
 		ReferenceStore:            rs,
1082 1082
 		RegistryService:           registryService,
1083 1083
 		TrustKey:                  trustKey,
1084
-	})
1084
+		ContentNamespace:          config.ContainerdNamespace,
1085
+	}
1086
+
1087
+	// containerd is not currently supported with Windows.
1088
+	// So sometimes d.containerdCli will be nil
1089
+	// In that case we'll create a local content store... but otherwise we'll use containerd
1090
+	if d.containerdCli != nil {
1091
+		imgSvcConfig.Leases = d.containerdCli.LeasesService()
1092
+		imgSvcConfig.ContentStore = d.containerdCli.ContentStore()
1093
+	} else {
1094
+		cs, lm, err := d.configureLocalContentStore()
1095
+		if err != nil {
1096
+			return nil, err
1097
+		}
1098
+		imgSvcConfig.ContentStore = cs
1099
+		imgSvcConfig.Leases = lm
1100
+	}
1101
+
1102
+	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
1103
+	// used above to run migration. They could be initialized in ImageService
1104
+	// if migration is called from daemon/images. layerStore might move as well.
1105
+	d.imageService = images.NewImageService(imgSvcConfig)
1085 1106
 
1086 1107
 	go d.execCommandGC()
1087 1108
 
... ...
@@ -1246,6 +1270,10 @@ func (daemon *Daemon) Shutdown() error {
1246 1246
 		daemon.containerdCli.Close()
1247 1247
 	}
1248 1248
 
1249
+	if daemon.mdDB != nil {
1250
+		daemon.mdDB.Close()
1251
+	}
1252
+
1249 1253
 	return daemon.cleanupMounts()
1250 1254
 }
1251 1255
 
... ...
@@ -6,6 +6,8 @@ import (
6 6
 	"strings"
7 7
 	"time"
8 8
 
9
+	"github.com/containerd/containerd/leases"
10
+	"github.com/containerd/containerd/namespaces"
9 11
 	dist "github.com/docker/distribution"
10 12
 	"github.com/docker/distribution/reference"
11 13
 	"github.com/docker/docker/api/types"
... ...
@@ -16,6 +18,7 @@ import (
16 16
 	"github.com/docker/docker/registry"
17 17
 	digest "github.com/opencontainers/go-digest"
18 18
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
19
+	"github.com/pkg/errors"
19 20
 )
20 21
 
21 22
 // PullImage initiates a pull operation. image is the repository name to pull, and
... ...
@@ -65,6 +68,25 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
65 65
 		close(writesDone)
66 66
 	}()
67 67
 
68
+	ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
69
+	// Take out a temporary lease for everything that gets persisted to the content store.
70
+	// Before the lease is cancelled, any content we want to keep should have it's own lease applied.
71
+	ctx, done, err := tempLease(ctx, i.leases)
72
+	if err != nil {
73
+		return err
74
+	}
75
+	defer done(ctx)
76
+
77
+	cs := &contentStoreForPull{
78
+		ContentStore: i.content,
79
+		leases:       i.leases,
80
+	}
81
+	imageStore := &imageStoreForPull{
82
+		ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
83
+		ingested:         cs,
84
+		leases:           i.leases,
85
+	}
86
+
68 87
 	imagePullConfig := &distribution.ImagePullConfig{
69 88
 		Config: distribution.Config{
70 89
 			MetaHeaders:      metaHeaders,
... ...
@@ -73,7 +95,7 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
73 73
 			RegistryService:  i.registryService,
74 74
 			ImageEventLogger: i.LogImageEvent,
75 75
 			MetadataStore:    i.distributionMetadataStore,
76
-			ImageStore:       distribution.NewImageConfigStoreFromStore(i.imageStore),
76
+			ImageStore:       imageStore,
77 77
 			ReferenceStore:   i.referenceStore,
78 78
 		},
79 79
 		DownloadManager: i.downloadManager,
... ...
@@ -81,7 +103,7 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
81 81
 		Platform:        platform,
82 82
 	}
83 83
 
84
-	err := distribution.Pull(ctx, ref, imagePullConfig)
84
+	err = distribution.Pull(ctx, ref, imagePullConfig, cs)
85 85
 	close(progressChan)
86 86
 	<-writesDone
87 87
 	return err
... ...
@@ -124,3 +146,29 @@ func (i *ImageService) GetRepository(ctx context.Context, ref reference.Named, a
124 124
 	}
125 125
 	return repository, confirmedV2, lastError
126 126
 }
127
+
128
+func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {
129
+	nop := func(context.Context) error { return nil }
130
+	_, ok := leases.FromContext(ctx)
131
+	if ok {
132
+		return ctx, nop, nil
133
+	}
134
+
135
+	// Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc.
136
+	opts := []leases.Opt{
137
+		leases.WithRandomID(),
138
+		leases.WithExpiration(24 * time.Hour),
139
+		leases.WithLabels(map[string]string{
140
+			"moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano),
141
+		}),
142
+	}
143
+	l, err := mgr.Create(ctx, opts...)
144
+	if err != nil {
145
+		return ctx, nop, errors.Wrap(err, "error creating temporary lease")
146
+	}
147
+
148
+	ctx = leases.WithLease(ctx, l.ID)
149
+	return ctx, func(ctx context.Context) error {
150
+		return mgr.Delete(ctx, l)
151
+	}, nil
152
+}
... ...
@@ -5,6 +5,8 @@ import (
5 5
 	"os"
6 6
 	"runtime"
7 7
 
8
+	"github.com/containerd/containerd/content"
9
+	"github.com/containerd/containerd/leases"
8 10
 	"github.com/docker/docker/container"
9 11
 	daemonevents "github.com/docker/docker/daemon/events"
10 12
 	"github.com/docker/docker/distribution"
... ...
@@ -42,6 +44,9 @@ type ImageServiceConfig struct {
42 42
 	ReferenceStore            dockerreference.Store
43 43
 	RegistryService           registry.Service
44 44
 	TrustKey                  libtrust.PrivateKey
45
+	ContentStore              content.Store
46
+	Leases                    leases.Manager
47
+	ContentNamespace          string
45 48
 }
46 49
 
47 50
 // NewImageService returns a new ImageService from a configuration
... ...
@@ -54,12 +59,15 @@ func NewImageService(config ImageServiceConfig) *ImageService {
54 54
 		distributionMetadataStore: config.DistributionMetadataStore,
55 55
 		downloadManager:           xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
56 56
 		eventsService:             config.EventsService,
57
-		imageStore:                config.ImageStore,
57
+		imageStore:                &imageStoreWithLease{Store: config.ImageStore, leases: config.Leases, ns: config.ContentNamespace},
58 58
 		layerStores:               config.LayerStores,
59 59
 		referenceStore:            config.ReferenceStore,
60 60
 		registryService:           config.RegistryService,
61 61
 		trustKey:                  config.TrustKey,
62 62
 		uploadManager:             xfer.NewLayerUploadManager(config.MaxConcurrentUploads),
63
+		leases:                    config.Leases,
64
+		content:                   config.ContentStore,
65
+		contentNamespace:          config.ContentNamespace,
63 66
 	}
64 67
 }
65 68
 
... ...
@@ -76,6 +84,9 @@ type ImageService struct {
76 76
 	registryService           registry.Service
77 77
 	trustKey                  libtrust.PrivateKey
78 78
 	uploadManager             *xfer.LayerUploadManager
79
+	leases                    leases.Manager
80
+	content                   content.Store
81
+	contentNamespace          string
79 82
 }
80 83
 
81 84
 // DistributionServices provides daemon image storage services
82 85
new file mode 100644
... ...
@@ -0,0 +1,155 @@
0
+package images
1
+
2
+import (
3
+	"context"
4
+	"sync"
5
+
6
+	"github.com/containerd/containerd/content"
7
+	c8derrdefs "github.com/containerd/containerd/errdefs"
8
+	"github.com/containerd/containerd/leases"
9
+	"github.com/containerd/containerd/log"
10
+	"github.com/containerd/containerd/namespaces"
11
+	"github.com/docker/docker/distribution"
12
+	"github.com/docker/docker/image"
13
+	"github.com/docker/docker/layer"
14
+	digest "github.com/opencontainers/go-digest"
15
+	"github.com/pkg/errors"
16
+	"github.com/sirupsen/logrus"
17
+)
18
+
19
+func imageKey(dgst digest.Digest) string {
20
+	return "moby-image-" + dgst.String()
21
+}
22
+
23
+// imageStoreWithLease wraps the configured image store with one that deletes the lease
24
+// reigstered for a given image ID, if one exists
25
+//
26
+// This is used by the main image service to wrap delete calls to the real image store.
27
+type imageStoreWithLease struct {
28
+	image.Store
29
+	leases leases.Manager
30
+
31
+	// Normally we'd pass namespace down through a context.Context, however...
32
+	// The interface for image store doesn't allow this, so we store it here.
33
+	ns string
34
+}
35
+
36
+func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
37
+	ctx := namespaces.WithNamespace(context.TODO(), s.ns)
38
+	if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(digest.Digest(id))}); err != nil && !c8derrdefs.IsNotFound(err) {
39
+		return nil, errors.Wrap(err, "error deleting lease")
40
+	}
41
+	return s.Store.Delete(id)
42
+}
43
+
44
+// iamgeStoreForPull is created for each pull It wraps an underlying image store
45
+// to handle registering leases for content fetched in a single image pull.
46
+type imageStoreForPull struct {
47
+	distribution.ImageConfigStore
48
+	leases   leases.Manager
49
+	ingested *contentStoreForPull
50
+}
51
+
52
+func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
53
+	id, err := s.ImageConfigStore.Put(ctx, config)
54
+	if err != nil {
55
+		return "", err
56
+	}
57
+	return id, s.updateLease(ctx, id)
58
+}
59
+
60
+func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
61
+	id, err := s.ImageConfigStore.Get(ctx, dgst)
62
+	if err != nil {
63
+		return nil, err
64
+	}
65
+	return id, s.updateLease(ctx, dgst)
66
+}
67
+
68
+func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
69
+	leaseID := imageKey(dgst)
70
+	lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
71
+	if err != nil {
72
+		if !c8derrdefs.IsAlreadyExists(err) {
73
+			return errors.Wrap(err, "error creating lease")
74
+		}
75
+		lease = leases.Lease{ID: leaseID}
76
+	}
77
+
78
+	digested := s.ingested.getDigested()
79
+	resource := leases.Resource{
80
+		Type: "content",
81
+	}
82
+	for _, dgst := range digested {
83
+		log.G(ctx).WithFields(logrus.Fields{
84
+			"digest": dgst,
85
+			"lease":  lease.ID,
86
+		}).Debug("Adding content digest to lease")
87
+
88
+		resource.ID = dgst.String()
89
+		if err := s.leases.AddResource(ctx, lease, resource); err != nil {
90
+			return errors.Wrapf(err, "error adding content digest to lease: %s", dgst)
91
+		}
92
+	}
93
+	return nil
94
+}
95
+
96
+// contentStoreForPull is used to wrap the configured content store to
97
+// add lease management for a single `pull`
98
+// It stores all committed digests so that `imageStoreForPull` can add
99
+// the digsted resources to the lease for an image.
100
+type contentStoreForPull struct {
101
+	distribution.ContentStore
102
+	leases leases.Manager
103
+
104
+	mu       sync.Mutex
105
+	digested []digest.Digest
106
+}
107
+
108
+func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
109
+	c.mu.Lock()
110
+	c.digested = append(c.digested, dgst)
111
+	c.mu.Unlock()
112
+}
113
+
114
+func (c *contentStoreForPull) getDigested() []digest.Digest {
115
+	c.mu.Lock()
116
+	digested := make([]digest.Digest, len(c.digested))
117
+	copy(digested, c.digested)
118
+	c.mu.Unlock()
119
+	return digested
120
+}
121
+
122
+func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
123
+	w, err := c.ContentStore.Writer(ctx, opts...)
124
+	if err != nil {
125
+		if c8derrdefs.IsAlreadyExists(err) {
126
+			var cfg content.WriterOpts
127
+			for _, o := range opts {
128
+				if err := o(&cfg); err != nil {
129
+					return nil, err
130
+				}
131
+
132
+			}
133
+			c.addDigested(cfg.Desc.Digest)
134
+		}
135
+		return nil, err
136
+	}
137
+	return &contentWriter{
138
+		cs:     c,
139
+		Writer: w,
140
+	}, nil
141
+}
142
+
143
+type contentWriter struct {
144
+	cs *contentStoreForPull
145
+	content.Writer
146
+}
147
+
148
+func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
149
+	err := w.Writer.Commit(ctx, size, expected, opts...)
150
+	if err == nil || c8derrdefs.IsAlreadyExists(err) {
151
+		w.cs.addDigested(expected)
152
+	}
153
+	return err
154
+}
0 155
new file mode 100644
... ...
@@ -0,0 +1,124 @@
0
+package images
1
+
2
+import (
3
+	"context"
4
+	"io/ioutil"
5
+	"os"
6
+	"path/filepath"
7
+	"testing"
8
+
9
+	"github.com/containerd/containerd/content"
10
+	"github.com/containerd/containerd/content/local"
11
+	c8derrdefs "github.com/containerd/containerd/errdefs"
12
+	"github.com/containerd/containerd/leases"
13
+	"github.com/containerd/containerd/metadata"
14
+	"github.com/containerd/containerd/namespaces"
15
+	"github.com/docker/docker/image"
16
+	digest "github.com/opencontainers/go-digest"
17
+	v1 "github.com/opencontainers/image-spec/specs-go/v1"
18
+	"go.etcd.io/bbolt"
19
+	"gotest.tools/v3/assert"
20
+	"gotest.tools/v3/assert/cmp"
21
+)
22
+
23
+func setupTestStores(t *testing.T) (context.Context, content.Store, *imageStoreWithLease, func(t *testing.T)) {
24
+	dir, err := ioutil.TempDir("", t.Name())
25
+	assert.NilError(t, err)
26
+
27
+	backend, err := image.NewFSStoreBackend(filepath.Join(dir, "images"))
28
+	assert.NilError(t, err)
29
+	is, err := image.NewImageStore(backend, nil)
30
+	assert.NilError(t, err)
31
+
32
+	db, err := bbolt.Open(filepath.Join(dir, "metadata.db"), 0600, nil)
33
+	assert.NilError(t, err)
34
+
35
+	cs, err := local.NewStore(filepath.Join(dir, "content"))
36
+	assert.NilError(t, err)
37
+	mdb := metadata.NewDB(db, cs, nil)
38
+
39
+	cleanup := func(t *testing.T) {
40
+		assert.Check(t, db.Close())
41
+		assert.Check(t, os.RemoveAll(dir))
42
+	}
43
+	ctx := namespaces.WithNamespace(context.Background(), t.Name())
44
+	images := &imageStoreWithLease{Store: is, ns: t.Name(), leases: metadata.NewLeaseManager(mdb)}
45
+
46
+	return ctx, cs, images, cleanup
47
+}
48
+
49
+func TestImageDelete(t *testing.T) {
50
+	ctx, _, images, cleanup := setupTestStores(t)
51
+	defer cleanup(t)
52
+
53
+	t.Run("no lease", func(t *testing.T) {
54
+		id, err := images.Create([]byte(`{"rootFS": {}}`))
55
+		assert.NilError(t, err)
56
+		defer images.Delete(id)
57
+
58
+		ls, err := images.leases.List(ctx)
59
+		assert.NilError(t, err)
60
+		assert.Equal(t, len(ls), 0, ls)
61
+
62
+		_, err = images.Delete(id)
63
+		assert.NilError(t, err, "should not error when there is no lease")
64
+	})
65
+
66
+	t.Run("lease exists", func(t *testing.T) {
67
+		id, err := images.Create([]byte(`{"rootFS": {}}`))
68
+		assert.NilError(t, err)
69
+		defer images.Delete(id)
70
+
71
+		leaseID := imageKey(digest.Digest(id))
72
+		_, err = images.leases.Create(ctx, leases.WithID(leaseID))
73
+		assert.NilError(t, err)
74
+		defer images.leases.Delete(ctx, leases.Lease{ID: leaseID})
75
+
76
+		ls, err := images.leases.List(ctx)
77
+		assert.NilError(t, err)
78
+		assert.Check(t, cmp.Equal(len(ls), 1), ls)
79
+
80
+		_, err = images.Delete(id)
81
+		assert.NilError(t, err)
82
+
83
+		ls, err = images.leases.List(ctx)
84
+		assert.NilError(t, err)
85
+		assert.Check(t, cmp.Equal(len(ls), 0), ls)
86
+	})
87
+}
88
+
89
+func TestContentStoreForPull(t *testing.T) {
90
+	ctx, cs, is, cleanup := setupTestStores(t)
91
+	defer cleanup(t)
92
+
93
+	csP := &contentStoreForPull{
94
+		ContentStore: cs,
95
+		leases:       is.leases,
96
+	}
97
+
98
+	data := []byte(`{}`)
99
+	desc := v1.Descriptor{
100
+		Digest: digest.Canonical.FromBytes(data),
101
+		Size:   int64(len(data)),
102
+	}
103
+
104
+	w, err := csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
105
+	assert.NilError(t, err)
106
+
107
+	_, err = w.Write(data)
108
+	assert.NilError(t, err)
109
+	defer w.Close()
110
+
111
+	err = w.Commit(ctx, desc.Size, desc.Digest)
112
+	assert.NilError(t, err)
113
+
114
+	assert.Equal(t, len(csP.digested), 1)
115
+	assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest))
116
+
117
+	// Test already exists
118
+	csP.digested = nil
119
+	_, err = csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
120
+	assert.Check(t, c8derrdefs.IsAlreadyExists(err))
121
+	assert.Equal(t, len(csP.digested), 1)
122
+	assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest))
123
+}
... ...
@@ -84,8 +84,8 @@ type ImagePushConfig struct {
84 84
 // by digest. Allows getting an image configurations rootfs from the
85 85
 // configuration.
86 86
 type ImageConfigStore interface {
87
-	Put([]byte) (digest.Digest, error)
88
-	Get(digest.Digest) ([]byte, error)
87
+	Put(context.Context, []byte) (digest.Digest, error)
88
+	Get(context.Context, digest.Digest) ([]byte, error)
89 89
 	RootFSFromConfig([]byte) (*image.RootFS, error)
90 90
 	PlatformFromConfig([]byte) (*specs.Platform, error)
91 91
 }
... ...
@@ -128,12 +128,12 @@ func NewImageConfigStoreFromStore(is image.Store) ImageConfigStore {
128 128
 	}
129 129
 }
130 130
 
131
-func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) {
131
+func (s *imageConfigStore) Put(_ context.Context, c []byte) (digest.Digest, error) {
132 132
 	id, err := s.Store.Create(c)
133 133
 	return digest.Digest(id), err
134 134
 }
135 135
 
136
-func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) {
136
+func (s *imageConfigStore) Get(_ context.Context, d digest.Digest) ([]byte, error) {
137 137
 	img, err := s.Store.Get(image.IDFromDigest(d))
138 138
 	if err != nil {
139 139
 		return nil, err
140 140
new file mode 100644
... ...
@@ -0,0 +1,195 @@
0
+package distribution
1
+
2
+import (
3
+	"context"
4
+	"encoding/json"
5
+	"io"
6
+	"io/ioutil"
7
+
8
+	"github.com/containerd/containerd/content"
9
+	"github.com/containerd/containerd/errdefs"
10
+	"github.com/containerd/containerd/log"
11
+	"github.com/containerd/containerd/remotes"
12
+	"github.com/docker/distribution"
13
+	"github.com/docker/distribution/manifest/schema1"
14
+	digest "github.com/opencontainers/go-digest"
15
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
16
+	"github.com/pkg/errors"
17
+)
18
+
19
+// This is used by manifestStore to pare down the requirements to implement a
20
+// full distribution.ManifestService, since `Get` is all we use here.
21
+type manifestGetter interface {
22
+	Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error)
23
+}
24
+
25
+type manifestStore struct {
26
+	local  ContentStore
27
+	remote manifestGetter
28
+}
29
+
30
+// ContentStore is the interface used to persist registry blobs
31
+//
32
+// Currently this is only used to persist manifests and manifest lists.
33
+// It is exported because `distribution.Pull` takes one as an argument.
34
+type ContentStore interface {
35
+	content.Ingester
36
+	content.Provider
37
+	Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
38
+	Abort(ctx context.Context, ref string) error
39
+}
40
+
41
+func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
42
+	ra, err := m.local.ReaderAt(ctx, desc)
43
+	if err != nil {
44
+		return nil, errors.Wrap(err, "error getting content store reader")
45
+	}
46
+	defer ra.Close()
47
+
48
+	r := io.NewSectionReader(ra, 0, ra.Size())
49
+	data, err := ioutil.ReadAll(r)
50
+	if err != nil {
51
+		return nil, errors.Wrap(err, "error reading manifest from content store")
52
+	}
53
+
54
+	manifest, _, err := distribution.UnmarshalManifest(desc.MediaType, data)
55
+	if err != nil {
56
+		return nil, errors.Wrap(err, "error unmarshaling manifest from content store")
57
+	}
58
+	return manifest, nil
59
+}
60
+
61
+func (m *manifestStore) getMediaType(ctx context.Context, desc specs.Descriptor) (string, error) {
62
+	ra, err := m.local.ReaderAt(ctx, desc)
63
+	if err != nil {
64
+		return "", errors.Wrap(err, "error getting reader to detect media type")
65
+	}
66
+	defer ra.Close()
67
+
68
+	mt, err := detectManifestMediaType(ra)
69
+	if err != nil {
70
+		return "", errors.Wrap(err, "error detecting media type")
71
+	}
72
+	return mt, nil
73
+}
74
+
75
+func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
76
+	l := log.G(ctx)
77
+
78
+	if desc.MediaType == "" {
79
+		// When pulling by digest we will not have the media type on the
80
+		// descriptor since we have not made a request to the registry yet
81
+		//
82
+		// We already have the digest, so we only lookup locally... by digest.
83
+		//
84
+		// Let's try to detect the media type so we can have a good ref key
85
+		// here. We may not even have the content locally, and this is fine, but
86
+		// if we do we should determine that.
87
+		mt, err := m.getMediaType(ctx, desc)
88
+		if err != nil && !errdefs.IsNotFound(err) {
89
+			l.WithError(err).Warn("Error looking up media type of content")
90
+		}
91
+		desc.MediaType = mt
92
+	}
93
+
94
+	key := remotes.MakeRefKey(ctx, desc)
95
+
96
+	// Here we open a writer to the requested content. This both gives us a
97
+	// reference to write to if indeed we need to persist it and increments the
98
+	// ref count on the content.
99
+	w, err := m.local.Writer(ctx, content.WithDescriptor(desc), content.WithRef(key))
100
+	if err != nil {
101
+		if errdefs.IsAlreadyExists(err) {
102
+			var manifest distribution.Manifest
103
+			if manifest, err = m.getLocal(ctx, desc); err == nil {
104
+				return manifest, nil
105
+			}
106
+		}
107
+		// always fallback to the remote if there is an error with the local store
108
+	}
109
+	if w != nil {
110
+		defer w.Close()
111
+	}
112
+
113
+	l.WithError(err).Debug("Fetching manifest from remote")
114
+
115
+	manifest, err := m.remote.Get(ctx, desc.Digest)
116
+	if err != nil {
117
+		if err := m.local.Abort(ctx, key); err != nil {
118
+			l.WithError(err).Warn("Error while attempting to abort content ingest")
119
+		}
120
+		return nil, err
121
+	}
122
+
123
+	if w != nil {
124
+		// if `w` is nil here, something happened with the content store, so don't bother trying to persist.
125
+		if err := m.Put(ctx, manifest, desc, w); err != nil {
126
+			if err := m.local.Abort(ctx, key); err != nil {
127
+				l.WithError(err).Warn("error aborting content ingest")
128
+			}
129
+			l.WithError(err).Warn("Error persisting manifest")
130
+		}
131
+	}
132
+	return manifest, nil
133
+}
134
+
135
+func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer) error {
136
+	mt, payload, err := manifest.Payload()
137
+	if err != nil {
138
+		return err
139
+	}
140
+	desc.Size = int64(len(payload))
141
+	desc.MediaType = mt
142
+
143
+	if _, err = w.Write(payload); err != nil {
144
+		return errors.Wrap(err, "error writing manifest to content store")
145
+	}
146
+
147
+	if err := w.Commit(ctx, desc.Size, desc.Digest); err != nil {
148
+		return errors.Wrap(err, "error committing manifest to content store")
149
+	}
150
+	return nil
151
+}
152
+
153
+func detectManifestMediaType(ra content.ReaderAt) (string, error) {
154
+	dt := make([]byte, ra.Size())
155
+	if _, err := ra.ReadAt(dt, 0); err != nil {
156
+		return "", err
157
+	}
158
+
159
+	return detectManifestBlobMediaType(dt)
160
+}
161
+
162
+// This is used when the manifest store does not know the media type of a sha it
163
+// was told to get. This would currently only happen when pulling by digest.
164
+// The media type is needed so the blob can be unmarshalled properly.
165
+func detectManifestBlobMediaType(dt []byte) (string, error) {
166
+	var mfst struct {
167
+		MediaType string          `json:"mediaType"`
168
+		Config    json.RawMessage `json:"config"`   // schema2 Manifest
169
+		FSLayers  json.RawMessage `json:"fsLayers"` // schema1 Manifest
170
+	}
171
+
172
+	if err := json.Unmarshal(dt, &mfst); err != nil {
173
+		return "", err
174
+	}
175
+
176
+	// We may have a media type specified in the json, in which case that should be used.
177
+	// Docker types should generally have a media type set.
178
+	// OCI (golang) types do not have a `mediaType` defined, and it is optional in the spec.
179
+	//
180
+	// `distrubtion.UnmarshalManifest`, which is used to unmarshal this for real, checks these media type values.
181
+	// If the specified media type does not match it will error, and in some cases (docker media types) it is required.
182
+	// So pretty much if we don't have a media type we can fall back to OCI.
183
+	// This does have a special fallback for schema1 manifests just because it is easy to detect.
184
+	switch {
185
+	case mfst.MediaType != "":
186
+		return mfst.MediaType, nil
187
+	case mfst.FSLayers != nil:
188
+		return schema1.MediaTypeManifest, nil
189
+	case mfst.Config != nil:
190
+		return specs.MediaTypeImageManifest, nil
191
+	default:
192
+		return specs.MediaTypeImageIndex, nil
193
+	}
194
+}
0 195
new file mode 100644
... ...
@@ -0,0 +1,351 @@
0
+package distribution
1
+
2
+import (
3
+	"context"
4
+	"encoding/json"
5
+	"io/ioutil"
6
+	"os"
7
+	"strings"
8
+	"sync"
9
+	"testing"
10
+
11
+	"github.com/containerd/containerd/content"
12
+	"github.com/containerd/containerd/content/local"
13
+	"github.com/containerd/containerd/errdefs"
14
+	"github.com/containerd/containerd/remotes"
15
+	"github.com/docker/distribution"
16
+	"github.com/docker/distribution/manifest/ocischema"
17
+	"github.com/docker/distribution/manifest/schema1"
18
+	"github.com/google/go-cmp/cmp/cmpopts"
19
+	digest "github.com/opencontainers/go-digest"
20
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
21
+	"github.com/pkg/errors"
22
+	"gotest.tools/v3/assert"
23
+	"gotest.tools/v3/assert/cmp"
24
+)
25
+
26
+type mockManifestGetter struct {
27
+	manifests map[digest.Digest]distribution.Manifest
28
+	gets      int
29
+}
30
+
31
+func (m *mockManifestGetter) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
32
+	m.gets++
33
+	manifest, ok := m.manifests[dgst]
34
+	if !ok {
35
+		return nil, distribution.ErrManifestUnknown{Tag: dgst.String()}
36
+	}
37
+	return manifest, nil
38
+}
39
+
40
+type memoryLabelStore struct {
41
+	mu     sync.Mutex
42
+	labels map[digest.Digest]map[string]string
43
+}
44
+
45
+// Get returns all the labels for the given digest
46
+func (s *memoryLabelStore) Get(dgst digest.Digest) (map[string]string, error) {
47
+	s.mu.Lock()
48
+	labels := s.labels[dgst]
49
+	s.mu.Unlock()
50
+	return labels, nil
51
+}
52
+
53
+// Set sets all the labels for a given digest
54
+func (s *memoryLabelStore) Set(dgst digest.Digest, labels map[string]string) error {
55
+	s.mu.Lock()
56
+	if s.labels == nil {
57
+		s.labels = make(map[digest.Digest]map[string]string)
58
+	}
59
+	s.labels[dgst] = labels
60
+	s.mu.Unlock()
61
+	return nil
62
+}
63
+
64
+// Update replaces the given labels for a digest,
65
+// a key with an empty value removes a label.
66
+func (s *memoryLabelStore) Update(dgst digest.Digest, update map[string]string) (map[string]string, error) {
67
+	s.mu.Lock()
68
+	defer s.mu.Unlock()
69
+
70
+	labels, ok := s.labels[dgst]
71
+	if !ok {
72
+		labels = map[string]string{}
73
+	}
74
+	for k, v := range update {
75
+		labels[k] = v
76
+	}
77
+
78
+	s.labels[dgst] = labels
79
+
80
+	return labels, nil
81
+}
82
+
83
+type testingContentStoreWrapper struct {
84
+	ContentStore
85
+	errorOnWriter error
86
+	errorOnCommit error
87
+}
88
+
89
+func (s *testingContentStoreWrapper) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
90
+	if s.errorOnWriter != nil {
91
+		return nil, s.errorOnWriter
92
+	}
93
+
94
+	w, err := s.ContentStore.Writer(ctx, opts...)
95
+	if err != nil {
96
+		return nil, err
97
+	}
98
+
99
+	if s.errorOnCommit != nil {
100
+		w = &testingContentWriterWrapper{w, s.errorOnCommit}
101
+	}
102
+	return w, nil
103
+}
104
+
105
+type testingContentWriterWrapper struct {
106
+	content.Writer
107
+	err error
108
+}
109
+
110
+func (w *testingContentWriterWrapper) Commit(ctx context.Context, size int64, dgst digest.Digest, opts ...content.Opt) error {
111
+	if w.err != nil {
112
+		// The contract for `Commit` is to always close.
113
+		// Since this is returning early before hitting the real `Commit`, we should close it here.
114
+		w.Close()
115
+		return w.err
116
+	}
117
+	return w.Writer.Commit(ctx, size, dgst, opts...)
118
+}
119
+
120
+func TestManifestStore(t *testing.T) {
121
+	ociManifest := &specs.Manifest{}
122
+	serialized, err := json.Marshal(ociManifest)
123
+	assert.NilError(t, err)
124
+	dgst := digest.Canonical.FromBytes(serialized)
125
+
126
+	setupTest := func(t *testing.T) (specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) {
127
+		root, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1))
128
+		assert.NilError(t, err)
129
+		defer func() {
130
+			if t.Failed() {
131
+				os.RemoveAll(root)
132
+			}
133
+		}()
134
+
135
+		cs, err := local.NewLabeledStore(root, &memoryLabelStore{})
136
+		assert.NilError(t, err)
137
+
138
+		mg := &mockManifestGetter{manifests: make(map[digest.Digest]distribution.Manifest)}
139
+		store := &manifestStore{local: cs, remote: mg}
140
+		desc := specs.Descriptor{Digest: dgst, MediaType: specs.MediaTypeImageManifest, Size: int64(len(serialized))}
141
+
142
+		return desc, mg, store, cs, func(t *testing.T) {
143
+			assert.Check(t, os.RemoveAll(root))
144
+		}
145
+	}
146
+
147
+	ctx := context.Background()
148
+
149
+	m, _, err := distribution.UnmarshalManifest(specs.MediaTypeImageManifest, serialized)
150
+	assert.NilError(t, err)
151
+
152
+	writeManifest := func(t *testing.T, cs ContentStore, desc specs.Descriptor, opts ...content.Opt) {
153
+		ingestKey := remotes.MakeRefKey(ctx, desc)
154
+		w, err := cs.Writer(ctx, content.WithDescriptor(desc), content.WithRef(ingestKey))
155
+		assert.NilError(t, err)
156
+		defer func() {
157
+			if err := w.Close(); err != nil {
158
+				t.Log(err)
159
+			}
160
+			if t.Failed() {
161
+				if err := cs.Abort(ctx, ingestKey); err != nil {
162
+					t.Log(err)
163
+				}
164
+			}
165
+		}()
166
+
167
+		_, err = w.Write(serialized)
168
+		assert.NilError(t, err)
169
+
170
+		err = w.Commit(ctx, desc.Size, desc.Digest, opts...)
171
+		assert.NilError(t, err)
172
+
173
+	}
174
+
175
+	// All tests should end up with no active ingest
176
+	checkIngest := func(t *testing.T, cs content.Store, desc specs.Descriptor) {
177
+		ingestKey := remotes.MakeRefKey(ctx, desc)
178
+		_, err := cs.Status(ctx, ingestKey)
179
+		assert.Check(t, errdefs.IsNotFound(err), err)
180
+	}
181
+
182
+	t.Run("no remote or local", func(t *testing.T) {
183
+		desc, _, store, cs, teardown := setupTest(t)
184
+		defer teardown(t)
185
+
186
+		_, err = store.Get(ctx, desc)
187
+		checkIngest(t, cs, desc)
188
+		// This error is what our digest getter returns when it doesn't know about the manifest
189
+		assert.Error(t, err, distribution.ErrManifestUnknown{Tag: dgst.String()}.Error())
190
+	})
191
+
192
+	t.Run("no local cache", func(t *testing.T) {
193
+		desc, mg, store, cs, teardown := setupTest(t)
194
+		defer teardown(t)
195
+
196
+		mg.manifests[desc.Digest] = m
197
+
198
+		m2, err := store.Get(ctx, desc)
199
+		checkIngest(t, cs, desc)
200
+		assert.NilError(t, err)
201
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
202
+		assert.Check(t, cmp.Equal(mg.gets, 1))
203
+
204
+		i, err := cs.Info(ctx, desc.Digest)
205
+		assert.NilError(t, err)
206
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
207
+
208
+		// Now check again, this should not hit the remote
209
+		m2, err = store.Get(ctx, desc)
210
+		checkIngest(t, cs, desc)
211
+		assert.NilError(t, err)
212
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
213
+		assert.Check(t, cmp.Equal(mg.gets, 1))
214
+	})
215
+
216
+	t.Run("with local cache", func(t *testing.T) {
217
+		desc, mg, store, cs, teardown := setupTest(t)
218
+		defer teardown(t)
219
+
220
+		// first add the manifest to the coontent store
221
+		writeManifest(t, cs, desc)
222
+
223
+		// now do the get
224
+		m2, err := store.Get(ctx, desc)
225
+		checkIngest(t, cs, desc)
226
+		assert.NilError(t, err)
227
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
228
+		assert.Check(t, cmp.Equal(mg.gets, 0))
229
+
230
+		i, err := cs.Info(ctx, desc.Digest)
231
+		assert.NilError(t, err)
232
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
233
+	})
234
+
235
+	// This is for the case of pull by digest where we don't know the media type of the manifest until it's actually pulled.
236
+	t.Run("unknown media type", func(t *testing.T) {
237
+		t.Run("no cache", func(t *testing.T) {
238
+			desc, mg, store, cs, teardown := setupTest(t)
239
+			defer teardown(t)
240
+
241
+			mg.manifests[desc.Digest] = m
242
+			desc.MediaType = ""
243
+
244
+			m2, err := store.Get(ctx, desc)
245
+			checkIngest(t, cs, desc)
246
+			assert.NilError(t, err)
247
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
248
+			assert.Check(t, cmp.Equal(mg.gets, 1))
249
+		})
250
+
251
+		t.Run("with cache", func(t *testing.T) {
252
+			t.Run("cached manifest has media type", func(t *testing.T) {
253
+				desc, mg, store, cs, teardown := setupTest(t)
254
+				defer teardown(t)
255
+
256
+				writeManifest(t, cs, desc)
257
+				desc.MediaType = ""
258
+
259
+				m2, err := store.Get(ctx, desc)
260
+				checkIngest(t, cs, desc)
261
+				assert.NilError(t, err)
262
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
263
+				assert.Check(t, cmp.Equal(mg.gets, 0))
264
+			})
265
+
266
+			t.Run("cached manifest has no media type", func(t *testing.T) {
267
+				desc, mg, store, cs, teardown := setupTest(t)
268
+				defer teardown(t)
269
+
270
+				desc.MediaType = ""
271
+				writeManifest(t, cs, desc)
272
+
273
+				m2, err := store.Get(ctx, desc)
274
+				checkIngest(t, cs, desc)
275
+				assert.NilError(t, err)
276
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
277
+				assert.Check(t, cmp.Equal(mg.gets, 0))
278
+			})
279
+		})
280
+	})
281
+
282
+	// Test that if there is an error with the content store, for whatever
283
+	// reason, that doesn't stop us from getting the manifest.
284
+	//
285
+	// Also makes sure the ingests are aborted.
286
+	t.Run("error persisting manifest", func(t *testing.T) {
287
+		t.Run("error on writer", func(t *testing.T) {
288
+			desc, mg, store, cs, teardown := setupTest(t)
289
+			defer teardown(t)
290
+			mg.manifests[desc.Digest] = m
291
+
292
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnWriter: errors.New("random error")}
293
+			store.local = csW
294
+
295
+			m2, err := store.Get(ctx, desc)
296
+			checkIngest(t, cs, desc)
297
+			assert.NilError(t, err)
298
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
299
+			assert.Check(t, cmp.Equal(mg.gets, 1))
300
+
301
+			_, err = cs.Info(ctx, desc.Digest)
302
+			// Nothing here since we couldn't persist
303
+			assert.Check(t, errdefs.IsNotFound(err), err)
304
+		})
305
+
306
+		t.Run("error on commit", func(t *testing.T) {
307
+			desc, mg, store, cs, teardown := setupTest(t)
308
+			defer teardown(t)
309
+			mg.manifests[desc.Digest] = m
310
+
311
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnCommit: errors.New("random error")}
312
+			store.local = csW
313
+
314
+			m2, err := store.Get(ctx, desc)
315
+			checkIngest(t, cs, desc)
316
+			assert.NilError(t, err)
317
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
318
+			assert.Check(t, cmp.Equal(mg.gets, 1))
319
+
320
+			_, err = cs.Info(ctx, desc.Digest)
321
+			// Nothing here since we couldn't persist
322
+			assert.Check(t, errdefs.IsNotFound(err), err)
323
+		})
324
+	})
325
+}
326
+
327
+func TestDetectManifestBlobMediaType(t *testing.T) {
328
+	type testCase struct {
329
+		json     []byte
330
+		expected string
331
+	}
332
+	cases := map[string]testCase{
333
+		"mediaType is set":   {[]byte(`{"mediaType": "bananas"}`), "bananas"},
334
+		"oci manifest":       {[]byte(`{"config": {}}`), specs.MediaTypeImageManifest},
335
+		"schema1":            {[]byte(`{"fsLayers": []}`), schema1.MediaTypeManifest},
336
+		"oci index fallback": {[]byte(`{}`), specs.MediaTypeImageIndex},
337
+		// Make sure we prefer mediaType
338
+		"mediaType and config set":   {[]byte(`{"mediaType": "bananas", "config": {}}`), "bananas"},
339
+		"mediaType and fsLayers set": {[]byte(`{"mediaType": "bananas", "fsLayers": []}`), "bananas"},
340
+	}
341
+
342
+	for name, tc := range cases {
343
+		t.Run(name, func(t *testing.T) {
344
+			mt, err := detectManifestBlobMediaType(tc.json)
345
+			assert.NilError(t, err)
346
+			assert.Equal(t, mt, tc.expected)
347
+		})
348
+	}
349
+
350
+}
... ...
@@ -29,7 +29,7 @@ type Puller interface {
29 29
 // whether a v1 or v2 puller will be created. The other parameters are passed
30 30
 // through to the underlying puller implementation for use during the actual
31 31
 // pull operation.
32
-func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig) (Puller, error) {
32
+func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) {
33 33
 	switch endpoint.Version {
34 34
 	case registry.APIVersion2:
35 35
 		return &v2Puller{
... ...
@@ -37,6 +37,9 @@ func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo,
37 37
 			endpoint:          endpoint,
38 38
 			config:            imagePullConfig,
39 39
 			repoInfo:          repoInfo,
40
+			manifestStore: &manifestStore{
41
+				local: local,
42
+			},
40 43
 		}, nil
41 44
 	case registry.APIVersion1:
42 45
 		return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
... ...
@@ -46,7 +49,7 @@ func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo,
46 46
 
47 47
 // Pull initiates a pull operation. image is the repository name to pull, and
48 48
 // tag may be either empty, or indicate a specific tag to pull.
49
-func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig) error {
49
+func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error {
50 50
 	// Resolve the Repository name from fqn to RepositoryInfo
51 51
 	repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
52 52
 	if err != nil {
... ...
@@ -104,7 +107,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
104 104
 
105 105
 		logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version)
106 106
 
107
-		puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
107
+		puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local)
108 108
 		if err != nil {
109 109
 			lastErr = err
110 110
 			continue
... ...
@@ -11,6 +11,7 @@ import (
11 11
 	"runtime"
12 12
 	"strings"
13 13
 
14
+	"github.com/containerd/containerd/log"
14 15
 	"github.com/containerd/containerd/platforms"
15 16
 	"github.com/docker/distribution"
16 17
 	"github.com/docker/distribution/manifest/manifestlist"
... ...
@@ -62,7 +63,8 @@ type v2Puller struct {
62 62
 	repo              distribution.Repository
63 63
 	// confirmedV2 is set to true if we confirm we're talking to a v2
64 64
 	// registry. This is used to limit fallbacks to the v1 protocol.
65
-	confirmedV2 bool
65
+	confirmedV2   bool
66
+	manifestStore *manifestStore
66 67
 }
67 68
 
68 69
 func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *specs.Platform) (err error) {
... ...
@@ -73,6 +75,11 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *spec
73 73
 		return err
74 74
 	}
75 75
 
76
+	p.manifestStore.remote, err = p.repo.Manifests(ctx)
77
+	if err != nil {
78
+		return err
79
+	}
80
+
76 81
 	if err = p.pullV2Repository(ctx, ref, platform); err != nil {
77 82
 		if _, ok := err.(fallbackError); ok {
78 83
 			return err
... ...
@@ -330,37 +337,45 @@ func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
330 330
 }
331 331
 
332 332
 func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
333
-	manSvc, err := p.repo.Manifests(ctx)
334
-	if err != nil {
335
-		return false, err
336
-	}
337 333
 
338 334
 	var (
339
-		manifest    distribution.Manifest
340 335
 		tagOrDigest string // Used for logging/progress only
336
+		dgst        digest.Digest
337
+		mt          string
338
+		size        int64
341 339
 	)
342 340
 	if digested, isDigested := ref.(reference.Canonical); isDigested {
343
-		manifest, err = manSvc.Get(ctx, digested.Digest())
344
-		if err != nil {
345
-			return false, err
346
-		}
347
-		tagOrDigest = digested.Digest().String()
341
+		dgst = digested.Digest()
342
+		tagOrDigest = digested.String()
348 343
 	} else if tagged, isTagged := ref.(reference.NamedTagged); isTagged {
349 344
 		tagService := p.repo.Tags(ctx)
350 345
 		desc, err := tagService.Get(ctx, tagged.Tag())
351 346
 		if err != nil {
352 347
 			return false, allowV1Fallback(err)
353 348
 		}
354
-
355
-		manifest, err = manSvc.Get(ctx, desc.Digest)
356
-		if err != nil {
357
-			return false, err
358
-		}
349
+		dgst = desc.Digest
359 350
 		tagOrDigest = tagged.Tag()
351
+		mt = desc.MediaType
352
+		size = desc.Size
360 353
 	} else {
361 354
 		return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", reference.FamiliarString(ref))
362 355
 	}
363 356
 
357
+	ctx = log.WithLogger(ctx, logrus.WithFields(
358
+		logrus.Fields{
359
+			"digest": dgst,
360
+			"remote": ref,
361
+		}))
362
+
363
+	manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
364
+		MediaType: mt,
365
+		Digest:    dgst,
366
+		Size:      size,
367
+	})
368
+	if err != nil {
369
+		return false, err
370
+	}
371
+
364 372
 	if manifest == nil {
365 373
 		return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
366 374
 	}
... ...
@@ -559,7 +574,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
559 559
 		return "", "", err
560 560
 	}
561 561
 
562
-	imageID, err := p.config.ImageStore.Put(config)
562
+	imageID, err := p.config.ImageStore.Put(ctx, config)
563 563
 	if err != nil {
564 564
 		return "", "", err
565 565
 	}
... ...
@@ -570,7 +585,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
570 570
 }
571 571
 
572 572
 func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
573
-	if _, err := p.config.ImageStore.Get(target.Digest); err == nil {
573
+	if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
574 574
 		// If the image already exists locally, no need to pull
575 575
 		// anything.
576 576
 		return target.Digest, nil
... ...
@@ -727,7 +742,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
727 727
 		}
728 728
 	}
729 729
 
730
-	imageID, err := p.config.ImageStore.Put(configJSON)
730
+	imageID, err := p.config.ImageStore.Put(ctx, configJSON)
731 731
 	if err != nil {
732 732
 		return "", err
733 733
 	}
... ...
@@ -797,23 +812,22 @@ func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mf
797 797
 	if len(manifestMatches) > 1 {
798 798
 		logrus.Debugf("found multiple matches in manifest list, choosing best match %s", manifestMatches[0].Digest.String())
799 799
 	}
800
-	manifestDigest := manifestMatches[0].Digest
800
+	match := manifestMatches[0]
801 801
 
802
-	if err := checkImageCompatibility(manifestMatches[0].Platform.OS, manifestMatches[0].Platform.OSVersion); err != nil {
803
-		return "", "", err
804
-	}
805
-
806
-	manSvc, err := p.repo.Manifests(ctx)
807
-	if err != nil {
802
+	if err := checkImageCompatibility(match.Platform.OS, match.Platform.OSVersion); err != nil {
808 803
 		return "", "", err
809 804
 	}
810 805
 
811
-	manifest, err := manSvc.Get(ctx, manifestDigest)
806
+	manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
807
+		Digest:    match.Digest,
808
+		Size:      match.Size,
809
+		MediaType: match.MediaType,
810
+	})
812 811
 	if err != nil {
813 812
 		return "", "", err
814 813
 	}
815 814
 
816
-	manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), manifestDigest)
815
+	manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), match.Digest)
817 816
 	if err != nil {
818 817
 		return "", "", err
819 818
 	}
... ...
@@ -116,7 +116,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
116 116
 func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
117 117
 	logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
118 118
 
119
-	imgConfig, err := p.config.ImageStore.Get(id)
119
+	imgConfig, err := p.config.ImageStore.Get(ctx, id)
120 120
 	if err != nil {
121 121
 		return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
122 122
 	}
... ...
@@ -69,7 +69,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
69 69
 		},
70 70
 		Schema2Types: ImageTypes,
71 71
 	}
72
-	puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
72
+	puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
73 73
 	if err != nil {
74 74
 		t.Fatal(err)
75 75
 	}