Browse code

Fixed race on quick node fail/join

The previous logic was not properly handling the case of a node
that was failing and oining back in short period of time.
The issue was in the handling of the network messages.
When a node joins it sync with other nodes, these are passing
the whole list of nodes that at best of their knowledge are part
of a network. At this point if the node receives that node A is part
of the network it saves it before having received the notification
that node A is actually alive (coming from memberlist).
If node A failed the source node will receive the notification
while the new joined node won't because memberlist never advertise
node A as available. In this case the new node will never purge
node A from its state but also worse, will accept any table notification
where node A is the owner and so will end up in a out of sync state
with the rest of the cluster.

This commit contains also some code cleanup around the area of node
management

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2017/11/17 09:30:27
Showing 5 changed files
... ...
@@ -4,7 +4,6 @@ import (
4 4
 	"fmt"
5 5
 	"net"
6 6
 	"net/http"
7
-	"strconv"
8 7
 	"sync"
9 8
 
10 9
 	"github.com/sirupsen/logrus"
... ...
@@ -82,7 +81,7 @@ func (n *Server) EnableDebug(ip string, port int) {
82 82
 	// go func() {
83 83
 	// 	http.Serve(n.sk, n.mux)
84 84
 	// }()
85
-	http.ListenAndServe(":"+strconv.Itoa(port), n.mux)
85
+	http.ListenAndServe(fmt.Sprintf(":%d", port), n.mux)
86 86
 }
87 87
 
88 88
 // DisableDebug stop the dubug and closes the tcp socket
... ...
@@ -255,13 +255,18 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
255 255
 func (nDB *NetworkDB) reapDeadNode() {
256 256
 	nDB.Lock()
257 257
 	defer nDB.Unlock()
258
-	for id, n := range nDB.failedNodes {
259
-		if n.reapTime > 0 {
260
-			n.reapTime -= nodeReapPeriod
261
-			continue
258
+	for _, nodeMap := range []map[string]*node{
259
+		nDB.failedNodes,
260
+		nDB.leftNodes,
261
+	} {
262
+		for id, n := range nodeMap {
263
+			if n.reapTime > nodeReapPeriod {
264
+				n.reapTime -= nodeReapPeriod
265
+				continue
266
+			}
267
+			logrus.Debugf("Garbage collect node %v", n.Name)
268
+			delete(nodeMap, id)
262 269
 		}
263
-		logrus.Debugf("Removing failed node %v from gossip cluster", n.Name)
264
-		delete(nDB.failedNodes, id)
265 270
 	}
266 271
 }
267 272
 
... ...
@@ -374,7 +379,6 @@ func (nDB *NetworkDB) gossip() {
374 374
 	thisNodeNetworks := nDB.networks[nDB.config.NodeID]
375 375
 	for nid := range thisNodeNetworks {
376 376
 		networkNodes[nid] = nDB.networkNodes[nid]
377
-
378 377
 	}
379 378
 	printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
380 379
 	printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
... ...
@@ -16,9 +16,12 @@ func (d *delegate) NodeMeta(limit int) []byte {
16 16
 	return []byte{}
17 17
 }
18 18
 
19
-func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
20
-	nDB.Lock()
21
-	defer nDB.Unlock()
19
+// getNode searches the node inside the tables
20
+// returns true if the node was respectively in the active list, explicit node leave list or failed list
21
+func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
22
+	var active bool
23
+	var left bool
24
+	var failed bool
22 25
 
23 26
 	for _, nodes := range []map[string]*node{
24 27
 		nDB.failedNodes,
... ...
@@ -26,35 +29,19 @@ func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
26 26
 		nDB.nodes,
27 27
 	} {
28 28
 		if n, ok := nodes[nEvent.NodeName]; ok {
29
+			active = &nodes == &nDB.nodes
30
+			left = &nodes == &nDB.leftNodes
31
+			failed = &nodes == &nDB.failedNodes
29 32
 			if n.ltime >= nEvent.LTime {
30
-				return nil
33
+				return active, left, failed, nil
31 34
 			}
32
-			return n
33
-		}
34
-	}
35
-	return nil
36
-}
37
-
38
-func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
39
-	nDB.Lock()
40
-	defer nDB.Unlock()
41
-
42
-	for _, nodes := range []map[string]*node{
43
-		nDB.failedNodes,
44
-		nDB.leftNodes,
45
-		nDB.nodes,
46
-	} {
47
-		if n, ok := nodes[nEvent.NodeName]; ok {
48
-			if n.ltime >= nEvent.LTime {
49
-				return nil
35
+			if extract {
36
+				delete(nodes, n.Name)
50 37
 			}
51
-
52
-			delete(nodes, n.Name)
53
-			return n
38
+			return active, left, failed, n
54 39
 		}
55 40
 	}
56
-
57
-	return nil
41
+	return active, left, failed, nil
58 42
 }
59 43
 
60 44
 func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
... ...
@@ -62,11 +49,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
62 62
 	// time.
63 63
 	nDB.networkClock.Witness(nEvent.LTime)
64 64
 
65
-	n := nDB.getNode(nEvent)
65
+	nDB.RLock()
66
+	active, left, _, n := nDB.getNode(nEvent, false)
66 67
 	if n == nil {
68
+		nDB.RUnlock()
67 69
 		return false
68 70
 	}
69
-	// If its a node leave event for a manager and this is the only manager we
71
+	nDB.RUnlock()
72
+	// If it is a node leave event for a manager and this is the only manager we
70 73
 	// know of we want the reconnect logic to kick in. In a single manager
71 74
 	// cluster manager's gossip can't be bootstrapped unless some other node
72 75
 	// connects to it.
... ...
@@ -79,28 +69,38 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
79 79
 		}
80 80
 	}
81 81
 
82
-	n = nDB.checkAndGetNode(nEvent)
83
-	if n == nil {
84
-		return false
85
-	}
86
-
87 82
 	n.ltime = nEvent.LTime
88 83
 
89 84
 	switch nEvent.Type {
90 85
 	case NodeEventTypeJoin:
86
+		if active {
87
+			// the node is already marked as active nothing to do
88
+			return false
89
+		}
91 90
 		nDB.Lock()
92
-		_, found := nDB.nodes[n.Name]
93
-		nDB.nodes[n.Name] = n
94
-		nDB.Unlock()
95
-		if !found {
91
+		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
92
+		// All of this is to avoid a big lock on the function
93
+		if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
94
+			n.reapTime = 0
95
+			nDB.nodes[n.Name] = n
96 96
 			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
97 97
 		}
98
+		nDB.Unlock()
98 99
 		return true
99 100
 	case NodeEventTypeLeave:
101
+		if left {
102
+			// the node is already marked as left nothing to do.
103
+			return false
104
+		}
100 105
 		nDB.Lock()
101
-		nDB.leftNodes[n.Name] = n
106
+		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
107
+		// All of this is to avoid a big lock on the function
108
+		if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
109
+			n.reapTime = nodeReapInterval
110
+			nDB.leftNodes[n.Name] = n
111
+			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
112
+		}
102 113
 		nDB.Unlock()
103
-		logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
104 114
 		return true
105 115
 	}
