Browse code

Move proxy into pkg Docker-DCO-1.1-Signed-off-by: Michael Crosby <michael@crosbymichael.com> (github: crosbymichael)

Michael Crosby authored on 2014/02/15 08:25:25
Showing 15 changed files
... ...
@@ -4,7 +4,7 @@ import (
4 4
 	"errors"
5 5
 	"fmt"
6 6
 	"github.com/dotcloud/docker/pkg/iptables"
7
-	"github.com/dotcloud/docker/proxy"
7
+	"github.com/dotcloud/docker/pkg/proxy"
8 8
 	"net"
9 9
 	"sync"
10 10
 )
... ...
@@ -2,7 +2,7 @@ package portmapper
2 2
 
3 3
 import (
4 4
 	"github.com/dotcloud/docker/pkg/iptables"
5
-	"github.com/dotcloud/docker/proxy"
5
+	"github.com/dotcloud/docker/pkg/proxy"
6 6
 	"net"
7 7
 	"testing"
8 8
 )
9 9
new file mode 100644
... ...
@@ -0,0 +1 @@
0
+Michael Crosby <michael@crosbymichael.com> (@crosbymichael)
0 1
new file mode 100644
... ...
@@ -0,0 +1,216 @@
0
+package proxy
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"net"
7
+	"strings"
8
+	"testing"
9
+	"time"
10
+)
11
+
12
+var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
13
+var testBufSize = len(testBuf)
14
+
15
+type EchoServer interface {
16
+	Run()
17
+	Close()
18
+	LocalAddr() net.Addr
19
+}
20
+
21
+type TCPEchoServer struct {
22
+	listener net.Listener
23
+	testCtx  *testing.T
24
+}
25
+
26
+type UDPEchoServer struct {
27
+	conn    net.PacketConn
28
+	testCtx *testing.T
29
+}
30
+
31
+func NewEchoServer(t *testing.T, proto, address string) EchoServer {
32
+	var server EchoServer
33
+	if strings.HasPrefix(proto, "tcp") {
34
+		listener, err := net.Listen(proto, address)
35
+		if err != nil {
36
+			t.Fatal(err)
37
+		}
38
+		server = &TCPEchoServer{listener: listener, testCtx: t}
39
+	} else {
40
+		socket, err := net.ListenPacket(proto, address)
41
+		if err != nil {
42
+			t.Fatal(err)
43
+		}
44
+		server = &UDPEchoServer{conn: socket, testCtx: t}
45
+	}
46
+	return server
47
+}
48
+
49
+func (server *TCPEchoServer) Run() {
50
+	go func() {
51
+		for {
52
+			client, err := server.listener.Accept()
53
+			if err != nil {
54
+				return
55
+			}
56
+			go func(client net.Conn) {
57
+				if _, err := io.Copy(client, client); err != nil {
58
+					server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
59
+				}
60
+				client.Close()
61
+			}(client)
62
+		}
63
+	}()
64
+}
65
+
66
+func (server *TCPEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
67
+func (server *TCPEchoServer) Close()              { server.listener.Addr() }
68
+
69
+func (server *UDPEchoServer) Run() {
70
+	go func() {
71
+		readBuf := make([]byte, 1024)
72
+		for {
73
+			read, from, err := server.conn.ReadFrom(readBuf)
74
+			if err != nil {
75
+				return
76
+			}
77
+			for i := 0; i != read; {
78
+				written, err := server.conn.WriteTo(readBuf[i:read], from)
79
+				if err != nil {
80
+					break
81
+				}
82
+				i += written
83
+			}
84
+		}
85
+	}()
86
+}
87
+
88
+func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
89
+func (server *UDPEchoServer) Close()              { server.conn.Close() }
90
+
91
+func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string) {
92
+	defer proxy.Close()
93
+	go proxy.Run()
94
+	client, err := net.Dial(proto, addr)
95
+	if err != nil {
96
+		t.Fatalf("Can't connect to the proxy: %v", err)
97
+	}
98
+	defer client.Close()
99
+	client.SetDeadline(time.Now().Add(10 * time.Second))
100
+	if _, err = client.Write(testBuf); err != nil {
101
+		t.Fatal(err)
102
+	}
103
+	recvBuf := make([]byte, testBufSize)
104
+	if _, err = client.Read(recvBuf); err != nil {
105
+		t.Fatal(err)
106
+	}
107
+	if !bytes.Equal(testBuf, recvBuf) {
108
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
109
+	}
110
+}
111
+
112
+func testProxy(t *testing.T, proto string, proxy Proxy) {
113
+	testProxyAt(t, proto, proxy, proxy.FrontendAddr().String())
114
+}
115
+
116
+func TestTCP4Proxy(t *testing.T) {
117
+	backend := NewEchoServer(t, "tcp", "127.0.0.1:0")
118
+	defer backend.Close()
119
+	backend.Run()
120
+	frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
121
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
122
+	if err != nil {
123
+		t.Fatal(err)
124
+	}
125
+	testProxy(t, "tcp", proxy)
126
+}
127
+
128
+func TestTCP6Proxy(t *testing.T) {
129
+	backend := NewEchoServer(t, "tcp", "[::1]:0")
130
+	defer backend.Close()
131
+	backend.Run()
132
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
133
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
134
+	if err != nil {
135
+		t.Fatal(err)
136
+	}
137
+	testProxy(t, "tcp", proxy)
138
+}
139
+
140
+func TestTCPDualStackProxy(t *testing.T) {
141
+	// If I understand `godoc -src net favoriteAddrFamily` (used by the
142
+	// net.Listen* functions) correctly this should work, but it doesn't.
143
+	t.Skip("No support for dual stack yet")
144
+	backend := NewEchoServer(t, "tcp", "[::1]:0")
145
+	defer backend.Close()
146
+	backend.Run()
147
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
148
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
149
+	if err != nil {
150
+		t.Fatal(err)
151
+	}
152
+	ipv4ProxyAddr := &net.TCPAddr{
153
+		IP:   net.IPv4(127, 0, 0, 1),
154
+		Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
155
+	}
156
+	testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String())
157
+}
158
+
159
+func TestUDP4Proxy(t *testing.T) {
160
+	backend := NewEchoServer(t, "udp", "127.0.0.1:0")
161
+	defer backend.Close()
162
+	backend.Run()
163
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
164
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
165
+	if err != nil {
166
+		t.Fatal(err)
167
+	}
168
+	testProxy(t, "udp", proxy)
169
+}
170
+
171
+func TestUDP6Proxy(t *testing.T) {
172
+	backend := NewEchoServer(t, "udp", "[::1]:0")
173
+	defer backend.Close()
174
+	backend.Run()
175
+	frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
176
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
177
+	if err != nil {
178
+		t.Fatal(err)
179
+	}
180
+	testProxy(t, "udp", proxy)
181
+}
182
+
183
+func TestUDPWriteError(t *testing.T) {
184
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
185
+	// Hopefully, this port will be free: */
186
+	backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
187
+	proxy, err := NewProxy(frontendAddr, backendAddr)
188
+	if err != nil {
189
+		t.Fatal(err)
190
+	}
191
+	defer proxy.Close()
192
+	go proxy.Run()
193
+	client, err := net.Dial("udp", "127.0.0.1:25587")
194
+	if err != nil {
195
+		t.Fatalf("Can't connect to the proxy: %v", err)
196
+	}
197
+	defer client.Close()
198
+	// Make sure the proxy doesn't stop when there is no actual backend:
199
+	client.Write(testBuf)
200
+	client.Write(testBuf)
201
+	backend := NewEchoServer(t, "udp", "127.0.0.1:25587")
202
+	defer backend.Close()
203
+	backend.Run()
204
+	client.SetDeadline(time.Now().Add(10 * time.Second))
205
+	if _, err = client.Write(testBuf); err != nil {
206
+		t.Fatal(err)
207
+	}
208
+	recvBuf := make([]byte, testBufSize)
209
+	if _, err = client.Read(recvBuf); err != nil {
210
+		t.Fatal(err)
211
+	}
212
+	if !bytes.Equal(testBuf, recvBuf) {
213
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
214
+	}
215
+}
0 216
new file mode 100644
... ...
@@ -0,0 +1,29 @@
0
+package proxy
1
+
2
+import (
3
+	"fmt"
4
+	"net"
5
+)
6
+
7
+type Proxy interface {
8
+	// Start forwarding traffic back and forth the front and back-end
9
+	// addresses.
10
+	Run()
11
+	// Stop forwarding traffic and close both ends of the Proxy.
12
+	Close()
13
+	// Return the address on which the proxy is listening.
14
+	FrontendAddr() net.Addr
15
+	// Return the proxied address.
16
+	BackendAddr() net.Addr
17
+}
18
+
19
+func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
20
+	switch frontendAddr.(type) {
21
+	case *net.UDPAddr:
22
+		return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
23
+	case *net.TCPAddr:
24
+		return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
25
+	default:
26
+		panic(fmt.Errorf("Unsupported protocol"))
27
+	}
28
+}
0 29
new file mode 100644
... ...
@@ -0,0 +1,22 @@
0
+package proxy
1
+
2
+import (
3
+	"net"
4
+)
5
+
6
+type StubProxy struct {
7
+	frontendAddr net.Addr
8
+	backendAddr  net.Addr
9
+}
10
+
11
+func (p *StubProxy) Run()                   {}
12
+func (p *StubProxy) Close()                 {}
13
+func (p *StubProxy) FrontendAddr() net.Addr { return p.frontendAddr }
14
+func (p *StubProxy) BackendAddr() net.Addr  { return p.backendAddr }
15
+
16
+func NewStubProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
17
+	return &StubProxy{
18
+		frontendAddr: frontendAddr,
19
+		backendAddr:  backendAddr,
20
+	}, nil
21
+}
0 22
new file mode 100644
... ...
@@ -0,0 +1,93 @@
0
+package proxy
1
+
2
+import (
3
+	"io"
4
+	"log"
5
+	"net"
6
+	"syscall"
7
+)
8
+
9
+type TCPProxy struct {
10
+	listener     *net.TCPListener
11
+	frontendAddr *net.TCPAddr
12
+	backendAddr  *net.TCPAddr
13
+}
14
+
15
+func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
16
+	listener, err := net.ListenTCP("tcp", frontendAddr)
17
+	if err != nil {
18
+		return nil, err
19
+	}
20
+	// If the port in frontendAddr was 0 then ListenTCP will have a picked
21
+	// a port to listen on, hence the call to Addr to get that actual port:
22
+	return &TCPProxy{
23
+		listener:     listener,
24
+		frontendAddr: listener.Addr().(*net.TCPAddr),
25
+		backendAddr:  backendAddr,
26
+	}, nil
27
+}
28
+
29
+func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
30
+	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
31
+	if err != nil {
32
+		log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error())
33
+		client.Close()
34
+		return
35
+	}
36
+
37
+	event := make(chan int64)
38
+	var broker = func(to, from *net.TCPConn) {
39
+		written, err := io.Copy(to, from)
40
+		if err != nil {
41
+			// If the socket we are writing to is shutdown with
42
+			// SHUT_WR, forward it to the other end of the pipe:
43
+			if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
44
+				from.CloseWrite()
45
+			}
46
+		}
47
+		to.CloseRead()
48
+		event <- written
49
+	}
50
+
51
+	log.Printf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr())
52
+	go broker(client, backend)
53
+	go broker(backend, client)
54
+
55
+	var transferred int64 = 0
56
+	for i := 0; i < 2; i++ {
57
+		select {
58
+		case written := <-event:
59
+			transferred += written
60
+		case <-quit:
61
+			// Interrupt the two brokers and "join" them.
62
+			client.Close()
63
+			backend.Close()
64
+			for ; i < 2; i++ {
65
+				transferred += <-event
66
+			}
67
+			goto done
68
+		}
69
+	}
70
+	client.Close()
71
+	backend.Close()
72
+done:
73
+	log.Printf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr())
74
+}
75
+
76
+func (proxy *TCPProxy) Run() {
77
+	quit := make(chan bool)
78
+	defer close(quit)
79
+	log.Printf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
80
+	for {
81
+		client, err := proxy.listener.Accept()
82
+		if err != nil {
83
+			log.Printf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
84
+			return
85
+		}
86
+		go proxy.clientLoop(client.(*net.TCPConn), quit)
87
+	}
88
+}
89
+
90
+func (proxy *TCPProxy) Close()                 { proxy.listener.Close() }
91
+func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
92
+func (proxy *TCPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
0 93
new file mode 100644
... ...
@@ -0,0 +1,162 @@
0
+package proxy
1
+
2
+import (
3
+	"encoding/binary"
4
+	"log"
5
+	"net"
6
+	"strings"
7
+	"sync"
8
+	"syscall"
9
+	"time"
10
+)
11
+
12
+const (
13
+	UDPConnTrackTimeout = 90 * time.Second
14
+	UDPBufSize          = 2048
15
+)
16
+
17
+// A net.Addr where the IP is split into two fields so you can use it as a key
18
+// in a map:
19
+type connTrackKey struct {
20
+	IPHigh uint64
21
+	IPLow  uint64
22
+	Port   int
23
+}
24
+
25
+func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
26
+	if len(addr.IP) == net.IPv4len {
27
+		return &connTrackKey{
28
+			IPHigh: 0,
29
+			IPLow:  uint64(binary.BigEndian.Uint32(addr.IP)),
30
+			Port:   addr.Port,
31
+		}
32
+	}
33
+	return &connTrackKey{
34
+		IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
35
+		IPLow:  binary.BigEndian.Uint64(addr.IP[8:]),
36
+		Port:   addr.Port,
37
+	}
38
+}
39
+
40
+type connTrackMap map[connTrackKey]*net.UDPConn
41
+
42
+type UDPProxy struct {
43
+	listener       *net.UDPConn
44
+	frontendAddr   *net.UDPAddr
45
+	backendAddr    *net.UDPAddr
46
+	connTrackTable connTrackMap
47
+	connTrackLock  sync.Mutex
48
+}
49
+
50
+func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
51
+	listener, err := net.ListenUDP("udp", frontendAddr)
52
+	if err != nil {
53
+		return nil, err
54
+	}
55
+	return &UDPProxy{
56
+		listener:       listener,
57
+		frontendAddr:   listener.LocalAddr().(*net.UDPAddr),
58
+		backendAddr:    backendAddr,
59
+		connTrackTable: make(connTrackMap),
60
+	}, nil
61
+}
62
+
63
+func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
64
+	defer func() {
65
+		proxy.connTrackLock.Lock()
66
+		delete(proxy.connTrackTable, *clientKey)
67
+		proxy.connTrackLock.Unlock()
68
+		log.Printf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String())
69
+		proxyConn.Close()
70
+	}()
71
+
72
+	readBuf := make([]byte, UDPBufSize)
73
+	for {
74
+		proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
75
+	again:
76
+		read, err := proxyConn.Read(readBuf)
77
+		if err != nil {
78
+			if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
79
+				// This will happen if the last write failed
80
+				// (e.g: nothing is actually listening on the
81
+				// proxied port on the container), ignore it
82
+				// and continue until UDPConnTrackTimeout
83
+				// expires:
84
+				goto again
85
+			}
86
+			return
87
+		}
88
+		for i := 0; i != read; {
89
+			written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
90
+			if err != nil {
91
+				return
92
+			}
93
+			i += written
94
+			log.Printf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String())
95
+		}
96
+	}
97
+}
98
+
99
+func (proxy *UDPProxy) Run() {
100
+	readBuf := make([]byte, UDPBufSize)
101
+	log.Printf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr)
102
+	for {
103
+		read, from, err := proxy.listener.ReadFromUDP(readBuf)
104
+		if err != nil {
105
+			// NOTE: Apparently ReadFrom doesn't return
106
+			// ECONNREFUSED like Read do (see comment in
107
+			// UDPProxy.replyLoop)
108
+			if isClosedError(err) {
109
+				log.Printf("Stopping proxy on udp/%v for udp/%v (socket was closed)", proxy.frontendAddr, proxy.backendAddr)
110
+			} else {
111
+				log.Printf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
112
+			}
113
+			break
114
+		}
115
+
116
+		fromKey := newConnTrackKey(from)
117
+		proxy.connTrackLock.Lock()
118
+		proxyConn, hit := proxy.connTrackTable[*fromKey]
119
+		if !hit {
120
+			proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
121
+			if err != nil {
122
+				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
123
+				continue
124
+			}
125
+			proxy.connTrackTable[*fromKey] = proxyConn
126
+			go proxy.replyLoop(proxyConn, from, fromKey)
127
+		}
128
+		proxy.connTrackLock.Unlock()
129
+		for i := 0; i != read; {
130
+			written, err := proxyConn.Write(readBuf[i:read])
131
+			if err != nil {
132
+				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
133
+				break
134
+			}
135
+			i += written
136
+			log.Printf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String())
137
+		}
138
+	}
139
+}
140
+
141
+func (proxy *UDPProxy) Close() {
142
+	proxy.listener.Close()
143
+	proxy.connTrackLock.Lock()
144
+	defer proxy.connTrackLock.Unlock()
145
+	for _, conn := range proxy.connTrackTable {
146
+		conn.Close()
147
+	}
148
+}
149
+
150
+func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
151
+func (proxy *UDPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
152
+
153
+func isClosedError(err error) bool {
154
+	/* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
155
+	 * See:
156
+	 * http://golang.org/src/pkg/net/net.go
157
+	 * https://code.google.com/p/go/issues/detail?id=4337
158
+	 * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
159
+	 */
160
+	return strings.HasSuffix(err.Error(), "use of closed network connection")
161
+}
0 162
deleted file mode 100644
... ...
@@ -1 +0,0 @@
1
-Michael Crosby <michael@crosbymichael.com> (@crosbymichael)
2 1
deleted file mode 100644
... ...
@@ -1,216 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"bytes"
5
-	"fmt"
6
-	"io"
7
-	"net"
8
-	"strings"
9
-	"testing"
10
-	"time"
11
-)
12
-
13
-var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
14
-var testBufSize = len(testBuf)
15
-
16
-type EchoServer interface {
17
-	Run()
18
-	Close()
19
-	LocalAddr() net.Addr
20
-}
21
-
22
-type TCPEchoServer struct {
23
-	listener net.Listener
24
-	testCtx  *testing.T
25
-}
26
-
27
-type UDPEchoServer struct {
28
-	conn    net.PacketConn
29
-	testCtx *testing.T
30
-}
31
-
32
-func NewEchoServer(t *testing.T, proto, address string) EchoServer {
33
-	var server EchoServer
34
-	if strings.HasPrefix(proto, "tcp") {
35
-		listener, err := net.Listen(proto, address)
36
-		if err != nil {
37
-			t.Fatal(err)
38
-		}
39
-		server = &TCPEchoServer{listener: listener, testCtx: t}
40
-	} else {
41
-		socket, err := net.ListenPacket(proto, address)
42
-		if err != nil {
43
-			t.Fatal(err)
44
-		}
45
-		server = &UDPEchoServer{conn: socket, testCtx: t}
46
-	}
47
-	return server
48
-}
49
-
50
-func (server *TCPEchoServer) Run() {
51
-	go func() {
52
-		for {
53
-			client, err := server.listener.Accept()
54
-			if err != nil {
55
-				return
56
-			}
57
-			go func(client net.Conn) {
58
-				if _, err := io.Copy(client, client); err != nil {
59
-					server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
60
-				}
61
-				client.Close()
62
-			}(client)
63
-		}
64
-	}()
65
-}
66
-
67
-func (server *TCPEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
68
-func (server *TCPEchoServer) Close()              { server.listener.Addr() }
69
-
70
-func (server *UDPEchoServer) Run() {
71
-	go func() {
72
-		readBuf := make([]byte, 1024)
73
-		for {
74
-			read, from, err := server.conn.ReadFrom(readBuf)
75
-			if err != nil {
76
-				return
77
-			}
78
-			for i := 0; i != read; {
79
-				written, err := server.conn.WriteTo(readBuf[i:read], from)
80
-				if err != nil {
81
-					break
82
-				}
83
-				i += written
84
-			}
85
-		}
86
-	}()
87
-}
88
-
89
-func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
90
-func (server *UDPEchoServer) Close()              { server.conn.Close() }
91
-
92
-func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string) {
93
-	defer proxy.Close()
94
-	go proxy.Run()
95
-	client, err := net.Dial(proto, addr)
96
-	if err != nil {
97
-		t.Fatalf("Can't connect to the proxy: %v", err)
98
-	}
99
-	defer client.Close()
100
-	client.SetDeadline(time.Now().Add(10 * time.Second))
101
-	if _, err = client.Write(testBuf); err != nil {
102
-		t.Fatal(err)
103
-	}
104
-	recvBuf := make([]byte, testBufSize)
105
-	if _, err = client.Read(recvBuf); err != nil {
106
-		t.Fatal(err)
107
-	}
108
-	if !bytes.Equal(testBuf, recvBuf) {
109
-		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
110
-	}
111
-}
112
-
113
-func testProxy(t *testing.T, proto string, proxy Proxy) {
114
-	testProxyAt(t, proto, proxy, proxy.FrontendAddr().String())
115
-}
116
-
117
-func TestTCP4Proxy(t *testing.T) {
118
-	backend := NewEchoServer(t, "tcp", "127.0.0.1:0")
119
-	defer backend.Close()
120
-	backend.Run()
121
-	frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
122
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
123
-	if err != nil {
124
-		t.Fatal(err)
125
-	}
126
-	testProxy(t, "tcp", proxy)
127
-}
128
-
129
-func TestTCP6Proxy(t *testing.T) {
130
-	backend := NewEchoServer(t, "tcp", "[::1]:0")
131
-	defer backend.Close()
132
-	backend.Run()
133
-	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
134
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
135
-	if err != nil {
136
-		t.Fatal(err)
137
-	}
138
-	testProxy(t, "tcp", proxy)
139
-}
140
-
141
-func TestTCPDualStackProxy(t *testing.T) {
142
-	// If I understand `godoc -src net favoriteAddrFamily` (used by the
143
-	// net.Listen* functions) correctly this should work, but it doesn't.
144
-	t.Skip("No support for dual stack yet")
145
-	backend := NewEchoServer(t, "tcp", "[::1]:0")
146
-	defer backend.Close()
147
-	backend.Run()
148
-	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
149
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
150
-	if err != nil {
151
-		t.Fatal(err)
152
-	}
153
-	ipv4ProxyAddr := &net.TCPAddr{
154
-		IP:   net.IPv4(127, 0, 0, 1),
155
-		Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
156
-	}
157
-	testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String())
158
-}
159
-
160
-func TestUDP4Proxy(t *testing.T) {
161
-	backend := NewEchoServer(t, "udp", "127.0.0.1:0")
162
-	defer backend.Close()
163
-	backend.Run()
164
-	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
165
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
166
-	if err != nil {
167
-		t.Fatal(err)
168
-	}
169
-	testProxy(t, "udp", proxy)
170
-}
171
-
172
-func TestUDP6Proxy(t *testing.T) {
173
-	backend := NewEchoServer(t, "udp", "[::1]:0")
174
-	defer backend.Close()
175
-	backend.Run()
176
-	frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
177
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
178
-	if err != nil {
179
-		t.Fatal(err)
180
-	}
181
-	testProxy(t, "udp", proxy)
182
-}
183
-
184
-func TestUDPWriteError(t *testing.T) {
185
-	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
186
-	// Hopefully, this port will be free: */
187
-	backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
188
-	proxy, err := NewProxy(frontendAddr, backendAddr)
189
-	if err != nil {
190
-		t.Fatal(err)
191
-	}
192
-	defer proxy.Close()
193
-	go proxy.Run()
194
-	client, err := net.Dial("udp", "127.0.0.1:25587")
195
-	if err != nil {
196
-		t.Fatalf("Can't connect to the proxy: %v", err)
197
-	}
198
-	defer client.Close()
199
-	// Make sure the proxy doesn't stop when there is no actual backend:
200
-	client.Write(testBuf)
201
-	client.Write(testBuf)
202
-	backend := NewEchoServer(t, "udp", "127.0.0.1:25587")
203
-	defer backend.Close()
204
-	backend.Run()
205
-	client.SetDeadline(time.Now().Add(10 * time.Second))
206
-	if _, err = client.Write(testBuf); err != nil {
207
-		t.Fatal(err)
208
-	}
209
-	recvBuf := make([]byte, testBufSize)
210
-	if _, err = client.Read(recvBuf); err != nil {
211
-		t.Fatal(err)
212
-	}
213
-	if !bytes.Equal(testBuf, recvBuf) {
214
-		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
215
-	}
216
-}
217 1
deleted file mode 100644
... ...
@@ -1,29 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"fmt"
5
-	"net"
6
-)
7
-
8
-type Proxy interface {
9
-	// Start forwarding traffic back and forth the front and back-end
10
-	// addresses.
11
-	Run()
12
-	// Stop forwarding traffic and close both ends of the Proxy.
13
-	Close()
14
-	// Return the address on which the proxy is listening.
15
-	FrontendAddr() net.Addr
16
-	// Return the proxied address.
17
-	BackendAddr() net.Addr
18
-}
19
-
20
-func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
21
-	switch frontendAddr.(type) {
22
-	case *net.UDPAddr:
23
-		return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
24
-	case *net.TCPAddr:
25
-		return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
26
-	default:
27
-		panic(fmt.Errorf("Unsupported protocol"))
28
-	}
29
-}
30 1
deleted file mode 100644
... ...
@@ -1,22 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"net"
5
-)
6
-
7
-type StubProxy struct {
8
-	frontendAddr net.Addr
9
-	backendAddr  net.Addr
10
-}
11
-
12
-func (p *StubProxy) Run()                   {}
13
-func (p *StubProxy) Close()                 {}
14
-func (p *StubProxy) FrontendAddr() net.Addr { return p.frontendAddr }
15
-func (p *StubProxy) BackendAddr() net.Addr  { return p.backendAddr }
16
-
17
-func NewStubProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
18
-	return &StubProxy{
19
-		frontendAddr: frontendAddr,
20
-		backendAddr:  backendAddr,
21
-	}, nil
22
-}
23 1
deleted file mode 100644
... ...
@@ -1,93 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"github.com/dotcloud/docker/utils"
5
-	"io"
6
-	"log"
7
-	"net"
8
-	"syscall"
9
-)
10
-
11
-type TCPProxy struct {
12
-	listener     *net.TCPListener
13
-	frontendAddr *net.TCPAddr
14
-	backendAddr  *net.TCPAddr
15
-}
16
-
17
-func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
18
-	listener, err := net.ListenTCP("tcp", frontendAddr)
19
-	if err != nil {
20
-		return nil, err
21
-	}
22
-	// If the port in frontendAddr was 0 then ListenTCP will have a picked
23
-	// a port to listen on, hence the call to Addr to get that actual port:
24
-	return &TCPProxy{
25
-		listener:     listener,
26
-		frontendAddr: listener.Addr().(*net.TCPAddr),
27
-		backendAddr:  backendAddr,
28
-	}, nil
29
-}
30
-
31
-func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
32
-	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
33
-	if err != nil {
34
-		log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error())
35
-		client.Close()
36
-		return
37
-	}
38
-
39
-	event := make(chan int64)
40
-	var broker = func(to, from *net.TCPConn) {
41
-		written, err := io.Copy(to, from)
42
-		if err != nil {
43
-			// If the socket we are writing to is shutdown with
44
-			// SHUT_WR, forward it to the other end of the pipe:
45
-			if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
46
-				from.CloseWrite()
47
-			}
48
-		}
49
-		to.CloseRead()
50
-		event <- written
51
-	}
52
-	utils.Debugf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr())
53
-	go broker(client, backend)
54
-	go broker(backend, client)
55
-
56
-	var transferred int64 = 0
57
-	for i := 0; i < 2; i++ {
58
-		select {
59
-		case written := <-event:
60
-			transferred += written
61
-		case <-quit:
62
-			// Interrupt the two brokers and "join" them.
63
-			client.Close()
64
-			backend.Close()
65
-			for ; i < 2; i++ {
66
-				transferred += <-event
67
-			}
68
-			goto done
69
-		}
70
-	}
71
-	client.Close()
72
-	backend.Close()
73
-done:
74
-	utils.Debugf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr())
75
-}
76
-
77
-func (proxy *TCPProxy) Run() {
78
-	quit := make(chan bool)
79
-	defer close(quit)
80
-	utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
81
-	for {
82
-		client, err := proxy.listener.Accept()
83
-		if err != nil {
84
-			utils.Debugf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
85
-			return
86
-		}
87
-		go proxy.clientLoop(client.(*net.TCPConn), quit)
88
-	}
89
-}
90
-
91
-func (proxy *TCPProxy) Close()                 { proxy.listener.Close() }
92
-func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
93
-func (proxy *TCPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
94 1
deleted file mode 100644
... ...
@@ -1,152 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"encoding/binary"
5
-	"github.com/dotcloud/docker/utils"
6
-	"log"
7
-	"net"
8
-	"sync"
9
-	"syscall"
10
-	"time"
11
-)
12
-
13
-const (
14
-	UDPConnTrackTimeout = 90 * time.Second
15
-	UDPBufSize          = 2048
16
-)
17
-
18
-// A net.Addr where the IP is split into two fields so you can use it as a key
19
-// in a map:
20
-type connTrackKey struct {
21
-	IPHigh uint64
22
-	IPLow  uint64
23
-	Port   int
24
-}
25
-
26
-func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
27
-	if len(addr.IP) == net.IPv4len {
28
-		return &connTrackKey{
29
-			IPHigh: 0,
30
-			IPLow:  uint64(binary.BigEndian.Uint32(addr.IP)),
31
-			Port:   addr.Port,
32
-		}
33
-	}
34
-	return &connTrackKey{
35
-		IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
36
-		IPLow:  binary.BigEndian.Uint64(addr.IP[8:]),
37
-		Port:   addr.Port,
38
-	}
39
-}
40
-
41
-type connTrackMap map[connTrackKey]*net.UDPConn
42
-
43
-type UDPProxy struct {
44
-	listener       *net.UDPConn
45
-	frontendAddr   *net.UDPAddr
46
-	backendAddr    *net.UDPAddr
47
-	connTrackTable connTrackMap
48
-	connTrackLock  sync.Mutex
49
-}
50
-
51
-func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
52
-	listener, err := net.ListenUDP("udp", frontendAddr)
53
-	if err != nil {
54
-		return nil, err
55
-	}
56
-	return &UDPProxy{
57
-		listener:       listener,
58
-		frontendAddr:   listener.LocalAddr().(*net.UDPAddr),
59
-		backendAddr:    backendAddr,
60
-		connTrackTable: make(connTrackMap),
61
-	}, nil
62
-}
63
-
64
-func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
65
-	defer func() {
66
-		proxy.connTrackLock.Lock()
67
-		delete(proxy.connTrackTable, *clientKey)
68
-		proxy.connTrackLock.Unlock()
69
-		utils.Debugf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String())
70
-		proxyConn.Close()
71
-	}()
72
-
73
-	readBuf := make([]byte, UDPBufSize)
74
-	for {
75
-		proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
76
-	again:
77
-		read, err := proxyConn.Read(readBuf)
78
-		if err != nil {
79
-			if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
80
-				// This will happen if the last write failed
81
-				// (e.g: nothing is actually listening on the
82
-				// proxied port on the container), ignore it
83
-				// and continue until UDPConnTrackTimeout
84
-				// expires:
85
-				goto again
86
-			}
87
-			return
88
-		}
89
-		for i := 0; i != read; {
90
-			written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
91
-			if err != nil {
92
-				return
93
-			}
94
-			i += written
95
-			utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String())
96
-		}
97
-	}
98
-}
99
-
100
-func (proxy *UDPProxy) Run() {
101
-	readBuf := make([]byte, UDPBufSize)
102
-	utils.Debugf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr)
103
-	for {
104
-		read, from, err := proxy.listener.ReadFromUDP(readBuf)
105
-		if err != nil {
106
-			// NOTE: Apparently ReadFrom doesn't return
107
-			// ECONNREFUSED like Read do (see comment in
108
-			// UDPProxy.replyLoop)
109
-			if utils.IsClosedError(err) {
110
-				utils.Debugf("Stopping proxy on udp/%v for udp/%v (socket was closed)", proxy.frontendAddr, proxy.backendAddr)
111
-			} else {
112
-				utils.Errorf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
113
-			}
114
-			break
115
-		}
116
-
117
-		fromKey := newConnTrackKey(from)
118
-		proxy.connTrackLock.Lock()
119
-		proxyConn, hit := proxy.connTrackTable[*fromKey]
120
-		if !hit {
121
-			proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
122
-			if err != nil {
123
-				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
124
-				continue
125
-			}
126
-			proxy.connTrackTable[*fromKey] = proxyConn
127
-			go proxy.replyLoop(proxyConn, from, fromKey)
128
-		}
129
-		proxy.connTrackLock.Unlock()
130
-		for i := 0; i != read; {
131
-			written, err := proxyConn.Write(readBuf[i:read])
132
-			if err != nil {
133
-				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
134
-				break
135
-			}
136
-			i += written
137
-			utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String())
138
-		}
139
-	}
140
-}
141
-
142
-func (proxy *UDPProxy) Close() {
143
-	proxy.listener.Close()
144
-	proxy.connTrackLock.Lock()
145
-	defer proxy.connTrackLock.Unlock()
146
-	for _, conn := range proxy.connTrackTable {
147
-		conn.Close()
148
-	}
149
-}
150
-
151
-func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
152
-func (proxy *UDPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
... ...
@@ -879,16 +879,6 @@ func ShellQuoteArguments(args []string) string {
879 879
 	return buf.String()
880 880
 }
881 881
 
882
-func IsClosedError(err error) bool {
883
-	/* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
884
-	 * See:
885
-	 * http://golang.org/src/pkg/net/net.go
886
-	 * https://code.google.com/p/go/issues/detail?id=4337
887
-	 * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
888
-	 */
889
-	return strings.HasSuffix(err.Error(), "use of closed network connection")
890
-}
891
-
892 882
 func PartParser(template, data string) (map[string]string, error) {
893 883
 	// ip:public:private
894 884
 	var (