Browse code

Vendor libnetwork for network inspect --verbose changes

Signed-off-by: Santhosh Manohar <santhosh@docker.com>

Santhosh Manohar authored on 2017/03/12 10:22:15
Showing 24 changed files
... ...
@@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
23 23
 github.com/imdario/mergo 0.2.1
24 24
 
25 25
 #get libnetwork packages
26
-github.com/docker/libnetwork 1a019214c9cb80bd56219e5d6994a22caf302895
26
+github.com/docker/libnetwork 4610dd67c7b9828bb4719d8aa2ac53a7f1f739d2
27 27
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
28 28
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
29 29
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -44,6 +44,8 @@ type agent struct {
44 44
 	sync.Mutex
45 45
 }
46 46
 
47
+const libnetworkEPTable = "endpoint_table"
48
+
47 49
 func getBindAddr(ifaceName string) (string, error) {
48 50
 	iface, err := net.InterfaceByName(ifaceName)
49 51
 	if err != nil {
... ...
@@ -285,7 +287,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
285 285
 		return err
286 286
 	}
287 287
 
288
-	ch, cancel := nDB.Watch("endpoint_table", "", "")
288
+	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
289 289
 	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
290 290
 
291 291
 	c.Lock()
... ...
@@ -385,6 +387,111 @@ func (c *controller) agentClose() {
385 385
 	agent.networkDB.Close()
386 386
 }
387 387
 
388
+// Task has the backend container details
389
+type Task struct {
390
+	Name       string
391
+	EndpointID string
392
+	EndpointIP string
393
+	Info       map[string]string
394
+}
395
+
396
+// ServiceInfo has service specific details along with the list of backend tasks
397
+type ServiceInfo struct {
398
+	VIP          string
399
+	LocalLBIndex int
400
+	Tasks        []Task
401
+	Ports        []string
402
+}
403
+
404
+type epRecord struct {
405
+	ep      EndpointRecord
406
+	info    map[string]string
407
+	lbIndex int
408
+}
409
+
410
+func (n *network) Services() map[string]ServiceInfo {
411
+	eps := make(map[string]epRecord)
412
+
413
+	if !n.isClusterEligible() {
414
+		return nil
415
+	}
416
+	agent := n.getController().getAgent()
417
+	if agent == nil {
418
+		return nil
419
+	}
420
+
421
+	// Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
422
+	entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
423
+	for eid, value := range entries {
424
+		var epRec EndpointRecord
425
+		nid := n.ID()
426
+		if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
427
+			logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
428
+			continue
429
+		}
430
+		i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
431
+		eps[eid] = epRecord{
432
+			ep:      epRec,
433
+			lbIndex: i,
434
+		}
435
+	}
436
+
437
+	// Walk through the driver's tables, have the driver decode the entries
438
+	// and return the tuple {ep ID, value}. value is a string that conveys
439
+	// relevant info about the endpoint.
440
+	d, err := n.driver(true)
441
+	if err != nil {
442
+		logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
443
+		return nil
444
+	}
445
+	for _, table := range n.driverTables {
446
+		if table.objType != driverapi.EndpointObject {
447
+			continue
448
+		}
449
+		entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
450
+		for key, value := range entries {
451
+			epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
452
+			if ep, ok := eps[epID]; !ok {
453
+				logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
454
+			} else {
455
+				ep.info = info
456
+				eps[epID] = ep
457
+			}
458
+		}
459
+	}
460
+
461
+	// group the endpoints into a map keyed by the service name
462
+	sinfo := make(map[string]ServiceInfo)
463
+	for ep, epr := range eps {
464
+		var (
465
+			s  ServiceInfo
466
+			ok bool
467
+		)
468
+		if s, ok = sinfo[epr.ep.ServiceName]; !ok {
469
+			s = ServiceInfo{
470
+				VIP:          epr.ep.VirtualIP,
471
+				LocalLBIndex: epr.lbIndex,
472
+			}
473
+		}
474
+		ports := []string{}
475
+		if s.Ports == nil {
476
+			for _, port := range epr.ep.IngressPorts {
477
+				p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
478
+				ports = append(ports, p)
479
+			}
480
+			s.Ports = ports
481
+		}
482
+		s.Tasks = append(s.Tasks, Task{
483
+			Name:       epr.ep.Name,
484
+			EndpointID: ep,
485
+			EndpointIP: epr.ep.EndpointIP,
486
+			Info:       epr.info,
487
+		})
488
+		sinfo[epr.ep.ServiceName] = s
489
+	}
490
+	return sinfo
491
+}
492
+
388 493
 func (n *network) isClusterEligible() bool {
389 494
 	if n.driverScope() != datastore.GlobalScope {
390 495
 		return false
... ...
@@ -508,7 +615,7 @@ func (ep *endpoint) addServiceInfoToCluster() error {
508 508
 	}
509 509
 
510 510
 	if agent != nil {
511
-		if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
511
+		if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
512 512
 			return err
513 513
 		}
514 514
 	}
... ...
@@ -541,7 +648,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
541 541
 	}
