Browse code

Merge pull request #31011 from aboch/c1.13.x

[1.13.x] Vendoring libnetwork @bba65e5

Brian Goff authored on 2017/02/16 06:26:27
Showing 11 changed files
... ...
@@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
23 23
 github.com/imdario/mergo 0.2.1
24 24
 
25 25
 #get libnetwork packages
26
-github.com/docker/libnetwork 45b40861e677e37cf27bc184eca5af92f8cdd32d
26
+github.com/docker/libnetwork bba65e5e191eccfbc8e2f6455c527b407c2be5ff
27 27
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
28 28
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
29 29
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -40,6 +40,8 @@ type DataStore interface {
40 40
 	// key. The caller must pass a KVObject of the same type as
41 41
 	// the objects that need to be listed
42 42
 	List(string, KVObject) ([]KVObject, error)
43
+	// Map returns a Map of KVObjects
44
+	Map(key string, kvObject KVObject) (map[string]KVObject, error)
43 45
 	// Scope returns the scope of the store
44 46
 	Scope() string
45 47
 	// KVStore returns access to the KV Store
... ...
@@ -512,23 +514,34 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
512 512
 		return ds.cache.list(kvObject)
513 513
 	}
514 514
 
515
+	var kvol []KVObject
516
+	cb := func(key string, val KVObject) {
517
+		kvol = append(kvol, val)
518
+	}
519
+	err := ds.iterateKVPairsFromStore(key, kvObject, cb)
520
+	if err != nil {
521
+		return nil, err
522
+	}
523
+	return kvol, nil
524
+}
525
+
526
+func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
515 527
 	// Bail out right away if the kvObject does not implement KVConstructor
516 528
 	ctor, ok := kvObject.(KVConstructor)
517 529
 	if !ok {
518
-		return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
530
+		return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
519 531
 	}
520 532
 
521 533
 	// Make sure the parent key exists
522 534
 	if err := ds.ensureParent(key); err != nil {
523
-		return nil, err
535
+		return err
524 536
 	}
525 537
 
526 538
 	kvList, err := ds.store.List(key)
527 539
 	if err != nil {
528
-		return nil, err
540
+		return err
529 541
 	}
530 542
 
531
-	var kvol []KVObject
532 543
 	for _, kvPair := range kvList {
533 544
 		if len(kvPair.Value) == 0 {
534 545
 			continue
... ...
@@ -536,16 +549,33 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
536 536
 
537 537
 		dstO := ctor.New()
538 538
 		if err := dstO.SetValue(kvPair.Value); err != nil {
539
-			return nil, err
539
+			return err
540 540
 		}
541 541
 
542 542
 		// Make sure the object has a correct view of the DB index in
543 543
 		// case we need to modify it and update the DB.
544 544
 		dstO.SetIndex(kvPair.LastIndex)
545
+		callback(kvPair.Key, dstO)
546
+	}
547
+
548
+	return nil
549
+}
545 550
 
546
-		kvol = append(kvol, dstO)
551
+func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
552
+	if ds.sequential {
553
+		ds.Lock()
554
+		defer ds.Unlock()
547 555
 	}
548 556
 
557
+	kvol := make(map[string]KVObject)
558
+	cb := func(key string, val KVObject) {
559
+		// Trim the leading & trailing "/" to make it consistent across all stores
560
+		kvol[strings.Trim(key, "/")] = val
561
+	}
562
+	err := ds.iterateKVPairsFromStore(key, kvObject, cb)
563
+	if err != nil {
564
+		return nil, err
565
+	}
549 566
 	return kvol, nil
550 567
 }
551 568
 
... ...
@@ -413,6 +413,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
413 413
 		priIdx = -1
414 414
 		delIdx = -1
415 415
 		lIP    = net.ParseIP(d.bindAddress)
416
+		aIP    = net.ParseIP(d.advertiseAddress)
416 417
 	)
417 418
 
418 419
 	d.Lock()
... ...
@@ -440,7 +441,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
440 440
 
