Browse code

Vendoring libnetwork @f3c4ca8

Signed-off-by: Alessandro Boch <aboch@docker.com>

Alessandro Boch authored on 2017/04/01 06:07:31
Showing 14 changed files
... ...
@@ -24,7 +24,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
24 24
 github.com/imdario/mergo 0.2.1
25 25
 
26 26
 #get libnetwork packages
27
-github.com/docker/libnetwork b6cb1eee1e7fc27ee05f0eb830d3e60e67a88565
27
+github.com/docker/libnetwork f3c4ca8ce5c128e071bab198c4ed9fd0d08384eb
28 28
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
29 29
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
30 30
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -218,6 +218,7 @@ func (c *controller) agentSetup() error {
218 218
 	if c.agent != nil && c.agentInitDone != nil {
219 219
 		close(c.agentInitDone)
220 220
 		c.agentInitDone = nil
221
+		c.agentStopDone = make(chan struct{})
221 222
 	}
222 223
 	c.Unlock()
223 224
 
... ...
@@ -127,6 +127,9 @@ type NetworkController interface {
127 127
 	// Wait for agent initialization complete in libnetwork controller
128 128
 	AgentInitWait()
129 129
 
130
+	// Wait for agent to stop if running
131
+	AgentStopWait()
132
+
130 133
 	// SetKeys configures the encryption key for gossip and overlay data path
131 134
 	SetKeys(keys []*types.EncryptionKey) error
132 135
 }
... ...
@@ -160,6 +163,7 @@ type controller struct {
160 160
 	agent                  *agent
161 161
 	networkLocker          *locker.Locker
162 162
 	agentInitDone          chan struct{}
163
+	agentStopDone          chan struct{}
163 164
 	keys                   []*types.EncryptionKey
164 165
 	clusterConfigAvailable bool
165 166
 	sync.Mutex
... ...
@@ -338,7 +342,12 @@ func (c *controller) clusterAgentInit() {
338 338
 			c.agentClose()
339 339
 			c.cleanupServiceBindings("")
340 340
 
341
-			c.clearIngress(true)
341
+			c.Lock()
342
+			if c.agentStopDone != nil {
343
+				close(c.agentStopDone)
344
+				c.agentStopDone = nil
345
+			}
346
+			c.Unlock()
342 347
 
343 348
 			return
344 349
 		}
... ...
@@ -357,6 +366,15 @@ func (c *controller) AgentInitWait() {
357 357
 	}
358 358
 }
359 359
 
360
+func (c *controller) AgentStopWait() {
361
+	c.Lock()
362
+	agentStopDone := c.agentStopDone
363
+	c.Unlock()
364
+	if agentStopDone != nil {
365
+		<-agentStopDone
366
+	}
367
+}
368
+
360 369
 func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
361 370
 	if c.cfg == nil {
362 371
 		return nil
... ...
@@ -1153,46 +1171,7 @@ func (c *controller) getIPAMDriver(name string) (ipamapi.Ipam, *ipamapi.Capabili
1153 1153
 }
1154 1154
 
1155 1155
 func (c *controller) Stop() {
1156
-	c.clearIngress(false)
1157 1156
 	c.closeStores()
1158 1157
 	c.stopExternalKeyListener()
1159 1158
 	osl.GC()
1160 1159
 }
1161
-
1162
-func (c *controller) clearIngress(clusterLeave bool) {
1163
-	c.Lock()
1164
-	ingressSandbox := c.ingressSandbox
1165
-	c.ingressSandbox = nil
1166
-	c.Unlock()
1167
-
1168
-	var n *network
1169
-	if ingressSandbox != nil {
1170
-		for _, ep := range ingressSandbox.getConnectedEndpoints() {
1171
-			if nw := ep.getNetwork(); nw.ingress {
1172
-				n = nw
1173
-				break
1174
-			}
1175
-		}
1176
-		if err := ingressSandbox.Delete(); err != nil {
1177
-			logrus.Warnf("Could not delete ingress sandbox while leaving: %v", err)
1178
-		}
1179
-	}
1180
-
1181
-	if n == nil {
1182
-		for _, nw := range c.Networks() {
1183
-			if nw.Info().Ingress() {
1184
-				n = nw.(*network)
1185
-				break
1186
-			}
1187
-		}
1188
-	}
1189
-	if n == nil && clusterLeave {
1190
-		logrus.Warnf("Could not find ingress network while leaving")
1191
-	}
1192
-
1193
-	if n != nil {
1194
-		if err := n.Delete(); err != nil {
1195
-			logrus.Warnf("Could not delete ingress network while leaving: %v", err)
1196
-		}
1197
-	}
1198
-}
... ...
@@ -205,7 +205,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
205 205
 		return
206 206
 	}
207 207
 
208
-	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
208
+	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
209 209
 }
