Browse code

Move WriteBroadcaster to separate package as BroadcastWriter

Docker-DCO-1.1-Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com> (github: LK4D4)

LK4D4 authored on 2014/07/02 23:48:37
Showing 6 changed files
... ...
@@ -29,6 +29,7 @@ import (
29 29
 	"github.com/dotcloud/docker/pkg/symlink"
30 30
 	"github.com/dotcloud/docker/runconfig"
31 31
 	"github.com/dotcloud/docker/utils"
32
+	"github.com/dotcloud/docker/utils/broadcastwriter"
32 33
 )
33 34
 
34 35
 const DefaultPathEnv = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
... ...
@@ -66,8 +67,8 @@ type Container struct {
66 66
 	ExecDriver     string
67 67
 
68 68
 	command   *execdriver.Command
69
-	stdout    *utils.WriteBroadcaster
70
-	stderr    *utils.WriteBroadcaster
69
+	stdout    *broadcastwriter.BroadcastWriter
70
+	stderr    *broadcastwriter.BroadcastWriter
71 71
 	stdin     io.ReadCloser
72 72
 	stdinPipe io.WriteCloser
73 73
 
... ...
@@ -502,10 +503,10 @@ func (container *Container) cleanup() {
502 502
 			utils.Errorf("%s: Error close stdin: %s", container.ID, err)
503 503
 		}
504 504
 	}
