Browse code

Vendoring in Libnetwork

This carries fixes for
- Internal racy /etc/hosts updates within container during SD
- Renable SD service record watch after cluster-store restarts
- Fix to allow remote IPAM driver to return no IP if the user prefers
- Fix to allow --fixed-cidr and --bip to be in same range

Signed-off-by: Madhu Venugopal <madhu@docker.com>

Madhu Venugopal authored on 2015/10/23 06:05:51
Showing 10 changed files
... ...
@@ -21,7 +21,7 @@ clone git github.com/vdemeester/shakers 3c10293ce22b900c27acad7b28656196fcc2f73b
21 21
 clone git golang.org/x/net 3cffabab72adf04f8e3b01c5baf775361837b5fe https://github.com/golang/net.git
22 22
 
23 23
 #get libnetwork packages
24
-clone git github.com/docker/libnetwork f3c8ebf46b890d4612c5d98e792280d13abdb761
24
+clone git github.com/docker/libnetwork bf041154d27ed34ed39722328c8f1b0144a56fe2
25 25
 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
26 26
 clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
27 27
 clone git github.com/hashicorp/memberlist 9a1e242e454d2443df330bdd51a436d5a9058fc4
... ...
@@ -218,7 +218,14 @@ func (c *controller) initDiscovery(watcher discovery.Watcher) error {
218 218
 	}
219 219
 
220 220
 	c.discovery = hostdiscovery.NewHostDiscovery(watcher)
221
-	return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback)
221
+	return c.discovery.Watch(c.activeCallback, c.hostJoinCallback, c.hostLeaveCallback)
222
+}
223
+
224
+func (c *controller) activeCallback() {
225
+	ds := c.getStore(datastore.GlobalScope)
226
+	if ds != nil && !ds.Active() {
227
+		ds.RestartWatch()
228
+	}
222 229
 }
223 230
 
224 231
 func (c *controller) hostJoinCallback(nodes []net.IP) {
... ...
@@ -34,6 +34,10 @@ type DataStore interface {
34 34
 	Watchable() bool
35 35
 	// Watch for changes on a KVObject
36 36
 	Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
37
+	// RestartWatch retriggers stopped Watches
38
+	RestartWatch()
39
+	// Active returns if the store is active
40
+	Active() bool
37 41
 	// List returns of a list of KVObjects belonging to the parent
38 42
 	// key. The caller must pass a KVObject of the same type as
39 43
 	// the objects that need to be listed
... ...
@@ -53,9 +57,11 @@ var (
53 53
 )
54 54
 
55 55
 type datastore struct {
56
-	scope string
57
-	store store.Store
58
-	cache *cache
56
+	scope   string
57
+	store   store.Store
58
+	cache   *cache
59
+	watchCh chan struct{}
60
+	active  bool
59 61
 	sync.Mutex
60 62
 }
61 63
 
... ...
@@ -204,7 +210,7 @@ func newClient(scope string, kv string, addr string, config *store.Config, cache
204 204
 		return nil, err
205 205
 	}
206 206
 
207
-	ds := &datastore{scope: scope, store: store}
207
+	ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{})}
208 208
 	if cached {
209 209
 		ds.cache = newCache(ds)
210 210
 	}
... ...
@@ -239,6 +245,10 @@ func (ds *datastore) Scope() string {
239 239
 	return ds.scope
240 240
 }
241 241
 
242
+func (ds *datastore) Active() bool {
243
+	return ds.active
244
+}
245
+
242 246
 func (ds *datastore) Watchable() bool {
243 247
 	return ds.scope != LocalScope
244 248
 }
... ...
@@ -259,6 +269,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
259 259
 	kvoCh := make(chan KVObject)
260 260
 
261 261
 	go func() {
262
+	retry_watch:
263
+		var err error
264
+
265
+		// Make sure to get a new instance of watch channel
266
+		ds.Lock()
267
+		watchCh := ds.watchCh
268
+		ds.Unlock()
269
+
270
+	loop:
262 271
 		for {
263 272
 			select {
264 273
 			case <-stopCh:
... ...
@@ -269,12 +288,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
269 269
 				// for the watch can exit resulting in a nil value in
270 270
 				// channel.
271 271
 				if kvPair == nil {
272
-					close(sCh)
273
-					return
272
+					ds.Lock()
273
+					ds.active = false
274
+					ds.Unlock()
275
+					break loop
274 276
 				}
277
+
275 278
 				dstO := ctor.New()
276 279
 
277
-				if err := dstO.SetValue(kvPair.Value); err != nil {
280
+				if err = dstO.SetValue(kvPair.Value); err != nil {
278 281
 					log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
279 282
 					break
280 283
 				}
... ...
@@ -283,11 +305,31 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
283 283
 				kvoCh <- dstO
284 284
 			}
285 285
 		}
286
+
287
+		// Wait on watch channel for a re-trigger when datastore becomes active
288
+		<-watchCh
289
+
290
+		kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
291
+		if err != nil {
292
+			log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
293
+		}
294
+
295
+		goto retry_watch
286 296
 	}()
287 297
 
288 298
 	return kvoCh, nil
289 299
 }
