Browse code

Tease apart separate concerns in RetryController

Also be more consistent about handling errors the same everywhere (for now)

Clayton Coleman authored on 2015/03/13 10:06:49
Showing 8 changed files
... ...
@@ -23,6 +23,12 @@ import (
23 23
 	imageapi "github.com/openshift/origin/pkg/image/api"
24 24
 )
25 25
 
26
+// logAndRetry retries forever - BuildPodController currently has no fatal errors
27
+func logAndRetry(obj interface{}, err error, _ int) bool {
28
+	kutil.HandleError(err)
29
+	return true
30
+}
31
+
26 32
 // BuildControllerFactory constructs BuildController objects
27 33
 type BuildControllerFactory struct {
28 34
 	OSClient            osclient.Interface
... ...
@@ -54,12 +60,7 @@ func (factory *BuildControllerFactory) Create() controller.RunnableController {
54 54
 
55 55
 	return &controller.RetryController{
56 56
 		Queue:        queue,
57
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, -1),
58
-		ShouldRetry: func(obj interface{}, err error) bool {
59
-			kutil.HandleError(err)
60
-			// BuildController currently has no fatal errors.
61
-			return true
62
-		},
57
+		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
63 58
 		Handle: func(obj interface{}) error {
64 59
 			build := obj.(*buildapi.Build)
65 60
 			return buildController.HandleBuild(build)
... ...
@@ -103,12 +104,7 @@ func (factory *BuildPodControllerFactory) Create() controller.RunnableController
103 103
 
104 104
 	return &controller.RetryController{
105 105
 		Queue:        queue,
106
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, -1),
107
-		ShouldRetry: func(obj interface{}, err error) bool {
108
-			kutil.HandleError(err)
109
-			// BuildPodController currently has no fatal errors.
110
-			return true
111
-		},
106
+		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
112 107
 		Handle: func(obj interface{}) error {
113 108
 			pod := obj.(*kapi.Pod)
114 109
 			return buildPodController.HandlePod(pod)
... ...
@@ -143,15 +139,18 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
143 143
 	}
144 144
 
145 145
 	return &controller.RetryController{
146
-		Queue:        queue,
147
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, -1),
148
-		ShouldRetry: func(obj interface{}, err error) bool {
149
-			kutil.HandleError(err)
150
-			if _, isFatal := err.(buildcontroller.ImageChangeControllerFatalError); isFatal {
151
-				return false
152
-			}
153
-			return true
154
-		},
146
+		Queue: queue,
147
+		RetryManager: controller.NewQueueRetryManager(
148
+			queue,
149
+			cache.MetaNamespaceKeyFunc,
150
+			func(obj interface{}, err error, _ int) bool {
151
+				kutil.HandleError(err)
152
+				if _, isFatal := err.(buildcontroller.ImageChangeControllerFatalError); isFatal {
153
+					return false
154
+				}
155
+				return true
156
+			},
157
+		),
155 158
 		Handle: func(obj interface{}) error {
156 159
 			imageRepo := obj.(*imageapi.ImageRepository)
157 160
 			return imageChangeController.HandleImageRepo(imageRepo)
... ...
@@ -16,25 +16,20 @@ type RunnableController interface {
16 16
 // which failed to be successfully handled.
17 17
 type RetryController struct {
18 18
 	// Queue is where work is retrieved for Handle.
19
-	Queue Queue
19
+	Queue
20 20
 
21 21
 	// Handle is expected to process the next resource from the queue.
22 22
 	Handle func(interface{}) error
23 23
 
24
-	// ShouldRetry returns true if the resource and error returned from
25
-	// HandleNext should trigger a retry via the RetryManager.
26
-	ShouldRetry func(interface{}, error) bool
27
-
28 24
 	// RetryManager is fed the handled resource if Handle returns a Retryable
29 25
 	// error. If Handle returns no error, the RetryManager is asked to forget
30 26
 	// the resource.
31
-	RetryManager RetryManager
27
+	RetryManager
32 28
 }
33 29
 
34 30
 // Queue is a narrow abstraction of a cache.FIFO.
35 31
 type Queue interface {
36 32
 	Pop() interface{}
37
-	AddIfNotPresent(interface{}) error
38 33
 }
39 34
 
40 35
 // Run begins processing resources from Queue asynchronously.
... ...
@@ -42,17 +37,19 @@ func (c *RetryController) Run() {
42 42
 	go kutil.Forever(func() { c.handleOne(c.Queue.Pop()) }, 0)
43 43
 }
44 44
 
45
+// RunUntil begins processing resources from Queue asynchronously until stopCh is closed.
46
+func (c *RetryController) RunUntil(stopCh <-chan struct{}) {
47
+	go kutil.Until(func() { c.handleOne(c.Queue.Pop()) }, 0, stopCh)
48
+}
49
+
45 50
 // handleOne processes resource with Handle. If Handle returns a retryable
46 51
 // error, the handled resource is passed to the RetryManager. If no error is
47 52
 // returned from Handle, the RetryManager is asked to forget the processed
48 53
 // resource.
49 54
 func (c *RetryController) handleOne(resource interface{}) {
50
-	err := c.Handle(resource)
51
-	if err != nil {
52
-		if c.ShouldRetry(resource, err) {
53
-			c.RetryManager.Retry(resource)
54
-			return
55
-		}
55
+	if err := c.Handle(resource); err != nil {
56
+		c.Retry(resource, err)
57
+		return
56 58
 	}
57 59
 	c.RetryManager.Forget(resource)
58 60
 }
... ...
@@ -62,7 +59,7 @@ func (c *RetryController) handleOne(resource interface{}) {
62 62
 type RetryManager interface {
63 63
 	// Retry will cause resource processing to be retried (for example, by
64 64
 	// requeueing resource)
65
-	Retry(resource interface{})
65
+	Retry(resource interface{}, err error)
66 66
 
67 67
 	// Forget will cause the manager to erase all prior knowledge of resource
68 68
 	// and reclaim internal resources associated with state tracking of
... ...
@@ -70,38 +67,46 @@ type RetryManager interface {
70 70
 	Forget(resource interface{})
71 71
 }
72 72
 
73
-// QueueRetryManager retries a resource by re-queueing it into a Queue up to
74
-// MaxRetries number of times.
73
+// RetryFunc should return true if the given object and error should be retried after
74
+// the provided number of times.
75
+type RetryFunc func(obj interface{}, err error, retries int) bool
76
+
77
+// QueueRetryManager retries a resource by re-queueing it into a ReQueue as long as
78
+// retryFunc returns true.
75 79
 type QueueRetryManager struct {
76 80
 	// queue is where resources are re-queued.
77
-	queue Queue
81
+	queue ReQueue
78 82
 
79 83
 	// keyFunc is used to index resources.
80 84
 	keyFunc kcache.KeyFunc
81 85
 
82
-	// maxRetries is the total number of attempts to requeue an individual
83
-	// resource before giving up. A value of -1 is interpreted as retry forever.
84
-	maxRetries int
86
+	// retryFunc returns true if the resource and error returned should be retried.
87
+	retryFunc RetryFunc
85 88
 
86 89
 	// retries maps resources to their current retry count.
87 90
 	retries map[string]int
88 91
 }
89 92
 
93
+// ReQueue is a queue that allows an object to be requeued
94
+type ReQueue interface {
95
+	Queue
96
+	AddIfNotPresent(interface{}) error
97
+}
98
+
90 99
 // NewQueueRetryManager safely creates a new QueueRetryManager.
91
-func NewQueueRetryManager(queue Queue, keyFunc kcache.KeyFunc, maxRetries int) *QueueRetryManager {
100
+func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc) *QueueRetryManager {
92 101
 	return &QueueRetryManager{
93
-		queue:      queue,
94
-		keyFunc:    keyFunc,
95
-		maxRetries: maxRetries,
96
-		retries:    make(map[string]int),
102
+		queue:     queue,
103
+		keyFunc:   keyFn,
104
+		retryFunc: retryFn,
105
+		retries:   make(map[string]int),
97 106
 	}
98 107
 }
99 108
 
100
-// Retry will enqueue resource until maxRetries for that resource has been
101
-// exceeded, at which point resource will be forgotten and no longer retried.
102
-//
103
-// A maxRetries value of -1 is interpreted as retry forever.
104
-func (r *QueueRetryManager) Retry(resource interface{}) {
109
+// Retry will enqueue resource until retryFunc returns false for that resource has been
110
+// exceeded, at which point resource will be forgotten and no longer retried. The current
111
+// retry count will be passed to each invocation of retryFunc.
112
+func (r *QueueRetryManager) Retry(resource interface{}, err error) {
105 113
 	id, _ := r.keyFunc(resource)
106 114
 
107 115
 	if _, exists := r.retries[id]; !exists {
... ...
@@ -109,7 +114,7 @@ func (r *QueueRetryManager) Retry(resource interface{}) {
109 109
 	}
110 110
 	tries := r.retries[id]
111 111
 
112
-	if tries < r.maxRetries || r.maxRetries == -1 {
112
+	if r.retryFunc(resource, err, tries) {
113 113
 		// It's important to use AddIfNotPresent to prevent overwriting newer
114 114
 		// state in the queue which may have arrived asynchronously.
115 115
 		r.queue.AddIfNotPresent(resource)
... ...
@@ -10,16 +10,17 @@ import (
10 10
 
11 11
 func TestRetryController_handleOneRetryableError(t *testing.T) {
12 12
 	retried := false
13
+	retryErr := fmt.Errorf("retryable error")
13 14
 
14 15
 	controller := &RetryController{
15 16
 		Handle: func(obj interface{}) error {
16
-			return fmt.Errorf("retryable error")
17
-		},
18
-		ShouldRetry: func(interface{}, error) bool {
19
-			return true
17
+			return retryErr
20 18
 		},
21 19
 		RetryManager: &testRetryManager{
22
-			RetryFunc: func(resource interface{}) {
20
+			RetryFunc: func(resource interface{}, err error) {
21
+				if err != retryErr {
22
+					t.Fatalf("unexpected error: %v", err)
23
+				}
23 24
 				retried = true
24 25
 			},
25 26
 			ForgetFunc: func(resource interface{}) {
... ...
@@ -35,33 +36,6 @@ func TestRetryController_handleOneRetryableError(t *testing.T) {
35 35
 	}
36 36
 }
37 37
 
38
-func TestRetryController_handleOneFatalError(t *testing.T) {
39
-	forgotten := false
40
-
41
-	controller := &RetryController{
42
-		Handle: func(obj interface{}) error {
43
-			return fmt.Errorf("fatal error")
44
-		},
45
-		ShouldRetry: func(interface{}, error) bool {
46
-			return false
47
-		},
48
-		RetryManager: &testRetryManager{
49
-			RetryFunc: func(resource interface{}) {
50
-				t.Fatalf("unexpected call to retry %v", resource)
51
-			},
52
-			ForgetFunc: func(resource interface{}) {
53
-				forgotten = true
54
-			},
55
-		},
56
-	}
57
-
58
-	controller.handleOne(struct{}{})
59
-
60
-	if !forgotten {
61
-		t.Fatalf("expected to forget")
62
-	}
63
-}
64
-
65 38
 func TestRetryController_handleOneNoError(t *testing.T) {
66 39
 	forgotten := false
67 40
 
... ...
@@ -69,12 +43,8 @@ func TestRetryController_handleOneNoError(t *testing.T) {
69 69
 		Handle: func(obj interface{}) error {
70 70
 			return nil
71 71
 		},
72
-		ShouldRetry: func(interface{}, error) bool {
73
-			t.Fatalf("unexpected retry check")
74
-			return true
75
-		},
76 72
 		RetryManager: &testRetryManager{
77
-			RetryFunc: func(resource interface{}) {
73
+			RetryFunc: func(resource interface{}, err error) {
78 74
 				t.Fatalf("unexpected call to retry %v", resource)
79 75
 			},
80 76
 			ForgetFunc: func(resource interface{}) {
... ...
@@ -109,8 +79,13 @@ func TestQueueRetryManager_retries(t *testing.T) {
109 109
 		keyFunc: func(obj interface{}) (string, error) {
110 110
 			return obj.(testObj).id, nil
111 111
 		},
112
-		maxRetries: retries,
113
-		retries:    make(map[string]int),
112
+		retryFunc: func(obj interface{}, err error, count int) bool {
113
+			if count > 4 {
114
+				return false
115
+			}
116
+			return true
117
+		},
118
+		retries: make(map[string]int),
114 119
 	}
115 120
 
116 121
 	objects := []testObj{
... ...
@@ -122,7 +97,7 @@ func TestQueueRetryManager_retries(t *testing.T) {
122 122
 	// Retry one more than the max
123 123
 	for _, obj := range objects {
124 124
 		for i := 0; i < retries+1; i++ {
125
-			manager.Retry(obj)
125
+			manager.Retry(obj, nil)
126 126
 		}
127 127
 	}
128 128
 
... ...
@@ -154,10 +129,7 @@ func TestRetryController_realFifoEventOrdering(t *testing.T) {
154 154
 
155 155
 	controller := &RetryController{
156 156
 		Queue:        fifo,
157
-		RetryManager: NewQueueRetryManager(fifo, keyFunc, 1),
158
-		ShouldRetry: func(interface{}, error) bool {
159
-			return true
160
-		},
157
+		RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ int) bool { return true }),
161 158
 		Handle: func(obj interface{}) error {
162 159
 			if e, a := 1, obj.(testObj).value; e != a {
163 160
 				t.Fatalf("expected to handle test value %d, got %d")
... ...
@@ -204,12 +176,12 @@ func (t *testFifo) Pop() interface{} {
204 204
 }
205 205
 
206 206
 type testRetryManager struct {
207
-	RetryFunc  func(resource interface{})
207
+	RetryFunc  func(resource interface{}, err error)
208 208
 	ForgetFunc func(resource interface{})
209 209
 }
210 210
 
211
-func (m *testRetryManager) Retry(resource interface{}) {
212
-	m.RetryFunc(resource)
211
+func (m *testRetryManager) Retry(resource interface{}, err error) {
212
+	m.RetryFunc(resource, err)
213 213
 }
214 214
 
215 215
 func (m *testRetryManager) Forget(resource interface{}) {
... ...
@@ -57,15 +57,21 @@ func (factory *DeploymentConfigChangeControllerFactory) Create() controller.Runn
57 57
 	}
58 58
 
59 59
 	return &controller.RetryController{
60
-		Queue:        queue,
61
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1),
62
-		ShouldRetry: func(obj interface{}, err error) bool {
63
-			if _, isFatal := err.(fatalError); isFatal {
60
+		Queue: queue,
61
+		RetryManager: controller.NewQueueRetryManager(
62
+			queue,
63
+			cache.MetaNamespaceKeyFunc,
64
+			func(obj interface{}, err error, count int) bool {
64 65
 				kutil.HandleError(err)
65
-				return false
66
-			}
67
-			return true
68
-		},
66
+				if _, isFatal := err.(fatalError); isFatal {
67
+					return false
68
+				}
69
+				if count > 0 {
70
+					return false
71
+				}
72
+				return true
73
+			},
74
+		),
69 75
 		Handle: func(obj interface{}) error {
70 76
 			config := obj.(*deployapi.DeploymentConfig)
71 77
 			return changeController.Handle(config)
... ...
@@ -67,11 +67,12 @@ func (factory *DeployerPodControllerFactory) Create() controller.RunnableControl
67 67
 	}
68 68
 
69 69
 	return &controller.RetryController{
70
-		Queue:        podQueue,
71
-		RetryManager: controller.NewQueueRetryManager(podQueue, cache.MetaNamespaceKeyFunc, 1),
72
-		ShouldRetry: func(obj interface{}, err error) bool {
73
-			return true
74
-		},
70
+		Queue: podQueue,
71
+		RetryManager: controller.NewQueueRetryManager(
72
+			podQueue,
73
+			cache.MetaNamespaceKeyFunc,
74
+			func(obj interface{}, err error, count int) bool { return count < 1 },
75
+		),
75 76
 		Handle: func(obj interface{}) error {
76 77
 			pod := obj.(*kapi.Pod)
77 78
 			return podController.Handle(pod)
... ...
@@ -68,15 +68,21 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll
68 68
 	}
69 69
 
70 70
 	return &controller.RetryController{
71
-		Queue:        deploymentQueue,
72
-		RetryManager: controller.NewQueueRetryManager(deploymentQueue, cache.MetaNamespaceKeyFunc, 1),
73
-		ShouldRetry: func(obj interface{}, err error) bool {
74
-			if _, isFatal := err.(fatalError); isFatal {
75
-				kutil.HandleError(err)
76
-				return false
77
-			}
78
-			return true
79
-		},
71
+		Queue: deploymentQueue,
72
+		RetryManager: controller.NewQueueRetryManager(
73
+			deploymentQueue,
74
+			cache.MetaNamespaceKeyFunc,
75
+			func(obj interface{}, err error, count int) bool {
76
+				if _, isFatal := err.(fatalError); isFatal {
77
+					kutil.HandleError(err)
78
+					return false
79
+				}
80
+				if count > 1 {
81
+					return false
82
+				}
83
+				return true
84
+			},
85
+		),
80 86
 		Handle: func(obj interface{}) error {
81 87
 			deployment := obj.(*kapi.ReplicationController)
82 88
 			return deployController.Handle(deployment)
... ...
@@ -54,15 +54,21 @@ func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableCo
54 54
 	}
55 55
 
56 56
 	return &controller.RetryController{
57
-		Queue:        queue,
58
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1),
59
-		ShouldRetry: func(obj interface{}, err error) bool {
60
-			if _, isFatal := err.(fatalError); isFatal {
57
+		Queue: queue,
58
+		RetryManager: controller.NewQueueRetryManager(
59
+			queue,
60
+			cache.MetaNamespaceKeyFunc,
61
+			func(obj interface{}, err error, count int) bool {
61 62
 				kutil.HandleError(err)
62
-				return false
63
-			}
64
-			return true
65
-		},
63
+				if _, isFatal := err.(fatalError); isFatal {
64
+					return false
65
+				}
66
+				if count > 0 {
67
+					return false
68
+				}
69
+				return true
70
+			},
71
+		),
66 72
 		Handle: func(obj interface{}) error {
67 73
 			config := obj.(*deployapi.DeploymentConfig)
68 74
 			return configController.Handle(config)
... ...
@@ -66,15 +66,21 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
66 66
 	}
67 67
 
68 68
 	return &controller.RetryController{
69
-		Queue:        queue,
70
-		RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1),
71
-		ShouldRetry: func(obj interface{}, err error) bool {
72
-			if _, isFatal := err.(fatalError); isFatal {
69
+		Queue: queue,
70
+		RetryManager: controller.NewQueueRetryManager(
71
+			queue,
72
+			cache.MetaNamespaceKeyFunc,
73
+			func(obj interface{}, err error, count int) bool {
73 74
 				kutil.HandleError(err)
74
-				return false
75
-			}
76
-			return true
77
-		},
75
+				if _, isFatal := err.(fatalError); isFatal {
76
+					return false
77
+				}
78
+				if count > 0 {
79
+					return false
80
+				}
81
+				return true
82
+			},
83
+		),
78 84
 		Handle: func(obj interface{}) error {
79 85
 			repo := obj.(*imageapi.ImageRepository)
80 86
 			return changeController.Handle(repo)