Browse code

Use global LRU cache for layer sizes

Signed-off-by: Gladkov Alexey <agladkov@redhat.com>

Gladkov Alexey authored on 2016/10/25 23:37:01
Showing 6 changed files
... ...
@@ -639,8 +639,10 @@ func (c *MasterConfig) GetRestStorage() map[string]rest.Storage {
639 639
 	imageStreamMappingStorage := imagestreammapping.NewREST(imageRegistry, imageStreamRegistry, c.RegistryNameFn)
640 640
 	imageStreamTagStorage := imagestreamtag.NewREST(imageRegistry, imageStreamRegistry)
641 641
 	imageStreamTagRegistry := imagestreamtag.NewRegistry(imageStreamTagStorage)
642
+	importerCache, err := imageimporter.NewImageStreamLayerCache(imageimporter.DefaultImageStreamLayerCacheSize)
643
+	checkStorageErr(err)
642 644
 	importerFn := func(r importer.RepositoryRetriever) imageimporter.Interface {
643
-		return imageimporter.NewImageStreamImporter(r, c.Options.ImagePolicyConfig.MaxImagesBulkImportedPerRepository, flowcontrol.NewTokenBucketRateLimiter(2.0, 3))
645
+		return imageimporter.NewImageStreamImporter(r, c.Options.ImagePolicyConfig.MaxImagesBulkImportedPerRepository, flowcontrol.NewTokenBucketRateLimiter(2.0, 3), &importerCache)
644 646
 	}
645 647
 	importerDockerClientFn := func() dockerregistry.Client {
646 648
 		return dockerregistry.NewClient(20*time.Second, false)
... ...
@@ -199,7 +199,7 @@ func TestDockerV1Fallback(t *testing.T) {
199 199
 	}
200 200
 
201 201
 	retriever := &mockRetriever{err: fmt.Errorf("does not support v2 API")}
202
-	im := NewImageStreamImporter(retriever, 5, nil)
202
+	im := NewImageStreamImporter(retriever, 5, nil, nil)
203 203
 	if err := im.Import(ctx, isi); err != nil {
204 204
 		t.Fatal(err)
205 205
 	}
... ...
@@ -52,15 +52,18 @@ type ImageStreamImporter struct {
52 52
 	digestToRepositoryCache map[gocontext.Context]map[manifestKey]*api.Image
53 53
 
54 54
 	// digestToLayerSizeCache maps layer digests to size.
55
-	digestToLayerSizeCache map[string]int64
55
+	digestToLayerSizeCache *ImageStreamLayerCache
56 56
 }
57 57
 
58 58
 // NewImageStreamImport creates an importer that will load images from a remote Docker registry into an
59 59
 // ImageStreamImport object. Limiter may be nil.
60
-func NewImageStreamImporter(retriever RepositoryRetriever, maximumTagsPerRepo int, limiter flowcontrol.RateLimiter) *ImageStreamImporter {
60
+func NewImageStreamImporter(retriever RepositoryRetriever, maximumTagsPerRepo int, limiter flowcontrol.RateLimiter, cache *ImageStreamLayerCache) *ImageStreamImporter {
61 61
 	if limiter == nil {
62 62
 		limiter = flowcontrol.NewFakeAlwaysRateLimiter()
63 63
 	}
64
+	if cache == nil {
65
+		glog.V(5).Infof("the global layer cache is disabled")
66
+	}
64 67
 	return &ImageStreamImporter{
65 68
 		maximumTagsPerRepo: maximumTagsPerRepo,
66 69
 
... ...
@@ -68,23 +71,25 @@ func NewImageStreamImporter(retriever RepositoryRetriever, maximumTagsPerRepo in
68 68
 		limiter:   limiter,
69 69
 
70 70
 		digestToRepositoryCache: make(map[gocontext.Context]map[manifestKey]*api.Image),
71
-		digestToLayerSizeCache:  make(map[string]int64),
71
+		digestToLayerSizeCache:  cache,
72 72
 	}
73 73
 }
74 74
 
75
-// contextImageCache initializes the image cache entry for a context and layer size cache.
76
-func (i *ImageStreamImporter) contextImageCache(ctx gocontext.Context) {
75
+// Import tries to complete the provided isi object with images loaded from remote registries.
76
+func (i *ImageStreamImporter) Import(ctx gocontext.Context, isi *api.ImageStreamImport) error {
77
+	// Initialize layer size cache if not given.
77 78
 	if i.digestToLayerSizeCache == nil {
78
-		i.digestToLayerSizeCache = make(map[string]int64)
79
+		cache, err := NewImageStreamLayerCache(DefaultImageStreamLayerCacheSize)
80
+		if err != nil {
81
+			return err
82
+		}
83
+		i.digestToLayerSizeCache = &cache
79 84
 	}
85
+	// Initialize the image cache entry for a context.
80 86
 	if _, ok := i.digestToRepositoryCache[ctx]; !ok {
81 87
 		i.digestToRepositoryCache[ctx] = make(map[manifestKey]*api.Image)
82 88
 	}
83
-}
84 89
 
85
-// Import tries to complete the provided isi object with images loaded from remote registries.
86
-func (i *ImageStreamImporter) Import(ctx gocontext.Context, isi *api.ImageStreamImport) error {
87
-	i.contextImageCache(ctx)
88 90
 	i.importImages(ctx, i.retriever, isi, i.limiter)
89 91
 	i.importFromRepository(ctx, i.retriever, isi, i.maximumTagsPerRepo, i.limiter)
90 92
 	return nil
... ...
@@ -320,8 +325,8 @@ func (isi *ImageStreamImporter) calculateImageSize(ctx gocontext.Context, repo d
320 320
 		}
321 321
 		blobSet.Insert(layer.Name)
322 322
 
323
-		if layerSize, ok := isi.digestToLayerSizeCache[layer.Name]; ok {
324
-			size += layerSize
323
+		if layerSize, ok := isi.digestToLayerSizeCache.Get(layer.Name); ok {
324
+			size += layerSize.(int64)
325 325
 			continue
326 326
 		}
327 327
 
... ...
@@ -330,7 +335,7 @@ func (isi *ImageStreamImporter) calculateImageSize(ctx gocontext.Context, repo d
330 330
 			return err
331 331
 		}
332 332
 
333
-		isi.digestToLayerSizeCache[layer.Name] = desc.Size
333
+		isi.digestToLayerSizeCache.Add(layer.Name, desc.Size)
334 334
 		layer.LayerSize = desc.Size
335 335
 		size += desc.Size
336 336
 	}
... ...
@@ -21,7 +21,7 @@ import (
21 21
 func TestImportNothing(t *testing.T) {
22 22
 	ctx := NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(NoCredentials)
23 23
 	isi := &api.ImageStreamImport{}
24
-	i := NewImageStreamImporter(ctx, 5, nil)
24
+	i := NewImageStreamImporter(ctx, 5, nil, nil)
25 25
 	if err := i.Import(nil, isi); err != nil {
26 26
 		t.Fatal(err)
27 27
 	}
... ...
@@ -258,7 +258,7 @@ func TestImport(t *testing.T) {
258 258
 		},
259 259
 	}
260 260
 	for i, test := range testCases {
261
-		im := NewImageStreamImporter(test.retriever, 5, nil)
261
+		im := NewImageStreamImporter(test.retriever, 5, nil, nil)
262 262
 		if err := im.Import(nil, &test.isi); err != nil {
263 263
 			t.Errorf("%d: %v", i, err)
264 264
 		}
265 265
new file mode 100644
... ...
@@ -0,0 +1,24 @@
0
+package importer
1
+
2
+import (
3
+	"github.com/hashicorp/golang-lru"
4
+)
5
+
6
+const (
7
+	DefaultImageStreamLayerCacheSize = 2048
8
+)
9
+
10
+type ImageStreamLayerCache struct {
11
+	*lru.Cache
12
+}
13
+
14
+// ImageStreamLayerCache creates a new LRU cache of layer digests
15
+func NewImageStreamLayerCache(size int) (ImageStreamLayerCache, error) {
16
+	c, err := lru.New(size)
17
+	if err != nil {
18
+		return ImageStreamLayerCache{}, err
19
+	}
20
+	return ImageStreamLayerCache{
21
+		Cache: c,
22
+	}, nil
23
+}
... ...
@@ -21,6 +21,7 @@ import (
21 21
 	kerrors "k8s.io/kubernetes/pkg/util/errors"
22 22
 	"k8s.io/kubernetes/pkg/watch"
23 23
 
24
+	"github.com/openshift/origin/pkg/client"
24 25
 	"github.com/openshift/origin/pkg/dockerregistry"
25 26
 	"github.com/openshift/origin/pkg/image/api"
26 27
 	"github.com/openshift/origin/pkg/image/importer"
... ...
@@ -182,6 +183,43 @@ func mockRegistryHandler(t *testing.T, requireAuth bool, count *int) http.Handle
182 182
 	})
183 183
 }
184 184
 
185
+func testImageStreamImport(t *testing.T, c *client.Client, imageSize int64, imagestreamimport *api.ImageStreamImport) {
186
+	imageStreams := c.ImageStreams(testutil.Namespace())
187
+
188
+	isi, err := imageStreams.Import(imagestreamimport)
189
+	if err != nil {
190
+		t.Fatal(err)
191
+	}
192
+
193
+	if len(isi.Status.Images) != 1 {
194
+		t.Errorf("imported unexpected number of images (%d != 1)", len(isi.Status.Images))
195
+	}
196
+
197
+	for i, image := range isi.Status.Images {
198
+		if image.Status.Status != unversioned.StatusSuccess {
199
+			t.Errorf("unexpected status %d: %#v", i, image.Status)
200
+		}
201
+
202
+		if image.Image == nil {
203
+			t.Errorf("unexpected empty image %d", i)
204
+		}
205
+
206
+		// the image name is always the sha256, and size is calculated
207
+		if image.Image.Name != convertedDigest {
208
+			t.Errorf("unexpected image %d: %#v (expect %q)", i, image.Image.Name, convertedDigest)
209
+		}
210
+
211
+		// the image size is calculated
212
+		if image.Image.DockerImageMetadata.Size == 0 {
213
+			t.Errorf("unexpected image size %d: %#v", i, image.Image.DockerImageMetadata.Size)
214
+		}
215
+
216
+		if image.Image.DockerImageMetadata.Size != imageSize {
217
+			t.Errorf("unexpected image size %d: %#v (expect %d)", i, image.Image.DockerImageMetadata.Size, imageSize)
218
+		}
219
+	}
220
+}
221
+
185 222
 func testImageStreamImportWithPath(t *testing.T, reponame string) {
186 223
 	imageDigest := "sha256:815d06b56f4138afacd0009b8e3799fcdce79f0507bf8d0588e219b93ab6fd4d"
187 224
 	descriptors := map[string]int64{
... ...
@@ -266,9 +304,7 @@ func testImageStreamImportWithPath(t *testing.T, reponame string) {
266 266
 		t.Fatalf("unexpected error: %v", err)
267 267
 	}
268 268
 
269
-	imageStreams := c.ImageStreams(testutil.Namespace())
270
-
271
-	isi, err := imageStreams.Import(&api.ImageStreamImport{
269
+	testImageStreamImport(t, c, imageSize, &api.ImageStreamImport{
272 270
 		ObjectMeta: kapi.ObjectMeta{
273 271
 			Name: "test",
274 272
 		},
... ...
@@ -283,36 +319,31 @@ func testImageStreamImportWithPath(t *testing.T, reponame string) {
283 283
 			},
284 284
 		},
285 285
 	})
286
-	if err != nil {
287
-		t.Fatal(err)
288
-	}
289 286
 
290
-	if len(isi.Status.Images) != 1 {
291
-		t.Errorf("imported unexpected number of images (%d != 1)", len(isi.Status.Images))
287
+	if countStat != len(descriptors) {
288
+		t.Fatalf("unexpected number of blob stats %d (expected %d)", countStat, len(descriptors))
292 289
 	}
293 290
 
294
-	for i, image := range isi.Status.Images {
295
-		if image.Status.Status != unversioned.StatusSuccess {
296
-			t.Errorf("unexpected status %d: %#v", i, image.Status)
297
-		}
298
-
299
-		if image.Image == nil {
300
-			t.Errorf("unexpected empty image %d", i)
301
-		}
302
-
303
-		// the image name is always the sha256, and size is calculated
304
-		if image.Image.Name != convertedDigest {
305
-			t.Errorf("unexpected image %d: %#v (expect %q)", i, image.Image.Name, convertedDigest)
306
-		}
307
-
308
-		// the image size is calculated
309
-		if image.Image.DockerImageMetadata.Size == 0 {
310
-			t.Errorf("unexpected image size %d: %#v", i, image.Image.DockerImageMetadata.Size)
311
-		}
291
+	testImageStreamImport(t, c, imageSize, &api.ImageStreamImport{
292
+		ObjectMeta: kapi.ObjectMeta{
293
+			Name: "test1",
294
+		},
295
+		Spec: api.ImageStreamImportSpec{
296
+			Import: true,
297
+			Images: []api.ImageImportSpec{
298
+				{
299
+					From:         kapi.ObjectReference{Kind: "DockerImage", Name: url.Host + "/" + reponame + ":testtag"},
300
+					To:           &kapi.LocalObjectReference{Name: "other1"},
301
+					ImportPolicy: api.TagImportPolicy{Insecure: true},
302
+				},
303
+			},
304
+		},
305
+	})
312 306
 
313
-		if image.Image.DockerImageMetadata.Size != imageSize {
314
-			t.Errorf("unexpected image size %d: %#v (expect %d)", i, image.Image.DockerImageMetadata.Size, imageSize)
315
-		}
307
+	// Test that the global layer cache is working. The counter shouldn't change
308
+	// because all the information is available in the cache.
309
+	if countStat != len(descriptors) {
310
+		t.Fatalf("the global layer cache is not working: unexpected number of blob stats %d (expected %d)", countStat, len(descriptors))
316 311
 	}
317 312
 }
318 313
 
... ...
@@ -790,7 +821,7 @@ func TestImageStreamImportDockerHub(t *testing.T) {
790 790
 	}
791 791
 
792 792
 	err := retryWhenUnreachable(t, func() error {
793
-		i := importer.NewImageStreamImporter(importCtx, 3, nil)
793
+		i := importer.NewImageStreamImporter(importCtx, 3, nil, nil)
794 794
 		if err := i.Import(gocontext.Background(), imports); err != nil {
795 795
 			return err
796 796
 		}
... ...
@@ -846,7 +877,7 @@ func TestImageStreamImportQuayIO(t *testing.T) {
846 846
 	}
847 847
 
848 848
 	err := retryWhenUnreachable(t, func() error {
849
-		i := importer.NewImageStreamImporter(importCtx, 3, nil)
849
+		i := importer.NewImageStreamImporter(importCtx, 3, nil, nil)
850 850
 		if err := i.Import(gocontext.Background(), imports); err != nil {
851 851
 			return err
852 852
 		}
... ...
@@ -899,7 +930,7 @@ func TestImageStreamImportRedHatRegistry(t *testing.T) {
899 899
 		},
900 900
 	}
901 901
 
902
-	i := importer.NewImageStreamImporter(importCtx, 3, nil)
902
+	i := importer.NewImageStreamImporter(importCtx, 3, nil, nil)
903 903
 	if err := i.Import(gocontext.Background(), imports); err != nil {
904 904
 		t.Fatal(err)
905 905
 	}
... ...
@@ -926,7 +957,7 @@ func TestImageStreamImportRedHatRegistry(t *testing.T) {
926 926
 	context := gocontext.WithValue(gocontext.Background(), importer.ContextKeyV1RegistryClient, dockerregistry.NewClient(20*time.Second, false))
927 927
 	importCtx = importer.NewContext(rt, nil).WithCredentials(importer.NoCredentials)
928 928
 	err := retryWhenUnreachable(t, func() error {
929
-		i = importer.NewImageStreamImporter(importCtx, 3, nil)
929
+		i = importer.NewImageStreamImporter(importCtx, 3, nil, nil)
930 930
 		if err := i.Import(context, imports); err != nil {
931 931
 			return err
932 932
 		}