Browse code

UPSTREAM: 28966: Fix watch cache filtering

Jordan Liggitt authored on 2016/07/13 01:34:43
Showing 4 changed files
... ...
@@ -21,7 +21,6 @@ import (
21 21
 	"net/http"
22 22
 	"reflect"
23 23
 	"strconv"
24
-	"strings"
25 24
 	"sync"
26 25
 	"time"
27 26
 
... ...
@@ -413,7 +412,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
413 413
 			glog.Errorf("invalid object for filter: %v", obj)
414 414
 			return false
415 415
 		}
416
-		if !strings.HasPrefix(objKey, key) {
416
+		if !hasPathPrefix(objKey, key) {
417 417
 			return false
418 418
 		}
419 419
 		return filter(obj)
... ...
@@ -70,7 +70,7 @@ func makeTestPod(name string) *api.Pod {
70 70
 }
71 71
 
72 72
 func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
73
-	key := etcdtest.AddPrefix("pods/ns/" + obj.Name)
73
+	key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
74 74
 	result := &api.Pod{}
75 75
 	if old == nil {
76 76
 		if err := s.Create(context.TODO(), key, obj, result, 0); err != nil {
... ...
@@ -107,6 +107,12 @@ func TestList(t *testing.T) {
107 107
 
108 108
 	_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
109 109
 
110
+	// Create a pod in a namespace that contains "ns" as a prefix
111
+	// Make sure it is not returned in a watch of "ns"
112
+	podFooNS2 := makeTestPod("foo")
113
+	podFooNS2.Namespace += "2"
114
+	updatePod(t, etcdStorage, podFooNS2, nil)
115
+
110 116
 	deleted := api.Pod{}
111 117
 	if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted); err != nil {
112 118
 		t.Errorf("Unexpected error: %v", err)
... ...
@@ -144,6 +150,10 @@ func TestList(t *testing.T) {
144 144
 		item.ResourceVersion = ""
145 145
 		item.CreationTimestamp = unversioned.Time{}
146 146
 
147
+		if item.Namespace != "ns" {
148
+			t.Errorf("Unexpected namespace: %s", item.Namespace)
149
+		}
150
+
147 151
 		var expected *api.Pod
148 152
 		switch item.Name {
149 153
 		case "foo":
... ...
@@ -203,6 +213,9 @@ func TestWatch(t *testing.T) {
203 203
 	podFooBis := makeTestPod("foo")
204 204
 	podFooBis.Spec.NodeName = "anotherFakeNode"
205 205
 
206
+	podFooNS2 := makeTestPod("foo")
207
+	podFooNS2.Namespace += "2"
208
+
206 209
 	// initialVersion is used to initate the watcher at the beginning of the world,
207 210
 	// which is not defined precisely in etcd.
208 211
 	initialVersion, err := cacher.LastSyncResourceVersion()
... ...
@@ -218,6 +231,9 @@ func TestWatch(t *testing.T) {
218 218
 	}
219 219
 	defer watcher.Stop()
220 220
 
221
+	// Create in another namespace first to make sure events from other namespaces don't get delivered
222
+	updatePod(t, etcdStorage, podFooNS2, nil)
223
+
221 224
 	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
222 225
 	_ = updatePod(t, etcdStorage, podBar, nil)
223 226
 	fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
... ...
@@ -317,6 +333,13 @@ func TestFiltering(t *testing.T) {
317 317
 	podFooPrime.Labels = map[string]string{"filter": "foo"}
318 318
 	podFooPrime.Spec.NodeName = "fakeNode"
319 319
 
320
+	podFooNS2 := makeTestPod("foo")
321
+	podFooNS2.Namespace += "2"
322
+	podFooNS2.Labels = map[string]string{"filter": "foo"}
323
+
324
+	// Create in another namespace first to make sure events from other namespaces don't get delivered
325
+	updatePod(t, etcdStorage, podFooNS2, nil)
326
+
320 327
 	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
321 328
 	fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
322 329
 	fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
... ...
@@ -19,6 +19,7 @@ package storage
19 19
 import (
20 20
 	"fmt"
21 21
 	"strconv"
22
+	"strings"
22 23
 
23 24
 	"k8s.io/kubernetes/pkg/api/errors"
24 25
 	"k8s.io/kubernetes/pkg/api/meta"
... ...
@@ -90,3 +91,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
90 90
 	}
91 91
 	return prefix + "/" + name, nil
92 92
 }
93
+
94
+// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
95
+func hasPathPrefix(s, pathPrefix string) bool {
96
+	// Short circuit if s doesn't contain the prefix at all
97
+	if !strings.HasPrefix(s, pathPrefix) {
98
+		return false
99
+	}
100
+
101
+	pathPrefixLength := len(pathPrefix)
102
+
103
+	if len(s) == pathPrefixLength {
104
+		// Exact match
105
+		return true
106
+	}
107
+	if strings.HasSuffix(pathPrefix, "/") {
108
+		// pathPrefix already ensured a path segment boundary
109
+		return true
110
+	}
111
+	if s[pathPrefixLength:pathPrefixLength+1] == "/" {
112
+		// The next character in s is a path segment boundary
113
+		// Check this instead of normalizing pathPrefix to avoid allocating on every call
114
+		return true
115
+	}
116
+	return false
117
+}
... ...
@@ -55,3 +55,51 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
55 55
 		}
56 56
 	}
57 57
 }
58
+
59
+func TestHasPathPrefix(t *testing.T) {
60
+	validTestcases := []struct {
61
+		s      string
62
+		prefix string
63
+	}{
64
+		// Exact matches
65
+		{"", ""},
66
+		{"a", "a"},
67
+		{"a/", "a/"},
68
+		{"a/../", "a/../"},
69
+
70
+		// Path prefix matches
71
+		{"a/b", "a"},
72
+		{"a/b", "a/"},
73
+		{"中文/", "中文"},
74
+	}
75
+	for i, tc := range validTestcases {
76
+		if !hasPathPrefix(tc.s, tc.prefix) {
77
+			t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
78
+		}
79
+	}
80
+
81
+	invalidTestcases := []struct {
82
+		s      string
83
+		prefix string
84
+	}{
85
+		// Mismatch
86
+		{"a", "b"},
87
+
88
+		// Dir requirement
89
+		{"a", "a/"},
90
+
91
+		// Prefix mismatch
92
+		{"ns2", "ns"},
93
+		{"ns2", "ns/"},
94
+		{"中文文", "中文"},
95
+
96
+		// Ensure no normalization is applied
97
+		{"a/c/../b/", "a/b/"},
98
+		{"a/", "a/b/.."},
99
+	}
100
+	for i, tc := range invalidTestcases {
101
+		if hasPathPrefix(tc.s, tc.prefix) {
102
+			t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
103
+		}
104
+	}
105
+}