Browse code

Move proxy CLI to main cmd/

Since this command is part of the official distribution and even
required for tests, let's move this up to the main cmd's.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2021/06/19 06:38:50
Showing 14 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,72 @@
0
+package main
1
+
2
+import (
3
+	"flag"
4
+	"fmt"
5
+	"log"
6
+	"net"
7
+	"os"
8
+	"os/signal"
9
+	"syscall"
10
+
11
+	"github.com/ishidawataru/sctp"
12
+)
13
+
14
+func main() {
15
+	f := os.NewFile(3, "signal-parent")
16
+	host, container := parseHostContainerAddrs()
17
+
18
+	p, err := NewProxy(host, container)
19
+	if err != nil {
20
+		fmt.Fprintf(f, "1\n%s", err)
21
+		f.Close()
22
+		os.Exit(1)
23
+	}
24
+	go handleStopSignals(p)
25
+	fmt.Fprint(f, "0\n")
26
+	f.Close()
27
+
28
+	// Run will block until the proxy stops
29
+	p.Run()
30
+}
31
+
32
+// parseHostContainerAddrs parses the flags passed on reexec to create the TCP/UDP/SCTP
33
+// net.Addrs to map the host and container ports
34
+func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
35
+	var (
36
+		proto         = flag.String("proto", "tcp", "proxy protocol")
37
+		hostIP        = flag.String("host-ip", "", "host ip")
38
+		hostPort      = flag.Int("host-port", -1, "host port")
39
+		containerIP   = flag.String("container-ip", "", "container ip")
40
+		containerPort = flag.Int("container-port", -1, "container port")
41
+	)
42
+
43
+	flag.Parse()
44
+
45
+	switch *proto {
46
+	case "tcp":
47
+		host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
48
+		container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
49
+	case "udp":
50
+		host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
51
+		container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
52
+	case "sctp":
53
+		host = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*hostIP)}}, Port: *hostPort}
54
+		container = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*containerIP)}}, Port: *containerPort}
55
+	default:
56
+		log.Fatalf("unsupported protocol %s", *proto)
57
+	}
58
+
59
+	return host, container
60
+}
61
+
62
+func handleStopSignals(p Proxy) {
63
+	s := make(chan os.Signal, 10)
64
+	signal.Notify(s, os.Interrupt, syscall.SIGTERM)
65
+
66
+	for range s {
67
+		p.Close()
68
+
69
+		os.Exit(0)
70
+	}
71
+}
0 72
new file mode 100644
... ...
@@ -0,0 +1,314 @@
0
+package main
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"io/ioutil"
7
+	"net"
8
+	"runtime"
9
+	"strings"
10
+	"testing"
11
+	"time"
12
+
13
+	"github.com/ishidawataru/sctp"
14
+	"gotest.tools/v3/skip"
15
+
16
+	// this takes care of the incontainer flag
17
+	_ "github.com/docker/docker/libnetwork/testutils"
18
+)
19
+
20
+var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
21
+var testBufSize = len(testBuf)
22
+
23
+type EchoServer interface {
24
+	Run()
25
+	Close()
26
+	LocalAddr() net.Addr
27
+}
28
+
29
+type EchoServerOptions struct {
30
+	TCPHalfClose bool
31
+}
32
+
33
+type StreamEchoServer struct {
34
+	listener net.Listener
35
+	testCtx  *testing.T
36
+	opts     EchoServerOptions
37
+}
38
+
39
+type UDPEchoServer struct {
40
+	conn    net.PacketConn
41
+	testCtx *testing.T
42
+}
43
+
44
+func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
45
+	var server EchoServer
46
+	if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
47
+		t.Fatalf("TCPHalfClose is not supported for %s", proto)
48
+	}
49
+
50
+	switch {
51
+	case strings.HasPrefix(proto, "tcp"):
52
+		listener, err := net.Listen(proto, address)
53
+		if err != nil {
54
+			t.Fatal(err)
55
+		}
56
+		server = &StreamEchoServer{listener: listener, testCtx: t, opts: opts}
57
+	case strings.HasPrefix(proto, "udp"):
58
+		socket, err := net.ListenPacket(proto, address)
59
+		if err != nil {
60
+			t.Fatal(err)
61
+		}
62
+		server = &UDPEchoServer{conn: socket, testCtx: t}
63
+	case strings.HasPrefix(proto, "sctp"):
64
+		addr, err := sctp.ResolveSCTPAddr(proto, address)
65
+		if err != nil {
66
+			t.Fatal(err)
67
+		}
68
+		listener, err := sctp.ListenSCTP(proto, addr)
69
+		if err != nil {
70
+			t.Fatal(err)
71
+		}
72
+		server = &StreamEchoServer{listener: listener, testCtx: t}
73
+	default:
74
+		t.Fatalf("unknown protocol: %s", proto)
75
+	}
76
+	return server
77
+}
78
+
79
+func (server *StreamEchoServer) Run() {
80
+	go func() {
81
+		for {
82
+			client, err := server.listener.Accept()
83
+			if err != nil {
84
+				return
85
+			}
86
+			go func(client net.Conn) {
87
+				if server.opts.TCPHalfClose {
88
+					data, err := ioutil.ReadAll(client)
89
+					if err != nil {
90
+						server.testCtx.Logf("io.ReadAll() failed for the client: %v\n", err.Error())
91
+					}
92
+					if _, err := client.Write(data); err != nil {
93
+						server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
94
+					}
95
+					client.(*net.TCPConn).CloseWrite()
96
+				} else {
97
+					if _, err := io.Copy(client, client); err != nil {
98
+						server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
99
+					}
100
+					client.Close()
101
+				}
102
+			}(client)
103
+		}
104
+	}()
105
+}
106
+
107
+func (server *StreamEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
108
+func (server *StreamEchoServer) Close()              { server.listener.Close() }
109
+
110
+func (server *UDPEchoServer) Run() {
111
+	go func() {
112
+		readBuf := make([]byte, 1024)
113
+		for {
114
+			read, from, err := server.conn.ReadFrom(readBuf)
115
+			if err != nil {
116
+				return
117
+			}
118
+			for i := 0; i != read; {
119
+				written, err := server.conn.WriteTo(readBuf[i:read], from)
120
+				if err != nil {
121
+					break
122
+				}
123
+				i += written
124
+			}
125
+		}
126
+	}()
127
+}
128
+
129
+func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
130
+func (server *UDPEchoServer) Close()              { server.conn.Close() }
131
+
132
+func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
133
+	defer proxy.Close()
134
+	go proxy.Run()
135
+	var client net.Conn
136
+	var err error
137
+	if strings.HasPrefix(proto, "sctp") {
138
+		var a *sctp.SCTPAddr
139
+		a, err = sctp.ResolveSCTPAddr(proto, addr)
140
+		if err != nil {
141
+			t.Fatal(err)
142
+		}
143
+		client, err = sctp.DialSCTP(proto, nil, a)
144
+	} else {
145
+		client, err = net.Dial(proto, addr)
146
+	}
147
+
148
+	if err != nil {
149
+		t.Fatalf("Can't connect to the proxy: %v", err)
150
+	}
151
+	defer client.Close()
152
+	client.SetDeadline(time.Now().Add(10 * time.Second))
153
+	if _, err = client.Write(testBuf); err != nil {
154
+		t.Fatal(err)
155
+	}
156
+	if halfClose {
157
+		if proto != "tcp" {
158
+			t.Fatalf("halfClose is not supported for %s", proto)
159
+		}
160
+		client.(*net.TCPConn).CloseWrite()
161
+	}
162
+	recvBuf := make([]byte, testBufSize)
163
+	if _, err = client.Read(recvBuf); err != nil {
164
+		t.Fatal(err)
165
+	}
166
+	if !bytes.Equal(testBuf, recvBuf) {
167
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
168
+	}
169
+}
170
+
171
+func testProxy(t *testing.T, proto string, proxy Proxy, halfClose bool) {
172
+	testProxyAt(t, proto, proxy, proxy.FrontendAddr().String(), halfClose)
173
+}
174
+
175
+func testTCP4Proxy(t *testing.T, halfClose bool) {
176
+	backend := NewEchoServer(t, "tcp", "127.0.0.1:0", EchoServerOptions{TCPHalfClose: halfClose})
177
+	defer backend.Close()
178
+	backend.Run()
179
+	frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
180
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
181
+	if err != nil {
182
+		t.Fatal(err)
183
+	}
184
+	testProxy(t, "tcp", proxy, halfClose)
185
+}
186
+
187
+func TestTCP4Proxy(t *testing.T) {
188
+	testTCP4Proxy(t, false)
189
+}
190
+
191
+func TestTCP4ProxyHalfClose(t *testing.T) {
192
+	testTCP4Proxy(t, true)
193
+}
194
+
195
+func TestTCP6Proxy(t *testing.T) {
196
+	t.Skip("Need to start CI docker with --ipv6")
197
+	backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
198
+	defer backend.Close()
199
+	backend.Run()
200
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
201
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
202
+	if err != nil {
203
+		t.Fatal(err)
204
+	}
205
+	testProxy(t, "tcp", proxy, false)
206
+}
207
+
208
+func TestTCPDualStackProxy(t *testing.T) {
209
+	// If I understand `godoc -src net favoriteAddrFamily` (used by the
210
+	// net.Listen* functions) correctly this should work, but it doesn't.
211
+	t.Skip("No support for dual stack yet")
212
+	backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
213
+	defer backend.Close()
214
+	backend.Run()
215
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
216
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
217
+	if err != nil {
218
+		t.Fatal(err)
219
+	}
220
+	ipv4ProxyAddr := &net.TCPAddr{
221
+		IP:   net.IPv4(127, 0, 0, 1),
222
+		Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
223
+	}
224
+	testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String(), false)
225
+}
226
+
227
+func TestUDP4Proxy(t *testing.T) {
228
+	backend := NewEchoServer(t, "udp", "127.0.0.1:0", EchoServerOptions{})
229
+	defer backend.Close()
230
+	backend.Run()
231
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
232
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
233
+	if err != nil {
234
+		t.Fatal(err)
235
+	}
236
+	testProxy(t, "udp", proxy, false)
237
+}
238
+
239
+func TestUDP6Proxy(t *testing.T) {
240
+	t.Skip("Need to start CI docker with --ipv6")
241
+	backend := NewEchoServer(t, "udp", "[::1]:0", EchoServerOptions{})
242
+	defer backend.Close()
243
+	backend.Run()
244
+	frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
245
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
246
+	if err != nil {
247
+		t.Fatal(err)
248
+	}
249
+	testProxy(t, "udp", proxy, false)
250
+}
251
+
252
+func TestUDPWriteError(t *testing.T) {
253
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
254
+	// Hopefully, this port will be free: */
255
+	backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
256
+	proxy, err := NewProxy(frontendAddr, backendAddr)
257
+	if err != nil {
258
+		t.Fatal(err)
259
+	}
260
+	defer proxy.Close()
261
+	go proxy.Run()
262
+	client, err := net.Dial("udp", "127.0.0.1:25587")
263
+	if err != nil {
264
+		t.Fatalf("Can't connect to the proxy: %v", err)
265
+	}
266
+	defer client.Close()
267
+	// Make sure the proxy doesn't stop when there is no actual backend:
268
+	client.Write(testBuf)
269
+	client.Write(testBuf)
270
+	backend := NewEchoServer(t, "udp", "127.0.0.1:25587", EchoServerOptions{})
271
+	defer backend.Close()
272
+	backend.Run()
273
+	client.SetDeadline(time.Now().Add(10 * time.Second))
274
+	if _, err = client.Write(testBuf); err != nil {
275
+		t.Fatal(err)
276
+	}
277
+	recvBuf := make([]byte, testBufSize)
278
+	if _, err = client.Read(recvBuf); err != nil {
279
+		t.Fatal(err)
280
+	}
281
+	if !bytes.Equal(testBuf, recvBuf) {
282
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
283
+	}
284
+}
285
+
286
+func TestSCTP4Proxy(t *testing.T) {
287
+	skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
288
+
289
+	backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
290
+	defer backend.Close()
291
+	backend.Run()
292
+	frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}, Port: 0}
293
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
294
+	if err != nil {
295
+		t.Fatal(err)
296
+	}
297
+	testProxy(t, "sctp", proxy, false)
298
+}
299
+
300
+func TestSCTP6Proxy(t *testing.T) {
301
+	t.Skip("Need to start CI docker with --ipv6")
302
+	skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
303
+
304
+	backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
305
+	defer backend.Close()
306
+	backend.Run()
307
+	frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv6loopback}}, Port: 0}
308
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
309
+	if err != nil {
310
+		t.Fatal(err)
311
+	}
312
+	testProxy(t, "sctp", proxy, false)
313
+}
0 314
new file mode 100644
... ...
@@ -0,0 +1,50 @@
0
+// docker-proxy provides a network Proxy interface and implementations for TCP
1
+// and UDP.
2
+package main
3
+
4
+import (
5
+	"net"
6
+
7
+	"github.com/ishidawataru/sctp"
8
+)
9
+
10
+// ipVersion refers to IP version - v4 or v6
11
+type ipVersion string
12
+
13
+const (
14
+	// IPv4 is version 4
15
+	ipv4 ipVersion = "4"
16
+	// IPv4 is version 6
17
+	ipv6 ipVersion = "6"
18
+)
19
+
20
+// Proxy defines the behavior of a proxy. It forwards traffic back and forth
21
+// between two endpoints : the frontend and the backend.
22
+// It can be used to do software port-mapping between two addresses.
23
+// e.g. forward all traffic between the frontend (host) 127.0.0.1:3000
24
+// to the backend (container) at 172.17.42.108:4000.
25
+type Proxy interface {
26
+	// Run starts forwarding traffic back and forth between the front
27
+	// and back-end addresses.
28
+	Run()
29
+	// Close stops forwarding traffic and close both ends of the Proxy.
30
+	Close()
31
+	// FrontendAddr returns the address on which the proxy is listening.
32
+	FrontendAddr() net.Addr
33
+	// BackendAddr returns the proxied address.
34
+	BackendAddr() net.Addr
35
+}
36
+
37
+// NewProxy creates a Proxy according to the specified frontendAddr and backendAddr.
38
+func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
39
+	switch frontendAddr.(type) {
40
+	case *net.UDPAddr:
41
+		return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
42
+	case *net.TCPAddr:
43
+		return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
44
+	case *sctp.SCTPAddr:
45
+		return NewSCTPProxy(frontendAddr.(*sctp.SCTPAddr), backendAddr.(*sctp.SCTPAddr))
46
+	default:
47
+		panic("Unsupported protocol")
48
+	}
49
+}
0 50
new file mode 100644
... ...
@@ -0,0 +1,98 @@
0
+package main
1
+
2
+import (
3
+	"io"
4
+	"log"
5
+	"net"
6
+	"sync"
7
+
8
+	"github.com/ishidawataru/sctp"
9
+)
10
+
11
+// SCTPProxy is a proxy for SCTP connections. It implements the Proxy interface to
12
+// handle SCTP traffic forwarding between the frontend and backend addresses.
13
+type SCTPProxy struct {
14
+	listener     *sctp.SCTPListener
15
+	frontendAddr *sctp.SCTPAddr
16
+	backendAddr  *sctp.SCTPAddr
17
+}
18
+
19
+// NewSCTPProxy creates a new SCTPProxy.
20
+func NewSCTPProxy(frontendAddr, backendAddr *sctp.SCTPAddr) (*SCTPProxy, error) {
21
+	// detect version of hostIP to bind only to correct version
22
+	ipVersion := ipv4
23
+	if frontendAddr.IPAddrs[0].IP.To4() == nil {
24
+		ipVersion = ipv6
25
+	}
26
+	listener, err := sctp.ListenSCTP("sctp"+string(ipVersion), frontendAddr)
27
+	if err != nil {
28
+		return nil, err
29
+	}
30
+	// If the port in frontendAddr was 0 then ListenSCTP will have a picked
31
+	// a port to listen on, hence the call to Addr to get that actual port:
32
+	return &SCTPProxy{
33
+		listener:     listener,
34
+		frontendAddr: listener.Addr().(*sctp.SCTPAddr),
35
+		backendAddr:  backendAddr,
36
+	}, nil
37
+}
38
+
39
+func (proxy *SCTPProxy) clientLoop(client *sctp.SCTPConn, quit chan bool) {
40
+	backend, err := sctp.DialSCTP("sctp", nil, proxy.backendAddr)
41
+	if err != nil {
42
+		log.Printf("Can't forward traffic to backend sctp/%v: %s\n", proxy.backendAddr, err)
43
+		client.Close()
44
+		return
45
+	}
46
+	clientC := sctp.NewSCTPSndRcvInfoWrappedConn(client)
47
+	backendC := sctp.NewSCTPSndRcvInfoWrappedConn(backend)
48
+
49
+	var wg sync.WaitGroup
50
+	var broker = func(to, from net.Conn) {
51
+		io.Copy(to, from)
52
+		from.Close()
53
+		to.Close()
54
+		wg.Done()
55
+	}
56
+
57
+	wg.Add(2)
58
+	go broker(clientC, backendC)
59
+	go broker(backendC, clientC)
60
+
61
+	finish := make(chan struct{})
62
+	go func() {
63
+		wg.Wait()
64
+		close(finish)
65
+	}()
66
+
67
+	select {
68
+	case <-quit:
69
+	case <-finish:
70
+	}
71
+	clientC.Close()
72
+	backendC.Close()
73
+	<-finish
74
+}
75
+
76
+// Run starts forwarding the traffic using SCTP.
77
+func (proxy *SCTPProxy) Run() {
78
+	quit := make(chan bool)
79
+	defer close(quit)
80
+	for {
81
+		client, err := proxy.listener.Accept()
82
+		if err != nil {
83
+			log.Printf("Stopping proxy on sctp/%v for sctp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
84
+			return
85
+		}
86
+		go proxy.clientLoop(client.(*sctp.SCTPConn), quit)
87
+	}
88
+}
89
+
90
+// Close stops forwarding the traffic.
91
+func (proxy *SCTPProxy) Close() { proxy.listener.Close() }
92
+
93
+// FrontendAddr returns the SCTP address on which the proxy is listening.
94
+func (proxy *SCTPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
95
+
96
+// BackendAddr returns the SCTP proxied address.
97
+func (proxy *SCTPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
0 98
new file mode 100644
... ...
@@ -0,0 +1,94 @@
0
+package main
1
+
2
+import (
3
+	"io"
4
+	"log"
5
+	"net"
6
+	"sync"
7
+)
8
+
9
+// TCPProxy is a proxy for TCP connections. It implements the Proxy interface to
10
+// handle TCP traffic forwarding between the frontend and backend addresses.
11
+type TCPProxy struct {
12
+	listener     *net.TCPListener
13
+	frontendAddr *net.TCPAddr
14
+	backendAddr  *net.TCPAddr
15
+}
16
+
17
+// NewTCPProxy creates a new TCPProxy.
18
+func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
19
+	// detect version of hostIP to bind only to correct version
20
+	ipVersion := ipv4
21
+	if frontendAddr.IP.To4() == nil {
22
+		ipVersion = ipv6
23
+	}
24
+	listener, err := net.ListenTCP("tcp"+string(ipVersion), frontendAddr)
25
+	if err != nil {
26
+		return nil, err
27
+	}
28
+	// If the port in frontendAddr was 0 then ListenTCP will have a picked
29
+	// a port to listen on, hence the call to Addr to get that actual port:
30
+	return &TCPProxy{
31
+		listener:     listener,
32
+		frontendAddr: listener.Addr().(*net.TCPAddr),
33
+		backendAddr:  backendAddr,
34
+	}, nil
35
+}
36
+
37
+func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
38
+	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
39
+	if err != nil {
40
+		log.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
41
+		client.Close()
42
+		return
43
+	}
44
+
45
+	var wg sync.WaitGroup
46
+	var broker = func(to, from *net.TCPConn) {
47
+		io.Copy(to, from)
48
+		from.CloseRead()
49
+		to.CloseWrite()
50
+		wg.Done()
51
+	}
52
+
53
+	wg.Add(2)
54
+	go broker(client, backend)
55
+	go broker(backend, client)
56
+
57
+	finish := make(chan struct{})
58
+	go func() {
59
+		wg.Wait()
60
+		close(finish)
61
+	}()
62
+
63
+	select {
64
+	case <-quit:
65
+	case <-finish:
66
+	}
67
+	client.Close()
68
+	backend.Close()
69
+	<-finish
70
+}
71
+
72
+// Run starts forwarding the traffic using TCP.
73
+func (proxy *TCPProxy) Run() {
74
+	quit := make(chan bool)
75
+	defer close(quit)
76
+	for {
77
+		client, err := proxy.listener.Accept()
78
+		if err != nil {
79
+			log.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
80
+			return
81
+		}
82
+		go proxy.clientLoop(client.(*net.TCPConn), quit)
83
+	}
84
+}
85
+
86
+// Close stops forwarding the traffic.
87
+func (proxy *TCPProxy) Close() { proxy.listener.Close() }
88
+
89
+// FrontendAddr returns the TCP address on which the proxy is listening.
90
+func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
91
+
92
+// BackendAddr returns the TCP proxied address.
93
+func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
0 94
new file mode 100644
... ...
@@ -0,0 +1,173 @@
0
+package main
1
+
2
+import (
3
+	"encoding/binary"
4
+	"log"
5
+	"net"
6
+	"strings"
7
+	"sync"
8
+	"syscall"
9
+	"time"
10
+)
11
+
12
+const (
13
+	// UDPConnTrackTimeout is the timeout used for UDP connection tracking
14
+	UDPConnTrackTimeout = 90 * time.Second
15
+	// UDPBufSize is the buffer size for the UDP proxy
16
+	UDPBufSize = 65507
17
+)
18
+
19
+// A net.Addr where the IP is split into two fields so you can use it as a key
20
+// in a map:
21
+type connTrackKey struct {
22
+	IPHigh uint64
23
+	IPLow  uint64
24
+	Port   int
25
+}
26
+
27
+func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
28
+	if len(addr.IP) == net.IPv4len {
29
+		return &connTrackKey{
30
+			IPHigh: 0,
31
+			IPLow:  uint64(binary.BigEndian.Uint32(addr.IP)),
32
+			Port:   addr.Port,
33
+		}
34
+	}
35
+	return &connTrackKey{
36
+		IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
37
+		IPLow:  binary.BigEndian.Uint64(addr.IP[8:]),
38
+		Port:   addr.Port,
39
+	}
40
+}
41
+
42
+type connTrackMap map[connTrackKey]*net.UDPConn
43
+
44
+// UDPProxy is proxy for which handles UDP datagrams. It implements the Proxy
45
+// interface to handle UDP traffic forwarding between the frontend and backend
46
+// addresses.
47
+type UDPProxy struct {
48
+	listener       *net.UDPConn
49
+	frontendAddr   *net.UDPAddr
50
+	backendAddr    *net.UDPAddr
51
+	connTrackTable connTrackMap
52
+	connTrackLock  sync.Mutex
53
+}
54
+
55
+// NewUDPProxy creates a new UDPProxy.
56
+func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
57
+	// detect version of hostIP to bind only to correct version
58
+	ipVersion := ipv4
59
+	if frontendAddr.IP.To4() == nil {
60
+		ipVersion = ipv6
61
+	}
62
+	listener, err := net.ListenUDP("udp"+string(ipVersion), frontendAddr)
63
+	if err != nil {
64
+		return nil, err
65
+	}
66
+	return &UDPProxy{
67
+		listener:       listener,
68
+		frontendAddr:   listener.LocalAddr().(*net.UDPAddr),
69
+		backendAddr:    backendAddr,
70
+		connTrackTable: make(connTrackMap),
71
+	}, nil
72
+}
73
+
74
+func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
75
+	defer func() {
76
+		proxy.connTrackLock.Lock()
77
+		delete(proxy.connTrackTable, *clientKey)
78
+		proxy.connTrackLock.Unlock()
79
+		proxyConn.Close()
80
+	}()
81
+
82
+	readBuf := make([]byte, UDPBufSize)
83
+	for {
84
+		proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
85
+	again:
86
+		read, err := proxyConn.Read(readBuf)
87
+		if err != nil {
88
+			if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
89
+				// This will happen if the last write failed
90
+				// (e.g: nothing is actually listening on the
91
+				// proxied port on the container), ignore it
92
+				// and continue until UDPConnTrackTimeout
93
+				// expires:
94
+				goto again
95
+			}
96
+			return
97
+		}
98
+		for i := 0; i != read; {
99
+			written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
100
+			if err != nil {
101
+				return
102
+			}
103
+			i += written
104
+		}
105
+	}
106
+}
107
+
108
+// Run starts forwarding the traffic using UDP.
109
+func (proxy *UDPProxy) Run() {
110
+	readBuf := make([]byte, UDPBufSize)
111
+	for {
112
+		read, from, err := proxy.listener.ReadFromUDP(readBuf)
113
+		if err != nil {
114
+			// NOTE: Apparently ReadFrom doesn't return
115
+			// ECONNREFUSED like Read do (see comment in
116
+			// UDPProxy.replyLoop)
117
+			if !isClosedError(err) {
118
+				log.Printf("Stopping proxy on udp/%v for udp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
119
+			}
120
+			break
121
+		}
122
+
123
+		fromKey := newConnTrackKey(from)
124
+		proxy.connTrackLock.Lock()
125
+		proxyConn, hit := proxy.connTrackTable[*fromKey]
126
+		if !hit {
127
+			proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
128
+			if err != nil {
129
+				log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
130
+				proxy.connTrackLock.Unlock()
131
+				continue
132
+			}
133
+			proxy.connTrackTable[*fromKey] = proxyConn
134
+			go proxy.replyLoop(proxyConn, from, fromKey)
135
+		}
136
+		proxy.connTrackLock.Unlock()
137
+		for i := 0; i != read; {
138
+			written, err := proxyConn.Write(readBuf[i:read])
139
+			if err != nil {
140
+				log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
141
+				break
142
+			}
143
+			i += written
144
+		}
145
+	}
146
+}
147
+
148
+// Close stops forwarding the traffic.
149
+func (proxy *UDPProxy) Close() {
150
+	proxy.listener.Close()
151
+	proxy.connTrackLock.Lock()
152
+	defer proxy.connTrackLock.Unlock()
153
+	for _, conn := range proxy.connTrackTable {
154
+		conn.Close()
155
+	}
156
+}
157
+
158
+// FrontendAddr returns the UDP address on which the proxy is listening.
159
+func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
160
+
161
+// BackendAddr returns the proxied UDP address.
162
+func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
163
+
164
+func isClosedError(err error) bool {
165
+	/* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
166
+	 * See:
167
+	 * http://golang.org/src/pkg/net/net.go
168
+	 * https://code.google.com/p/go/issues/detail?id=4337
169
+	 * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
170
+	 */
171
+	return strings.HasSuffix(err.Error(), "use of closed network connection")
172
+}
... ...
@@ -3,7 +3,7 @@
3 3
 set -e
4 4
 
5 5
 (
6
-	GO_PACKAGE='github.com/docker/docker/libnetwork/cmd/proxy'
6
+	GO_PACKAGE='github.com/docker/docker/cmd/docker-proxy'
7 7
 	BINARY_SHORT_NAME='docker-proxy'
8 8
 
9 9
 	source "${MAKEDIR}/.binary"
... ...
@@ -9,7 +9,7 @@ set -e
9 9
 	export BUILDFLAGS=("${BUILDFLAGS[@]/osusergo /}")     # ditto for osusergo
10 10
 	export BUILDFLAGS=("${BUILDFLAGS[@]/static_build /}") # we're not building a "static" binary here
11 11
 
12
-	GO_PACKAGE='github.com/docker/docker/libnetwork/cmd/proxy'
12
+	GO_PACKAGE='github.com/docker/docker/cmd/docker-proxy'
13 13
 	BINARY_SHORT_NAME='docker-proxy'
14 14
 	source "${MAKEDIR}/.binary"
15 15
 )
16 16
deleted file mode 100644
... ...
@@ -1,72 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"flag"
5
-	"fmt"
6
-	"log"
7
-	"net"
8
-	"os"
9
-	"os/signal"
10
-	"syscall"
11
-
12
-	"github.com/ishidawataru/sctp"
13
-)
14
-
15
-func main() {
16
-	f := os.NewFile(3, "signal-parent")
17
-	host, container := parseHostContainerAddrs()
18
-
19
-	p, err := NewProxy(host, container)
20
-	if err != nil {
21
-		fmt.Fprintf(f, "1\n%s", err)
22
-		f.Close()
23
-		os.Exit(1)
24
-	}
25
-	go handleStopSignals(p)
26
-	fmt.Fprint(f, "0\n")
27
-	f.Close()
28
-
29
-	// Run will block until the proxy stops
30
-	p.Run()
31
-}
32
-
33
-// parseHostContainerAddrs parses the flags passed on reexec to create the TCP/UDP/SCTP
34
-// net.Addrs to map the host and container ports
35
-func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
36
-	var (
37
-		proto         = flag.String("proto", "tcp", "proxy protocol")
38
-		hostIP        = flag.String("host-ip", "", "host ip")
39
-		hostPort      = flag.Int("host-port", -1, "host port")
40
-		containerIP   = flag.String("container-ip", "", "container ip")
41
-		containerPort = flag.Int("container-port", -1, "container port")
42
-	)
43
-
44
-	flag.Parse()
45
-
46
-	switch *proto {
47
-	case "tcp":
48
-		host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
49
-		container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
50
-	case "udp":
51
-		host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
52
-		container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
53
-	case "sctp":
54
-		host = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*hostIP)}}, Port: *hostPort}
55
-		container = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*containerIP)}}, Port: *containerPort}
56
-	default:
57
-		log.Fatalf("unsupported protocol %s", *proto)
58
-	}
59
-
60
-	return host, container
61
-}
62
-
63
-func handleStopSignals(p Proxy) {
64
-	s := make(chan os.Signal, 10)
65
-	signal.Notify(s, os.Interrupt, syscall.SIGTERM)
66
-
67
-	for range s {
68
-		p.Close()
69
-
70
-		os.Exit(0)
71
-	}
72
-}
73 1
deleted file mode 100644
... ...
@@ -1,314 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"bytes"
5
-	"fmt"
6
-	"io"
7
-	"io/ioutil"
8
-	"net"
9
-	"runtime"
10
-	"strings"
11
-	"testing"
12
-	"time"
13
-
14
-	"github.com/ishidawataru/sctp"
15
-	"gotest.tools/v3/skip"
16
-
17
-	// this takes care of the incontainer flag
18
-	_ "github.com/docker/docker/libnetwork/testutils"
19
-)
20
-
21
-var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
22
-var testBufSize = len(testBuf)
23
-
24
-type EchoServer interface {
25
-	Run()
26
-	Close()
27
-	LocalAddr() net.Addr
28
-}
29
-
30
-type EchoServerOptions struct {
31
-	TCPHalfClose bool
32
-}
33
-
34
-type StreamEchoServer struct {
35
-	listener net.Listener
36
-	testCtx  *testing.T
37
-	opts     EchoServerOptions
38
-}
39
-
40
-type UDPEchoServer struct {
41
-	conn    net.PacketConn
42
-	testCtx *testing.T
43
-}
44
-
45
-func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
46
-	var server EchoServer
47
-	if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
48
-		t.Fatalf("TCPHalfClose is not supported for %s", proto)
49
-	}
50
-
51
-	switch {
52
-	case strings.HasPrefix(proto, "tcp"):
53
-		listener, err := net.Listen(proto, address)
54
-		if err != nil {
55
-			t.Fatal(err)
56
-		}
57
-		server = &StreamEchoServer{listener: listener, testCtx: t, opts: opts}
58
-	case strings.HasPrefix(proto, "udp"):
59
-		socket, err := net.ListenPacket(proto, address)
60
-		if err != nil {
61
-			t.Fatal(err)
62
-		}
63
-		server = &UDPEchoServer{conn: socket, testCtx: t}
64
-	case strings.HasPrefix(proto, "sctp"):
65
-		addr, err := sctp.ResolveSCTPAddr(proto, address)
66
-		if err != nil {
67
-			t.Fatal(err)
68
-		}
69
-		listener, err := sctp.ListenSCTP(proto, addr)
70
-		if err != nil {
71
-			t.Fatal(err)
72
-		}
73
-		server = &StreamEchoServer{listener: listener, testCtx: t}
74
-	default:
75
-		t.Fatalf("unknown protocol: %s", proto)
76
-	}
77
-	return server
78
-}
79
-
80
-func (server *StreamEchoServer) Run() {
81
-	go func() {
82
-		for {
83
-			client, err := server.listener.Accept()
84
-			if err != nil {
85
-				return
86
-			}
87
-			go func(client net.Conn) {
88
-				if server.opts.TCPHalfClose {
89
-					data, err := ioutil.ReadAll(client)
90
-					if err != nil {
91
-						server.testCtx.Logf("io.ReadAll() failed for the client: %v\n", err.Error())
92
-					}
93
-					if _, err := client.Write(data); err != nil {
94
-						server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
95
-					}
96
-					client.(*net.TCPConn).CloseWrite()
97
-				} else {
98
-					if _, err := io.Copy(client, client); err != nil {
99
-						server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
100
-					}
101
-					client.Close()
102
-				}
103
-			}(client)
104
-		}
105
-	}()
106
-}
107
-
108
-func (server *StreamEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
109
-func (server *StreamEchoServer) Close()              { server.listener.Close() }
110
-
111
-func (server *UDPEchoServer) Run() {
112
-	go func() {
113
-		readBuf := make([]byte, 1024)
114
-		for {
115
-			read, from, err := server.conn.ReadFrom(readBuf)
116
-			if err != nil {
117
-				return
118
-			}
119
-			for i := 0; i != read; {
120
-				written, err := server.conn.WriteTo(readBuf[i:read], from)
121
-				if err != nil {
122
-					break
123
-				}
124
-				i += written
125
-			}
126
-		}
127
-	}()
128
-}
129
-
130
-func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
131
-func (server *UDPEchoServer) Close()              { server.conn.Close() }
132
-
133
-func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
134
-	defer proxy.Close()
135
-	go proxy.Run()
136
-	var client net.Conn
137
-	var err error
138
-	if strings.HasPrefix(proto, "sctp") {
139
-		var a *sctp.SCTPAddr
140
-		a, err = sctp.ResolveSCTPAddr(proto, addr)
141
-		if err != nil {
142
-			t.Fatal(err)
143
-		}
144
-		client, err = sctp.DialSCTP(proto, nil, a)
145
-	} else {
146
-		client, err = net.Dial(proto, addr)
147
-	}
148
-
149
-	if err != nil {
150
-		t.Fatalf("Can't connect to the proxy: %v", err)
151
-	}
152
-	defer client.Close()
153
-	client.SetDeadline(time.Now().Add(10 * time.Second))
154
-	if _, err = client.Write(testBuf); err != nil {
155
-		t.Fatal(err)
156
-	}
157
-	if halfClose {
158
-		if proto != "tcp" {
159
-			t.Fatalf("halfClose is not supported for %s", proto)
160
-		}
161
-		client.(*net.TCPConn).CloseWrite()
162
-	}
163
-	recvBuf := make([]byte, testBufSize)
164
-	if _, err = client.Read(recvBuf); err != nil {
165
-		t.Fatal(err)
166
-	}
167
-	if !bytes.Equal(testBuf, recvBuf) {
168
-		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
169
-	}
170
-}
171
-
172
-func testProxy(t *testing.T, proto string, proxy Proxy, halfClose bool) {
173
-	testProxyAt(t, proto, proxy, proxy.FrontendAddr().String(), halfClose)
174
-}
175
-
176
-func testTCP4Proxy(t *testing.T, halfClose bool) {
177
-	backend := NewEchoServer(t, "tcp", "127.0.0.1:0", EchoServerOptions{TCPHalfClose: halfClose})
178
-	defer backend.Close()
179
-	backend.Run()
180
-	frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
181
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
182
-	if err != nil {
183
-		t.Fatal(err)
184
-	}
185
-	testProxy(t, "tcp", proxy, halfClose)
186
-}
187
-
188
-func TestTCP4Proxy(t *testing.T) {
189
-	testTCP4Proxy(t, false)
190
-}
191
-
192
-func TestTCP4ProxyHalfClose(t *testing.T) {
193
-	testTCP4Proxy(t, true)
194
-}
195
-
196
-func TestTCP6Proxy(t *testing.T) {
197
-	t.Skip("Need to start CI docker with --ipv6")
198
-	backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
199
-	defer backend.Close()
200
-	backend.Run()
201
-	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
202
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
203
-	if err != nil {
204
-		t.Fatal(err)
205
-	}
206
-	testProxy(t, "tcp", proxy, false)
207
-}
208
-
209
-func TestTCPDualStackProxy(t *testing.T) {
210
-	// If I understand `godoc -src net favoriteAddrFamily` (used by the
211
-	// net.Listen* functions) correctly this should work, but it doesn't.
212
-	t.Skip("No support for dual stack yet")
213
-	backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
214
-	defer backend.Close()
215
-	backend.Run()
216
-	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
217
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
218
-	if err != nil {
219
-		t.Fatal(err)
220
-	}
221
-	ipv4ProxyAddr := &net.TCPAddr{
222
-		IP:   net.IPv4(127, 0, 0, 1),
223
-		Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
224
-	}
225
-	testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String(), false)
226
-}
227
-
228
-func TestUDP4Proxy(t *testing.T) {
229
-	backend := NewEchoServer(t, "udp", "127.0.0.1:0", EchoServerOptions{})
230
-	defer backend.Close()
231
-	backend.Run()
232
-	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
233
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
234
-	if err != nil {
235
-		t.Fatal(err)
236
-	}
237
-	testProxy(t, "udp", proxy, false)
238
-}
239
-
240
-func TestUDP6Proxy(t *testing.T) {
241
-	t.Skip("Need to start CI docker with --ipv6")
242
-	backend := NewEchoServer(t, "udp", "[::1]:0", EchoServerOptions{})
243
-	defer backend.Close()
244
-	backend.Run()
245
-	frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
246
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
247
-	if err != nil {
248
-		t.Fatal(err)
249
-	}
250
-	testProxy(t, "udp", proxy, false)
251
-}
252
-
253
-func TestUDPWriteError(t *testing.T) {
254
-	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
255
-	// Hopefully, this port will be free: */
256
-	backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
257
-	proxy, err := NewProxy(frontendAddr, backendAddr)
258
-	if err != nil {
259
-		t.Fatal(err)
260
-	}
261
-	defer proxy.Close()
262
-	go proxy.Run()
263
-	client, err := net.Dial("udp", "127.0.0.1:25587")
264
-	if err != nil {
265
-		t.Fatalf("Can't connect to the proxy: %v", err)
266
-	}
267
-	defer client.Close()
268
-	// Make sure the proxy doesn't stop when there is no actual backend:
269
-	client.Write(testBuf)
270
-	client.Write(testBuf)
271
-	backend := NewEchoServer(t, "udp", "127.0.0.1:25587", EchoServerOptions{})
272
-	defer backend.Close()
273
-	backend.Run()
274
-	client.SetDeadline(time.Now().Add(10 * time.Second))
275
-	if _, err = client.Write(testBuf); err != nil {
276
-		t.Fatal(err)
277
-	}
278
-	recvBuf := make([]byte, testBufSize)
279
-	if _, err = client.Read(recvBuf); err != nil {
280
-		t.Fatal(err)
281
-	}
282
-	if !bytes.Equal(testBuf, recvBuf) {
283
-		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
284
-	}
285
-}
286
-
287
-func TestSCTP4Proxy(t *testing.T) {
288
-	skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
289
-
290
-	backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
291
-	defer backend.Close()
292
-	backend.Run()
293
-	frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}, Port: 0}
294
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
295
-	if err != nil {
296
-		t.Fatal(err)
297
-	}
298
-	testProxy(t, "sctp", proxy, false)
299
-}
300
-
301
-func TestSCTP6Proxy(t *testing.T) {
302
-	t.Skip("Need to start CI docker with --ipv6")
303
-	skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
304
-
305
-	backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
306
-	defer backend.Close()
307
-	backend.Run()
308
-	frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv6loopback}}, Port: 0}
309
-	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
310
-	if err != nil {
311
-		t.Fatal(err)
312
-	}
313
-	testProxy(t, "sctp", proxy, false)
314
-}
315 1
deleted file mode 100644
... ...
@@ -1,50 +0,0 @@
1
-// docker-proxy provides a network Proxy interface and implementations for TCP
2
-// and UDP.
3
-package main
4
-
5
-import (
6
-	"net"
7
-
8
-	"github.com/ishidawataru/sctp"
9
-)
10
-
11
-// ipVersion refers to IP version - v4 or v6
12
-type ipVersion string
13
-
14
-const (
15
-	// IPv4 is version 4
16
-	ipv4 ipVersion = "4"
17
-	// IPv4 is version 6
18
-	ipv6 ipVersion = "6"
19
-)
20
-
21
-// Proxy defines the behavior of a proxy. It forwards traffic back and forth
22
-// between two endpoints : the frontend and the backend.
23
-// It can be used to do software port-mapping between two addresses.
24
-// e.g. forward all traffic between the frontend (host) 127.0.0.1:3000
25
-// to the backend (container) at 172.17.42.108:4000.
26
-type Proxy interface {
27
-	// Run starts forwarding traffic back and forth between the front
28
-	// and back-end addresses.
29
-	Run()
30
-	// Close stops forwarding traffic and close both ends of the Proxy.
31
-	Close()
32
-	// FrontendAddr returns the address on which the proxy is listening.
33
-	FrontendAddr() net.Addr
34
-	// BackendAddr returns the proxied address.
35
-	BackendAddr() net.Addr
36
-}
37
-
38
-// NewProxy creates a Proxy according to the specified frontendAddr and backendAddr.
39
-func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
40
-	switch frontendAddr.(type) {
41
-	case *net.UDPAddr:
42
-		return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
43
-	case *net.TCPAddr:
44
-		return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
45
-	case *sctp.SCTPAddr:
46
-		return NewSCTPProxy(frontendAddr.(*sctp.SCTPAddr), backendAddr.(*sctp.SCTPAddr))
47
-	default:
48
-		panic("Unsupported protocol")
49
-	}
50
-}
51 1
deleted file mode 100644
... ...
@@ -1,98 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"io"
5
-	"log"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/ishidawataru/sctp"
10
-)
11
-
12
-// SCTPProxy is a proxy for SCTP connections. It implements the Proxy interface to
13
-// handle SCTP traffic forwarding between the frontend and backend addresses.
14
-type SCTPProxy struct {
15
-	listener     *sctp.SCTPListener
16
-	frontendAddr *sctp.SCTPAddr
17
-	backendAddr  *sctp.SCTPAddr
18
-}
19
-
20
-// NewSCTPProxy creates a new SCTPProxy.
21
-func NewSCTPProxy(frontendAddr, backendAddr *sctp.SCTPAddr) (*SCTPProxy, error) {
22
-	// detect version of hostIP to bind only to correct version
23
-	ipVersion := ipv4
24
-	if frontendAddr.IPAddrs[0].IP.To4() == nil {
25
-		ipVersion = ipv6
26
-	}
27
-	listener, err := sctp.ListenSCTP("sctp"+string(ipVersion), frontendAddr)
28
-	if err != nil {
29
-		return nil, err
30
-	}
31
-	// If the port in frontendAddr was 0 then ListenSCTP will have a picked
32
-	// a port to listen on, hence the call to Addr to get that actual port:
33
-	return &SCTPProxy{
34
-		listener:     listener,
35
-		frontendAddr: listener.Addr().(*sctp.SCTPAddr),
36
-		backendAddr:  backendAddr,
37
-	}, nil
38
-}
39
-
40
-func (proxy *SCTPProxy) clientLoop(client *sctp.SCTPConn, quit chan bool) {
41
-	backend, err := sctp.DialSCTP("sctp", nil, proxy.backendAddr)
42
-	if err != nil {
43
-		log.Printf("Can't forward traffic to backend sctp/%v: %s\n", proxy.backendAddr, err)
44
-		client.Close()
45
-		return
46
-	}
47
-	clientC := sctp.NewSCTPSndRcvInfoWrappedConn(client)
48
-	backendC := sctp.NewSCTPSndRcvInfoWrappedConn(backend)
49
-
50
-	var wg sync.WaitGroup
51
-	var broker = func(to, from net.Conn) {
52
-		io.Copy(to, from)
53
-		from.Close()
54
-		to.Close()
55
-		wg.Done()
56
-	}
57
-
58
-	wg.Add(2)
59
-	go broker(clientC, backendC)
60
-	go broker(backendC, clientC)
61
-
62
-	finish := make(chan struct{})
63
-	go func() {
64
-		wg.Wait()
65
-		close(finish)
66
-	}()
67
-
68
-	select {
69
-	case <-quit:
70
-	case <-finish:
71
-	}
72
-	clientC.Close()
73
-	backendC.Close()
74
-	<-finish
75
-}
76
-
77
-// Run starts forwarding the traffic using SCTP.
78
-func (proxy *SCTPProxy) Run() {
79
-	quit := make(chan bool)
80
-	defer close(quit)
81
-	for {
82
-		client, err := proxy.listener.Accept()
83
-		if err != nil {
84
-			log.Printf("Stopping proxy on sctp/%v for sctp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
85
-			return
86
-		}
87
-		go proxy.clientLoop(client.(*sctp.SCTPConn), quit)
88
-	}
89
-}
90
-
91
-// Close stops forwarding the traffic.
92
-func (proxy *SCTPProxy) Close() { proxy.listener.Close() }
93
-
94
-// FrontendAddr returns the SCTP address on which the proxy is listening.
95
-func (proxy *SCTPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
96
-
97
-// BackendAddr returns the SCTP proxied address.
98
-func (proxy *SCTPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
99 1
deleted file mode 100644
... ...
@@ -1,94 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"io"
5
-	"log"
6
-	"net"
7
-	"sync"
8
-)
9
-
10
-// TCPProxy is a proxy for TCP connections. It implements the Proxy interface to
11
-// handle TCP traffic forwarding between the frontend and backend addresses.
12
-type TCPProxy struct {
13
-	listener     *net.TCPListener
14
-	frontendAddr *net.TCPAddr
15
-	backendAddr  *net.TCPAddr
16
-}
17
-
18
-// NewTCPProxy creates a new TCPProxy.
19
-func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
20
-	// detect version of hostIP to bind only to correct version
21
-	ipVersion := ipv4
22
-	if frontendAddr.IP.To4() == nil {
23
-		ipVersion = ipv6
24
-	}
25
-	listener, err := net.ListenTCP("tcp"+string(ipVersion), frontendAddr)
26
-	if err != nil {
27
-		return nil, err
28
-	}
29
-	// If the port in frontendAddr was 0 then ListenTCP will have a picked
30
-	// a port to listen on, hence the call to Addr to get that actual port:
31
-	return &TCPProxy{
32
-		listener:     listener,
33
-		frontendAddr: listener.Addr().(*net.TCPAddr),
34
-		backendAddr:  backendAddr,
35
-	}, nil
36
-}
37
-
38
-func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
39
-	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
40
-	if err != nil {
41
-		log.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
42
-		client.Close()
43
-		return
44
-	}
45
-
46
-	var wg sync.WaitGroup
47
-	var broker = func(to, from *net.TCPConn) {
48
-		io.Copy(to, from)
49
-		from.CloseRead()
50
-		to.CloseWrite()
51
-		wg.Done()
52
-	}
53
-
54
-	wg.Add(2)
55
-	go broker(client, backend)
56
-	go broker(backend, client)
57
-
58
-	finish := make(chan struct{})
59
-	go func() {
60
-		wg.Wait()
61
-		close(finish)
62
-	}()
63
-
64
-	select {
65
-	case <-quit:
66
-	case <-finish:
67
-	}
68
-	client.Close()
69
-	backend.Close()
70
-	<-finish
71
-}
72
-
73
-// Run starts forwarding the traffic using TCP.
74
-func (proxy *TCPProxy) Run() {
75
-	quit := make(chan bool)
76
-	defer close(quit)
77
-	for {
78
-		client, err := proxy.listener.Accept()
79
-		if err != nil {
80
-			log.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
81
-			return
82
-		}
83
-		go proxy.clientLoop(client.(*net.TCPConn), quit)
84
-	}
85
-}
86
-
87
-// Close stops forwarding the traffic.
88
-func (proxy *TCPProxy) Close() { proxy.listener.Close() }
89
-
90
-// FrontendAddr returns the TCP address on which the proxy is listening.
91
-func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
92
-
93
-// BackendAddr returns the TCP proxied address.
94
-func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
95 1
deleted file mode 100644
... ...
@@ -1,173 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"encoding/binary"
5
-	"log"
6
-	"net"
7
-	"strings"
8
-	"sync"
9
-	"syscall"
10
-	"time"
11
-)
12
-
13
-const (
14
-	// UDPConnTrackTimeout is the timeout used for UDP connection tracking
15
-	UDPConnTrackTimeout = 90 * time.Second
16
-	// UDPBufSize is the buffer size for the UDP proxy
17
-	UDPBufSize = 65507
18
-)
19
-
20
-// A net.Addr where the IP is split into two fields so you can use it as a key
21
-// in a map:
22
-type connTrackKey struct {
23
-	IPHigh uint64
24
-	IPLow  uint64
25
-	Port   int
26
-}
27
-
28
-func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
29
-	if len(addr.IP) == net.IPv4len {
30
-		return &connTrackKey{
31
-			IPHigh: 0,
32
-			IPLow:  uint64(binary.BigEndian.Uint32(addr.IP)),
33
-			Port:   addr.Port,
34
-		}
35
-	}
36
-	return &connTrackKey{
37
-		IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
38
-		IPLow:  binary.BigEndian.Uint64(addr.IP[8:]),
39
-		Port:   addr.Port,
40
-	}
41
-}
42
-
43
-type connTrackMap map[connTrackKey]*net.UDPConn
44
-
45
-// UDPProxy is proxy for which handles UDP datagrams. It implements the Proxy
46
-// interface to handle UDP traffic forwarding between the frontend and backend
47
-// addresses.
48
-type UDPProxy struct {
49
-	listener       *net.UDPConn
50
-	frontendAddr   *net.UDPAddr
51
-	backendAddr    *net.UDPAddr
52
-	connTrackTable connTrackMap
53
-	connTrackLock  sync.Mutex
54
-}
55
-
56
-// NewUDPProxy creates a new UDPProxy.
57
-func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
58
-	// detect version of hostIP to bind only to correct version
59
-	ipVersion := ipv4
60
-	if frontendAddr.IP.To4() == nil {
61
-		ipVersion = ipv6
62
-	}
63
-	listener, err := net.ListenUDP("udp"+string(ipVersion), frontendAddr)
64
-	if err != nil {
65
-		return nil, err
66
-	}
67
-	return &UDPProxy{
68
-		listener:       listener,
69
-		frontendAddr:   listener.LocalAddr().(*net.UDPAddr),
70
-		backendAddr:    backendAddr,
71
-		connTrackTable: make(connTrackMap),
72
-	}, nil
73
-}
74
-
75
-func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
76
-	defer func() {
77
-		proxy.connTrackLock.Lock()
78
-		delete(proxy.connTrackTable, *clientKey)
79
-		proxy.connTrackLock.Unlock()
80
-		proxyConn.Close()
81
-	}()
82
-
83
-	readBuf := make([]byte, UDPBufSize)
84
-	for {
85
-		proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
86
-	again:
87
-		read, err := proxyConn.Read(readBuf)
88
-		if err != nil {
89
-			if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
90
-				// This will happen if the last write failed
91
-				// (e.g: nothing is actually listening on the
92
-				// proxied port on the container), ignore it
93
-				// and continue until UDPConnTrackTimeout
94
-				// expires:
95
-				goto again
96
-			}
97
-			return
98
-		}
99
-		for i := 0; i != read; {
100
-			written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
101
-			if err != nil {
102
-				return
103
-			}
104
-			i += written
105
-		}
106
-	}
107
-}
108
-
109
-// Run starts forwarding the traffic using UDP.
110
-func (proxy *UDPProxy) Run() {
111
-	readBuf := make([]byte, UDPBufSize)
112
-	for {
113
-		read, from, err := proxy.listener.ReadFromUDP(readBuf)
114
-		if err != nil {
115
-			// NOTE: Apparently ReadFrom doesn't return
116
-			// ECONNREFUSED like Read do (see comment in
117
-			// UDPProxy.replyLoop)
118
-			if !isClosedError(err) {
119
-				log.Printf("Stopping proxy on udp/%v for udp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
120
-			}
121
-			break
122
-		}
123
-
124
-		fromKey := newConnTrackKey(from)
125
-		proxy.connTrackLock.Lock()
126
-		proxyConn, hit := proxy.connTrackTable[*fromKey]
127
-		if !hit {
128
-			proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
129
-			if err != nil {
130
-				log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
131
-				proxy.connTrackLock.Unlock()
132
-				continue
133
-			}
134
-			proxy.connTrackTable[*fromKey] = proxyConn
135
-			go proxy.replyLoop(proxyConn, from, fromKey)
136
-		}
137
-		proxy.connTrackLock.Unlock()
138
-		for i := 0; i != read; {
139
-			written, err := proxyConn.Write(readBuf[i:read])
140
-			if err != nil {
141
-				log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
142
-				break
143
-			}
144
-			i += written
145
-		}
146
-	}
147
-}
148
-
149
-// Close stops forwarding the traffic.
150
-func (proxy *UDPProxy) Close() {
151
-	proxy.listener.Close()
152
-	proxy.connTrackLock.Lock()
153
-	defer proxy.connTrackLock.Unlock()
154
-	for _, conn := range proxy.connTrackTable {
155
-		conn.Close()
156
-	}
157
-}
158
-
159
-// FrontendAddr returns the UDP address on which the proxy is listening.
160
-func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
161
-
162
-// BackendAddr returns the proxied UDP address.
163
-func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
164
-
165
-func isClosedError(err error) bool {
166
-	/* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
167
-	 * See:
168
-	 * http://golang.org/src/pkg/net/net.go
169
-	 * https://code.google.com/p/go/issues/detail?id=4337
170
-	 * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
171
-	 */
172
-	return strings.HasSuffix(err.Error(), "use of closed network connection")
173
-}