Browse code

Vendoring libnetwork to address some concurrency issues

Addresses #28697, #28845, #28712, #26111

Signed-off-by: Madhu Venugopal <madhu@docker.com>

Madhu Venugopal authored on 2016/11/30 15:53:17
Showing 11 changed files
... ...
@@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
23 23
 github.com/imdario/mergo 0.2.1
24 24
 
25 25
 #get libnetwork packages
26
-github.com/docker/libnetwork dd0ddde6749fdffe310087e1c3616142d8c3ef9e 
26
+github.com/docker/libnetwork fd27f22aaa35e3d57f88688f919d05b744f431fd
27 27
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
28 28
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
29 29
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"net"
8 8
 	"os"
9 9
 	"sort"
10
+	"sync"
10 11
 
11 12
 	"github.com/Sirupsen/logrus"
12 13
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -39,6 +40,7 @@ type agent struct {
39 39
 	advertiseAddr     string
40 40
 	epTblCancel       func()
41 41
 	driverCancelFuncs map[string][]func()
42
+	sync.Mutex
42 43
 }
43 44
 
44 45
 func getBindAddr(ifaceName string) (string, error) {
... ...
@@ -86,9 +88,16 @@ func resolveAddr(addrOrInterface string) (string, error) {
86 86
 func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
87 87
 	drvEnc := discoverapi.DriverEncryptionUpdate{}
88 88
 
89
-	a := c.agent
89
+	a := c.getAgent()
90
+	if a == nil {
91
+		logrus.Debug("Skipping key change as agent is nil")
92
+		return nil
93
+	}
94
+
90 95
 	// Find the deleted key. If the deleted key was the primary key,
91 96
 	// a new primary key should be set before removing if from keyring.
97
+	c.Lock()
98
+	added := []byte{}
92 99
 	deleted := []byte{}
93 100
 	j := len(c.keys)
94 101
 	for i := 0; i < j; {
... ...
@@ -127,7 +136,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
127 127
 		if !same {
128 128
 			c.keys = append(c.keys, key)
129 129
 			if key.Subsystem == subsysGossip {
130
-				a.networkDB.SetKey(key.Key)
130
+				added = key.Key
131 131
 			}
132 132
 
133 133
 			if key.Subsystem == subsysIPSec {
... ...
@@ -136,6 +145,11 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
136 136
 			}
137 137
 		}
138 138
 	}
139
+	c.Unlock()
140
+
141
+	if len(added) > 0 {
142
+		a.networkDB.SetKey(added)
143
+	}
139 144
 
140 145
 	key, tag, err := c.getPrimaryKeyTag(subsysGossip)
141 146
 	if err != nil {
... ...
@@ -166,8 +180,10 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
166 166
 }
167 167
 
168 168
 func (c *controller) agentSetup() error {
169
+	c.Lock()
169 170
 	clusterProvider := c.cfg.Daemon.ClusterProvider
170
-
171
+	agent := c.agent
172
+	c.Unlock()
171 173
 	bindAddr := clusterProvider.GetLocalAddress()
172 174
 	advAddr := clusterProvider.GetAdvertiseAddress()
173 175
 	remote := clusterProvider.GetRemoteAddress()
... ...
@@ -176,7 +192,7 @@ func (c *controller) agentSetup() error {
176 176
 	listenAddr, _, _ := net.SplitHostPort(listen)
177 177
 
178 178
 	logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr)
179
-	if advAddr != "" && c.agent == nil {
179
+	if advAddr != "" && agent == nil {
180 180
 		if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil {
181 181
 			logrus.Errorf("Error in agentInit : %v", err)
182 182
 		} else {
... ...
@@ -208,6 +224,9 @@ func (c *controller) agentSetup() error {
208 208
 // For a given subsystem getKeys sorts the keys by lamport time and returns
209 209
 // slice of keys and lamport time which can used as a unique tag for the keys
210 210
 func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
211
+	c.Lock()
212
+	defer c.Unlock()
213
+
211 214
 	sort.Sort(ByTime(c.keys))
212 215
 
213 216
 	keys := [][]byte{}
... ...
@@ -227,6 +246,8 @@ func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
227 227
 // getPrimaryKeyTag returns the primary key for a given subsystem from the
228 228
 // list of sorted key and the associated tag
229 229
 func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
230
+	c.Lock()
231
+	defer c.Unlock()
230 232
 	sort.Sort(ByTime(c.keys))
231 233
 	keys := []*types.EncryptionKey{}
232 234
 	for _, key := range c.keys {
... ...
@@ -265,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
265 265
 
266 266
 	ch, cancel := nDB.Watch("endpoint_table", "", "")
267 267
 
268
+	c.Lock()
268 269
 	c.agent = &agent{
269 270
 		networkDB:         nDB,
270 271
 		bindAddr:          bindAddr,
... ...
@@ -272,6 +294,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
272 272
 		epTblCancel:       cancel,
273 273
 		driverCancelFuncs: make(map[string][]func()),
274 274
 	}
275
+	c.Unlock()
275 276
 
276 277
 	go c.handleTableEvents(ch, c.handleEpTableEvent)
277 278
 
... ...
@@ -294,21 +317,22 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
294 294
 }
295 295
 
296 296
 func (c *controller) agentJoin(remote string) error {
297
-	if c.agent == nil {
297
+	agent := c.getAgent()
298
+	if agent == nil {
298 299
 		return nil
299 300
 	}
300
-
301
-	return c.agent.networkDB.Join([]string{remote})
301
+	return agent.networkDB.Join([]string{remote})
302 302
 }
303 303
 
304 304
 func (c *controller) agentDriverNotify(d driverapi.Driver) {
305
-	if c.agent == nil {
305
+	agent := c.getAgent()
306
+	if agent == nil {
306 307
 		return
307 308
 	}
308 309
 
309 310
 	d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
310
-		Address:     c.agent.advertiseAddr,
311
-		BindAddress: c.agent.bindAddr,
311
+		Address:     agent.advertiseAddr,
312
+		BindAddress: agent.bindAddr,
312 313
 		Self:        true,
313 314
 	})
314 315
 
... ...
@@ -339,11 +363,19 @@ func (c *controller) agentClose() {
339 339
 		return
340 340
 	}
341 341
 
342
+	var cancelList []func()
343
+
344
+	agent.Lock()
342 345
 	for _, cancelFuncs := range agent.driverCancelFuncs {
343 346
 		for _, cancel := range cancelFuncs {
344
-			cancel()
347
+			cancelList = append(cancelList, cancel)
345 348
 		}
346 349
 	}
350
+	agent.Unlock()
351
+
352
+	for _, cancel := range cancelList {
353
+		cancel()
354
+	}
347 355
 
348 356
 	agent.epTblCancel()
349 357
 
... ...
@@ -354,13 +386,7 @@ func (n *network) isClusterEligible() bool {
354 354
 	if n.driverScope() != datastore.GlobalScope {
355 355
 		return false
356 356
 	}
357
-
358
-	c := n.getController()
359
-	if c.agent == nil {
360
-		return false
361
-	}
362
-
363
-	return true
357
+	return n.getController().getAgent() != nil
364 358
 }
365 359
 
366 360
 func (n *network) joinCluster() error {
... ...
@@ -368,8 +394,12 @@ func (n *network) joinCluster() error {
368 368
 		return nil
369 369
 	}
370 370
 
371
-	c := n.getController()
372
-	return c.agent.networkDB.JoinNetwork(n.ID())
371
+	agent := n.getController().getAgent()
372
+	if agent == nil {
373
+		return nil
374
+	}
375
+
376
+	return agent.networkDB.JoinNetwork(n.ID())
373 377
 }
374 378
 
375 379
 func (n *network) leaveCluster() error {
... ...
@@ -377,8 +407,12 @@ func (n *network) leaveCluster() error {
377 377
 		return nil
378 378
 	}
379 379
 
380
-	c := n.getController()
381
-	return c.agent.networkDB.LeaveNetwork(n.ID())
380
+	agent := n.getController().getAgent()
381
+	if agent == nil {
382
+		return nil
383
+	}
384
+
385
+	return agent.networkDB.LeaveNetwork(n.ID())
382 386
 }
383 387
 
384 388
 func (ep *endpoint) addDriverInfoToCluster() error {
... ...
@@ -390,10 +424,7 @@ func (ep *endpoint) addDriverInfoToCluster() error {
390 390
 		return nil
391 391
 	}
392 392
 
393
-	ctrlr := n.ctrlr
394
-	ctrlr.Lock()
395
-	agent := ctrlr.agent
396
-	ctrlr.Unlock()
393
+	agent := n.getController().getAgent()
397 394
 	if agent == nil {
398 395
 		return nil
399 396
 	}
... ...
@@ -415,10 +446,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
415 415
 		return nil
416 416
 	}
417 417
 
418
-	ctrlr := n.ctrlr
419
-	ctrlr.Lock()
420
-	agent := ctrlr.agent
421
-	ctrlr.Unlock()
418
+	agent := n.getController().getAgent()
422 419
 	if agent == nil {
423 420
 		return nil
424 421
 	}
... ...
@@ -438,6 +466,7 @@ func (ep *endpoint) addServiceInfoToCluster() error {
438 438
 	}
439 439
 
440 440
 	c := n.getController()
441
+	agent := c.getAgent()
441 442
 	if !ep.isAnonymous() && ep.Iface().Address() != nil {
442 443
 		var ingressPorts []*PortConfig
443 444
 		if ep.svcID != "" {
... ...
@@ -466,8 +495,10 @@ func (ep *endpoint) addServiceInfoToCluster() error {
466 466
 			return err
467 467
 		}
468 468
 
469
-		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
470
-			return err
469
+		if agent != nil {
470
+			if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
471
+				return err
472
+			}
471 473
 		}
472 474
 	}
473 475
 
... ...
@@ -481,6 +512,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
481 481
 	}
482 482
 
483 483
 	c := n.getController()
484
+	agent := c.getAgent()
485
+
484 486
 	if !ep.isAnonymous() {
485 487
 		if ep.svcID != "" && ep.Iface().Address() != nil {
486 488
 			var ingressPorts []*PortConfig
... ...
@@ -492,9 +525,10 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
492 492
 				return err
493 493
 			}
494 494
 		}
495
-
496
-		if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
497
-			return err
495
+		if agent != nil {
496
+			if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
497
+				return err
498
+			}
498 499
 		}
499 500
 	}
500 501
 	return nil
... ...
@@ -506,16 +540,15 @@ func (n *network) addDriverWatches() {
506 506
 	}
507 507
 
508 508
 	c := n.getController()
509
+	agent := c.getAgent()
510
+	if agent == nil {
511
+		return
512
+	}
509 513
 	for _, tableName := range n.driverTables {
510
-		c.Lock()
511
-		if c.agent == nil {
512
-			c.Unlock()
513
-			return
514
-		}
515
-		ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
516
-		c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
517
-		c.Unlock()
518
-
514
+		ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "")
515
+		agent.Lock()
516
+		agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
517
+		agent.Unlock()
519 518
 		go c.handleTableEvents(ch, n.handleDriverTableEvent)
520 519
 		d, err := n.driver(false)
521 520
 		if err != nil {
... ...
@@ -523,7 +556,7 @@ func (n *network) addDriverWatches() {
523 523
 			return
524 524
 		}
525 525
 
526
-		c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
526
+		agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
527 527
 			if nid == n.ID() {
528 528
 				d.EventNotify(driverapi.Create, nid, tableName, key, value)
529 529
 			}
... ...
@@ -538,11 +571,15 @@ func (n *network) cancelDriverWatches() {
538 538
 		return
539 539
 	}
540 540
 
541
-	c := n.getController()
542
-	c.Lock()
543
-	cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
544
-	delete(c.agent.driverCancelFuncs, n.ID())
545
-	c.Unlock()
541
+	agent := n.getController().getAgent()
542
+	if agent == nil {
543
+		return
544
+	}
545
+
546
+	agent.Lock()
547
+	cancelFuncs := agent.driverCancelFuncs[n.ID()]
548
+	delete(agent.driverCancelFuncs, n.ID())
549
+	agent.Unlock()
546 550
 
547 551
 	for _, cancel := range cancelFuncs {
548 552
 		cancel()
... ...
@@ -237,12 +237,13 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
237 237
 
238 238
 func (c *controller) SetClusterProvider(provider cluster.Provider) {
239 239
 	c.Lock()
240
-	defer c.Unlock()
241 240
 	c.cfg.Daemon.ClusterProvider = provider
241
+	disableProviderCh := c.cfg.Daemon.DisableProvider
242
+	c.Unlock()
242 243
 	if provider != nil {
243 244
 		go c.clusterAgentInit()
244 245
 	} else {
245
-		c.cfg.Daemon.DisableProvider <- struct{}{}
246
+		disableProviderCh <- struct{}{}
246 247
 	}
247 248
 }
248 249
 
... ...
@@ -295,6 +296,12 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
295 295
 	return c.handleKeyChange(keys)
296 296
 }
297 297
 
298
+func (c *controller) getAgent() *agent {
299
+	c.Lock()
300
+	defer c.Unlock()
301
+	return c.agent
302
+}
303
+
298 304
 func (c *controller) clusterAgentInit() {
299 305
 	clusterProvider := c.cfg.Daemon.ClusterProvider
300 306
 	for {
... ...
@@ -57,7 +57,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
57 57
 		config:   config,
58 58
 	}
59 59
 
60
-	d.vxlanIdm, err = idm.New(nil, "vxlan-id", 1, vxlanIDEnd)
60
+	d.vxlanIdm, err = idm.New(nil, "vxlan-id", 0, vxlanIDEnd)
61 61
 	if err != nil {
62 62
 		return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
63 63
 	}
... ...
@@ -15,7 +15,7 @@ type Idm struct {
15 15
 	handle *bitseq.Handle
16 16
 }
17 17
 
18
-// New returns an instance of id manager for a set of [start-end] numerical ids
18
+// New returns an instance of id manager for a [start,end] set of numerical ids
19 19
 func New(ds datastore.DataStore, id string, start, end uint64) (*Idm, error) {
20 20
 	if id == "" {
21 21
 		return nil, fmt.Errorf("Invalid id")
... ...
@@ -54,7 +54,7 @@ func (i *Idm) GetSpecificID(id uint64) error {
54 54
 	return i.handle.Set(id - i.start)
55 55
 }
56 56
 
57
-// GetIDInRange returns the first available id in the set within a range
57
+// GetIDInRange returns the first available id in the set within a [start,end] range
58 58
 func (i *Idm) GetIDInRange(start, end uint64) (uint64, error) {
59 59
 	if i.handle == nil {
60 60
 		return 0, fmt.Errorf("ID set is not initialized")
... ...
@@ -64,7 +64,9 @@ func (i *Idm) GetIDInRange(start, end uint64) (uint64, error) {
64 64
 		return 0, fmt.Errorf("Requested range does not belong to the set")
65 65
 	}
66 66
 
67
-	return i.handle.SetAnyInRange(start, end-start)
67
+	ordinal, err := i.handle.SetAnyInRange(start-i.start, end-i.start)
68
+
69
+	return i.start + ordinal, err
68 70
 }
69 71
 
70 72
 // Release releases the specified id
... ...
@@ -45,6 +45,7 @@ var (
45 45
 	iptablesPath  string
46 46
 	supportsXlock = false
47 47
 	supportsCOpt  = false
48
+	xLockWaitMsg  = "Another app is currently holding the xtables lock; waiting"
48 49
 	// used to lock iptables commands if xtables lock is not supported
49 50
 	bestEffortLock sync.Mutex
50 51
 	// ErrIptablesNotFound is returned when the rule is not found.
... ...
@@ -402,7 +403,7 @@ func raw(args ...string) ([]byte, error) {
402 402
 	}
403 403
 
404 404
 	// ignore iptables' message about xtables lock
405
-	if strings.Contains(string(output), "waiting for it to exit") {
405
+	if strings.Contains(string(output), xLockWaitMsg) {
406 406
 		output = []byte("")
407 407
 	}
408 408
 
... ...
@@ -1485,17 +1485,12 @@ func (n *network) Peers() []networkdb.PeerInfo {
1485 1485
 		return []networkdb.PeerInfo{}
1486 1486
 	}
1487 1487
 
1488
-	var nDB *networkdb.NetworkDB
1489
-	n.ctrlr.Lock()
1490
-	if n.ctrlr.agentInitDone == nil && n.ctrlr.agent != nil {
1491
-		nDB = n.ctrlr.agent.networkDB
1488
+	agent := n.getController().getAgent()
1489
+	if agent == nil {
1490
+		return []networkdb.PeerInfo{}
1492 1491
 	}
1493
-	n.ctrlr.Unlock()
1494 1492
 
1495
-	if nDB != nil {
1496
-		return n.ctrlr.agent.networkDB.Peers(n.id)
1497
-	}
1498
-	return []networkdb.PeerInfo{}
1493
+	return agent.networkDB.Peers(n.ID())
1499 1494
 }
1500 1495
 
1501 1496
 func (n *network) DriverOptions() map[string]string {
... ...
@@ -45,6 +45,8 @@ func (l *logWriter) Write(p []byte) (int, error) {
45 45
 // SetKey adds a new key to the key ring
46 46
 func (nDB *NetworkDB) SetKey(key []byte) {
47 47
 	logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5])
48
+	nDB.Lock()
49
+	defer nDB.Unlock()
48 50
 	for _, dbKey := range nDB.config.Keys {
49 51
 		if bytes.Equal(key, dbKey) {
50 52
 			return
... ...
@@ -60,6 +62,8 @@ func (nDB *NetworkDB) SetKey(key []byte) {
60 60
 // been added apriori through SetKey
61 61
 func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
62 62
 	logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5])
63
+	nDB.RLock()
64
+	defer nDB.RUnlock()
63 65
 	for _, dbKey := range nDB.config.Keys {
64 66
 		if bytes.Equal(key, dbKey) {
65 67
 			if nDB.keyring != nil {
... ...
@@ -74,6 +78,8 @@ func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
74 74
 // can't be the primary key
75 75
 func (nDB *NetworkDB) RemoveKey(key []byte) {
76 76
 	logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5])
77
+	nDB.Lock()
78
+	defer nDB.Unlock()
77 79
 	for i, dbKey := range nDB.config.Keys {
78 80
 		if bytes.Equal(key, dbKey) {
79 81
 			nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...)
... ...
@@ -418,8 +418,12 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
418 418
 			}
419 419
 
420 420
 			execErr := r.backend.ExecFunc(extConnect)
421
-			if execErr != nil || err != nil {
422
-				logrus.Debugf("Connect failed, %s", err)
421
+			if execErr != nil {
422
+				logrus.Warn(execErr)
423
+				continue
424
+			}
425
+			if err != nil {
426
+				logrus.Warnf("Connect failed: %s", err)
423 427
 				continue
424 428
 			}
425 429
 			logrus.Debugf("Query %s[%d] from %s, forwarding to %s:%s", name, query.Question[0].Qtype,
... ...
@@ -156,11 +156,10 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
156 156
 
157 157
 	c.Lock()
158 158
 	s, ok := c.serviceBindings[skey]
159
+	c.Unlock()
159 160
 	if !ok {
160
-		c.Unlock()
161 161
 		return nil
162 162
 	}
163
-	c.Unlock()
164 163
 
165 164
 	s.Lock()
166 165
 	lb, ok := s.loadBalancers[nid]
... ...
@@ -188,7 +187,9 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
188 188
 	if len(s.loadBalancers) == 0 {
189 189
 		// All loadbalancers for the service removed. Time to
190 190
 		// remove the service itself.
191
+		c.Lock()
191 192
 		delete(c.serviceBindings, skey)
193
+		c.Unlock()
192 194
 	}
193 195
 
194 196
 	// Remove loadbalancer service(if needed) and backend in all
... ...
@@ -34,8 +34,8 @@ func init() {
34 34
 func (n *network) connectedLoadbalancers() []*loadBalancer {
35 35
 	c := n.getController()
36 36
 
37
-	serviceBindings := make([]*service, 0, len(c.serviceBindings))
38 37
 	c.Lock()
38
+	serviceBindings := make([]*service, 0, len(c.serviceBindings))
39 39
 	for _, s := range c.serviceBindings {
40 40
 		serviceBindings = append(serviceBindings, s)
41 41
 	}