Browse code

Vendor Libnetwork v0.7.0-rc.1

- Fixes https://github.com/docker/libnetwork/issues/1051
- Fixes https://github.com/docker/libnetwork/issues/985
- Fixes https://github.com/docker/libnetwork/issues/945
- Log time taken to set sandbox key
- Limit number of concurrent DNS queries

Signed-off-by: Madhu Venugopal <madhu@docker.com>

Madhu Venugopal authored on 2016/03/31 10:12:23
Showing 11 changed files
... ...
@@ -29,7 +29,7 @@ clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837
29 29
 clone git github.com/imdario/mergo 0.2.1
30 30
 
31 31
 #get libnetwork packages
32
-clone git github.com/docker/libnetwork v0.7.0-dev.10
32
+clone git github.com/docker/libnetwork v0.7.0-rc.1
33 33
 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
34 34
 clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
35 35
 clone git github.com/hashicorp/memberlist 9a1e242e454d2443df330bdd51a436d5a9058fc4
... ...
@@ -1,5 +1,11 @@
1 1
 # Changelog
2 2
 
3
+## 0.7.0-rc.1 (2016-03-30)
4
+- Fixes https://github.com/docker/libnetwork/issues/985
5
+- Fixes https://github.com/docker/libnetwork/issues/945
6
+- Log time taken to set sandbox key
7
+- Limit number of concurrent DNS queries
8
+
3 9
 ## 0.7.0-dev.10 (2016-03-21)
4 10
 - Add IPv6 service discovery (AAAA records) in embedded DNS server
5 11
 - Honor enableIPv6 flag in network create for the IP allocation
... ...
@@ -218,6 +218,9 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
218 218
 				return types.ForbiddenErrorf("cannot accept new configuration because it modifies an existing datastore client")
219 219
 			}
220 220
 		} else {
221
+			if err := c.initScopedStore(s, nSCfg); err != nil {
222
+				return err
223
+			}
221 224
 			update = true
222 225
 		}
223 226
 	}
... ...
@@ -229,10 +232,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
229 229
 	c.cfg = cfg
230 230
 	c.Unlock()
231 231
 
232
-	if err := c.initStores(); err != nil {
233
-		return err
234
-	}
235
-
236 232
 	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
237 233
 		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
238 234
 			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
