Browse code

Bug 1275537 - Fixed the way image import controller informs about errors from imports.

Maciej Szulik authored on 2015/10/31 04:56:45
Showing 2 changed files
... ...
@@ -9,7 +9,6 @@ import (
9 9
 	kapi "k8s.io/kubernetes/pkg/api"
10 10
 	"k8s.io/kubernetes/pkg/api/errors"
11 11
 	"k8s.io/kubernetes/pkg/api/unversioned"
12
-	"k8s.io/kubernetes/pkg/util"
13 12
 	kerrors "k8s.io/kubernetes/pkg/util/errors"
14 13
 	"k8s.io/kubernetes/pkg/util/sets"
15 14
 
... ...
@@ -55,11 +54,11 @@ const retryCount = 2
55 55
 // 3. image retrieving when error is different from RepositoryNotFound, RegistryNotFound or ImageNotFound
56 56
 // 4. ImageStreamMapping save error
57 57
 // 5. error when marking ImageStream as imported
58
-
59 58
 func (c *ImportController) Next(stream *api.ImageStream) error {
60 59
 	if !needsImport(stream) {
61 60
 		return nil
62 61
 	}
62
+	glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)
63 63
 
64 64
 	insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
65 65
 	client := c.client
... ...
@@ -67,29 +66,38 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
67 67
 		client = dockerregistry.NewClient()
68 68
 	}
69 69
 
70
-	toImport, err := getTags(stream, client, insecure)
70
+	var errlist []error
71
+	toImport, retry, err := getTags(stream, client, insecure)
71 72
 	// return here, only if there is an error and nothing to import
72 73
 	if err != nil && len(toImport) == 0 {
73
-		return err
74
+		if retry {
75
+			return err
76
+		}
77
+		return c.done(stream, err.Error(), retryCount)
74 78
 	}
75
-
76
-	errs := c.importTags(stream, toImport, client, insecure)
77
-	// one of retry-able error happened, we need to inform the RetryController
78
-	// the import should be retried by returning error
79
-	if len(errs) > 0 {
80
-		return kerrors.NewAggregate(errs)
79
+	if err != nil {
80
+		errlist = append(errlist, err)
81 81
 	}
82
+
83
+	retry, err = c.importTags(stream, toImport, client, insecure)
82 84
 	if err != nil {
83
-		return err
85
+		if retry {
86
+			return err
87
+		}
88
+		errlist = append(errlist, err)
89
+	}
90
+
91
+	if len(errlist) > 0 {
92
+		return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount)
84 93
 	}
85 94
 
86 95
 	return c.done(stream, "", retryCount)
87 96
 }
88 97
 
89
-// getTags returns tags from default upstream image repository and explicitly defined.
90
-// Returns a map of tags to be imported and an error if one occurs.
98
+// getTags returns a map of tags to be imported, a flag saying if we should retry
99
+// imports, meaning not setting the import annotation and an error if one occurs.
91 100
 // Tags explicitly defined will overwrite those from default upstream image repository.
