Browse code

libnetwork: fix graceful service endpoint removal

When a container is stopped, new connections to a published port will
not be routed to that container but any existing connections are
permitted to continue uninterrupted for the duration of the container's
grace period. Unfortunately recent fixes to overlay networks introduced
a regression: existing connections routed over the service mesh to
containers on remote nodes are dropped immediately when the container is
stopped, irrespective of the grace period.

Fix the handling of NetworkDB endpoint table events so that the endpoint
is disabled in the load balancer when a service endpoint transitions to
ServiceDisabled instead of deleting the endpoint and re-adding it. And
fix the other bugged state transitions with the help of a unit test
which exhaustively covers all permutations of endpoint event.

Signed-off-by: Cory Snider <csnider@mirantis.com>

Cory Snider authored on 2026/01/14 08:08:22
Showing 2 changed files
... ...
@@ -362,7 +362,7 @@ func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
362 362
 	}
363 363
 	c.mu.Unlock()
364 364
 
365
-	go c.handleTableEvents(ch, c.handleEpTableEvent)
365
+	go c.handleTableEvents(ch, func(ev events.Event) { handleEpTableEvent(c, ev) })
366 366
 	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
367 367
 
368 368
 	keys, tags := c.getKeys(subsysIPSec)
... ...
@@ -894,14 +894,17 @@ func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) {
894 894
 	}, nil
895 895
 }
896 896
 
897
-// EquivalentTo returns true if ev is semantically equivalent to other.
897
+// EquivalentTo returns true if ev is semantically equivalent to other,
898
+// ignoring the ServiceDisabled field.
898 899
 func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
900
+	if ev == nil || other == nil {
901
+		return (ev == nil) == (other == nil)
902
+	}
899 903
 	return ev.Name == other.Name &&
900 904
 		ev.ServiceName == other.ServiceName &&
901 905
 		ev.ServiceID == other.ServiceID &&
902 906
 		ev.VirtualIP == other.VirtualIP &&
903 907
 		ev.EndpointIP == other.EndpointIP &&
904
-		ev.ServiceDisabled == other.ServiceDisabled &&
905 908
 		iterutil.SameValues(
906 909
 			iterutil.Deref(slices.Values(ev.IngressPorts)),
907 910
 			iterutil.Deref(slices.Values(other.IngressPorts))) &&
... ...
@@ -909,7 +912,14 @@ func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
909 909
 		iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases))
910 910
 }
911 911
 
912
-func (c *Controller) handleEpTableEvent(ev events.Event) {
912
+type serviceBinder interface {
913
+	addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error
914
+	delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error
915
+	addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error
916
+	rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool, fullRemove bool) error
917
+}
918
+
919
+func handleEpTableEvent(c serviceBinder, ev events.Event) {
913 920
 	event := ev.(networkdb.WatchEvent)
914 921
 	nid := event.NetworkID
915 922
 	eid := event.Key
... ...
@@ -939,8 +949,9 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
939 939
 	})
940 940
 	logger.Debug("handleEpTableEvent")
941 941
 