290 300
 
301
+func (ds *datastore) RestartWatch() {
302
+	ds.Lock()
303
+	defer ds.Unlock()
304
+
305
+	ds.active = true
306
+	watchCh := ds.watchCh
307
+	ds.watchCh = make(chan struct{})
308
+	close(watchCh)
309
+}
310
+
291 311
 func (ds *datastore) KVStore() store.Store {
292 312
 	return ds.store
293 313
 }
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"io/ioutil"
8 8
 	"os"
9 9
 	"regexp"
10
+	"sync"
10 11
 )
11 12
 
12 13
 // Record Structure for a single host record
... ...
@@ -21,14 +22,47 @@ func (r Record) WriteTo(w io.Writer) (int64, error) {
21 21
 	return int64(n), err
22 22
 }
23 23
 
24
-// Default hosts config records slice
25
-var defaultContent = []Record{
26
-	{Hosts: "localhost", IP: "127.0.0.1"},
27
-	{Hosts: "localhost ip6-localhost ip6-loopback", IP: "::1"},
28
-	{Hosts: "ip6-localnet", IP: "fe00::0"},
29
-	{Hosts: "ip6-mcastprefix", IP: "ff00::0"},
30
-	{Hosts: "ip6-allnodes", IP: "ff02::1"},
31
-	{Hosts: "ip6-allrouters", IP: "ff02::2"},
24
+var (
25
+	// Default hosts config records slice
26
+	defaultContent = []Record{
27
+		{Hosts: "localhost", IP: "127.0.0.1"},
28
+		{Hosts: "localhost ip6-localhost ip6-loopback", IP: "::1"},
29
+		{Hosts: "ip6-localnet", IP: "fe00::0"},
30
+		{Hosts: "ip6-mcastprefix", IP: "ff00::0"},
31
+		{Hosts: "ip6-allnodes", IP: "ff02::1"},
32
+		{Hosts: "ip6-allrouters", IP: "ff02::2"},
33
+	}
34
+
35
+	// A cache of path level locks for synchronizing /etc/hosts
36
+	// updates on a file level
37
+	pathMap = make(map[string]*sync.Mutex)
38
+
39
+	// A package level mutex to synchronize the cache itself
40
+	pathMutex sync.Mutex
41
+)
42
+
43
+func pathLock(path string) func() {
44
+	pathMutex.Lock()
45
+	defer pathMutex.Unlock()
46
+
47
+	pl, ok := pathMap[path]
48
+	if !ok {
49
+		pl = &sync.Mutex{}
50
+		pathMap[path] = pl
51
+	}
52
+
53
+	pl.Lock()
54
+	return func() {
55
+		pl.Unlock()
56
+	}
57
+}
58
+
59
+// Drop drops the path string from the path cache
60
+func Drop(path string) {
61
+	pathMutex.Lock()
62
+	defer pathMutex.Unlock()
63
+
64
+	delete(pathMap, path)
32 65
 }
33 66
 
34 67
 // Build function
... ...
@@ -36,6 +70,8 @@ var defaultContent = []Record{
36 36
 // IP, hostname, and domainname set main record leave empty for no master record
37 37
 // extraContent is an array of extra host records.
38 38
 func Build(path, IP, hostname, domainname string, extraContent []Record) error {
39
+	defer pathLock(path)()
40
+
39 41
 	content := bytes.NewBuffer(nil)
40 42
 	if IP != "" {
41 43
 		//set main record
... ...
@@ -68,6 +104,8 @@ func Build(path, IP, hostname, domainname string, extraContent []Record) error {
68 68
 
69 69
 // Add adds an arbitrary number of Records to an already existing /etc/hosts file
70 70
 func Add(path string, recs []Record) error {
71
+	defer pathLock(path)()
72
+
71 73
 	if len(recs) == 0 {
72 74
 		return nil
73 75
 	}
... ...
@@ -95,6 +133,8 @@ func Add(path string, recs []Record) error {
95 95
 
96 96
 // Delete deletes an arbitrary number of Records already existing in /etc/hosts file
97 97
 func Delete(path string, recs []Record) error {
98
+	defer pathLock(path)()
99
+
98 100
 	if len(recs) == 0 {
99 101
 		return nil
100 102
 	}
... ...
@@ -118,6 +158,8 @@ func Delete(path string, recs []Record) error {
118 118
 // IP is new IP address
119 119
 // hostname is hostname to search for to replace IP
120 120
 func Update(path, IP, hostname string) error {
121
+	defer pathLock(path)()
122
+
121 123
 	old, err := ioutil.ReadFile(path)
122 124
 	if err != nil {
123 125
 		return err
... ...
@@ -34,7 +34,7 @@ func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
34 34
 	return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
35 35
 }
36 36
 
37
-func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error {
37
+func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
38 38
 	h.Lock()
39 39
 	d := h.watcher
40 40
 	h.Unlock()
... ...
@@ -42,15 +42,16 @@ func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCall
42 42
 		return types.BadRequestErrorf("invalid discovery watcher")
43 43
 	}
44 44
 	discoveryCh, errCh := d.Watch(h.stopChan)
45
-	go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback)
45
+	go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
46 46
 	return nil
47 47
 }
48 48
 
49
-func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) {
49
+func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
50
+	activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
50 51
 	for {
51 52
 		select {
52 53
 		case entries := <-ch:
53
-			h.processCallback(entries, joinCallback, leaveCallback)
54
+			h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
54 55
 		case err := <-errCh:
55 56
 			if err != nil {
56 57
 				log.Errorf("discovery error: %v", err)
... ...
@@ -71,7 +72,8 @@ func (h *hostDiscovery) StopDiscovery() error {
71 71
 	return nil
72 72
 }
73 73
 
74
-func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) {
74
+func (h *hostDiscovery) processCallback(entries discovery.Entries,
75
+	activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
75 76
 	updated := hosts(entries)
76 77
 	h.Lock()
77 78
 	existing := h.nodes
... ...
@@ -79,6 +81,7 @@ func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback
79 79
 	h.nodes = updated
80 80
 	h.Unlock()
81 81
 
82
+	activeCallback()
82 83
 	if len(added) > 0 {
83 84
 		joinCallback(added)
84 85
 	}
... ...
@@ -5,13 +5,16 @@ import "net"
5 5
 // JoinCallback provides a callback event for new node joining the cluster
6 6
 type JoinCallback func(entries []net.IP)
7 7
 
8
+// ActiveCallback provides a callback event for active discovery event
9
+type ActiveCallback func()
10
+
8 11
 // LeaveCallback provides a callback event for node leaving the cluster
9 12
 type LeaveCallback func(entries []net.IP)
10 13
 
11 14
 // HostDiscovery primary interface
12 15
 type HostDiscovery interface {
13 16
 	//Watch Node join and leave cluster events
14
-	Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error
17
+	Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error
15 18
 	// StopDiscovery stops the discovery perocess
16 19
 	StopDiscovery() error
17 20
 	// Fetch returns a list of host IPs that are currently discovered
... ...
@@ -250,11 +250,6 @@ func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
250 250
 	ones, bits := pool.Mask.Size()
251 251
 	numAddresses := uint64(1 << uint(bits-ones))
252 252
 
253
-	if ipVer == v4 {
254
-		// Do not let broadcast address be reserved
255
-		numAddresses--
256
-	}
257
-
258 253
 	// Allow /64 subnet
259 254
 	if ipVer == v6 && numAddresses == 0 {
260 255
 		numAddresses--
... ...
@@ -270,6 +265,11 @@ func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
270 270
 	// Do the same for IPv6 so that bridge ip starts with XXXX...::1
271 271
 	h.Set(0)
272 272
 
273
+	// Do not let broadcast address be reserved
274
+	if ipVer == v4 {
275
+		h.Set(numAddresses - 1)
276
+	}
277
+
273 278
 	a.Lock()
274 279
 	a.addresses[key] = h
275 280
 	a.Unlock()
... ...
@@ -78,7 +78,11 @@ func (a *allocator) ReleasePool(poolID string) error {
78 78
 
79 79
 // RequestAddress requests an address from the address pool
80 80
 func (a *allocator) RequestAddress(poolID string, address net.IP, options map[string]string) (*net.IPNet, map[string]string, error) {
81
-	var prefAddress string
81
+	var (
82
+		prefAddress string
83
+		retAddress  *net.IPNet
84
+		err         error
85
+	)
82 86
 	if address != nil {
83 87
 		prefAddress = address.String()
84 88
 	}
... ...
@@ -87,7 +91,9 @@ func (a *allocator) RequestAddress(poolID string, address net.IP, options map[st
87 87
 	if err := a.call("RequestAddress", req, res); err != nil {
88 88
 		return nil, nil, err
89 89
 	}
90
-	retAddress, err := types.ParseCIDR(res.Address)
90
+	if res.Address != "" {
91
+		retAddress, err = types.ParseCIDR(res.Address)
92
+	}
91 93
 	return retAddress, res.Data, err
92 94
 }
93 95
 
... ...
@@ -182,6 +182,10 @@ func (sb *sandbox) Delete() error {
182 182
 		}
183 183
 	}
184 184
 
185
+	// Container is going away. Path cache in etchosts is most
186
+	// likely not required any more. Drop it.
187
+	etchosts.Drop(sb.config.hostsPath)
188
+
185 189
 	if sb.osSbox != nil {
186 190
 		sb.osSbox.Destroy()
187 191
 	}
... ...
@@ -308,6 +308,11 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
308 308
 
309 309
 		c.Lock()
310 310
 		nw.localEps[ep.ID()] = ep
311
+
312
+		// If we had learned that from the kv store remove it
313
+		// from remote ep list now that we know that this is
314
+		// indeed a local endpoint
315
+		delete(nw.remoteEps, ep.ID())
311 316
 		c.Unlock()
312 317
 		return
313 318
 	}