Browse code

Move userland proxies out of daemon's process

This PR moves the userland proxies for TCP and UDP traffic out of the
main docker daemon's process ( from goroutines per proxy ) to be a
separate reexec of the docker binary. This reduces the cpu and memory
needed by the daemon and if the proxy processes crash for some reason
the daemon is unaffected. This also displays in the standard process
tree so that a user can clearly see if there is a userland proxy that is
bound to a certain ip and port.

```bash
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5d349506feb6 busybox:buildroot-2014.02 "sh" 13 minutes ago Up 1 seconds 0.0.0.0:49153->81/tcp, 0.0.0.0:49154->90/tcp hungry_pike
root@1cbfdcedc5a7:/go/src/github.com/docker/docker# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.0 0.1 18168 3100 ? Ss 21:09 0:00 bash
root 8328 0.7 0.6 329072 13420 ? Sl 22:03 0:00 docker -d -s vfs
root 8373 1.0 0.5 196500 10548 ? Sl 22:03 0:00 userland-proxy -proto tcp -host-ip 0.0.0.0 -host-port 49153 -container-ip 10.0.0.2 -container-port 81
root 8382 1.0 0.5 270232 10576 ? Sl 22:03 0:00 userland-proxy -proto tcp -host-ip 0.0.0.0 -host-port 49154 -container-ip 10.0.0.2 -container-port 90
root 8385 1.2 0.0 3168 184 pts/0 Ss+ 22:03 0:00 sh
root 8408 0.0 0.1 15568 2112 ? R+ 22:03 0:00 ps aux
```

This also helps us to cleanly cleanup the proxy processes by stopping
these commands instead of trying to terminate a goroutine.

Signed-off-by: Michael Crosby <michael@docker.com>

Michael Crosby authored on 2014/08/13 07:04:00
Showing 5 changed files
... ...
@@ -1,14 +1,19 @@
1 1
 package bridge
2 2
 
3 3
 import (
4
-	"fmt"
5 4
 	"net"
6 5
 	"strconv"
7 6
 	"testing"
8 7
 
8
+	"github.com/docker/docker/daemon/networkdriver/portmapper"
9 9
 	"github.com/docker/docker/engine"
10 10
 )
11 11
 
12
+func init() {
13
+	// reset the new proxy command for mocking out the userland proxy in tests
14
+	portmapper.NewProxy = portmapper.NewMockProxyCommand
15
+}
16
+
12 17
 func findFreePort(t *testing.T) int {
13 18
 	l, err := net.Listen("tcp", ":0")
14 19
 	if err != nil {
... ...
@@ -61,46 +66,3 @@ func TestAllocatePortDetection(t *testing.T) {
61 61
 		t.Fatal("Duplicate port allocation granted by AllocatePort")
62 62
 	}
63 63
 }
64
-
65
-func TestAllocatePortReclaim(t *testing.T) {
66
-	eng := engine.New()
67
-	eng.Logging = false
68
-
69
-	freePort := findFreePort(t)
70
-
71
-	// Init driver
72
-	job := eng.Job("initdriver")
73
-	if res := InitDriver(job); res != engine.StatusOK {
74
-		t.Fatal("Failed to initialize network driver")
75
-	}
76
-
77
-	// Allocate interface
78
-	job = eng.Job("allocate_interface", "container_id")
79
-	if res := Allocate(job); res != engine.StatusOK {
80
-		t.Fatal("Failed to allocate network interface")
81
-	}
82
-
83
-	// Occupy port
84
-	listenAddr := fmt.Sprintf(":%d", freePort)
85
-	tcpListenAddr, err := net.ResolveTCPAddr("tcp", listenAddr)
86
-	if err != nil {
87
-		t.Fatalf("Failed to resolve TCP address '%s'", listenAddr)
88
-	}
89
-
90
-	l, err := net.ListenTCP("tcp", tcpListenAddr)
91
-	if err != nil {
92
-		t.Fatalf("Fail to listen on port %d", freePort)
93
-	}
94
-
95
-	// Allocate port, expect failure
96
-	job = newPortAllocationJob(eng, freePort)
97
-	if res := AllocatePort(job); res == engine.StatusOK {
98
-		t.Fatal("Successfully allocated currently used port")
99
-	}
100
-
101
-	// Reclaim port, retry allocation
102
-	l.Close()
103
-	if res := AllocatePort(job); res != engine.StatusOK {
104
-		t.Fatal("Failed to allocate previously reclaimed port")
105
-	}
106
-}
... ...
@@ -8,12 +8,11 @@ import (
8 8
 
9 9
 	"github.com/docker/docker/daemon/networkdriver/portallocator"
10 10
 	"github.com/docker/docker/pkg/iptables"
11
-	"github.com/docker/docker/pkg/proxy"
12 11
 )
13 12
 
14 13
 type mapping struct {
15 14
 	proto         string
16
-	userlandProxy proxy.Proxy
15
+	userlandProxy UserlandProxy
17 16
 	host          net.Addr
18 17
 	container     net.Addr
19 18
 }
... ...
@@ -24,7 +23,8 @@ var (
24 24
 
25 25
 	// udp:ip:port
26 26
 	currentMappings = make(map[string]*mapping)
27
-	newProxy        = proxy.NewProxy
27
+
28
+	NewProxy = NewProxyCommand
28 29
 )
29 30
 
30 31
 var (
... ...
@@ -45,6 +45,7 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
45 45
 		m                 *mapping
46 46
 		proto             string
47 47
 		allocatedHostPort int
48
+		proxy             UserlandProxy
48 49
 	)
49 50
 
50 51
 	switch container.(type) {
... ...
@@ -53,21 +54,27 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
53 53
 		if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
54 54
 			return nil, err
55 55
 		}
56
+
56 57
 		m = &mapping{
57 58
 			proto:     proto,
58 59
 			host:      &net.TCPAddr{IP: hostIP, Port: allocatedHostPort},
59 60
 			container: container,
60 61
 		}
62
+
63
+		proxy = NewProxy(proto, hostIP, allocatedHostPort, container.(*net.TCPAddr).IP, container.(*net.TCPAddr).Port)
61 64
 	case *net.UDPAddr:
62 65
 		proto = "udp"
63 66
 		if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
64 67
 			return nil, err
65 68
 		}
