Browse code

Add support for UDP (closes #33)

API Changes
-----------

The port notation is extended to support "/udp" or "/tcp" at the *end*
of the specifier string (and defaults to tcp if "/tcp" or "/udp" are
missing)

`docker ps` now shows UDP ports as "frontend->backend/udp". Nothing
changes for TCP ports.

`docker inspect` now displays two sub-dictionaries: "Tcp" and "Udp",
under "PortMapping" in "NetworkSettings".

Theses changes stand true for the values returned by the HTTP API too.

This changeset will definitely break tools built upon the API (or upon
`docker inspect`). A less intrusive way to add UDP ports in `docker
inspect` would be to simply add "/udp" for UDP ports but it will still
break existing applications which tries to convert the whole field to an
integer. I believe that having two TCP/UDP sub-dictionaries is better
because it makes the whole thing more clear and more easy to parse right
away (i.e: you don't have to check the format of the string, split it
and convert the right part to an integer)

Code Changes
------------

Significant changes in network.go:

- A second PortAllocator is instantiated for the UDP range;
- PortMapper maintains separate mapping for TCP and UDP;
- The extPorts array in NetworkInterface is now an array of Nat objects
(so we can know on which protocol a given port was mapped when
NetworkInterface.Release() is called);
- TCP proxying on localhost has been moved away in network_proxy.go.

localhost proxy code rewrite in network_proxy.go:

We have to proxy the traffic between localhost:frontend-port and
container:backend-port because Netfilter doesn't work properly on the
loopback interface and DNAT iptable rules aren't applied there.

- Goroutines in the TCP proxying code are now explicitly stopped when
the proxy is stopped;
- UDP connection tracking using a map (more infos in [1]);
- Support for IPv6 (to be more accurate, the code is transparent to the
Go net package, so you can use, tcp/tcp4/tcp6/udp/udp4/udp6);
- Single Proxy interface for both UDP and TCP proxying;
- Full test suite.

[1] https://github.com/dotcloud/docker/issues/33#issuecomment-20010400

Louis Opter authored on 2013/06/12 07:46:23
Showing 6 changed files
... ...
@@ -202,20 +202,25 @@ func ParseRun(args []string, capabilities *Capabilities) (*Config, *HostConfig,
202 202
 	return config, hostConfig, cmd, nil
203 203
 }
204 204
 
205
+type portMapping map[string]string
206
+
205 207
 type NetworkSettings struct {
206 208
 	IPAddress   string
207 209
 	IPPrefixLen int
208 210
 	Gateway     string
209 211
 	Bridge      string
210
-	PortMapping map[string]string
212
+	PortMapping map[string]portMapping
211 213
 }
212 214
 
213 215
 // String returns a human-readable description of the port mapping defined in the settings
214 216
 func (settings *NetworkSettings) PortMappingHuman() string {
215 217
 	var mapping []string
216
-	for private, public := range settings.PortMapping {
218
+	for private, public := range settings.PortMapping["Tcp"] {
217 219
 		mapping = append(mapping, fmt.Sprintf("%s->%s", public, private))
218 220
 	}
221
+	for private, public := range settings.PortMapping["Udp"] {
222
+		mapping = append(mapping, fmt.Sprintf("%s->%s/udp", public, private))
223
+	}
219 224
 	sort.Strings(mapping)
220 225
 	return strings.Join(mapping, ", ")
221 226
 }
... ...
@@ -688,14 +693,18 @@ func (container *Container) allocateNetwork() error {
688 688
 	if err != nil {
689 689
 		return err
690 690
 	}
691
-	container.NetworkSettings.PortMapping = make(map[string]string)
691
+	container.NetworkSettings.PortMapping = make(map[string]portMapping)
692
+	container.NetworkSettings.PortMapping["Tcp"] = make(portMapping)
693
+	container.NetworkSettings.PortMapping["Udp"] = make(portMapping)
692 694
 	for _, spec := range container.Config.PortSpecs {
693 695
 		nat, err := iface.AllocatePort(spec)
694 696
 		if err != nil {
695 697
 			iface.Release()
696 698
 			return err
697 699
 		}
698
-		container.NetworkSettings.PortMapping[strconv.Itoa(nat.Backend)] = strconv.Itoa(nat.Frontend)
700
+		proto := strings.Title(nat.Proto)
701
+		backend, frontend := strconv.Itoa(nat.Backend), strconv.Itoa(nat.Frontend)
702
+		container.NetworkSettings.PortMapping[proto][backend] = frontend
699 703
 	}
700 704
 	container.network = iface
701 705
 	container.NetworkSettings.Bridge = container.runtime.networkManager.bridgeIface
... ...
@@ -5,7 +5,6 @@ import (
5 5
 	"errors"
6 6
 	"fmt"
7 7
 	"github.com/dotcloud/docker/utils"
8
-	"io"
9 8
 	"log"
10 9
 	"net"
11 10
 	"os/exec"
... ...
@@ -183,8 +182,10 @@ func getIfaceAddr(name string) (net.Addr, error) {
183 183
 // up iptables rules.
184 184
 // It keeps track of all mappings and is able to unmap at will
185 185
 type PortMapper struct {
186
-	mapping map[int]net.TCPAddr
187
-	proxies map[int]net.Listener
186
+	tcpMapping map[int]*net.TCPAddr
187
+	tcpProxies map[int]Proxy
188
+	udpMapping map[int]*net.UDPAddr
189
+	udpProxies map[int]Proxy
188 190
 }
189 191
 
190 192
 func (mapper *PortMapper) cleanup() error {
... ...
@@ -197,8 +198,10 @@ func (mapper *PortMapper) cleanup() error {
197 197
 	iptables("-t", "nat", "-D", "OUTPUT", "-j", "DOCKER")
198 198
 	iptables("-t", "nat", "-F", "DOCKER")
199 199
 	iptables("-t", "nat", "-X", "DOCKER")
200
-	mapper.mapping = make(map[int]net.TCPAddr)
201
-	mapper.proxies = make(map[int]net.Listener)
200
+	mapper.tcpMapping = make(map[int]*net.TCPAddr)
201
+	mapper.tcpProxies = make(map[int]Proxy)
202
+	mapper.udpMapping = make(map[int]*net.UDPAddr)
203
+	mapper.udpProxies = make(map[int]Proxy)
202 204
 	return nil
203 205
 }
204 206
 
... ...
@@ -215,76 +218,72 @@ func (mapper *PortMapper) setup() error {
215 215
 	return nil
216 216
 }
217 217
 
218
-func (mapper *PortMapper) iptablesForward(rule string, port int, dest net.TCPAddr) error {
219
-	return iptables("-t", "nat", rule, "DOCKER", "-p", "tcp", "--dport", strconv.Itoa(port),
220
-		"-j", "DNAT", "--to-destination", net.JoinHostPort(dest.IP.String(), strconv.Itoa(dest.Port)))
218
+func (mapper *PortMapper) iptablesForward(rule string, port int, proto string, dest_addr string, dest_port int) error {
219
+	return iptables("-t", "nat", rule, "DOCKER", "-p", proto, "--dport", strconv.Itoa(port),
220
+		"-j", "DNAT", "--to-destination", net.JoinHostPort(dest_addr, strconv.Itoa(dest_port)))
221 221
 }
222 222
 
223
-func (mapper *PortMapper) Map(port int, dest net.TCPAddr) error {
224
-	if err := mapper.iptablesForward("-A", port, dest); err != nil {
225
-		return err
226
-	}
227
-
228
-	mapper.mapping[port] = dest
229
-	listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
230
-	if err != nil {
231
-		mapper.Unmap(port)
232
-		return err
233
-	}
234
-	mapper.proxies[port] = listener
235
-	go proxy(listener, "tcp", dest.String())
236
-	return nil
237
-}
238
-
239
-// proxy listens for socket connections on `listener`, and forwards them unmodified
240
-// to `proto:address`
241
-func proxy(listener net.Listener, proto, address string) error {
242
-	utils.Debugf("proxying to %s:%s", proto, address)
243
-	defer utils.Debugf("Done proxying to %s:%s", proto, address)
244
-	for {
245
-		utils.Debugf("Listening on %s", listener)
246
-		src, err := listener.Accept()
223
+func (mapper *PortMapper) Map(port int, backendAddr net.Addr) error {
224
+	if _, isTCP := backendAddr.(*net.TCPAddr); isTCP {
225
+		backendPort := backendAddr.(*net.TCPAddr).Port
226
+		backendIP := backendAddr.(*net.TCPAddr).IP
227
+		if err := mapper.iptablesForward("-A", port, "tcp", backendIP.String(), backendPort); err != nil {
228
+			return err
229
+		}
230
+		mapper.tcpMapping[port] = backendAddr.(*net.TCPAddr)
231
+		proxy, err := NewProxy(&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}, backendAddr)
247 232
 		if err != nil {
233
+			mapper.Unmap(port, "tcp")
248 234
 			return err
249 235
 		}
250
-		utils.Debugf("Connecting to %s:%s", proto, address)
251
-		dst, err := net.Dial(proto, address)
236
+		mapper.tcpProxies[port] = proxy
237
+		go proxy.Run()
238
+	} else {
239
+		backendPort := backendAddr.(*net.UDPAddr).Port
240
+		backendIP := backendAddr.(*net.UDPAddr).IP
241
+		if err := mapper.iptablesForward("-A", port, "udp", backendIP.String(), backendPort); err != nil {
242
+			return err
243
+		}
244
+		mapper.udpMapping[port] = backendAddr.(*net.UDPAddr)
245
+		proxy, err := NewProxy(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}, backendAddr)
252 246
 		if err != nil {
253
-			log.Printf("Error connecting to %s:%s: %s", proto, address, err)
254
-			src.Close()
255
-			continue
247
+			mapper.Unmap(port, "udp")
248
+			return err
256 249
 		}
257
-		utils.Debugf("Connected to backend, splicing")
258
-		splice(src, dst)
250
+		mapper.udpProxies[port] = proxy
251
+		go proxy.Run()
259 252
 	}
253
+	return nil
260 254
 }
261 255
 
262
-func halfSplice(dst, src net.Conn) error {
263
-	_, err := io.Copy(dst, src)
264
-	// FIXME: on EOF from a tcp connection, pass WriteClose()
265
-	dst.Close()
266
-	src.Close()
267
-	return err
268
-}
269
-
270
-func splice(a, b net.Conn) {
271
-	go halfSplice(a, b)
272
-	go halfSplice(b, a)
273
-}
274
-
275
-func (mapper *PortMapper) Unmap(port int) error {
276
-	dest, ok := mapper.mapping[port]
277
-	if !ok {
278
-		return errors.New("Port is not mapped")
279
-	}
280
-	if proxy, exists := mapper.proxies[port]; exists {
281
-		proxy.Close()
282
-		delete(mapper.proxies, port)
283
-	}
284
-	if err := mapper.iptablesForward("-D", port, dest); err != nil {
285
-		return err
256
+func (mapper *PortMapper) Unmap(port int, proto string) error {
257
+	if proto == "tcp" {
258
+		backendAddr, ok := mapper.tcpMapping[port]
259
+		if !ok {
260
+			return fmt.Errorf("Port tcp/%v is not mapped", port)
261
+		}
262
+		if proxy, exists := mapper.tcpProxies[port]; exists {
263
+			proxy.Close()
264
+			delete(mapper.tcpProxies, port)
265
+		}
266
+		if err := mapper.iptablesForward("-D", port, proto, backendAddr.IP.String(), backendAddr.Port); err != nil {
267
+			return err
268
+		}
269
+		delete(mapper.tcpMapping, port)
270
+	} else {
271
+		backendAddr, ok := mapper.udpMapping[port]
272
+		if !ok {
273
+			return fmt.Errorf("Port udp/%v is not mapped", port)
274
+		}
275
+		if proxy, exists := mapper.udpProxies[port]; exists {
276
+			proxy.Close()
277
+			delete(mapper.udpProxies, port)
278
+		}
279
+		if err := mapper.iptablesForward("-D", port, proto, backendAddr.IP.String(), backendAddr.Port); err != nil {
280
+			return err
281
+		}
282
+		delete(mapper.udpMapping, port)
286 283
 	}
287
-	delete(mapper.mapping, port)
288 284
 	return nil
289 285
 }
290 286
 
... ...
@@ -453,7 +452,7 @@ type NetworkInterface struct {
453 453
 	Gateway net.IP
454 454
 
455 455
 	manager  *NetworkManager
456
-	extPorts []int
456
+	extPorts []*Nat
457 457
 }
458 458
 
459 459
 // Allocate an external TCP port and map it to the interface
... ...
@@ -462,17 +461,32 @@ func (iface *NetworkInterface) AllocatePort(spec string) (*Nat, error) {
462 462
 	if err != nil {
463 463
 		return nil, err
464 464
 	}
465
-	// Allocate a random port if Frontend==0
466
-	extPort, err := iface.manager.portAllocator.Acquire(nat.Frontend)
467
-	if err != nil {
468
-		return nil, err
469
-	}
470
-	nat.Frontend = extPort
471
-	if err := iface.manager.portMapper.Map(nat.Frontend, net.TCPAddr{IP: iface.IPNet.IP, Port: nat.Backend}); err != nil {
472
-		iface.manager.portAllocator.Release(nat.Frontend)
473
-		return nil, err
465
+
466
+	if nat.Proto == "tcp" {
467
+		extPort, err := iface.manager.tcpPortAllocator.Acquire(nat.Frontend)
468
+		if err != nil {
469
+			return nil, err
470
+		}
471
+		backend := &net.TCPAddr{IP: iface.IPNet.IP, Port: nat.Backend}
472
+		if err := iface.manager.portMapper.Map(extPort, backend); err != nil {
473
+			iface.manager.tcpPortAllocator.Release(extPort)
474
+			return nil, err
475
+		}
476
+		nat.Frontend = extPort
477
+	} else {
478
+		extPort, err := iface.manager.udpPortAllocator.Acquire(nat.Frontend)
479
+		if err != nil {
480
+			return nil, err
481
+		}
482
+		backend := &net.UDPAddr{IP: iface.IPNet.IP, Port: nat.Backend}
483
+		if err := iface.manager.portMapper.Map(extPort, backend); err != nil {
484
+			iface.manager.udpPortAllocator.Release(extPort)
485
+			return nil, err
486
+		}
487
+		nat.Frontend = extPort
474 488
 	}
475
-	iface.extPorts = append(iface.extPorts, nat.Frontend)
489
+	iface.extPorts = append(iface.extPorts, nat)
490
+
476 491
 	return nat, nil
477 492
 }
478 493
 
... ...
@@ -485,6 +499,21 @@ type Nat struct {
485 485
 func parseNat(spec string) (*Nat, error) {
486 486
 	var nat Nat
487 487
 
488
+	if strings.Contains(spec, "/") {
489
+		specParts := strings.Split(spec, "/")
490
+		if len(specParts) != 2 {
491
+			return nil, fmt.Errorf("Invalid port format.")
492
+		}
493
+		proto := specParts[1]
494
+		spec = specParts[0]
495
+		if proto != "tcp" && proto != "udp" {
496
+			return nil, fmt.Errorf("Invalid port format: unknown protocol %v.", proto)
497
+		}
498
+		nat.Proto = proto
499
+	} else {
500
+		nat.Proto = "tcp"
501
+	}
502
+
488 503
 	if strings.Contains(spec, ":") {
489 504
 		specParts := strings.Split(spec, ":")
490 505
 		if len(specParts) != 2 {
... ...
@@ -517,20 +546,24 @@ func parseNat(spec string) (*Nat, error) {
517 517
 		}
518 518
 		nat.Backend = int(port)
519 519
 	}
520
-	nat.Proto = "tcp"
520
+
521 521
 	return &nat, nil
522 522
 }
523 523
 
524 524
 // Release: Network cleanup - release all resources
525 525
 func (iface *NetworkInterface) Release() {
526
-	for _, port := range iface.extPorts {
527
-		if err := iface.manager.portMapper.Unmap(port); err != nil {
528
-			log.Printf("Unable to unmap port %v: %v", port, err)
526
+	for _, nat := range iface.extPorts {
527
+		utils.Debugf("Unmaping %v/%v", nat.Proto, nat.Frontend)
528
+		if err := iface.manager.portMapper.Unmap(nat.Frontend, nat.Proto); err != nil {
529
+			log.Printf("Unable to unmap port %v/%v: %v", nat.Proto, nat.Frontend, err)
529 530
 		}
530
-		if err := iface.manager.portAllocator.Release(port); err != nil {
531
-			log.Printf("Unable to release port %v: %v", port, err)
531
+		if nat.Proto == "tcp" {
532
+			if err := iface.manager.tcpPortAllocator.Release(nat.Frontend); err != nil {
533
+				log.Printf("Unable to release port tcp/%v: %v", nat.Frontend, err)
534
+			}
535
+		} else if err := iface.manager.udpPortAllocator.Release(nat.Frontend); err != nil {
536
+			log.Printf("Unable to release port udp/%v: %v", nat.Frontend, err)
532 537
 		}
533
-
534 538
 	}
535 539
 
536 540
 	iface.manager.ipAllocator.Release(iface.IPNet.IP)
... ...
@@ -542,9 +575,10 @@ type NetworkManager struct {
542 542
 	bridgeIface   string
543 543
 	bridgeNetwork *net.IPNet
544 544
 
545
-	ipAllocator   *IPAllocator
546
-	portAllocator *PortAllocator
547
-	portMapper    *PortMapper
545
+	ipAllocator      *IPAllocator
546
+	tcpPortAllocator *PortAllocator
547
+	udpPortAllocator *PortAllocator
548
+	portMapper       *PortMapper
548 549
 }
549 550
 
550 551
 // Allocate a network interface
... ...
@@ -577,7 +611,11 @@ func newNetworkManager(bridgeIface string) (*NetworkManager, error) {
577 577
 
578 578
 	ipAllocator := newIPAllocator(network)
579 579
 
580
-	portAllocator, err := newPortAllocator()
580
+	tcpPortAllocator, err := newPortAllocator()
581
+	if err != nil {
582
+		return nil, err
583
+	}
584
+	udpPortAllocator, err := newPortAllocator()
581 585
 	if err != nil {
582 586
 		return nil, err
583 587
 	}
... ...
@@ -588,11 +626,12 @@ func newNetworkManager(bridgeIface string) (*NetworkManager, error) {
588 588
 	}
589 589
 
590 590
 	manager := &NetworkManager{
591
-		bridgeIface:   bridgeIface,
592
-		bridgeNetwork: network,
593
-		ipAllocator:   ipAllocator,
594
-		portAllocator: portAllocator,
595
-		portMapper:    portMapper,
591
+		bridgeIface:      bridgeIface,
592
+		bridgeNetwork:    network,
593
+		ipAllocator:      ipAllocator,
594
+		tcpPortAllocator: tcpPortAllocator,
595
+		udpPortAllocator: udpPortAllocator,
596
+		portMapper:       portMapper,
596 597
 	}
597 598
 	return manager, nil
598 599
 }
599 600
new file mode 100644
... ...
@@ -0,0 +1,257 @@
0
+package docker
1
+
2
+import (
3
+	"encoding/binary"
4
+	"fmt"
5
+	"github.com/dotcloud/docker/utils"
6
+	"io"
7
+	"log"
8
+	"net"
9
+	"sync"
10
+	"syscall"
11
+	"time"
12
+)
13
+
14
+const (
15
+	UDPConnTrackTimeout = 90 * time.Second
16
+	UDPBufSize          = 2048
17
+)
18
+
19
+type Proxy interface {
20
+	// Start forwarding traffic back and forth the front and back-end
21
+	// addresses.
22
+	Run()
23
+	// Stop forwarding traffic and close both ends of the Proxy.
24
+	Close()
25
+	// Return the address on which the proxy is listening.
26
+	FrontendAddr() net.Addr
27
+	// Return the proxied address.
28
+	BackendAddr() net.Addr
29
+}
30
+
31
+type TCPProxy struct {
32
+	listener     *net.TCPListener
33
+	frontendAddr *net.TCPAddr
34
+	backendAddr  *net.TCPAddr
35
+}
36
+
37
+func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
38
+	listener, err := net.ListenTCP("tcp", frontendAddr)
39
+	if err != nil {
40
+		return nil, err
41
+	}
42
+	// If the port in frontendAddr was 0 then ListenTCP will have a picked
43
+	// a port to listen on, hence the call to Addr to get that actual port:
44
+	return &TCPProxy{
45
+		listener:     listener,
46
+		frontendAddr: listener.Addr().(*net.TCPAddr),
47
+		backendAddr:  backendAddr,
48
+	}, nil
49
+}
50
+
51
+func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
52
+	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
53
+	if err != nil {
54
+		log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error())
55
+		client.Close()
56
+		return
57
+	}
58
+
59
+	event := make(chan int64)
60
+	var broker = func(to, from *net.TCPConn) {
61
+		written, err := io.Copy(to, from)
62
+		if err != nil {
63
+			err, ok := err.(*net.OpError)
64
+			// If the socket we are writing to is shutdown with
65
+			// SHUT_WR, forward it to the other end of the pipe:
66
+			if ok && err.Err == syscall.EPIPE {
67
+				from.CloseWrite()
68
+			}
69
+		}
70
+		event <- written
71
+	}
72
+	utils.Debugf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr())
73
+	go broker(client, backend)
74
+	go broker(backend, client)
75
+
76
+	var transferred int64 = 0
77
+	for i := 0; i < 2; i++ {
78
+		select {
79
+		case written := <-event:
80
+			transferred += written
81
+		case <-quit:
82
+			// Interrupt the two brokers and "join" them.
83
+			client.Close()
84
+			backend.Close()
85
+			for ; i < 2; i++ {
86
+				transferred += <-event
87
+			}
88
+			goto done
89
+		}
90
+	}
91
+	client.Close()
92
+	backend.Close()
93
+done:
94
+	utils.Debugf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr())
95
+}
96
+
97
+func (proxy *TCPProxy) Run() {
98
+	quit := make(chan bool)
99
+	defer close(quit)
100
+	utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
101
+	for {
102
+		client, err := proxy.listener.Accept()
103
+		if err != nil {
104
+			utils.Debugf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
105
+			return
106
+		}
107
+		go proxy.clientLoop(client.(*net.TCPConn), quit)
108
+	}
109
+}
110
+
111
+func (proxy *TCPProxy) Close()                 { proxy.listener.Close() }
112
+func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
113
+func (proxy *TCPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
114
+
115
+// A net.Addr where the IP is split into two fields so you can use it as a key
116
+// in a map:
117
+type connTrackKey struct {
118
+	IPHigh uint64
119
+	IPLow  uint64
120
+	Port   int
121
+}
122
+
123
+func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
124
+	if len(addr.IP) == net.IPv4len {
125
+		return &connTrackKey{
126
+			IPHigh: 0,
127
+			IPLow:  uint64(binary.BigEndian.Uint32(addr.IP)),
128
+			Port:   addr.Port,
129
+		}
130
+	}
131
+	return &connTrackKey{
132
+		IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
133
+		IPLow:  binary.BigEndian.Uint64(addr.IP[8:]),
134
+		Port:   addr.Port,
135
+	}
136
+}
137
+
138
+type connTrackMap map[connTrackKey]*net.UDPConn
139
+
140
+type UDPProxy struct {
141
+	listener       *net.UDPConn
142
+	frontendAddr   *net.UDPAddr
143
+	backendAddr    *net.UDPAddr
144
+	connTrackTable connTrackMap
145
+	connTrackLock  sync.Mutex
146
+}
147
+
148
+func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
149
+	listener, err := net.ListenUDP("udp", frontendAddr)
150
+	if err != nil {
151
+		return nil, err
152
+	}
153
+	return &UDPProxy{
154
+		listener:       listener,
155
+		frontendAddr:   listener.LocalAddr().(*net.UDPAddr),
156
+		backendAddr:    backendAddr,
157
+		connTrackTable: make(connTrackMap),
158
+	}, nil
159
+}
160
+
161
+func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
162
+	defer func() {
163
+		proxy.connTrackLock.Lock()
164
+		delete(proxy.connTrackTable, *clientKey)
165
+		proxy.connTrackLock.Unlock()
166
+		utils.Debugf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String())
167
+		proxyConn.Close()
168
+	}()
169
+
170
+	readBuf := make([]byte, UDPBufSize)
171
+	for {
172
+		proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
173
+	again:
174
+		read, err := proxyConn.Read(readBuf)
175
+		if err != nil {
176
+			if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
177
+				// This will happen if the last write failed
178
+				// (e.g: nothing is actually listening on the
179
+				// proxied port on the container), ignore it
180
+				// and continue until UDPConnTrackTimeout
181
+				// expires:
182
+				goto again
183
+			}
184
+			return
185
+		}
186
+		for i := 0; i != read; {
187
+			written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
188
+			if err != nil {
189
+				return
190
+			}
191
+			i += written
192
+			utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String())
193
+		}
194
+	}
195
+}
196
+
197
+func (proxy *UDPProxy) Run() {
198
+	readBuf := make([]byte, UDPBufSize)
199
+	utils.Debugf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr)
200
+	for {
201
+		read, from, err := proxy.listener.ReadFromUDP(readBuf)
202
+		if err != nil {
203
+			// NOTE: Apparently ReadFrom doesn't return
204
+			// ECONNREFUSED like Read do (see comment in
205
+			// UDPProxy.replyLoop)
206
+			utils.Debugf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
207
+			break
208
+		}
209
+
210
+		fromKey := newConnTrackKey(from)
211
+		proxy.connTrackLock.Lock()
212
+		proxyConn, hit := proxy.connTrackTable[*fromKey]
213
+		if !hit {
214
+			proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
215
+			if err != nil {
216
+				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
217
+				continue
218
+			}
219
+			proxy.connTrackTable[*fromKey] = proxyConn
220
+			go proxy.replyLoop(proxyConn, from, fromKey)
221
+		}
222
+		proxy.connTrackLock.Unlock()
223
+		for i := 0; i != read; {
224
+			written, err := proxyConn.Write(readBuf[i:read])
225
+			if err != nil {
226
+				log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
227
+				break
228
+			}
229
+			i += written
230
+			utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String())
231
+		}
232
+	}
233
+}
234
+
235
+func (proxy *UDPProxy) Close() {
236
+	proxy.listener.Close()
237
+	proxy.connTrackLock.Lock()
238
+	defer proxy.connTrackLock.Unlock()
239
+	for _, conn := range proxy.connTrackTable {
240
+		conn.Close()
241
+	}
242
+}
243
+
244
+func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
245
+func (proxy *UDPProxy) BackendAddr() net.Addr  { return proxy.backendAddr }
246
+
247
+func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
248
+	switch frontendAddr.(type) {
249
+	case *net.UDPAddr:
250
+		return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
251
+	case *net.TCPAddr:
252
+		return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
253
+	default:
254
+		panic(fmt.Errorf("Unsupported protocol"))
255
+	}
256
+}
0 257
new file mode 100644
... ...
@@ -0,0 +1,221 @@
0
+package docker
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"net"
7
+	"strings"
8
+	"testing"
9
+	"time"
10
+)
11
+
12
+var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
13
+var testBufSize = len(testBuf)
14
+
15
+type EchoServer interface {
16
+	Run()
17
+	Close()
18
+	LocalAddr() net.Addr
19
+}
20
+
21
+type TCPEchoServer struct {
22
+	listener net.Listener
23
+	testCtx  *testing.T
24
+}
25
+
26
+type UDPEchoServer struct {
27
+	conn    net.PacketConn
28
+	testCtx *testing.T
29
+}
30
+
31
+func NewEchoServer(t *testing.T, proto, address string) EchoServer {
32
+	var server EchoServer
33
+	if strings.HasPrefix(proto, "tcp") {
34
+		listener, err := net.Listen(proto, address)
35
+		if err != nil {
36
+			t.Fatal(err)
37
+		}
38
+		server = &TCPEchoServer{listener: listener, testCtx: t}
39
+	} else {
40
+		socket, err := net.ListenPacket(proto, address)
41
+		if err != nil {
42
+			t.Fatal(err)
43
+		}
44
+		server = &UDPEchoServer{conn: socket, testCtx: t}
45
+	}
46
+	t.Logf("EchoServer listening on %v/%v\n", proto, server.LocalAddr().String())
47
+	return server
48
+}
49
+
50
+func (server *TCPEchoServer) Run() {
51
+	go func() {
52
+		for {
53
+			client, err := server.listener.Accept()
54
+			if err != nil {
55
+				return
56
+			}
57
+			go func(client net.Conn) {
58
+				server.testCtx.Logf("TCP client accepted on the EchoServer\n")
59
+				written, err := io.Copy(client, client)
60
+				server.testCtx.Logf("%v bytes echoed back to the client\n", written)
61
+				if err != nil {
62
+					server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
63
+				}
64
+				client.Close()
65
+			}(client)
66
+		}
67
+	}()
68
+}
69
+
70
+func (server *TCPEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
71
+func (server *TCPEchoServer) Close()              { server.listener.Addr() }
72
+
73
+func (server *UDPEchoServer) Run() {
74
+	go func() {
75
+		readBuf := make([]byte, 1024)
76
+		for {
77
+			read, from, err := server.conn.ReadFrom(readBuf)
78
+			if err != nil {
79
+				return
80
+			}
81
+			server.testCtx.Logf("Writing UDP datagram back")
82
+			for i := 0; i != read; {
83
+				written, err := server.conn.WriteTo(readBuf[i:read], from)
84
+				if err != nil {
85
+					break
86
+				}
87
+				i += written
88
+			}
89
+		}
90
+	}()
91
+}
92
+
93
+func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
94
+func (server *UDPEchoServer) Close()              { server.conn.Close() }
95
+
96
+func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string) {
97
+	defer proxy.Close()
98
+	go proxy.Run()
99
+	client, err := net.Dial(proto, addr)
100
+	if err != nil {
101
+		t.Fatalf("Can't connect to the proxy: %v", err)
102
+	}
103
+	defer client.Close()
104
+	client.SetDeadline(time.Now().Add(10 * time.Second))
105
+	if _, err = client.Write(testBuf); err != nil {
106
+		t.Fatal(err)
107
+	}
108
+	recvBuf := make([]byte, testBufSize)
109
+	if _, err = client.Read(recvBuf); err != nil {
110
+		t.Fatal(err)
111
+	}
112
+	if !bytes.Equal(testBuf, recvBuf) {
113
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
114
+	}
115
+}
116
+
117
+func testProxy(t *testing.T, proto string, proxy Proxy) {
118
+	testProxyAt(t, proto, proxy, proxy.FrontendAddr().String())
119
+}
120
+
121
+func TestTCP4Proxy(t *testing.T) {
122
+	backend := NewEchoServer(t, "tcp", "127.0.0.1:0")
123
+	defer backend.Close()
124
+	backend.Run()
125
+	frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
126
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
127
+	if err != nil {
128
+		t.Fatal(err)
129
+	}
130
+	testProxy(t, "tcp", proxy)
131
+}
132
+
133
+func TestTCP6Proxy(t *testing.T) {
134
+	backend := NewEchoServer(t, "tcp", "[::1]:0")
135
+	defer backend.Close()
136
+	backend.Run()
137
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
138
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
139
+	if err != nil {
140
+		t.Fatal(err)
141
+	}
142
+	testProxy(t, "tcp", proxy)
143
+}
144
+
145
+func TestTCPDualStackProxy(t *testing.T) {
146
+	// If I understand `godoc -src net favoriteAddrFamily` (used by the
147
+	// net.Listen* functions) correctly this should work, but it doesn't.
148
+	t.Skip("No support for dual stack yet")
149
+	backend := NewEchoServer(t, "tcp", "[::1]:0")
150
+	defer backend.Close()
151
+	backend.Run()
152
+	frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
153
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
154
+	if err != nil {
155
+		t.Fatal(err)
156
+	}
157
+	ipv4ProxyAddr := &net.TCPAddr{
158
+		IP:   net.IPv4(127, 0, 0, 1),
159
+		Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
160
+	}
161
+	testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String())
162
+}
163
+
164
+func TestUDP4Proxy(t *testing.T) {
165
+	backend := NewEchoServer(t, "udp", "127.0.0.1:0")
166
+	defer backend.Close()
167
+	backend.Run()
168
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
169
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
170
+	if err != nil {
171
+		t.Fatal(err)
172
+	}
173
+	testProxy(t, "udp", proxy)
174
+}
175
+
176
+func TestUDP6Proxy(t *testing.T) {
177
+	backend := NewEchoServer(t, "udp", "[::1]:0")
178
+	defer backend.Close()
179
+	backend.Run()
180
+	frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
181
+	proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
182
+	if err != nil {
183
+		t.Fatal(err)
184
+	}
185
+	testProxy(t, "udp", proxy)
186
+}
187
+
188
+func TestUDPWriteError(t *testing.T) {
189
+	frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
190
+	// Hopefully, this port will be free: */
191
+	backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
192
+	proxy, err := NewProxy(frontendAddr, backendAddr)
193
+	if err != nil {
194
+		t.Fatal(err)
195
+	}
196
+	defer proxy.Close()
197
+	go proxy.Run()
198
+	client, err := net.Dial("udp", "127.0.0.1:25587")
199
+	if err != nil {
200
+		t.Fatalf("Can't connect to the proxy: %v", err)
201
+	}
202
+	defer client.Close()
203
+	// Make sure the proxy doesn't stop when there is no actual backend:
204
+	client.Write(testBuf)
205
+	client.Write(testBuf)
206
+	backend := NewEchoServer(t, "udp", "127.0.0.1:25587")
207
+	defer backend.Close()
208
+	backend.Run()
209
+	client.SetDeadline(time.Now().Add(10 * time.Second))
210
+	if _, err = client.Write(testBuf); err != nil {
211
+		t.Fatal(err)
212
+	}
213
+	recvBuf := make([]byte, testBufSize)
214
+	if _, err = client.Read(recvBuf); err != nil {
215
+		t.Fatal(err)
216
+	}
217
+	if !bytes.Equal(testBuf, recvBuf) {
218
+		t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
219
+	}
220
+}
... ...
@@ -20,28 +20,97 @@ func TestIptables(t *testing.T) {
20 20
 
21 21
 func TestParseNat(t *testing.T) {
22 22
 	if nat, err := parseNat("4500"); err == nil {
23
-		if nat.Frontend != 0 || nat.Backend != 4500 {
24
-			t.Errorf("-p 4500 should produce 0->4500, got %d->%d", nat.Frontend, nat.Backend)
23
+		if nat.Frontend != 0 || nat.Backend != 4500 || nat.Proto != "tcp" {
24
+			t.Errorf("-p 4500 should produce 0->4500/tcp, got %d->%d/%s",
25
+				nat.Frontend, nat.Backend, nat.Proto)
25 26
 		}
26 27
 	} else {
27 28
 		t.Fatal(err)
28 29
 	}
29 30
 
30 31
 	if nat, err := parseNat(":4501"); err == nil {
31
-		if nat.Frontend != 4501 || nat.Backend != 4501 {
32
-			t.Errorf("-p :4501 should produce 4501->4501, got %d->%d", nat.Frontend, nat.Backend)
32
+		if nat.Frontend != 4501 || nat.Backend != 4501 || nat.Proto != "tcp" {
33
+			t.Errorf("-p :4501 should produce 4501->4501/tcp, got %d->%d/%s",
34
+				nat.Frontend, nat.Backend, nat.Proto)
33 35
 		}
34 36
 	} else {
35 37
 		t.Fatal(err)
36 38
 	}
37 39
 
38 40
 	if nat, err := parseNat("4502:4503"); err == nil {
39
-		if nat.Frontend != 4502 || nat.Backend != 4503 {
40
-			t.Errorf("-p 4502:4503 should produce 4502->4503, got %d->%d", nat.Frontend, nat.Backend)
41
+		if nat.Frontend != 4502 || nat.Backend != 4503 || nat.Proto != "tcp" {
42
+			t.Errorf("-p 4502:4503 should produce 4502->4503/tcp, got %d->%d/%s",
43
+				nat.Frontend, nat.Backend, nat.Proto)
41 44
 		}
42 45
 	} else {
43 46
 		t.Fatal(err)
44 47
 	}
48
+
49
+	if nat, err := parseNat("4502:4503/tcp"); err == nil {
50
+		if nat.Frontend != 4502 || nat.Backend != 4503 || nat.Proto != "tcp" {
51
+			t.Errorf("-p 4502:4503/tcp should produce 4502->4503/tcp, got %d->%d/%s",
52
+				nat.Frontend, nat.Backend, nat.Proto)
53
+		}
54
+	} else {
55
+		t.Fatal(err)
56
+	}
57
+
58
+	if nat, err := parseNat("4502:4503/udp"); err == nil {
59
+		if nat.Frontend != 4502 || nat.Backend != 4503 || nat.Proto != "udp" {
60
+			t.Errorf("-p 4502:4503/udp should produce 4502->4503/udp, got %d->%d/%s",
61
+				nat.Frontend, nat.Backend, nat.Proto)
62
+		}
63
+	} else {
64
+		t.Fatal(err)
65
+	}
66
+
67
+	if nat, err := parseNat(":4503/udp"); err == nil {
68
+		if nat.Frontend != 4503 || nat.Backend != 4503 || nat.Proto != "udp" {
69
+			t.Errorf("-p :4503/udp should produce 4503->4503/udp, got %d->%d/%s",
70
+				nat.Frontend, nat.Backend, nat.Proto)
71
+		}
72
+	} else {
73
+		t.Fatal(err)
74
+	}
75
+
76
+	if nat, err := parseNat(":4503/tcp"); err == nil {
77
+		if nat.Frontend != 4503 || nat.Backend != 4503 || nat.Proto != "tcp" {
78
+			t.Errorf("-p :4503/tcp should produce 4503->4503/tcp, got %d->%d/%s",
79
+				nat.Frontend, nat.Backend, nat.Proto)
80
+		}
81
+	} else {
82
+		t.Fatal(err)
83
+	}
84
+
85
+	if nat, err := parseNat("4503/tcp"); err == nil {
86
+		if nat.Frontend != 0 || nat.Backend != 4503 || nat.Proto != "tcp" {
87
+			t.Errorf("-p 4503/tcp should produce 0->4503/tcp, got %d->%d/%s",
88
+				nat.Frontend, nat.Backend, nat.Proto)
89
+		}
90
+	} else {
91
+		t.Fatal(err)
92
+	}
93
+
94
+	if nat, err := parseNat("4503/udp"); err == nil {
95
+		if nat.Frontend != 0 || nat.Backend != 4503 || nat.Proto != "udp" {
96
+			t.Errorf("-p 4503/udp should produce 0->4503/udp, got %d->%d/%s",
97
+				nat.Frontend, nat.Backend, nat.Proto)
98
+		}
99
+	} else {
100
+		t.Fatal(err)
101
+	}
102
+
103
+	if _, err := parseNat("4503/tcpgarbage"); err == nil {
104
+		t.Fatal(err)
105
+	}
106
+
107
+	if _, err := parseNat("4503/tcp/udp"); err == nil {
108
+		t.Fatal(err)
109
+	}
110
+
111
+	if _, err := parseNat("4503/"); err == nil {
112
+		t.Fatal(err)
113
+	}
45 114
 }
46 115
 
47 116
 func TestPortAllocation(t *testing.T) {
... ...
@@ -1,6 +1,7 @@
1 1
 package docker
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"fmt"
5 6
 	"github.com/dotcloud/docker/utils"
6 7
 	"io"
... ...
@@ -17,12 +18,12 @@ import (
17 17
 )
18 18
 
19 19
 const (
20
-	unitTestImageName     = "docker-unit-tests"
21
-	unitTestImageID       = "e9aa60c60128cad1"
22
-	unitTestNetworkBridge = "testdockbr0"
23
-	unitTestStoreBase     = "/var/lib/docker/unit-tests"
24
-	testDaemonAddr        = "127.0.0.1:4270"
25
-	testDaemonProto       = "tcp"
20
+	unitTestImageName	= "docker-test-image"
21
+	unitTestImageID		= "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0
22
+	unitTestNetworkBridge	= "testdockbr0"
23
+	unitTestStoreBase	= "/var/lib/docker/unit-tests"
24
+	testDaemonAddr		= "127.0.0.1:4270"
25
+	testDaemonProto		= "tcp"
26 26
 )
27 27
 
28 28
 var globalRuntime *Runtime
... ...
@@ -321,52 +322,47 @@ func TestGet(t *testing.T) {
321 321
 
322 322
 }
323 323
 
324
-func findAvailablePort(runtime *Runtime, port int) (*Container, error) {
325
-	strPort := strconv.Itoa(port)
326
-	container, err := NewBuilder(runtime).Create(&Config{
327
-		Image:     GetTestImage(runtime).ID,
328
-		Cmd:       []string{"sh", "-c", "echo well hello there | nc -l -p " + strPort},
329
-		PortSpecs: []string{strPort},
330
-	},
331
-	)
332
-	if err != nil {
333
-		return nil, err
334
-	}
335
-	hostConfig := &HostConfig{}
336
-	if err := container.Start(hostConfig); err != nil {
337
-		if strings.Contains(err.Error(), "address already in use") {
338
-			return nil, nil
339
-		}
340
-		return nil, err
341
-	}
342
-	return container, nil
343
-}
344
-
345
-// Run a container with a TCP port allocated, and test that it can receive connections on localhost
346
-func TestAllocatePortLocalhost(t *testing.T) {
324
+func startEchoServerContainer(t *testing.T, proto string) (*Runtime, *Container, string) {
347 325
 	runtime, err := newTestRuntime()
348 326
 	if err != nil {
349 327
 		t.Fatal(err)
350 328
 	}
351
-	port := 5554
352 329
 
330
+	port := 5554
353 331
 	var container *Container
332
+	var strPort string
354 333
 	for {
355 334
 		port += 1
356
-		log.Println("Trying port", port)
357
-		t.Log("Trying port", port)
358
-		container, err = findAvailablePort(runtime, port)
335
+		strPort = strconv.Itoa(port)
336
+		var cmd string
337
+		if proto == "tcp" {
338
+			cmd = "socat TCP-LISTEN:" + strPort + ",reuseaddr,fork EXEC:/bin/cat"
339
+		} else if proto == "udp" {
340
+			cmd = "socat UDP-RECVFROM:" + strPort + ",fork EXEC:/bin/cat"
341
+		} else {
342
+			t.Fatal(fmt.Errorf("Unknown protocol %v", proto))
343
+		}
344
+		t.Log("Trying port", strPort)
345
+		container, err = NewBuilder(runtime).Create(&Config{
346
+			Image:     GetTestImage(runtime).ID,
347
+			Cmd:       []string{"sh", "-c", cmd},
348
+			PortSpecs: []string{fmt.Sprintf("%s/%s", strPort, proto)},
349
+		})
359 350
 		if container != nil {
360 351
 			break
361 352
 		}
362 353
 		if err != nil {
354
+			nuke(runtime)
363 355
 			t.Fatal(err)
364 356
 		}
365
-		log.Println("Port", port, "already in use")
366
-		t.Log("Port", port, "already in use")
357
+		t.Logf("Port %v already in use", strPort)
367 358
 	}
368 359
 
369
-	defer container.Kill()
360
+	hostConfig := &HostConfig{}
361
+	if err := container.Start(hostConfig); err != nil {
362
+		nuke(runtime)
363
+		t.Fatal(err)
364
+	}
370 365
 
371 366
 	setTimeout(t, "Waiting for the container to be started timed out", 2*time.Second, func() {
372 367
 		for !container.State.Running {
... ...
@@ -377,26 +373,70 @@ func TestAllocatePortLocalhost(t *testing.T) {
377 377
 	// Even if the state is running, lets give some time to lxc to spawn the process
378 378
 	container.WaitTimeout(500 * time.Millisecond)
379 379
 
380
-	conn, err := net.Dial("tcp",
381
-		fmt.Sprintf(
382
-			"localhost:%s", container.NetworkSettings.PortMapping[strconv.Itoa(port)],
383
-		),
384
-	)
380
+	strPort = container.NetworkSettings.PortMapping[strings.Title(proto)][strPort]
381
+	return runtime, container, strPort
382
+}
383
+
384
+// Run a container with a TCP port allocated, and test that it can receive connections on localhost
385
+func TestAllocateTCPPortLocalhost(t *testing.T) {
386
+	runtime, container, port := startEchoServerContainer(t, "tcp")
387
+	defer nuke(runtime)
388
+	defer container.Kill()
389
+
390
+	conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%v", port))
385 391
 	if err != nil {
386 392
 		t.Fatal(err)
387 393
 	}
388 394
 	defer conn.Close()
389
-	output, err := ioutil.ReadAll(conn)
395
+
396
+	input := bytes.NewBufferString("well hello there\n")
397
+	_, err = conn.Write(input.Bytes())
390 398
 	if err != nil {
391 399
 		t.Fatal(err)
392 400
 	}
393
-	if string(output) != "well hello there\n" {
394
-		t.Fatalf("Received wrong output from network connection: should be '%s', not '%s'",
395
-			"well hello there\n",
396
-			string(output),
397
-		)
401
+	buf := make([]byte, 16)
402
+	read := 0
403
+	conn.SetReadDeadline(time.Now().Add(2 * time.Second))
404
+	read, err = conn.Read(buf)
405
+	if err != nil {
406
+		t.Fatal(err)
398 407
 	}
399
-	container.Wait()
408
+	output := string(buf[:read])
409
+	if !strings.Contains(output, "well hello there") {
410
+		t.Fatal(fmt.Errorf("[%v] doesn't contain [well hello there]", output))
411
+	}
412
+}
413
+
414
+// Run a container with a TCP port allocated, and test that it can receive connections on localhost
415
+func TestAllocateUDPPortLocalhost(t *testing.T) {
416
+	runtime, container, port := startEchoServerContainer(t, "udp")
417
+	defer nuke(runtime)
418
+	defer container.Kill()
419
+
420
+	conn, err := net.Dial("udp", fmt.Sprintf("localhost:%v", port))
421
+	if err != nil {
422
+		t.Fatal(err)
423
+	}
424
+	defer conn.Close()
425
+
426
+	input := bytes.NewBufferString("well hello there\n")
427
+	buf := make([]byte, 16)
428
+	for i := 0; i != 10; i++ {
429
+		_, err := conn.Write(input.Bytes())
430
+		if err != nil {
431
+			t.Fatal(err)
432
+		}
433
+		conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
434
+		read, err := conn.Read(buf)
435
+		if err == nil {
436
+			output := string(buf[:read])
437
+			if strings.Contains(output, "well hello there") {
438
+				return
439
+			}
440
+		}
441
+	}
442
+
443
+	t.Fatal("No reply from the container")
400 444
 }
401 445
 
402 446
 func TestRestore(t *testing.T) {