Browse code

allow parallel image streams

deads2k authored on 2015/12/19 01:26:00
Showing 4 changed files
... ...
@@ -20,6 +20,7 @@ import (
20 20
 	buildclient "github.com/openshift/origin/pkg/build/client"
21 21
 	buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory"
22 22
 	buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy"
23
+	"github.com/openshift/origin/pkg/client"
23 24
 	cmdutil "github.com/openshift/origin/pkg/cmd/util"
24 25
 	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
25 26
 	configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange"
... ...
@@ -345,10 +346,7 @@ func (c *MasterConfig) RunSDNController() {
345 345
 // RunImageImportController starts the image import trigger controller process.
346 346
 func (c *MasterConfig) RunImageImportController() {
347 347
 	osclient := c.ImageImportControllerClient()
348
-	factory := imagecontroller.ImportControllerFactory{
349
-		Client: osclient,
350
-	}
351
-	controller := factory.Create()
348
+	controller := imagecontroller.NewImportController(client.ImageStreamsNamespacer(osclient), client.ImageStreamMappingsNamespacer(osclient), 10, 2*time.Minute)
352 349
 	controller.Run()
353 350
 }
354 351
 
... ...
@@ -2,15 +2,24 @@ package controller
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"sync"
5 6
 	"time"
6 7
 
7 8
 	"github.com/golang/glog"
8 9
 
9 10
 	kapi "k8s.io/kubernetes/pkg/api"
10
-	"k8s.io/kubernetes/pkg/api/errors"
11
+	kapierrors "k8s.io/kubernetes/pkg/api/errors"
11 12
 	"k8s.io/kubernetes/pkg/api/unversioned"
13
+	"k8s.io/kubernetes/pkg/client/cache"
14
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
15
+	"k8s.io/kubernetes/pkg/controller/framework"
16
+	"k8s.io/kubernetes/pkg/fields"
17
+	"k8s.io/kubernetes/pkg/labels"
18
+	"k8s.io/kubernetes/pkg/runtime"
19
+	"k8s.io/kubernetes/pkg/util"
12 20
 	kerrors "k8s.io/kubernetes/pkg/util/errors"
13 21
 	"k8s.io/kubernetes/pkg/util/sets"
22
+	"k8s.io/kubernetes/pkg/watch"
14 23
 
15 24
 	"github.com/openshift/origin/pkg/client"
16 25
 	"github.com/openshift/origin/pkg/dockerregistry"
... ...
@@ -22,6 +31,91 @@ type ImportController struct {
22 22
 	mappings client.ImageStreamMappingsNamespacer
23 23
 	// injected for testing
24 24
 	client dockerregistry.Client
25
+
26
+	stopChan chan struct{}
27
+
28
+	imageStreamController *framework.Controller
29
+
30
+	work           chan *api.ImageStream
31
+	workingSet     sets.String
32
+	workingSetLock sync.Mutex
33
+
34
+	// this should not be larger the capacity of the work channel
35
+	numParallelImports int
36
+}
37
+
38
+func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController {
39
+	c := &ImportController{
40
+		streams:  isNamespacer,
41
+		mappings: ismNamespacer,
42
+
43
+		numParallelImports: parallelImports,
44
+		work:               make(chan *api.ImageStream, 20*parallelImports),
45
+		workingSet:         sets.String{},
46
+	}
47
+
48
+	_, c.imageStreamController = framework.NewInformer(
49
+		&cache.ListWatch{
50
+			ListFunc: func() (runtime.Object, error) {
51
+				return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
52
+			},
53
+			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
54
+				return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
55
+			},
56
+		},
57
+		&api.ImageStream{},
58
+		resyncInterval,
59
+		framework.ResourceEventHandlerFuncs{
60
+			AddFunc:    c.imageStreamAdded,
61
+			UpdateFunc: c.imageStreamUpdated,
62
+		},
63
+	)
64
+
65
+	return c
66
+}
67
+
68
+// Runs controller loops and returns immediately
69
+func (c *ImportController) Run() {
70
+	if c.stopChan == nil {
71
+		c.stopChan = make(chan struct{})
72
+		go c.imageStreamController.Run(c.stopChan)
73
+
74
+		for i := 0; i < c.numParallelImports; i++ {
75
+			go util.Until(c.handleImport, time.Second, c.stopChan)
76
+		}
77
+	}
78
+}
79
+
80
+// Stop gracefully shuts down this controller
81
+func (c *ImportController) Stop() {
82
+	if c.stopChan != nil {
83
+		close(c.stopChan)
84
+		c.stopChan = nil
85
+	}
86
+}
87
+
88
+func (c *ImportController) imageStreamAdded(obj interface{}) {
89
+	imageStream := obj.(*api.ImageStream)
90
+	if needsImport(imageStream) {
91
+		glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream))
92
+		c.work <- imageStream
93
+		glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream))
94
+
95
+	} else {
96
+		glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream))
97
+	}
98
+}
99
+
100
+func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) {
101
+	newImageStream := newObj.(*api.ImageStream)
102
+	if needsImport(newImageStream) {
103
+		glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream))
104
+		c.work <- newImageStream
105
+		glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream))
106
+
107
+	} else {
108
+		glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream))
109
+	}
25 110
 }
