Browse code

libn/networkdb: fix waiting for many bulkSync ACKs

Concurrent bulkSyncNode calls targeting the same node overwrite each
other's entry in bulkSyncAckTbl. Only the last channel gets closed by
handleBulkSync; the rest block for 30s on a channel nobody will ever
close. This causes unnecessary delays for DNS resolution on newly
joined swarm nodes.

Only have unsolicited bulk syncs subscribe to be notified when the peer
replies with its own bulk sync as only unsolicited bulk syncs solicit a
reply. Correlate the reply to its soliciting bulk-sync using Lamport
timestamps.

Co-authored-by: Dustin Kaiser <8209087+mrnicegyu11@users.noreply.github.com>
Signed-off-by: Cory Snider <csnider@mirantis.com>

Cory Snider authored on 2026/06/13 02:55:03
Showing 4 changed files
... ...
@@ -597,7 +597,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
597 597
 // Bulk sync all the table entries belonging to a set of networks to a
598 598
 // single peer node. It can be unsolicited or can be in response to an
599 599
 // unsolicited bulk sync
600
-func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
600
+func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) (retErr error) {
601 601
 	var msgs [][]byte
602 602
 
603 603
 	var unsolMsg string
... ...
@@ -651,7 +651,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
651 651
 	compound := makeCompoundMessage(msgs)
652 652
 
653 653
 	bsm := BulkSyncMessage{
654
-		LTime:       nDB.tableClock.Time(),
654
+		LTime:       nDB.tableClock.Increment(),
655 655
 		Unsolicited: unsolicited,
656 656
 		NodeName:    nDB.config.NodeID,
657 657
 		Networks:    networks,
... ...
@@ -663,33 +663,58 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
663 663
 		return fmt.Errorf("failed to encode bulk sync message: %v", err)
664 664
 	}
665 665
 
666
-	nDB.Lock()
667
-	ch := make(chan struct{})
668
-	nDB.bulkSyncAckTbl[node] = ch
669
-	nDB.Unlock()
670
-
671
-	err = nDB.memberlist.SendReliable(&mnode.Node, buf)
672
-	if err != nil {
666
+	// Wait on a response only if we are sending an unsolicited bulk sync,
667
+	// as only unsolicited bulk syncs trigger a response.
668
+	if unsolicited {
673 669
 		nDB.Lock()
674
-		delete(nDB.bulkSyncAckTbl, node)
670
+		ch := make(chan struct{})
671
+		nDB.bulkSyncAckTbl[node] = append(nDB.bulkSyncAckTbl[node], bulkSyncSubscription{
672
+			LTime: bsm.LTime,
673
+			Done:  ch,
674
+		})
675 675
 		nDB.Unlock()
676 676
 
677
-		return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
678
-	}
679
-
680
-	// Wait on a response only if it is unsolicited.
681
-	if unsolicited {
682
-		startTime := time.Now()
683
-		t := time.NewTimer(30 * time.Second)
684
-		select {
685
-		case <-t.C:
686
-			log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node)
687
-		case <-ch:
688
-			log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
677
+		unsubscribe := func() {
678
+			nDB.Lock()
679
+			defer nDB.Unlock()
680
+			subscriptions := nDB.bulkSyncAckTbl[node]
681
+			for i, sub := range subscriptions {
682
+				if sub.Done == ch {
683
+					subscriptions = slices.Delete(subscriptions, i, i+1)
684
+					break
685
+				}
686
+			}
687
+			if len(subscriptions) > 0 {
688
+				nDB.bulkSyncAckTbl[node] = subscriptions
689
+			} else {
690
+				delete(nDB.bulkSyncAckTbl, node)
691
+			}
689 692
 		}
690
-		t.Stop()
693
+
694
+		defer func() {
695
+			if retErr != nil {
696
+				unsubscribe()
697
+				return
698
+			}
699
+
700
+			startTime := time.Now()
701
+			t := time.NewTimer(30 * time.Second)
702
+			select {
703
+			case <-t.C:
704
+				log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node)
705
+				nDB.bulkSyncAckTimeouts.Add(1)
706
+				unsubscribe()
707
+			case <-ch:
708
+				log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
709
+			}
710
+			t.Stop()
711
+		}()
691 712
 	}
692 713
 
714
+	err = nDB.memberlist.SendReliable(&mnode.Node, buf)
715
+	if err != nil {
716
+		return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
717
+	}
693 718
 	return nil
694 719
 }
695 720
 
