Browse code

Optimize networkDB queue

Added some optimizations to reduce the number of messages in the queue:
1) On joining a network, the node executes a TCP sync with all the nodes
that it knows to be part of that network. Previously, during this sync,
the node was redistributing all the entries it received. This meant that
if the network had 10K entries, the queue of the joining node would jump
to 10K. The fix adds a flag on the network that prevents any entry from
being inserted in the queue until the sync has happened. Note that right
now the flag is set in a best-effort way: there is no real check that the
sync with at least one of the nodes succeeded.
2) Limit the number of messages coming from a TCP sync that get
redistributed. Introduced a threshold on the queue length above which
messages received via TCP sync are no longer propagated; this disables
the redistribution optimization in case of heavy load.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2018/07/03 08:36:19
Showing 4 changed files
... ...
@@ -110,7 +110,6 @@ type tableEventMessage struct {
110 110
 	tname string
111 111
 	key   string
112 112
 	msg   []byte
113
-	node  string
114 113
 }
115 114
 
116 115
 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
... ...
@@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
168 168
 		id:    nid,
169 169
 		tname: tname,
170 170
 		key:   key,
171
-		node:  nDB.config.NodeID,
172 171
 	})
173 172
 	return nil
174 173
 }
... ...
@@ -24,6 +24,9 @@ const (
24 24
 	retryInterval         = 1 * time.Second
25 25
 	nodeReapInterval      = 24 * time.Hour
26 26
 	nodeReapPeriod        = 2 * time.Hour
27
+	// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
28
+	// the following is roughly 1 minute
29
+	maxQueueLenBroadcastOnSync = 500
27 30
 )
28 31
 
29 32
 type logWriter struct{}
... ...
@@ -572,6 +575,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
572 572
 
573 573
 	var err error
574 574
 	var networks []string
575
+	var success bool
575 576
 	for _, node := range nodes {
576 577
 		if node == nDB.config.NodeID {
577 578
 			continue
... ...
@@ -579,21 +583,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
579 579
 		logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
580 580
 		networks = nDB.findCommonNetworks(node)
581 581
 		err = nDB.bulkSyncNode(networks, node, true)
582
-		// if its periodic bulksync stop after the first successful sync
583
-		if !all && err == nil {
584
-			break
585
-		}
586 582
 		if err != nil {
587 583
 			err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
588 584
 			logrus.Warn(err.Error())
585
+		} else {
586
+			// bulk sync succeeded
587
+			success = true
588
+			// if its periodic bulksync stop after the first successful sync
589
+			if !all {
590
+				break
591
+			}
589 592
 		}
590 593
 	}
591 594
 
592
-	if err != nil {
593
-		return nil, err
595
+	if success {
596
+		// if at least one node sync succeeded
597
+		return networks, nil
594 598
 	}
595 599
 
596
-	return networks, nil
600
+	return nil, err
597 601
 }
598 602
 
599 603
 // Bulk sync all the table entries belonging to a set of networks to a
... ...
@@ -142,7 +142,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
142 142
 	return true
143 143
 }
144 144
 
145
-func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
145
+func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
146 146
 	// Update our local clock if the received messages has newer time.
147 147
 	nDB.tableClock.Witness(tEvent.LTime)
148 148
 
... ...
@@ -175,6 +175,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
175 175
 			nDB.Unlock()
176 176
 			return false
177 177
 		}
178
+	} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
179
+		nDB.Unlock()
180
+		// We don't know the entry, the entry is being deleted and the message is an async message
181
+		// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
182
+		// exceed the garbage collection time (the residual reap time that is in the message is not being
183
+		// updated, to avoid inserting too many messages in the queue).
184
+		// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
185
+		return false
178 186
 	}
179 187
 
180 188
 	e = &entry{
... ...
@@ -197,11 +205,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
197 197
 	nDB.Unlock()
198 198
 
199 199
 	if err != nil && tEvent.Type == TableEventTypeDelete {
200
-		// If it is a delete event and we did not have a state for it, don't propagate to the application
200
+		// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
201
+		// We had saved the state so to speed up convergence and be able to avoid accepting create events.
202
+		// Now we will rebroadcast the message if 2 conditions are met:
203
+		// 1) we had already synced this network (during the network join)
204
+		// 2) the residual reapTime is higher than 1/6 of the total reapTime.
201 205
 		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
202
-		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
203
-		// This also avoids that deletion of entries close to their garbage collection ends up circuling around forever
204
-		return e.reapTime > nDB.config.reapEntryInterval/6
206
+		// most likely the cluster is already aware of it
207
+		// This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around
208
+		// forever
209
+		//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
210
+		return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
205 211
 	}
206 212
 
207 213
 	var op opType
... ...
@@ -215,7 +229,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
215 215
 	}
216 216
 
217 217
 	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
218
-	return true
218
+	return network.inSync
219 219
 }
220 220
 
221 221
 func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
... ...
@@ -244,7 +258,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
244 244
 		return
245 245
 	}
246 246
 
247
-	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
247
+	if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
248 248
 		var err error
249 249
 		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
250 250
 		if err != nil {
... ...
@@ -261,12 +275,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
261 261
 			return
262 262
 		}
263 263
 
264
+		// if the queue is over the threshold, avoid distributing information coming from TCP sync
265
+		if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
266
+			return
267
+		}
268
+
264 269
 		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
265 270
 			msg:   buf,
266 271
 			id:    tEvent.NetworkID,
267 272
 			tname: tEvent.TableName,
268 273
 			key:   tEvent.Key,
269
-			node:  tEvent.NodeName,
270 274
 		})
271 275
 	}
272 276
 }
... ...
@@ -130,6 +130,9 @@ type network struct {
130 130
 	// Lamport time for the latest state of the entry.
131 131
 	ltime serf.LamportTime
132 132
 
133
+	// Gets set to true after the first bulk sync happens
134
+	inSync bool
135
+
133 136
 	// Node leave is in progress.
134 137
 	leaving bool
135 138
 
... ...
@@ -616,6 +619,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
616 616
 	}
617 617
 	nDB.addNetworkNode(nid, nDB.config.NodeID)
618 618
 	networkNodes := nDB.networkNodes[nid]
619
+	n = nodeNetworks[nid]
619 620
 	nDB.Unlock()
620 621
 
621 622
 	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
... ...
@@ -627,6 +631,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
627 627
 		logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
628 628
 	}
629 629
 
630
+	// Mark the network as being synced
631
+	// note this is a best effort, we are not checking the result of the bulk sync
632
+	nDB.Lock()
633
+	n.inSync = true
634
+	nDB.Unlock()
635
+
630 636
 	return nil
631 637
 }
632 638