Browse code

Vendoring in libnetwork with fixes for issues identified in RC1 & RC2

Signed-off-by: Madhu Venugopal <madhu@docker.com>

Madhu Venugopal authored on 2015/10/27 05:56:56
Showing 10 changed files
... ...
@@ -21,7 +21,7 @@ clone git github.com/vdemeester/shakers 3c10293ce22b900c27acad7b28656196fcc2f73b
21 21
 clone git golang.org/x/net 3cffabab72adf04f8e3b01c5baf775361837b5fe https://github.com/golang/net.git
22 22
 
23 23
 #get libnetwork packages
24
-clone git github.com/docker/libnetwork bf041154d27ed34ed39722328c8f1b0144a56fe2
24
+clone git github.com/docker/libnetwork c92c21bca42a3581fd108d3222848faa16372249
25 25
 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
26 26
 clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
27 27
 clone git github.com/hashicorp/memberlist 9a1e242e454d2443df330bdd51a436d5a9058fc4
... ...
@@ -178,7 +178,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
178 178
 		if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
179 179
 			// Failing to initalize discovery is a bad situation to be in.
180 180
 			// But it cannot fail creating the Controller
181
-			log.Debugf("Failed to Initialize Discovery : %v", err)
181
+			log.Errorf("Failed to Initialize Discovery : %v", err)
182 182
 		}
183 183
 	}
184 184
 
... ...
@@ -133,7 +133,7 @@ func makeDefaultScopes() map[string]*ScopeCfg {
133 133
 	def := make(map[string]*ScopeCfg)
134 134
 	def[LocalScope] = &ScopeCfg{
135 135
 		Client: ScopeClientCfg{
136
-			Provider: "boltdb",
136
+			Provider: string(store.BOLTDB),
137 137
 			Address:  defaultPrefix + "/local-kv.db",
138 138
 			Config: &store.Config{
139 139
 				Bucket: "libnetwork",
... ...
@@ -144,7 +144,8 @@ func makeDefaultScopes() map[string]*ScopeCfg {
144 144
 	return def
145 145
 }
146 146
 
147
-var rootChain = []string{"docker", "network", "v1.0"}
147
+var defaultRootChain = []string{"docker", "network", "v1.0"}
148
+var rootChain = defaultRootChain
148 149
 
149 150
 func init() {
150 151
 	consul.Register()
... ...
@@ -195,6 +196,7 @@ func ParseKey(key string) ([]string, error) {
195 195
 
196 196
 // newClient used to connect to KV Store
197 197
 func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
198
+
198 199
 	if cached && scope != LocalScope {
199 200
 		return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
200 201
 	}
... ...
@@ -203,7 +205,21 @@ func newClient(scope string, kv string, addr string, config *store.Config, cache
203 203
 		config = &store.Config{}
204 204
 	}
205 205
 
206
-	addrs := strings.Split(addr, ",")
206
+	var addrs []string
207
+
208
+	if kv == string(store.BOLTDB) {
209
+		// Parse file path
210
+		addrs = strings.Split(addr, ",")
211
+	} else {
212
+		// Parse URI
213
+		parts := strings.SplitN(addr, "/", 2)
214
+		addrs = strings.Split(parts[0], ",")
215
+
216
+		// Add the custom prefix to the root chain
217
+		if len(parts) == 2 {
218
+			rootChain = append([]string{parts[1]}, defaultRootChain...)
219
+		}
220
+	}
207 221
 
208 222
 	store, err := libkv.NewStore(store.Backend(kv), addrs, config)
209 223
 	if err != nil {
... ...
@@ -58,6 +58,8 @@ func (sb *sandbox) setupDefaultGW(srcEp *endpoint) error {
58 58
 		}
59 59
 	}
60 60
 
61
+	createOptions = append(createOptions, CreateOptionAnonymous())
62
+
61 63
 	eplen := gwEPlen
62 64
 	if len(sb.containerID) < gwEPlen {
63 65
 		eplen = len(sb.containerID)
... ...
@@ -118,10 +118,12 @@ func (d *driver) Leave(nid, eid string) error {
118 118
 		return fmt.Errorf("could not find network with id %s", nid)
119 119
 	}
120 120
 
121
-	d.notifyCh <- ovNotify{
122
-		action: "leave",
123
-		nid:    nid,
124
-		eid:    eid,
121
+	if d.notifyCh != nil {
122
+		d.notifyCh <- ovNotify{
123
+			action: "leave",
124
+			nid:    nid,
125
+			eid:    eid,
126
+		}
125 127
 	}
126 128
 
127 129
 	n.leaveSandbox()
... ...
@@ -2,6 +2,7 @@ package overlay
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"net"
5 6
 	"sync"
6 7
 
7 8
 	"github.com/Sirupsen/logrus"
... ...
@@ -120,8 +121,30 @@ func (d *driver) Type() string {
120 120
 	return networkType
121 121
 }
122 122
 
123
+func validateSelf(node string) error {
124
+	advIP := net.ParseIP(node)
125
+	if advIP == nil {
126
+		return fmt.Errorf("invalid self address (%s)", node)
127
+	}
128
+
129
+	addrs, err := net.InterfaceAddrs()
130
+	if err != nil {
131
+		return fmt.Errorf("Unable to get interface addresses %v", err)
132
+	}
133
+	for _, addr := range addrs {
134
+		ip, _, err := net.ParseCIDR(addr.String())
135
+		if err == nil && ip.Equal(advIP) {
136
+			return nil
137
+		}
138
+	}
139
+	return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
140
+}
141
+
123 142
 func (d *driver) nodeJoin(node string, self bool) {
124 143
 	if self && !d.isSerfAlive() {
144
+		if err := validateSelf(node); err != nil {
145
+			logrus.Errorf("%s", err.Error())
146
+		}
125 147
 		d.Lock()
126 148
 		d.bindAddress = node
127 149
 		d.Unlock()
... ...
@@ -106,6 +106,51 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
106 106
 
107 107
 	if v, ok := epMap["generic"]; ok {
108 108
 		ep.generic = v.(map[string]interface{})
109
+
110
+		if opt, ok := ep.generic[netlabel.PortMap]; ok {
111
+			pblist := []types.PortBinding{}
112
+
113
+			for i := 0; i < len(opt.([]interface{})); i++ {
114
+				pb := types.PortBinding{}
115
+				tmp := opt.([]interface{})[i].(map[string]interface{})
116
+
117
+				bytes, err := json.Marshal(tmp)
118
+				if err != nil {
119
+					log.Error(err)
120
+					break
121
+				}
122
+				err = json.Unmarshal(bytes, &pb)
123
+				if err != nil {
124
+					log.Error(err)
125
+					break
126
+				}
127
+				pblist = append(pblist, pb)
128
+			}
129
+			ep.generic[netlabel.PortMap] = pblist
130
+		}
131
+
132
+		if opt, ok := ep.generic[netlabel.ExposedPorts]; ok {
133
+			tplist := []types.TransportPort{}
134
+
135
+			for i := 0; i < len(opt.([]interface{})); i++ {
136
+				tp := types.TransportPort{}
137
+				tmp := opt.([]interface{})[i].(map[string]interface{})
138
+
139
+				bytes, err := json.Marshal(tmp)
140
+				if err != nil {
141
+					log.Error(err)
142
+					break
143
+				}
144
+				err = json.Unmarshal(bytes, &tp)
145
+				if err != nil {
146
+					log.Error(err)
147
+					break
148
+				}
149
+				tplist = append(tplist, tp)
150
+			}
151
+			ep.generic[netlabel.ExposedPorts] = tplist
152
+
153
+		}
109 154
 	}
110 155
 
111 156
 	if v, ok := epMap["anonymous"]; ok {
... ...
@@ -345,7 +390,7 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
345 345
 	if ip := ep.getFirstInterfaceAddress(); ip != nil {
346 346
 		address = ip.String()
347 347
 	}
348
-	if err = sb.updateHostsFile(address, network.getSvcRecords()); err != nil {
348
+	if err = sb.updateHostsFile(address, network.getSvcRecords(ep)); err != nil {
349 349
 		return err
350 350
 	}
351 351
 
... ...
@@ -467,8 +512,7 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
467 467
 		return err
468 468
 	}
469 469
 
470
-	// unwatch for service records
471
-	n.getController().unWatchSvcRecord(ep)
470
+	sb.deleteHostsEntries(n.getSvcRecords(ep))
472 471
 
473 472
 	if sb.needDefaultGW() {
474 473
 		ep := sb.getEPwithoutGateway()
... ...
@@ -501,29 +545,32 @@ func (ep *endpoint) Delete() error {
501 501
 	}
502 502
 	ep.Unlock()
503 503
 
504
-	if err = n.getEpCnt().DecEndpointCnt(); err != nil {
504
+	if err = n.getController().deleteFromStore(ep); err != nil {
505 505
 		return err
506 506
 	}
507 507
 	defer func() {
508 508
 		if err != nil {
509
-			if e := n.getEpCnt().IncEndpointCnt(); e != nil {
510
-				log.Warnf("failed to update network %s : %v", n.name, e)
509
+			ep.dbExists = false
510
+			if e := n.getController().updateToStore(ep); e != nil {
511
+				log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
511 512
 			}
512 513
 		}
513 514
 	}()
514 515
 
515
-	if err = n.getController().deleteFromStore(ep); err != nil {
516
+	if err = n.getEpCnt().DecEndpointCnt(); err != nil {
516 517
 		return err
517 518
 	}
518 519
 	defer func() {
519 520
 		if err != nil {
520
-			ep.dbExists = false
521
-			if e := n.getController().updateToStore(ep); e != nil {
522
-				log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
521
+			if e := n.getEpCnt().IncEndpointCnt(); e != nil {
522
+				log.Warnf("failed to update network %s : %v", n.name, e)
523 523
 			}
524 524
 		}
525 525
 	}()
526 526
 
527
+	// unwatch for service records
528
+	n.getController().unWatchSvcRecord(ep)
529
+
527 530
 	if err = ep.deleteEndpoint(); err != nil {
528 531
 		return err
529 532
 	}
... ...
@@ -84,10 +84,6 @@ func (a *Allocator) refresh(as string) error {
84 84
 		return nil
85 85
 	}
86 86
 
87
-	if err := a.updateBitMasks(aSpace); err != nil {
88
-		return fmt.Errorf("error updating bit masks during init: %v", err)
89
-	}
90
-
91 87
 	a.Lock()
92 88
 	a.addrSpaces[as] = aSpace
93 89
 	a.Unlock()
... ...
@@ -5,6 +5,7 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"strconv"
8
+	"strings"
8 9
 	"sync"
9 10
 
10 11
 	log "github.com/Sirupsen/logrus"
... ...
@@ -685,6 +686,14 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
685 685
 		}
686 686
 	}()
687 687
 
688
+	// Watch for service records
689
+	n.getController().watchSvcRecord(ep)
690
+	defer func() {
691
+		if err != nil {
692
+			n.getController().unWatchSvcRecord(ep)
693
+		}
694
+	}()
695
+
688 696
 	// Increment endpoint count to indicate completion of endpoint addition
689 697
 	if err = n.getEpCnt().IncEndpointCnt(); err != nil {
690 698
 		return nil, err
... ...
@@ -768,6 +777,12 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
768 768
 	var recs []etchosts.Record
769 769
 	if iface := ep.Iface(); iface.Address() != nil {
770 770
 		if isAdd {
771
+			// If we already have this endpoint in service db just return
772
+			if _, ok := sr[ep.Name()]; ok {
773
+				n.Unlock()
774
+				return
775
+			}
776
+
771 777
 			sr[ep.Name()] = iface.Address().IP
772 778
 			sr[ep.Name()+"."+n.name] = iface.Address().IP
773 779
 		} else {
... ...
@@ -793,8 +808,12 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
793 793
 	}
794 794
 
795 795
 	var sbList []*sandbox
796
-	for _, ep := range localEps {
797
-		if sb, hasSandbox := ep.getSandbox(); hasSandbox {
796
+	for _, lEp := range localEps {
797
+		if ep.ID() == lEp.ID() {
798
+			continue
799
+		}
800
+
801
+		if sb, hasSandbox := lEp.getSandbox(); hasSandbox {
798 802
 			sbList = append(sbList, sb)
799 803
 		}
800 804
 	}
... ...
@@ -808,7 +827,7 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
808 808
 	}
809 809
 }
810 810
 
811
-func (n *network) getSvcRecords() []etchosts.Record {
811
+func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
812 812
 	n.Lock()
813 813
 	defer n.Unlock()
814 814
 
... ...
@@ -816,6 +835,10 @@ func (n *network) getSvcRecords() []etchosts.Record {
816 816
 	sr, _ := n.ctrlr.svcDb[n.id]
817 817
 
818 818
 	for h, ip := range sr {
819
+		if ep != nil && strings.Split(h, ".")[0] == ep.Name() {
820
+			continue
821
+		}
822
+
819 823
 		recs = append(recs, etchosts.Record{
820 824
 			Hosts: h,
821 825
 			IP:    ip.String(),
... ...
@@ -367,6 +367,11 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
367 367
 		c.Lock()
368 368
 		if len(nw.localEps) == 0 {
369 369
 			close(nw.stopCh)
370
+
371
+			// This is the last container going away for the network. Destroy
372
+			// this network's svc db entry
373
+			delete(c.svcDb, ep.getNetwork().ID())
374
+
370 375
 			delete(nmap, ep.getNetwork().ID())
371 376
 		}
372 377
 	}