26 111
 
27 112
 // needsImport returns true if the provided image stream should have its tags imported.
... ...
@@ -29,8 +123,81 @@ func needsImport(stream *api.ImageStream) bool {
29 29
 	return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0
30 30
 }
31 31
 
32
-// retryCount is the number of times to retry on a conflict when updating an image stream
33
-const retryCount = 2
32
+func (c *ImportController) handleImport() {
33
+	for {
34
+		select {
35
+		case <-c.stopChan:
36
+			return
37
+
38
+		case staleImageStream := <-c.work:
39
+			glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream))
40
+
41
+			c.importImageStream(staleImageStream)
42
+		}
43
+	}
44
+}
45
+
46
+func (c *ImportController) importImageStream(staleImageStream *api.ImageStream) {
47
+	// if we're already in the workingset, that means that some thread is already trying to do an import for this.
48
+	// This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now.
49
+	if !c.addToWorkingSet(staleImageStream) {
50
+		// If there isn't any other work in the queue, wait for a while so that we don't hot loop.
51
+		// Then requeue to the end of the channel.  That allows other work to continue without delay
52
+		if len(c.work) == 0 {
53
+			time.Sleep(100 * time.Millisecond)
54
+		}
55
+		glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream))
56
+		c.work <- staleImageStream
57
+
58
+		return
59
+	}
60
+	defer c.removeFromWorkingSet(staleImageStream)
61
+
62
+	err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
63
+		liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name)
64
+		// no work to do here
65
+		if kapierrors.IsNotFound(err) {
66
+			return nil
67
+		}
68
+		if err != nil {
69
+			return err
70
+		}
71
+		if !needsImport(liveImageStream) {
72
+			return nil
73
+		}
74
+
75
+		// if we're notified, do work and then start waiting again.
76
+		return c.Next(liveImageStream)
77
+	})
78
+
79
+	if err != nil {
80
+		util.HandleError(err)
81
+	}
82
+}
83
+
84
+func workingSetKey(imageStream *api.ImageStream) string {
85
+	return imageStream.Namespace + "/" + imageStream.Name
86
+}
87
+
88
+// addToWorkingSet returns true if the image stream was added, false if it was
89
+// already present
90
+func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) bool {
91
+	c.workingSetLock.Lock()
92
+	defer c.workingSetLock.Unlock()
93
+
94
+	if c.workingSet.Has(workingSetKey(imageStream)) {
95
+		return false
96
+	}
97
+
98
+	c.workingSet.Insert(workingSetKey(imageStream))
99
+	return true
100
+}
101
+
102
+func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
103
+	c.workingSetLock.Lock()
104
+	defer c.workingSetLock.Unlock()
105
+	c.workingSet.Delete(workingSetKey(imageStream))
106
+}
34 107
 
35 108
 // Next processes the given image stream, looking for streams that have DockerImageRepository
36 109
 // set but have not yet been marked as "ready". If transient errors occur, err is returned but
... ...
@@ -55,9 +222,6 @@ const retryCount = 2
55 55
 // 4. ImageStreamMapping save error
56 56
 // 5. error when marking ImageStream as imported
57 57
 func (c *ImportController) Next(stream *api.ImageStream) error {
58
-	if !needsImport(stream) {
59
-		return nil
60
-	}
61 58
 	glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)
62 59
 
63 60
 	insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
... ...
@@ -73,7 +237,7 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
73 73
 		if retry {
74 74
 			return err
75 75
 		}
76
-		return c.done(stream, err.Error(), retryCount)
76
+		return c.done(stream, err.Error())
77 77
 	}
78 78
 	if err != nil {
79 79
 		errlist = append(errlist, err)
... ...
@@ -88,10 +252,10 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
88 88
 	}
89 89
 
90 90
 	if len(errlist) > 0 {
91
-		return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount)
91
+		return c.done(stream, kerrors.NewAggregate(errlist).Error())
92 92
 	}
93 93
 
94
-	return c.done(stream, "", retryCount)
94
+	return c.done(stream, "")
95 95
 }
