Browse code

Remote plugins plumbing.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/05/15 02:05:39
Showing 5 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,100 @@
0
+package plugins
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/json"
5
+	"fmt"
6
+	"io/ioutil"
7
+	"net"
8
+	"net/http"
9
+	"strings"
10
+	"time"
11
+
12
+	"github.com/Sirupsen/logrus"
13
+)
14
+
15
+const (
16
+	versionMimetype = "appplication/vnd.docker.plugins.v1+json"
17
+	defaultTimeOut  = 120
18
+)
19
+
20
+func NewClient(addr string) *Client {
21
+	tr := &http.Transport{}
22
+	protoAndAddr := strings.Split(addr, "://")
23
+	configureTCPTransport(tr, protoAndAddr[0], protoAndAddr[1])
24
+	return &Client{&http.Client{Transport: tr}, protoAndAddr[1]}
25
+}
26
+
27
+type Client struct {
28
+	http *http.Client
29
+	addr string
30
+}
31
+
32
+func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
33
+	var buf bytes.Buffer
34
+	if err := json.NewEncoder(&buf).Encode(args); err != nil {
35
+		return err
36
+	}
37
+
38
+	req, err := http.NewRequest("POST", "/"+serviceMethod, &buf)
39
+	if err != nil {
40
+		return err
41
+	}
42
+	req.Header.Add("Accept", versionMimetype)
43
+	req.URL.Scheme = "http"
44
+	req.URL.Host = c.addr
45
+
46
+	var retries int
47
+	start := time.Now()
48
+
49
+	for {
50
+		resp, err := c.http.Do(req)
51
+		if err != nil {
52
+			timeOff := backoff(retries)
53
+			if timeOff+time.Since(start) > defaultTimeOut {
54
+				return err
55
+			}
56
+			retries++
57
+			logrus.Warn("Unable to connect to plugin: %s, retrying in %ds\n", c.addr, timeOff)
58
+			time.Sleep(timeOff)
59
+			continue
60
+		}
61
+
62
+		if resp.StatusCode != http.StatusOK {
63
+			remoteErr, err := ioutil.ReadAll(resp.Body)
64
+			if err != nil {
65
+				return nil
66
+			}
67
+			return fmt.Errorf("Plugin Error: %s", remoteErr)
68
+		}
69
+
70
+		return json.NewDecoder(resp.Body).Decode(&ret)
71
+	}
72
+}
73
+
74
+func backoff(retries int) time.Duration {
75
+	b, max := float64(1), float64(defaultTimeOut)
76
+	for b < max && retries > 0 {
77
+		b *= 2
78
+		retries--
79
+	}
80
+	if b > max {
81
+		b = max
82
+	}
83
+	return time.Duration(b)
84
+}
85
+
86
+func configureTCPTransport(tr *http.Transport, proto, addr string) {
87
+	// Why 32? See https://github.com/docker/docker/pull/8035.
88
+	timeout := 32 * time.Second
89
+	if proto == "unix" {
90
+		// No need for compression in local communications.
91
+		tr.DisableCompression = true
92
+		tr.Dial = func(_, _ string) (net.Conn, error) {
93
+			return net.DialTimeout(proto, addr, timeout)
94
+		}
95
+	} else {
96
+		tr.Proxy = http.ProxyFromEnvironment
97
+		tr.Dial = (&net.Dialer{Timeout: timeout}).Dial
98
+	}
99
+}
0 100
new file mode 100644
... ...
@@ -0,0 +1,63 @@
0
+package plugins
1
+
2
+import (
3
+	"io"
4
+	"net/http"
5
+	"net/http/httptest"
6
+	"reflect"
7
+	"testing"
8
+)
9
+
10
+var (
11
+	mux    *http.ServeMux
12
+	server *httptest.Server
13
+)
14
+
15
+func setupRemotePluginServer() string {
16
+	mux = http.NewServeMux()
17
+	server = httptest.NewServer(mux)
18
+	return server.URL
19
+}
20
+
21
+func teardownRemotePluginServer() {
22
+	if server != nil {
23
+		server.Close()
24
+	}
25
+}
26
+
27
+func TestFailedConnection(t *testing.T) {
28
+	c := NewClient("tcp://127.0.0.1:1")
29
+	err := c.Call("Service.Method", nil, nil)
30
+	if err == nil {
31
+		t.Fatal("Unexpected successful connection")
32
+	}
33
+}
34
+
35
+func TestEchoInputOutput(t *testing.T) {
36
+	addr := setupRemotePluginServer()
37
+	defer teardownRemotePluginServer()
38
+
39
+	m := Manifest{[]string{"VolumeDriver", "NetworkDriver"}}
40
+
41
+	mux.HandleFunc("/Test.Echo", func(w http.ResponseWriter, r *http.Request) {
42
+		if r.Method != "POST" {
43
+			t.Fatalf("Expected POST, got %s\n", r.Method)
44
+		}
45
+
46
+		header := w.Header()
47
+		header.Set("Content-Type", versionMimetype)
48
+
49
+		io.Copy(w, r.Body)
50
+	})
51
+
52
+	c := NewClient(addr)
53
+	var output Manifest
54
+	err := c.Call("Test.Echo", m, &output)
55
+	if err != nil {
56
+		t.Fatal(err)
57
+	}
58
+
59
+	if !reflect.DeepEqual(output, m) {
60
+		t.Fatalf("Expected %v, was %v\n", m, output)
61
+	}
62
+}
0 63
new file mode 100644
... ...
@@ -0,0 +1,78 @@
0
+package plugins
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"io/ioutil"
6
+	"net/url"
7
+	"os"
8
+	"path/filepath"
9
+	"strings"
10
+)
11
+
12
+const defaultLocalRegistry = "/usr/share/docker/plugins"
13
+
14
+var (
15
+	ErrNotFound = errors.New("Plugin not found")
16
+)
17
+
18
+type Registry interface {
19
+	Plugins() ([]*Plugin, error)
20
+	Plugin(name string) (*Plugin, error)
21
+}
22
+
23
+type LocalRegistry struct {
24
+	path string
25
+}
26
+
27
+func newLocalRegistry(path string) *LocalRegistry {
28
+	if len(path) == 0 {
29
+		path = defaultLocalRegistry
30
+	}
31
+
32
+	return &LocalRegistry{path}
33
+}
34
+
35
+func (l *LocalRegistry) Plugin(name string) (*Plugin, error) {
36
+	filepath := filepath.Join(l.path, name)
37
+	specpath := filepath + ".spec"
38
+	if fi, err := os.Stat(specpath); err == nil {
39
+		return readPluginInfo(specpath, fi)
40
+	}
41
+	socketpath := filepath + ".sock"
42
+	if fi, err := os.Stat(socketpath); err == nil {
43
+		return readPluginInfo(socketpath, fi)
44
+	}
45
+	return nil, ErrNotFound
46
+}
47
+
48
+func readPluginInfo(path string, fi os.FileInfo) (*Plugin, error) {
49
+	name := strings.Split(fi.Name(), ".")[0]
50
+
51
+	if fi.Mode()&os.ModeSocket != 0 {
52
+		return &Plugin{
53
+			Name: name,
54
+			Addr: "unix://" + path,
55
+		}, nil
56
+	}
57
+
58
+	content, err := ioutil.ReadFile(path)
59
+	if err != nil {
60
+		return nil, err
61
+	}
62
+	addr := strings.TrimSpace(string(content))
63
+
64
+	u, err := url.Parse(addr)
65
+	if err != nil {
66
+		return nil, err
67
+	}
68
+
69
+	if len(u.Scheme) == 0 {
70
+		return nil, fmt.Errorf("Unknown protocol")
71
+	}
72
+
73
+	return &Plugin{
74
+		Name: name,
75
+		Addr: addr,
76
+	}, nil
77
+}
0 78
new file mode 100644
... ...
@@ -0,0 +1,108 @@
0
+package plugins
1
+
2
+import (
3
+	"fmt"
4
+	"io/ioutil"
5
+	"net"
6
+	"os"
7
+	"path"
8
+	"path/filepath"
9
+	"reflect"
10
+	"testing"
11
+)
12
+
13
+func TestUnknownLocalPath(t *testing.T) {
14
+	tmpdir, err := ioutil.TempDir("", "docker-test")
15
+	if err != nil {
16
+		t.Fatal(err)
17
+	}
18
+	defer os.RemoveAll(tmpdir)
19
+
20
+	l := newLocalRegistry(filepath.Join(tmpdir, "unknown"))
21
+	_, err = l.Plugin("foo")
22
+	if err == nil || err != ErrNotFound {
23
+		t.Fatalf("Expected error for unknown directory")
24
+	}
25
+}
26
+
27
+func TestLocalSocket(t *testing.T) {
28
+	tmpdir, err := ioutil.TempDir("", "docker-test")
29
+	if err != nil {
30
+		t.Fatal(err)
31
+	}
32
+	defer os.RemoveAll(tmpdir)
33
+	l, err := net.Listen("unix", filepath.Join(tmpdir, "echo.sock"))
34
+	if err != nil {
35
+		t.Fatal(err)
36
+	}
37
+	defer l.Close()
38
+
39
+	r := newLocalRegistry(tmpdir)
40
+	p, err := r.Plugin("echo")
41
+	if err != nil {
42
+		t.Fatal(err)
43
+	}
44
+
45
+	pp, err := r.Plugin("echo")
46
+	if err != nil {
47
+		t.Fatal(err)
48
+	}
49
+	if !reflect.DeepEqual(p, pp) {
50
+		t.Fatalf("Expected %v, was %v\n", p, pp)
51
+	}
52
+
53
+	if p.Name != "echo" {
54
+		t.Fatalf("Expected plugin `echo`, got %s\n", p.Name)
55
+	}
56
+
57
+	addr := fmt.Sprintf("unix://%s/echo.sock", tmpdir)
58
+	if p.Addr != addr {
59
+		t.Fatalf("Expected plugin addr `%s`, got %s\n", addr, p.Addr)
60
+	}
61
+}
62
+
63
+func TestFileSpecPlugin(t *testing.T) {
64
+	tmpdir, err := ioutil.TempDir("", "docker-test")
65
+	if err != nil {
66
+		t.Fatal(err)
67
+	}
68
+
69
+	cases := []struct {
70
+		path string
71
+		name string
72
+		addr string
73
+		fail bool
74
+	}{
75
+		{filepath.Join(tmpdir, "echo.spec"), "echo", "unix://var/lib/docker/plugins/echo.sock", false},
76
+		{filepath.Join(tmpdir, "foo.spec"), "foo", "tcp://localhost:8080", false},
77
+		{filepath.Join(tmpdir, "bar.spec"), "bar", "localhost:8080", true}, // unknown transport
78
+	}
79
+
80
+	for _, c := range cases {
81
+		if err = os.MkdirAll(path.Dir(c.path), 0755); err != nil {
82
+			t.Fatal(err)
83
+		}
84
+		if err = ioutil.WriteFile(c.path, []byte(c.addr), 0644); err != nil {
85
+			t.Fatal(err)
86
+		}
87
+
88
+		r := newLocalRegistry(tmpdir)
89
+		p, err := r.Plugin(c.name)
90
+		if c.fail && err == nil {
91
+			continue
92
+		}
93
+
94
+		if err != nil {
95
+			t.Fatal(err)
96
+		}
97
+
98
+		if p.Name != c.name {
99
+			t.Fatalf("Expected plugin `%s`, got %s\n", c.name, p.Name)
100
+		}
101
+
102
+		if p.Addr != c.addr {
103
+			t.Fatalf("Expected plugin addr `%s`, got %s\n", c.addr, p.Addr)
104
+		}
105
+		os.Remove(c.path)
106
+	}
107
+}
0 108
new file mode 100644
... ...
@@ -0,0 +1,86 @@
0
+package plugins
1
+
2
+import (
3
+	"errors"
4
+	"sync"
5
+
6
+	"github.com/Sirupsen/logrus"
7
+)
8
+
9
+var (
10
+	ErrNotImplements = errors.New("Plugin does not implement the requested driver")
11
+)
12
+
13
+type plugins struct {
14
+	sync.Mutex
15
+	plugins map[string]*Plugin
16
+}
17
+
18
+var storage = plugins{plugins: make(map[string]*Plugin)}
19
+
20
+type Manifest struct {
21
+	Implements []string
22
+}
23
+
24
+type Plugin struct {
25
+	Name     string
26
+	Addr     string
27
+	Client   *Client
28
+	Manifest *Manifest
29
+}
30
+
31
+func (p *Plugin) activate() error {
32
+	m := new(Manifest)
33
+	p.Client = NewClient(p.Addr)
34
+	err := p.Client.Call("Plugin.Activate", nil, m)
35
+	if err != nil {
36
+		return err
37
+	}
38
+
39
+	logrus.Debugf("%s's manifest: %v", p.Name, m)
40
+	p.Manifest = m
41
+	return nil
42
+}
43
+
44
+func load(name string) (*Plugin, error) {
45
+	registry := newLocalRegistry("")
46
+	pl, err := registry.Plugin(name)
47
+	if err != nil {
48
+		return nil, err
49
+	}
50
+	if err := pl.activate(); err != nil {
51
+		return nil, err
52
+	}
53
+	return pl, nil
54
+}
55
+
56
+func get(name string) (*Plugin, error) {
57
+	storage.Lock()
58
+	defer storage.Unlock()
59
+	pl, ok := storage.plugins[name]
60
+	if ok {
61
+		return pl, nil
62
+	}
63
+	pl, err := load(name)
64
+	if err != nil {
65
+		return nil, err
66
+	}
67
+
68
+	logrus.Debugf("Plugin: %v", pl)
69
+	storage.plugins[name] = pl
70
+	return pl, nil
71
+}
72
+
73
+func Get(name, imp string) (*Plugin, error) {
74
+	pl, err := get(name)
75
+	if err != nil {
76
+		return nil, err
77
+	}
78
+	for _, driver := range pl.Manifest.Implements {
79
+		logrus.Debugf("%s implements: %s", name, driver)
80
+		if driver == imp {
81
+			return pl, nil
82
+		}
83
+	}
84
+	return nil, ErrNotImplements
85
+}