69
+
66 70
 		m = &mapping{
67 71
 			proto:     proto,
68 72
 			host:      &net.UDPAddr{IP: hostIP, Port: allocatedHostPort},
69 73
 			container: container,
70 74
 		}
75
+
76
+		proxy = NewProxy(proto, hostIP, allocatedHostPort, container.(*net.UDPAddr).IP, container.(*net.UDPAddr).Port)
71 77
 	default:
72 78
 		return nil, ErrUnknownBackendAddressType
73 79
 	}
... ...
@@ -89,18 +96,16 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
89 89
 		return nil, err
90 90
 	}
91 91
 
92
-	p, err := newProxy(m.host, m.container)
93
-	if err != nil {
92
+	m.userlandProxy = proxy
93
+	currentMappings[key] = m
94
+
95
+	if err := proxy.Start(); err != nil {
94 96
 		// need to undo the iptables rules before we return
95 97
 		forward(iptables.Delete, m.proto, hostIP, allocatedHostPort, containerIP.String(), containerPort)
98
+
96 99
 		return nil, err
97 100
 	}
98 101
 
99
-	m.userlandProxy = p
100
-	currentMappings[key] = m
101
-
102
-	go p.Run()
103
-
104 102
 	return m.host, nil
105 103
 }
106 104
 
... ...
@@ -114,7 +119,8 @@ func Unmap(host net.Addr) error {
114 114
 		return ErrPortNotMapped
115 115
 	}
116 116
 
117
-	data.userlandProxy.Close()
117
+	data.userlandProxy.Stop()
118
+
118 119
 	delete(currentMappings, key)
119 120
 
120 121
 	containerIP, containerPort := getIPAndPort(data.container)
... ...
@@ -6,12 +6,11 @@ import (
6 6
 
7 7
 	"github.com/docker/docker/daemon/networkdriver/portallocator"
8 8
 	"github.com/docker/docker/pkg/iptables"
9
-	"github.com/docker/docker/pkg/proxy"
10 9
 )
11 10
 
12 11
 func init() {
13 12
 	// override this func to mock out the proxy server
14
-	newProxy = proxy.NewStubProxy
13
+	NewProxy = NewMockProxyCommand
15 14
 }
16 15
 
