Browse code

libnetwork/d/overlay: handle coalesced peer updates

The eventually-consistent nature of NetworkDB means we cannot depend on
events being received in the same order that they were sent. Nor can we
depend on receiving events for all intermediate states. It is possible
for a series of entry UPDATEs, or a DELETE followed by a CREATE with the
same key, to get coalesced into a single UPDATE event on the receiving
node. Watchers of NetworkDB tables therefore need to be prepared to
gracefully handle arbitrary UPDATEs of a key, including those where the
new value may have nothing in common with the previous value.

The overlay driver naively handled events for overlay_peer_table
assuming that an endpoint leave followed by a rejoin of the same
endpoint would always be expressed as a DELETE event followed by a
CREATE. It would handle a coalesced UPDATE as a CREATE, inserting a new
entry into peerDB without removing the old one. This would
have various side effects, such as having the "transient state" of
multiple entries in peerDB with the same peer IP never settle.

Update driverapi to pass both the previous and new value of a table
entry into the driver. Modify the overlay driver to handle an UPDATE by
removing the previous peer entry from peerDB then adding the new one.
Modify the Windows overlay driver to match.

Signed-off-by: Cory Snider <csnider@mirantis.com>

Cory Snider authored on 2025/07/11 07:22:09
Showing 6 changed files
... ...
@@ -822,25 +822,8 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
822 822
 		return
823 823
 	}
824 824
 
825
-	var (
826
-		etype driverapi.EventType
827
-		value []byte
828
-	)
829
-
830 825
 	event := ev.(networkdb.WatchEvent)
831
-	switch {
832
-	case event.IsCreate():
833
-		value = event.Value
834
-		etype = driverapi.Create
835
-	case event.IsDelete():
836
-		value = event.Prev
837
-		etype = driverapi.Delete
838
-	case event.IsUpdate():
839
-		value = event.Value
840
-		etype = driverapi.Update
841
-	}
842
-
843
-	ed.EventNotify(etype, n.ID(), event.Table, event.Key, value)
826
+	ed.EventNotify(n.ID(), event.Table, event.Key, event.Prev, event.Value)
844 827
 }
845 828
 