... ...
@@ -129,8 +129,8 @@ func (d *driver) Leave(nid, eid string) error {
129 129
 	if d.notifyCh != nil {
130 130
 		d.notifyCh <- ovNotify{
131 131
 			action: "leave",
132
-			nid:    nid,
133
-			eid:    eid,
132
+			nw:     n,
133
+			ep:     ep,
134 134
 		}
135 135
 	}
136 136
 
... ...
@@ -149,9 +149,9 @@ func (n *network) joinSubnetSandbox(s *subnet) error {
149 149
 
150 150
 func (n *network) leaveSandbox() {
151 151
 	n.Lock()
152
+	defer n.Unlock()
152 153
 	n.joinCnt--
153 154
 	if n.joinCnt != 0 {
154
-		n.Unlock()
155 155
 		return
156 156
 	}
157 157
 
... ...
@@ -162,15 +162,14 @@ func (n *network) leaveSandbox() {
162 162
 	for _, s := range n.subnets {
163 163
 		s.once = &sync.Once{}
164 164
 	}
165
-	n.Unlock()
166 165
 
167 166
 	n.destroySandbox()
168 167
 }
169 168
 
169
+// to be called while holding network lock
170 170
 func (n *network) destroySandbox() {
171
-	sbox := n.sandbox()
172
-	if sbox != nil {
173
-		for _, iface := range sbox.Info().Interfaces() {
171
+	if n.sbox != nil {
172
+		for _, iface := range n.sbox.Info().Interfaces() {
174 173
 			if err := iface.Remove(); err != nil {
175 174
 				logrus.Debugf("Remove interface %s failed: %v", iface.SrcName(), err)
176 175
 			}
... ...
@@ -197,8 +196,8 @@ func (n *network) destroySandbox() {
197 197
 			}
198 198
 		}
199 199
 
200
-		sbox.Destroy()
201
-		n.setSandbox(nil)
200
+		n.sbox.Destroy()
201
+		n.sbox = nil
202 202
 	}
203 203
 }
204 204
 
... ...
@@ -12,8 +12,8 @@ import (
12 12
 
13 13
 type ovNotify struct {
14 14
 	action string
15
-	eid    string
16
-	nid    string
15
+	ep     *endpoint
16
+	nw     *network
17 17
 }
18 18
 
19 19
 type logWriter struct{}
... ...
@@ -81,13 +81,12 @@ func (d *driver) serfJoin(neighIP string) error {
81 81
 }
82 82
 
83 83
 func (d *driver) notifyEvent(event ovNotify) {
84
-	n := d.network(event.nid)
85
-	ep := n.endpoint(event.eid)
84
+	ep := event.ep
86 85
 
87 86
 	ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
88 87
 		net.IP(ep.addr.Mask).String(), ep.mac.String())
89 88
 	eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
90
-		event.nid, event.eid)
89
+		event.nw.id, ep.id)
91 90
 
92 91
 	if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
93 92
 		logrus.Errorf("Sending user event failed: %v\n", err)
... ...
@@ -180,13 +180,24 @@ func (d *driver) nodeJoin(node string, self bool) {
180 180
 }
181 181
 
182 182
 func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
183
+	n := d.network(nid)
184
+	if n == nil {
185
+		logrus.Debugf("Error pushing local endpoint event for network %s", nid)
186
+		return
187
+	}
188
+	ep := n.endpoint(eid)
189
+	if ep == nil {
190
+		logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
191
+		return
192
+	}
193
+
183 194
 	if !d.isSerfAlive() {
184 195
 		return
185 196
 	}
186 197
 	d.notifyCh <- ovNotify{
187 198
 		action: "join",
188
-		nid:    nid,
189
-		eid:    eid,
199
+		nw:     n,
200
+		ep:     ep,
190 201
 	}
191 202
 }
192 203
 
... ...
@@ -547,6 +547,10 @@ func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
547 547
 	sb.joinLeaveStart()
548 548
 	defer sb.joinLeaveEnd()
549 549
 
550
+	if sb.resolver != nil {
551
+		sb.resolver.FlushExtServers()
552
+	}
553
+
550 554
 	return ep.sbLeave(sb, false, options...)
551 555
 }
552 556
 
... ...
@@ -28,8 +28,12 @@ type Resolver interface {
28 28
 	// NameServer() returns the IP of the DNS resolver for the
29 29
 	// containers.
30 30
 	NameServer() string
31
-	// To configure external name servers the resolver should use
31
+	// SetExtServers configures the external nameservers the resolver
32
+	// should use to forward queries
32 33
 	SetExtServers([]string)
34
+	// FlushExtServers clears the cached UDP connections to external
35
+	// nameservers
36
+	FlushExtServers()
33 37
 	// ResolverOptions returns resolv.conf options that should be set
34 38
 	ResolverOptions() []string
35 39
 }
... ...
@@ -43,6 +47,8 @@ const (
43 43
 	maxExtDNS       = 3 //max number of external servers to try
44 44
 	extIOTimeout    = 3 * time.Second
45 45
 	defaultRespSize = 512
46
+	maxConcurrent   = 50
47
+	logInterval     = 2 * time.Second
46 48
 )
47 49
 
48 50
 type extDNSEntry struct {
... ...
@@ -60,6 +66,9 @@ type resolver struct {
60 60
 	tcpServer  *dns.Server
61 61
 	tcpListen  *net.TCPListener
62 62
 	err        error
63
+	count      int32
64
+	tStamp     time.Time
65
+	queryLock  sync.Mutex
63 66
 }
64 67
 
65 68
 func init() {
... ...
@@ -139,11 +148,15 @@ func (r *resolver) Start() error {
139 139
 	return nil
140 140
 }
141 141
 
142
-func (r *resolver) Stop() {
142
+func (r *resolver) FlushExtServers() {
143 143
 	for i := 0; i < maxExtDNS; i++ {
144 144
 		r.extDNSList[i].extConn = nil
145 145
 		r.extDNSList[i].extOnce = sync.Once{}
146 146
 	}
147
+}
148
+
149
+func (r *resolver) Stop() {
150
+	r.FlushExtServers()
147 151
 
148 152
 	if r.server != nil {
149 153
 		r.server.Shutdown()
... ...
@@ -154,6 +167,9 @@ func (r *resolver) Stop() {
154 154
 	r.conn = nil
155 155
 	r.tcpServer = nil
156 156
 	r.err = fmt.Errorf("setup not done yet")
157
+	r.tStamp = time.Time{}
158
+	r.count = 0
159
+	r.queryLock = sync.Mutex{}
157 160
 }
158 161
 
159 162
 func (r *resolver) SetExtServers(dns []string) {
... ...
@@ -320,7 +336,8 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
320 320
 			if extDNS.ipStr == "" {
321 321
 				break
322 322
 			}
323
-			log.Debugf("Querying ext dns %s:%s for %s[%d]", proto, extDNS.ipStr, name, query.Question[0].Qtype)
323
+			log.Debugf("Query %s[%d] from %s, forwarding to %s:%s", name, query.Question[0].Qtype,
324
+				w.LocalAddr().String(), proto, extDNS.ipStr)
324 325
 
325 326
 			extConnect := func() {
326 327
 				addr := fmt.Sprintf("%s:%d", extDNS.ipStr, 53)
... ...
@@ -358,6 +375,15 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
358 358
 			extConn.SetDeadline(time.Now().Add(extIOTimeout))
359 359
 			co := &dns.Conn{Conn: extConn}
360 360
 
361
+			if r.concurrentQueryInc() == false {
362
+				old := r.tStamp
363
+				r.tStamp = time.Now()
364
+				if r.tStamp.Sub(old) > logInterval {
365
+					log.Errorf("More than %v concurrent queries from %s", maxConcurrent, w.LocalAddr().String())
366
+				}
367
+				continue
368
+			}
369
+
361 370
 			defer func() {
362 371
 				if proto == "tcp" {
363 372
 					co.Close()
... ...
@@ -365,11 +391,13 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
365 365
 			}()
366 366
 			err = co.WriteMsg(query)
367 367
 			if err != nil {
368
+				r.concurrentQueryDec()
368 369
 				log.Debugf("Send to DNS server failed, %s", err)
369 370
 				continue
370 371
 			}
371 372
 
372 373
 			resp, err = co.ReadMsg()
374
+			r.concurrentQueryDec()
373 375
 			if err != nil {
374 376
 				log.Debugf("Read from DNS server failed, %s", err)
375 377
 				continue
... ...
@@ -389,3 +417,23 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
389 389
 		log.Errorf("error writing resolver resp, %s", err)
390 390
 	}
391 391
 }
392
+
393
+func (r *resolver) concurrentQueryInc() bool {
394
+	r.queryLock.Lock()
395
+	defer r.queryLock.Unlock()
396
+	if r.count == maxConcurrent {
397
+		return false
398
+	}
399
+	r.count++
400
+	return true
401
+}
402
+
403
+func (r *resolver) concurrentQueryDec() bool {
404
+	r.queryLock.Lock()
405
+	defer r.queryLock.Unlock()
406
+	if r.count == 0 {
407
+		return false
408
+	}
409
+	r.count--
410
+	return true
411
+}
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"net"
8 8
 	"strings"
9 9
 	"sync"
10
+	"time"
10 11
 
11 12
 	log "github.com/Sirupsen/logrus"
12 13
 	"github.com/docker/libnetwork/etchosts"
... ...
@@ -536,6 +537,11 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin
536 536
 }
537 537
 
538 538
 func (sb *sandbox) SetKey(basePath string) error {
539
+	start := time.Now()
540
+	defer func() {
541
+		log.Debugf("sandbox set key processing took %s for container %s", time.Now().Sub(start), sb.ContainerID())
542
+	}()
543
+
539 544
 	if basePath == "" {
540 545
 		return types.BadRequestErrorf("invalid sandbox key")
541 546
 	}
... ...
@@ -7,6 +7,18 @@ import (
7 7
 	"github.com/docker/libnetwork/datastore"
8 8
 )
9 9
 
10
+func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
11
+	store, err := datastore.NewDataStore(scope, scfg)
12
+	if err != nil {
13
+		return err
14
+	}
15
+	c.Lock()
16
+	c.stores = append(c.stores, store)
17
+	c.Unlock()
18
+
19
+	return nil
20
+}
21
+
10 22
 func (c *controller) initStores() error {
11 23
 	c.Lock()
12 24
 	if c.cfg == nil {
... ...
@@ -18,13 +30,9 @@ func (c *controller) initStores() error {
18 18
 	c.Unlock()
19 19
 
20 20
 	for scope, scfg := range scopeConfigs {
21
-		store, err := datastore.NewDataStore(scope, scfg)
22
-		if err != nil {
21
+		if err := c.initScopedStore(scope, scfg); err != nil {
23 22
 			return err
24 23
 		}
25
-		c.Lock()
26
-		c.stores = append(c.stores, store)
27
-		c.Unlock()
28 24
 	}
29 25
 
30 26
 	c.startWatch()