542 542
 
543 543
 	if agent != nil {
544
-		if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
544
+		if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
545 545
 			return err
546 546
 		}
547 547
 	}
... ...
@@ -559,8 +666,8 @@ func (n *network) addDriverWatches() {
559 559
 	if agent == nil {
560 560
 		return
561 561
 	}
562
-	for _, tableName := range n.driverTables {
563
-		ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "")
562
+	for _, table := range n.driverTables {
563
+		ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
564 564
 		agent.Lock()
565 565
 		agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
566 566
 		agent.Unlock()
... ...
@@ -571,9 +678,9 @@ func (n *network) addDriverWatches() {
571 571
 			return
572 572
 		}
573 573
 
574
-		agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
574
+		agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
575 575
 			if nid == n.ID() {
576
-				d.EventNotify(driverapi.Create, nid, tableName, key, value)
576
+				d.EventNotify(driverapi.Create, nid, table.name, key, value)
577 577
 			}
578 578
 
579 579
 			return false
... ...
@@ -72,6 +72,16 @@ type Driver interface {
72 72
 	// only invoked for the global scope driver.
73 73
 	EventNotify(event EventType, nid string, tableName string, key string, value []byte)
74 74
 
75
+	// DecodeTableEntry passes the driver a key, value pair from table it registered
76
+	// with libnetwork. Driver should return {object ID, map[string]string} tuple.
77
+	// If DecodeTableEntry is called for a table associated with NetworkObject or
78
+	// EndpointObject the returned object ID should be the network id or endpoint id
79
+	// associated with that entry. map should have information about the object that
80
+	// can be presented to the user.
81
+	// For example: overlay driver returns the VTEP IP of the host that has the endpoint
82
+	// which is shown in 'network inspect --verbose'
83
+	DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string)
84
+
75 85
 	// Type returns the type of this driver, the network type this driver manages
76 86
 	Type() string
77 87
 
... ...
@@ -84,7 +94,7 @@ type Driver interface {
84 84
 type NetworkInfo interface {
85 85
 	// TableEventRegister registers driver interest in a given
86 86
 	// table name.
87
-	TableEventRegister(tableName string) error
87
+	TableEventRegister(tableName string, objType ObjectType) error
88 88
 }
89 89
 
90 90
 // InterfaceInfo provides a go interface for drivers to retrieve
... ...
@@ -175,3 +185,28 @@ const (
175 175
 	// Delete event is generated when a table entry is deleted.
176 176
 	Delete
177 177
 )
178
+
179
+// ObjectType represents the type of object driver wants to store in libnetwork's networkDB
180
+type ObjectType int
181
+
182
+const (
183
+	// EndpointObject should be set for libnetwork endpoint object related data
184
+	EndpointObject ObjectType = 1 + iota
185
+	// NetworkObject should be set for libnetwork network object related data
186
+	NetworkObject
187
+	// OpaqueObject is for driver specific data with no corresponding libnetwork object
188
+	OpaqueObject
189
+)
190
+
191
+// IsValidType validates the passed in type against the valid object types
192
+func IsValidType(objType ObjectType) bool {
193
+	switch objType {
194
+	case EndpointObject:
195
+		fallthrough
196
+	case NetworkObject:
197
+		fallthrough
198
+	case OpaqueObject:
199
+		return true
200
+	}
201
+	return false
202
+}
... ...
@@ -575,6 +575,10 @@ func (d *driver) NetworkFree(id string) error {
575 575
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
576 576
 }
577 577
 
578
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
579
+	return "", nil
580
+}
581
+
578 582
 // Create a new network using bridge plugin
579 583
 func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
580 584
 	if len(ipV4Data) == 0 || ipV4Data[0].Pool.String() == "0.0.0.0/0" {
... ...
@@ -140,7 +140,6 @@ func setupIPTablesInternal(bridgeIface string, addr net.Addr, icc, ipmasq, hairp
140 140
 		hpNatRule = iptRule{table: iptables.Nat, chain: "POSTROUTING", preArgs: []string{"-t", "nat"}, args: []string{"-m", "addrtype", "--src-type", "LOCAL", "-o", bridgeIface, "-j", "MASQUERADE"}}
141 141
 		skipDNAT  = iptRule{table: iptables.Nat, chain: DockerChain, preArgs: []string{"-t", "nat"}, args: []string{"-i", bridgeIface, "-j", "RETURN"}}
142 142
 		outRule   = iptRule{table: iptables.Filter, chain: "FORWARD", args: []string{"-i", bridgeIface, "!", "-o", bridgeIface, "-j", "ACCEPT"}}
143
-		inRule    = iptRule{table: iptables.Filter, chain: "FORWARD", args: []string{"-o", bridgeIface, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}}
144 143
 	)
145 144
 
146 145
 	// Set NAT.
... ...
@@ -173,11 +172,6 @@ func setupIPTablesInternal(bridgeIface string, addr net.Addr, icc, ipmasq, hairp
173 173
 		return err
174 174
 	}
175 175
 
176
-	// Set Accept on incoming packets for existing connections.
177
-	if err := programChainRule(inRule, "ACCEPT INCOMING", enable); err != nil {
178
-		return err
179
-	}
180
-
181 176
 	return nil
182 177
 }
183 178
 
... ...
@@ -35,6 +35,10 @@ func (d *driver) NetworkFree(id string) error {
35 35
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
36 36
 }
37 37
 
38
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
39
+	return "", nil
40
+}
41
+
38 42
 func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
39 43
 	d.Lock()
40 44
 	defer d.Unlock()
... ...
@@ -108,3 +108,7 @@ func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{
108 108
 
109 109
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
110 110
 }
111
+
112
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
113
+	return "", nil
114
+}
... ...
@@ -110,3 +110,7 @@ func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{
110 110
 
111 111
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
112 112
 }
113
+
114
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
115
+	return "", nil
116
+}
... ...
@@ -35,6 +35,10 @@ func (d *driver) NetworkFree(id string) error {
35 35
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
36 36
 }
37 37
 
38
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
39
+	return "", nil
40
+}
41
+
38 42
 func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
39 43
 	d.Lock()
40 44
 	defer d.Unlock()
... ...
@@ -20,7 +20,7 @@ import (
20 20
 )
21 21
 
22 22
 const (
23
-	mark         = uint32(0xD0C4E3)
23
+	r            = 0xD0C4E3
24 24
 	timeout      = 30
25 25
 	pktExpansion = 26 // SPI(4) + SeqN(4) + IV(8) + PadLength(1) + NextHeader(1) + ICV(8)
26 26
 )
... ...
@@ -31,6 +31,8 @@ const (
31 31
 	bidir
32 32
 )
33 33
 
34
+var spMark = netlink.XfrmMark{Value: uint32(r), Mask: 0xffffffff}
35
+
34 36
 type key struct {
35 37
 	value []byte
36 38
 	tag   uint32
... ...
@@ -201,7 +203,7 @@ func programMangle(vni uint32, add bool) (err error) {
201 201
 	var (
202 202
 		p      = strconv.FormatUint(uint64(vxlanPort), 10)
203 203
 		c      = fmt.Sprintf("0>>22&0x3C@12&0xFFFFFF00=%d", int(vni)<<8)
204
-		m      = strconv.FormatUint(uint64(mark), 10)
204
+		m      = strconv.FormatUint(uint64(r), 10)
205 205
 		chain  = "OUTPUT"
206 206
 		rule   = []string{"-p", "udp", "--dport", p, "-m", "u32", "--u32", c, "-j", "MARK", "--set-mark", m}
207 207
 		a      = "-A"
... ...
@@ -271,6 +273,7 @@ func programSA(localIP, remoteIP net.IP, spi *spi, k *key, dir int, add bool) (f
271 271
 			Proto: netlink.XFRM_PROTO_ESP,
272 272
 			Spi:   spi.reverse,
273 273
 			Mode:  netlink.XFRM_MODE_TRANSPORT,
274
+			Reqid: r,
274 275
 		}
275 276
 		if add {
276 277
 			rSA.Aead = buildAeadAlgo(k, spi.reverse)
... ...
@@ -296,6 +299,7 @@ func programSA(localIP, remoteIP net.IP, spi *spi, k *key, dir int, add bool) (f
296 296
 			Proto: netlink.XFRM_PROTO_ESP,
297 297
 			Spi:   spi.forward,
298 298
 			Mode:  netlink.XFRM_MODE_TRANSPORT,
299
+			Reqid: r,
299 300
 		}
300 301
 		if add {
301 302
 			fSA.Aead = buildAeadAlgo(k, spi.forward)
... ...
@@ -325,17 +329,18 @@ func programSP(fSA *netlink.XfrmState, rSA *netlink.XfrmState, add bool) error {
325 325
 		xfrmProgram = ns.NlHandle().XfrmPolicyAdd
326 326
 	}
327 327
 
328
-	fullMask := net.CIDRMask(8*len(fSA.Src), 8*len(fSA.Src))
328
+	// Create a congruent cidr
329
+	s := types.GetMinimalIP(fSA.Src)
330
+	d := types.GetMinimalIP(fSA.Dst)
331
+	fullMask := net.CIDRMask(8*len(s), 8*len(s))
329 332
 
330 333
 	fPol := &netlink.XfrmPolicy{
331
-		Src:     &net.IPNet{IP: fSA.Src, Mask: fullMask},
332
-		Dst:     &net.IPNet{IP: fSA.Dst, Mask: fullMask},
334
+		Src:     &net.IPNet{IP: s, Mask: fullMask},
335
+		Dst:     &net.IPNet{IP: d, Mask: fullMask},
333 336
 		Dir:     netlink.XFRM_DIR_OUT,
334 337
 		Proto:   17,
335 338
 		DstPort: 4789,
336
-		Mark: &netlink.XfrmMark{
337
-			Value: mark,
338
-		},
339
+		Mark:    &spMark,
339 340
 		Tmpls: []netlink.XfrmPolicyTmpl{
340 341
 			{
341 342
 				Src:   fSA.Src,
... ...
@@ -343,6 +348,7 @@ func programSP(fSA *netlink.XfrmState, rSA *netlink.XfrmState, add bool) error {
343 343
 				Proto: netlink.XFRM_PROTO_ESP,
344 344
 				Mode:  netlink.XFRM_MODE_TRANSPORT,
345 345
 				Spi:   fSA.Spi,
346
+				Reqid: r,
346 347
 			},
347 348
 		},
348 349
 	}
... ...
@@ -426,6 +432,8 @@ func (d *driver) secMapWalk(f func(string, []*spi) ([]*spi, bool)) error {
426 426
 }
427 427
 
428 428
 func (d *driver) setKeys(keys []*key) error {
429
+	// Remove any stale policy, state
430
+	clearEncryptionStates()
429 431
 	// Accept the encryption keys and clear any stale encryption map
430 432
 	d.Lock()
431 433
 	d.keys = keys
... ...
@@ -526,7 +534,7 @@ func updateNodeKey(lIP, aIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, pr
526 526
 	}
527 527
 
528 528
 	if newIdx > -1 {
529
-		// +RSA2
529
+		// +rSA2
530 530
 		programSA(lIP, rIP, spis[newIdx], curKeys[newIdx], reverse, true)
531 531
 	}
532 532
 
... ...
@@ -535,16 +543,17 @@ func updateNodeKey(lIP, aIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, pr
535 535
 		fSA2, _, _ := programSA(lIP, rIP, spis[priIdx], curKeys[priIdx], forward, true)
536 536
 
537 537
 		// +fSP2, -fSP1
538
-		fullMask := net.CIDRMask(8*len(fSA2.Src), 8*len(fSA2.Src))
538
+		s := types.GetMinimalIP(fSA2.Src)
539
+		d := types.GetMinimalIP(fSA2.Dst)
540
+		fullMask := net.CIDRMask(8*len(s), 8*len(s))
541
+
539 542
 		fSP1 := &netlink.XfrmPolicy{
540
-			Src:     &net.IPNet{IP: fSA2.Src, Mask: fullMask},
541
-			Dst:     &net.IPNet{IP: fSA2.Dst, Mask: fullMask},
543
+			Src:     &net.IPNet{IP: s, Mask: fullMask},
544
+			Dst:     &net.IPNet{IP: d, Mask: fullMask},
542 545
 			Dir:     netlink.XFRM_DIR_OUT,
543 546
 			Proto:   17,
544 547
 			DstPort: 4789,
545
-			Mark: &netlink.XfrmMark{
546
-				Value: mark,
547
-			},
548
+			Mark:    &spMark,
548 549
 			Tmpls: []netlink.XfrmPolicyTmpl{
549 550
 				{
550 551
 					Src:   fSA2.Src,
... ...
@@ -552,6 +561,7 @@ func updateNodeKey(lIP, aIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, pr
552 552
 					Proto: netlink.XFRM_PROTO_ESP,
553 553
 					Mode:  netlink.XFRM_MODE_TRANSPORT,
554 554
 					Spi:   fSA2.Spi,
555
+					Reqid: r,
555 556
 				},
556 557
 			},
557 558
 		}
... ...
@@ -597,3 +607,33 @@ func (n *network) maxMTU() int {
597 597
 	}
598 598
 	return mtu
599 599
 }
600
+
601
+func clearEncryptionStates() {
602
+	nlh := ns.NlHandle()
603
+	spList, err := nlh.XfrmPolicyList(netlink.FAMILY_ALL)
604
+	if err != nil {
605
+		logrus.Warnf("Failed to retrieve SP list for cleanup: %v", err)
606
+	}
607
+	saList, err := nlh.XfrmStateList(netlink.FAMILY_ALL)
608
+	if err != nil {
609
+		logrus.Warnf("Failed to retrieve SA list for cleanup: %v", err)
610
+	}
611
+	for _, sp := range spList {
612
+		if sp.Mark != nil && sp.Mark.Value == spMark.Value {
613
+			if err := nlh.XfrmPolicyDel(&sp); err != nil {
614
+				logrus.Warnf("Failed to delete stale SP %s: %v", sp, err)
615
+				continue
616
+			}
617
+			logrus.Debugf("Removed stale SP: %s", sp)
618
+		}
619
+	}
620
+	for _, sa := range saList {
621
+		if sa.Reqid == r {
622
+			if err := nlh.XfrmStateDel(&sa); err != nil {
623
+				logrus.Warnf("Failed to delete stale SA %s: %v", sa, err)
624
+				continue
625
+			}
626
+			logrus.Debugf("Removed stale SA: %s", sa)
627
+		}
628
+	}
629
+}
... ...
@@ -145,6 +145,23 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
145 145
 	return nil
146 146
 }
147 147
 
148
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
149
+	if tablename != ovPeerTable {
150
+		logrus.Errorf("DecodeTableEntry: unexpected table name %s", tablename)
151
+		return "", nil
152
+	}
153
+
154
+	var peer PeerRecord
155
+	if err := proto.Unmarshal(value, &peer); err != nil {
156
+		logrus.Errorf("DecodeTableEntry: failed to unmarshal peer record for key %s: %v", key, err)
157
+		return "", nil
158
+	}
159
+
160
+	return key, map[string]string{
161
+		"Host IP": peer.TunnelEndpointIP,
162
+	}
163
+}
164
+
148 165
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
149 166
 	if tableName != ovPeerTable {
150 167
 		logrus.Errorf("Unexpected table notification for table %s received", tableName)
... ...
@@ -159,7 +159,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
159 159
 	}
160 160
 
161 161
 	if nInfo != nil {
162
-		if err := nInfo.TableEventRegister(ovPeerTable); err != nil {
162
+		if err := nInfo.TableEventRegister(ovPeerTable, driverapi.EndpointObject); err != nil {
163 163
 			return err
164 164
 		}
165 165
 	}
... ...
@@ -199,6 +199,10 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
199 199
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
200 200
 }
201 201
 
202
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
203
+	return "", nil
204
+}
205
+
202 206
 func (d *driver) DeleteNetwork(nid string) error {
203 207
 	return types.NotImplementedErrorf("not implemented")
204 208
 }
... ...
@@ -116,6 +116,10 @@ func (d *driver) NetworkFree(id string) error {
116 116
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
117 117
 }
118 118
 
119
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
120
+	return "", nil
121
+}
122
+
119 123
 func (d *driver) CreateNetwork(id string, options map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
120 124
 	create := &api.CreateNetworkRequest{
121 125
 		NetworkID: id,
... ...
@@ -175,6 +175,10 @@ func (d *driver) NetworkFree(id string) error {
175 175
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
176 176
 }
177 177
 
178
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
179
+	return "", nil
180
+}
181
+
178 182
 func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
179 183
 	if len(ipV4Data) == 0 || ipV4Data[0].Pool.String() == "0.0.0.0/0" {
180 184
 		return types.BadRequestErrorf("ipv4 pool is empty")
... ...
@@ -149,6 +149,10 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
149 149
 	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
150 150
 }
151 151
 
152
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
153
+	return "", nil
154
+}
155
+
152 156
 // Leave method is invoked when a Sandbox detaches from an endpoint.
153 157
 func (d *driver) Leave(nid, eid string) error {
154 158
 	if err := validateID(nid, eid); err != nil {
... ...
@@ -153,7 +153,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
153 153
 	}
154 154
 
155 155
 	if nInfo != nil {
156
-		if err := nInfo.TableEventRegister(ovPeerTable); err != nil {
156
+		if err := nInfo.TableEventRegister(ovPeerTable, driverapi.EndpointObject); err != nil {
157 157
 			return err
158 158
 		}
159 159
 	}
... ...
@@ -93,6 +93,10 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
93 93
 	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
94 94
 }
95 95
 
96
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
97
+	return "", nil
98
+}
99
+
96 100
 // Leave method is invoked when a Sandbox detaches from an endpoint.
97 101
 func (d *driver) Leave(nid, eid string) error {
98 102
 	if err := validateID(nid, eid); err != nil {
... ...
@@ -169,7 +169,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
169 169
 	n.interfaceName = interfaceName
170 170
 
171 171
 	if nInfo != nil {
172
-		if err := nInfo.TableEventRegister(ovPeerTable); err != nil {
172
+		if err := nInfo.TableEventRegister(ovPeerTable, driverapi.EndpointObject); err != nil {
173 173
 			return err
174 174
 		}
175 175
 	}
... ...
@@ -183,6 +183,10 @@ func (c *networkConfiguration) processIPAM(id string, ipamV4Data, ipamV6Data []d
183 183
 func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
184 184
 }
185 185
 
186
+func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (string, map[string]string) {
187
+	return "", nil
188
+}
189
+
186 190
 // Create a new network
187 191
 func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
188 192
 	if _, err := d.getNetwork(id); err == nil {
... ...
@@ -50,8 +50,7 @@ var (
50 50
 	bestEffortLock sync.Mutex
51 51
 	// ErrIptablesNotFound is returned when the rule is not found.
52 52
 	ErrIptablesNotFound = errors.New("Iptables not found")
53
-	probeOnce           sync.Once
54
-	firewalldOnce       sync.Once
53
+	initOnce            sync.Once
55 54
 )
56 55
 
57 56
 // ChainInfo defines the iptables chain.
... ...
@@ -86,22 +85,32 @@ func initFirewalld() {
86 86
 	}
87 87
 }
88 88
 
89
+func detectIptables() {
90
+	path, err := exec.LookPath("iptables")
91
+	if err != nil {
92
+		return
93
+	}
94
+	iptablesPath = path
95
+	supportsXlock = exec.Command(iptablesPath, "--wait", "-L", "-n").Run() == nil
96
+	mj, mn, mc, err := GetVersion()
97
+	if err != nil {
98
+		logrus.Warnf("Failed to read iptables version: %v", err)
99
+		return
100
+	}
101
+	supportsCOpt = supportsCOption(mj, mn, mc)
102
+}
103
+
104
+func initIptables() {
105
+	probe()
106
+	initFirewalld()
107
+	detectIptables()
108
+}
109
+
89 110
 func initCheck() error {
111
+	initOnce.Do(initIptables)
112
+
90 113
 	if iptablesPath == "" {
91
-		probeOnce.Do(probe)
92
-		firewalldOnce.Do(initFirewalld)
93
-		path, err := exec.LookPath("iptables")
94
-		if err != nil {
95
-			return ErrIptablesNotFound
96
-		}
97
-		iptablesPath = path
98
-		supportsXlock = exec.Command(iptablesPath, "--wait", "-L", "-n").Run() == nil
99
-		mj, mn, mc, err := GetVersion()
100
-		if err != nil {
101
-			logrus.Warnf("Failed to read iptables version: %v", err)
102
-			return nil
103
-		}
104
-		supportsCOpt = supportsCOption(mj, mn, mc)
114
+		return ErrIptablesNotFound
105 115
 	}
106 116
 	return nil
107 117
 }
... ...
@@ -189,6 +198,26 @@ func ProgramChain(c *ChainInfo, bridgeName string, hairpinMode, enable bool) err
189 189
 			}
190 190
 
191 191
 		}
192
+		establish := []string{
193
+			"-o", bridgeName,
194
+			"-m", "conntrack",
195
+			"--ctstate", "RELATED,ESTABLISHED",
196
+			"-j", "ACCEPT"}
197
+		if !Exists(Filter, "FORWARD", establish...) && enable {
198
+			insert := append([]string{string(Insert), "FORWARD"}, establish...)
199
+			if output, err := Raw(insert...); err != nil {
200
+				return err
201
+			} else if len(output) != 0 {
202
+				return fmt.Errorf("Could not create establish rule to %s: %s", c.Table, output)
203
+			}
204
+		} else if Exists(Filter, "FORWARD", establish...) && !enable {
205
+			del := append([]string{string(Delete), "FORWARD"}, establish...)
206
+			if output, err := Raw(del...); err != nil {
207
+				return err
208
+			} else if len(output) != 0 {
209
+				return fmt.Errorf("Could not delete establish rule from %s: %s", c.Table, output)
210
+			}
211
+		}
192 212
 	}
193 213
 	return nil
194 214
 }
... ...
@@ -353,7 +382,11 @@ func exists(native bool, table Table, chain string, rule ...string) bool {
353 353
 		table = Filter
354 354
 	}
355 355
 
356
-	initCheck()
356
+	if err := initCheck(); err != nil {
357
+		// The exists() signature does not allow us to return an error, but at least
358
+		// we can skip the (likely invalid) exec invocation.
359
+		return false
360
+	}
357 361
 
358 362
 	if supportsCOpt {
359 363
 		// if exit status is 0 then return true, the rule exists
... ...
@@ -436,9 +469,9 @@ func ExistChain(chain string, table Table) bool {
436 436
 	return false
437 437
 }
438 438
 
439
-// GetVersion reads the iptables version numbers
439
+// GetVersion reads the iptables version numbers during initialization
440 440
 func GetVersion() (major, minor, micro int, err error) {
441
-	out, err := Raw("--version")
441
+	out, err := exec.Command(iptablesPath, "--version").CombinedOutput()
442 442
 	if err == nil {
443 443
 		major, minor, micro = parseVersionNumbers(string(out))
444 444
 	}
... ...
@@ -74,6 +74,9 @@ type NetworkInfo interface {
74 74
 	// gossip cluster. For non-dynamic overlay networks and bridge networks it returns an
75 75
 	// empty slice
76 76
 	Peers() []networkdb.PeerInfo
77
+	//Services returns a map of services keyed by the service name with the details
78
+	//of all the tasks that belong to the service. Applicable only in swarm mode.
79
+	Services() map[string]ServiceInfo
77 80
 }
78 81
 
79 82
 // EndpointWalker is a client provided function which will be used to walk the Endpoints.
... ...
@@ -108,6 +111,11 @@ type servicePorts struct {
108 108
 	target   []serviceTarget
109 109
 }
110 110
 
111
+type networkDBTable struct {
112
+	name    string
113
+	objType driverapi.ObjectType
114
+}
115
+
111 116
 // IpamConf contains all the ipam related configurations for a network
112 117
 type IpamConf struct {
113 118
 	// The master address pool for containers and network interfaces
... ...
@@ -208,7 +216,7 @@ type network struct {
208 208
 	attachable   bool
209 209
 	inDelete     bool
210 210
 	ingress      bool
211
-	driverTables []string
211
+	driverTables []networkDBTable
212 212
 	dynamic      bool
213 213
 	sync.Mutex
214 214
 }
... ...
@@ -1607,11 +1615,18 @@ func (n *network) Labels() map[string]string {
1607 1607
 	return lbls
1608 1608
 }
1609 1609
 
1610
-func (n *network) TableEventRegister(tableName string) error {
1610
+func (n *network) TableEventRegister(tableName string, objType driverapi.ObjectType) error {
1611
+	if !driverapi.IsValidType(objType) {
1612
+		return fmt.Errorf("invalid object type %v in registering table, %s", objType, tableName)
1613
+	}
1614
+
1615
+	t := networkDBTable{
1616
+		name:    tableName,
1617
+		objType: objType,
1618
+	}
1611 1619
 	n.Lock()
1612 1620
 	defer n.Unlock()
1613
-
1614
-	n.driverTables = append(n.driverTables, tableName)
1621
+	n.driverTables = append(n.driverTables, t)
1615 1622
 	return nil
1616 1623
 }
1617 1624
 
... ...
@@ -307,6 +307,22 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
307 307
 	return nil
308 308
 }
309 309
 
310
+// GetTableByNetwork walks the networkdb by the given table and network id and
311
+// returns a map of keys and values
312
+func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
313
+	entries := make(map[string]interface{})
314
+	nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
315
+		entry := v.(*entry)
316
+		if entry.deleting {
317
+			return false
318
+		}
319
+		key := k[strings.LastIndex(k, "/")+1:]
320
+		entries[key] = entry.value
321
+		return false
322
+	})
323
+	return entries
324
+}
325
+
310 326
 // DeleteEntry deletes a table entry in NetworkDB for given (network,
311 327
 // table, key) tuple and if the NetworkDB is part of the cluster
312 328
 // propagates this event to the cluster.
... ...
@@ -18,6 +18,26 @@ func newService(name string, id string, ingressPorts []*PortConfig, aliases []st
18 18
 	}
19 19
 }
20 20
 
21
+func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int {
22
+	skey := serviceKey{
23
+		id:    sid,
24
+		ports: portConfigs(ingressPorts).String(),
25
+	}
26
+	c.Lock()
27
+	s, ok := c.serviceBindings[skey]
28
+	c.Unlock()
29
+
30
+	if !ok {
31
+		return 0
32
+	}
33
+
34
+	s.Lock()
35
+	lb := s.loadBalancers[nid]
36
+	s.Unlock()
37
+
38
+	return int(lb.fwMark)
39
+}
40
+
21 41
 func (c *controller) cleanupServiceBindings(cleanupNID string) {
22 42
 	var cleanupFuncs []func()
23 43