210 210
 
211 211
 // Leave method is invoked when a Sandbox detaches from an endpoint.
... ...
@@ -645,19 +645,28 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
645 645
 				continue
646 646
 			}
647 647
 
648
-			if neigh.IP.To4() == nil {
649
-				if neigh.HardwareAddr != nil {
650
-					logrus.Debugf("Miss notification, l2 mac %v", neigh.HardwareAddr)
651
-				}
648
+			var (
649
+				ip             net.IP
650
+				mac            net.HardwareAddr
651
+				l2Miss, l3Miss bool
652
+			)
653
+			if neigh.IP.To4() != nil {
654
+				ip = neigh.IP
655
+				l3Miss = true
656
+			} else if neigh.HardwareAddr != nil {
657
+				mac = []byte(neigh.HardwareAddr)
658
+				ip = net.IP(mac[2:])
659
+				l2Miss = true
660
+			} else {
652 661
 				continue
653 662
 			}
654 663
 
655 664
 			// Not any of the network's subnets. Ignore.
656
-			if !n.contains(neigh.IP) {
665
+			if !n.contains(ip) {
657 666
 				continue
658 667
 			}
659 668
 
660
-			logrus.Debugf("miss notification for dest IP, %v", neigh.IP.String())
669
+			logrus.Debugf("miss notification: dest IP %v, dest MAC %v", ip, mac)
661 670
 
662 671
 			if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 {
663 672
 				continue
... ...
@@ -667,14 +676,14 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
667 667
 				continue
668 668
 			}
669 669
 
670
-			mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, neigh.IP)
670
+			mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip)
671 671
 			if err != nil {
672
-				logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err)
672
+				logrus.Errorf("could not resolve peer %q: %v", ip, err)
673 673
 				continue
674 674
 			}
675 675
 
676
-			if err := n.driver.peerAdd(n.id, "dummy", neigh.IP, IPmask, mac, vtep, true); err != nil {
677
-				logrus.Errorf("could not add neighbor entry for missed peer %q: %v", neigh.IP, err)
676
+			if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
677
+				logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
678 678
 			}
679 679
 		}
680 680
 	}
... ...
@@ -121,7 +121,7 @@ func (d *driver) processEvent(u serf.UserEvent) {
121 121
 	switch action {
122 122
 	case "join":
123 123
 		if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
124
-			net.ParseIP(vtepStr), true); err != nil {
124
+			net.ParseIP(vtepStr), true, false, false); err != nil {
125 125
 			logrus.Errorf("Peer add failed in the driver: %v\n", err)
126 126
 		}
127 127
 	case "leave":
... ...
@@ -236,7 +236,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
236 236
 		op := func() {
237 237
 			if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
238 238
 				pKey.peerMac, entry.vtep,
239
-				false); err != nil {
239
+				false, false, false); err != nil {
240 240
 				fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
241 241
 					pKey.peerIP, pKey.peerMac, err)
242 242
 			}
... ...
@@ -254,7 +254,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
254 254
 }
255 255
 
256 256
 func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
257
-	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
257
+	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
258 258
 
259 259
 	if err := validateID(nid, eid); err != nil {
260 260
 		return err
... ...
@@ -297,12 +297,12 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
297 297
 	}
298 298
 
299 299
 	// Add neighbor entry for the peer IP
300
-	if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
300
+	if err := sbox.AddNeighbor(peerIP, peerMac, l3Miss, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
301 301
 		return fmt.Errorf("could not add neighbor entry into the sandbox: %v", err)
302 302
 	}
303 303
 