... ...
@@ -357,28 +357,35 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) {
357 357
 
358 358
 	nDB.handleMessage(bsm.Payload, true)
359 359
 
360
-	// Don't respond to a bulk sync which was not unsolicited
361
-	if !bsm.Unsolicited {
362
-		nDB.Lock()
363
-		ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
364
-		if ok {
365
-			close(ch)
366
-			delete(nDB.bulkSyncAckTbl, bsm.NodeName)
360
+	nDB.Lock()
361
+	acks := nDB.bulkSyncAckTbl[bsm.NodeName]
362
+	var pendingAcks []bulkSyncSubscription
363
+	for _, ack := range acks {
364
+		if bsm.LTime > ack.LTime {
365
+			close(ack.Done)
366
+		} else {
367
+			pendingAcks = append(pendingAcks, ack)
367 368
 		}
368
-		nDB.Unlock()
369
-
370
-		return
371 369
 	}
372
-
373
-	var nodeAddr net.IP
374
-	nDB.RLock()
375
-	if node, ok := nDB.nodes[bsm.NodeName]; ok {
376
-		nodeAddr = node.Addr
370
+	if len(pendingAcks) > 0 {
371
+		nDB.bulkSyncAckTbl[bsm.NodeName] = pendingAcks
372
+	} else {
373
+		delete(nDB.bulkSyncAckTbl, bsm.NodeName)
377 374
 	}
378
-	nDB.RUnlock()
375
+	nDB.Unlock()
379 376
 
380
-	if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
381
-		log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
377
+	// Only respond to an unsolicited bulk sync.
378
+	if bsm.Unsolicited {
379
+		var nodeAddr net.IP
380
+		nDB.RLock()
381
+		if node, ok := nDB.nodes[bsm.NodeName]; ok {
382
+			nodeAddr = node.Addr
383
+		}
384
+		nDB.RUnlock()
385
+
386
+		if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
387
+			log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
388
+		}
382 389
 	}
383 390
 }
384 391
 
... ...
@@ -79,9 +79,13 @@ type NetworkDB struct {
79 79
 	// network. The key is a network ID.
80 80
 	networkNodes map[string][]string
81 81
 
82
-	// A table of ack channels for every node from which we are
83
-	// waiting for an ack.
84
-	bulkSyncAckTbl map[string]chan struct{}
82
+	// A table of subscriptions for every node from which we are waiting for
83
+	// a bulk-sync ack.
84
+	bulkSyncAckTbl map[string][]bulkSyncSubscription
85
+
86
+	// A count of the number of times we have timed out waiting for a bulk
87
+	// sync ack from any peer node.
88
+	bulkSyncAckTimeouts atomic.Uint64
85 89
 
86 90
 	// Broadcast queue for network event gossip.
87 91
 	networkBroadcasts *memberlist.TransmitLimitedQueue
... ...
@@ -261,6 +265,14 @@ type entry struct {
261 261
 	reapTime time.Duration
262 262
 }
263 263
 
264
+// bulkSyncSubscription is a subscription to the bulk-sync progress for a peer node.
265
+// Done is closed when a bulk sync from the node is received with an LTime
266
+// greater than the subscription's LTime.
267
+type bulkSyncSubscription struct {
268
+	LTime serf.LamportTime
269
+	Done  chan<- struct{}
270
+}
271
+
264 272
 // DefaultConfig returns a NetworkDB config with default values
265 273
 func DefaultConfig() *Config {
266 274
 	hostname, _ := os.Hostname()
... ...
@@ -310,7 +322,7 @@ func newNetworkDB(c *Config) *NetworkDB {
310 310
 		failedNodes:      make(map[string]*node),
311 311
 		leftNodes:        make(map[string]*node),
312 312
 		networkNodes:     make(map[string][]string),
313
-		bulkSyncAckTbl:   make(map[string]chan struct{}),
313
+		bulkSyncAckTbl:   make(map[string][]bulkSyncSubscription),
314 314
 		broadcaster:      events.NewBroadcaster(),
315 315
 		rng:              rand.New(rand.NewChaCha8(rngSeed)), //gosec:disable G404 -- not used in a security sensitive context
316 316
 	}
... ...
@@ -17,6 +17,7 @@ import (
17 17
 	"github.com/docker/go-events"
18 18
 	"github.com/hashicorp/memberlist"
19 19
 	"github.com/moby/moby/v2/daemon/internal/stringid"
20
+	"golang.org/x/sync/errgroup"
20 21
 	"gotest.tools/v3/assert"
21 22
 	is "gotest.tools/v3/assert/cmp"
22 23
 	"gotest.tools/v3/poll"
... ...
@@ -464,6 +465,55 @@ func TestNetworkDBBulkSync(t *testing.T) {
464 464
 	closeNetworkDBInstances(t, dbs)
465 465
 }
466 466
 
467
+// Regression test for https://github.com/moby/moby/issues/51701
468
+func TestNetworkDBBulkSyncNodeConcurrent(t *testing.T) {
469
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
470
+	defer closeNetworkDBInstances(t, dbs)
471
+
472
+	err := dbs[0].JoinNetwork("network1")
473
+	assert.NilError(t, err)
474
+
475
+	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
476
+
477
+	err = dbs[1].JoinNetwork("network1")
478
+	assert.NilError(t, err)
479
+
480
+	dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
481
+
482
+	for i := range 50 {
483
+		err = dbs[0].CreateEntry("test_table", "network1",
484
+			fmt.Sprintf("key%d", i),
485
+			fmt.Appendf(nil, "val%d", i))
486
+		assert.NilError(t, err)
487
+	}
488
+
489
+	const N = 4
490
+	start := make(chan struct{})
491
+	var eg errgroup.Group
492
+	for i := range N {
493
+		eg.Go(func() error {
494
+			<-start
495
+			err := dbs[1].bulkSyncNode(
496
+				[]string{"network1"}, dbs[0].config.NodeID, true)
497
+			if err != nil {
498
+				return fmt.Errorf("[%d] %w", i, err)
499
+			}
500
+			return nil
501
+		})
502
+	}
503
+	close(start)
504
+	assert.NilError(t, eg.Wait())
505
+
506
+	// No bulk sync should have timed out. On broken code this is N-1.
507
+	assert.Equal(t, dbs[1].bulkSyncAckTimeouts.Load(), uint64(0),
508
+		"expected 0 bulk sync timeouts, but some ack channels were orphaned")
509
+
510
+	dbs[1].RLock()
511
+	defer dbs[1].RUnlock()
512
+	assert.Assert(t, is.Len(dbs[1].bulkSyncAckTbl, 0),
513
+		"bulkSyncAckTbl should be empty after all syncs complete")
514
+}
515
+
467 516
 func TestNetworkDBCRUDMediumCluster(t *testing.T) {
468 517
 	n := 5
469 518