846 829
 func (c *Controller) handleNodeTableEvent(ev events.Event) {
... ...
@@ -69,7 +69,7 @@ type TableWatcher interface {
69 69
 	// happened on a table of its interest as soon as this node
70 70
 	// receives such an event in the gossip layer. This method is
71 71
 	// only invoked for the global scope driver.
72
-	EventNotify(event EventType, nid string, tableName string, key string, value []byte)
72
+	EventNotify(nid string, tableName string, key string, prev, value []byte)
73 73
 
74 74
 	// DecodeTableEntry passes the driver a key, value pair from table it registered
75 75
 	// with libnetwork. Driver should return {object ID, map[string]string} tuple.
... ...
@@ -204,18 +204,6 @@ type IPAMData struct {
204 204
 	AuxAddresses map[string]*net.IPNet
205 205
 }
206 206
 
207
-// EventType defines a type for the CRUD event
208
-type EventType uint8
209
-
210
-const (
211
-	// Create event is generated when a table entry is created,
212
-	Create EventType = 1 + iota
213
-	// Update event is generated when a table entry is updated.
214
-	Update
215
-	// Delete event is generated when a table entry is deleted.
216
-	Delete
217
-)
218
-
219 207
 // ObjectType represents the type of object driver wants to store in libnetwork's networkDB
220 208
 type ObjectType int
221 209
 
... ...
@@ -11,7 +11,6 @@ import (
11 11
 
12 12
 	"github.com/containerd/log"
13 13
 	"github.com/docker/docker/daemon/libnetwork/driverapi"
14
-	"github.com/docker/docker/daemon/libnetwork/internal/hashable"
15 14
 	"github.com/docker/docker/daemon/libnetwork/internal/netiputil"
16 15
 	"github.com/docker/docker/daemon/libnetwork/netlabel"
17 16
 	"github.com/docker/docker/daemon/libnetwork/ns"
... ...
@@ -161,7 +160,7 @@ func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (s
161 161
 	}
162 162
 }
163 163
 
164
-func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
164
+func (d *driver) EventNotify(nid, tableName, key string, prev, value []byte) {
165 165
 	if tableName != OverlayPeerTable {
166 166
 		log.G(context.TODO()).Errorf("Unexpected table notification for table %s received", tableName)
167 167
 		return
... ...
@@ -169,33 +168,37 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
169 169
 
170 170
 	eid := key
171 171
 
172
-	var peer PeerRecord
173
-	if err := proto.Unmarshal(value, &peer); err != nil {
174
-		log.G(context.TODO()).Errorf("Failed to unmarshal peer record: %v", err)
175
-		return
176
-	}
177
-
178
-	// Ignore local peers. We already know about them and they
179
-	// should not be added to vxlan fdb.
180
-	if addr, _ := netip.ParseAddr(peer.TunnelEndpointIP); addr == d.advertiseAddress {
181
-		return
172
+	var prevPeer, newPeer *Peer
173
+	if prev != nil {
174
+		var err error
175
+		prevPeer, err = UnmarshalPeerRecord(prev)
176
+		if err != nil {
177
+			log.G(context.TODO()).WithError(err).Error("Failed to unmarshal previous peer record")
178
+		}
179
+		if prevPeer.TunnelEndpointIP == d.advertiseAddress {
180
+			// Ignore local peers. We don't add them to the VXLAN
181
+			// FDB so don't need to remove them.
182
+			prevPeer = nil
183
+		}
182 184
 	}
183
-
184
-	addr, err := netip.ParsePrefix(peer.EndpointIP)
185
-	if err != nil {
186
-		log.G(context.TODO()).WithError(err).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
187
-		return
185
+	if value != nil {
186
+		var err error
187
+		newPeer, err = UnmarshalPeerRecord(value)
188
+		if err != nil {
189
+			log.G(context.TODO()).WithError(err).Error("Failed to unmarshal peer record")
190
+		}
191
+		if newPeer.TunnelEndpointIP == d.advertiseAddress {
192
+			newPeer = nil
193
+		}
188 194
 	}
189 195
 
190
-	mac, err := hashable.ParseMAC(peer.EndpointMAC)
191
-	if err != nil {
192
-		log.G(context.TODO()).WithError(err).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
196
+	if prevPeer == nil && newPeer == nil {
197
+		// Nothing to do! Either the event was for a local peer,
198
+		// or unmarshaling failed.
193 199
 		return
194 200
 	}
195
-
196
-	vtep, err := netip.ParseAddr(peer.TunnelEndpointIP)
197
-	if err != nil {
198
-		log.G(context.TODO()).WithError(err).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
201
+	if prevPeer != nil && newPeer != nil && *prevPeer == *newPeer {
202
+		// The update did not materially change the FDB entry.
199 203
 		return
200 204
 	}
201 205
 
... ...
@@ -209,21 +212,23 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
209 209
 	}
210 210
 	defer unlock()
211 211
 
212
-	var opname string
213
-	if etype == driverapi.Delete {
214
-		opname = "delete"
215
-		err = n.peerDelete(eid, addr, mac, vtep)
216
-	} else {
217
-		opname = "add"
218
-		err = n.peerAdd(eid, addr, mac, vtep)
212
+	if prevPeer != nil {
213
+		if err := n.peerDelete(eid, prevPeer.EndpointIP, prevPeer.EndpointMAC, prevPeer.TunnelEndpointIP); err != nil {
214
+			log.G(context.TODO()).WithFields(log.Fields{
215
+				"error": err,
216
+				"nid":   n.id,
217
+				"peer":  prevPeer,
218
+			}).Warn("overlay: failed to delete peer entry")
219
+		}
219 220
 	}
220
-	if err != nil {
221
-		log.G(context.TODO()).WithFields(log.Fields{
222
-			"error": err,
223
-			"nid":   n.id,
224
-			"peer":  peer,
225
-			"op":    opname,
226
-		}).Warn("Peer operation failed")
221
+	if newPeer != nil {
222
+		if err := n.peerAdd(eid, newPeer.EndpointIP, newPeer.EndpointMAC, newPeer.TunnelEndpointIP); err != nil {
223
+			log.G(context.TODO()).WithFields(log.Fields{
224
+				"error": err,
225
+				"nid":   n.id,
226
+				"peer":  newPeer,
227
+			}).Warn("overlay: failed to add peer entry")
228
+		}
227 229
 	}
228 230
 }
229 231
 
... ...
@@ -1,4 +1,42 @@
1 1
 package overlay
2 2
 
3
+import (
4
+	"fmt"
5
+	"net/netip"
6
+
7
+	"github.com/docker/docker/daemon/libnetwork/internal/hashable"
8
+	"github.com/gogo/protobuf/proto"
9
+)
10
+
3 11
 // OverlayPeerTable is the NetworkDB table for overlay network peer discovery.
4 12
 const OverlayPeerTable = "overlay_peer_table"
13
+
14
+type Peer struct {
15
+	EndpointIP       netip.Prefix
16
+	EndpointMAC      hashable.MACAddr
17
+	TunnelEndpointIP netip.Addr
18
+}
19
+
20
+func UnmarshalPeerRecord(data []byte) (*Peer, error) {
21
+	var pr PeerRecord
22
+	if err := proto.Unmarshal(data, &pr); err != nil {
23
+		return nil, fmt.Errorf("failed to unmarshal peer record: %w", err)
24
+	}
25
+	var (
26
+		p   Peer
27
+		err error
28
+	)
29
+	p.EndpointIP, err = netip.ParsePrefix(pr.EndpointIP)
30
+	if err != nil {
31
+		return nil, fmt.Errorf("invalid peer IP %q received: %w", pr.EndpointIP, err)
32
+	}
33
+	p.EndpointMAC, err = hashable.ParseMAC(pr.EndpointMAC)
34
+	if err != nil {
35
+		return nil, fmt.Errorf("invalid MAC %q received: %w", pr.EndpointMAC, err)
36
+	}
37
+	p.TunnelEndpointIP, err = netip.ParseAddr(pr.TunnelEndpointIP)
38
+	if err != nil {
39
+		return nil, fmt.Errorf("invalid VTEP %q received: %w", pr.TunnelEndpointIP, err)
40
+	}
41
+	return &p, nil
42
+}
... ...
@@ -3,12 +3,10 @@ package overlay
3 3
 import (
4 4
 	"context"
5 5
 	"fmt"
6
-	"net"
7 6
 
8 7
 	"github.com/containerd/log"
9 8
 	"github.com/docker/docker/daemon/libnetwork/driverapi"
10 9
 	"github.com/docker/docker/daemon/libnetwork/drivers/overlay"
11
-	"github.com/docker/docker/daemon/libnetwork/types"
12 10
 	"github.com/gogo/protobuf/proto"
13 11
 	"go.opentelemetry.io/otel"
14 12
 	"go.opentelemetry.io/otel/attribute"
... ...
@@ -57,7 +55,7 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
57 57
 	return nil
58 58
 }
59 59
 
60
-func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
60
+func (d *driver) EventNotify(nid, tableName, key string, prev, value []byte) {
61 61
 	if tableName != overlay.OverlayPeerTable {
62 62
 		log.G(context.TODO()).Errorf("Unexpected table notification for table %s received", tableName)
63 63
 		return
... ...
@@ -65,49 +63,62 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
65 65
 
66 66
 	eid := key
67 67
 
68
-	var peer overlay.PeerRecord
69
-	if err := proto.Unmarshal(value, &peer); err != nil {
70
-		log.G(context.TODO()).Errorf("Failed to unmarshal peer record: %v", err)
71
-		return
72
-	}
73
-
74 68
 	n := d.network(nid)
75 69
 	if n == nil {
76 70
 		return
77 71
 	}
78 72
 
79
-	// Ignore local peers. We already know about them and they
80
-	// should not be added to vxlan fdb.
81
-	if peer.TunnelEndpointIP == n.providerAddress {
82
-		return
83
-	}
84
-
85
-	addr, err := types.ParseCIDR(peer.EndpointIP)
86
-	if err != nil {
87
-		log.G(context.TODO()).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
88
-		return
89
-	}
90
-
91
-	mac, err := net.ParseMAC(peer.EndpointMAC)
92
-	if err != nil {
93
-		log.G(context.TODO()).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
94
-		return
95
-	}
96
-
97
-	vtep := net.ParseIP(peer.TunnelEndpointIP)
98
-	if vtep == nil {
99
-		log.G(context.TODO()).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
73
+	var prevPeer, newPeer *overlay.Peer
74
+	if prev != nil {
75
+		var err error
76
+		prevPeer, err = overlay.UnmarshalPeerRecord(prev)
77
+		if err != nil {
78
+			log.G(context.TODO()).WithError(err).Error("Failed to unmarshal previous peer record")
79
+		}
80
+		if prevPeer.TunnelEndpointIP.String() == n.providerAddress {
81
+			// Ignore local peers. We don't add them to the VXLAN
82
+			// FDB so don't need to remove them.
83
+			prevPeer = nil
84
+		}
85
+	}
86
+	if value != nil {
87
+		var err error
88
+		newPeer, err = overlay.UnmarshalPeerRecord(value)
89
+		if err != nil {
90
+			log.G(context.TODO()).WithError(err).Error("Failed to unmarshal peer record")
91
+		}
92
+		if prevPeer.TunnelEndpointIP.String() == n.providerAddress {
93
+			newPeer = nil
94
+		}
95
+	}
96
+
97
+	if prevPeer == nil && newPeer == nil {
98
+		// Nothing to do! Either the event was for a local peer,
99
+		// or unmarshaling failed.
100 100
 		return
101 101
 	}
102
-
103
-	if etype == driverapi.Delete {
104
-		d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
102
+	if prevPeer != nil && newPeer != nil && *prevPeer == *newPeer {
103
+		// The update did not materially change the FDB entry.
105 104
 		return
106 105
 	}
107 106
 
108
-	err = d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
109
-	if err != nil {
110
-		log.G(context.TODO()).Errorf("peerAdd failed (%v) for ip %s with mac %s", err, addr.IP.String(), mac.String())
107
+	if prevPeer != nil {
108
+		if err := d.peerDelete(nid, eid, prevPeer.EndpointIP.Addr().AsSlice(), true); err != nil {
109
+			log.G(context.TODO()).WithFields(log.Fields{
110
+				"error": err,
111
+				"nid":   n.id,
112
+				"peer":  prevPeer,
113
+			}).Warn("overlay: failed to delete peer entry")
114
+		}
115
+	}
116
+	if newPeer != nil {
117
+		if err := d.peerAdd(nid, eid, newPeer.EndpointIP.Addr().AsSlice(), newPeer.EndpointMAC.AsSlice(), newPeer.TunnelEndpointIP.AsSlice(), true); err != nil {
118
+			log.G(context.TODO()).WithFields(log.Fields{
119
+				"error": err,
120
+				"nid":   n.id,
121
+				"peer":  newPeer,
122
+			}).Warn("overlay: failed to add peer entry")
123
+		}
111 124
 	}
112 125
 }
113 126
 
... ...
@@ -12,7 +12,7 @@ import (
12 12
 	"github.com/docker/docker/daemon/libnetwork/types"
13 13
 )
14 14
 
15
-func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
15
+func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
16 16
 	log.G(context.TODO()).Debugf("WINOVERLAY: Enter peerAdd for ca ip %s with ca mac %s", peerIP.String(), peerMac.String())
17 17
 
18 18
 	if err := validateID(nid, eid); err != nil {
... ...
@@ -82,7 +82,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
82 82
 	return nil
83 83
 }
84 84
 
85
-func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
85
+func (d *driver) peerDelete(nid, eid string, peerIP net.IP, updateDb bool) error {
86 86
 	log.G(context.TODO()).Infof("WINOVERLAY: Enter peerDelete for endpoint %s and peer ip %s", eid, peerIP.String())
87 87
 
88 88
 	if err := validateID(nid, eid); err != nil {