441 441
 	d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {
442 442
 		rIP := net.ParseIP(rIPs)
443
-		return updateNodeKey(lIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
443
+		return updateNodeKey(lIP, aIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
444 444
 	})
445 445
 
446 446
 	d.Lock()
... ...
@@ -471,7 +472,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
471 471
  *********************************************************/
472 472
 
473 473
 // Spis and keys are sorted in such away the one in position 0 is the primary
474
-func updateNodeKey(lIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx, delIdx int) []*spi {
474
+func updateNodeKey(lIP, aIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx, delIdx int) []*spi {
475 475
 	logrus.Debugf("Updating keys for node: %s (%d,%d,%d)", rIP, newIdx, priIdx, delIdx)
476 476
 
477 477
 	spis := idxs
... ...
@@ -480,8 +481,8 @@ func updateNodeKey(lIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx,
480 480
 	// add new
481 481
 	if newIdx != -1 {
482 482
 		spis = append(spis, &spi{
483
-			forward: buildSPI(lIP, rIP, curKeys[newIdx].tag),
484
-			reverse: buildSPI(rIP, lIP, curKeys[newIdx].tag),
483
+			forward: buildSPI(aIP, rIP, curKeys[newIdx].tag),
484
+			reverse: buildSPI(rIP, aIP, curKeys[newIdx].tag),
485 485
 		})
486 486
 	}
487 487
 
... ...
@@ -612,13 +612,13 @@ func (n *network) initSandbox(restore bool) error {
612 612
 	var nlSock *nl.NetlinkSocket
613 613
 	sbox.InvokeFunc(func() {
614 614
 		nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
615
-		if err != nil {
616
-			err = fmt.Errorf("failed to subscribe to neighbor group netlink messages")
617
-		}
618 615
 	})
619 616
 
620
-	if nlSock != nil {
617
+	if err == nil {
621 618
 		go n.watchMiss(nlSock)
619
+	} else {
620
+		logrus.Errorf("failed to subscribe to neighbor group netlink messages for overlay network %s in sbox %s: %v",
621
+			n.id, sbox.Key(), err)
622 622
 	}
623 623
 
624 624
 	return nil
... ...
@@ -644,6 +644,9 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
644 644
 			}
645 645
 
646 646
 			if neigh.IP.To4() == nil {
647
+				if neigh.HardwareAddr != nil {
648
+					logrus.Debugf("Miss notification, l2 mac %v", neigh.HardwareAddr)
649
+				}
647 650
 				continue
648 651
 			}
649 652
 
... ...
@@ -73,7 +73,7 @@ func (d *driver) serfJoin(neighIP string) error {
73 73
 	if neighIP == "" {
74 74
 		return fmt.Errorf("no neighbor to join")
75 75
 	}
76
-	if _, err := d.serfInstance.Join([]string{neighIP}, false); err != nil {
76
+	if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
77 77
 		return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
78 78
 			neighIP, err)
79 79
 	}
... ...
@@ -94,8 +94,8 @@ func (d *driver) notifyEvent(event ovNotify) {
94 94
 }
95 95
 
96 96
 func (d *driver) processEvent(u serf.UserEvent) {
97
-	logrus.Debugf("Received user event name:%s, payload:%s\n", u.Name,
98
-		string(u.Payload))
97
+	logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
98
+		string(u.Payload), uint64(u.LTime))
99 99
 
100 100
 	var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
101 101
 	if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
... ...
@@ -146,6 +146,7 @@ func (d *driver) processQuery(q *serf.Query) {
146 146
 		return
147 147
 	}
148 148
 
149
+	logrus.Debugf("Sending peer query resp mac %s, mask %s, vtep %s", peerMac, net.IP(peerIPMask), vtep)
149 150
 	q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String())))
150 151
 }
151 152
 
... ...
@@ -173,6 +174,7 @@ func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.I
173 173
 			return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
174 174
 		}
175 175
 
176
+		logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
176 177
 		return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
177 178
 
178 179
 	case <-time.After(time.Second):
... ...
@@ -48,6 +48,7 @@ type driver struct {
48 48
 	vxlanIdm         *idm.Idm
49 49
 	once             sync.Once
50 50
 	joinOnce         sync.Once
51
+	localJoinOnce    sync.Once
51 52
 	keys             []*key
52 53
 	sync.Mutex
53 54
 }
... ...
@@ -241,6 +242,12 @@ func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
241 241
 		d.bindAddress = bindAddress
242 242
 		d.Unlock()
243 243
 
244
+		// If containers are already running on this network update the
245
+		// advertiseaddress in the peerDB
246
+		d.localJoinOnce.Do(func() {
247
+			d.peerDBUpdateSelf()
248
+		})
249
+
244 250
 		// If there is no cluster store there is no need to start serf.