106 116
 
... ...
@@ -162,6 +162,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
162 162
 		return false
163 163
 	}
164 164
 
165
+	// If the node is not known from memberlist we cannot process save any state of it else if it actually
166
+	// dies we won't receive any notification and we will remain stuck with it
167
+	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
168
+		return false
169
+	}
170
+
165 171
 	// This remote network join is being seen the first time.
166 172
 	nodeNetworks[nEvent.NetworkID] = &network{
167 173
 		id:    nEvent.NetworkID,
... ...
@@ -466,7 +472,7 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
466 466
 	var gMsg GossipMessage
467 467
 	err := proto.Unmarshal(buf, &gMsg)
468 468
 	if err != nil {
469
-		logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
469
+		logrus.Errorf("Error unmarshalling push pull message: %v", err)
470 470
 		return
471 471
 	}
472 472
 
... ...
@@ -21,10 +21,29 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
21 21
 	}
22 22
 }
23 23
 
24
+func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
25
+	for name, node := range e.nDB.failedNodes {
26
+		if node.Addr.Equal(mn.Addr) {
27
+			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
28
+			delete(e.nDB.failedNodes, name)
29
+			return
30
+		}
31
+	}
32
+
33
+	for name, node := range e.nDB.leftNodes {
34
+		if node.Addr.Equal(mn.Addr) {
35
+			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
36
+			delete(e.nDB.leftNodes, name)
37
+			return
38
+		}
39
+	}
40
+}
41
+
24 42
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
25 43
 	logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