505
-	if err := container.stdout.CloseWriters(); err != nil {
505
+	if err := container.stdout.Close(); err != nil {
506 506
 		utils.Errorf("%s: Error close stdout: %s", container.ID, err)
507 507
 	}
508
-	if err := container.stderr.CloseWriters(); err != nil {
508
+	if err := container.stderr.Close(); err != nil {
509 509
 		utils.Errorf("%s: Error close stderr: %s", container.ID, err)
510 510
 	}
511 511
 	if container.command != nil && container.command.Terminal != nil {
... ...
@@ -34,6 +34,7 @@ import (
34 34
 	"github.com/dotcloud/docker/pkg/truncindex"
35 35
 	"github.com/dotcloud/docker/runconfig"
36 36
 	"github.com/dotcloud/docker/utils"
37
+	"github.com/dotcloud/docker/utils/broadcastwriter"
37 38
 )
38 39
 
39 40
 // Set the max depth to the aufs default that most
... ...
@@ -169,8 +170,8 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool, con
169 169
 	container.daemon = daemon
170 170
 
171 171
 	// Attach to stdout and stderr
172
-	container.stderr = utils.NewWriteBroadcaster()
173
-	container.stdout = utils.NewWriteBroadcaster()
172
+	container.stderr = broadcastwriter.New()
173
+	container.stdout = broadcastwriter.New()
174 174
 	// Attach to stdin
175 175
 	if container.Config.OpenStdin {
176 176
 		container.stdin, container.stdinPipe = io.Pipe()
... ...
@@ -255,7 +256,7 @@ func (daemon *Daemon) ensureName(container *Container) error {
255 255
 	return nil
256 256
 }
257 257
 
258
-func (daemon *Daemon) LogToDisk(src *utils.WriteBroadcaster, dst, stream string) error {
258
+func (daemon *Daemon) LogToDisk(src *broadcastwriter.BroadcastWriter, dst, stream string) error {
259 259
 	log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
260 260
 	if err != nil {
261 261
 		return err
262 262
new file mode 100644
... ...
@@ -0,0 +1,92 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/json"
5
+	"io"
6
+	"sync"
7
+	"time"
8
+
9
+	"github.com/dotcloud/docker/utils"
10
+)
11
+
12
+type BroadcastWriter struct {
13
+	sync.Mutex
14
+	buf     *bytes.Buffer
15
+	streams map[string](map[io.WriteCloser]struct{})
16
+}
17
+
18
+func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
19
+	w.Lock()
20
+	if _, ok := w.streams[stream]; !ok {
21
+		w.streams[stream] = make(map[io.WriteCloser]struct{})
22
+	}
23
+	w.streams[stream][writer] = struct{}{}
24
+	w.Unlock()
25
+}
26
+
27
+func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
28
+	created := time.Now().UTC()
29
+	w.Lock()
30
+	defer w.Unlock()
31
+	if writers, ok := w.streams[""]; ok {
32
+		for sw := range writers {
33
+			if n, err := sw.Write(p); err != nil || n != len(p) {
34
+				// On error, evict the writer
35
+				delete(writers, sw)
36
+			}
37
+		}
38
+	}
39
+	w.buf.Write(p)
40
+	lines := []string{}
41
+	for {
42
+		line, err := w.buf.ReadString('\n')
43
+		if err != nil {
44
+			w.buf.Write([]byte(line))
45
+			break
46
+		}
47
+		lines = append(lines, line)
48
+	}
49
+
50
+	if len(lines) != 0 {
51
+		for stream, writers := range w.streams {
52
+			if stream == "" {
53
+				continue
54
+			}
55
+			var lp []byte
56
+			for _, line := range lines {
57
+				b, err := json.Marshal(&utils.JSONLog{Log: line, Stream: stream, Created: created})
58
+				if err != nil {
59
+					utils.Errorf("Error making JSON log line: %s", err)
60
+				}
61
+				lp = append(lp, b...)
62
+				lp = append(lp, '\n')
63
+			}
64
+			for sw := range writers {
65
+				if _, err := sw.Write(lp); err != nil {
66
+					delete(writers, sw)
67
+				}
68
+			}
69
+		}
70
+	}
71
+	return len(p), nil
72
+}
73
+
74
+func (w *BroadcastWriter) Close() error {
75
+	w.Lock()
76
+	defer w.Unlock()
77
+	for _, writers := range w.streams {
78
+		for w := range writers {
79
+			w.Close()
80
+		}
81
+	}
82
+	w.streams = make(map[string](map[io.WriteCloser]struct{}))
83
+	return nil
84
+}
85
+
86
+func New() *BroadcastWriter {
87
+	return &BroadcastWriter{
88
+		streams: make(map[string](map[io.WriteCloser]struct{})),
89
+		buf:     bytes.NewBuffer(nil),
90
+	}
91
+}
0 92
new file mode 100644
... ...
@@ -0,0 +1,108 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+
6
+	"testing"
7
+)
8
+
9
+type dummyWriter struct {
10
+	buffer      bytes.Buffer
11
+	failOnWrite bool
12
+}
13
+
14
+func (dw *dummyWriter) Write(p []byte) (n int, err error) {
15
+	if dw.failOnWrite {
16
+		return 0, errors.New("Fake fail")
17
+	}
18
+	return dw.buffer.Write(p)
19
+}
20
+
21
+func (dw *dummyWriter) String() string {
22
+	return dw.buffer.String()
23
+}
24
+
25
+func (dw *dummyWriter) Close() error {
26
+	return nil
27
+}
28
+
29
+func TestBroadcastWriter(t *testing.T) {
30
+	writer := New()
31
+
32
+	// Test 1: Both bufferA and bufferB should contain "foo"
33
+	bufferA := &dummyWriter{}
34
+	writer.AddWriter(bufferA, "")
35
+	bufferB := &dummyWriter{}
36
+	writer.AddWriter(bufferB, "")
37
+	writer.Write([]byte("foo"))
38
+
39
+	if bufferA.String() != "foo" {
40
+		t.Errorf("Buffer contains %v", bufferA.String())
41
+	}
42
+
43
+	if bufferB.String() != "foo" {
44
+		t.Errorf("Buffer contains %v", bufferB.String())
45
+	}
46
+
47
+	// Test2: bufferA and bufferB should contain "foobar",
48
+	// while bufferC should only contain "bar"
49
+	bufferC := &dummyWriter{}
50
+	writer.AddWriter(bufferC, "")
51
+	writer.Write([]byte("bar"))
52
+
53
+	if bufferA.String() != "foobar" {
54
+		t.Errorf("Buffer contains %v", bufferA.String())
55
+	}
56
+
57
+	if bufferB.String() != "foobar" {
58
+		t.Errorf("Buffer contains %v", bufferB.String())
59
+	}
60
+
61
+	if bufferC.String() != "bar" {
62
+		t.Errorf("Buffer contains %v", bufferC.String())
63
+	}
64
+
65
+	// Test3: Test eviction on failure
66
+	bufferA.failOnWrite = true
67
+	writer.Write([]byte("fail"))
68
+	if bufferA.String() != "foobar" {
69
+		t.Errorf("Buffer contains %v", bufferA.String())
70
+	}
71
+	if bufferC.String() != "barfail" {
72
+		t.Errorf("Buffer contains %v", bufferC.String())
73
+	}
74
+	// Even though we reset the flag, no more writes should go in there
75
+	bufferA.failOnWrite = false
76
+	writer.Write([]byte("test"))
77
+	if bufferA.String() != "foobar" {
78
+		t.Errorf("Buffer contains %v", bufferA.String())
79
+	}
80
+	if bufferC.String() != "barfailtest" {
81
+		t.Errorf("Buffer contains %v", bufferC.String())
82
+	}
83
+
84
+	writer.Close()
85
+}
86
+
87
+type devNullCloser int
88
+
89
+func (d devNullCloser) Close() error {
90
+	return nil
91
+}
92
+
93
+func (d devNullCloser) Write(buf []byte) (int, error) {
94
+	return len(buf), nil
95
+}
96
+
97
+// This test checks for races. It is only useful when run with the race detector.
98
+func TestRaceBroadcastWriter(t *testing.T) {
99
+	writer := New()
100
+	c := make(chan bool)
101
+	go func() {
102
+		writer.AddWriter(devNullCloser(0), "")
103
+		c <- true
104
+	}()
105
+	writer.Write([]byte("hello"))
106
+	<-c
107
+}
... ...
@@ -265,21 +265,6 @@ func (r *bufReader) Close() error {
265 265
 	return closer.Close()
266 266
 }
267 267
 
268
-type WriteBroadcaster struct {
269
-	sync.Mutex
270
-	buf     *bytes.Buffer
271
-	streams map[string](map[io.WriteCloser]struct{})
272
-}
273
-
274
-func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
275
-	w.Lock()
276
-	if _, ok := w.streams[stream]; !ok {
277
-		w.streams[stream] = make(map[io.WriteCloser]struct{})
278
-	}
279
-	w.streams[stream][writer] = struct{}{}
280
-	w.Unlock()
281
-}
282
-
283 268
 type JSONLog struct {
284 269
 	Log     string    `json:"log,omitempty"`
285 270
 	Stream  string    `json:"stream,omitempty"`
... ...
@@ -316,77 +301,6 @@ func WriteLog(src io.Reader, dst io.WriteCloser, format string) error {
316 316
 	}
317 317
 }
318 318
 
319
-type LogFormatter struct {
320
-	wc         io.WriteCloser
321
-	timeFormat string
322
-}
323
-
324
-func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
325
-	created := time.Now().UTC()
326
-	w.Lock()
327
-	defer w.Unlock()
328
-	if writers, ok := w.streams[""]; ok {
329
-		for sw := range writers {
330
-			if n, err := sw.Write(p); err != nil || n != len(p) {
331
-				// On error, evict the writer
332
-				delete(writers, sw)
333
-			}
334
-		}
335
-	}
336
-	w.buf.Write(p)
337
-	lines := []string{}
338
-	for {
339
-		line, err := w.buf.ReadString('\n')
340
-		if err != nil {
341
-			w.buf.Write([]byte(line))
342
-			break
343
-		}
344
-		lines = append(lines, line)
345
-	}
346
-
347
-	if len(lines) != 0 {
348
-		for stream, writers := range w.streams {
349
-			if stream == "" {
350
-				continue
351
-			}
352
-			var lp []byte
353
-			for _, line := range lines {
354
-				b, err := json.Marshal(&JSONLog{Log: line, Stream: stream, Created: created})
355
-				if err != nil {
356
-					Errorf("Error making JSON log line: %s", err)
357
-				}
358
-				lp = append(lp, b...)
359
-				lp = append(lp, '\n')
360
-			}
361
-			for sw := range writers {
362
-				if _, err := sw.Write(lp); err != nil {
363
-					delete(writers, sw)
364
-				}
365
-			}
366
-		}
367
-	}
368
-	return len(p), nil
369
-}
370
-
371
-func (w *WriteBroadcaster) CloseWriters() error {
372
-	w.Lock()
373
-	defer w.Unlock()
374
-	for _, writers := range w.streams {
375
-		for w := range writers {
376
-			w.Close()
377
-		}
378
-	}
379
-	w.streams = make(map[string](map[io.WriteCloser]struct{}))
380
-	return nil
381
-}
382
-
383
-func NewWriteBroadcaster() *WriteBroadcaster {
384
-	return &WriteBroadcaster{
385
-		streams: make(map[string](map[io.WriteCloser]struct{})),
386
-		buf:     bytes.NewBuffer(nil),
387
-	}
388
-}
389
-
390 319
 func GetTotalUsedFds() int {
391 320
 	if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
392 321
 		Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)
... ...
@@ -2,7 +2,6 @@ package utils
2 2
 
3 3
 import (
4 4
 	"bytes"
5
-	"errors"
6 5
 	"io"
7 6
 	"io/ioutil"
8 7
 	"os"
... ...
@@ -35,106 +34,6 @@ func TestBufReader(t *testing.T) {
35 35
 	}
36 36
 }
37 37
 
38
-type dummyWriter struct {
39
-	buffer      bytes.Buffer
40
-	failOnWrite bool
41
-}
42
-
43
-func (dw *dummyWriter) Write(p []byte) (n int, err error) {
44
-	if dw.failOnWrite {
45
-		return 0, errors.New("Fake fail")
46
-	}
47
-	return dw.buffer.Write(p)
48
-}
49
-
50
-func (dw *dummyWriter) String() string {
51
-	return dw.buffer.String()
52
-}
53
-
54
-func (dw *dummyWriter) Close() error {
55
-	return nil
56
-}
57
-
58
-func TestWriteBroadcaster(t *testing.T) {
59
-	writer := NewWriteBroadcaster()
60
-
61
-	// Test 1: Both bufferA and bufferB should contain "foo"
62
-	bufferA := &dummyWriter{}
63
-	writer.AddWriter(bufferA, "")
64
-	bufferB := &dummyWriter{}
65
-	writer.AddWriter(bufferB, "")
66
-	writer.Write([]byte("foo"))
67
-
68
-	if bufferA.String() != "foo" {
69
-		t.Errorf("Buffer contains %v", bufferA.String())
70
-	}
71
-
72
-	if bufferB.String() != "foo" {
73
-		t.Errorf("Buffer contains %v", bufferB.String())
74
-	}
75
-
76
-	// Test2: bufferA and bufferB should contain "foobar",
77
-	// while bufferC should only contain "bar"
78
-	bufferC := &dummyWriter{}
79
-	writer.AddWriter(bufferC, "")
80
-	writer.Write([]byte("bar"))
81
-
82
-	if bufferA.String() != "foobar" {
83
-		t.Errorf("Buffer contains %v", bufferA.String())
84
-	}
85
-
86
-	if bufferB.String() != "foobar" {
87
-		t.Errorf("Buffer contains %v", bufferB.String())
88
-	}
89
-
90
-	if bufferC.String() != "bar" {
91
-		t.Errorf("Buffer contains %v", bufferC.String())
92
-	}
93
-
94
-	// Test3: Test eviction on failure
95
-	bufferA.failOnWrite = true
96
-	writer.Write([]byte("fail"))
97
-	if bufferA.String() != "foobar" {
98
-		t.Errorf("Buffer contains %v", bufferA.String())
99
-	}
100
-	if bufferC.String() != "barfail" {
101
-		t.Errorf("Buffer contains %v", bufferC.String())
102
-	}
103
-	// Even though we reset the flag, no more writes should go in there
104
-	bufferA.failOnWrite = false
105
-	writer.Write([]byte("test"))
106
-	if bufferA.String() != "foobar" {
107
-		t.Errorf("Buffer contains %v", bufferA.String())
108
-	}
109
-	if bufferC.String() != "barfailtest" {
110
-		t.Errorf("Buffer contains %v", bufferC.String())
111
-	}
112
-
113
-	writer.CloseWriters()
114
-}
115
-
116
-type devNullCloser int
117
-
118
-func (d devNullCloser) Close() error {
119
-	return nil
120
-}
121
-
122
-func (d devNullCloser) Write(buf []byte) (int, error) {
123
-	return len(buf), nil
124
-}
125
-
126
-// This test checks for races. It is only useful when run with the race detector.
127
-func TestRaceWriteBroadcaster(t *testing.T) {
128
-	writer := NewWriteBroadcaster()
129
-	c := make(chan bool)
130
-	go func() {
131
-		writer.AddWriter(devNullCloser(0), "")
132
-		c <- true
133
-	}()
134
-	writer.Write([]byte("hello"))
135
-	<-c
136
-}
137
-
138 38
 func assertKernelVersion(t *testing.T, a, b *KernelVersionInfo, result int) {
139 39
 	if r := CompareKernelVersion(a, b); r != result {
140 40
 		t.Fatalf("Unexpected kernel version comparison result. Found %d, expected %d", r, result)