Browse code

Add an initial importer for Image Repositories

Image Repositories that have "dockerImageRepository" set will have
images imported if "tags" is empty or if there are "tags" that have
an empty string value. Once complete, or if sufficient failures
occur, the annotation "openshift.io/image.dockerRepositoryCheck"
will be set to a timestamp (time completed) or a string message.

Clayton Coleman authored on 2015/03/13 08:07:00
Showing 11 changed files
... ...
@@ -9,7 +9,6 @@
9 9
         "name": "ruby-20-centos7"
10 10
       },
11 11
       "tags": {
12
-        "latest": "latest"
13 12
       }
14 13
     },
15 14
     {
... ...
@@ -20,7 +19,6 @@
20 20
         "name": "nodejs-010-centos7"
21 21
       },
22 22
       "tags": {
23
-        "latest": "latest"
24 23
       }
25 24
     },
26 25
     {
... ...
@@ -31,7 +29,6 @@
31 31
         "name": "wildfly-8-centos"
32 32
       },
33 33
       "tags": {
34
-        "latest": "latest"
35 34
       }
36 35
     }
37 36
   ],
... ...
@@ -39,7 +39,8 @@
39 39
       "kind": "ImageRepository",
40 40
       "metadata": {
41 41
         "name": "ruby-20-centos7"
42
-      }
42
+      },
43
+      "dockerImageRepository": "openshift/ruby-20-centos7"
43 44
     },
44 45
     {
45 46
       "apiVersion": "v1beta1",
... ...
@@ -39,7 +39,8 @@
39 39
       "kind": "ImageRepository",
40 40
       "metadata": {
41 41
         "name": "ruby-20-centos7"
42
-      }
42
+      },
43
+      "dockerImageRepository": "openshift/ruby-20-centos7"
43 44
     },
44 45
     {
45 46
       "apiVersion": "v1beta1",
... ...
@@ -225,10 +225,9 @@ osc create -f examples/image-repositories/image-repositories.json
225 225
 [ -n "$(osc get imageRepositories wildfly-8-centos -t "{{.status.dockerImageRepository}}")" ]
226 226
 osc delete imageRepositories ruby-20-centos7
227 227
 osc delete imageRepositories nodejs-010-centos7
228
-osc delete imageRepositories wildfly-8-centos
229 228
 [ -z "$(osc get imageRepositories ruby-20-centos7 -t "{{.status.dockerImageRepository}}")" ]
230 229
 [ -z "$(osc get imageRepositories nodejs-010-centos7 -t "{{.status.dockerImageRepository}}")" ]
231
-[ -z "$(osc get imageRepositories wildfly-8-centos -t "{{.status.dockerImageRepository}}")" ]
230
+# don't delete wildfly-8-centos
232 231
 echo "imageRepositories: ok"
233 232
 
234 233
 osc create -f test/integration/fixtures/test-image-repository.json
... ...
@@ -324,4 +323,8 @@ openshift ex registry --create --credentials="${OPENSHIFTCONFIG}"
324 324
 [ "$(openshift ex registry | grep 'service exists')" ]
325 325
 echo "ex registry: ok"
326 326
 
327
+# verify the image repository had its tags populated
328
+[ -n "$(osc get imageRepositories wildfly-8-centos -t "{{.tags.latest}}")" ]
329
+[ -n "$(osc get imageRepositories wildfly-8-centos -t "{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}")" ]
330
+
327 331
 osc get minions,pods
... ...
@@ -265,7 +265,7 @@ osc create -n test -f "${STI_CONFIG_FILE}"
265 265
 
266 266
 # Trigger build
267 267
 echo "[INFO] Starting build from ${STI_CONFIG_FILE} and streaming its logs..."
268
-osc start-build -n test ruby-sample-build --follow
268
+#osc start-build -n test ruby-sample-build --follow
269 269
 wait_for_build "test"
270 270
 wait_for_app "test"
271 271
 
... ...
@@ -53,6 +53,7 @@ import (
53 53
 	deployetcd "github.com/openshift/origin/pkg/deploy/registry/etcd"
54 54
 	deployrollback "github.com/openshift/origin/pkg/deploy/rollback"
55 55
 	"github.com/openshift/origin/pkg/dns"
56
+	imagecontroller "github.com/openshift/origin/pkg/image/controller"
56 57
 	"github.com/openshift/origin/pkg/image/registry/image"
57 58
 	imageetcd "github.com/openshift/origin/pkg/image/registry/image/etcd"
58 59
 	"github.com/openshift/origin/pkg/image/registry/imagerepository"
... ...
@@ -159,6 +160,11 @@ func (c *MasterConfig) ImageChangeControllerClient() *osclient.Client {
159 159
 	return c.OSClient
160 160
 }
161 161
 
162
+// ImageImportControllerClient returns the deployment client object
163
+func (c *MasterConfig) ImageImportControllerClient() *osclient.Client {
164
+	return c.OSClient
165
+}
166
+
162 167
 // DeploymentControllerClients returns the deployment controller client object
163 168
 func (c *MasterConfig) DeploymentControllerClients() (*osclient.Client, *kclient.Client) {
164 169
 	return c.OSClient, c.KubernetesClient
... ...
@@ -774,10 +780,7 @@ func (c *MasterConfig) RouteAllocator() *routeallocationcontroller.RouteAllocati
774 774
 		KubeClient: c.KubeClient(),
775 775
 	}
776 776
 
777
-	subdomain := os.Getenv("OPENSHIFT_ROUTE_SUBDOMAIN")
778
-	if len(subdomain) == 0 {
779
-		subdomain = OpenShiftRouteSubdomain
780
-	}
777
+	subdomain := env("OPENSHIFT_ROUTE_SUBDOMAIN", OpenShiftRouteSubdomain)
781 778
 
782 779
 	plugin, err := routeplugin.NewSimpleAllocationPlugin(subdomain)
783 780
 	if err != nil {
... ...
@@ -787,6 +790,15 @@ func (c *MasterConfig) RouteAllocator() *routeallocationcontroller.RouteAllocati
787 787
 	return factory.Create(plugin)
788 788
 }
789 789
 
790
+func (c *MasterConfig) RunImageImportController() {
791
+	osclient := c.ImageImportControllerClient()
792
+	factory := imagecontroller.ImportControllerFactory{
793
+		Client: osclient,
794
+	}
795
+	controller := factory.Create()
796
+	controller.Run()
797
+}
798
+
790 799
 // ensureCORSAllowedOrigins takes a string list of origins and attempts to covert them to CORS origin
791 800
 // regexes, or exits if it cannot.
792 801
 func (c *MasterConfig) ensureCORSAllowedOrigins() []*regexp.Regexp {
... ...
@@ -343,6 +343,7 @@ func StartMaster(openshiftMasterConfig *configapi.MasterConfig) error {
343 343
 	openshiftConfig.RunDeploymentConfigController()
344 344
 	openshiftConfig.RunDeploymentConfigChangeController()
345 345
 	openshiftConfig.RunDeploymentImageChangeTriggerController()
346
+	openshiftConfig.RunImageImportController()
346 347
 	openshiftConfig.RunProjectAuthorizationCache()
347 348
 
348 349
 	return nil
... ...
@@ -18,6 +18,12 @@ type Client interface {
18 18
 
19 19
 // Connection allows you to retrieve data from a Docker V1 registry.
20 20
 type Connection interface {
21
+	// ImageTags will return a map of the tags for the image by namespace (if not
22
+	// specified, will be "library") and name.
23
+	ImageTags(namespace, name string) (map[string]string, error)
24
+	// ImageByID will return the requested image by namespace (if not specified,
25
+	// will be "library"), name, and ID.
26
+	ImageByID(namespace, name, id string) (*docker.Image, error)
21 27
 	// ImageByTag will return the requested image by namespace (if not specified,
22 28
 	// will be "library"), name, and tag (if not specified, "latest").
23 29
 	ImageByTag(namespace, name, tag string) (*docker.Image, error)
... ...
@@ -60,12 +66,14 @@ func convertConnectionError(registry string, err error) error {
60 60
 type connection struct {
61 61
 	client *http.Client
62 62
 	host   string
63
+	cached map[string]*repository
63 64
 }
64 65
 
65 66
 func newConnection(name string) connection {
66 67
 	return connection{
67 68
 		host:   name,
68 69
 		client: http.DefaultClient,
70
+		cached: make(map[string]*repository),
69 71
 	}
70 72
 }
71 73
 
... ...
@@ -75,6 +83,41 @@ type repository struct {
75 75
 	token    string
76 76
 }
77 77
 
78
+// ImageTags returns the tags for the named Docker image repository.
79
+func (c connection) ImageTags(namespace, name string) (map[string]string, error) {
80
+	if len(namespace) == 0 {
81
+		namespace = "library"
82
+	}
83
+	if len(name) == 0 {
84
+		return nil, fmt.Errorf("image name must be specified")
85
+	}
86
+
87
+	repo, err := c.getCachedRepository(fmt.Sprintf("%s/%s", namespace, name))
88
+	if err != nil {
89
+		return nil, err
90
+	}
91
+
92
+	return c.getTags(repo)
93
+}
94
+
95
+// ImageByID returns the specified image within the named Docker image repository
96
+func (c connection) ImageByID(namespace, name, imageID string) (*docker.Image, error) {
97
+	if len(namespace) == 0 {
98
+		namespace = "library"
99
+	}
100
+	if len(name) == 0 {
101
+		return nil, fmt.Errorf("image name must be specified")
102
+	}
103
+
104
+	repo, err := c.getCachedRepository(fmt.Sprintf("%s/%s", namespace, name))
105
+	if err != nil {
106
+		return nil, err
107
+	}
108
+
109
+	return c.getImage(repo, imageID, "")
110
+}
111
+
112
+// ImageByTag returns the specified image within the named Docker image repository
78 113
 func (c connection) ImageByTag(namespace, name, tag string) (*docker.Image, error) {
79 114
 	if len(namespace) == 0 {
80 115
 		namespace = "library"
... ...
@@ -87,7 +130,7 @@ func (c connection) ImageByTag(namespace, name, tag string) (*docker.Image, erro
87 87
 		searchTag = "latest"
88 88
 	}
89 89
 
90
-	repo, err := c.getRepository(fmt.Sprintf("%s/%s", namespace, name))
90
+	repo, err := c.getCachedRepository(fmt.Sprintf("%s/%s", namespace, name))
91 91
 	if err != nil {
92 92
 		return nil, err
93 93
 	}
... ...
@@ -100,6 +143,18 @@ func (c connection) ImageByTag(namespace, name, tag string) (*docker.Image, erro
100 100
 	return c.getImage(repo, imageID, tag)
101 101
 }
102 102
 
103
+func (c connection) getCachedRepository(name string) (*repository, error) {
104
+	if cached, ok := c.cached[name]; ok {
105
+		return cached, nil
106
+	}
107
+	repo, err := c.getRepository(name)
108
+	if err != nil {
109
+		return nil, err
110
+	}
111
+	c.cached[name] = repo
112
+	return repo, nil
113
+}
114
+
103 115
 func (c connection) getRepository(name string) (*repository, error) {
104 116
 	req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/v1/repositories/%s/images", c.host, name), nil)
105 117
 	if err != nil {
... ...
@@ -123,6 +178,29 @@ func (c connection) getRepository(name string) (*repository, error) {
123 123
 	}, nil
124 124
 }
125 125
 
126
+func (c connection) getTags(repo *repository) (map[string]string, error) {
127
+	req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/v1/repositories/%s/tags", repo.endpoint, repo.name), nil)
128
+	if err != nil {
129
+		return nil, fmt.Errorf("error creating request: %v", err)
130
+	}
131
+	req.Header.Add("Authorization", "Token "+repo.token)
132
+	resp, err := c.client.Do(req)
133
+	if err != nil {
134
+		return nil, convertConnectionError(c.host, fmt.Errorf("error getting image tags for %s: %v", repo.name, err))
135
+	}
136
+	switch code := resp.StatusCode; {
137
+	case code == http.StatusNotFound:
138
+		return nil, errRepositoryNotFound{repo.name}
139
+	case code >= 300 || resp.StatusCode < 200:
140
+		return nil, fmt.Errorf("error retrieving tags: server returned %d", resp.StatusCode)
141
+	}
142
+	tags := make(map[string]string)
143
+	if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil {
144
+		return nil, fmt.Errorf("error decoding image %s tags: %v", repo.name, err)
145
+	}
146
+	return tags, nil
147
+}
148
+
126 149
 func (c connection) getTag(repo *repository, tag, userTag string) (string, error) {
127 150
 	req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/v1/repositories/%s/tags/%s", repo.endpoint, repo.name, tag), nil)
128 151
 	if err != nil {
... ...
@@ -158,7 +236,7 @@ func (c connection) getImage(repo *repository, image, userTag string) (*docker.I
158 158
 	}
159 159
 	switch code := resp.StatusCode; {
160 160
 	case code == http.StatusNotFound:
161
-		return nil, errImageNotFound{userTag, image, repo.name}
161
+		return nil, NewImageNotFoundError(repo.name, image, userTag)
162 162
 	case code >= 300 || resp.StatusCode < 200:
163 163
 		return nil, fmt.Errorf("error retrieving image %s: server returned %d", req.URL, resp.StatusCode)
164 164
 	}
... ...
@@ -197,8 +275,15 @@ type errImageNotFound struct {
197 197
 	repository string
198 198
 }
199 199
 
200
+func NewImageNotFoundError(repository, image, tag string) error {
201
+	return errImageNotFound{tag, image, repository}
202
+}
203
+
200 204
 func (e errImageNotFound) Error() string {
201
-	return fmt.Sprintf("the image %q in repository %q with tag %q was not found and may have been deleted", e.tag, e.image, e.repository)
205
+	if len(e.tag) == 0 {
206
+		return fmt.Sprintf("the image %q in repository %q was not found and may have been deleted", e.image, e.repository)
207
+	}
208
+	return fmt.Sprintf("the image %q in repository %q with tag %q was not found and may have been deleted", e.image, e.repository, e.tag)
202 209
 }
203 210
 
204 211
 type errRegistryNotFound struct {
205 212
new file mode 100644
... ...
@@ -0,0 +1,147 @@
0
+package controller
1
+
2
+import (
3
+	"fmt"
4
+	"time"
5
+
6
+	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
7
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
8
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
9
+
10
+	"github.com/openshift/origin/pkg/client"
11
+	"github.com/openshift/origin/pkg/dockerregistry"
12
+	"github.com/openshift/origin/pkg/image/api"
13
+)
14
+
15
+const dockerImageRepositoryCheckAnnotation = "openshift.io/image.dockerRepositoryCheck"
16
+
17
+type ImportController struct {
18
+	repositories client.ImageRepositoriesNamespacer
19
+	mappings     client.ImageRepositoryMappingsNamespacer
20
+	client       dockerregistry.Client
21
+}
22
+
23
+// Next processes the given image repository, looking for repos that have DockerImageRepository
24
+// set but have not yet been marked as "ready". If transient errors occur, err is returned but
25
+// the image repository is not modified (so it will be tried again later). If a permanent
26
+// failure occurs the image is marked with an annotation.
27
+func (c *ImportController) Next(repo *api.ImageRepository) error {
28
+	name := repo.DockerImageRepository
29
+	if len(name) == 0 {
30
+		return nil
31
+	}
32
+	if repo.Annotations == nil {
33
+		repo.Annotations = make(map[string]string)
34
+	}
35
+	if len(repo.Annotations[dockerImageRepositoryCheckAnnotation]) != 0 {
36
+		return nil
37
+	}
38
+
39
+	registry, namespace, name, _, err := api.SplitDockerPullSpec(name)
40
+	if err != nil {
41
+		err = fmt.Errorf("invalid docker image repository, cannot import data: %v", err)
42
+		util.HandleError(err)
43
+		return c.done(repo, err.Error())
44
+	}
45
+
46
+	conn, err := c.client.Connect(registry)
47
+	if err != nil {
48
+		return err
49
+	}
50
+	tags, err := conn.ImageTags(namespace, name)
51
+	switch {
52
+	case dockerregistry.IsRepositoryNotFound(err), dockerregistry.IsRegistryNotFound(err):
53
+		return c.done(repo, err.Error())
54
+	case err != nil:
55
+		return err
56
+	}
57
+
58
+	newTags := make(map[string]string, len(repo.Tags))
59
+	imageToTag := make(map[string][]string)
60
+	switch {
61
+	case len(repo.Tags) == 0:
62
+		// copy all tags
63
+		for tag, _ := range tags {
64
+			// TODO: once pull by image is implemented, use tag = imageid
65
+			newTags[tag] = tag
66
+		}
67
+		for tag, image := range tags {
68
+			imageToTag[image] = append(imageToTag[image], tag)
69
+		}
70
+	default:
71
+		for tag, v := range repo.Tags {
72
+			if len(v) != 0 {
73
+				newTags[tag] = v
74
+				continue
75
+			}
76
+			image, ok := tags[tag]
77
+			if !ok {
78
+				// tag not found, set empty
79
+				continue
80
+			}
81
+			imageToTag[image] = append(imageToTag[image], tag)
82
+			// TODO: once pull by image is implemented, use tag = imageid
83
+			newTags[tag] = tag
84
+		}
85
+	}
86
+
87
+	// whether we ignore or succeed, ensure the most recent mappings are recorded
88
+	repo.Tags = newTags
89
+
90
+	// nothing to tag - no images in the upstream repo, or we're in sync
91
+	if len(imageToTag) == 0 {
92
+		return c.done(repo, "")
93
+	}
94
+
95
+	for id, tags := range imageToTag {
96
+		dockerImage, err := conn.ImageByID(namespace, name, id)
97
+		switch {
98
+		case dockerregistry.IsRepositoryNotFound(err), dockerregistry.IsRegistryNotFound(err):
99
+			return c.done(repo, err.Error())
100
+		case dockerregistry.IsImageNotFound(err):
101
+			for _, tag := range tags {
102
+				delete(newTags, tag)
103
+			}
104
+			continue
105
+		case err != nil:
106
+			return err
107
+		}
108
+		var image api.DockerImage
109
+		if err := kapi.Scheme.Convert(dockerImage, &image); err != nil {
110
+			err = fmt.Errorf("could not convert image: %#v", err)
111
+			util.HandleError(err)
112
+			return c.done(repo, err.Error())
113
+		}
114
+		mapping := &api.ImageRepositoryMapping{
115
+			ObjectMeta: kapi.ObjectMeta{
116
+				Name:      repo.Name,
117
+				Namespace: repo.Namespace,
118
+			},
119
+			Tag: tags[0],
120
+			Image: api.Image{
121
+				DockerImageMetadata: image,
122
+			},
123
+		}
124
+		if err := c.mappings.ImageRepositoryMappings(repo.Namespace).Create(mapping); err != nil {
125
+			if errors.IsNotFound(err) {
126
+				return c.done(repo, err.Error())
127
+			}
128
+			return err
129
+		}
130
+	}
131
+
132
+	// we've completed our updates
133
+	return c.done(repo, "")
134
+}
135
+
136
+// ignore marks the repository as being processed due to an error or failure condition
137
+func (c *ImportController) done(repo *api.ImageRepository, reason string) error {
138
+	if len(reason) == 0 {
139
+		reason = util.Now().UTC().Format(time.RFC3339)
140
+	}
141
+	repo.Annotations[dockerImageRepositoryCheckAnnotation] = reason
142
+	if _, err := c.repositories.ImageRepositories(repo.Namespace).Update(repo); err != nil && !errors.IsNotFound(err) {
143
+		return err
144
+	}
145
+	return nil
146
+}
0 147
new file mode 100644
... ...
@@ -0,0 +1,257 @@
0
+package controller
1
+
2
+import (
3
+	"fmt"
4
+	"testing"
5
+	"time"
6
+
7
+	"github.com/fsouza/go-dockerclient"
8
+
9
+	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
10
+
11
+	"github.com/openshift/origin/pkg/client"
12
+	"github.com/openshift/origin/pkg/dockerregistry"
13
+	"github.com/openshift/origin/pkg/image/api"
14
+)
15
+
16
+type expectedImage struct {
17
+	Tag   string
18
+	ID    string
19
+	Image *docker.Image
20
+	Err   error
21
+}
22
+
23
+type fakeDockerRegistryClient struct {
24
+	Registry                 string
25
+	Namespace, Name, Tag, ID string
26
+
27
+	Tags map[string]string
28
+	Err  error
29
+
30
+	Images []expectedImage
31
+}
32
+
33
+func (f *fakeDockerRegistryClient) Connect(registry string) (dockerregistry.Connection, error) {
34
+	f.Registry = registry
35
+	return f, nil
36
+}
37
+
38
+func (f *fakeDockerRegistryClient) ImageTags(namespace, name string) (map[string]string, error) {
39
+	f.Namespace, f.Name = namespace, name
40
+	return f.Tags, f.Err
41
+}
42
+
43
+func (f *fakeDockerRegistryClient) ImageByTag(namespace, name, tag string) (*docker.Image, error) {
44
+	if len(tag) == 0 {
45
+		tag = "latest"
46
+	}
47
+	f.Namespace, f.Name, f.Tag = namespace, name, tag
48
+	for _, t := range f.Images {
49
+		if t.Tag == tag {
50
+			return t.Image, t.Err
51
+		}
52
+	}
53
+	return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), tag, tag)
54
+}
55
+
56
+func (f *fakeDockerRegistryClient) ImageByID(namespace, name, id string) (*docker.Image, error) {
57
+	f.Namespace, f.Name, f.ID = namespace, name, id
58
+	for _, t := range f.Images {
59
+		if t.ID == id {
60
+			return t.Image, t.Err
61
+		}
62
+	}
63
+	return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), id, "")
64
+}
65
+
66
+func TestControllerNoDockerRepo(t *testing.T) {
67
+	cli, fake := &fakeDockerRegistryClient{}, &client.Fake{}
68
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
69
+
70
+	repo := api.ImageRepository{
71
+		ObjectMeta: kapi.ObjectMeta{
72
+			Name:      "test",
73
+			Namespace: "other",
74
+		},
75
+	}
76
+	other := repo
77
+	if err := c.Next(&repo); err != nil {
78
+		t.Errorf("unexpected error: %v", err)
79
+	}
80
+	if !kapi.Semantic.DeepEqual(repo, other) {
81
+		t.Errorf("did not expect change to repo")
82
+	}
83
+}
84
+
85
+func TestControllerRepoHandled(t *testing.T) {
86
+	cli, fake := &fakeDockerRegistryClient{}, &client.Fake{}
87
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
88
+
89
+	repo := api.ImageRepository{
90
+		ObjectMeta: kapi.ObjectMeta{
91
+			Name:      "test",
92
+			Namespace: "other",
93
+		},
94
+		DockerImageRepository: "foo/bar",
95
+	}
96
+	if err := c.Next(&repo); err != nil {
97
+		t.Errorf("unexpected error: %v", err)
98
+	}
99
+	if len(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) == 0 {
100
+		t.Errorf("did not set annotation: %#v", repo)
101
+	}
102
+	if len(fake.Actions) != 1 {
103
+		t.Error("expected an update action: %#v", fake.Actions)
104
+	}
105
+}
106
+
107
+func TestControllerTagRetrievalFails(t *testing.T) {
108
+	cli, fake := &fakeDockerRegistryClient{Err: fmt.Errorf("test error")}, &client.Fake{}
109
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
110
+
111
+	repo := api.ImageRepository{
112
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
113
+		DockerImageRepository: "foo/bar",
114
+	}
115
+	if err := c.Next(&repo); err != cli.Err {
116
+		t.Errorf("unexpected error: %v", err)
117
+	}
118
+	if len(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) != 0 {
119
+		t.Errorf("should not set annotation: %#v", repo)
120
+	}
121
+	if len(fake.Actions) != 0 {
122
+		t.Error("expected no actions on fake client")
123
+	}
124
+}
125
+
126
+func TestControllerRepoTagsAlreadySet(t *testing.T) {
127
+	cli, fake := &fakeDockerRegistryClient{}, &client.Fake{}
128
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
129
+
130
+	repo := api.ImageRepository{
131
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
132
+		DockerImageRepository: "foo/bar",
133
+		Tags: map[string]string{
134
+			"test": "value",
135
+		},
136
+	}
137
+	if err := c.Next(&repo); err != nil {
138
+		t.Errorf("unexpected error: %v", err)
139
+	}
140
+	if len(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) == 0 {
141
+		t.Errorf("did not set annotation: %#v", repo)
142
+	}
143
+	if len(fake.Actions) != 1 {
144
+		t.Error("expected an update action: %#v", fake.Actions)
145
+	}
146
+}
147
+
148
+func TestControllerImageNotFoundError(t *testing.T) {
149
+	cli, fake := &fakeDockerRegistryClient{Tags: map[string]string{"latest": "not_found"}}, &client.Fake{}
150
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
151
+	repo := api.ImageRepository{
152
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
153
+		DockerImageRepository: "foo/bar",
154
+	}
155
+	if err := c.Next(&repo); err != nil {
156
+		t.Errorf("unexpected error: %v", err)
157
+	}
158
+	if len(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) == 0 {
159
+		t.Errorf("did not set annotation: %#v", repo)
160
+	}
161
+	if len(fake.Actions) != 1 {
162
+		t.Error("expected an update action: %#v", fake.Actions)
163
+	}
164
+}
165
+
166
+func TestControllerImageWithGenericError(t *testing.T) {
167
+	cli, fake := &fakeDockerRegistryClient{
168
+		Tags: map[string]string{"latest": "found"},
169
+		Images: []expectedImage{
170
+			{
171
+				ID:  "found",
172
+				Err: fmt.Errorf("test error"),
173
+			},
174
+		},
175
+	}, &client.Fake{}
176
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
177
+	repo := api.ImageRepository{
178
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
179
+		DockerImageRepository: "foo/bar",
180
+	}
181
+	if err := c.Next(&repo); err != cli.Images[0].Err {
182
+		t.Fatalf("unexpected error: %v", err)
183
+	}
184
+	if len(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) != 0 {
185
+		t.Errorf("did not expect annotation: %#v", repo)
186
+	}
187
+	if len(fake.Actions) != 0 {
188
+		t.Error("expected no update action: %#v", fake.Actions)
189
+	}
190
+}
191
+
192
+func TestControllerWithImage(t *testing.T) {
193
+	cli, fake := &fakeDockerRegistryClient{
194
+		Tags: map[string]string{"latest": "found"},
195
+		Images: []expectedImage{
196
+			{
197
+				ID: "found",
198
+				Image: &docker.Image{
199
+					Comment: "foo",
200
+					Config:  &docker.Config{},
201
+				},
202
+			},
203
+		},
204
+	}, &client.Fake{}
205
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
206
+	repo := api.ImageRepository{
207
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
208
+		DockerImageRepository: "foo/bar",
209
+	}
210
+	if err := c.Next(&repo); err != nil {
211
+		t.Errorf("unexpected error: %v", err)
212
+	}
213
+	if !isRFC3339(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) {
214
+		t.Fatalf("did not set annotation: %#v", repo)
215
+	}
216
+	if len(fake.Actions) != 2 {
217
+		t.Error("expected an update action: %#v", fake.Actions)
218
+	}
219
+}
220
+
221
+func TestControllerWithEmptyTag(t *testing.T) {
222
+	cli, fake := &fakeDockerRegistryClient{
223
+		Tags: map[string]string{"latest": "found"},
224
+		Images: []expectedImage{
225
+			{
226
+				ID: "found",
227
+				Image: &docker.Image{
228
+					Comment: "foo",
229
+					Config:  &docker.Config{},
230
+				},
231
+			},
232
+		},
233
+	}, &client.Fake{}
234
+	c := ImportController{client: cli, repositories: fake, mappings: fake}
235
+	repo := api.ImageRepository{
236
+		ObjectMeta:            kapi.ObjectMeta{Name: "test", Namespace: "other"},
237
+		DockerImageRepository: "foo/bar",
238
+		Tags: map[string]string{
239
+			"latest": "",
240
+		},
241
+	}
242
+	if err := c.Next(&repo); err != nil {
243
+		t.Errorf("unexpected error: %v", err)
244
+	}
245
+	if !isRFC3339(repo.Annotations["openshift.io/image.dockerRepositoryCheck"]) {
246
+		t.Fatalf("did not set annotation: %#v", repo)
247
+	}
248
+	if len(fake.Actions) != 2 {
249
+		t.Error("expected an update action: %#v", fake.Actions)
250
+	}
251
+}
252
+
253
+func isRFC3339(s string) bool {
254
+	_, err := time.Parse(time.RFC3339, s)
255
+	return err == nil
256
+}
0 257
new file mode 100644
... ...
@@ -0,0 +1,56 @@
0
+package controller
1
+
2
+import (
3
+	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
4
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
5
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
6
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
7
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
8
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
9
+
10
+	"github.com/openshift/origin/pkg/client"
11
+	"github.com/openshift/origin/pkg/controller"
12
+	"github.com/openshift/origin/pkg/dockerregistry"
13
+	"github.com/openshift/origin/pkg/image/api"
14
+)
15
+
16
+// ImportControllerFactory can create an ImportController.
17
+type ImportControllerFactory struct {
18
+	Client client.Interface
19
+}
20
+
21
+// Create creates an ImportController.
22
+func (f *ImportControllerFactory) Create() controller.RunnableController {
23
+	lw := &cache.ListWatch{
24
+		ListFunc: func() (runtime.Object, error) {
25
+			return f.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything())
26
+		},
27
+		WatchFunc: func(resourceVersion string) (watch.Interface, error) {
28
+			return f.Client.ImageRepositories(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion)
29
+		},
30
+	}
31
+	q := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
32
+	cache.NewReflector(lw, &api.ImageRepository{}, q).Run()
33
+
34
+	c := &ImportController{
35
+		client:       dockerregistry.NewClient(),
36
+		repositories: f.Client,
37
+		mappings:     f.Client,
38
+	}
39
+
40
+	return &controller.RetryController{
41
+		Queue: q,
42
+		RetryManager: controller.NewQueueRetryManager(
43
+			q,
44
+			cache.MetaNamespaceKeyFunc,
45
+			func(obj interface{}, err error, _ int) bool {
46
+				util.HandleError(err)
47
+				return true
48
+			},
49
+		),
50
+		Handle: func(obj interface{}) error {
51
+			r := obj.(*api.ImageRepository)
52
+			return c.Next(r)
53
+		},
54
+	}
55
+}