Browse code

Added stop channel to watched pods to prevent thread leak

markturansky authored on 2015/06/11 06:48:07
Showing 4 changed files
... ...
@@ -54,8 +54,8 @@ func NewRecreateDeploymentStrategy(client kclient.Interface, codec runtime.Codec
54 54
 				CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
55 55
 					return client.Pods(namespace).Create(pod)
56 56
 				},
57
-				PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
58
-					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion)
57
+				PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
58
+					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion, stopChannel)
59 59
 				},
60 60
 			},
61 61
 		},
... ...
@@ -105,8 +105,8 @@ func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, co
105 105
 				CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
106 106
 					return client.Pods(namespace).Create(pod)
107 107
 				},
108
-				PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
109
-					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion)
108
+				PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
109
+					return stratsupport.NewPodWatch(client, namespace, name, resourceVersion, stopChannel)
110 110
 				},
111 111
 			},
112 112
 		},
... ...
@@ -78,8 +78,10 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployme
78 78
 		glog.V(0).Infof("Created lifecycle pod %s for deployment %s", pod.Name, deployutil.LabelForDeployment(deployment))
79 79
 	}
80 80
 
81
-	// Wait for the pod to finish.
82
-	nextPod := e.PodClient.PodWatch(pod.Namespace, pod.Name, pod.ResourceVersion)
81
+	stopChannel := make(chan struct{})
82
+	defer close(stopChannel)
83
+	nextPod := e.PodClient.PodWatch(pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)
84
+
83 85
 	glog.V(0).Infof("Waiting for hook pod %s/%s to complete", pod.Namespace, pod.Name)
84 86
 	for {
85 87
 		pod := nextPod()
... ...
@@ -167,26 +169,28 @@ func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationCont
167 167
 // HookExecutorPodClient abstracts access to pods.
168 168
 type HookExecutorPodClient interface {
169 169
 	CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
170
-	PodWatch(namespace, name, resourceVersion string) func() *kapi.Pod
170
+	PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
171 171
 }
172 172
 
173 173
 // HookExecutorPodClientImpl is a pluggable HookExecutorPodClient.
174 174
 type HookExecutorPodClientImpl struct {
175 175
 	CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
176
-	PodWatchFunc  func(namespace, name, resourceVersion string) func() *kapi.Pod
176
+	PodWatchFunc  func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
177 177
 }
178 178
 
179 179
 func (i *HookExecutorPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
180 180
 	return i.CreatePodFunc(namespace, pod)
181 181
 }
182 182
 
183
-func (i *HookExecutorPodClientImpl) PodWatch(namespace, name, resourceVersion string) func() *kapi.Pod {
184
-	return i.PodWatchFunc(namespace, name, resourceVersion)
183
+func (i *HookExecutorPodClientImpl) PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
184
+	return i.PodWatchFunc(namespace, name, resourceVersion, stopChannel)
185 185
 }
186 186
 
187 187
 // NewPodWatch creates a pod watching function which is backed by a
188 188
 // FIFO/reflector pair. This avoids managing watches directly.
189
-func NewPodWatch(client kclient.Interface, namespace, name, resourceVersion string) func() *kapi.Pod {
189
+// A stop channel to close the watch's reflector is also returned.
190
+// It is the caller's responsibility to defer closing the stop channel to prevent leaking resources.
191
+func NewPodWatch(client kclient.Interface, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
190 192
 	fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
191 193
 	podLW := &deployutil.ListWatcherImpl{
192 194
 		ListFunc: func() (runtime.Object, error) {
... ...
@@ -196,8 +200,9 @@ func NewPodWatch(client kclient.Interface, namespace, name, resourceVersion stri
196 196
 			return client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion)
197 197
 		},
198 198
 	}
199
+
199 200
 	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
200
-	cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).Run()
201
+	cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
201 202
 
202 203
 	return func() *kapi.Pod {
203 204
 		obj := queue.Pop()
... ...
@@ -30,7 +30,7 @@ func TestHookExecutor_executeExecNewCreatePodFailure(t *testing.T) {
30 30
 			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
31 31
 				return nil, fmt.Errorf("couldn't create pod")
32 32
 			},
33
-			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
33
+			PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
34 34
 				return func() *kapi.Pod { return nil }
35 35
 			},
36 36
 		},
... ...
@@ -62,7 +62,7 @@ func TestHookExecutor_executeExecNewPodSucceeded(t *testing.T) {
62 62
 				createdPod = pod
63 63
 				return createdPod, nil
64 64
 			},
65
-			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
65
+			PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
66 66
 				createdPod.Status.Phase = kapi.PodSucceeded
67 67
 				return func() *kapi.Pod { return createdPod }
68 68
 			},
... ...
@@ -101,7 +101,7 @@ func TestHookExecutor_executeExecNewPodFailed(t *testing.T) {
101 101
 				createdPod = pod
102 102
 				return createdPod, nil
103 103
 			},
104
-			PodWatchFunc: func(namespace, name, resourceVersion string) func() *kapi.Pod {
104
+			PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
105 105
 				createdPod.Status.Phase = kapi.PodFailed
106 106
 				return func() *kapi.Pod { return createdPod }
107 107
 			},