Browse code

Remote communication between engines using beam

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)

Solomon Hykes authored on 2014/04/24 16:46:32
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,109 @@
0
+package engine
1
+
2
+import (
3
+	"fmt"
4
+	"github.com/dotcloud/docker/pkg/beam"
5
+	"github.com/dotcloud/docker/pkg/beam/data"
6
+	"io"
7
+	"os"
8
+	"strconv"
9
+	"sync"
10
+)
11
+
12
+type Sender struct {
13
+	beam.Sender
14
+}
15
+
16
+func NewSender(s beam.Sender) *Sender {
17
+	return &Sender{s}
18
+}
19
+
20
+func (s *Sender) Install(eng *Engine) error {
21
+	// FIXME: this doesn't exist yet.
22
+	eng.RegisterCatchall(s.Handle)
23
+	return nil
24
+}
25
+
26
+func (s *Sender) Handle(job *Job) Status {
27
+	msg := data.Empty().Set("cmd", append([]string{job.Name}, job.Args...)...)
28
+	peer, err := beam.SendConn(s, msg.Bytes())
29
+	if err != nil {
30
+		return job.Errorf("beamsend: %v", err)
31
+	}
32
+	defer peer.Close()
33
+	var tasks sync.WaitGroup
34
+	defer tasks.Wait()
35
+	r := beam.NewRouter(nil)
36
+	r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
37
+		tasks.Add(1)
38
+		io.Copy(job.Stdout, stdout)
39
+		tasks.Done()
40
+		return nil
41
+	})
42
+	r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
43
+		tasks.Add(1)
44
+		io.Copy(job.Stderr, stderr)
45
+		tasks.Done()
46
+		return nil
47
+	})
48
+	var status int
49
+	r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error {
50
+		cmd := data.Message(p).Get("cmd")
51
+		if len(cmd) != 3 {
52
+			return fmt.Errorf("usage: %s <0-127>", cmd[0])
53
+		}
54
+		s, err := strconv.ParseUint(cmd[2], 10, 8)
55
+		if err != nil {
56
+			return fmt.Errorf("usage: %s <0-127>", cmd[0])
57
+		}
58
+		status = int(s)
59
+		return nil
60
+
61
+	})
62
+	if _, err := beam.Copy(r, peer); err != nil {
63
+		return job.Errorf("%v", err)
64
+	}
65
+	return Status(status)
66
+}
67
+
68
+type Receiver struct {
69
+	*Engine
70
+	peer beam.Receiver
71
+}
72
+
73
+func NewReceiver(peer beam.Receiver) *Receiver {
74
+	return &Receiver{Engine: New(), peer: peer}
75
+}
76
+
77
+func (rcv *Receiver) Run() error {
78
+	r := beam.NewRouter(nil)
79
+	r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error {
80
+		// Use the attachment as a beam return channel
81
+		peer, err := beam.FileConn(f)
82
+		if err != nil {
83
+			f.Close()
84
+			return err
85
+		}
86
+		cmd := data.Message(p).Get("cmd")
87
+		job := rcv.Engine.Job(cmd[0], cmd[1:]...)
88
+		stdout, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
89
+		if err != nil {
90
+			return err
91
+		}
92
+		job.Stdout.Add(stdout)
93
+		stderr, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes())
94
+		if err != nil {
95
+			return err
96
+		}
97
+		job.Stderr.Add(stderr)
98
+		// ignore error because we pass the raw status
99
+		job.Run()
100
+		err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil)
101
+		if err != nil {
102
+			return err
103
+		}
104
+		return nil
105
+	})
106
+	_, err := beam.Copy(r, rcv.peer)
107
+	return err
108
+}
0 109
new file mode 100644
... ...
@@ -0,0 +1,3 @@
0
+package engine
1
+
2
+import ()