245 251
 		if d.store != nil {
246 252
 			if err := validateSelf(advertiseAddress); err != nil {
... ...
@@ -80,25 +80,29 @@ func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
80 80
 func (d *driver) peerDbNetworkWalk(nid string, f func(*peerKey, *peerEntry) bool) error {
81 81
 	d.peerDb.Lock()
82 82
 	pMap, ok := d.peerDb.mp[nid]
83
+	d.peerDb.Unlock()
84
+
83 85
 	if !ok {
84
-		d.peerDb.Unlock()
85 86
 		return nil
86 87
 	}
87
-	d.peerDb.Unlock()
88
+
89
+	mp := map[string]peerEntry{}
88 90
 
89 91
 	pMap.Lock()
90 92
 	for pKeyStr, pEntry := range pMap.mp {
93
+		mp[pKeyStr] = pEntry
94
+	}
95
+	pMap.Unlock()
96
+
97
+	for pKeyStr, pEntry := range mp {
91 98
 		var pKey peerKey
92 99
 		if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
93 100
 			logrus.Warnf("Peer key scan on network %s failed: %v", nid, err)
94 101
 		}
95
-
96 102
 		if f(&pKey, &pEntry) {
97
-			pMap.Unlock()
98 103
 			return nil
99 104
 		}
100 105
 	}
101
-	pMap.Unlock()
102 106
 
103 107
 	return nil
104 108
 }
... ...
@@ -363,3 +367,12 @@ func (d *driver) pushLocalDb() {
363 363
 		return false
364 364
 	})
365 365
 }
366
+
367
+func (d *driver) peerDBUpdateSelf() {
368
+	d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool {
369
+		if pEntry.isLocal {
370
+			pEntry.vtep = net.ParseIP(d.advertiseAddress)
371
+		}
372
+		return false
373
+	})
374
+}
... ...
@@ -181,6 +181,9 @@ type tableEntry struct {
181 181
 }
182 182
 
183 183
 func (ep *endpoint) Info() EndpointInfo {
184
+	if ep.sandboxID != "" {
185
+		return ep
186
+	}
184 187
 	n, err := ep.getNetworkFromStore()
185 188
 	if err != nil {
186 189
 		return nil
... ...
@@ -138,6 +138,7 @@ func getIPVSFamily() (int, error) {
138 138
 	if err != nil {
139 139
 		return 0, err
140 140
 	}
141
+	defer sock.Close()
141 142
 
142 143
 	req := newGenlRequest(genlCtrlID, genlCtrlCmdGetFamily)
143 144
 	req.AddData(nl.NewRtAttr(genlCtrlAttrFamilyName, nl.ZeroTerminated("IPVS")))
... ...
@@ -5,6 +5,7 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 
8
+	"github.com/Sirupsen/logrus"
8 9
 	"github.com/vishvananda/netlink"
9 10
 )
10 11
 
... ...
@@ -96,6 +97,7 @@ func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, op
96 96
 
97 97
 	nh := n.findNeighbor(dstIP, dstMac)
98 98
 	if nh != nil {
99
+		logrus.Debugf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
99 100
 		// If it exists silently return
100 101
 		return nil
101 102
 	}
... ...
@@ -2,6 +2,7 @@ package libnetwork
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"strings"
5 6
 
6 7
 	"github.com/Sirupsen/logrus"
7 8
 	"github.com/docker/libkv/store/boltdb"
... ...
@@ -152,21 +153,24 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
152 152
 			continue
153 153
 		}
154 154
 
155
+		kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
156
+		if err != nil {
157
+			if err != datastore.ErrKeyNotFound {
158
+				logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
159
+			}
160
+		}
161
+
155 162
 		for _, kvo := range kvol {
156 163
 			n := kvo.(*network)
157 164
 			n.Lock()
158 165
 			n.ctrlr = c
159
-			n.Unlock()
160
-
161 166
 			ec := &endpointCnt{n: n}
162
-			err = store.GetObject(datastore.Key(ec.Key()...), ec)
163
-			if err != nil && !n.inDelete {
164
-				logrus.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
165
-				continue
167
+			// Trim the leading & trailing "/" to make it consistent across all stores
168
+			if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
169
+				ec = val.(*endpointCnt)
170
+				ec.n = n
171
+				n.epCnt = ec
166 172
 			}
167
-
168
-			n.Lock()
169
-			n.epCnt = ec
170 173
 			n.scope = store.Scope()
171 174
 			n.Unlock()
172 175
 			nl = append(nl, n)