26 44
 	e.broadcastNodeEvent(mn.Addr, opCreate)
27 45
 	e.nDB.Lock()
46
+	defer e.nDB.Unlock()
28 47
 	// In case the node is rejoining after a failure or leave,
29 48
 	// wait until an explicit join message arrives before adding
30 49
 	// it to the nodes just to make sure this is not a stale
... ...
@@ -32,12 +51,15 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
32 32
 	_, fOk := e.nDB.failedNodes[mn.Name]
33 33
 	_, lOk := e.nDB.leftNodes[mn.Name]
34 34
 	if fOk || lOk {
35
-		e.nDB.Unlock()
36 35
 		return
37 36
 	}
38 37
 
38
+	// Every node has a unique ID
39
+	// Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
40
+	// failed or shutdown one
41
+	e.purgeReincarnation(mn)
42
+
39 43
 	e.nDB.nodes[mn.Name] = &node{Node: *mn}
40
-	e.nDB.Unlock()
41 44
 	logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
42 45
 }
43 46
 
... ...
@@ -49,18 +71,28 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
49 49
 	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
50 50
 	// If the node instead left because was going down, then it makes sense to just delete all its state
51 51
 	e.nDB.Lock()
52
+	defer e.nDB.Unlock()
52 53
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
53 54
 	e.nDB.deleteNodeTableEntries(mn.Name)
54 55
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
55 56
 		delete(e.nDB.nodes, mn.Name)
56 57
 
58
+		// Check if a new incarnation of the same node already joined
59
+		// In that case this node can simply be removed and no further action are needed
60
+		for name, node := range e.nDB.nodes {
61
+			if node.Addr.Equal(mn.Addr) {
62
+				logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
63
+				return
64
+			}
65
+		}
66
+
57 67
 		// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
58 68
 		// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
59 69
 		n.reapTime = nodeReapInterval
60 70
 		e.nDB.failedNodes[mn.Name] = n
61 71
 		failed = true
62 72
 	}
63
-	e.nDB.Unlock()
73
+
64 74
 	if failed {
65 75
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
66 76
 	}
... ...
@@ -310,6 +310,10 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
310 310
 				Name: node.Name,
311 311
 				IP:   node.Addr.String(),
312 312
 			})
313
+		} else {
314
+			// Added for testing purposes, this condition should never happen else mean that the network list
315
+			// is out of sync with the node list
316
+			peers = append(peers, PeerInfo{})
313 317
 		}
314 318
 	}
315 319
 	return peers
... ...
@@ -593,6 +597,9 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
593 593
 	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
594 594
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
595 595
 		NumNodes: func() int {
596
+			//TODO fcrisciani this can be optimized maybe avoiding the lock?
597
+			// this call is done each GetBroadcasts call to evaluate the number of
598
+			// replicas for the message
596 599
 			nDB.RLock()
597 600
 			defer nDB.RUnlock()
598 601
 			return len(nDB.networkNodes[nid])