96 96
 
97 97
 // getTags returns a map of tags to be imported, a flag saying if we should retry
... ...
@@ -254,7 +418,7 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
254 254
 }
255 255
 
256 256
 // done marks the stream as being processed due to an error or failure condition.
257
-func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error {
257
+func (c *ImportController) done(stream *api.ImageStream, reason string) error {
258 258
 	if len(reason) == 0 {
259 259
 		reason = unversioned.Now().UTC().Format(time.RFC3339)
260 260
 	} else if len(reason) > 300 {
... ...
@@ -265,12 +429,7 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in
265 265
 		stream.Annotations = make(map[string]string)
266 266
 	}
267 267
 	stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason
268
-	if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) {
269
-		if errors.IsConflict(err) && retry > 0 {
270
-			if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
271
-				return c.done(stream, reason, retry-1)
272
-			}
273
-		}
268
+	if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil {
274 269
 		return err
275 270
 	}
276 271
 	return nil
277 272
deleted file mode 100644
... ...
@@ -1,59 +0,0 @@
1
-package controller
2
-
3
-import (
4
-	"time"
5
-
6
-	kapi "k8s.io/kubernetes/pkg/api"
7
-	"k8s.io/kubernetes/pkg/client/cache"
8
-	"k8s.io/kubernetes/pkg/fields"
9
-	"k8s.io/kubernetes/pkg/labels"
10
-	"k8s.io/kubernetes/pkg/runtime"
11
-	"k8s.io/kubernetes/pkg/util"
12
-	kutil "k8s.io/kubernetes/pkg/util"
13
-	"k8s.io/kubernetes/pkg/watch"
14
-
15
-	"github.com/openshift/origin/pkg/client"
16
-	"github.com/openshift/origin/pkg/controller"
17
-	"github.com/openshift/origin/pkg/image/api"
18
-)
19
-
20
-// ImportControllerFactory can create an ImportController.
21
-type ImportControllerFactory struct {
22
-	Client client.Interface
23
-}
24
-
25
-// Create creates an ImportController.
26
-func (f *ImportControllerFactory) Create() controller.RunnableController {
27
-	lw := &cache.ListWatch{
28
-		ListFunc: func() (runtime.Object, error) {
29
-			return f.Client.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
30
-		},
31
-		WatchFunc: func(resourceVersion string) (watch.Interface, error) {
32
-			return f.Client.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
33
-		},
34
-	}
35
-	q := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
36
-	cache.NewReflector(lw, &api.ImageStream{}, q, 2*time.Minute).Run()
37
-
38
-	c := &ImportController{
39
-		streams:  f.Client,
40
-		mappings: f.Client,
41
-	}
42
-
43
-	return &controller.RetryController{
44
-		Queue: q,
45
-		RetryManager: controller.NewQueueRetryManager(
46
-			q,
47
-			cache.MetaNamespaceKeyFunc,
48
-			func(obj interface{}, err error, retries controller.Retry) bool {
49
-				util.HandleError(err)
50
-				return retries.Count < 5
51
-			},
52
-			kutil.NewTokenBucketRateLimiter(1, 10),
53
-		),
54
-		Handle: func(obj interface{}) error {
55
-			r := obj.(*api.ImageStream)
56
-			return c.Next(r)
57
-		},
58
-	}
59
-}
... ...
@@ -47,6 +47,7 @@ os::cmd::expect_success_and_text "oc get imageStreams postgresql --template='{{.
47 47
 os::cmd::expect_success_and_text "oc get imageStreams mongodb --template='{{.status.dockerImageRepository}}'" 'mongodb'
48 48
 # verify the image repository had its tags populated
49 49
 os::cmd::try_until_success 'oc get imagestreamtags wildfly:latest'
50
+os::cmd::try_until_success "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
50 51
 os::cmd::expect_success_and_text "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
51 52
 os::cmd::expect_success_and_text 'oc get istag' 'wildfly'
52 53
 os::cmd::expect_success 'oc annotate istag/wildfly:latest foo=bar'
... ...
@@ -65,6 +66,7 @@ os::cmd::expect_failure 'oc get imageStreams mongodb'
65 65
 os::cmd::expect_failure 'oc get imageStreams wildfly'
66 66
 os::cmd::try_until_success 'oc get imagestreamTags mysql:5.5'
67 67
 os::cmd::try_until_success 'oc get imagestreamTags mysql:5.6'
68
+os::cmd::try_until_success "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
68 69
 os::cmd::expect_success_and_text "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
69 70
 os::cmd::expect_success 'oc describe istag/mysql:latest'
70 71
 os::cmd::expect_success_and_text 'oc describe istag/mysql:latest' 'Environment:'