942
+	equivalent := prev.EquivalentTo(epRec)
942 943
 	if prev != nil {
943
-		if epRec != nil && prev.EquivalentTo(epRec) {
944
+		if equivalent && prev.ServiceDisabled == epRec.ServiceDisabled {
944 945
 			// Avoid flapping if we would otherwise remove a service
945 946
 			// binding then immediately replace it with an equivalent one.
946 947
 			return
... ...
@@ -948,7 +959,11 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
948 948
 
949 949
 		if prev.ServiceID != "" {
950 950
 			// This is a remote task part of a service
951
-			if !prev.ServiceDisabled {
951
+			if epRec == nil || !equivalent {
952
+				// Either the endpoint is deleted from NetworkDB or has
953
+				// been replaced with a different one. Remove the old
954
+				// binding. The new binding, if any, will be added
955
+				// below.
952 956
 				err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid,
953 957
 					prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts,
954 958
 					prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(),
... ...
@@ -957,7 +972,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
957 957
 					logger.WithError(err).Error("failed removing service binding")
958 958
 				}
959 959
 			}
960
-		} else {
960
+		} else if !prev.ServiceDisabled && (!equivalent || epRec == nil || epRec.ServiceDisabled) {
961 961
 			// This is a remote container simply attached to an attachable network
962 962
 			err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases,
963 963
 				prev.EndpointIP.AsSlice(), "handleEpTableEvent")
... ...
@@ -970,19 +985,16 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
970 970
 	if epRec != nil {
971 971
 		if epRec.ServiceID != "" {
972 972
 			// This is a remote task part of a service
973
-			if epRec.ServiceDisabled {
974
-				// Don't double-remove a service binding
975
-				if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled {
976
-					err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
977
-						nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
978
-						epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
979
-						epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
980
-					if err != nil {
981
-						logger.WithError(err).Error("failed disabling service binding")
982
-						return
983
-					}
973
+			if equivalent && !prev.ServiceDisabled && epRec.ServiceDisabled {
974
+				err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
975
+					nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
976
+					epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
977
+					epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
978
+				if err != nil {
979
+					logger.WithError(err).Error("failed disabling service binding")
980
+					return
984 981
 				}
985
-			} else {
982
+			} else if !epRec.ServiceDisabled {
986 983
 				err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid,
987 984
 					epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts,
988 985
 					epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(),
... ...
@@ -992,7 +1004,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
992 992
 					return
993 993
 				}
994 994
 			}
995
-		} else {
995
+		} else if !epRec.ServiceDisabled && (!equivalent || prev == nil || prev.ServiceDisabled) {
996 996
 			// This is a remote container simply attached to an attachable network
997 997
 			err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases,
998 998
 				epRec.EndpointIP.AsSlice(), "handleEpTableEvent")
... ...
@@ -1,11 +1,16 @@
1 1
 package libnetwork
2 2
 
3 3
 import (
4
+	"fmt"
5
+	"net"
4 6
 	"net/netip"
5 7
 	"slices"
6 8
 	"testing"
7 9
 
10
+	"github.com/gogo/protobuf/proto"
8 11
 	"gotest.tools/v3/assert"
12
+
13
+	"github.com/moby/moby/v2/daemon/libnetwork/networkdb"
9 14
 )
10 15
 
11 16
 func TestEndpointEvent_EquivalentTo(t *testing.T) {
... ...
@@ -40,9 +45,12 @@ func TestEndpointEvent_EquivalentTo(t *testing.T) {
40 40
 		return a.EquivalentTo(b)
41 41
 	}
42 42
 
43
+	assert.Check(t, reflexiveEquiv(nil, nil), "nil should be equivalent to nil")
44
+	assert.Check(t, !reflexiveEquiv(&a, nil), "non-nil should not be equivalent to nil")
45
+
43 46
 	b := a
44 47
 	b.ServiceDisabled = true
45
-	assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled")
48
+	assert.Check(t, reflexiveEquiv(&a, &b), "ServiceDisabled value should not matter")
46 49
 
47 50
 	c := a
48 51
 	c.IngressPorts = slices.Clone(a.IngressPorts)
... ...
@@ -88,3 +96,321 @@ func TestEndpointEvent_EquivalentTo(t *testing.T) {
88 88
 	l.Name = "aaaaa"
89 89
 	assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent")
90 90
 }
91
+
92
+type mockServiceBinder struct {
93
+	actions []string
94
+}
95
+
96
+func (m *mockServiceBinder) addContainerNameResolution(nID, eID, containerName string, _ []string, ip net.IP, _ string) error {
97
+	m.actions = append(m.actions, fmt.Sprintf("addContainerNameResolution(%v, %v, %v, %v)", nID, eID, containerName, ip))
98
+	return nil
99
+}
100
+
101
+func (m *mockServiceBinder) delContainerNameResolution(nID, eID, containerName string, _ []string, ip net.IP, _ string) error {
102
+	m.actions = append(m.actions, fmt.Sprintf("delContainerNameResolution(%v, %v, %v, %v)", nID, eID, containerName, ip))
103
+	return nil
104
+}
105
+
106
+func (m *mockServiceBinder) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, _ []*PortConfig, _, _ []string, ip net.IP, _ string) error {
107
+	m.actions = append(m.actions, fmt.Sprintf("addServiceBinding(%v, %v, %v, %v, %v, %v, %v)", svcName, svcID, nID, eID, containerName, vip, ip))
108
+	return nil
109
+}
110
+
111
+func (m *mockServiceBinder) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, _ []*PortConfig, _, _ []string, ip net.IP, _ string, deleteSvcRecords bool, fullRemove bool) error {
112
+	m.actions = append(m.actions, fmt.Sprintf("rmServiceBinding(%v, %v, %v, %v, %v, %v, %v, deleteSvcRecords=%v, fullRemove=%v)", svcName, svcID, nID, eID, containerName, vip, ip, deleteSvcRecords, fullRemove))
113
+	return nil
114
+}
115
+
116
+func TestHandleEPTableEvent(t *testing.T) {
117
+	svc1 := EndpointRecord{
118
+		Name:        "ep1",
119
+		ServiceName: "svc1",
120
+		ServiceID:   "id1",
121
+		VirtualIP:   "10.0.0.1",
122
+		EndpointIP:  "192.168.12.42",
123
+	}
124
+	svc1disabled := svc1
125
+	svc1disabled.ServiceDisabled = true
126
+
127
+	svc2 := EndpointRecord{
128
+		Name:        "ep2",
129
+		ServiceName: "svc2",
130
+		ServiceID:   "id2",
131
+		VirtualIP:   "10.0.0.2",
132
+		EndpointIP:  "172.16.69.5",
133
+	}
134
+	svc2disabled := svc2
135
+	svc2disabled.ServiceDisabled = true
136
+
137
+	ctr1 := EndpointRecord{
138
+		Name:       "ctr1",
139
+		EndpointIP: "172.18.1.1",
140
+	}
141
+	ctr1disabled := ctr1
142
+	ctr1disabled.ServiceDisabled = true
143
+
144
+	ctr2 := EndpointRecord{
145
+		Name:       "ctr2",
146
+		EndpointIP: "172.18.1.2",
147
+	}
148
+	ctr2disabled := ctr2
149
+	ctr2disabled.ServiceDisabled = true
150
+
151
+	tests := []struct {
152
+		name            string
153
+		prev, ev        *EndpointRecord
154
+		expectedActions []string
155
+	}{
156
+		{
157
+			name: "Insert/Service/ServiceDisabled=false",
158
+			ev:   &svc1,
159
+			expectedActions: []string{
160
+				"addServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42)",
161
+			},
162
+		},
163
+		{
164
+			name: "Insert/Service/ServiceDisabled=true",
165
+			ev:   &svc1disabled,
166
+		},
167
+		{
168
+			name: "Insert/Container/ServiceDisabled=false",
169
+			ev:   &ctr1,
170
+			expectedActions: []string{
171
+				"addContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
172
+			},
173
+		},
174
+		{
175
+			name: "Insert/Container/ServiceDisabled=true",
176
+			ev:   &ctr1disabled,
177
+		},
178
+
179
+		{
180
+			name: "Update/Service/ServiceDisabled=ft",
181
+			prev: &svc1,
182
+			ev:   &svc1disabled,
183
+			expectedActions: []string{
184
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=false)",
185
+			},
186
+		},
187
+		{
188
+			name: "Update/Service/ServiceDisabled=tf",
189
+			prev: &svc1disabled,
190
+			ev:   &svc1,
191
+			expectedActions: []string{
192
+				"addServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42)",
193
+			},
194
+		},
195
+		{
196
+			name: "Update/Service/ServiceDisabled=ff",
197
+			prev: &svc1disabled,
198
+			ev:   &svc1disabled,
199
+		},
200
+		{
201
+			name: "Update/Service/ServiceDisabled=tt",
202
+			prev: &svc1,
203
+			ev:   &svc1,
204
+		},
205
+		{
206
+			name: "Update/Container/ServiceDisabled=ft",
207
+			prev: &ctr1,
208
+			ev:   &ctr1disabled,
209
+			expectedActions: []string{
210
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
211
+			},
212
+		},
213
+		{
214
+			name: "Update/Container/ServiceDisabled=tf",
215
+			prev: &ctr1disabled,
216
+			ev:   &ctr1,
217
+			expectedActions: []string{
218
+				"addContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
219
+			},
220
+		},
221
+		{
222
+			name: "Update/Container/ServiceDisabled=ff",
223
+			prev: &ctr1disabled,
224
+			ev:   &ctr1disabled,
225
+		},
226
+		{
227
+			name: "Update/Container/ServiceDisabled=tt",
228
+			prev: &ctr1,
229
+			ev:   &ctr1,
230
+		},
231
+
232
+		{
233
+			name: "Delete/Service/ServiceDisabled=false",
234
+			prev: &svc1,
235
+			expectedActions: []string{
236
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
237
+			},
238
+		},
239
+		{
240
+			name: "Delete/Service/ServiceDisabled=true",
241
+			prev: &svc1disabled,
242
+			expectedActions: []string{
243
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
244
+			},
245
+		},
246
+
247
+		{
248
+			name: "Delete/Container/ServiceDisabled=false",
249
+			prev: &ctr1,
250
+			expectedActions: []string{
251
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
252
+			},
253
+		},
254
+		{
255
+			name: "Delete/Container/ServiceDisabled=true",
256
+			prev: &ctr1disabled,
257
+		},
258
+
259
+		{
260
+			name: "Replace/From=Service/To=Service/ServiceDisabled=ff",
261
+			prev: &svc1,
262
+			ev:   &svc2,
263
+			expectedActions: []string{
264
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
265
+				"addServiceBinding(svc2, id2, network1, endpoint1, ep2, 10.0.0.2, 172.16.69.5)",
266
+			},
267
+		},
268
+		{
269
+			name: "Replace/From=Service/To=Service/ServiceDisabled=ft",
270
+			prev: &svc1,
271
+			ev:   &svc2disabled,
272
+			expectedActions: []string{
273
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
274
+			},
275
+		},
276
+		{
277
+			name: "Replace/From=Service/To=Service/ServiceDisabled=tf",
278
+			prev: &svc1disabled,
279
+			ev:   &svc2,
280
+			expectedActions: []string{
281
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
282
+				"addServiceBinding(svc2, id2, network1, endpoint1, ep2, 10.0.0.2, 172.16.69.5)",
283
+			},
284
+		},
285
+		{
286
+			name: "Replace/From=Service/To=Service/ServiceDisabled=tt",
287
+			prev: &svc1disabled,
288
+			ev:   &svc2disabled,
289
+			expectedActions: []string{
290
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
291
+			},
292
+		},
293
+		{
294
+			name: "Replace/From=Service/To=Container/ServiceDisabled=ff",
295
+			prev: &svc1,
296
+			ev:   &ctr2,
297
+			expectedActions: []string{
298
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
299
+				"addContainerNameResolution(network1, endpoint1, ctr2, 172.18.1.2)",
300
+			},
301
+		},
302
+		{
303
+			name: "Replace/From=Service/To=Container/ServiceDisabled=ft",
304
+			prev: &svc1,
305
+			ev:   &ctr2disabled,
306
+			expectedActions: []string{
307
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
308
+			},
309
+		},
310
+		{
311
+			name: "Replace/From=Service/To=Container/ServiceDisabled=tf",
312
+			prev: &svc1disabled,
313
+			ev:   &ctr2,
314
+			expectedActions: []string{
315
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
316
+				"addContainerNameResolution(network1, endpoint1, ctr2, 172.18.1.2)",
317
+			},
318
+		},
319
+		{
320
+			name: "Replace/From=Service/To=Container/ServiceDisabled=tt",
321
+			prev: &svc1disabled,
322
+			ev:   &ctr2disabled,
323
+			expectedActions: []string{
324
+				"rmServiceBinding(svc1, id1, network1, endpoint1, ep1, 10.0.0.1, 192.168.12.42, deleteSvcRecords=true, fullRemove=true)",
325
+			},
326
+		},
327
+		{
328
+			name: "Replace/From=Container/To=Service/ServiceDisabled=ff",
329
+			prev: &ctr1,
330
+			ev:   &svc2,
331
+			expectedActions: []string{
332
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
333
+				"addServiceBinding(svc2, id2, network1, endpoint1, ep2, 10.0.0.2, 172.16.69.5)",
334
+			},
335
+		},
336
+		{
337
+			name: "Replace/From=Container/To=Service/ServiceDisabled=ft",
338
+			prev: &ctr1,
339
+			ev:   &svc2disabled,
340
+			expectedActions: []string{
341
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
342
+			},
343
+		},
344
+		{
345
+			name: "Replace/From=Container/To=Service/ServiceDisabled=tf",
346
+			prev: &ctr1disabled,
347
+			ev:   &svc2,
348
+			expectedActions: []string{
349
+				"addServiceBinding(svc2, id2, network1, endpoint1, ep2, 10.0.0.2, 172.16.69.5)",
350
+			},
351
+		},
352
+		{
353
+			name: "Replace/From=Container/To=Service/ServiceDisabled=tt",
354
+			prev: &ctr1disabled,
355
+			ev:   &svc2disabled,
356
+		},
357
+		{
358
+			name: "Replace/From=Container/To=Container/ServiceDisabled=ff",
359
+			prev: &ctr1,
360
+			ev:   &ctr2,
361
+			expectedActions: []string{
362
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
363
+				"addContainerNameResolution(network1, endpoint1, ctr2, 172.18.1.2)",
364
+			},
365
+		},
366
+		{
367
+			name: "Replace/From=Container/To=Container/ServiceDisabled=ft",
368
+			prev: &ctr1,
369
+			ev:   &ctr2disabled,
370
+			expectedActions: []string{
371
+				"delContainerNameResolution(network1, endpoint1, ctr1, 172.18.1.1)",
372
+			},
373
+		},
374
+		{
375
+			name: "Replace/From=Container/To=Container/ServiceDisabled=tf",
376
+			prev: &ctr1disabled,
377
+			ev:   &ctr2,
378
+			expectedActions: []string{
379
+				"addContainerNameResolution(network1, endpoint1, ctr2, 172.18.1.2)",
380
+			},
381
+		},
382
+		{
383
+			name: "Replace/From=Container/To=Container/ServiceDisabled=tt",
384
+			prev: &ctr1disabled,
385
+			ev:   &ctr2disabled,
386
+		},
387
+	}
388
+	for _, tt := range tests {
389
+		t.Run(tt.name, func(t *testing.T) {
390
+			msb := &mockServiceBinder{}
391
+			event := networkdb.WatchEvent{
392
+				NetworkID: "network1",
393
+				Key:       "endpoint1",
394
+			}
395
+			var err error
396
+			if tt.prev != nil {
397
+				event.Prev, err = proto.Marshal(tt.prev)
398
+				assert.NilError(t, err)
399
+			}
400
+			if tt.ev != nil {
401
+				event.Value, err = proto.Marshal(tt.ev)
402
+				assert.NilError(t, err)
403
+			}
404
+			handleEpTableEvent(msb, event)
405
+			assert.DeepEqual(t, tt.expectedActions, msb.actions)
406
+		})
407
+	}
408
+}