Browse code

Vendoring libnetwork @45b4086 for 1.13.x

Signed-off-by: Alessandro Boch <aboch@docker.com>

Alessandro Boch authored on 2017/02/03 05:58:16
Showing 13 changed files
... ...
@@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
23 23
 github.com/imdario/mergo 0.2.1
24 24
 
25 25
 #get libnetwork packages
26
-github.com/docker/libnetwork 2c8b6838deee7ab8263b4206980f6623db7279c2
26
+github.com/docker/libnetwork 45b40861e677e37cf27bc184eca5af92f8cdd32d
27 27
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
28 28
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
29 29
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
... ...
@@ -3,6 +3,7 @@ package libnetwork
3 3
 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
4 4
 
5 5
 import (
6
+	"encoding/json"
6 7
 	"fmt"
7 8
 	"net"
8 9
 	"os"
... ...
@@ -285,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
285 285
 	}
286 286
 
287 287
 	ch, cancel := nDB.Watch("endpoint_table", "", "")
288
+	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
288 289
 
289 290
 	c.Lock()
290 291
 	c.agent = &agent{
... ...
@@ -297,6 +299,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
297 297
 	c.Unlock()
298 298
 
299 299
 	go c.handleTableEvents(ch, c.handleEpTableEvent)
300
+	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
300 301
 
301 302
 	drvEnc := discoverapi.DriverEncryptionConfig{}
302 303
 	keys, tags = c.getKeys(subsysIPSec)
... ...
@@ -634,6 +637,31 @@ func (n *network) handleDriverTableEvent(ev events.Event) {
634 634
 	d.EventNotify(etype, n.ID(), tname, key, value)
635 635
 }
636 636
 
637
+func (c *controller) handleNodeTableEvent(ev events.Event) {
638
+	var (
639
+		value    []byte
640
+		isAdd    bool
641
+		nodeAddr networkdb.NodeAddr
642
+	)
643
+	switch event := ev.(type) {
644
+	case networkdb.CreateEvent:
645
+		value = event.Value
646
+		isAdd = true
647
+	case networkdb.DeleteEvent:
648
+		value = event.Value
649
+	case networkdb.UpdateEvent:
650
+		logrus.Errorf("Unexpected update node table event = %#v", event)
651
+	}
652
+
653
+	err := json.Unmarshal(value, &nodeAddr)
654
+	if err != nil {
655
+		logrus.Errorf("Error unmarshalling node table event %v", err)
656
+		return
657
+	}
658
+	c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
659
+
660
+}
661
+
637 662
 func (c *controller) handleEpTableEvent(ev events.Event) {
638 663
 	var (
639 664
 		nid   string
... ...
@@ -1,8 +1,6 @@
1 1
 package config
2 2
 
3 3
 import (
4
-	"fmt"
5
-	"regexp"
6 4
 	"strings"
7 5
 
8 6
 	"github.com/BurntSushi/toml"
... ...
@@ -17,12 +15,6 @@ import (
17 17
 	"github.com/docker/libnetwork/osl"
18 18
 )
19 19
 
20
-// restrictedNameRegex represents the regular expression which regulates the allowed network or endpoint names.
21
-const restrictedNameRegex = `^[\w]+[\w-. ]*[\w]+$`
22
-
23
-// RestrictedNamePattern is a regular expression to validate names against the collection of restricted characters.
24
-var restrictedNamePattern = regexp.MustCompile(restrictedNameRegex)
25
-
26 20
 // Config encapsulates configurations of various Libnetwork components
27 21
 type Config struct {
28 22
 	Daemon          DaemonCfg
... ...
@@ -240,12 +232,12 @@ func (c *Config) ProcessOptions(options ...Option) {
240 240
 	}
241 241
 }
242 242
 
243
-// ValidateName validates configuration objects supported by libnetwork
244
-func ValidateName(name string) error {
245
-	if !restrictedNamePattern.MatchString(name) {
246
-		return fmt.Errorf("%q includes invalid characters, resource name has to conform to %q", name, restrictedNameRegex)
243
+// IsValidName validates configuration objects supported by libnetwork
244
+func IsValidName(name string) bool {
245
+	if strings.TrimSpace(name) == "" {
246
+		return false
247 247
 	}
248
-	return nil
248
+	return true
249 249
 }
250 250
 
251 251
 // OptionLocalKVProvider function returns an option setter for kvstore provider
... ...
@@ -567,6 +567,12 @@ func (c *controller) pushNodeDiscovery(d driverapi.Driver, cap driverapi.Capabil
567 567
 	if c.cfg != nil {
568 568
 		addr := strings.Split(c.cfg.Cluster.Address, ":")
569 569
 		self = net.ParseIP(addr[0])
570
+		// if external kvstore is not configured, try swarm-mode config
571
+		if self == nil {
572
+			if agent := c.getAgent(); agent != nil {
573
+				self = net.ParseIP(agent.advertiseAddr)
574
+			}
575
+		}
570 576
 	}
571 577
 
572 578
 	if d == nil || cap.DataScope != datastore.GlobalScope || nodes == nil {
... ...
@@ -647,8 +653,8 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
647 647
 		}
648 648
 	}
649 649
 
650
-	if err := config.ValidateName(name); err != nil {
651
-		return nil, ErrInvalidName(err.Error())
650
+	if !config.IsValidName(name) {
651
+		return nil, ErrInvalidName(name)
652 652
 	}
653 653
 
654 654
 	if id == "" {
... ...
@@ -69,7 +69,7 @@ func (ii ErrInvalidID) Error() string {
69 69
 func (ii ErrInvalidID) BadRequest() {}
70 70
 
71 71
 // ErrInvalidName is returned when a query-by-name or resource create method is
72
-// invoked with an invalid name parameter
72
+// invoked with an empty name parameter
73 73
 type ErrInvalidName string
74 74
 
75 75
 func (in ErrInvalidName) Error() string {
... ...
@@ -42,6 +42,7 @@ type Destination struct {
42 42
 // Handle provides a namespace specific ipvs handle to program ipvs
43 43
 // rules.
44 44
 type Handle struct {
45
+	seq  uint32
45 46
 	sock *nl.NetlinkSocket
46 47
 }
47 48
 
... ...
@@ -82,6 +83,11 @@ func (i *Handle) NewService(s *Service) error {
82 82
 	return i.doCmd(s, nil, ipvsCmdNewService)
83 83
 }
84 84
 
85
+// IsServicePresent queries for the ipvs service in the passed handle.
86
+func (i *Handle) IsServicePresent(s *Service) bool {
87
+	return nil == i.doCmd(s, nil, ipvsCmdGetService)
88
+}
89
+
85 90
 // UpdateService updates an already existing service in the passed
86 91
 // handle.
87 92
 func (i *Handle) UpdateService(s *Service) error {
... ...
@@ -10,6 +10,7 @@ import (
10 10
 	"os/exec"
11 11
 	"strings"
12 12
 	"sync"
13
+	"sync/atomic"
13 14
 	"syscall"
14 15
 	"unsafe"
15 16
 
... ...
@@ -118,6 +119,7 @@ func fillDestinaton(d *Destination) nl.NetlinkRequestData {
118 118
 
119 119
 func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
120 120
 	req := newIPVSRequest(cmd)
121
+	req.Seq = atomic.AddUint32(&i.seq, 1)
121 122
 	req.AddData(fillService(s))
122 123
 
123 124
 	if d != nil {
... ...
@@ -206,7 +208,7 @@ done:
206 206
 		}
207 207
 		for _, m := range msgs {
208 208
 			if m.Header.Seq != req.Seq {
209
-				return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
209
+				continue
210 210
 			}
211 211
 			if m.Header.Pid != pid {
212 212
 				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
... ...
@@ -871,9 +871,8 @@ func (n *network) addEndpoint(ep *endpoint) error {
871 871
 
872 872
 func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) {
873 873
 	var err error
874
-
875
-	if err = config.ValidateName(name); err != nil {
876
-		return nil, ErrInvalidName(err.Error())
874
+	if !config.IsValidName(name) {
875
+		return nil, ErrInvalidName(name)
877 876
 	}
878 877
 
879 878
 	if _, err = n.EndpointByName(name); err == nil {
... ...
@@ -1,12 +1,28 @@
1 1
 package networkdb
2 2
 
3
-import "github.com/hashicorp/memberlist"
3
+import (
4
+	"encoding/json"
5
+	"net"
6
+
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/hashicorp/memberlist"
9
+)
4 10
 
5 11
 type eventDelegate struct {
6 12
 	nDB *NetworkDB
7 13
 }
8 14
 
15
+func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
16
+	value, err := json.Marshal(&NodeAddr{addr})
17
+	if err == nil {
18
+		e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
19
+	} else {
20
+		logrus.Errorf("Error marshalling node broadcast event %s", addr.String())
21
+	}
22
+}
23
+
9 24
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
25
+	e.broadcastNodeEvent(mn.Addr, opCreate)
10 26
 	e.nDB.Lock()
11 27
 	// In case the node is rejoining after a failure or leave,
12 28
 	// wait until an explicit join message arrives before adding
... ...
@@ -24,6 +40,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
24 24
 }
25 25
 
26 26
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
27
+	e.broadcastNodeEvent(mn.Addr, opDelete)
27 28
 	e.nDB.deleteNodeTableEntries(mn.Name)
28 29
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
29 30
 	e.nDB.Lock()
... ...
@@ -1,6 +1,10 @@
1 1
 package networkdb
2 2
 
3
-import "github.com/docker/go-events"
3
+import (
4
+	"net"
5
+
6
+	"github.com/docker/go-events"
7
+)
4 8
 
5 9
 type opType uint8
6 10
 
... ...
@@ -17,6 +21,14 @@ type event struct {
17 17
 	Value     []byte
18 18
 }
19 19
 
20
+// NodeTable represents table event for node join and leave
21
+const NodeTable = "NodeTable"
22
+
23
+// NodeAddr represents the value carried for node event in NodeTable
24
+type NodeAddr struct {
25
+	Addr net.IP
26
+}
27
+
20 28
 // CreateEvent generates a table entry create event to the watchers
21 29
 type CreateEvent event
22 30
 
... ...
@@ -61,11 +61,6 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
61 61
 }
62 62
 
63 63
 func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
64
-	var (
65
-		s          *service
66
-		addService bool
67
-	)
68
-
69 64
 	n, err := c.NetworkByID(nid)
70 65
 	if err != nil {
71 66
 		return err
... ...
@@ -123,11 +118,6 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
123 123
 		fwMarkCtrMu.Unlock()
124 124
 
125 125
 		s.loadBalancers[nid] = lb
126
-
127
-		// Since we just created this load balancer make sure
128
-		// we add a new service service in IPVS rules.
129
-		addService = true
130
-
131 126
 	}
132 127
 
133 128
 	lb.backEnds[eid] = ip
... ...
@@ -135,7 +125,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
135 135
 	// Add loadbalancer service and backend in all sandboxes in
136 136
 	// the network only if vip is valid.
137 137
 	if len(vip) != 0 {
138
-		n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService)
138
+		n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts)
139 139
 	}
140 140
 
141 141
 	return nil
... ...
@@ -68,7 +68,7 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
68 68
 
69 69
 	if n.ingress {
70 70
 		if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
71
-			logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
71
+			logrus.Errorf("Failed to add redirect rules for ep %s (%s): %v", ep.Name(), ep.ID()[0:7], err)
72 72
 		}
73 73
 	}
74 74
 
... ...
@@ -97,20 +97,16 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
97 97
 		}
98 98
 
99 99
 		lb.service.Lock()
100
-		addService := true
101 100
 		for _, ip := range lb.backEnds {
102
-			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
103
-				eIP, gwIP, addService, n.ingress)
104
-			addService = false
101
+			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
105 102
 		}
106 103
 		lb.service.Unlock()
107 104
 	}
108 105
 }
109 106
 
110 107
 // Add loadbalancer backend to all sandboxes which has a connection to
111
-// this network. If needed add the service as well, as specified by
112
-// the addService bool.
113
-func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) {
108
+// this network. If needed add the service as well.
109
+func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig) {
114 110
 	n.WalkEndpoints(func(e Endpoint) bool {
115 111
 		ep := e.(*endpoint)
116 112
 		if sb, ok := ep.getSandbox(); ok {
... ...
@@ -123,7 +119,7 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
123 123
 				gwIP = ep.Iface().Address().IP
124 124
 			}
125 125
 
126
-			sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, addService, n.ingress)
126
+			sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, n.ingress)
127 127
 		}
128 128
 
129 129
 		return false
... ...
@@ -154,7 +150,7 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por
154 154
 }
155 155
 
156 156
 // Add loadbalancer backend into one connected sandbox.
157
-func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, addService bool, isIngressNetwork bool) {
157
+func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, isIngressNetwork bool) {
158 158
 	if sb.osSbox == nil {
159 159
 		return
160 160
 	}
... ...
@@ -165,7 +161,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
165 165
 
166 166
 	i, err := ipvs.New(sb.Key())
167 167
 	if err != nil {
168
-		logrus.Errorf("Failed to create an ipvs handle for sbox %s: %v", sb.Key(), err)
168
+		logrus.Errorf("Failed to create an ipvs handle for sbox %s (%s,%s) for lb addition: %v", sb.ID()[0:7], sb.ContainerID()[0:7], sb.Key(), err)
169 169
 		return
170 170
 	}
171 171
 	defer i.Close()
... ...
@@ -176,7 +172,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
176 176
 		SchedName:     ipvs.RoundRobin,
177 177
 	}
178 178
 
179
-	if addService {
179
+	if !i.IsServicePresent(s) {
180 180
 		var filteredPorts []*PortConfig
181 181
 		if sb.ingress {
182 182
 			filteredPorts = filterPortConfigs(ingressPorts, false)
... ...
@@ -186,14 +182,14 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
186 186
 			}
187 187
 		}
188 188
 
189
-		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
189
+		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %s (%s)", vip, fwMark, ingressPorts, sb.ID()[0:7], sb.ContainerID()[0:7])
190 190
 		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
191
-			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
191
+			logrus.Errorf("Failed to add firewall mark rule in sbox %s (%s): %v", sb.ID()[0:7], sb.ContainerID()[0:7], err)
192 192
 			return
193 193
 		}
194 194
 
195
-		if err := i.NewService(s); err != nil {
196
-			logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
195
+		if err := i.NewService(s); err != nil && err != syscall.EEXIST {
196
+			logrus.Errorf("Failed to create a new service for vip %s fwmark %d in sbox %s (%s): %v", vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
197 197
 			return
198 198
 		}
199 199
 	}
... ...
@@ -208,7 +204,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
208 208
 	// destination.
209 209
 	s.SchedName = ""
210 210
 	if err := i.NewDestination(s, d); err != nil && err != syscall.EEXIST {
211
-		logrus.Errorf("Failed to create real server %s for vip %s fwmark %d in sb %s: %v", ip, vip, fwMark, sb.containerID, err)
211
+		logrus.Errorf("Failed to create real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
212 212
 	}
213 213
 }
214 214
 
... ...
@@ -224,7 +220,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
224 224
 
225 225
 	i, err := ipvs.New(sb.Key())
226 226
 	if err != nil {
227
-		logrus.Errorf("Failed to create an ipvs handle for sbox %s: %v", sb.Key(), err)
227
+		logrus.Errorf("Failed to create an ipvs handle for sbox %s (%s,%s) for lb removal: %v", sb.ID()[0:7], sb.ContainerID()[0:7], sb.Key(), err)
228 228
 		return
229 229
 	}
230 230
 	defer i.Close()
... ...
@@ -240,14 +236,14 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
240 240
 		Weight:        1,
241 241
 	}
242 242
 
243
-	if err := i.DelDestination(s, d); err != nil {
244
-		logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
243
+	if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
244
+		logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
245 245
 	}
246 246
 
247 247
 	if rmService {
248 248
 		s.SchedName = ipvs.RoundRobin
249
-		if err := i.DelService(s); err != nil {
250
-			logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
249
+		if err := i.DelService(s); err != nil && err != syscall.ENOENT {
250
+			logrus.Errorf("Failed to delete service for vip %s fwmark %d in sbox %s (%s): %v", vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
251 251
 		}
252 252
 
253 253
 		var filteredPorts []*PortConfig
... ...
@@ -259,7 +255,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
259 259
 		}
260 260
 
261 261
 		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
262
-			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
262
+			logrus.Errorf("Failed to delete firewall mark rule in sbox %s (%s): %v", sb.ID()[0:7], sb.ContainerID()[0:7], err)
263 263
 		}
264 264
 	}
265 265
 }
... ...
@@ -2,7 +2,7 @@ package libnetwork
2 2
 
3 3
 import "net"
4 4
 
5
-func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) {
5
+func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig) {
6 6
 }
7 7
 
8 8
 func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {