Browse code

Before that change, etcd and zookeeper would fail to instantiate the discovery without the key being already there in the store or created beforehand and implicitely by a 'swarm join'.

Signed-off-by: Alexandre Beslic <abronan@docker.com>
Signed-off-by: Victor Vieux <vieux@docker.com>

This PR allows to configure the discovery path using the
--discovery-opt flag (with "kv.path=path/to/nodes"). We
can point to "docker/nodes" and use the docker discovery.

If docker instances are advertising to the cluster using
the `--cluster-advertise` flag, the swarm join command
becomes unnecessary.

Signed-off-by: Alexandre Beslic <abronan@docker.com>
Signed-off-by: Victor Vieux <vieux@docker.com>

Victor Vieux authored on 2016/01/06 11:41:51
Showing 3 changed files
... ...
@@ -603,6 +603,10 @@ The currently supported cluster store options are:
603 603
     private key is used as the client key for communication with the
604 604
     Key/Value store.
605 605
 
606
+*  `kv.path`
607
+
608
+    Specifies the path in the Key/Value store. If not configured, the default value is 'docker/nodes'.
609
+
606 610
 ## Access authorization
607 611
 
608 612
 Docker's access authorization can be extended by authorization plugins that your
... ...
@@ -17,7 +17,7 @@ import (
17 17
 )
18 18
 
19 19
 const (
20
-	discoveryPath = "docker/nodes"
20
+	defaultDiscoveryPath = "docker/nodes"
21 21
 )
22 22
 
23 23
 // Discovery is exported
... ...
@@ -62,7 +62,14 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du
62 62
 
63 63
 	s.heartbeat = heartbeat
64 64
 	s.ttl = ttl
65
-	s.path = path.Join(s.prefix, discoveryPath)
65
+
66
+	// Use a custom path if specified in discovery options
67
+	dpath := defaultDiscoveryPath
68
+	if clusterOpts["kv.path"] != "" {
69
+		dpath = clusterOpts["kv.path"]
70
+	}
71
+
72
+	s.path = path.Join(s.prefix, dpath)
66 73
 
67 74
 	var config *store.Config
68 75
 	if clusterOpts["kv.cacertfile"] != "" && clusterOpts["kv.certfile"] != "" && clusterOpts["kv.keyfile"] != "" {
... ...
@@ -138,6 +145,17 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
138 138
 		// Forever: Create a store watch, watch until we get an error and then try again.
139 139
 		// Will only stop if we receive a stopCh request.
140 140
 		for {
141
+			// Create the path to watch if it does not exist yet
142
+			exists, err := s.store.Exists(s.path)
143
+			if err != nil {
144
+				errCh <- err
145
+			}
146
+			if !exists {
147
+				if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil {
148
+					errCh <- err
149
+				}
150
+			}
151
+
141 152
 			// Set up a watch.
142 153
 			watchCh, err := s.store.WatchTree(s.path, stopCh)
143 154
 			if err != nil {
... ...
@@ -33,7 +33,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
33 33
 	s := d.store.(*FakeStore)
34 34
 	c.Assert(s.Endpoints, check.HasLen, 1)
35 35
 	c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1")
36
-	c.Assert(d.path, check.Equals, discoveryPath)
36
+	c.Assert(d.path, check.Equals, defaultDiscoveryPath)
37 37
 
38 38
 	storeMock = &FakeStore{
39 39
 		Endpoints: []string{"127.0.0.1:1234"},
... ...
@@ -45,7 +45,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
45 45
 	s = d.store.(*FakeStore)
46 46
 	c.Assert(s.Endpoints, check.HasLen, 1)
47 47
 	c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234")
48
-	c.Assert(d.path, check.Equals, "path/"+discoveryPath)
48
+	c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
49 49
 
50 50
 	storeMock = &FakeStore{
51 51
 		Endpoints: []string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"},
... ...
@@ -60,7 +60,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
60 60
 	c.Assert(s.Endpoints[1], check.Equals, "127.0.0.2:1234")
61 61
 	c.Assert(s.Endpoints[2], check.Equals, "127.0.0.3:1234")
62 62
 
63
-	c.Assert(d.path, check.Equals, "path/"+discoveryPath)
63
+	c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
64 64
 }
65 65
 
66 66
 // Extremely limited mock store so we can test initialization
... ...
@@ -224,8 +224,8 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) {
224 224
 		&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
225 225
 	}
226 226
 	kvs := []*store.KVPair{
227
-		{Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
228
-		{Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
227
+		{Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
228
+		{Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
229 229
 	}
230 230
 
231 231
 	stopCh := make(chan struct{})
... ...
@@ -245,7 +245,7 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) {
245 245
 
246 246
 	// Add a new entry.
247 247
 	expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
248
-	kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
248
+	kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
249 249
 	mockCh <- kvs
250 250
 	c.Assert(<-ch, check.DeepEquals, expected)
251 251