92
-func getTags(stream *api.ImageStream, client dockerregistry.Client, insecure bool) (map[string]api.DockerImageReference, error) {
101
+func getTags(stream *api.ImageStream, client dockerregistry.Client, insecure bool) (map[string]api.DockerImageReference, bool, error) {
93 102
 	imports := make(map[string]api.DockerImageReference)
94 103
 	references := sets.NewString()
95 104
 
... ...
@@ -111,27 +119,26 @@ func getTags(stream *api.ImageStream, client dockerregistry.Client, insecure boo
111 111
 	}
112 112
 
113 113
 	if len(stream.Spec.DockerImageRepository) == 0 {
114
-		return imports, nil
114
+		return imports, false, nil
115 115
 	}
116 116
 
117 117
 	// read tags from default upstream image repository
118 118
 	streamRef, err := api.ParseDockerImageReference(stream.Spec.DockerImageRepository)
119 119
 	if err != nil {
120
-		util.HandleError(fmt.Errorf("invalid docker image repository, cannot import data: %v", err))
121
-		return imports, nil
120
+		return imports, false, err
122 121
 	}
123 122
 	conn, err := client.Connect(streamRef.Registry, insecure)
124 123
 	if err != nil {
125 124
 		// retry-able error no. 1
126
-		return imports, err
125
+		return imports, true, err
127 126
 	}
128 127
 	tags, err := conn.ImageTags(streamRef.Namespace, streamRef.Name)
129 128
 	switch {
130 129
 	case dockerregistry.IsRepositoryNotFound(err), dockerregistry.IsRegistryNotFound(err):
131
-		return imports, nil
130
+		return imports, false, err
132 131
 	case err != nil:
133 132
 		// retry-able error no. 2
134
-		return imports, err
133
+		return imports, true, err
135 134
 	}
136 135
 	for tag, image := range tags {
137 136
 		if _, ok := imports[tag]; ok || references.Has(tag) {
... ...
@@ -156,17 +163,22 @@ func getTags(stream *api.ImageStream, client dockerregistry.Client, insecure boo
156 156
 		imports[tag] = ref
157 157
 	}
158 158
 
159
-	return imports, nil
159
+	return imports, false, nil
160 160
 }
161 161
 
162
-// importTags imports tags specified in a map from given ImageStream. Returns an error if one occurs.
163
-func (c *ImportController) importTags(stream *api.ImageStream, imports map[string]api.DockerImageReference, client dockerregistry.Client, insecure bool) []error {
162
+// importTags imports tags specified in a map from given ImageStream. Returns flag
163
+// saying if we should retry imports, meaning not setting the import annotation
164
+// and an error if one occurs.
165
+func (c *ImportController) importTags(stream *api.ImageStream, imports map[string]api.DockerImageReference, client dockerregistry.Client, insecure bool) (bool, error) {
164 166
 	retrieved := make(map[string]*dockerregistry.Image)
165 167
 	var errlist []error
168
+	shouldRetry := false
166 169
 	for tag, ref := range imports {
167
-		image, err := c.importTag(stream, tag, ref, retrieved[ref.ID], client, insecure)
170
+		image, retry, err := c.importTag(stream, tag, ref, retrieved[ref.ID], client, insecure)
168 171
 		if err != nil {
169
-			util.HandleError(err)
172
+			if retry {
173
+				shouldRetry = retry
174
+			}
170 175
 			errlist = append(errlist, err)
171 176
 			continue
172 177
 		}
... ...
@@ -175,16 +187,19 @@ func (c *ImportController) importTags(stream *api.ImageStream, imports map[strin
175 175
 			retrieved[ref.ID] = image
176 176
 		}
177 177
 	}
178
-	return errlist
178
+	return shouldRetry, kerrors.NewAggregate(errlist)
179 179
 }
180 180
 
181
-// importTag import single tag from given ImageStream. Returns an error if one occurs.
182
-func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref api.DockerImageReference, dockerImage *dockerregistry.Image, client dockerregistry.Client, insecure bool) (*dockerregistry.Image, error) {
181
+// importTag import single tag from given ImageStream. Returns retrieved image (for later reuse),
182
+// a flag saying if we should retry imports and an error if one occurs.
183
+func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref api.DockerImageReference, dockerImage *dockerregistry.Image, client dockerregistry.Client, insecure bool) (*dockerregistry.Image, bool, error) {
184
+	glog.V(5).Infof("Importing tag %s from %s/%s...", tag, stream.Namespace, stream.Name)
183 185
 	if dockerImage == nil {
184 186
 		// TODO insecure applies to the stream's spec.dockerImageRepository, not necessarily to an external one!
185 187
 		conn, err := client.Connect(ref.Registry, insecure)
186 188
 		if err != nil {
187
-			return nil, err
189
+			// retry-able error no. 3
190
+			return nil, true, err
188 191
 		}
189 192
 		if len(ref.ID) > 0 {
190 193
 			dockerImage, err = conn.ImageByID(ref.Namespace, ref.Name, ref.ID)
... ...
@@ -192,16 +207,16 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
192 192
 			dockerImage, err = conn.ImageByTag(ref.Namespace, ref.Name, ref.Tag)
193 193
 		}
194 194
 		switch {
195
-		case dockerregistry.IsRepositoryNotFound(err), dockerregistry.IsRegistryNotFound(err), dockerregistry.IsImageNotFound(err):
196
-			return nil, nil
195
+		case dockerregistry.IsRepositoryNotFound(err), dockerregistry.IsRegistryNotFound(err), dockerregistry.IsImageNotFound(err), dockerregistry.IsTagNotFound(err):
196
+			return nil, false, err
197 197
 		case err != nil:
198
-			// retry-able error no. 3
199
-			return nil, err
198
+			// retry-able error no. 4
199
+			return nil, true, err
200 200
 		}
201 201
 	}
202 202
 	var image api.DockerImage
203 203
 	if err := kapi.Scheme.Convert(&dockerImage.Image, &image); err != nil {
204
-		return nil, fmt.Errorf("could not convert image: %#v", err)
204
+		return nil, false, fmt.Errorf("could not convert image: %#v", err)
205 205
 	}
206 206
 
207 207
 	// prefer to pull by ID always
... ...
@@ -226,13 +241,13 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
226 226
 		},
227 227
 	}
228 228
 	if err := c.mappings.ImageStreamMappings(stream.Namespace).Create(mapping); err != nil {
229
-		// retry-able no. 4
230
-		return nil, err
229
+		// retry-able no. 5
230
+		return nil, true, err
231 231
 	}
232
-	return dockerImage, nil
232
+	return dockerImage, false, nil
233 233
 }
234 234
 
235
-// done marks the stream as being processed due to an error or failure condition
235
+// done marks the stream as being processed due to an error or failure condition.
236 236
 func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error {
237 237
 	if len(reason) == 0 {
238 238
 		reason = unversioned.Now().UTC().Format(time.RFC3339)
... ...
@@ -10,6 +10,8 @@ import (
10 10
 
11 11
 	kapi "k8s.io/kubernetes/pkg/api"
12 12
 	"k8s.io/kubernetes/pkg/api/unversioned"
13
+	kclient "k8s.io/kubernetes/pkg/client/unversioned/testclient"
14
+	"k8s.io/kubernetes/pkg/runtime"
13 15
 
14 16
 	client "github.com/openshift/origin/pkg/client/testclient"
15 17
 	"github.com/openshift/origin/pkg/dockerregistry"
... ...
@@ -28,8 +30,9 @@ type fakeDockerRegistryClient struct {
28 28
 	Namespace, Name, Tag, ID string
29 29
 	Insecure                 bool
30 30
 
31
-	Tags map[string]string
32
-	Err  error
31
+	Tags    map[string]string
32
+	Err     error
33
+	ConnErr error
33 34
 
34 35
 	Images []expectedImage
35 36
 }
... ...
@@ -37,7 +40,7 @@ type fakeDockerRegistryClient struct {
37 37
 func (f *fakeDockerRegistryClient) Connect(registry string, insecure bool) (dockerregistry.Connection, error) {
38 38
 	f.Registry = registry
39 39
 	f.Insecure = insecure
40
-	return f, nil
40
+	return f, f.ConnErr
41 41
 }
42 42
 
43 43
 func (f *fakeDockerRegistryClient) ImageTags(namespace, name string) (map[string]string, error) {
... ...
@@ -573,9 +576,9 @@ func TestControllerWithSpecTags(t *testing.T) {
573 573
 		"docker image": {
574 574
 			from: &kapi.ObjectReference{
575 575
 				Kind: "DockerImage",
576
-				Name: "some/repo",
576
+				Name: "some/repo:tagX",
577 577
 			},
578
-			expectUpdate: false,
578
+			expectUpdate: true,
579 579
 		},
580 580
 		"from image stream tag": {
581 581
 			from: &kapi.ObjectReference{
... ...
@@ -606,6 +609,15 @@ func TestControllerWithSpecTags(t *testing.T) {
606 606
 						},
607 607
 					},
608 608
 				},
609
+				{
610
+					Tag: "tagX",
611
+					Image: &dockerregistry.Image{
612
+						Image: docker.Image{
613
+							Comment: "foo",
614
+							Config:  &docker.Config{},
615
+						},
616
+					},
617
+				},
609 618
 			},
610 619
 		}, &client.Fake{}
611 620
 		c := ImportController{client: cli, streams: fake, mappings: fake}
... ...
@@ -624,7 +636,7 @@ func TestControllerWithSpecTags(t *testing.T) {
624 624
 			t.Errorf("%s: unexpected error: %v", name, err)
625 625
 		}
626 626
 		if !isRFC3339(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) {
627
-			t.Fatalf("%s: did not set annotation: %#v", name, stream)
627
+			t.Errorf("%s: did not set annotation: %#v", name, stream)
628 628
 		}
629 629
 		actions := fake.Actions()
630 630
 		if test.expectUpdate {
... ...
@@ -648,6 +660,156 @@ func TestControllerWithSpecTags(t *testing.T) {
648 648
 	}
649 649
 }
650 650
 
651
+func TestControllerReturnsErrForRetries(t *testing.T) {
652
+	expErr := fmt.Errorf("expected error")
653
+	osClient := &client.Fake{}
654
+	errISMClient := &client.Fake{}
655
+	errISMClient.PrependReactor("create", "imagestreammappings", func(action kclient.Action) (handled bool, ret runtime.Object, err error) {
656
+		return true, ret, expErr
657
+	})
658
+	tests := map[string]struct {
659
+		singleError bool
660
+		expActions  int
661
+		fakeClient  *client.Fake
662
+		fakeDocker  *fakeDockerRegistryClient
663
+		stream      *api.ImageStream
664
+	}{
665
+		"retry-able error no. 1": {
666
+			singleError: true,
667
+			expActions:  0,
668
+			fakeClient:  osClient,
669
+			fakeDocker: &fakeDockerRegistryClient{
670
+				ConnErr: expErr,
671
+			},
672
+			stream: &api.ImageStream{
673
+				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
674
+				Spec: api.ImageStreamSpec{
675
+					DockerImageRepository: "foo/bar",
676
+				},
677
+			},
678
+		},
679
+		"retry-able error no. 2": {
680
+			singleError: true,
681
+			expActions:  0,
682
+			fakeClient:  osClient,
683
+			fakeDocker: &fakeDockerRegistryClient{
684
+				Err: expErr,
685
+			},
686
+			stream: &api.ImageStream{
687
+				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
688
+				Spec: api.ImageStreamSpec{
689
+					DockerImageRepository: "foo/bar",
690
+				},
691
+			},
692
+		},
693
+		"retry-able error no. 3": {
694
+			singleError: false,
695
+			expActions:  0,
696
+			fakeClient:  osClient,
697
+			fakeDocker: &fakeDockerRegistryClient{
698
+				ConnErr: expErr,
699
+			},
700
+			stream: &api.ImageStream{
701
+				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
702
+				Spec: api.ImageStreamSpec{
703
+					Tags: map[string]api.TagReference{
704
+						api.DefaultImageTag: {
705
+							From: &kapi.ObjectReference{
706
+								Kind: "DockerImage",
707
+								Name: "foo/bar",
708
+							},
709
+						},
710
+					},
711
+				},
712
+			},
713
+		},
714
+		"retry-able error no. 4": {
715
+			singleError: false,
716
+			expActions:  0,
717
+			fakeClient:  osClient,
718
+			fakeDocker: &fakeDockerRegistryClient{
719
+				Images: []expectedImage{
720
+					{
721
+						Tag: api.DefaultImageTag,
722
+						Image: &dockerregistry.Image{
723
+							Image: docker.Image{
724
+								Comment: "foo",
725
+								Config:  &docker.Config{},
726
+							},
727
+						},
728
+						Err: expErr,
729
+					},
730
+				},
731
+			},
732
+			stream: &api.ImageStream{
733
+				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
734
+				Spec: api.ImageStreamSpec{
735
+					Tags: map[string]api.TagReference{
736
+						api.DefaultImageTag: {
737
+							From: &kapi.ObjectReference{
738
+								Kind: "DockerImage",
739
+								Name: "foo/bar",
740
+							},
741
+						},
742
+					},
743
+				},
744
+			},
745
+		},
746
+		"retry-able error no. 5": {
747
+			singleError: false,
748
+			expActions:  1,
749
+			fakeClient:  errISMClient,
750
+			fakeDocker: &fakeDockerRegistryClient{
751
+				Images: []expectedImage{
752
+					{
753
+						Tag: api.DefaultImageTag,
754
+						Image: &dockerregistry.Image{
755
+							Image: docker.Image{
756
+								Comment: "foo",
757
+								Config:  &docker.Config{},
758
+							},
759
+						},
760
+					},
761
+				},
762
+			},
763
+			stream: &api.ImageStream{
764
+				ObjectMeta: kapi.ObjectMeta{Name: "test", Namespace: "other"},
765
+				Spec: api.ImageStreamSpec{
766
+					Tags: map[string]api.TagReference{
767
+						api.DefaultImageTag: {
768
+							From: &kapi.ObjectReference{
769
+								Kind: "DockerImage",
770
+								Name: "foo/bar",
771
+							},
772
+						},
773
+					},
774
+				},
775
+			},
776
+		},
777
+	}
778
+
779
+	for name, test := range tests {
780
+		c := ImportController{client: test.fakeDocker, streams: test.fakeClient, mappings: test.fakeClient}
781
+
782
+		err := c.Next(test.stream)
783
+		if err == nil {
784
+			t.Errorf("%s: unexpected error: %v", name, err)
785
+		}
786
+		// The first condition checks error from the getTags method only,
787
+		// iow. where the error returned is the exact error that happened.
788
+		// The second condition checks error from the importTags method only,
789
+		// iow. where the error is an aggregate.
790
+		if test.singleError && err != expErr {
791
+			t.Errorf("%s: unexpected error from getTags: %v", name, err)
792
+		} else if !test.singleError && !strings.Contains(err.Error(), expErr.Error()) {
793
+			t.Errorf("%s: unexpected error from importTags: %v", name, err)
794
+		}
795
+		if len(test.fakeClient.Actions()) != test.expActions {
796
+			t.Errorf("%s: expected no actions: %#v", name, test.fakeClient.Actions())
797
+		}
798
+	}
799
+}
800
+
651 801
 func isRFC3339(s string) bool {
652 802
 	_, err := time.Parse(time.RFC3339, s)
653 803
 	return err == nil