17 16
 func reset() {
18 17
new file mode 100644
... ...
@@ -0,0 +1,18 @@
0
+package portmapper
1
+
2
+import "net"
3
+
4
+func NewMockProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) UserlandProxy {
5
+	return &mockProxyCommand{}
6
+}
7
+
8
+type mockProxyCommand struct {
9
+}
10
+
11
+func (p *mockProxyCommand) Start() error {
12
+	return nil
13
+}
14
+
15
+func (p *mockProxyCommand) Stop() error {
16
+	return nil
17
+}
0 18
new file mode 100644
... ...
@@ -0,0 +1,119 @@
0
+package portmapper
1
+
2
+import (
3
+	"flag"
4
+	"log"
5
+	"net"
6
+	"os"
7
+	"os/exec"
8
+	"os/signal"
9
+	"strconv"
10
+	"syscall"
11
+
12
+	"github.com/docker/docker/pkg/proxy"
13
+	"github.com/docker/docker/reexec"
14
+)
15
+
16
+const userlandProxyCommandName = "docker-proxy"
17
+
18
+func init() {
19
+	reexec.Register(userlandProxyCommandName, execProxy)
20
+}
21
+
22
+type UserlandProxy interface {
23
+	Start() error
24
+	Stop() error
25
+}
26
+
27
+// proxyCommand wraps an exec.Cmd to run the userland TCP and UDP
28
+// proxies as separate processes.
29
+type proxyCommand struct {
30
+	cmd *exec.Cmd
31
+}
32
+
33
+// execProxy is the reexec function that is registered to start the userland proxies
34
+func execProxy() {
35
+	host, container := parseHostContainerAddrs()
36
+
37
+	p, err := proxy.NewProxy(host, container)
38
+	if err != nil {
39
+		log.Fatal(err)
40
+	}
41
+
42
+	go handleStopSignals(p)
43
+
44
+	// Run will block until the proxy stops
45
+	p.Run()
46
+}
47
+
48
+// parseHostContainerAddrs parses the flags passed on reexec to create the TCP or UDP
49
+// net.Addrs to map the host and container ports
50
+func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
51
+	var (
52
+		proto         = flag.String("proto", "tcp", "proxy protocol")
53
+		hostIP        = flag.String("host-ip", "", "host ip")
54
+		hostPort      = flag.Int("host-port", -1, "host port")
55
+		containerIP   = flag.String("container-ip", "", "container ip")
56
+		containerPort = flag.Int("container-port", -1, "container port")
57
+	)
58
+
59
+	flag.Parse()
60
+
61
+	switch *proto {
62
+	case "tcp":
63
+		host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
64
+		container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
65
+	case "udp":
66
+		host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
67
+		container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
68
+	default:
69
+		log.Fatalf("unsupported protocol %s", *proto)
70
+	}
71
+
72
+	return host, container
73
+}
74
+
75
+func handleStopSignals(p proxy.Proxy) {
76
+	s := make(chan os.Signal, 10)
77
+	signal.Notify(s, os.Interrupt, syscall.SIGTERM, syscall.SIGSTOP)
78
+
79
+	for _ = range s {
80
+		p.Close()
81
+
82
+		os.Exit(0)
83
+	}
84
+}
85
+
86
+func NewProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) UserlandProxy {
87
+	args := []string{
88
+		userlandProxyCommandName,
89
+		"-proto", proto,
90
+		"-host-ip", hostIP.String(),
91
+		"-host-port", strconv.Itoa(hostPort),
92
+		"-container-ip", containerIP.String(),
93
+		"-container-port", strconv.Itoa(containerPort),
94
+	}
95
+
96
+	return &proxyCommand{
97
+		cmd: &exec.Cmd{
98
+			Path:   reexec.Self(),
99
+			Args:   args,
100
+			Stdout: os.Stdout,
101
+			Stderr: os.Stderr,
102
+			SysProcAttr: &syscall.SysProcAttr{
103
+				Pdeathsig: syscall.SIGTERM, // send a sigterm to the proxy if the daemon process dies
104
+			},
105
+		},
106
+	}
107
+}
108
+
109
+func (p *proxyCommand) Start() error {
110
+	return p.cmd.Start()
111
+}
112
+
113
+func (p *proxyCommand) Stop() error {
114
+	err := p.cmd.Process.Signal(os.Interrupt)
115
+	p.cmd.Wait()
116
+
117
+	return err
118
+}