Browse code

NetworkDB allow setting PacketSize

- Introduce the possibility to specify the max buffer length
in NetworkDB. This allows using the whole MTU limit of
the interface.

- Add queue stats per network; these can be handy to identify the
node's throughput per network and to spot imbalances between
nodes that can point to an MTU misconfiguration.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2017/07/11 04:05:58
Showing 6 changed files
... ...
@@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
214 214
 	listen := clusterProvider.GetListenAddress()
215 215
 	listenAddr, _, _ := net.SplitHostPort(listen)
216 216
 
217
-	logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
218
-		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
217
+	logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
218
+		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
219 219
 	if advAddr != "" && agent == nil {
220 220
 		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
221 221
 			logrus.Errorf("error in agentInit: %v", err)
... ...
@@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
286 286
 	nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
287 287
 	logrus.Info("Gossip cluster hostname ", nodeName)
288 288
 
289
-	nDB, err := networkdb.New(&networkdb.Config{
290
-		BindAddr:      listenAddr,
291
-		AdvertiseAddr: advertiseAddr,
292
-		NodeName:      nodeName,
293
-		Keys:          keys,
294
-	})
289
+	netDBConf := networkdb.DefaultConfig()
290
+	netDBConf.NodeName = nodeName
291
+	netDBConf.BindAddr = listenAddr
292
+	netDBConf.AdvertiseAddr = advertiseAddr
293
+	netDBConf.Keys = keys
294
+	if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
295
+		// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
296
+		// To be on the safe side let's cut 100 bytes
297
+		netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
298
+		logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
299
+			c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
300
+	}
301
+	nDB, err := networkdb.New(netDBConf)
295 302
 
