Introduce a reusable controller package which facilitates safe retries
for controllers which make use of cache.FIFO.
Port the DeploymentConfigController to use the new support as a reference
implementation of the pattern.
| ... | ... |
@@ -44,6 +44,7 @@ import ( |
| 44 | 44 |
osclient "github.com/openshift/origin/pkg/client" |
| 45 | 45 |
cmdutil "github.com/openshift/origin/pkg/cmd/util" |
| 46 | 46 |
"github.com/openshift/origin/pkg/cmd/util/clientcmd" |
| 47 |
+ deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" |
|
| 47 | 48 |
deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" |
| 48 | 49 |
deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" |
| 49 | 50 |
deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" |
| ... | ... |
@@ -710,7 +711,7 @@ func (c *MasterConfig) RunDeploymentController() {
|
| 710 | 710 |
|
| 711 | 711 |
func (c *MasterConfig) RunDeploymentConfigController() {
|
| 712 | 712 |
osclient, kclient := c.DeploymentConfigControllerClients() |
| 713 |
- factory := deploycontrollerfactory.DeploymentConfigControllerFactory{
|
|
| 713 |
+ factory := deployconfigcontroller.DeploymentConfigControllerFactory{
|
|
| 714 | 714 |
Client: osclient, |
| 715 | 715 |
KubeClient: kclient, |
| 716 | 716 |
Codec: latest.Codec, |
| 717 | 717 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,126 @@ |
| 0 |
+package controller |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" |
|
| 4 |
+ kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" |
|
| 5 |
+) |
|
| 6 |
+ |
|
| 7 |
+// RunnableController is a controller which implements a Run loop. |
|
| 8 |
+type RunnableController interface {
|
|
| 9 |
+ // Run starts the asynchronous controller loop. |
|
| 10 |
+ Run() |
|
| 11 |
+} |
|
| 12 |
+ |
|
| 13 |
+// RetryController is a RunnableController which delegates resource |
|
| 14 |
+// handling to a function and knows how to safely manage retries of a resource |
|
| 15 |
+// which failed to be successfully handled. |
|
| 16 |
+type RetryController struct {
|
|
| 17 |
+ // Queue is where work is retrieved for Handle. |
|
| 18 |
+ Queue Queue |
|
| 19 |
+ |
|
| 20 |
+ // Handle is expected to process the next resource from the queue. |
|
| 21 |
+ Handle func(interface{}) error
|
|
| 22 |
+ |
|
| 23 |
+ // ShouldRetry returns true if the resource and error returned from |
|
| 24 |
+ // HandleNext should trigger a retry via the RetryManager. |
|
| 25 |
+ ShouldRetry func(interface{}, error) bool
|
|
| 26 |
+ |
|
| 27 |
+ // RetryManager is fed the handled resource if Handle returns a Retryable |
|
| 28 |
+ // error. If Handle returns no error, the RetryManager is asked to forget |
|
| 29 |
+ // the resource. |
|
| 30 |
+ RetryManager RetryManager |
|
| 31 |
+} |
|
| 32 |
+ |
|
| 33 |
+// Queue is a narrow abstraction of a cache.FIFO. |
|
| 34 |
+type Queue interface {
|
|
| 35 |
+ Pop() interface{}
|
|
| 36 |
+ AddIfNotPresent(interface{}) error
|
|
| 37 |
+} |
|
| 38 |
+ |
|
| 39 |
+// Run begins processing resources from Queue asynchronously. |
|
| 40 |
+func (c *RetryController) Run() {
|
|
| 41 |
+ go kutil.Forever(func() { c.handleOne(c.Queue.Pop()) }, 0)
|
|
| 42 |
+} |
|
| 43 |
+ |
|
| 44 |
+// handleOne processes resource with Handle. If Handle returns a retryable |
|
| 45 |
+// error, the handled resource is passed to the RetryManager. If no error is |
|
| 46 |
+// returned from Handle, the RetryManager is asked to forget the processed |
|
| 47 |
+// resource. |
|
| 48 |
+func (c *RetryController) handleOne(resource interface{}) {
|
|
| 49 |
+ err := c.Handle(resource) |
|
| 50 |
+ if err != nil {
|
|
| 51 |
+ if c.ShouldRetry(resource, err) {
|
|
| 52 |
+ c.RetryManager.Retry(resource) |
|
| 53 |
+ return |
|
| 54 |
+ } |
|
| 55 |
+ } |
|
| 56 |
+ c.RetryManager.Forget(resource) |
|
| 57 |
+} |
|
| 58 |
+ |
|
| 59 |
+// RetryManager knows how to retry processing of a resource, and how to forget |
|
| 60 |
+// a resource it may be tracking the state of. |
|
| 61 |
+type RetryManager interface {
|
|
| 62 |
+ // Retry will cause resource processing to be retried (for example, by |
|
| 63 |
+ // requeueing resource) |
|
| 64 |
+ Retry(resource interface{})
|
|
| 65 |
+ |
|
| 66 |
+ // Forget will cause the manager to erase all prior knowledge of resource |
|
| 67 |
+ // and reclaim internal resources associated with state tracking of |
|
| 68 |
+ // resource. |
|
| 69 |
+ Forget(resource interface{})
|
|
| 70 |
+} |
|
| 71 |
+ |
|
| 72 |
+// QueueRetryManager retries a resource by re-queueing it into a Queue up to |
|
| 73 |
+// MaxRetries number of times. |
|
| 74 |
+type QueueRetryManager struct {
|
|
| 75 |
+ // queue is where resources are re-queued. |
|
| 76 |
+ queue Queue |
|
| 77 |
+ |
|
| 78 |
+ // keyFunc is used to index resources. |
|
| 79 |
+ keyFunc kcache.KeyFunc |
|
| 80 |
+ |
|
| 81 |
+ // maxRetries is the total number of attempts to requeue an individual |
|
| 82 |
+ // resource before giving up. A value of -1 is interpreted as retry forever. |
|
| 83 |
+ maxRetries int |
|
| 84 |
+ |
|
| 85 |
+ // retries maps resources to their current retry count. |
|
| 86 |
+ retries map[string]int |
|
| 87 |
+} |
|
| 88 |
+ |
|
| 89 |
+// NewQueueRetryManager safely creates a new QueueRetryManager. |
|
| 90 |
+func NewQueueRetryManager(queue Queue, keyFunc kcache.KeyFunc, maxRetries int) *QueueRetryManager {
|
|
| 91 |
+ return &QueueRetryManager{
|
|
| 92 |
+ queue: queue, |
|
| 93 |
+ keyFunc: keyFunc, |
|
| 94 |
+ maxRetries: maxRetries, |
|
| 95 |
+ retries: make(map[string]int), |
|
| 96 |
+ } |
|
| 97 |
+} |
|
| 98 |
+ |
|
| 99 |
+// Retry will enqueue resource until maxRetries for that resource has been |
|
| 100 |
+// exceeded, at which point resource will be forgotten and no longer retried. |
|
| 101 |
+// |
|
| 102 |
+// A maxRetries value of -1 is interpreted as retry forever. |
|
| 103 |
+func (r *QueueRetryManager) Retry(resource interface{}) {
|
|
| 104 |
+ id, _ := r.keyFunc(resource) |
|
| 105 |
+ |
|
| 106 |
+ if _, exists := r.retries[id]; !exists {
|
|
| 107 |
+ r.retries[id] = 0 |
|
| 108 |
+ } |
|
| 109 |
+ tries := r.retries[id] |
|
| 110 |
+ |
|
| 111 |
+ if tries < r.maxRetries || r.maxRetries == -1 {
|
|
| 112 |
+ // It's important to use AddIfNotPresent to prevent overwriting newer |
|
| 113 |
+ // state in the queue which may have arrived asynchronously. |
|
| 114 |
+ r.queue.AddIfNotPresent(resource) |
|
| 115 |
+ r.retries[id] = tries + 1 |
|
| 116 |
+ } else {
|
|
| 117 |
+ r.Forget(resource) |
|
| 118 |
+ } |
|
| 119 |
+} |
|
| 120 |
+ |
|
| 121 |
+// Forget resets the retry count for resource. |
|
| 122 |
+func (r *QueueRetryManager) Forget(resource interface{}) {
|
|
| 123 |
+ id, _ := r.keyFunc(resource) |
|
| 124 |
+ delete(r.retries, id) |
|
| 125 |
+} |
| 0 | 126 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,217 @@ |
| 0 |
+package controller |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "fmt" |
|
| 4 |
+ "sync" |
|
| 5 |
+ "testing" |
|
| 6 |
+ |
|
| 7 |
+ kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" |
|
| 8 |
+) |
|
| 9 |
+ |
|
| 10 |
+func TestRetryController_handleOneRetryableError(t *testing.T) {
|
|
| 11 |
+ retried := false |
|
| 12 |
+ |
|
| 13 |
+ controller := &RetryController{
|
|
| 14 |
+ Handle: func(obj interface{}) error {
|
|
| 15 |
+ return fmt.Errorf("retryable error")
|
|
| 16 |
+ }, |
|
| 17 |
+ ShouldRetry: func(interface{}, error) bool {
|
|
| 18 |
+ return true |
|
| 19 |
+ }, |
|
| 20 |
+ RetryManager: &testRetryManager{
|
|
| 21 |
+ RetryFunc: func(resource interface{}) {
|
|
| 22 |
+ retried = true |
|
| 23 |
+ }, |
|
| 24 |
+ ForgetFunc: func(resource interface{}) {
|
|
| 25 |
+ t.Fatalf("unexpected call to forget %v", resource)
|
|
| 26 |
+ }, |
|
| 27 |
+ }, |
|
| 28 |
+ } |
|
| 29 |
+ |
|
| 30 |
+ controller.handleOne(struct{}{})
|
|
| 31 |
+ |
|
| 32 |
+ if !retried {
|
|
| 33 |
+ t.Fatalf("expected a retry")
|
|
| 34 |
+ } |
|
| 35 |
+} |
|
| 36 |
+ |
|
| 37 |
+func TestRetryController_handleOneFatalError(t *testing.T) {
|
|
| 38 |
+ forgotten := false |
|
| 39 |
+ |
|
| 40 |
+ controller := &RetryController{
|
|
| 41 |
+ Handle: func(obj interface{}) error {
|
|
| 42 |
+ return fmt.Errorf("fatal error")
|
|
| 43 |
+ }, |
|
| 44 |
+ ShouldRetry: func(interface{}, error) bool {
|
|
| 45 |
+ return false |
|
| 46 |
+ }, |
|
| 47 |
+ RetryManager: &testRetryManager{
|
|
| 48 |
+ RetryFunc: func(resource interface{}) {
|
|
| 49 |
+ t.Fatalf("unexpected call to retry %v", resource)
|
|
| 50 |
+ }, |
|
| 51 |
+ ForgetFunc: func(resource interface{}) {
|
|
| 52 |
+ forgotten = true |
|
| 53 |
+ }, |
|
| 54 |
+ }, |
|
| 55 |
+ } |
|
| 56 |
+ |
|
| 57 |
+ controller.handleOne(struct{}{})
|
|
| 58 |
+ |
|
| 59 |
+ if !forgotten {
|
|
| 60 |
+ t.Fatalf("expected to forget")
|
|
| 61 |
+ } |
|
| 62 |
+} |
|
| 63 |
+ |
|
| 64 |
+func TestRetryController_handleOneNoError(t *testing.T) {
|
|
| 65 |
+ forgotten := false |
|
| 66 |
+ |
|
| 67 |
+ controller := &RetryController{
|
|
| 68 |
+ Handle: func(obj interface{}) error {
|
|
| 69 |
+ return nil |
|
| 70 |
+ }, |
|
| 71 |
+ ShouldRetry: func(interface{}, error) bool {
|
|
| 72 |
+ t.Fatalf("unexpected retry check")
|
|
| 73 |
+ return true |
|
| 74 |
+ }, |
|
| 75 |
+ RetryManager: &testRetryManager{
|
|
| 76 |
+ RetryFunc: func(resource interface{}) {
|
|
| 77 |
+ t.Fatalf("unexpected call to retry %v", resource)
|
|
| 78 |
+ }, |
|
| 79 |
+ ForgetFunc: func(resource interface{}) {
|
|
| 80 |
+ forgotten = true |
|
| 81 |
+ }, |
|
| 82 |
+ }, |
|
| 83 |
+ } |
|
| 84 |
+ |
|
| 85 |
+ controller.handleOne(struct{}{})
|
|
| 86 |
+ |
|
| 87 |
+ if !forgotten {
|
|
| 88 |
+ t.Fatalf("expected to forget")
|
|
| 89 |
+ } |
|
| 90 |
+} |
|
| 91 |
+ |
|
| 92 |
+func TestQueueRetryManager_retries(t *testing.T) {
|
|
| 93 |
+ retries := 5 |
|
| 94 |
+ requeued := map[string]int{}
|
|
| 95 |
+ |
|
| 96 |
+ manager := &QueueRetryManager{
|
|
| 97 |
+ queue: &testFifo{
|
|
| 98 |
+ // Track re-queues |
|
| 99 |
+ AddIfNotPresentFunc: func(obj interface{}) error {
|
|
| 100 |
+ id := obj.(testObj).id |
|
| 101 |
+ if _, exists := requeued[id]; !exists {
|
|
| 102 |
+ requeued[id] = 0 |
|
| 103 |
+ } |
|
| 104 |
+ requeued[id] = requeued[id] + 1 |
|
| 105 |
+ return nil |
|
| 106 |
+ }, |
|
| 107 |
+ }, |
|
| 108 |
+ keyFunc: func(obj interface{}) (string, error) {
|
|
| 109 |
+ return obj.(testObj).id, nil |
|
| 110 |
+ }, |
|
| 111 |
+ maxRetries: retries, |
|
| 112 |
+ retries: make(map[string]int), |
|
| 113 |
+ } |
|
| 114 |
+ |
|
| 115 |
+ objects := []testObj{
|
|
| 116 |
+ {"a", 1},
|
|
| 117 |
+ {"b", 2},
|
|
| 118 |
+ {"c", 3},
|
|
| 119 |
+ } |
|
| 120 |
+ |
|
| 121 |
+ // Retry one more than the max |
|
| 122 |
+ for _, obj := range objects {
|
|
| 123 |
+ for i := 0; i < retries+1; i++ {
|
|
| 124 |
+ manager.Retry(obj) |
|
| 125 |
+ } |
|
| 126 |
+ } |
|
| 127 |
+ |
|
| 128 |
+ // Should only have re-queued up to the max retry setting |
|
| 129 |
+ for _, obj := range objects {
|
|
| 130 |
+ if e, a := retries, requeued[obj.id]; e != a {
|
|
| 131 |
+ t.Fatalf("expected requeue count %d for obj %s, got %d", e, obj.id, a)
|
|
| 132 |
+ } |
|
| 133 |
+ } |
|
| 134 |
+ |
|
| 135 |
+ // Should have no more state since all objects were retried beyond max |
|
| 136 |
+ if e, a := 0, len(manager.retries); e != a {
|
|
| 137 |
+ t.Fatalf("expected retry len %d, got %d", e, a)
|
|
| 138 |
+ } |
|
| 139 |
+} |
|
| 140 |
+ |
|
| 141 |
+// This test ensures that when an asynchronous state update is received |
|
| 142 |
+// on the queue during failed event handling, that the updated state is |
|
| 143 |
+// retried, NOT the event that failed (which is now stale). |
|
| 144 |
+func TestRetryController_realFifoEventOrdering(t *testing.T) {
|
|
| 145 |
+ keyFunc := func(obj interface{}) (string, error) {
|
|
| 146 |
+ return obj.(testObj).id, nil |
|
| 147 |
+ } |
|
| 148 |
+ |
|
| 149 |
+ fifo := kcache.NewFIFO(keyFunc) |
|
| 150 |
+ |
|
| 151 |
+ wg := sync.WaitGroup{}
|
|
| 152 |
+ wg.Add(1) |
|
| 153 |
+ |
|
| 154 |
+ controller := &RetryController{
|
|
| 155 |
+ Queue: fifo, |
|
| 156 |
+ RetryManager: NewQueueRetryManager(fifo, keyFunc, 1), |
|
| 157 |
+ ShouldRetry: func(interface{}, error) bool {
|
|
| 158 |
+ return true |
|
| 159 |
+ }, |
|
| 160 |
+ Handle: func(obj interface{}) error {
|
|
| 161 |
+ if e, a := 1, obj.(testObj).value; e != a {
|
|
| 162 |
+ t.Fatalf("expected to handle test value %d, got %d")
|
|
| 163 |
+ } |
|
| 164 |
+ |
|
| 165 |
+ go func() {
|
|
| 166 |
+ fifo.Add(testObj{"a", 2})
|
|
| 167 |
+ wg.Done() |
|
| 168 |
+ }() |
|
| 169 |
+ wg.Wait() |
|
| 170 |
+ return fmt.Errorf("retryable error")
|
|
| 171 |
+ }, |
|
| 172 |
+ } |
|
| 173 |
+ |
|
| 174 |
+ fifo.Add(testObj{"a", 1})
|
|
| 175 |
+ controller.handleOne(fifo.Pop()) |
|
| 176 |
+ |
|
| 177 |
+ if e, a := 1, len(fifo.List()); e != a {
|
|
| 178 |
+ t.Fatalf("expected queue length %d, got %d", e, a)
|
|
| 179 |
+ } |
|
| 180 |
+ |
|
| 181 |
+ obj := fifo.Pop() |
|
| 182 |
+ if e, a := 2, obj.(testObj).value; e != a {
|
|
| 183 |
+ t.Fatalf("expected queued value %d, got %d", e, a)
|
|
| 184 |
+ } |
|
| 185 |
+} |
|
| 186 |
+ |
|
| 187 |
+type testObj struct {
|
|
| 188 |
+ id string |
|
| 189 |
+ value int |
|
| 190 |
+} |
|
| 191 |
+ |
|
| 192 |
+type testFifo struct {
|
|
| 193 |
+ AddIfNotPresentFunc func(interface{}) error
|
|
| 194 |
+ PopFunc func() interface{}
|
|
| 195 |
+} |
|
| 196 |
+ |
|
| 197 |
+func (t *testFifo) AddIfNotPresent(obj interface{}) error {
|
|
| 198 |
+ return t.AddIfNotPresentFunc(obj) |
|
| 199 |
+} |
|
| 200 |
+ |
|
| 201 |
+func (t *testFifo) Pop() interface{} {
|
|
| 202 |
+ return t.PopFunc() |
|
| 203 |
+} |
|
| 204 |
+ |
|
| 205 |
+type testRetryManager struct {
|
|
| 206 |
+ RetryFunc func(resource interface{})
|
|
| 207 |
+ ForgetFunc func(resource interface{})
|
|
| 208 |
+} |
|
| 209 |
+ |
|
| 210 |
+func (m *testRetryManager) Retry(resource interface{}) {
|
|
| 211 |
+ m.RetryFunc(resource) |
|
| 212 |
+} |
|
| 213 |
+ |
|
| 214 |
+func (m *testRetryManager) Forget(resource interface{}) {
|
|
| 215 |
+ m.ForgetFunc(resource) |
|
| 216 |
+} |
| ... | ... |
@@ -141,3 +141,8 @@ func (i *ChangeStrategyImpl) GenerateDeploymentConfig(namespace, name string) (* |
| 141 | 141 |
func (i *ChangeStrategyImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) {
|
| 142 | 142 |
return i.UpdateDeploymentConfigFunc(namespace, config) |
| 143 | 143 |
} |
| 144 |
+ |
|
| 145 |
+// labelFor builds a string identifier for a DeploymentConfig. |
|
| 146 |
+func labelFor(config *deployapi.DeploymentConfig) string {
|
|
| 147 |
+ return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion)
|
|
| 148 |
+} |
| 144 | 149 |
deleted file mode 100644 |
| ... | ... |
@@ -1,114 +0,0 @@ |
| 1 |
-package controller |
|
| 2 |
- |
|
| 3 |
-import ( |
|
| 4 |
- "fmt" |
|
| 5 |
- |
|
| 6 |
- "github.com/golang/glog" |
|
| 7 |
- |
|
| 8 |
- kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" |
|
| 9 |
- "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" |
|
| 10 |
- "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" |
|
| 11 |
- "github.com/GoogleCloudPlatform/kubernetes/pkg/util" |
|
| 12 |
- |
|
| 13 |
- deployapi "github.com/openshift/origin/pkg/deploy/api" |
|
| 14 |
- deployutil "github.com/openshift/origin/pkg/deploy/util" |
|
| 15 |
-) |
|
| 16 |
- |
|
| 17 |
-// DeploymentConfigController is responsible for creating a deployment when a DeploymentConfig is |
|
| 18 |
-// updated with a new LatestVersion. Any deployment created is correlated to a DeploymentConfig |
|
| 19 |
-// by setting the DeploymentConfigLabel on the deployment. |
|
| 20 |
-// |
|
| 21 |
-// Deployments are represented by ReplicationControllers. The DeploymentConfig used to create the |
|
| 22 |
-// ReplicationController is encoded and stored in an annotation on the ReplicationController. |
|
| 23 |
-type DeploymentConfigController struct {
|
|
| 24 |
- // DeploymentClient provides access to Deployments. |
|
| 25 |
- DeploymentClient DeploymentConfigControllerDeploymentClient |
|
| 26 |
- // NextDeploymentConfig blocks until the next DeploymentConfig is available. |
|
| 27 |
- NextDeploymentConfig func() *deployapi.DeploymentConfig |
|
| 28 |
- // Codec is used to encode DeploymentConfigs which are stored on deployments. |
|
| 29 |
- Codec runtime.Codec |
|
| 30 |
- // Stop is an optional channel that controls when the controller exits. |
|
| 31 |
- Stop <-chan struct{}
|
|
| 32 |
-} |
|
| 33 |
- |
|
| 34 |
-// Run processes DeploymentConfigs one at a time until the Stop channel unblocks. |
|
| 35 |
-func (c *DeploymentConfigController) Run() {
|
|
| 36 |
- go util.Until(func() {
|
|
| 37 |
- err := c.HandleDeploymentConfig(c.NextDeploymentConfig()) |
|
| 38 |
- if err != nil {
|
|
| 39 |
- glog.Errorf("%v", err)
|
|
| 40 |
- } |
|
| 41 |
- }, 0, c.Stop) |
|
| 42 |
-} |
|
| 43 |
- |
|
| 44 |
-// HandleDeploymentConfig examines the current state of a DeploymentConfig, and creates a new |
|
| 45 |
-// deployment for the config if the following conditions are true: |
|
| 46 |
-// |
|
| 47 |
-// 1. The config version is greater than 0 |
|
| 48 |
-// 2. No deployment exists corresponding to the config's version |
|
| 49 |
-// |
|
| 50 |
-// If the config can't be processed, an error is returned. |
|
| 51 |
-func (c *DeploymentConfigController) HandleDeploymentConfig(config *deployapi.DeploymentConfig) error {
|
|
| 52 |
- // Only deploy when the version has advanced past 0. |
|
| 53 |
- if config.LatestVersion == 0 {
|
|
| 54 |
- glog.V(5).Infof("Waiting for first version of %s", labelFor(config))
|
|
| 55 |
- return nil |
|
| 56 |
- } |
|
| 57 |
- |
|
| 58 |
- // Find any existing deployment, and return if one already exists. |
|
| 59 |
- if deployment, err := c.DeploymentClient.GetDeployment(config.Namespace, deployutil.LatestDeploymentNameForConfig(config)); err != nil {
|
|
| 60 |
- if !errors.IsNotFound(err) {
|
|
| 61 |
- return fmt.Errorf("error looking up existing deployment for config %s: %v", labelFor(config), err)
|
|
| 62 |
- } |
|
| 63 |
- } else {
|
|
| 64 |
- // If there's an existing deployment, nothing needs to be done. |
|
| 65 |
- if deployment != nil {
|
|
| 66 |
- return nil |
|
| 67 |
- } |
|
| 68 |
- } |
|
| 69 |
- |
|
| 70 |
- // Try and build a deployment for the config. |
|
| 71 |
- deployment, err := deployutil.MakeDeployment(config, c.Codec) |
|
| 72 |
- if err != nil {
|
|
| 73 |
- return fmt.Errorf("couldn't make deployment from (potentially invalid) config %s: %v", labelFor(config), err)
|
|
| 74 |
- } |
|
| 75 |
- |
|
| 76 |
- // Create the deployment. |
|
| 77 |
- if _, err := c.DeploymentClient.CreateDeployment(config.Namespace, deployment); err == nil {
|
|
| 78 |
- glog.V(4).Infof("Created deployment for config %s", labelFor(config))
|
|
| 79 |
- return nil |
|
| 80 |
- } else {
|
|
| 81 |
- // If the deployment was already created, just move on. The cache could be stale, or another |
|
| 82 |
- // process could have already handled this update. |
|
| 83 |
- if errors.IsAlreadyExists(err) {
|
|
| 84 |
- glog.V(4).Infof("Deployment already exists for config %s", labelFor(config))
|
|
| 85 |
- return nil |
|
| 86 |
- } |
|
| 87 |
- return fmt.Errorf("couldn't create deployment for config %s: %v", labelFor(config), err)
|
|
| 88 |
- } |
|
| 89 |
-} |
|
| 90 |
- |
|
| 91 |
-// labelFor builds a string identifier for a DeploymentConfig. |
|
| 92 |
-func labelFor(config *deployapi.DeploymentConfig) string {
|
|
| 93 |
- return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion)
|
|
| 94 |
-} |
|
| 95 |
- |
|
| 96 |
-// DeploymentConfigControllerDeploymentClient abstracts access to deployments. |
|
| 97 |
-type DeploymentConfigControllerDeploymentClient interface {
|
|
| 98 |
- GetDeployment(namespace, name string) (*kapi.ReplicationController, error) |
|
| 99 |
- CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) |
|
| 100 |
-} |
|
| 101 |
- |
|
| 102 |
-// DeploymentConfigControllerDeploymentClientImpl is a pluggable deploymentConfigControllerDeploymentClient. |
|
| 103 |
-type DeploymentConfigControllerDeploymentClientImpl struct {
|
|
| 104 |
- GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) |
|
| 105 |
- CreateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) |
|
| 106 |
-} |
|
| 107 |
- |
|
| 108 |
-func (i *DeploymentConfigControllerDeploymentClientImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 109 |
- return i.GetDeploymentFunc(namespace, name) |
|
| 110 |
-} |
|
| 111 |
- |
|
| 112 |
-func (i *DeploymentConfigControllerDeploymentClientImpl) CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 113 |
- return i.CreateDeploymentFunc(namespace, deployment) |
|
| 114 |
-} |
| 115 | 1 |
deleted file mode 100644 |
| ... | ... |
@@ -1,128 +0,0 @@ |
| 1 |
-package controller |
|
| 2 |
- |
|
| 3 |
-import ( |
|
| 4 |
- "fmt" |
|
| 5 |
- "testing" |
|
| 6 |
- |
|
| 7 |
- kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" |
|
| 8 |
- kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" |
|
| 9 |
- |
|
| 10 |
- api "github.com/openshift/origin/pkg/api/latest" |
|
| 11 |
- deploytest "github.com/openshift/origin/pkg/deploy/api/test" |
|
| 12 |
- deployutil "github.com/openshift/origin/pkg/deploy/util" |
|
| 13 |
-) |
|
| 14 |
- |
|
| 15 |
-func TestHandleNewDeploymentConfig(t *testing.T) {
|
|
| 16 |
- controller := &DeploymentConfigController{
|
|
| 17 |
- Codec: api.Codec, |
|
| 18 |
- DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{
|
|
| 19 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 20 |
- t.Fatalf("unexpected call with name %s", name)
|
|
| 21 |
- return nil, nil |
|
| 22 |
- }, |
|
| 23 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 24 |
- t.Fatalf("unexpected call with deployment %v", deployment)
|
|
| 25 |
- return nil, nil |
|
| 26 |
- }, |
|
| 27 |
- }, |
|
| 28 |
- } |
|
| 29 |
- |
|
| 30 |
- err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(0)) |
|
| 31 |
- |
|
| 32 |
- if err != nil {
|
|
| 33 |
- t.Fatalf("unexpected error: %v", err)
|
|
| 34 |
- } |
|
| 35 |
-} |
|
| 36 |
- |
|
| 37 |
-func TestHandleUpdatedDeploymentConfigOk(t *testing.T) {
|
|
| 38 |
- deploymentConfig := deploytest.OkDeploymentConfig(1) |
|
| 39 |
- var deployed *kapi.ReplicationController |
|
| 40 |
- |
|
| 41 |
- controller := &DeploymentConfigController{
|
|
| 42 |
- Codec: api.Codec, |
|
| 43 |
- DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{
|
|
| 44 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 45 |
- return nil, kerrors.NewNotFound("ReplicationController", name)
|
|
| 46 |
- }, |
|
| 47 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 48 |
- deployed = deployment |
|
| 49 |
- return deployment, nil |
|
| 50 |
- }, |
|
| 51 |
- }, |
|
| 52 |
- } |
|
| 53 |
- |
|
| 54 |
- err := controller.HandleDeploymentConfig(deploymentConfig) |
|
| 55 |
- |
|
| 56 |
- if deployed == nil {
|
|
| 57 |
- t.Fatalf("expected a deployment")
|
|
| 58 |
- } |
|
| 59 |
- |
|
| 60 |
- if err != nil {
|
|
| 61 |
- t.Fatalf("unexpected error: %v", err)
|
|
| 62 |
- } |
|
| 63 |
-} |
|
| 64 |
- |
|
| 65 |
-func TestHandleUpdatedDeploymentConfigLookupFailure(t *testing.T) {
|
|
| 66 |
- controller := &DeploymentConfigController{
|
|
| 67 |
- Codec: api.Codec, |
|
| 68 |
- DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{
|
|
| 69 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 70 |
- return nil, kerrors.NewInternalError(fmt.Errorf("test error"))
|
|
| 71 |
- }, |
|
| 72 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 73 |
- t.Fatalf("unexpected call with deployment %v", deployment)
|
|
| 74 |
- return nil, nil |
|
| 75 |
- }, |
|
| 76 |
- }, |
|
| 77 |
- } |
|
| 78 |
- |
|
| 79 |
- err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) |
|
| 80 |
- |
|
| 81 |
- if err == nil {
|
|
| 82 |
- t.Fatalf("expected error")
|
|
| 83 |
- } |
|
| 84 |
-} |
|
| 85 |
- |
|
| 86 |
-func TestHandleUpdatedDeploymentConfigAlreadyDeployed(t *testing.T) {
|
|
| 87 |
- deploymentConfig := deploytest.OkDeploymentConfig(0) |
|
| 88 |
- |
|
| 89 |
- controller := &DeploymentConfigController{
|
|
| 90 |
- Codec: api.Codec, |
|
| 91 |
- DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{
|
|
| 92 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 93 |
- deployment, _ := deployutil.MakeDeployment(deploymentConfig, kapi.Codec) |
|
| 94 |
- return deployment, nil |
|
| 95 |
- }, |
|
| 96 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 97 |
- t.Fatalf("unexpected call to to create deployment: %v", deployment)
|
|
| 98 |
- return nil, nil |
|
| 99 |
- }, |
|
| 100 |
- }, |
|
| 101 |
- } |
|
| 102 |
- |
|
| 103 |
- err := controller.HandleDeploymentConfig(deploymentConfig) |
|
| 104 |
- |
|
| 105 |
- if err != nil {
|
|
| 106 |
- t.Fatalf("unexpected error: %v", err)
|
|
| 107 |
- } |
|
| 108 |
-} |
|
| 109 |
- |
|
| 110 |
-func TestHandleUpdatedDeploymentConfigError(t *testing.T) {
|
|
| 111 |
- controller := &DeploymentConfigController{
|
|
| 112 |
- Codec: api.Codec, |
|
| 113 |
- DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{
|
|
| 114 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 115 |
- return nil, kerrors.NewNotFound("ReplicationController", name)
|
|
| 116 |
- }, |
|
| 117 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 118 |
- return nil, kerrors.NewInternalError(fmt.Errorf("test error"))
|
|
| 119 |
- }, |
|
| 120 |
- }, |
|
| 121 |
- } |
|
| 122 |
- |
|
| 123 |
- err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) |
|
| 124 |
- |
|
| 125 |
- if err == nil {
|
|
| 126 |
- t.Fatalf("expected error")
|
|
| 127 |
- } |
|
| 128 |
-} |
| 129 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,103 @@ |
| 0 |
+package deploymentconfig |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "fmt" |
|
| 4 |
+ |
|
| 5 |
+ "github.com/golang/glog" |
|
| 6 |
+ |
|
| 7 |
+ kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" |
|
| 8 |
+ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" |
|
| 9 |
+ |
|
| 10 |
+ deployapi "github.com/openshift/origin/pkg/deploy/api" |
|
| 11 |
+ deployutil "github.com/openshift/origin/pkg/deploy/util" |
|
| 12 |
+) |
|
| 13 |
+ |
|
| 14 |
+// DeploymentConfigController is responsible for creating a new deployment when: |
|
| 15 |
+// |
|
| 16 |
+// 1. The config version is > 0 and, |
|
| 17 |
+// 2. No existing deployment for that version exists. |
|
| 18 |
+// |
|
| 19 |
+// The responsibility of constructing a new deployment resource from a config |
|
| 20 |
+// is delegated. See util.MakeDeployment for more details. |
|
| 21 |
+// |
|
| 22 |
+// Use the DeploymentConfigControllerFactory to create this controller. |
|
| 23 |
+type DeploymentConfigController struct {
|
|
| 24 |
+ // deploymentClient provides access to Deployments. |
|
| 25 |
+ deploymentClient deploymentClient |
|
| 26 |
+ // makeDeployment creates a Deployment from a DeploymentConfig. |
|
| 27 |
+ makeDeployment makeDeployment |
|
| 28 |
+} |
|
| 29 |
+ |
|
| 30 |
+// fatalError is an error which can't be retried. |
|
| 31 |
+type fatalError string |
|
| 32 |
+ |
|
| 33 |
+func (e fatalError) Error() string { return "fatal error handling deploymentConfig: " + string(e) }
|
|
| 34 |
+ |
|
| 35 |
+// Handle creates a new deployment for config as necessary. |
|
| 36 |
+func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error {
|
|
| 37 |
+ // Only deploy when the version has advanced past 0. |
|
| 38 |
+ if config.LatestVersion == 0 {
|
|
| 39 |
+ glog.V(5).Infof("Waiting for first version of %s", labelFor(config))
|
|
| 40 |
+ return nil |
|
| 41 |
+ } |
|
| 42 |
+ |
|
| 43 |
+ // Find any existing deployment, and return if one already exists. |
|
| 44 |
+ if deployment, err := c.deploymentClient.getDeployment(config.Namespace, deployutil.LatestDeploymentNameForConfig(config)); err != nil {
|
|
| 45 |
+ if !errors.IsNotFound(err) {
|
|
| 46 |
+ return fmt.Errorf("couldn't get deployment for config %s: %v", labelFor(config), err)
|
|
| 47 |
+ } |
|
| 48 |
+ } else {
|
|
| 49 |
+ // If there's an existing deployment, nothing needs to be done. |
|
| 50 |
+ if deployment != nil {
|
|
| 51 |
+ return nil |
|
| 52 |
+ } |
|
| 53 |
+ } |
|
| 54 |
+ |
|
| 55 |
+ // Try and build a deployment for the config. |
|
| 56 |
+ deployment, err := c.makeDeployment(config) |
|
| 57 |
+ if err != nil {
|
|
| 58 |
+ return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) config %s: %v", labelFor(config), err))
|
|
| 59 |
+ } |
|
| 60 |
+ |
|
| 61 |
+ // Create the deployment. |
|
| 62 |
+ if _, err := c.deploymentClient.createDeployment(config.Namespace, deployment); err == nil {
|
|
| 63 |
+ glog.V(4).Infof("Created deployment for config %s", labelFor(config))
|
|
| 64 |
+ return nil |
|
| 65 |
+ } else {
|
|
| 66 |
+ // If the deployment was already created, just move on. The cache could be stale, or another |
|
| 67 |
+ // process could have already handled this update. |
|
| 68 |
+ if errors.IsAlreadyExists(err) {
|
|
| 69 |
+ glog.V(4).Infof("Deployment already exists for config %s", labelFor(config))
|
|
| 70 |
+ return nil |
|
| 71 |
+ } |
|
| 72 |
+ return fmt.Errorf("couldn't create deployment for config %s: %v", labelFor(config), err)
|
|
| 73 |
+ } |
|
| 74 |
+} |
|
| 75 |
+ |
|
| 76 |
+// labelFor builds a string identifier for a DeploymentConfig. |
|
| 77 |
+func labelFor(config *deployapi.DeploymentConfig) string {
|
|
| 78 |
+ return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion)
|
|
| 79 |
+} |
|
| 80 |
+ |
|
| 81 |
+// deploymentClient abstracts access to deployments. |
|
| 82 |
+type deploymentClient interface {
|
|
| 83 |
+ getDeployment(namespace, name string) (*kapi.ReplicationController, error) |
|
| 84 |
+ createDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) |
|
| 85 |
+} |
|
| 86 |
+ |
|
| 87 |
+// deploymentClientImpl is a pluggable deploymentClient. |
|
| 88 |
+type deploymentClientImpl struct {
|
|
| 89 |
+ getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) |
|
| 90 |
+ createDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) |
|
| 91 |
+} |
|
| 92 |
+ |
|
| 93 |
+func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 94 |
+ return i.getDeploymentFunc(namespace, name) |
|
| 95 |
+} |
|
| 96 |
+ |
|
| 97 |
+func (i *deploymentClientImpl) createDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 98 |
+ return i.createDeploymentFunc(namespace, deployment) |
|
| 99 |
+} |
|
| 100 |
+ |
|
| 101 |
+// makeDeployment knows how to make a deployment from a config. |
|
| 102 |
+type makeDeployment func(*deployapi.DeploymentConfig) (*kapi.ReplicationController, error) |
| 0 | 103 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,168 @@ |
| 0 |
+package deploymentconfig |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "fmt" |
|
| 4 |
+ "testing" |
|
| 5 |
+ |
|
| 6 |
+ kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" |
|
| 7 |
+ kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" |
|
| 8 |
+ |
|
| 9 |
+ api "github.com/openshift/origin/pkg/api/latest" |
|
| 10 |
+ deployapi "github.com/openshift/origin/pkg/deploy/api" |
|
| 11 |
+ deploytest "github.com/openshift/origin/pkg/deploy/api/test" |
|
| 12 |
+ deployutil "github.com/openshift/origin/pkg/deploy/util" |
|
| 13 |
+) |
|
| 14 |
+ |
|
| 15 |
+func TestHandle_initialOk(t *testing.T) {
|
|
| 16 |
+ controller := &DeploymentConfigController{
|
|
| 17 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 18 |
+ return deployutil.MakeDeployment(config, api.Codec) |
|
| 19 |
+ }, |
|
| 20 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 21 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 22 |
+ t.Fatalf("unexpected call with name %s", name)
|
|
| 23 |
+ return nil, nil |
|
| 24 |
+ }, |
|
| 25 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 26 |
+ t.Fatalf("unexpected call with deployment %v", deployment)
|
|
| 27 |
+ return nil, nil |
|
| 28 |
+ }, |
|
| 29 |
+ }, |
|
| 30 |
+ } |
|
| 31 |
+ |
|
| 32 |
+ err := controller.Handle(deploytest.OkDeploymentConfig(0)) |
|
| 33 |
+ |
|
| 34 |
+ if err != nil {
|
|
| 35 |
+ t.Fatalf("unexpected error: %v", err)
|
|
| 36 |
+ } |
|
| 37 |
+} |
|
| 38 |
+ |
|
| 39 |
+func TestHandle_updateOk(t *testing.T) {
|
|
| 40 |
+ deploymentConfig := deploytest.OkDeploymentConfig(1) |
|
| 41 |
+ var deployed *kapi.ReplicationController |
|
| 42 |
+ |
|
| 43 |
+ controller := &DeploymentConfigController{
|
|
| 44 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 45 |
+ return deployutil.MakeDeployment(config, api.Codec) |
|
| 46 |
+ }, |
|
| 47 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 48 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 49 |
+ return nil, kerrors.NewNotFound("ReplicationController", name)
|
|
| 50 |
+ }, |
|
| 51 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 52 |
+ deployed = deployment |
|
| 53 |
+ return deployment, nil |
|
| 54 |
+ }, |
|
| 55 |
+ }, |
|
| 56 |
+ } |
|
| 57 |
+ |
|
| 58 |
+ err := controller.Handle(deploymentConfig) |
|
| 59 |
+ |
|
| 60 |
+ if deployed == nil {
|
|
| 61 |
+ t.Fatalf("expected a deployment")
|
|
| 62 |
+ } |
|
| 63 |
+ |
|
| 64 |
+ if err != nil {
|
|
| 65 |
+ t.Fatalf("unexpected error: %v", err)
|
|
| 66 |
+ } |
|
| 67 |
+} |
|
| 68 |
+ |
|
| 69 |
+func TestHandle_nonfatalLookupError(t *testing.T) {
|
|
| 70 |
+ configController := &DeploymentConfigController{
|
|
| 71 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 72 |
+ return deployutil.MakeDeployment(config, api.Codec) |
|
| 73 |
+ }, |
|
| 74 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 75 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 76 |
+ return nil, kerrors.NewInternalError(fmt.Errorf("fatal test error"))
|
|
| 77 |
+ }, |
|
| 78 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 79 |
+ t.Fatalf("unexpected call with deployment %v", deployment)
|
|
| 80 |
+ return nil, nil |
|
| 81 |
+ }, |
|
| 82 |
+ }, |
|
| 83 |
+ } |
|
| 84 |
+ |
|
| 85 |
+ err := configController.Handle(deploytest.OkDeploymentConfig(1)) |
|
| 86 |
+ if err == nil {
|
|
| 87 |
+ t.Fatalf("expected error")
|
|
| 88 |
+ } |
|
| 89 |
+ if _, isFatal := err.(fatalError); isFatal {
|
|
| 90 |
+ t.Fatalf("expected a retryable error, got a fatal error: %v", err)
|
|
| 91 |
+ } |
|
| 92 |
+} |
|
| 93 |
+ |
|
| 94 |
+func TestHandle_configAlreadyDeployed(t *testing.T) {
|
|
| 95 |
+ deploymentConfig := deploytest.OkDeploymentConfig(0) |
|
| 96 |
+ |
|
| 97 |
+ controller := &DeploymentConfigController{
|
|
| 98 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 99 |
+ return deployutil.MakeDeployment(config, api.Codec) |
|
| 100 |
+ }, |
|
| 101 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 102 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 103 |
+ deployment, _ := deployutil.MakeDeployment(deploymentConfig, kapi.Codec) |
|
| 104 |
+ return deployment, nil |
|
| 105 |
+ }, |
|
| 106 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 107 |
+ t.Fatalf("unexpected call to to create deployment: %v", deployment)
|
|
| 108 |
+ return nil, nil |
|
| 109 |
+ }, |
|
| 110 |
+ }, |
|
| 111 |
+ } |
|
| 112 |
+ |
|
| 113 |
+ err := controller.Handle(deploymentConfig) |
|
| 114 |
+ |
|
| 115 |
+ if err != nil {
|
|
| 116 |
+ t.Fatalf("unexpected error: %v", err)
|
|
| 117 |
+ } |
|
| 118 |
+} |
|
| 119 |
+ |
|
| 120 |
+func TestHandle_nonfatalCreateError(t *testing.T) {
|
|
| 121 |
+ configController := &DeploymentConfigController{
|
|
| 122 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 123 |
+ return deployutil.MakeDeployment(config, api.Codec) |
|
| 124 |
+ }, |
|
| 125 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 126 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 127 |
+ return nil, kerrors.NewNotFound("ReplicationController", name)
|
|
| 128 |
+ }, |
|
| 129 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 130 |
+ return nil, kerrors.NewInternalError(fmt.Errorf("test error"))
|
|
| 131 |
+ }, |
|
| 132 |
+ }, |
|
| 133 |
+ } |
|
| 134 |
+ |
|
| 135 |
+ err := configController.Handle(deploytest.OkDeploymentConfig(1)) |
|
| 136 |
+ if err == nil {
|
|
| 137 |
+ t.Fatalf("expected error")
|
|
| 138 |
+ } |
|
| 139 |
+ if _, isFatal := err.(fatalError); isFatal {
|
|
| 140 |
+ t.Fatalf("expected a retryable error, got a fatal error: %v", err)
|
|
| 141 |
+ } |
|
| 142 |
+} |
|
| 143 |
+ |
|
| 144 |
+func TestHandle_fatalError(t *testing.T) {
|
|
| 145 |
+ configController := &DeploymentConfigController{
|
|
| 146 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 147 |
+ return nil, fmt.Errorf("couldn't make deployment")
|
|
| 148 |
+ }, |
|
| 149 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 150 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 151 |
+ return nil, kerrors.NewNotFound("ReplicationController", name)
|
|
| 152 |
+ }, |
|
| 153 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 154 |
+ t.Fatalf("unexpected call to create")
|
|
| 155 |
+ return nil, kerrors.NewInternalError(fmt.Errorf("test error"))
|
|
| 156 |
+ }, |
|
| 157 |
+ }, |
|
| 158 |
+ } |
|
| 159 |
+ |
|
| 160 |
+ err := configController.Handle(deploytest.OkDeploymentConfig(1)) |
|
| 161 |
+ if err == nil {
|
|
| 162 |
+ t.Fatalf("expected error")
|
|
| 163 |
+ } |
|
| 164 |
+ if _, isFatal := err.(fatalError); !isFatal {
|
|
| 165 |
+ t.Fatalf("expected a fatal error, got: %v", err)
|
|
| 166 |
+ } |
|
| 167 |
+} |
| 0 | 168 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,67 @@ |
| 0 |
+package deploymentconfig |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" |
|
| 4 |
+ kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" |
|
| 5 |
+ "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" |
|
| 6 |
+ "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" |
|
| 7 |
+ "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" |
|
| 8 |
+ kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" |
|
| 9 |
+ "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" |
|
| 10 |
+ |
|
| 11 |
+ osclient "github.com/openshift/origin/pkg/client" |
|
| 12 |
+ controller "github.com/openshift/origin/pkg/controller" |
|
| 13 |
+ deployapi "github.com/openshift/origin/pkg/deploy/api" |
|
| 14 |
+ deployutil "github.com/openshift/origin/pkg/deploy/util" |
|
| 15 |
+) |
|
| 16 |
+ |
|
| 17 |
+// DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains |
|
| 18 |
+// DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. |
|
| 19 |
+type DeploymentConfigControllerFactory struct {
|
|
| 20 |
+ Client *osclient.Client |
|
| 21 |
+ KubeClient kclient.Interface |
|
| 22 |
+ Codec runtime.Codec |
|
| 23 |
+} |
|
| 24 |
+ |
|
| 25 |
+func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableController {
|
|
| 26 |
+ deploymentConfigLW := &deployutil.ListWatcherImpl{
|
|
| 27 |
+ ListFunc: func() (runtime.Object, error) {
|
|
| 28 |
+ return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) |
|
| 29 |
+ }, |
|
| 30 |
+ WatchFunc: func(resourceVersion string) (watch.Interface, error) {
|
|
| 31 |
+ return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) |
|
| 32 |
+ }, |
|
| 33 |
+ } |
|
| 34 |
+ queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) |
|
| 35 |
+ cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).Run()
|
|
| 36 |
+ |
|
| 37 |
+ configController := &DeploymentConfigController{
|
|
| 38 |
+ deploymentClient: &deploymentClientImpl{
|
|
| 39 |
+ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 40 |
+ return factory.KubeClient.ReplicationControllers(namespace).Get(name) |
|
| 41 |
+ }, |
|
| 42 |
+ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 43 |
+ return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) |
|
| 44 |
+ }, |
|
| 45 |
+ }, |
|
| 46 |
+ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) {
|
|
| 47 |
+ return deployutil.MakeDeployment(config, factory.Codec) |
|
| 48 |
+ }, |
|
| 49 |
+ } |
|
| 50 |
+ |
|
| 51 |
+ return &controller.RetryController{
|
|
| 52 |
+ Queue: queue, |
|
| 53 |
+ RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1), |
|
| 54 |
+ ShouldRetry: func(obj interface{}, err error) bool {
|
|
| 55 |
+ if _, isFatal := err.(fatalError); isFatal {
|
|
| 56 |
+ return false |
|
| 57 |
+ } |
|
| 58 |
+ kutil.HandleError(err) |
|
| 59 |
+ return true |
|
| 60 |
+ }, |
|
| 61 |
+ Handle: func(obj interface{}) error {
|
|
| 62 |
+ config := obj.(*deployapi.DeploymentConfig) |
|
| 63 |
+ return configController.Handle(config) |
|
| 64 |
+ }, |
|
| 65 |
+ } |
|
| 66 |
+} |
| ... | ... |
@@ -15,51 +15,11 @@ import ( |
| 15 | 15 |
|
| 16 | 16 |
osclient "github.com/openshift/origin/pkg/client" |
| 17 | 17 |
deployapi "github.com/openshift/origin/pkg/deploy/api" |
| 18 |
- controller "github.com/openshift/origin/pkg/deploy/controller" |
|
| 18 |
+ deploycontroller "github.com/openshift/origin/pkg/deploy/controller" |
|
| 19 | 19 |
deployutil "github.com/openshift/origin/pkg/deploy/util" |
| 20 | 20 |
imageapi "github.com/openshift/origin/pkg/image/api" |
| 21 | 21 |
) |
| 22 | 22 |
|
| 23 |
-// DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains |
|
| 24 |
-// DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. |
|
| 25 |
-type DeploymentConfigControllerFactory struct {
|
|
| 26 |
- Client *osclient.Client |
|
| 27 |
- KubeClient kclient.Interface |
|
| 28 |
- Codec runtime.Codec |
|
| 29 |
- Stop <-chan struct{}
|
|
| 30 |
-} |
|
| 31 |
- |
|
| 32 |
-func (factory *DeploymentConfigControllerFactory) Create() *controller.DeploymentConfigController {
|
|
| 33 |
- deploymentConfigLW := &deployutil.ListWatcherImpl{
|
|
| 34 |
- ListFunc: func() (runtime.Object, error) {
|
|
| 35 |
- return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) |
|
| 36 |
- }, |
|
| 37 |
- WatchFunc: func(resourceVersion string) (watch.Interface, error) {
|
|
| 38 |
- return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) |
|
| 39 |
- }, |
|
| 40 |
- } |
|
| 41 |
- queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) |
|
| 42 |
- cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop)
|
|
| 43 |
- |
|
| 44 |
- return &controller.DeploymentConfigController{
|
|
| 45 |
- DeploymentClient: &controller.DeploymentConfigControllerDeploymentClientImpl{
|
|
| 46 |
- GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
|
| 47 |
- return factory.KubeClient.ReplicationControllers(namespace).Get(name) |
|
| 48 |
- }, |
|
| 49 |
- CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
|
|
| 50 |
- return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) |
|
| 51 |
- }, |
|
| 52 |
- }, |
|
| 53 |
- NextDeploymentConfig: func() *deployapi.DeploymentConfig {
|
|
| 54 |
- config := queue.Pop().(*deployapi.DeploymentConfig) |
|
| 55 |
- panicIfStopped(factory.Stop, "deployment config controller stopped") |
|
| 56 |
- return config |
|
| 57 |
- }, |
|
| 58 |
- Codec: factory.Codec, |
|
| 59 |
- Stop: factory.Stop, |
|
| 60 |
- } |
|
| 61 |
-} |
|
| 62 |
- |
|
| 63 | 23 |
// DeploymentControllerFactory can create a DeploymentController which obtains Deployments |
| 64 | 24 |
// from a queue populated from a watch of Deployments. |
| 65 | 25 |
// Pods are obtained from a queue populated from a watch of all pods. |
| ... | ... |
@@ -78,7 +38,7 @@ type DeploymentControllerFactory struct {
|
| 78 | 78 |
Stop <-chan struct{}
|
| 79 | 79 |
} |
| 80 | 80 |
|
| 81 |
-func (factory *DeploymentControllerFactory) Create() *controller.DeploymentController {
|
|
| 81 |
+func (factory *DeploymentControllerFactory) Create() *deploycontroller.DeploymentController {
|
|
| 82 | 82 |
deploymentLW := &deployutil.ListWatcherImpl{
|
| 83 | 83 |
ListFunc: func() (runtime.Object, error) {
|
| 84 | 84 |
return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) |
| ... | ... |
@@ -107,9 +67,9 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr |
| 107 | 107 |
} |
| 108 | 108 |
cache.NewPoller(pollFunc, 10*time.Second, podQueue).RunUntil(factory.Stop) |
| 109 | 109 |
|
| 110 |
- return &controller.DeploymentController{
|
|
| 110 |
+ return &deploycontroller.DeploymentController{
|
|
| 111 | 111 |
ContainerCreator: &defaultContainerCreator{factory.RecreateStrategyImage},
|
| 112 |
- DeploymentClient: &controller.DeploymentControllerDeploymentClientImpl{
|
|
| 112 |
+ DeploymentClient: &deploycontroller.DeploymentControllerDeploymentClientImpl{
|
|
| 113 | 113 |
// Since we need to use a deployment cache to support the pod poller, go ahead and use |
| 114 | 114 |
// it for other deployment lookups and maintain the usual REST API for not-found errors. |
| 115 | 115 |
GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
| ... | ... |
@@ -131,7 +91,7 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr |
| 131 | 131 |
return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) |
| 132 | 132 |
}, |
| 133 | 133 |
}, |
| 134 |
- PodClient: &controller.DeploymentControllerPodClientImpl{
|
|
| 134 |
+ PodClient: &deploycontroller.DeploymentControllerPodClientImpl{
|
|
| 135 | 135 |
CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
|
| 136 | 136 |
return factory.KubeClient.Pods(namespace).Create(pod) |
| 137 | 137 |
}, |
| ... | ... |
@@ -240,7 +200,7 @@ type DeploymentConfigChangeControllerFactory struct {
|
| 240 | 240 |
Stop <-chan struct{}
|
| 241 | 241 |
} |
| 242 | 242 |
|
| 243 |
-func (factory *DeploymentConfigChangeControllerFactory) Create() *controller.DeploymentConfigChangeController {
|
|
| 243 |
+func (factory *DeploymentConfigChangeControllerFactory) Create() *deploycontroller.DeploymentConfigChangeController {
|
|
| 244 | 244 |
deploymentConfigLW := &deployutil.ListWatcherImpl{
|
| 245 | 245 |
ListFunc: func() (runtime.Object, error) {
|
| 246 | 246 |
return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) |
| ... | ... |
@@ -252,8 +212,8 @@ func (factory *DeploymentConfigChangeControllerFactory) Create() *controller.Dep |
| 252 | 252 |
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) |
| 253 | 253 |
cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop)
|
| 254 | 254 |
|
| 255 |
- return &controller.DeploymentConfigChangeController{
|
|
| 256 |
- ChangeStrategy: &controller.ChangeStrategyImpl{
|
|
| 255 |
+ return &deploycontroller.DeploymentConfigChangeController{
|
|
| 256 |
+ ChangeStrategy: &deploycontroller.ChangeStrategyImpl{
|
|
| 257 | 257 |
GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) {
|
| 258 | 258 |
return factory.KubeClient.ReplicationControllers(namespace).Get(name) |
| 259 | 259 |
}, |
| ... | ... |
@@ -282,7 +242,7 @@ type ImageChangeControllerFactory struct {
|
| 282 | 282 |
Stop <-chan struct{}
|
| 283 | 283 |
} |
| 284 | 284 |
|
| 285 |
-func (factory *ImageChangeControllerFactory) Create() *controller.ImageChangeController {
|
|
| 285 |
+func (factory *ImageChangeControllerFactory) Create() *deploycontroller.ImageChangeController {
|
|
| 286 | 286 |
imageRepositoryLW := &deployutil.ListWatcherImpl{
|
| 287 | 287 |
ListFunc: func() (runtime.Object, error) {
|
| 288 | 288 |
return factory.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) |
| ... | ... |
@@ -305,8 +265,8 @@ func (factory *ImageChangeControllerFactory) Create() *controller.ImageChangeCon |
| 305 | 305 |
store := cache.NewStore(cache.MetaNamespaceKeyFunc) |
| 306 | 306 |
cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, store).RunUntil(factory.Stop)
|
| 307 | 307 |
|
| 308 |
- return &controller.ImageChangeController{
|
|
| 309 |
- DeploymentConfigClient: &controller.ImageChangeControllerDeploymentConfigClientImpl{
|
|
| 308 |
+ return &deploycontroller.ImageChangeController{
|
|
| 309 |
+ DeploymentConfigClient: &deploycontroller.ImageChangeControllerDeploymentConfigClientImpl{
|
|
| 310 | 310 |
ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) {
|
| 311 | 311 |
configs := []*deployapi.DeploymentConfig{}
|
| 312 | 312 |
objs := store.List() |
| ... | ... |
@@ -28,6 +28,7 @@ import ( |
| 28 | 28 |
buildetcd "github.com/openshift/origin/pkg/build/registry/etcd" |
| 29 | 29 |
osclient "github.com/openshift/origin/pkg/client" |
| 30 | 30 |
deployapi "github.com/openshift/origin/pkg/deploy/api" |
| 31 |
+ deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" |
|
| 31 | 32 |
deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" |
| 32 | 33 |
deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" |
| 33 | 34 |
deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" |
| ... | ... |
@@ -376,11 +377,10 @@ func NewTestOpenshift(t *testing.T) *testOpenshift {
|
| 376 | 376 |
|
| 377 | 377 |
apiserver.NewAPIGroupVersion(storage, v1beta1.Codec, "/osapi", "v1beta1", interfaces.MetadataAccessor, admit.NewAlwaysAdmit(), kapi.NewRequestContextMapper(), latest.RESTMapper).InstallREST(handlerContainer, "/osapi", "v1beta1") |
| 378 | 378 |
|
| 379 |
- dccFactory := deploycontrollerfactory.DeploymentConfigControllerFactory{
|
|
| 379 |
+ dccFactory := deployconfigcontroller.DeploymentConfigControllerFactory{
|
|
| 380 | 380 |
Client: osClient, |
| 381 | 381 |
KubeClient: kubeClient, |
| 382 | 382 |
Codec: latest.Codec, |
| 383 |
- Stop: openshift.stop, |
|
| 384 | 383 |
} |
| 385 | 384 |
dccFactory.Create().Run() |
| 386 | 385 |
|