Browse code

Route ordering is unstable, and writes must be ignored

The resync function of the controller results in results being sent in
different orders, which results in the router sometimes processing new
routes first, then replacing them with older ones. We now sort the
resync list to be oldest -> newest, which results in stable outcomes.

We also we generating extra writes when comparing the old rejection
notice to the new one - the time field was different, causing every
rejection notice (even those for the same reason) to be written to etcd.
Fixed by restoring the clear of the time and adding a comment.

Clayton Coleman authored on 2016/02/18 10:25:17
Showing 4 changed files
... ...
@@ -77,7 +77,7 @@ func (c *RouterController) HandleRoute() {
77 77
 	c.lock.Lock()
78 78
 	defer c.lock.Unlock()
79 79
 
80
-	glog.V(4).Infof("Processing Route: %s", route.Spec.To.Name)
80
+	glog.V(4).Infof("Processing Route: %s -> %s", route.Name, route.Spec.To.Name)
81 81
 	glog.V(4).Infof("           Alias: %s", route.Spec.Host)
82 82
 	glog.V(4).Infof("           Event: %s", eventType)
83 83
 
... ...
@@ -2,6 +2,7 @@ package factory
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"sort"
5 6
 	"time"
6 7
 
7 8
 	kapi "k8s.io/kubernetes/pkg/api"
... ...
@@ -166,6 +167,21 @@ func (r *routesByHost) Endpoints(namespace, name string) *kapi.Endpoints {
166 166
 	return obj.(*kapi.Endpoints)
167 167
 }
168 168
 
169
+// routeAge sorts routes from oldest to newest and is stable for all routes.
170
+type routeAge []routeapi.Route
171
+
172
+func (r routeAge) Len() int      { return len(r) }
173
+func (r routeAge) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
174
+func (r routeAge) Less(i, j int) bool {
175
+	if r[i].CreationTimestamp.Before(r[j].CreationTimestamp) {
176
+		return true
177
+	}
178
+	if r[i].Namespace < r[j].Namespace {
179
+		return true
180
+	}
181
+	return r[i].Name < r[j].Name
182
+}
183
+
169 184
 func oldestRoute(routes []interface{}) *routeapi.Route {
170 185
 	var oldest *routeapi.Route
171 186
 	for i := range routes {
... ...
@@ -202,7 +218,13 @@ func (lw *routeLW) List(options kapi.ListOptions) (runtime.Object, error) {
202 202
 		LabelSelector: lw.label,
203 203
 		FieldSelector: lw.field,
204 204
 	}
205
-	return lw.client.Routes(lw.namespace).List(opts)
205
+	routes, err := lw.client.Routes(lw.namespace).List(opts)
206
+	if err != nil {
207
+		return nil, err
208
+	}
209
+	// return routes in order of age to avoid rejections during resync
210
+	sort.Sort(routeAge(routes.Items))
211
+	return routes, nil
206 212
 }
207 213
 
208 214
 func (lw *routeLW) Watch(options kapi.ListOptions) (watch.Interface, error) {
... ...
@@ -92,7 +92,8 @@ func findOrCreateIngress(route *routeapi.Route, name string) (_ *routeapi.RouteI
92 92
 // false if no modification was made.
93 93
 func setIngressCondition(ingress *routeapi.RouteIngress, condition routeapi.RouteIngressCondition) bool {
94 94
 	for _, existing := range ingress.Conditions {
95
-		//existing.LastTransitionTime = nil
95
+		// ensures that the comparison is based on the actual value, not the time
96
+		existing.LastTransitionTime = condition.LastTransitionTime
96 97
 		if existing == condition {
97 98
 			return false
98 99
 		}
... ...
@@ -109,10 +109,10 @@ func TestStatusResetsHost(t *testing.T) {
109 109
 		t.Fatalf("expected route reset: %#v", obj)
110 110
 	}
111 111
 	condition := obj.Status.Ingress[0].Conditions[0]
112
-	if condition.LastTransitionTime == nil || *condition.LastTransitionTime != now || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
112
+	if condition.LastTransitionTime == nil || *condition.LastTransitionTime != touched || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
113 113
 		t.Fatalf("unexpected condition: %#v", condition)
114 114
 	}
115
-	if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, now.Time) {
115
+	if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, touched.Time) {
116 116
 		t.Fatalf("did not record last modification time: %#v %#v", admitter.expected, v)
117 117
 	}
118 118
 }
... ...
@@ -198,6 +198,43 @@ func TestStatusRecordRejection(t *testing.T) {
198 198
 	}
199 199
 }
200 200
 
201
+func TestStatusRecordRejectionNoChange(t *testing.T) {
202
+	now := unversioned.Now()
203
+	nowFn = func() unversioned.Time { return now }
204
+	touched := unversioned.Time{Time: now.Add(-time.Minute)}
205
+	p := &fakePlugin{}
206
+	c := testclient.NewSimpleFake(&routeapi.Route{})
207
+	admitter := NewStatusAdmitter(p, c, "test")
208
+	admitter.RecordRouteRejection(&routeapi.Route{
209
+		ObjectMeta: kapi.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
210
+		Spec:       routeapi.RouteSpec{Host: "route1.test.local"},
211
+		Status: routeapi.RouteStatus{
212
+			Ingress: []routeapi.RouteIngress{
213
+				{
214
+					Host:       "route1.test.local",
215
+					RouterName: "test",
216
+					Conditions: []routeapi.RouteIngressCondition{
217
+						{
218
+							Type:               routeapi.RouteAdmitted,
219
+							Status:             kapi.ConditionFalse,
220
+							Reason:             "Failed",
221
+							Message:            "generic error",
222
+							LastTransitionTime: &touched,
223
+						},
224
+					},
225
+				},
226
+			},
227
+		},
228
+	}, "Failed", "generic error")
229
+
230
+	if len(c.Actions()) != 0 {
231
+		t.Fatalf("unexpected actions: %#v", c.Actions())
232
+	}
233
+	if v, ok := admitter.expected.Peek(types.UID("uid1")); ok {
234
+		t.Fatalf("expected empty time: %#v", v)
235
+	}
236
+}
237
+
201 238
 func TestStatusRecordRejectionWithStatus(t *testing.T) {
202 239
 	now := unversioned.Now()
203 240
 	nowFn = func() unversioned.Time { return now }