296 303
 	if err != nil {
297 304
 		return err
... ...
@@ -26,14 +26,15 @@ type Config struct {
26 26
 
27 27
 // DaemonCfg represents libnetwork core configuration
28 28
 type DaemonCfg struct {
29
-	Debug           bool
30
-	Experimental    bool
31
-	DataDir         string
32
-	DefaultNetwork  string
33
-	DefaultDriver   string
34
-	Labels          []string
35
-	DriverCfg       map[string]interface{}
36
-	ClusterProvider cluster.Provider
29
+	Debug                  bool
30
+	Experimental           bool
31
+	DataDir                string
32
+	DefaultNetwork         string
33
+	DefaultDriver          string
34
+	Labels                 []string
35
+	DriverCfg              map[string]interface{}
36
+	ClusterProvider        cluster.Provider
37
+	NetworkControlPlaneMTU int
37 38
 }
38 39
 
39 40
 // ClusterCfg represents cluster configuration
... ...
@@ -221,6 +222,19 @@ func OptionExperimental(exp bool) Option {
221 221
 	}
222 222
 }
223 223
 
224
+// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
225
+func OptionNetworkControlPlaneMTU(exp int) Option {
226
+	return func(c *Config) {
227
+		logrus.Debugf("Network Control Plane MTU: %d", exp)
228
+		if exp < 1500 {
229
+			// if exp == 0 the value won't be used
230
+			logrus.Warnf("Received a MTU of %d, this value is very low,",
231
+				"the network control plane can misbehave", exp)
232
+		}
233
+		c.Daemon.NetworkControlPlaneMTU = exp
234
+	}
235
+}
236
+
224 237
 // ProcessOptions processes options and stores it in config
225 238
 func (c *Config) ProcessOptions(options ...Option) {
226 239
 	for _, opt := range options {
... ...
@@ -232,10 +246,7 @@ func (c *Config) ProcessOptions(options ...Option) {
232 232
 
233 233
 // IsValidName validates configuration objects supported by libnetwork
234 234
 func IsValidName(name string) bool {
235
-	if strings.TrimSpace(name) == "" {
236
-		return false
237
-	}
238
-	return true
235
+	return strings.TrimSpace(name) != ""
239 236
 }
240 237
 
241 238
 // OptionLocalKVProvider function returns an option setter for kvstore provider
... ...
@@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
98 98
 }
99 99
 
100 100
 func (nDB *NetworkDB) clusterInit() error {
101
+	nDB.lastStatsTimestamp = time.Now()
102
+	nDB.lastHealthTimestamp = nDB.lastStatsTimestamp
103
+
101 104
 	config := memberlist.DefaultLANConfig()
102 105
 	config.Name = nDB.config.NodeName
103 106
 	config.BindAddr = nDB.config.BindAddr
104 107
 	config.AdvertiseAddr = nDB.config.AdvertiseAddr
108
+	config.UDPBufferSize = nDB.config.PacketBufferSize
105 109
 
106 110
 	if nDB.config.BindPort != 0 {
107 111
 		config.BindPort = nDB.config.BindPort
... ...
@@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
199 199
 	mlist := nDB.memberlist
200 200
 
201 201
 	if _, err := mlist.Join(members); err != nil {
202
-		// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
202
+		// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
203 203
 		go nDB.retryJoin(members, nDB.stopCh)
204
-
205 204
 		return fmt.Errorf("could not join node to memberlist: %v", err)
206 205
 	}
207 206
 
... ...
@@ -372,11 +375,21 @@ func (nDB *NetworkDB) gossip() {
372 372
 		networkNodes[nid] = nDB.networkNodes[nid]
373 373
 
374 374
 	}
375
+	printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
376
+	printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
375 377
 	nDB.RUnlock()
376 378
 
379
+	if printHealth {
380
+		healthScore := nDB.memberlist.GetHealthScore()
381
+		if healthScore != 0 {
382
+			logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
383
+		}
384
+		nDB.lastHealthTimestamp = time.Now()
385
+	}
386
+
377 387
 	for nid, nodes := range networkNodes {
378 388
 		mNodes := nDB.mRandomNodes(3, nodes)
379
-		bytesAvail := udpSendBuf - compoundHeaderOverhead
389
+		bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead
380 390
 
381 391
 		nDB.RLock()
382 392
 		network, ok := thisNodeNetworks[nid]
... ...
@@ -397,6 +410,14 @@ func (nDB *NetworkDB) gossip() {
397 397
 		}
398 398
 
399 399
 		msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
400
+		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
401
+		network.qMessagesSent += len(msgs)
402
+		if printStats {
403
+			logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
404
+				nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
405
+			network.qMessagesSent = 0
406
+		}
407
+
400 408
 		if len(msgs) == 0 {
401 409
 			continue
402 410
 		}
... ...
@@ -414,11 +435,15 @@ func (nDB *NetworkDB) gossip() {
414 414
 			}
415 415
 
416 416
 			// Send the compound message
417
-			if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
417
+			if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
418 418
 				logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
419 419
 			}
420 420
 		}
421 421
 	}
422
+	// Reset the stats
423
+	if printStats {
424
+		nDB.lastStatsTimestamp = time.Now()
425
+	}
422 426
 }
423 427
 
424 428
 func (nDB *NetworkDB) bulkSyncTables() {
... ...
@@ -589,7 +614,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
589 589
 	nDB.bulkSyncAckTbl[node] = ch
590 590
 	nDB.Unlock()
591 591
 
592
-	err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
592
+	err = nDB.memberlist.SendReliable(&mnode.Node, buf)
593 593
 	if err != nil {
594 594
 		nDB.Lock()
595 595
 		delete(nDB.bulkSyncAckTbl, node)
... ...
@@ -606,7 +631,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
606 606
 		case <-t.C:
607 607
 			logrus.Errorf("Bulk sync to node %s timed out", node)
608 608
 		case <-ch:
609
-			logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
609
+			logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
610 610
 		}
611 611
 		t.Stop()
612 612
 	}
... ...
@@ -3,10 +3,6 @@ package networkdb
3 3
 import "github.com/gogo/protobuf/proto"
4 4
 
5 5
 const (
6
-	// Max udp message size chosen to avoid network packet
7
-	// fragmentation.
8
-	udpSendBuf = 1400
9
-
10 6
 	// Compound message header overhead 1 byte(message type) + 4
11 7
 	// bytes (num messages)
12 8
 	compoundHeaderOverhead = 5
... ...
@@ -5,6 +5,7 @@ package networkdb
5 5
 import (
6 6
 	"fmt"
7 7
 	"net"
8
+	"os"
8 9
 	"strings"
9 10
 	"sync"
10 11
 	"time"
... ...
@@ -93,6 +94,12 @@ type NetworkDB struct {
93 93
 	// bootStrapIP is the list of IPs that can be used to bootstrap
94 94
 	// the gossip.
95 95
 	bootStrapIP []net.IP
96
+
97
+	// lastStatsTimestamp is the last timestamp when the stats got printed
98
+	lastStatsTimestamp time.Time
99
+
100
+	// lastHealthTimestamp is the last timestamp when the health score got printed
101
+	lastHealthTimestamp time.Time
96 102
 }
97 103
 
98 104
 // PeerInfo represents the peer (gossip cluster) nodes of a network
... ...
@@ -126,6 +133,9 @@ type network struct {
126 126
 	// The broadcast queue for table event gossip. This is only
127 127
 	// initialized for this node's network attachment entries.
128 128
 	tableBroadcasts *memberlist.TransmitLimitedQueue
129
+
130
+	// Number of gossip messages sent related to this network during the last stats collection period
131
+	qMessagesSent int
129 132
 }
130 133
 
131 134
 // Config represents the configuration of the networdb instance and
... ...
@@ -149,6 +159,21 @@ type Config struct {
149 149
 	// Keys to be added to the Keyring of the memberlist. Key at index
150 150
 	// 0 is the primary key
151 151
 	Keys [][]byte
152
+
153
+	// PacketBufferSize is the maximum number of bytes that memberlist will
154
+	// put in a packet (this will be for UDP packets by default with a NetTransport).
155
+	// A safe value for this is typically 1400 bytes (which is the default). However,
156
+	// depending on your network's MTU (Maximum Transmission Unit) you may
157
+	// be able to increase this to get more content into each gossip packet.
158
+	PacketBufferSize int
159
+
160
+	// StatsPrintPeriod the period to use to print queue stats
161
+	// Default is 5min
162
+	StatsPrintPeriod time.Duration
163
+
164
+	// HealthPrintPeriod the period to use to print the health score
165
+	// Default is 1min
166
+	HealthPrintPeriod time.Duration
152 167
 }
153 168
 
154 169
 // entry defines a table entry
... ...
@@ -171,6 +196,18 @@ type entry struct {
171 171
 	reapTime time.Duration
172 172
 }
173 173
 
174
+// DefaultConfig returns a NetworkDB config with default values
175
+func DefaultConfig() *Config {
176
+	hostname, _ := os.Hostname()
177
+	return &Config{
178
+		NodeName:          hostname,
179
+		BindAddr:          "0.0.0.0",
180
+		PacketBufferSize:  1400,
181
+		StatsPrintPeriod:  5 * time.Minute,
182
+		HealthPrintPeriod: 1 * time.Minute,
183
+	}
184
+}
185
+
174 186
 // New creates a new instance of NetworkDB using the Config passed by
175 187
 // the caller.
176 188
 func New(c *Config) (*NetworkDB, error) {
... ...
@@ -200,6 +237,7 @@ func New(c *Config) (*NetworkDB, error) {
200 200
 // instances passed by the caller in the form of addr:port
201 201
 func (nDB *NetworkDB) Join(members []string) error {
202 202
 	nDB.Lock()
203
+	nDB.bootStrapIP = make([]net.IP, 0, len(members))
203 204
 	for _, m := range members {
204 205
 		nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
205 206
 	}
... ...
@@ -481,9 +519,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
481 481
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
482 482
 		NumNodes: func() int {
483 483
 			nDB.RLock()
484
-			num := len(nDB.networkNodes[nid])
485
-			nDB.RUnlock()
486
-			return num
484
+			defer nDB.RUnlock()
485
+			return len(nDB.networkNodes[nid])
487 486
 		},
488 487
 		RetransmitMult: 4,
489 488
 	}
... ...
@@ -30,10 +30,10 @@ func TestMain(m *testing.M) {
30 30
 func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
31 31
 	var dbs []*NetworkDB
32 32
 	for i := 0; i < num; i++ {
33
-		db, err := New(&Config{
34
-			NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
35
-			BindPort: int(atomic.AddInt32(&dbPort, 1)),
36
-		})
33
+		conf := DefaultConfig()
34
+		conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1)
35
+		conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
36
+		db, err := New(conf)
37 37
 		require.NoError(t, err)
38 38
 
39 39
 		if i != 0 {