304 304
 	// Add fdb entry to the bridge for the peer mac
305
-	if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName),
305
+	if err := sbox.AddNeighbor(vtep, peerMac, l2Miss, sbox.NeighborOptions().LinkName(s.vxlanName),
306 306
 		sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil {
307 307
 		return fmt.Errorf("could not add fdb entry into the sandbox: %v", err)
308 308
 	}
... ...
@@ -279,7 +279,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
279 279
 	}
280 280
 
281 281
 	// Add neighbor entry for the peer IP
282
-	if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
282
+	if err := sbox.AddNeighbor(peerIP, peerMac, false, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
283 283
 		return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err)
284 284
 	}
285 285
 
... ...
@@ -519,6 +519,14 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
519 519
 		return err
520 520
 	}
521 521
 
522
+	defer func() {
523
+		if err != nil {
524
+			if e := ep.deleteDriverInfoFromCluster(); e != nil {
525
+				logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster on join failure: %v", ep.Name(), e)
526
+			}
527
+		}
528
+	}()
529
+
522 530
 	if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
523 531
 		return sb.setupDefaultGW()
524 532
 	}
... ...
@@ -576,39 +584,64 @@ func doUpdateHostsFile(n *network, sb *sandbox) bool {
576 576
 }
577 577
 
578 578
 func (ep *endpoint) rename(name string) error {
579
-	var err error
579
+	var (
580
+		err      error
581
+		netWatch *netWatch
582
+		ok       bool
583
+	)
584
+
580 585
 	n := ep.getNetwork()
581 586
 	if n == nil {
582 587
 		return fmt.Errorf("network not connected for ep %q", ep.name)
583 588
 	}
584 589
 
585
-	n.getController().Lock()
586
-	netWatch, ok := n.getController().nmap[n.ID()]
587
-	n.getController().Unlock()
590
+	c := n.getController()
588 591
 
589
-	if !ok {
590
-		return fmt.Errorf("watch null for network %q", n.Name())
592
+	if c.isAgent() {
593
+		if err = ep.deleteServiceInfoFromCluster(); err != nil {
594
+			return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
595
+		}
596
+	} else {
597
+		c.Lock()
598
+		netWatch, ok = c.nmap[n.ID()]
599
+		c.Unlock()
600
+		if !ok {
601
+			return fmt.Errorf("watch null for network %q", n.Name())
602
+		}
603
+		n.updateSvcRecord(ep, c.getLocalEps(netWatch), false)
591 604
 	}
592 605
 
593
-	n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
594
-
595 606
 	oldName := ep.name
596 607
 	oldAnonymous := ep.anonymous
597 608
 	ep.name = name
598 609
 	ep.anonymous = false
599 610
 
600
-	n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
601
-	defer func() {
602
-		if err != nil {
603
-			n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
604
-			ep.name = oldName
605
-			ep.anonymous = oldAnonymous
606
-			n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
611
+	if c.isAgent() {
612
+		if err = ep.addServiceInfoToCluster(); err != nil {
613
+			return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
607 614
 		}
608
-	}()
615
+		defer func() {
616
+			if err != nil {
617
+				ep.deleteServiceInfoFromCluster()
618
+				ep.name = oldName
619
+				ep.anonymous = oldAnonymous
620
+				ep.addServiceInfoToCluster()
621
+			}
622
+		}()
623
+	} else {
624
+		n.updateSvcRecord(ep, c.getLocalEps(netWatch), true)
625
+		defer func() {
626
+			if err != nil {
627
+				n.updateSvcRecord(ep, c.getLocalEps(netWatch), false)
628
+				ep.name = oldName
629
+				ep.anonymous = oldAnonymous
630
+				n.updateSvcRecord(ep, c.getLocalEps(netWatch), true)
631
+			}
632
+		}()
633
+	}
609 634
 
610 635
 	// Update the store with the updated name
611
-	if err = n.getController().updateToStore(ep); err != nil {
636
+	if err = c.updateToStore(ep); err != nil {
612 637
 		return err
613 638
 	}
614 639
 	// After the name change do a dummy endpoint count update to
... ...
@@ -183,3 +183,11 @@ func (mr ManagerRedirectError) Error() string {
183 183
 
184 184
 // Maskable denotes the type of this error
185 185
 func (mr ManagerRedirectError) Maskable() {}
186
+
187
+// ErrDataStoreNotInitialized is returned if an invalid data scope is passed
188
+// for getting data store
189
+type ErrDataStoreNotInitialized string
190
+
191
+func (dsni ErrDataStoreNotInitialized) Error() string {
192
+	return fmt.Sprintf("datastore for scope %q is not initialized", string(dsni))
193
+}
... ...
@@ -105,8 +105,7 @@ type tableEventMessage struct {
105 105
 }
106 106
 
107 107
 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
108
-	otherm := other.(*tableEventMessage)
109
-	return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key
108
+	return false
110 109
 }
111 110
 
112 111
 func (m *tableEventMessage) Message() []byte {
... ...
@@ -72,8 +72,11 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr,
72 72
 			nlnh.LinkIndex = iface.Attrs().Index
73 73
 		}
74 74
 
75
+		// If the kernel deletion fails for the neighbor entry still remove it
76
+		// from the namespace cache. Otherwise if the neighbor moves back to the
77
+		// same host again, kernel update can fail.
75 78
 		if err := nlh.NeighDel(nlnh); err != nil {
76
-			return fmt.Errorf("could not delete neighbor entry: %v", err)
79
+			logrus.Warnf("Deleting neighbor IP %s, mac %s failed, %v", dstIP, dstMac, err)
77 80
 		}
78 81
 	}
79 82
 
... ...
@@ -85,21 +88,26 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr,
85 85
 		}
86 86
 	}
87 87
 	n.Unlock()
88
+	logrus.Debugf("Neighbor entry deleted for IP %v, mac %v", dstIP, dstMac)
88 89
 
89 90
 	return nil
90 91
 }
