
Allow images to be pulled through the Docker registry

An image tagged into an OpenShift image stream from a remote repository
(with its manifest imported) can be pulled directly from the integrated
Docker registry by a user invoking "docker pull". The registry looks up
the credentials for those remote servers and attempts to fetch the
manifest and layers. Because a blob request identifies a layer only by
digest, a request against the local image stream does not indicate
which remote repository holds that layer. This PR searches the remote
repositories referenced by the image stream's current tags and, if it
finds a match, downloads the blob. If the blob is available locally, no
remote calls are made. A cache tracks the most recent repositories
identified as having a given blob, a small optimization that avoids
most searches.
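
To keep repeat pulls cheap, the lookup order is: local blob store
first, then any repositories cached for that digest, then every
repository referenced by the stream's tags. A minimal, self-contained
sketch of that ordering (not the code in this PR: findBlob, hasBlob,
and the plain map are hypothetical stand-ins for the
pullthroughBlobStore, the remote Stat call, and the
digestToRepositoryCache added below):

package main

import "fmt"

// cache maps a layer digest to remote repositories recently seen serving it.
var cache = map[string][]string{}

// rememberDigest records that repo served dgst, keeping at most 10 entries.
func rememberDigest(dgst, repo string) {
    repos := cache[dgst]
    if len(repos) >= 10 {
        repos = repos[1:]
    }
    cache[dgst] = append(repos, repo)
}

// hasBlob stands in for a remote registry stat call.
func hasBlob(repo, dgst string) bool {
    return repo == "docker.io/library/mysql" // pretend only this repo has the layer
}

// findBlob prefers cached repositories and only then searches all candidates.
func findBlob(dgst string, candidates []string) (string, bool) {
    for _, repo := range cache[dgst] {
        if hasBlob(repo, dgst) {
            return repo, true
        }
    }
    for _, repo := range candidates {
        if hasBlob(repo, dgst) {
            rememberDigest(dgst, repo) // avoid the full search next time
            return repo, true
        }
    }
    return "", false
}

func main() {
    candidates := []string{"quay.io/coreos/etcd", "docker.io/library/mysql"}
    fmt.Println(findBlob("sha256:abc", candidates)) // found by searching candidates
    fmt.Println(findBlob("sha256:abc", candidates)) // found via the cache
}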

This change also serializes the entire manifest to the OpenShift
server, instead of just the "payload". All read requests for manifests
can then be served directly from the server, which ensures the data in
the master is complete (and could potentially be re-established in the
future if the registry were impacted).
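
The server accepts such a manifest update only when the submitted
bytes still hash back to the image's digest, so a registry can fill in
a missing or malformed manifest but cannot change its content. A rough
standalone illustration of that check (manifestMatchesImage here is a
hypothetical simplification of the api.ManifestMatchesImage helper
added below; schema1 payload and signature handling is elided):

package main

import (
    "crypto/sha256"
    "fmt"
    "strings"
)

// manifestMatchesImage reports whether payload hashes to the sha256 digest in imageName.
func manifestMatchesImage(imageName string, payload []byte) bool {
    want := strings.TrimPrefix(imageName, "sha256:")
    got := fmt.Sprintf("%x", sha256.Sum256(payload))
    return want == got
}

func main() {
    payload := []byte(`{"schemaVersion":1,"name":"coreos/etcd"}`)
    imageName := fmt.Sprintf("sha256:%x", sha256.Sum256(payload))

    fmt.Println(manifestMatchesImage(imageName, payload))                  // true: update allowed
    fmt.Println(manifestMatchesImage(imageName, []byte(`{"tampered":1}`))) // false: update rejected
}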

Clayton Coleman authored on 2016/01/15 02:55:17
Showing 11 changed files
... ...
@@ -16,3 +16,5 @@ auth:
 middleware:
   repository:
     - name: openshift
+      options:
+        pullthrough: true
... ...
@@ -337,7 +337,7 @@ func GetBootstrapClusterRoles() []authorizationapi.ClusterRole {
                },
                {
                    Verbs:     sets.NewString("get"),
-                   Resources: sets.NewString("imagestreamimages", "imagestreamtags", "imagestreams"),
+                   Resources: sets.NewString("imagestreamimages", "imagestreamtags", "imagestreams", "imagestreams/secrets"),
                },
                {
                    Verbs:     sets.NewString("update"),
new file mode 100644
... ...
@@ -0,0 +1,93 @@
+package server
+
+import (
+    "sync"
+
+    "github.com/hashicorp/golang-lru"
+
+    "github.com/docker/distribution/digest"
+)
+
+// digestToRepositoryCache maps image digests to recently seen remote repositories that
+// may contain that digest. Each digest is bucketed and remembering new repositories will
+// push old repositories out.
+type digestToRepositoryCache struct {
+    *lru.Cache
+}
+
+// newDigestToRepositoryCache creates a new LRU cache of image digests to possible remote
+// repository strings with the given size. It returns an error if the cache
+// cannot be created.
+func newDigestToRepositoryCache(size int) (digestToRepositoryCache, error) {
+    c, err := lru.New(size)
+    if err != nil {
+        return digestToRepositoryCache{}, err
+    }
+    return digestToRepositoryCache{Cache: c}, nil
+}
+
+const bucketSize = 10
+
+// RememberDigest associates a digest with a repository.
+func (c digestToRepositoryCache) RememberDigest(dgst digest.Digest, repo string) {
+    key := dgst.String()
+    value, ok := c.Get(key)
+    if !ok {
+        value = &repositoryBucket{}
+        if ok, _ := c.ContainsOrAdd(key, value); !ok {
+            return
+        }
+    }
+    repos := value.(*repositoryBucket)
+    repos.Add(repo)
+}
+
+// RepositoriesForDigest returns a list of repositories that may contain this digest.
+func (c digestToRepositoryCache) RepositoriesForDigest(dgst digest.Digest) []string {
+    value, ok := c.Get(dgst.String())
+    if !ok {
+        return nil
+    }
+    repos := value.(*repositoryBucket)
+    return repos.Copy()
+}
+
+type repositoryBucket struct {
+    mu   sync.Mutex
+    list []string
+}
+
+// Has returns true if the bucket contains this repository.
+func (i *repositoryBucket) Has(repo string) bool {
+    i.mu.Lock()
+    defer i.mu.Unlock()
+    for _, s := range i.list {
+        if s == repo {
+            return true
+        }
+    }
+    return false
+}
+
+// Add one or more repositories to this bucket.
+func (i *repositoryBucket) Add(repos ...string) {
+    i.mu.Lock()
+    defer i.mu.Unlock()
+    arr := i.list
+    for _, repo := range repos {
+        if len(arr) >= bucketSize {
+            arr = arr[1:]
+        }
+        arr = append(arr, repo)
+    }
+    i.list = arr
+}
+
+// Copy returns a copy of the contents of this bucket in a threadsafe fashion.
+func (i *repositoryBucket) Copy() []string {
+    i.mu.Lock()
+    defer i.mu.Unlock()
+    out := make([]string, len(i.list))
+    copy(out, i.list)
+    return out
+}
new file mode 100644
... ...
@@ -0,0 +1,211 @@
+package server
+
+import (
+    "io"
+    "net/http"
+    "strconv"
+
+    "github.com/docker/distribution"
+    "github.com/docker/distribution/context"
+    "github.com/docker/distribution/digest"
+
+    "k8s.io/kubernetes/pkg/api/errors"
+
+    imageapi "github.com/openshift/origin/pkg/image/api"
+    "github.com/openshift/origin/pkg/image/importer"
+)
+
+// pullthroughBlobStore wraps a distribution.BlobStore and allows blobs to be served from remote
+// repositories referenced by the image stream.
+type pullthroughBlobStore struct {
+    distribution.BlobStore
+
+    repo          *repository
+    digestToStore map[string]distribution.BlobStore
+}
+
+var _ distribution.BlobStore = &pullthroughBlobStore{}
+
+// Stat makes a local check for the blob, then falls through to the other servers referenced by
+// the image stream and looks for those that have the layer.
+func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
+    // check the local store for the blob
+    desc, err := r.BlobStore.Stat(ctx, dgst)
+    switch {
+    case err == distribution.ErrBlobUnknown:
+        // continue on to the code below and look up the blob in a remote store since it is not in
+        // the local store
+    case err != nil:
+        context.GetLogger(r.repo.ctx).Errorf("Failed to find blob %q: %#v", dgst.String(), err)
+        fallthrough
+    default:
+        return desc, err
+    }
+
+    // look up the potential remote repositories that this blob could be part of (at this time,
+    // we don't know which image in the image stream surfaced the content).
+    is, err := r.repo.getImageStream()
+    if err != nil {
+        if errors.IsNotFound(err) || errors.IsForbidden(err) {
+            return distribution.Descriptor{}, distribution.ErrBlobUnknown
+        }
+        context.GetLogger(r.repo.ctx).Errorf("Error retrieving image stream for blob: %s", err)
+        return distribution.Descriptor{}, err
+    }
+
+    var localRegistry string
+    if local, err := imageapi.ParseDockerImageReference(is.Status.DockerImageRepository); err == nil {
+        // TODO: normalize further?
+        localRegistry = local.Registry
+    }
+
+    retriever := r.repo.importContext()
+    cached := r.repo.cachedLayers.RepositoriesForDigest(dgst)
+
+    // look at the first level of tagged repositories first
+    search := identifyCandidateRepositories(is, localRegistry, true)
+    if desc, err := r.findCandidateRepository(ctx, search, cached, dgst, retriever); err == nil {
+        return desc, nil
+    }
+
+    // look at all other repositories tagged by the server
+    secondary := identifyCandidateRepositories(is, localRegistry, false)
+    for k := range search {
+        delete(secondary, k)
+    }
+    if desc, err := r.findCandidateRepository(ctx, secondary, cached, dgst, retriever); err == nil {
+        return desc, nil
+    }
+
+    return distribution.Descriptor{}, distribution.ErrBlobUnknown
+}
+
+// proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found,
+// r.digestToStore saves the store.
+func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer.RepositoryRetriever, ref imageapi.DockerImageReference, dgst digest.Digest) (distribution.Descriptor, error) {
+    context.GetLogger(r.repo.ctx).Infof("Trying to stat %q from %q", dgst, ref.Exact())
+    repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), false)
+    if err != nil {
+        context.GetLogger(r.repo.ctx).Errorf("Error getting remote repository for image %q: %v", ref.Exact(), err)
+        return distribution.Descriptor{}, err
+    }
+    pullthroughBlobStore := repo.Blobs(ctx)
+    desc, err := pullthroughBlobStore.Stat(r.repo.ctx, dgst)
+    if err != nil {
+        if err != distribution.ErrBlobUnknown {
+            context.GetLogger(r.repo.ctx).Errorf("Error getting pullthroughBlobStore for image %q: %v", ref.Exact(), err)
+        }
+        return distribution.Descriptor{}, err
+    }
+
+    r.digestToStore[dgst.String()] = pullthroughBlobStore
+    return desc, nil
+}
+
+// ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary.
+func (r *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
+    store, ok := r.digestToStore[dgst.String()]
+    if !ok {
+        return r.BlobStore.ServeBlob(ctx, w, req, dgst)
+    }
+
+    desc, err := store.Stat(ctx, dgst)
+    if err != nil {
+        context.GetLogger(r.repo.ctx).Errorf("Failed to stat digest %q: %v", dgst.String(), err)
+        return err
+    }
+
+    remoteReader, err := store.Open(ctx, dgst)
+    if err != nil {
+        context.GetLogger(r.repo.ctx).Errorf("Failure to open remote store %q: %v", dgst.String(), err)
+        return err
+    }
+
+    setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
+
+    context.GetLogger(r.repo.ctx).Infof("Copying %d bytes of type %q for %q", desc.Size, desc.MediaType, dgst.String())
+    if _, err := io.CopyN(w, remoteReader, desc.Size); err != nil {
+        context.GetLogger(r.repo.ctx).Errorf("Failed copying content from remote store %q: %v", dgst.String(), err)
+        return err
+    }
+    return nil
+}
+
+// findCandidateRepository looks in search for a particular blob, referring to previously cached items
+func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, search map[string]*imageapi.DockerImageReference, cachedLayers []string, dgst digest.Digest, retriever importer.RepositoryRetriever) (distribution.Descriptor, error) {
+    // no possible remote locations to search, exit early
+    if len(search) == 0 {
+        return distribution.Descriptor{}, distribution.ErrBlobUnknown
+    }
+
+    // see if any of the previously located repositories containing this digest are in this
+    // image stream
+    for _, repo := range cachedLayers {
+        ref, ok := search[repo]
+        if !ok {
+            continue
+        }
+        desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
+        if err != nil {
+            delete(search, repo)
+            continue
+        }
+        context.GetLogger(r.repo.ctx).Infof("Found digest location from cache %q in %q: %v", dgst, repo, err)
+        return desc, nil
+    }
+
+    // search the remaining registries for this digest
+    for repo, ref := range search {
+        desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
+        if err != nil {
+            continue
+        }
+        r.repo.cachedLayers.RememberDigest(dgst, repo)
+        context.GetLogger(r.repo.ctx).Infof("Found digest location by search %q in %q: %v", dgst, repo, err)
+        return desc, nil
+    }
+
+    return distribution.Descriptor{}, distribution.ErrBlobUnknown
+}
+
+// identifyCandidateRepositories returns a map of remote repositories referenced by this image stream.
+func identifyCandidateRepositories(is *imageapi.ImageStream, localRegistry string, primary bool) map[string]*imageapi.DockerImageReference {
+    // identify the canonical location of referenced registries to search
+    search := make(map[string]*imageapi.DockerImageReference)
+    for _, tagEvent := range is.Status.Tags {
+        var candidates []imageapi.TagEvent
+        if primary {
+            if len(tagEvent.Items) == 0 {
+                continue
+            }
+            candidates = tagEvent.Items[:1]
+        } else {
+            if len(tagEvent.Items) <= 1 {
+                continue
+            }
+            candidates = tagEvent.Items[1:]
+        }
+        for _, event := range candidates {
+            ref, err := imageapi.ParseDockerImageReference(event.DockerImageReference)
+            if err != nil {
+                continue
+            }
+            // skip anything that matches the innate registry
+            // TODO: there may be a better way to make this determination
+            if len(localRegistry) != 0 && localRegistry == ref.Registry {
+                continue
+            }
+            ref = ref.DockerClientDefaults()
+            search[ref.AsRepository().Exact()] = &ref
+        }
+    }
+    return search
+}
+
+// setResponseHeaders sets the appropriate content serving headers
+func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
+    w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
+    w.Header().Set("Content-Type", mediaType)
+    w.Header().Set("Docker-Content-Digest", digest.String())
+    w.Header().Set("Etag", digest.String())
+}
... ...
@@ -14,17 +14,30 @@ import (
     "github.com/docker/distribution/manifest/schema1"
     repomw "github.com/docker/distribution/registry/middleware/repository"
     "github.com/docker/libtrust"
+
     kapi "k8s.io/kubernetes/pkg/api"
     kerrors "k8s.io/kubernetes/pkg/api/errors"
 
     "github.com/openshift/origin/pkg/client"
     imageapi "github.com/openshift/origin/pkg/image/api"
+    "github.com/openshift/origin/pkg/image/importer"
 )
 
+// cachedLayers is a shared cache of blob digests to remote repositories that have previously
+// been identified as containing that blob. Thread safe and reused by all middleware layers.
+var cachedLayers digestToRepositoryCache
+
 func init() {
+    cache, err := newDigestToRepositoryCache(1024)
+    if err != nil {
+        panic(err)
+    }
+    cachedLayers = cache
     repomw.Register("openshift", repomw.InitFunc(newRepository))
 }
 
+// repository wraps a distribution.Repository and allows manifests to be served from the OpenShift image
+// API.
 type repository struct {
     distribution.Repository
 
... ...
@@ -33,6 +46,14 @@ type repository struct {
     registryAddr   string
     namespace      string
     name           string
+
+    // if true, the repository will check remote references in the image stream to support pulling "through"
+    // from a remote repository
+    pullthrough bool
+    // cachedLayers remembers a mapping of layer digest to repositories recently seen with that image to avoid
+    // having to check every potential upstream repository when a blob request is made. The cache is useful only
+    // when session affinity is on for the registry, but in practice the first pull will fill the cache.
+    cachedLayers digestToRepositoryCache
 }
 
 var _ distribution.ManifestService = &repository{}
... ...
@@ -44,6 +65,13 @@ func newRepository(ctx context.Context, repo distribution.Repository, options ma
         return nil, errors.New("DOCKER_REGISTRY_URL is required")
     }
 
+    pullthrough := false
+    if value, ok := options["pullthrough"]; ok {
+        if b, ok := value.(bool); ok {
+            pullthrough = b
+        }
+    }
+
     registryClient, err := NewRegistryOpenShiftClient()
     if err != nil {
         return nil, err
... ...
@@ -62,6 +90,8 @@ func newRepository(ctx context.Context, repo distribution.Repository, options ma
         registryAddr:   registryAddr,
         namespace:      nameParts[0],
         name:           nameParts[1],
+        pullthrough:    pullthrough,
+        cachedLayers:   cachedLayers,
     }, nil
 }
 
... ...
@@ -75,6 +105,22 @@ func (r *repository) Manifests(ctx context.Context, options ...distribution.Mani
     return &repo, nil
 }
 
+// Blobs returns a blob store which can delegate to remote repositories.
+func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
+    if !r.pullthrough {
+        return r.Repository.Blobs(ctx)
+    }
+
+    repo := repository(*r)
+    repo.ctx = ctx
+    return &pullthroughBlobStore{
+        BlobStore: r.Repository.Blobs(ctx),
+
+        repo:          &repo,
+        digestToStore: make(map[string]distribution.BlobStore),
+    }
+}
+
 // Tags lists the tags under the named repository.
 func (r *repository) Tags() ([]string, error) {
     imageStream, err := r.getImageStream()
... ...
@@ -121,7 +167,8 @@ func (r *repository) Get(dgst digest.Digest) (*schema1.SignedManifest, error) {
         return nil, err
     }
 
-    return r.manifestFromImage(image)
+    ref := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name, Registry: r.registryAddr}
+    return r.manifestFromImageWithCachedLayers(image, ref.DockerClientDefaults().Exact())
 }
 
 // Enumerate retrieves digests of manifest revisions in particular repository
... ...
@@ -136,26 +183,107 @@ func (r *repository) GetByTag(tag string, options ...distribution.ManifestServic
             return nil, err
         }
     }
+
+    // find the image mapped to this tag
     imageStreamTag, err := r.getImageStreamTag(tag)
     if err != nil {
+        // TODO: typed errors
         context.GetLogger(r.ctx).Errorf("Error getting ImageStreamTag %q: %v", tag, err)
         return nil, err
     }
     image := &imageStreamTag.Image
 
+    ref, referenceErr := imageapi.ParseDockerImageReference(image.DockerImageReference)
+    if referenceErr == nil {
+        ref.Namespace = r.namespace
+        ref.Name = r.name
+        ref.Registry = r.registryAddr
+    }
+    defaultRef := ref.DockerClientDefaults()
+    cacheName := defaultRef.AsRepository().Exact()
+
+    // if we have a local manifest, use it
+    if len(image.DockerImageManifest) > 0 {
+        return r.manifestFromImageWithCachedLayers(image, cacheName)
+    }
+
     dgst, err := digest.ParseDigest(imageStreamTag.Image.Name)
     if err != nil {
         context.GetLogger(r.ctx).Errorf("Error parsing digest %q: %v", imageStreamTag.Image.Name, err)
         return nil, err
     }
 
-    image, err = r.getImage(dgst)
+    if localImage, err := r.getImage(dgst); err != nil {
+        // if the image is managed by OpenShift and we cannot load the image, report an error
+        if image.Annotations[imageapi.ManagedByOpenShiftAnnotation] == "true" {
+            context.GetLogger(r.ctx).Errorf("Error getting image %q: %v", dgst.String(), err)
+            return nil, err
+        }
+    } else {
+        // if we have a local manifest, use it
+        if len(localImage.DockerImageManifest) > 0 {
+            return r.manifestFromImageWithCachedLayers(localImage, cacheName)
+        }
+    }
+
+    // allow pullthrough to be disabled
+    if !r.pullthrough {
+        return nil, distribution.ErrManifestBlobUnknown{Digest: dgst}
+    }
+
+    // check the previous error here
+    if referenceErr != nil {
+        context.GetLogger(r.ctx).Errorf("Error parsing image %q: %v", image.DockerImageReference, referenceErr)
+        return nil, referenceErr
+    }
+
+    return r.pullthroughGetByTag(image, ref, cacheName, options...)
+}
+
+// pullthroughGetByTag attempts to load the given image manifest from the remote server defined by ref, using cacheName to store any cached layers.
+func (r *repository) pullthroughGetByTag(image *imageapi.Image, ref imageapi.DockerImageReference, cacheName string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) {
+    defaultRef := ref.DockerClientDefaults()
+
+    retriever := r.importContext()
+
+    repo, err := retriever.Repository(r.ctx, defaultRef.RegistryURL(), defaultRef.RepositoryName(), false)
+    if err != nil {
+        context.GetLogger(r.ctx).Errorf("Error getting remote repository for image %q: %v", image.DockerImageReference, err)
+        return nil, err
+    }
+
+    // get a manifest context
+    manifests, err := repo.Manifests(r.ctx)
+    if err != nil {
+        context.GetLogger(r.ctx).Errorf("Error getting manifests for image %q: %v", image.DockerImageReference, err)
+        return nil, err
+    }
+
+    // fetch this by image
+    if len(ref.ID) > 0 {
+        dgst, err := digest.ParseDigest(ref.ID)
+        if err != nil {
+            context.GetLogger(r.ctx).Errorf("Error getting manifests for image %q: %v", image.DockerImageReference, err)
+            return nil, err
+        }
+        manifest, err := manifests.Get(dgst)
+        if err != nil {
+            context.GetLogger(r.ctx).Errorf("Error getting manifest from remote server for image %q: %v", image.DockerImageReference, err)
+            return nil, err
+        }
+        r.rememberLayers(manifest, cacheName)
+        return manifest, nil
+    }
+
+    // fetch this by tag
+    manifest, err := manifests.GetByTag(ref.Tag, options...)
     if err != nil {
-        context.GetLogger(r.ctx).Errorf("Error getting image %q: %v", dgst.String(), err)
+        context.GetLogger(r.ctx).Errorf("Error getting manifest from remote server for image %q: %v", image.DockerImageReference, err)
         return nil, err
     }
 
-    return r.manifestFromImage(image)
+    r.rememberLayers(manifest, cacheName)
+    return manifest, nil
 }
 
 // Put creates or updates the named manifest.
... ...
@@ -187,7 +315,7 @@ func (r *repository) Put(manifest *schema1.SignedManifest) error {
                },
            },
            DockerImageReference: fmt.Sprintf("%s/%s/%s@%s", r.registryAddr, r.namespace, r.name, dgst.String()),
-           DockerImageManifest:  string(payload),
+           DockerImageManifest:  string(manifest.Raw),
        },
    }
 
... ...
@@ -256,6 +384,18 @@ func (r *repository) Delete(dgst digest.Digest) error {
     return ms.Delete(dgst)
 }
 
+// importContext loads secrets for this image stream and returns a context for getting distribution
+// clients to remote repositories.
+func (r *repository) importContext() importer.RepositoryRetriever {
+    secrets, err := r.registryClient.ImageStreamSecrets(r.namespace).Secrets(r.name, kapi.ListOptions{})
+    if err != nil {
+        context.GetLogger(r.ctx).Errorf("Error getting secrets for repository %q: %v", r.Name(), err)
+        secrets = &kapi.SecretList{}
+    }
+    credentials := importer.NewCredentialsForSecrets(secrets.Items)
+    return importer.NewContext(http.DefaultTransport).WithCredentials(credentials)
+}
+
 // getImageStream retrieves the ImageStream for r.
 func (r *repository) getImageStream() (*imageapi.ImageStream, error) {
     return r.registryClient.ImageStreams(r.namespace).Get(r.name)
... ...
@@ -278,6 +418,27 @@ func (r *repository) getImageStreamImage(dgst digest.Digest) (*imageapi.ImageStr
     return r.registryClient.ImageStreamImages(r.namespace).Get(r.name, dgst.String())
 }
 
+// rememberLayers caches the provided layers
+func (r *repository) rememberLayers(manifest *schema1.SignedManifest, cacheName string) {
+    if !r.pullthrough {
+        return
+    }
+    // remember the layers in the cache as an optimization to avoid searching all remote repositories
+    for _, layer := range manifest.FSLayers {
+        r.cachedLayers.RememberDigest(layer.BlobSum, cacheName)
+    }
+}
+
+// manifestFromImageWithCachedLayers loads the image and then caches any located layers
+func (r *repository) manifestFromImageWithCachedLayers(image *imageapi.Image, cacheName string) (*schema1.SignedManifest, error) {
+    manifest, err := r.manifestFromImage(image)
+    if err != nil {
+        return nil, err
+    }
+    r.rememberLayers(manifest, cacheName)
+    return manifest, nil
+}
+
 // manifestFromImage converts an Image to a SignedManifest.
 func (r *repository) manifestFromImage(image *imageapi.Image) (*schema1.SignedManifest, error) {
     dgst, err := digest.ParseDigest(image.Name)
... ...
@@ -285,19 +446,29 @@ func (r *repository) manifestFromImage(image *imageapi.Image) (*schema1.SignedMa
         return nil, err
     }
 
+    raw := []byte(image.DockerImageManifest)
+
+    // prefer signatures from the manifest
+    if _, err := libtrust.ParsePrettySignature(raw, "signatures"); err == nil {
+        sm := schema1.SignedManifest{Raw: raw}
+        if err := json.Unmarshal(raw, &sm); err == nil {
+            return &sm, nil
+        }
+    }
+
     // Fetch the signatures for the manifest
     signatures, err := r.Signatures().Get(dgst)
     if err != nil {
         return nil, err
     }
 
-    jsig, err := libtrust.NewJSONSignature([]byte(image.DockerImageManifest), signatures...)
+    jsig, err := libtrust.NewJSONSignature(raw, signatures...)
     if err != nil {
         return nil, err
     }
 
     // Extract the pretty JWS
-    raw, err := jsig.PrettySignature("signatures")
+    raw, err = jsig.PrettySignature("signatures")
     if err != nil {
         return nil, err
     }
... ...
@@ -14,6 +14,7 @@ import (
 
     "github.com/blang/semver"
     "github.com/docker/distribution/digest"
+    "github.com/docker/distribution/manifest/schema1"
     "github.com/golang/glog"
 )
 
... ...
@@ -298,7 +299,28 @@ func NormalizeImageStreamTag(name string) string {
     return name
 }
 
-// ImageWithMetadata modifies the image to fill in DockerImageMetadata
+// ManifestMatchesImage returns true if the provided manifest matches the name of the image.
+func ManifestMatchesImage(image *Image, newManifest []byte) (bool, error) {
+    dgst, err := digest.ParseDigest(image.Name)
+    if err != nil {
+        return false, err
+    }
+    v, err := digest.NewDigestVerifier(dgst)
+    if err != nil {
+        return false, err
+    }
+    sm := schema1.SignedManifest{Raw: newManifest}
+    raw, err := sm.Payload()
+    if err != nil {
+        return false, err
+    }
+    if _, err := v.Write(raw); err != nil {
+        return false, err
+    }
+    return v.Verified(), nil
+}
+
+// ImageWithMetadata returns a copy of image with the DockerImageMetadata filled in
 // from the raw DockerImageManifest data stored in the image.
 func ImageWithMetadata(image *Image) error {
     if len(image.DockerImageManifest) == 0 {
... ...
@@ -66,7 +66,7 @@ type ImageStreamImporter struct {
     retriever RepositoryRetriever
     limiter   util.RateLimiter
 
-   imageCache map[gocontext.Context]map[manifestKey]*api.Image
+   digestToRepositoryCache map[gocontext.Context]map[manifestKey]*api.Image
 }
 
 // NewImageStreamImport creates an importer that will load images from a remote Docker registry into an
... ...
@@ -81,16 +81,16 @@ func NewImageStreamImporter(retriever RepositoryRetriever, maximumTagsPerRepo in
         retriever: retriever,
         limiter:   limiter,
 
-        imageCache: make(map[gocontext.Context]map[manifestKey]*api.Image),
+        digestToRepositoryCache: make(map[gocontext.Context]map[manifestKey]*api.Image),
     }
 }
 
 // contextImageCache returns the image cache entry for a context.
 func (i *ImageStreamImporter) contextImageCache(ctx gocontext.Context) map[manifestKey]*api.Image {
-    cache := i.imageCache[ctx]
+    cache := i.digestToRepositoryCache[ctx]
     if cache == nil {
         cache = make(map[manifestKey]*api.Image)
-        i.imageCache[ctx] = cache
+        i.digestToRepositoryCache[ctx] = cache
     }
     return cache
 }
... ...
@@ -154,6 +154,42 @@ const etcdManifest = `
    ]
 }`
 
+const etcdManifestNoSignature = `
+{
+   "schemaVersion": 1, 
+   "tag": "latest", 
+   "name": "coreos/etcd", 
+   "architecture": "amd64", 
+   "fsLayers": [
+      {
+         "blobSum": "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      }, 
+      {
+         "blobSum": "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      }, 
+      {
+         "blobSum": "sha256:2560187847cadddef806eaf244b7755af247a9dbabb90ca953dd2703cf423766"
+      }, 
+      {
+         "blobSum": "sha256:744b46d0ac8636c45870a03830d8d82c20b75fbfb9bc937d5e61005d23ad4cfe"
+      }
+   ], 
+   "history": [
+      {
+         "v1Compatibility": "{\"id\":\"fe50ac14986497fa6b5d2cc24feb4a561d01767bc64413752c0988cb70b0b8b9\",\"parent\":\"a5a18474fa96a3c6e240bc88e41de2afd236520caf904356ad9d5f8d875c3481\",\"created\":\"2015-12-30T22:29:13.967754365Z\",\"container\":\"c8d0f1a274b5f52fa5beb280775ef07cf18ec0f95e5ae42fbad01157e2614d42\",\"container_config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"ExposedPorts\":{\"2379/tcp\":{},\"2380/tcp\":{},\"4001/tcp\":{},\"7001/tcp\":{}},\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) ENTRYPOINT \\u0026{[\\\"/etcd\\\"]}\"],\"Image\":\"a5a18474fa96a3c6e240bc88e41de2afd236520caf904356ad9d5f8d875c3481\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":[\"/etcd\"],\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.9.1\",\"config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"ExposedPorts\":{\"2379/tcp\":{},\"2380/tcp\":{},\"4001/tcp\":{},\"7001/tcp\":{}},\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"a5a18474fa96a3c6e240bc88e41de2afd236520caf904356ad9d5f8d875c3481\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":[\"/etcd\"],\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\"}"
+      }, 
+      {
+         "v1Compatibility": "{\"id\":\"a5a18474fa96a3c6e240bc88e41de2afd236520caf904356ad9d5f8d875c3481\",\"parent\":\"796d581500e960cc02095dcdeccf55db215b8e54c57e3a0b11392145ffe60cf6\",\"created\":\"2015-12-30T22:29:13.504159783Z\",\"container\":\"080708d544f85052a46fab72e701b4358c1b96cb4b805a5b2d66276fc2aaf85d\",\"container_config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"ExposedPorts\":{\"2379/tcp\":{},\"2380/tcp\":{},\"4001/tcp\":{},\"7001/tcp\":{}},\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) EXPOSE 2379/tcp 2380/tcp 4001/tcp 7001/tcp\"],\"Image\":\"796d581500e960cc02095dcdeccf55db215b8e54c57e3a0b11392145ffe60cf6\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.9.1\",\"config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"ExposedPorts\":{\"2379/tcp\":{},\"2380/tcp\":{},\"4001/tcp\":{},\"7001/tcp\":{}},\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"796d581500e960cc02095dcdeccf55db215b8e54c57e3a0b11392145ffe60cf6\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\"}"
+      }, 
+      {
+         "v1Compatibility": "{\"id\":\"796d581500e960cc02095dcdeccf55db215b8e54c57e3a0b11392145ffe60cf6\",\"parent\":\"309c960c7f875411ae2ee2bfb97b86eee5058f3dad77206dd0df4f97df8a77fa\",\"created\":\"2015-12-30T22:29:12.912813629Z\",\"container\":\"f28be899c9b8680d4cf8585e663ad20b35019db062526844e7cfef117ce9037f\",\"container_config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) ADD file:e330b1da49d993059975e46560b3bd360691498b0f2f6e00f39fc160cf8d4ec3 in /\"],\"Image\":\"309c960c7f875411ae2ee2bfb97b86eee5058f3dad77206dd0df4f97df8a77fa\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.9.1\",\"config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"309c960c7f875411ae2ee2bfb97b86eee5058f3dad77206dd0df4f97df8a77fa\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":13502144}"
+      }, 
+      {
+         "v1Compatibility": "{\"id\":\"309c960c7f875411ae2ee2bfb97b86eee5058f3dad77206dd0df4f97df8a77fa\",\"created\":\"2015-12-30T22:29:12.346834862Z\",\"container\":\"1b97abade59e4b5b935aede236980a54fb500cd9ee5bd4323c832c6d7b3ffc6e\",\"container_config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) ADD file:74912593c6783292c4520514f5cc9313acbd1da0f46edee0fdbed2a24a264d6f in /\"],\"Image\":\"\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":null},\"docker_version\":\"1.9.1\",\"config\":{\"Hostname\":\"1b97abade59e\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"\",\"Volumes\":null,\"WorkingDir\":\"\",\"Entrypoint\":null,\"OnBuild\":null,\"Labels\":null},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":15141568}"
+      }
+   ]
+}`
+
 func TestCreateSetsMetadata(t *testing.T) {
     testCases := []struct {
         image  *api.Image
... ...
@@ -293,7 +329,44 @@ func TestUpdateResetsMetadata(t *testing.T) {
                DockerImageMetadata:  api.DockerImage{ID: "foo"},
            },
        },
-   }
+       // old manifest is replaced because the new manifest matches the digest
+       {
+           expect: func(image *api.Image) bool {
+               if image.DockerImageManifest != etcdManifest {
+                   t.Errorf("unexpected manifest: %s", image.DockerImageManifest)
+                   return false
+               }
+               if image.DockerImageMetadata.ID != "fe50ac14986497fa6b5d2cc24feb4a561d01767bc64413752c0988cb70b0b8b9" {
+                   t.Errorf("unexpected docker image: %#v", image.DockerImageMetadata)
+                   return false
+               }
+               if image.DockerImageReference != "openshift/ruby-19-centos-2" {
+                   t.Errorf("image reference changed: %s", image.DockerImageReference)
+                   return false
+               }
+               if image.DockerImageMetadata.Size != 28643712 {
+                   t.Errorf("image had size %d", image.DockerImageMetadata.Size)
+                   return false
+               }
+               if len(image.DockerImageLayers) != 4 || image.DockerImageLayers[0].Name != "sha256:744b46d0ac8636c45870a03830d8d82c20b75fbfb9bc937d5e61005d23ad4cfe" || image.DockerImageLayers[0].Size != 15141568 {
+                   t.Errorf("unexpected layers: %#v", image.DockerImageLayers)
+                   return false
+               }
+               return true
+           },
+           existing: &api.Image{
+               ObjectMeta:           kapi.ObjectMeta{Name: "sha256:54820434e2ccd1596892668504fef12ed980f0cc312f60eac93d6864445ba123", ResourceVersion: "1"},
+               DockerImageReference: "openshift/ruby-19-centos-2",
+               DockerImageLayers:    []api.ImageLayer{},
+               DockerImageManifest:  etcdManifestNoSignature,
+           },
+           image: &api.Image{
+               ObjectMeta:           kapi.ObjectMeta{Name: "sha256:54820434e2ccd1596892668504fef12ed980f0cc312f60eac93d6864445ba123", ResourceVersion: "1"},
+               DockerImageReference: "openshift/ruby-19-centos",
+               DockerImageMetadata:  api.DockerImage{ID: "foo"},
+               DockerImageManifest:  etcdManifest,
+           },
+       }}
 
     for i, test := range testCases {
         storage, server := newStorage(t)
... ...
@@ -60,7 +60,9 @@ func (imageStrategy) Canonicalize(obj runtime.Object) {
 }
 
 // PrepareForUpdate clears fields that are not allowed to be set by end users on update.
-// It extracts the latest info from the manifest and sets that on the object.
+// It extracts the latest info from the manifest and sets that on the object. It allows a user
+// to update the manifest so that it matches the digest (in case an older server stored a manifest
+// that was malformed, it can always be corrected).
 func (imageStrategy) PrepareForUpdate(obj, old runtime.Object) {
     newImage := obj.(*api.Image)
     oldImage := old.(*api.Image)
... ...
@@ -68,10 +70,21 @@ func (imageStrategy) PrepareForUpdate(obj, old runtime.Object) {
     // image metadata cannot be altered
     newImage.DockerImageReference = oldImage.DockerImageReference
     newImage.DockerImageMetadata = oldImage.DockerImageMetadata
-   newImage.DockerImageManifest = oldImage.DockerImageManifest
     newImage.DockerImageMetadataVersion = oldImage.DockerImageMetadataVersion
     newImage.DockerImageLayers = oldImage.DockerImageLayers
 
+   // allow an image update that results in the manifest matching the digest (the name)
+   newManifest := newImage.DockerImageManifest
+   newImage.DockerImageManifest = oldImage.DockerImageManifest
+   if newManifest != oldImage.DockerImageManifest && len(newManifest) > 0 {
+       ok, err := api.ManifestMatchesImage(oldImage, []byte(newManifest))
+       if err != nil {
+           util.HandleError(fmt.Errorf("attempted to validate that a manifest change to %q matched the signature, but failed: %v", oldImage.Name, err))
+       } else if ok {
+           newImage.DockerImageManifest = newManifest
+       }
+   }
+
     if err := api.ImageWithMetadata(newImage); err != nil {
         util.HandleError(fmt.Errorf("Unable to update image metadata for %q: %v", newImage.Name, err))
     }
... ...
@@ -106,7 +106,7 @@ oc login -u e2e-user -p pass
 # make sure viewers can see oc status
 oc status -n default
 
-# check to make sure a project admin can push an image
+# check to make sure a project admin can push an image to an image stream that doesn't exist
 oc project cache
 e2e_user_token=$(oc config view --flatten --minify -o template --template='{{with index .users 0}}{{.user.token}}{{end}}')
 [[ -n ${e2e_user_token} ]]
... ...
@@ -120,6 +120,10 @@ docker tag -f centos/ruby-22-centos7:latest ${DOCKER_REGISTRY}/cache/ruby-22-cen
 docker push ${DOCKER_REGISTRY}/cache/ruby-22-centos7:latest
 echo "[INFO] Pushed ruby-22-centos7"
 
+# verify remote images can be pulled directly from the local registry
+oc import-image --confirm --from=mysql:latest mysql:pullthrough
+docker pull ${DOCKER_REGISTRY}/cache/mysql:pullthrough
+
 # check to make sure an image-pusher can push an image
 oc policy add-role-to-user system:image-pusher pusher
 oc login -u pusher -p pass
... ...
@@ -689,6 +689,7 @@ items:
     resources:
     - imagestreamimages
     - imagestreams
+    - imagestreams/secrets
    - imagestreamtags
    verbs:
    - get