Browse code

Add `pkg/discovery` for nodes discovery

Absorb Swarm's discovery package in order to provide a common node
discovery mechanism to be used by both Swarm and networking code.

Signed-off-by: Arnaud Porterie <arnaud.porterie@docker.com>

Arnaud Porterie authored on 2015/09/01 05:23:17
Showing 14 changed files
... ...
@@ -16,6 +16,7 @@ clone git github.com/kr/pty 5cf931ef8f
16 16
 clone git github.com/mattn/go-sqlite3 v1.1.0
17 17
 clone git github.com/microsoft/hcsshim 7f646aa6b26bcf90caee91e93cde4a80d0d8a83e
18 18
 clone git github.com/mistifyio/go-zfs v2.1.1
19
+clone git github.com/stretchr/testify 7c2b1e5640dcf2631213ca962d892bffa1e08860
19 20
 clone git github.com/tchap/go-patricia v2.1.0
20 21
 clone git golang.org/x/net 3cffabab72adf04f8e3b01c5baf775361837b5fe https://github.com/golang/net.git
21 22
 
22 23
new file mode 100644
... ...
@@ -0,0 +1,41 @@
0
+---
1
+page_title: Docker discovery
2
+page_description: discovery
3
+page_keywords: docker, clustering, discovery
4
+---
5
+
6
+# Discovery
7
+
8
+Docker comes with multiple Discovery backends.
9
+
10
+## Backends
11
+
12
+### Using etcd
13
+
14
+Point your Docker Engine instances to a common etcd instance. You can specify
15
+the address Docker uses to advertise the node using the `--discovery-address`
16
+flag.
17
+
18
+```bash
19
+$ docker daemon -H=<node_ip:2376> --discovery-address=<node_ip:2376> --discovery-backend etcd://<etcd_ip>/<path>
20
+```
21
+
22
+### Using consul
23
+
24
+Point your Docker Engine instances to a common Consul instance. You can specify
25
+the address Docker uses to advertise the node using the `--discovery-address`
26
+flag.
27
+
28
+```bash
29
+$ docker daemon -H=<node_ip:2376> --discovery-address=<node_ip:2376> --discovery-backend consul://<consul_ip>/<path>
30
+```
31
+
32
+### Using zookeeper
33
+
34
+Point your Docker Engine instances to a common Zookeeper instance. You can specify
35
+the address Docker uses to advertise the node using the `--discovery-address`
36
+flag.
37
+
38
+```bash
39
+$ docker daemon -H=<node_ip:2376> --discovery-address=<node_ip:2376> --discovery-backend zk://<zk_addr1>,<zk_addr2>>/<path>
40
+```
0 41
new file mode 100644
... ...
@@ -0,0 +1,53 @@
0
+package discovery
1
+
2
+import (
3
+	"fmt"
4
+	"strings"
5
+	"time"
6
+
7
+	log "github.com/Sirupsen/logrus"
8
+)
9
+
10
+var (
11
+	// Backends is a global map of discovery backends indexed by their
12
+	// associated scheme.
13
+	backends map[string]Backend
14
+)
15
+
16
+func init() {
17
+	backends = make(map[string]Backend)
18
+}
19
+
20
+// Register makes a discovery backend available by the provided scheme.
21
+// If Register is called twice with the same scheme an error is returned.
22
+func Register(scheme string, d Backend) error {
23
+	if _, exists := backends[scheme]; exists {
24
+		return fmt.Errorf("scheme already registered %s", scheme)
25
+	}
26
+	log.WithField("name", scheme).Debug("Registering discovery service")
27
+	backends[scheme] = d
28
+	return nil
29
+}
30
+
31
+func parse(rawurl string) (string, string) {
32
+	parts := strings.SplitN(rawurl, "://", 2)
33
+
34
+	// nodes:port,node2:port => nodes://node1:port,node2:port
35
+	if len(parts) == 1 {
36
+		return "nodes", parts[0]
37
+	}
38
+	return parts[0], parts[1]
39
+}
40
+
41
+// New returns a new Discovery given a URL, heartbeat and ttl settings.
42
+// Returns an error if the URL scheme is not supported.
43
+func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Backend, error) {
44
+	scheme, uri := parse(rawurl)
45
+	if backend, exists := backends[scheme]; exists {
46
+		log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service")
47
+		err := backend.Initialize(uri, heartbeat, ttl)
48
+		return backend, err
49
+	}
50
+
51
+	return nil, ErrNotSupported
52
+}
0 53
new file mode 100644
... ...
@@ -0,0 +1,35 @@
0
+package discovery
1
+
2
+import (
3
+	"errors"
4
+	"time"
5
+)
6
+
7
+var (
8
+	// ErrNotSupported is returned when a discovery service is not supported.
9
+	ErrNotSupported = errors.New("discovery service not supported")
10
+
11
+	// ErrNotImplemented is returned when discovery feature is not implemented
12
+	// by discovery backend.
13
+	ErrNotImplemented = errors.New("not implemented in this discovery service")
14
+)
15
+
16
+// Watcher provides watching over a cluster for nodes joining and leaving.
17
+type Watcher interface {
18
+	// Watch the discovery for entry changes.
19
+	// Returns a channel that will receive changes or an error.
20
+	// Providing a non-nil stopCh can be used to stop watching.
21
+	Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
22
+}
23
+
24
+// Backend is implemented by discovery backends which manage cluster entries.
25
+type Backend interface {
26
+	// Watcher must be provided by every backend.
27
+	Watcher
28
+
29
+	// Initialize the discovery with URIs, a heartbeat and a ttl.
30
+	Initialize(string, time.Duration, time.Duration) error
31
+
32
+	// Register to the discovery.
33
+	Register(string) error
34
+}
0 35
new file mode 100644
... ...
@@ -0,0 +1,120 @@
0
+package discovery
1
+
2
+import (
3
+	"testing"
4
+
5
+	"github.com/stretchr/testify/assert"
6
+)
7
+
8
+func TestNewEntry(t *testing.T) {
9
+	entry, err := NewEntry("127.0.0.1:2375")
10
+	assert.NoError(t, err)
11
+	assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"}))
12
+	assert.Equal(t, entry.String(), "127.0.0.1:2375")
13
+
14
+	_, err = NewEntry("127.0.0.1")
15
+	assert.Error(t, err)
16
+}
17
+
18
+func TestParse(t *testing.T) {
19
+	scheme, uri := parse("127.0.0.1:2375")
20
+	assert.Equal(t, scheme, "nodes")
21
+	assert.Equal(t, uri, "127.0.0.1:2375")
22
+
23
+	scheme, uri = parse("localhost:2375")
24
+	assert.Equal(t, scheme, "nodes")
25
+	assert.Equal(t, uri, "localhost:2375")
26
+
27
+	scheme, uri = parse("scheme://127.0.0.1:2375")
28
+	assert.Equal(t, scheme, "scheme")
29
+	assert.Equal(t, uri, "127.0.0.1:2375")
30
+
31
+	scheme, uri = parse("scheme://localhost:2375")
32
+	assert.Equal(t, scheme, "scheme")
33
+	assert.Equal(t, uri, "localhost:2375")
34
+
35
+	scheme, uri = parse("")
36
+	assert.Equal(t, scheme, "nodes")
37
+	assert.Equal(t, uri, "")
38
+}
39
+
40
+func TestCreateEntries(t *testing.T) {
41
+	entries, err := CreateEntries(nil)
42
+	assert.Equal(t, entries, Entries{})
43
+	assert.NoError(t, err)
44
+
45
+	entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""})
46
+	assert.NoError(t, err)
47
+	expected := Entries{
48
+		&Entry{Host: "127.0.0.1", Port: "2375"},
49
+		&Entry{Host: "127.0.0.2", Port: "2375"},
50
+	}
51
+	assert.True(t, entries.Equals(expected))
52
+
53
+	_, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"})
54
+	assert.Error(t, err)
55
+}
56
+
57
+func TestContainsEntry(t *testing.T) {
58
+	entries, err := CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""})
59
+	assert.NoError(t, err)
60
+	assert.True(t, entries.Contains(&Entry{Host: "127.0.0.1", Port: "2375"}))
61
+	assert.False(t, entries.Contains(&Entry{Host: "127.0.0.3", Port: "2375"}))
62
+}
63
+
64
+func TestEntriesEquality(t *testing.T) {
65
+	entries := Entries{
66
+		&Entry{Host: "127.0.0.1", Port: "2375"},
67
+		&Entry{Host: "127.0.0.2", Port: "2375"},
68
+	}
69
+
70
+	// Same
71
+	assert.True(t, entries.Equals(Entries{
72
+		&Entry{Host: "127.0.0.1", Port: "2375"},
73
+		&Entry{Host: "127.0.0.2", Port: "2375"},
74
+	}))
75
+
76
+	// Different size
77
+	assert.False(t, entries.Equals(Entries{
78
+		&Entry{Host: "127.0.0.1", Port: "2375"},
79
+		&Entry{Host: "127.0.0.2", Port: "2375"},
80
+		&Entry{Host: "127.0.0.3", Port: "2375"},
81
+	}))
82
+
83
+	// Different content
84
+	assert.False(t, entries.Equals(Entries{
85
+		&Entry{Host: "127.0.0.1", Port: "2375"},
86
+		&Entry{Host: "127.0.0.42", Port: "2375"},
87
+	}))
88
+}
89
+
90
+func TestEntriesDiff(t *testing.T) {
91
+	entry1 := &Entry{Host: "1.1.1.1", Port: "1111"}
92
+	entry2 := &Entry{Host: "2.2.2.2", Port: "2222"}
93
+	entry3 := &Entry{Host: "3.3.3.3", Port: "3333"}
94
+	entries := Entries{entry1, entry2}
95
+
96
+	// No diff
97
+	added, removed := entries.Diff(Entries{entry2, entry1})
98
+	assert.Empty(t, added)
99
+	assert.Empty(t, removed)
100
+
101
+	// Add
102
+	added, removed = entries.Diff(Entries{entry2, entry3, entry1})
103
+	assert.Len(t, added, 1)
104
+	assert.True(t, added.Contains(entry3))
105
+	assert.Empty(t, removed)
106
+
107
+	// Remove
108
+	added, removed = entries.Diff(Entries{entry2})
109
+	assert.Empty(t, added)
110
+	assert.Len(t, removed, 1)
111
+	assert.True(t, removed.Contains(entry1))
112
+
113
+	// Add and remove
114
+	added, removed = entries.Diff(Entries{entry1, entry3})
115
+	assert.Len(t, added, 1)
116
+	assert.True(t, added.Contains(entry3))
117
+	assert.Len(t, removed, 1)
118
+	assert.True(t, removed.Contains(entry2))
119
+}
0 120
new file mode 100644
... ...
@@ -0,0 +1,97 @@
0
+package discovery
1
+
2
+import (
3
+	"fmt"
4
+	"net"
5
+)
6
+
7
+// NewEntry creates a new entry.
8
+func NewEntry(url string) (*Entry, error) {
9
+	host, port, err := net.SplitHostPort(url)
10
+	if err != nil {
11
+		return nil, err
12
+	}
13
+	return &Entry{host, port}, nil
14
+}
15
+
16
+// An Entry represents a host.
17
+type Entry struct {
18
+	Host string
19
+	Port string
20
+}
21
+
22
+// Equals returns true if cmp contains the same data.
23
+func (e *Entry) Equals(cmp *Entry) bool {
24
+	return e.Host == cmp.Host && e.Port == cmp.Port
25
+}
26
+
27
+// String returns the string form of an entry.
28
+func (e *Entry) String() string {
29
+	return fmt.Sprintf("%s:%s", e.Host, e.Port)
30
+}
31
+
32
+// Entries is a list of *Entry with some helpers.
33
+type Entries []*Entry
34
+
35
+// Equals returns true if cmp contains the same data.
36
+func (e Entries) Equals(cmp Entries) bool {
37
+	// Check if the file has really changed.
38
+	if len(e) != len(cmp) {
39
+		return false
40
+	}
41
+	for i := range e {
42
+		if !e[i].Equals(cmp[i]) {
43
+			return false
44
+		}
45
+	}
46
+	return true
47
+}
48
+
49
+// Contains returns true if the Entries contain a given Entry.
50
+func (e Entries) Contains(entry *Entry) bool {
51
+	for _, curr := range e {
52
+		if curr.Equals(entry) {
53
+			return true
54
+		}
55
+	}
56
+	return false
57
+}
58
+
59
+// Diff compares two entries and returns the added and removed entries.
60
+func (e Entries) Diff(cmp Entries) (Entries, Entries) {
61
+	added := Entries{}
62
+	for _, entry := range cmp {
63
+		if !e.Contains(entry) {
64
+			added = append(added, entry)
65
+		}
66
+	}
67
+
68
+	removed := Entries{}
69
+	for _, entry := range e {
70
+		if !cmp.Contains(entry) {
71
+			removed = append(removed, entry)
72
+		}
73
+	}
74
+
75
+	return added, removed
76
+}
77
+
78
+// CreateEntries returns an array of entries based on the given addresses.
79
+func CreateEntries(addrs []string) (Entries, error) {
80
+	entries := Entries{}
81
+	if addrs == nil {
82
+		return entries, nil
83
+	}
84
+
85
+	for _, addr := range addrs {
86
+		if len(addr) == 0 {
87
+			continue
88
+		}
89
+		entry, err := NewEntry(addr)
90
+		if err != nil {
91
+			return nil, err
92
+		}
93
+		entries = append(entries, entry)
94
+	}
95
+	return entries, nil
96
+}
0 97
new file mode 100644
... ...
@@ -0,0 +1,109 @@
0
+package file
1
+
2
+import (
3
+	"fmt"
4
+	"io/ioutil"
5
+	"strings"
6
+	"time"
7
+
8
+	"github.com/docker/docker/pkg/discovery"
9
+)
10
+
11
+// Discovery is exported
12
+type Discovery struct {
13
+	heartbeat time.Duration
14
+	path      string
15
+}
16
+
17
+func init() {
18
+	Init()
19
+}
20
+
21
+// Init is exported
22
+func Init() {
23
+	discovery.Register("file", &Discovery{})
24
+}
25
+
26
+// Initialize is exported
27
+func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error {
28
+	s.path = path
29
+	s.heartbeat = heartbeat
30
+	return nil
31
+}
32
+
33
+func parseFileContent(content []byte) []string {
34
+	var result []string
35
+	for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") {
36
+		line = strings.TrimSpace(line)
37
+		// Ignoring line starts with #
38
+		if strings.HasPrefix(line, "#") {
39
+			continue
40
+		}
41
+		// Inlined # comment also ignored.
42
+		if strings.Contains(line, "#") {
43
+			line = line[0:strings.Index(line, "#")]
44
+			// Trim additional spaces caused by above stripping.
45
+			line = strings.TrimSpace(line)
46
+		}
47
+		for _, ip := range discovery.Generate(line) {
48
+			result = append(result, ip)
49
+		}
50
+	}
51
+	return result
52
+}
53
+
54
+func (s *Discovery) fetch() (discovery.Entries, error) {
55
+	fileContent, err := ioutil.ReadFile(s.path)
56
+	if err != nil {
57
+		return nil, fmt.Errorf("failed to read '%s': %v", s.path, err)
58
+	}
59
+	return discovery.CreateEntries(parseFileContent(fileContent))
60
+}
61
+
62
+// Watch is exported
63
+func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
64
+	ch := make(chan discovery.Entries)
65
+	errCh := make(chan error)
66
+	ticker := time.NewTicker(s.heartbeat)
67
+
68
+	go func() {
69
+		defer close(errCh)
70
+		defer close(ch)
71
+
72
+		// Send the initial entries if available.
73
+		currentEntries, err := s.fetch()
74
+		if err != nil {
75
+			errCh <- err
76
+		} else {
77
+			ch <- currentEntries
78
+		}
79
+
80
+		// Periodically send updates.
81
+		for {
82
+			select {
83
+			case <-ticker.C:
84
+				newEntries, err := s.fetch()
85
+				if err != nil {
86
+					errCh <- err
87
+					continue
88
+				}
89
+
90
+				// Check if the file has really changed.
91
+				if !newEntries.Equals(currentEntries) {
92
+					ch <- newEntries
93
+				}
94
+				currentEntries = newEntries
95
+			case <-stopCh:
96
+				ticker.Stop()
97
+				return
98
+			}
99
+		}
100
+	}()
101
+
102
+	return ch, errCh
103
+}
104
+
105
+// Register is exported
106
+func (s *Discovery) Register(addr string) error {
107
+	return discovery.ErrNotImplemented
108
+}
0 109
new file mode 100644
... ...
@@ -0,0 +1,106 @@
0
+package file
1
+
2
+import (
3
+	"io/ioutil"
4
+	"os"
5
+	"testing"
6
+
7
+	"github.com/docker/docker/pkg/discovery"
8
+	"github.com/stretchr/testify/assert"
9
+)
10
+
11
+func TestInitialize(t *testing.T) {
12
+	d := &Discovery{}
13
+	d.Initialize("/path/to/file", 1000, 0)
14
+	assert.Equal(t, d.path, "/path/to/file")
15
+}
16
+
17
+func TestNew(t *testing.T) {
18
+	d, err := discovery.New("file:///path/to/file", 0, 0)
19
+	assert.NoError(t, err)
20
+	assert.Equal(t, d.(*Discovery).path, "/path/to/file")
21
+}
22
+
23
+func TestContent(t *testing.T) {
24
+	data := `
25
+1.1.1.[1:2]:1111
26
+2.2.2.[2:4]:2222
27
+`
28
+	ips := parseFileContent([]byte(data))
29
+	assert.Len(t, ips, 5)
30
+	assert.Equal(t, ips[0], "1.1.1.1:1111")
31
+	assert.Equal(t, ips[1], "1.1.1.2:1111")
32
+	assert.Equal(t, ips[2], "2.2.2.2:2222")
33
+	assert.Equal(t, ips[3], "2.2.2.3:2222")
34
+	assert.Equal(t, ips[4], "2.2.2.4:2222")
35
+}
36
+
37
+func TestRegister(t *testing.T) {
38
+	discovery := &Discovery{path: "/path/to/file"}
39
+	assert.Error(t, discovery.Register("0.0.0.0"))
40
+}
41
+
42
+func TestParsingContentsWithComments(t *testing.T) {
43
+	data := `
44
+### test ###
45
+1.1.1.1:1111 # inline comment
46
+# 2.2.2.2:2222
47
+      ### empty line with comment
48
+    3.3.3.3:3333
49
+### test ###
50
+`
51
+	ips := parseFileContent([]byte(data))
52
+	assert.Len(t, ips, 2)
53
+	assert.Equal(t, "1.1.1.1:1111", ips[0])
54
+	assert.Equal(t, "3.3.3.3:3333", ips[1])
55
+}
56
+
57
+func TestWatch(t *testing.T) {
58
+	data := `
59
+1.1.1.1:1111
60
+2.2.2.2:2222
61
+`
62
+	expected := discovery.Entries{
63
+		&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
64
+		&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
65
+	}
66
+
67
+	// Create a temporary file and remove it.
68
+	tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test")
69
+	assert.NoError(t, err)
70
+	assert.NoError(t, tmp.Close())
71
+	assert.NoError(t, os.Remove(tmp.Name()))
72
+
73
+	// Set up file discovery.
74
+	d := &Discovery{}
75
+	d.Initialize(tmp.Name(), 1000, 0)
76
+	stopCh := make(chan struct{})
77
+	ch, errCh := d.Watch(stopCh)
78
+
79
+	// Make sure it fires errors since the file doesn't exist.
80
+	assert.Error(t, <-errCh)
81
+	// We have to drain the error channel otherwise Watch will get stuck.
82
+	go func() {
83
+		for range errCh {
84
+		}
85
+	}()
86
+
87
+	// Write the file and make sure we get the expected value back.
88
+	assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600))
89
+	assert.Equal(t, expected, <-ch)
90
+
91
+	// Add a new entry and look it up.
92
+	expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
93
+	f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0600)
94
+	assert.NoError(t, err)
95
+	assert.NotNil(t, f)
96
+	_, err = f.WriteString("\n3.3.3.3:3333\n")
97
+	assert.NoError(t, err)
98
+	f.Close()
99
+	assert.Equal(t, expected, <-ch)
100
+
101
+	// Stop and make sure it closes all channels.
102
+	close(stopCh)
103
+	assert.Nil(t, <-ch)
104
+	assert.Nil(t, <-errCh)
105
+}
0 106
new file mode 100644
... ...
@@ -0,0 +1,35 @@
0
+package discovery
1
+
2
+import (
3
+	"fmt"
4
+	"regexp"
5
+	"strconv"
6
+)
7
+
8
+// Generate takes care of IP generation
9
+func Generate(pattern string) []string {
10
+	re, _ := regexp.Compile(`\[(.+):(.+)\]`)
11
+	submatch := re.FindStringSubmatch(pattern)
12
+	if submatch == nil {
13
+		return []string{pattern}
14
+	}
15
+
16
+	from, err := strconv.Atoi(submatch[1])
17
+	if err != nil {
18
+		return []string{pattern}
19
+	}
20
+	to, err := strconv.Atoi(submatch[2])
21
+	if err != nil {
22
+		return []string{pattern}
23
+	}
24
+
25
+	template := re.ReplaceAllString(pattern, "%d")
26
+
27
+	var result []string
28
+	for val := from; val <= to; val++ {
29
+		entry := fmt.Sprintf(template, val)
30
+		result = append(result, entry)
31
+	}
32
+
33
+	return result
34
+}
0 35
new file mode 100644
... ...
@@ -0,0 +1,55 @@
0
+package discovery
1
+
2
+import (
3
+	"testing"
4
+
5
+	"github.com/stretchr/testify/assert"
6
+)
7
+
8
+func TestGeneratorNotGenerate(t *testing.T) {
9
+	ips := Generate("127.0.0.1")
10
+	assert.Equal(t, len(ips), 1)
11
+	assert.Equal(t, ips[0], "127.0.0.1")
12
+}
13
+
14
+func TestGeneratorWithPortNotGenerate(t *testing.T) {
15
+	ips := Generate("127.0.0.1:8080")
16
+	assert.Equal(t, len(ips), 1)
17
+	assert.Equal(t, ips[0], "127.0.0.1:8080")
18
+}
19
+
20
+func TestGeneratorMatchFailedNotGenerate(t *testing.T) {
21
+	ips := Generate("127.0.0.[1]")
22
+	assert.Equal(t, len(ips), 1)
23
+	assert.Equal(t, ips[0], "127.0.0.[1]")
24
+}
25
+
26
+func TestGeneratorWithPort(t *testing.T) {
27
+	ips := Generate("127.0.0.[1:11]:2375")
28
+	assert.Equal(t, len(ips), 11)
29
+	assert.Equal(t, ips[0], "127.0.0.1:2375")
30
+	assert.Equal(t, ips[1], "127.0.0.2:2375")
31
+	assert.Equal(t, ips[2], "127.0.0.3:2375")
32
+	assert.Equal(t, ips[3], "127.0.0.4:2375")
33
+	assert.Equal(t, ips[4], "127.0.0.5:2375")
34
+	assert.Equal(t, ips[5], "127.0.0.6:2375")
35
+	assert.Equal(t, ips[6], "127.0.0.7:2375")
36
+	assert.Equal(t, ips[7], "127.0.0.8:2375")
37
+	assert.Equal(t, ips[8], "127.0.0.9:2375")
38
+	assert.Equal(t, ips[9], "127.0.0.10:2375")
39
+	assert.Equal(t, ips[10], "127.0.0.11:2375")
40
+}
41
+
42
+func TestGenerateWithMalformedInputAtRangeStart(t *testing.T) {
43
+	malformedInput := "127.0.0.[x:11]:2375"
44
+	ips := Generate(malformedInput)
45
+	assert.Equal(t, len(ips), 1)
46
+	assert.Equal(t, ips[0], malformedInput)
47
+}
48
+
49
+func TestGenerateWithMalformedInputAtRangeEnd(t *testing.T) {
50
+	malformedInput := "127.0.0.[1:x]:2375"
51
+	ips := Generate(malformedInput)
52
+	assert.Equal(t, len(ips), 1)
53
+	assert.Equal(t, ips[0], malformedInput)
54
+}
0 55
new file mode 100644
... ...
@@ -0,0 +1,148 @@
0
+package kv
1
+
2
+import (
3
+	"fmt"
4
+	"path"
5
+	"strings"
6
+	"time"
7
+
8
+	log "github.com/Sirupsen/logrus"
9
+	"github.com/docker/docker/pkg/discovery"
10
+	"github.com/docker/libkv"
11
+	"github.com/docker/libkv/store"
12
+	"github.com/docker/libkv/store/consul"
13
+	"github.com/docker/libkv/store/etcd"
14
+	"github.com/docker/libkv/store/zookeeper"
15
+)
16
+
17
+const (
18
+	discoveryPath = "docker/nodes"
19
+)
20
+
21
+// Discovery is exported
22
+type Discovery struct {
23
+	backend   store.Backend
24
+	store     store.Store
25
+	heartbeat time.Duration
26
+	ttl       time.Duration
27
+	prefix    string
28
+	path      string
29
+}
30
+
31
+func init() {
32
+	Init()
33
+}
34
+
35
+// Init is exported
36
+func Init() {
37
+	// Register to libkv
38
+	zookeeper.Register()
39
+	consul.Register()
40
+	etcd.Register()
41
+
42
+	// Register to internal discovery service
43
+	discovery.Register("zk", &Discovery{backend: store.ZK})
44
+	discovery.Register("consul", &Discovery{backend: store.CONSUL})
45
+	discovery.Register("etcd", &Discovery{backend: store.ETCD})
46
+}
47
+
48
+// Initialize is exported
49
+func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
50
+	var (
51
+		parts = strings.SplitN(uris, "/", 2)
52
+		addrs = strings.Split(parts[0], ",")
53
+		err   error
54
+	)
55
+
56
+	// A custom prefix to the path can be optionally used.
57
+	if len(parts) == 2 {
58
+		s.prefix = parts[1]
59
+	}
60
+
61
+	s.heartbeat = heartbeat
62
+	s.ttl = ttl
63
+	s.path = path.Join(s.prefix, discoveryPath)
64
+
65
+	// Creates a new store, will ignore options given
66
+	// if not supported by the chosen store
67
+	s.store, err = libkv.NewStore(s.backend, addrs, nil)
68
+	return err
69
+}
70
+
71
+// Watch the store until either there's a store error or we receive a stop request.
72
+// Returns false if we shouldn't attempt watching the store anymore (stop request received).
73
+func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
74
+	for {
75
+		select {
76
+		case pairs := <-watchCh:
77
+			if pairs == nil {
78
+				return true
79
+			}
80
+
81
+			log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs))
82
+
83
+			// Convert `KVPair` into `discovery.Entry`.
84
+			addrs := make([]string, len(pairs))
85
+			for _, pair := range pairs {
86
+				addrs = append(addrs, string(pair.Value))
87
+			}
88
+
89
+			entries, err := discovery.CreateEntries(addrs)
90
+			if err != nil {
91
+				errCh <- err
92
+			} else {
93
+				discoveryCh <- entries
94
+			}
95
+		case <-stopCh:
96
+			// We were requested to stop watching.
97
+			return false
98
+		}
99
+	}
100
+}
101
+
102
+// Watch is exported
103
+func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
104
+	ch := make(chan discovery.Entries)
105
+	errCh := make(chan error)
106
+
107
+	go func() {
108
+		defer close(ch)
109
+		defer close(errCh)
110
+
111
+		// Forever: Create a store watch, watch until we get an error and then try again.
112
+		// Will only stop if we receive a stopCh request.
113
+		for {
114
+			// Set up a watch.
115
+			watchCh, err := s.store.WatchTree(s.path, stopCh)
116
+			if err != nil {
117
+				errCh <- err
118
+			} else {
119
+				if !s.watchOnce(stopCh, watchCh, ch, errCh) {
120
+					return
121
+				}
122
+			}
123
+
124
+			// If we get here it means the store watch channel was closed. This
125
+			// is unexpected so let's retry later.
126
+			errCh <- fmt.Errorf("Unexpected watch error")
127
+			time.Sleep(s.heartbeat)
128
+		}
129
+	}()
130
+	return ch, errCh
131
+}
132
+
133
+// Register is exported
134
+func (s *Discovery) Register(addr string) error {
135
+	opts := &store.WriteOptions{TTL: s.ttl}
136
+	return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
137
+}
138
+
139
+// Store returns the underlying store used by KV discovery.
140
+func (s *Discovery) Store() store.Store {
141
+	return s.store
142
+}
143
+
144
+// Prefix returns the store prefix
145
+func (s *Discovery) Prefix() string {
146
+	return s.prefix
147
+}
0 148
new file mode 100644
... ...
@@ -0,0 +1,119 @@
0
+package kv
1
+
2
+import (
3
+	"errors"
4
+	"path"
5
+	"testing"
6
+	"time"
7
+
8
+	"github.com/docker/docker/pkg/discovery"
9
+	"github.com/docker/libkv/store"
10
+	libkvmock "github.com/docker/libkv/store/mock"
11
+	"github.com/stretchr/testify/assert"
12
+	"github.com/stretchr/testify/mock"
13
+)
14
+
15
+func TestInitialize(t *testing.T) {
16
+	storeMock, err := libkvmock.New([]string{"127.0.0.1"}, nil)
17
+	assert.NotNil(t, storeMock)
18
+	assert.NoError(t, err)
19
+
20
+	d := &Discovery{backend: store.CONSUL}
21
+	d.Initialize("127.0.0.1", 0, 0)
22
+	d.store = storeMock
23
+
24
+	s := d.store.(*libkvmock.Mock)
25
+	assert.Len(t, s.Endpoints, 1)
26
+	assert.Equal(t, s.Endpoints[0], "127.0.0.1")
27
+	assert.Equal(t, d.path, discoveryPath)
28
+
29
+	storeMock, err = libkvmock.New([]string{"127.0.0.1:1234"}, nil)
30
+	assert.NotNil(t, storeMock)
31
+	assert.NoError(t, err)
32
+
33
+	d = &Discovery{backend: store.CONSUL}
34
+	d.Initialize("127.0.0.1:1234/path", 0, 0)
35
+	d.store = storeMock
36
+
37
+	s = d.store.(*libkvmock.Mock)
38
+	assert.Len(t, s.Endpoints, 1)
39
+	assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
40
+	assert.Equal(t, d.path, "path/"+discoveryPath)
41
+
42
+	storeMock, err = libkvmock.New([]string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, nil)
43
+	assert.NotNil(t, storeMock)
44
+	assert.NoError(t, err)
45
+
46
+	d = &Discovery{backend: store.CONSUL}
47
+	d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0)
48
+	d.store = storeMock
49
+
50
+	s = d.store.(*libkvmock.Mock)
51
+	if assert.Len(t, s.Endpoints, 3) {
52
+		assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
53
+		assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234")
54
+		assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234")
55
+	}
56
+	assert.Equal(t, d.path, "path/"+discoveryPath)
57
+}
58
+
59
+func TestWatch(t *testing.T) {
60
+	storeMock, err := libkvmock.New([]string{"127.0.0.1:1234"}, nil)
61
+	assert.NotNil(t, storeMock)
62
+	assert.NoError(t, err)
63
+
64
+	d := &Discovery{backend: store.CONSUL}
65
+	d.Initialize("127.0.0.1:1234/path", 0, 0)
66
+	d.store = storeMock
67
+
68
+	s := d.store.(*libkvmock.Mock)
69
+	mockCh := make(chan []*store.KVPair)
70
+
71
+	// The first watch will fail.
72
+	s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once()
73
+	// The second one will succeed.
74
+	s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once()
75
+	expected := discovery.Entries{
76
+		&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
77
+		&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
78
+	}
79
+	kvs := []*store.KVPair{
80
+		{Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
81
+		{Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
82
+	}
83
+
84
+	stopCh := make(chan struct{})
85
+	ch, errCh := d.Watch(stopCh)
86
+
87
+	// It should fire an error since the first WatchTree call failed.
88
+	assert.EqualError(t, <-errCh, "test error")
89
+	// We have to drain the error channel otherwise Watch will get stuck.
90
+	go func() {
91
+		for range errCh {
92
+		}
93
+	}()
94
+
95
+	// Push the entries into the store channel and make sure discovery emits.
96
+	mockCh <- kvs
97
+	assert.Equal(t, <-ch, expected)
98
+
99
+	// Add a new entry.
100
+	expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
101
+	kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
102
+	mockCh <- kvs
103
+	assert.Equal(t, <-ch, expected)
104
+
105
+	// Make sure that if an error occurs it retries.
106
+	// This third call to WatchTree will be checked later by AssertExpectations.
107
+	s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil)
108
+	close(mockCh)
109
+	// Give it enough time to call WatchTree.
110
+	time.Sleep(3)
111
+
112
+	// Stop and make sure it closes all channels.
113
+	close(stopCh)
114
+	assert.Nil(t, <-ch)
115
+	assert.Nil(t, <-errCh)
116
+
117
+	s.AssertExpectations(t)
118
+}
0 119
new file mode 100644
... ...
@@ -0,0 +1,54 @@
0
+package nodes
1
+
2
+import (
3
+	"fmt"
4
+	"strings"
5
+	"time"
6
+
7
+	"github.com/docker/docker/pkg/discovery"
8
+)
9
+
10
+// Discovery is exported
11
+type Discovery struct {
12
+	entries discovery.Entries
13
+}
14
+
15
+func init() {
16
+	Init()
17
+}
18
+
19
+// Init is exported
20
+func Init() {
21
+	discovery.Register("nodes", &Discovery{})
22
+}
23
+
24
+// Initialize is exported
25
+func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error {
26
+	for _, input := range strings.Split(uris, ",") {
27
+		for _, ip := range discovery.Generate(input) {
28
+			entry, err := discovery.NewEntry(ip)
29
+			if err != nil {
30
+				return fmt.Errorf("%s, please check you are using the correct discovery (missing token:// ?)", err.Error())
31
+			}
32
+			s.entries = append(s.entries, entry)
33
+		}
34
+	}
35
+
36
+	return nil
37
+}
38
+
39
+// Watch is exported
40
+func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
41
+	ch := make(chan discovery.Entries)
42
+	go func() {
43
+		defer close(ch)
44
+		ch <- s.entries
45
+		<-stopCh
46
+	}()
47
+	return ch, nil
48
+}
49
+
50
+// Register is exported
51
+func (s *Discovery) Register(addr string) error {
52
+	return discovery.ErrNotImplemented
53
+}
0 54
new file mode 100644
... ...
@@ -0,0 +1,43 @@
0
+package nodes
1
+
2
+import (
3
+	"testing"
4
+
5
+	"github.com/docker/docker/pkg/discovery"
6
+	"github.com/stretchr/testify/assert"
7
+)
8
+
9
+func TestInitialize(t *testing.T) {
10
+	d := &Discovery{}
11
+	d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
12
+	assert.Equal(t, len(d.entries), 2)
13
+	assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
14
+	assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222")
15
+}
16
+
17
+func TestInitializeWithPattern(t *testing.T) {
18
+	d := &Discovery{}
19
+	d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0)
20
+	assert.Equal(t, len(d.entries), 5)
21
+	assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
22
+	assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111")
23
+	assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222")
24
+	assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222")
25
+	assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222")
26
+}
27
+
28
+func TestWatch(t *testing.T) {
29
+	d := &Discovery{}
30
+	d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
31
+	expected := discovery.Entries{
32
+		&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
33
+		&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
34
+	}
35
+	ch, _ := d.Watch(nil)
36
+	assert.True(t, expected.Equals(<-ch))
37
+}
38
+
39
+func TestRegister(t *testing.T) {
40
+	d := &Discovery{}
41
+	assert.Error(t, d.Register("0.0.0.0"))
42
+}