91 92
 
92
-func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
93
+func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, options ...NeighOption) error {
93 94
 	var (
94 95
 		iface netlink.Link
95 96
 		err   error
96 97
 	)
97 98
 
99
+	// If the namespace already has the neighbor entry but the AddNeighbor is called
100
+	// because of a miss notification (force flag) program the kernel anyway.
98 101
 	nh := n.findNeighbor(dstIP, dstMac)
99 102
 	if nh != nil {
100
-		logrus.Debugf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
101
-		// If it exists silently return
102
-		return nil
103
+		if !force {
104
+			logrus.Warnf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
105
+			return nil
106
+		}
107
+		logrus.Warnf("Force kernel update, Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
103 108
 	}
104 109
 
105 110
 	nh = &neigh{
... ...
@@ -150,6 +158,7 @@ func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, op
150 150
 	n.Lock()
151 151
 	n.neighbors = append(n.neighbors, nh)
152 152
 	n.Unlock()
153
+	logrus.Debugf("Neighbor entry added for IP %v, mac %v", dstIP, dstMac)
153 154
 
154 155
 	return nil
155 156
 }
... ...
@@ -39,7 +39,7 @@ type Sandbox interface {
39 39
 	RemoveStaticRoute(*types.StaticRoute) error
40 40
 
41 41
 	// AddNeighbor adds a neighbor entry into the sandbox.
42
-	AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, option ...NeighOption) error
42
+	AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, option ...NeighOption) error
43 43
 
44 44
 	// DeleteNeighbor deletes neighbor entry from the sandbox.
45 45
 	DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error
... ...
@@ -225,7 +225,7 @@ func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
225 225
 func (c *controller) updateToStore(kvObject datastore.KVObject) error {
226 226
 	cs := c.getStore(kvObject.DataScope())
227 227
 	if cs == nil {
228
-		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
228
+		return ErrDataStoreNotInitialized(kvObject.DataScope())
229 229
 	}
230 230
 
231 231
 	if err := cs.PutObjectAtomic(kvObject); err != nil {
... ...
@@ -241,7 +241,7 @@ func (c *controller) updateToStore(kvObject datastore.KVObject) error {
241 241
 func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
242 242
 	cs := c.getStore(kvObject.DataScope())
243 243
 	if cs == nil {
244
-		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
244
+		return ErrDataStoreNotInitialized(kvObject.DataScope())
245 245
 	}
246 246
 
247 247
 retry: