Browse code

Vendoring Libnetwork library

- adding conntrack flush fix for docker/docker#8795

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2017/04/11 09:11:18
Showing 12 changed files
... ...
@@ -24,7 +24,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
24 24
 github.com/imdario/mergo 0.2.1
25 25
 
26 26
 #get libnetwork packages
27
-github.com/docker/libnetwork ab8f7e61743aa7e54c5d0dad0551543adadc33cf
27
+github.com/docker/libnetwork b13e0604016a4944025aaff521d9c125850b0d04
28 28
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
29 29
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
30 30
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -47,6 +47,7 @@ import (
47 47
 	"container/heap"
48 48
 	"fmt"
49 49
 	"net"
50
+	"path/filepath"
50 51
 	"strings"
51 52
 	"sync"
52 53
 	"time"
... ...
@@ -979,6 +980,8 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s
979 979
 
980 980
 	if sb.ingress {
981 981
 		c.ingressSandbox = sb
982
+		sb.config.hostsPath = filepath.Join(c.cfg.Daemon.DataDir, "/network/files/hosts")
983
+		sb.config.resolvConfPath = filepath.Join(c.cfg.Daemon.DataDir, "/network/files/resolv.conf")
982 984
 		sb.id = "ingress_sbox"
983 985
 	}
984 986
 	c.Unlock()
... ...
@@ -1346,6 +1346,13 @@ func (d *driver) RevokeExternalConnectivity(nid, eid string) error {
1346 1346
 
1347 1347
 	endpoint.portMapping = nil
1348 1348
 
1349
+	// Clean the connection tracker state of the host for the specific endpoint
1350
+	// The host kernel keeps track of the connections (TCP and UDP), so if a new endpoint gets the same IP as
1351
+	// this one (that is going down), it is possible that some of the packets would not be routed correctly inside
1352
+	// the new endpoint
1353
+	// Deeper details: https://github.com/docker/docker/issues/8795
1354
+	clearEndpointConnections(d.nlh, endpoint)
1355
+
1349 1356
 	if err = d.storeUpdate(endpoint); err != nil {
1350 1357
 		return fmt.Errorf("failed to update bridge endpoint %s to store: %v", endpoint.id[0:7], err)
1351 1358
 	}
... ...
@@ -7,6 +7,7 @@ import (
7 7
 
8 8
 	"github.com/Sirupsen/logrus"
9 9
 	"github.com/docker/libnetwork/iptables"
10
+	"github.com/vishvananda/netlink"
10 11
 )
11 12
 
12 13
 // DockerChain: DOCKER iptable chain name
... ...
@@ -348,3 +349,15 @@ func setupInternalNetworkRules(bridgeIface string, addr net.Addr, icc, insert bo
348 348
 	}
349 349
 	return nil
350 350
 }
351
+
352
+func clearEndpointConnections(nlh *netlink.Handle, ep *bridgeEndpoint) {
353
+	var ipv4List []net.IP
354
+	var ipv6List []net.IP
355
+	if ep.addr != nil {
356
+		ipv4List = append(ipv4List, ep.addr.IP)
357
+	}
358
+	if ep.addrv6 != nil {
359
+		ipv6List = append(ipv6List, ep.addrv6.IP)
360
+	}
361
+	iptables.DeleteConntrackEntries(nlh, ipv4List, ipv6List)
362
+}
... ...
@@ -665,7 +665,7 @@ func (ep *endpoint) hasInterface(iName string) bool {
665 665
 
666 666
 func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
667 667
 	if sbox == nil || sbox.ID() == "" || sbox.Key() == "" {
668
-		return types.BadRequestErrorf("invalid Sandbox passed to enpoint leave: %v", sbox)
668
+		return types.BadRequestErrorf("invalid Sandbox passed to endpoint leave: %v", sbox)
669 669
 	}
670 670
 
671 671
 	sb, ok := sbox.(*sandbox)
... ...
@@ -129,7 +129,7 @@ type ActiveEndpointsError struct {
129 129
 }
130 130
 
131 131
 func (aee *ActiveEndpointsError) Error() string {
132
-	return fmt.Sprintf("network %s has active endpoints", aee.name)
132
+	return fmt.Sprintf("network %s id %s has active endpoints", aee.name, aee.id)
133 133
 }
134 134
 
135 135
 // Forbidden denotes the type of this error
136 136
new file mode 100644
... ...
@@ -0,0 +1,59 @@
0
+package iptables
1
+
2
+import (
3
+	"errors"
4
+	"net"
5
+	"syscall"
6
+
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/vishvananda/netlink"
9
+)
10
+
11
+var (
12
+	// ErrConntrackNotConfigurable means that conntrack module is not loaded or does not have the netlink module loaded
13
+	ErrConntrackNotConfigurable = errors.New("conntrack is not available")
14
+)
15
+
16
+// IsConntrackProgrammable returns true if the handle supports the NETLINK_NETFILTER and the base modules are loaded
17
+func IsConntrackProgrammable(nlh *netlink.Handle) bool {
18
+	return nlh.SupportsNetlinkFamily(syscall.NETLINK_NETFILTER)
19
+}
20
+
21
+// DeleteConntrackEntries deletes all the conntrack connections on the host for the specified IPs
22
+// Returns the number of flows deleted for IPv4 and for IPv6, or an error
23
+func DeleteConntrackEntries(nlh *netlink.Handle, ipv4List []net.IP, ipv6List []net.IP) (uint, uint, error) {
24
+	if !IsConntrackProgrammable(nlh) {
25
+		return 0, 0, ErrConntrackNotConfigurable
26
+	}
27
+
28
+	var totalIPv4FlowPurged uint
29
+	for _, ipAddress := range ipv4List {
30
+		flowPurged, err := purgeConntrackState(nlh, syscall.AF_INET, ipAddress)
31
+		if err != nil {
32
+			logrus.Warnf("Failed to delete conntrack state for %s: %v", ipAddress, err)
33
+			continue
34
+		}
35
+		totalIPv4FlowPurged += flowPurged
36
+	}
37
+
38
+	var totalIPv6FlowPurged uint
39
+	for _, ipAddress := range ipv6List {
40
+		flowPurged, err := purgeConntrackState(nlh, syscall.AF_INET6, ipAddress)
41
+		if err != nil {
42
+			logrus.Warnf("Failed to delete conntrack state for %s: %v", ipAddress, err)
43
+			continue
44
+		}
45
+		totalIPv6FlowPurged += flowPurged
46
+	}
47
+
48
+	logrus.Debugf("DeleteConntrackEntries purged ipv4:%d, ipv6:%d", totalIPv4FlowPurged, totalIPv6FlowPurged)
49
+	return totalIPv4FlowPurged, totalIPv6FlowPurged, nil
50
+}
51
+
52
+func purgeConntrackState(nlh *netlink.Handle, family netlink.InetFamily, ipAddress net.IP) (uint, error) {
53
+	filter := &netlink.ConntrackFilter{}
54
+	// NOTE: doing the flush using the ipAddress is safe because today there cannot be multiple networks with the same subnet
55
+	// so it will not be possible to flush flows that are of other containers
56
+	filter.AddIP(netlink.ConntrackNatAnyIP, ipAddress)
57
+	return nlh.ConntrackDeleteFilter(netlink.ConntrackTable, family, filter)
58
+}
... ...
@@ -100,14 +100,14 @@ func detectIptables() {
100 100
 	supportsCOpt = supportsCOption(mj, mn, mc)
101 101
 }
102 102
 
103
-func initIptables() {
103
+func initDependencies() {
104 104
 	probe()
105 105
 	initFirewalld()
106 106
 	detectIptables()
107 107
 }
108 108
 
109 109
 func initCheck() error {
110
-	initOnce.Do(initIptables)
110
+	initOnce.Do(initDependencies)
111 111
 
112 112
 	if iptablesPath == "" {
113 113
 		return ErrIptablesNotFound
... ...
@@ -88,12 +88,25 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
88 88
 }
89 89
 
90 90
 func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
91
+	var flushEntries bool
91 92
 	// Update our local clock if the received message has a newer
92 93
 	// time.
93 94
 	nDB.networkClock.Witness(nEvent.LTime)
94 95
 
95 96
 	nDB.Lock()
96
-	defer nDB.Unlock()
97
+	defer func() {
98
+		nDB.Unlock()
99
+		// When a node leaves a network on the last task removal, clean up the
100
+		// local entries for this network & node combination. When the tasks
101
+		// on a network are removed we could have missed the gossip updates.
102
+		// Not doing this cleanup can leave stale entries because bulksyncs
103
+		// from the node will no longer include this network state.
104
+		//
105
+		// deleteNodeNetworkEntries takes nDB lock.
106
+		if flushEntries {
107
+			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
108
+		}
109
+	}()
97 110
 
98 111
 	if nEvent.NodeName == nDB.config.NodeName {
99 112
 		return false
... ...
@@ -121,6 +134,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
121 121
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
122 122
 		if n.leaving {
123 123
 			n.reapTime = reapInterval
124
+			flushEntries = true
124 125
 		}
125 126
 
126 127
 		nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
... ...
@@ -372,6 +372,37 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
372 372
 	nDB.Unlock()
373 373
 }
374 374
 
375
+func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
376
+	nDB.Lock()
377
+	nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
378
+		func(path string, v interface{}) bool {
379
+			oldEntry := v.(*entry)
380
+			params := strings.Split(path[1:], "/")
381
+			nid := params[0]
382
+			tname := params[1]
383
+			key := params[2]
384
+
385
+			if oldEntry.node != node {
386
+				return false
387
+			}
388
+
389
+			entry := &entry{
390
+				ltime:    oldEntry.ltime,
391
+				node:     node,
392
+				value:    oldEntry.value,
393
+				deleting: true,
394
+				reapTime: reapInterval,
395
+			}
396
+
397
+			nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
398
+			nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
399
+
400
+			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
401
+			return false
402
+		})
403
+	nDB.Unlock()
404
+}
405
+
375 406
 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
376 407
 	nDB.Lock()
377 408
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
... ...
@@ -75,13 +75,28 @@ func NlHandle() *netlink.Handle {
75 75
 
76 76
 func getSupportedNlFamilies() []int {
77 77
 	fams := []int{syscall.NETLINK_ROUTE}
78
+	// NETLINK_XFRM test
78 79
 	if err := loadXfrmModules(); err != nil {
79 80
 		if checkXfrmSocket() != nil {
80 81
 			logrus.Warnf("Could not load necessary modules for IPSEC rules: %v", err)
81
-			return fams
82
+		} else {
83
+			fams = append(fams, syscall.NETLINK_XFRM)
82 84
 		}
85
+	} else {
86
+		fams = append(fams, syscall.NETLINK_XFRM)
83 87
 	}
84
-	return append(fams, syscall.NETLINK_XFRM)
88
+	// NETLINK_NETFILTER test
89
+	if err := loadNfConntrackModules(); err != nil {
90
+		if checkNfSocket() != nil {
91
+			logrus.Warnf("Could not load necessary modules for Conntrack: %v", err)
92
+		} else {
93
+			fams = append(fams, syscall.NETLINK_NETFILTER)
94
+		}
95
+	} else {
96
+		fams = append(fams, syscall.NETLINK_NETFILTER)
97
+	}
98
+
99
+	return fams
85 100
 }
86 101
 
87 102
 func loadXfrmModules() error {
... ...
@@ -103,3 +118,23 @@ func checkXfrmSocket() error {
103 103
 	syscall.Close(fd)
104 104
 	return nil
105 105
 }
106
+
107
+func loadNfConntrackModules() error {
108
+	if out, err := exec.Command("modprobe", "-va", "nf_conntrack").CombinedOutput(); err != nil {
109
+		return fmt.Errorf("Running modprobe nf_conntrack failed with message: `%s`, error: %v", strings.TrimSpace(string(out)), err)
110
+	}
111
+	if out, err := exec.Command("modprobe", "-va", "nf_conntrack_netlink").CombinedOutput(); err != nil {
112
+		return fmt.Errorf("Running modprobe nf_conntrack_netlink failed with message: `%s`, error: %v", strings.TrimSpace(string(out)), err)
113
+	}
114
+	return nil
115
+}
116
+
117
+// API check on required nf_conntrack* modules (nf_conntrack, nf_conntrack_netlink)
118
+func checkNfSocket() error {
119
+	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_NETFILTER)
120
+	if err != nil {
121
+		return err
122
+	}
123
+	syscall.Close(fd)
124
+	return nil
125
+}
... ...
@@ -644,13 +644,6 @@ func (sb *sandbox) SetKey(basePath string) error {
644 644
 	sb.Lock()
645 645
 	sb.osSbox = osSbox
646 646
 	sb.Unlock()
647
-	defer func() {
648
-		if err != nil {
649
-			sb.Lock()
650
-			sb.osSbox = nil
651
-			sb.Unlock()
652
-		}
653
-	}()
654 647
 
655 648
 	// If the resolver was setup before stop it and set it up in the
656 649
 	// new osl sandbox.