Browse code

utils/broadcastwriter -> pkg/broadcastwriter

Docker-DCO-1.1-Signed-off-by: Erik Hollensbe <github@hollensbe.org> (github: erikh)

Erik Hollensbe authored on 2014/07/31 00:16:10
Showing 6 changed files
... ...
@@ -22,12 +22,12 @@ import (
22 22
 	"github.com/docker/docker/image"
23 23
 	"github.com/docker/docker/links"
24 24
 	"github.com/docker/docker/nat"
25
+	"github.com/docker/docker/pkg/broadcastwriter"
25 26
 	"github.com/docker/docker/pkg/networkfs/etchosts"
26 27
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
27 28
 	"github.com/docker/docker/pkg/symlink"
28 29
 	"github.com/docker/docker/runconfig"
29 30
 	"github.com/docker/docker/utils"
30
-	"github.com/docker/docker/utils/broadcastwriter"
31 31
 	"github.com/docker/libcontainer/devices"
32 32
 	"github.com/docker/libcontainer/label"
33 33
 )
... ...
@@ -26,6 +26,7 @@ import (
26 26
 	"github.com/docker/docker/engine"
27 27
 	"github.com/docker/docker/graph"
28 28
 	"github.com/docker/docker/image"
29
+	"github.com/docker/docker/pkg/broadcastwriter"
29 30
 	"github.com/docker/docker/pkg/graphdb"
30 31
 	"github.com/docker/docker/pkg/namesgenerator"
31 32
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
... ...
@@ -35,7 +36,6 @@ import (
35 35
 	"github.com/docker/docker/pkg/truncindex"
36 36
 	"github.com/docker/docker/runconfig"
37 37
 	"github.com/docker/docker/utils"
38
-	"github.com/docker/docker/utils/broadcastwriter"
39 38
 	"github.com/docker/libcontainer/label"
40 39
 )
41 40
 
42 41
new file mode 100644
... ...
@@ -0,0 +1,92 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/json"
5
+	"io"
6
+	"sync"
7
+	"time"
8
+
9
+	"github.com/docker/docker/utils"
10
+)
11
+
12
+// BroadcastWriter accumulate multiple io.WriteCloser by stream.
13
+type BroadcastWriter struct {
14
+	sync.Mutex
15
+	buf     *bytes.Buffer
16
+	streams map[string](map[io.WriteCloser]struct{})
17
+}
18
+
19
+// AddWriter adds new io.WriteCloser for stream.
20
+// If stream is "", then all writes proceed as is. Otherwise every line from
21
+// input will be packed to serialized utils.JSONLog.
22
+func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
23
+	w.Lock()
24
+	if _, ok := w.streams[stream]; !ok {
25
+		w.streams[stream] = make(map[io.WriteCloser]struct{})
26
+	}
27
+	w.streams[stream][writer] = struct{}{}
28
+	w.Unlock()
29
+}
30
+
31
+// Write writes bytes to all writers. Failed writers will be evicted during
32
+// this call.
33
+func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
34
+	created := time.Now().UTC()
35
+	w.Lock()
36
+	if writers, ok := w.streams[""]; ok {
37
+		for sw := range writers {
38
+			if n, err := sw.Write(p); err != nil || n != len(p) {
39
+				// On error, evict the writer
40
+				delete(writers, sw)
41
+			}
42
+		}
43
+	}
44
+	w.buf.Write(p)
45
+	for {
46
+		line, err := w.buf.ReadString('\n')
47
+		if err != nil {
48
+			w.buf.Write([]byte(line))
49
+			break
50
+		}
51
+		for stream, writers := range w.streams {
52
+			if stream == "" {
53
+				continue
54
+			}
55
+			b, err := json.Marshal(utils.JSONLog{Log: line, Stream: stream, Created: created})
56
+			if err != nil {
57
+				utils.Errorf("Error making JSON log line: %s", err)
58
+				continue
59
+			}
60
+			b = append(b, '\n')
61
+			for sw := range writers {
62
+				if _, err := sw.Write(b); err != nil {
63
+					delete(writers, sw)
64
+				}
65
+			}
66
+		}
67
+	}
68
+	w.Unlock()
69
+	return len(p), nil
70
+}
71
+
72
+// Clean closes and removes all writers. Last non-eol-terminated part of data
73
+// will be saved.
74
+func (w *BroadcastWriter) Clean() error {
75
+	w.Lock()
76
+	for _, writers := range w.streams {
77
+		for w := range writers {
78
+			w.Close()
79
+		}
80
+	}
81
+	w.streams = make(map[string](map[io.WriteCloser]struct{}))
82
+	w.Unlock()
83
+	return nil
84
+}
85
+
86
+func New() *BroadcastWriter {
87
+	return &BroadcastWriter{
88
+		streams: make(map[string](map[io.WriteCloser]struct{})),
89
+		buf:     bytes.NewBuffer(nil),
90
+	}
91
+}
0 92
new file mode 100644
... ...
@@ -0,0 +1,144 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+
6
+	"testing"
7
+)
8
+
9
+type dummyWriter struct {
10
+	buffer      bytes.Buffer
11
+	failOnWrite bool
12
+}
13
+
14
+func (dw *dummyWriter) Write(p []byte) (n int, err error) {
15
+	if dw.failOnWrite {
16
+		return 0, errors.New("Fake fail")
17
+	}
18
+	return dw.buffer.Write(p)
19
+}
20
+
21
+func (dw *dummyWriter) String() string {
22
+	return dw.buffer.String()
23
+}
24
+
25
+func (dw *dummyWriter) Close() error {
26
+	return nil
27
+}
28
+
29
+func TestBroadcastWriter(t *testing.T) {
30
+	writer := New()
31
+
32
+	// Test 1: Both bufferA and bufferB should contain "foo"
33
+	bufferA := &dummyWriter{}
34
+	writer.AddWriter(bufferA, "")
35
+	bufferB := &dummyWriter{}
36
+	writer.AddWriter(bufferB, "")
37
+	writer.Write([]byte("foo"))
38
+
39
+	if bufferA.String() != "foo" {
40
+		t.Errorf("Buffer contains %v", bufferA.String())
41
+	}
42
+
43
+	if bufferB.String() != "foo" {
44
+		t.Errorf("Buffer contains %v", bufferB.String())
45
+	}
46
+
47
+	// Test2: bufferA and bufferB should contain "foobar",
48
+	// while bufferC should only contain "bar"
49
+	bufferC := &dummyWriter{}
50
+	writer.AddWriter(bufferC, "")
51
+	writer.Write([]byte("bar"))
52
+
53
+	if bufferA.String() != "foobar" {
54
+		t.Errorf("Buffer contains %v", bufferA.String())
55
+	}
56
+
57
+	if bufferB.String() != "foobar" {
58
+		t.Errorf("Buffer contains %v", bufferB.String())
59
+	}
60
+
61
+	if bufferC.String() != "bar" {
62
+		t.Errorf("Buffer contains %v", bufferC.String())
63
+	}
64
+
65
+	// Test3: Test eviction on failure
66
+	bufferA.failOnWrite = true
67
+	writer.Write([]byte("fail"))
68
+	if bufferA.String() != "foobar" {
69
+		t.Errorf("Buffer contains %v", bufferA.String())
70
+	}
71
+	if bufferC.String() != "barfail" {
72
+		t.Errorf("Buffer contains %v", bufferC.String())
73
+	}
74
+	// Even though we reset the flag, no more writes should go in there
75
+	bufferA.failOnWrite = false
76
+	writer.Write([]byte("test"))
77
+	if bufferA.String() != "foobar" {
78
+		t.Errorf("Buffer contains %v", bufferA.String())
79
+	}
80
+	if bufferC.String() != "barfailtest" {
81
+		t.Errorf("Buffer contains %v", bufferC.String())
82
+	}
83
+
84
+	writer.Clean()
85
+}
86
+
87
+type devNullCloser int
88
+
89
+func (d devNullCloser) Close() error {
90
+	return nil
91
+}
92
+
93
+func (d devNullCloser) Write(buf []byte) (int, error) {
94
+	return len(buf), nil
95
+}
96
+
97
+// This test checks for races. It is only useful when run with the race detector.
98
+func TestRaceBroadcastWriter(t *testing.T) {
99
+	writer := New()
100
+	c := make(chan bool)
101
+	go func() {
102
+		writer.AddWriter(devNullCloser(0), "")
103
+		c <- true
104
+	}()
105
+	writer.Write([]byte("hello"))
106
+	<-c
107
+}
108
+
109
+func BenchmarkBroadcastWriter(b *testing.B) {
110
+	writer := New()
111
+	setUpWriter := func() {
112
+		for i := 0; i < 100; i++ {
113
+			writer.AddWriter(devNullCloser(0), "stdout")
114
+			writer.AddWriter(devNullCloser(0), "stderr")
115
+			writer.AddWriter(devNullCloser(0), "")
116
+		}
117
+	}
118
+	testLine := "Line that thinks that it is log line from docker"
119
+	var buf bytes.Buffer
120
+	for i := 0; i < 100; i++ {
121
+		buf.Write([]byte(testLine + "\n"))
122
+	}
123
+	// line without eol
124
+	buf.Write([]byte(testLine))
125
+	testText := buf.Bytes()
126
+	b.SetBytes(int64(5 * len(testText)))
127
+	b.ResetTimer()
128
+	for i := 0; i < b.N; i++ {
129
+		b.StopTimer()
130
+		setUpWriter()
131
+		b.StartTimer()
132
+
133
+		for j := 0; j < 5; j++ {
134
+			if _, err := writer.Write(testText); err != nil {
135
+				b.Fatal(err)
136
+			}
137
+		}
138
+
139
+		b.StopTimer()
140
+		writer.Clean()
141
+		b.StartTimer()
142
+	}
143
+}
0 144
deleted file mode 100644
... ...
@@ -1,92 +0,0 @@
1
-package broadcastwriter
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/json"
6
-	"io"
7
-	"sync"
8
-	"time"
9
-
10
-	"github.com/docker/docker/utils"
11
-)
12
-
13
-// BroadcastWriter accumulate multiple io.WriteCloser by stream.
14
-type BroadcastWriter struct {
15
-	sync.Mutex
16
-	buf     *bytes.Buffer
17
-	streams map[string](map[io.WriteCloser]struct{})
18
-}
19
-
20
-// AddWriter adds new io.WriteCloser for stream.
21
-// If stream is "", then all writes proceed as is. Otherwise every line from
22
-// input will be packed to serialized utils.JSONLog.
23
-func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
24
-	w.Lock()
25
-	if _, ok := w.streams[stream]; !ok {
26
-		w.streams[stream] = make(map[io.WriteCloser]struct{})
27
-	}
28
-	w.streams[stream][writer] = struct{}{}
29
-	w.Unlock()
30
-}
31
-
32
-// Write writes bytes to all writers. Failed writers will be evicted during
33
-// this call.
34
-func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
35
-	created := time.Now().UTC()
36
-	w.Lock()
37
-	if writers, ok := w.streams[""]; ok {
38
-		for sw := range writers {
39
-			if n, err := sw.Write(p); err != nil || n != len(p) {
40
-				// On error, evict the writer
41
-				delete(writers, sw)
42
-			}
43
-		}
44
-	}
45
-	w.buf.Write(p)
46
-	for {
47
-		line, err := w.buf.ReadString('\n')
48
-		if err != nil {
49
-			w.buf.Write([]byte(line))
50
-			break
51
-		}
52
-		for stream, writers := range w.streams {
53
-			if stream == "" {
54
-				continue
55
-			}
56
-			b, err := json.Marshal(utils.JSONLog{Log: line, Stream: stream, Created: created})
57
-			if err != nil {
58
-				utils.Errorf("Error making JSON log line: %s", err)
59
-				continue
60
-			}
61
-			b = append(b, '\n')
62
-			for sw := range writers {
63
-				if _, err := sw.Write(b); err != nil {
64
-					delete(writers, sw)
65
-				}
66
-			}
67
-		}
68
-	}
69
-	w.Unlock()
70
-	return len(p), nil
71
-}
72
-
73
-// Clean closes and removes all writers. Last non-eol-terminated part of data
74
-// will be saved.
75
-func (w *BroadcastWriter) Clean() error {
76
-	w.Lock()
77
-	for _, writers := range w.streams {
78
-		for w := range writers {
79
-			w.Close()
80
-		}
81
-	}
82
-	w.streams = make(map[string](map[io.WriteCloser]struct{}))
83
-	w.Unlock()
84
-	return nil
85
-}
86
-
87
-func New() *BroadcastWriter {
88
-	return &BroadcastWriter{
89
-		streams: make(map[string](map[io.WriteCloser]struct{})),
90
-		buf:     bytes.NewBuffer(nil),
91
-	}
92
-}
93 1
deleted file mode 100644
... ...
@@ -1,144 +0,0 @@
1
-package broadcastwriter
2
-
3
-import (
4
-	"bytes"
5
-	"errors"
6
-
7
-	"testing"
8
-)
9
-
10
-type dummyWriter struct {
11
-	buffer      bytes.Buffer
12
-	failOnWrite bool
13
-}
14
-
15
-func (dw *dummyWriter) Write(p []byte) (n int, err error) {
16
-	if dw.failOnWrite {
17
-		return 0, errors.New("Fake fail")
18
-	}
19
-	return dw.buffer.Write(p)
20
-}
21
-
22
-func (dw *dummyWriter) String() string {
23
-	return dw.buffer.String()
24
-}
25
-
26
-func (dw *dummyWriter) Close() error {
27
-	return nil
28
-}
29
-
30
-func TestBroadcastWriter(t *testing.T) {
31
-	writer := New()
32
-
33
-	// Test 1: Both bufferA and bufferB should contain "foo"
34
-	bufferA := &dummyWriter{}
35
-	writer.AddWriter(bufferA, "")
36
-	bufferB := &dummyWriter{}
37
-	writer.AddWriter(bufferB, "")
38
-	writer.Write([]byte("foo"))
39
-
40
-	if bufferA.String() != "foo" {
41
-		t.Errorf("Buffer contains %v", bufferA.String())
42
-	}
43
-
44
-	if bufferB.String() != "foo" {
45
-		t.Errorf("Buffer contains %v", bufferB.String())
46
-	}
47
-
48
-	// Test2: bufferA and bufferB should contain "foobar",
49
-	// while bufferC should only contain "bar"
50
-	bufferC := &dummyWriter{}
51
-	writer.AddWriter(bufferC, "")
52
-	writer.Write([]byte("bar"))
53
-
54
-	if bufferA.String() != "foobar" {
55
-		t.Errorf("Buffer contains %v", bufferA.String())
56
-	}
57
-
58
-	if bufferB.String() != "foobar" {
59
-		t.Errorf("Buffer contains %v", bufferB.String())
60
-	}
61
-
62
-	if bufferC.String() != "bar" {
63
-		t.Errorf("Buffer contains %v", bufferC.String())
64
-	}
65
-
66
-	// Test3: Test eviction on failure
67
-	bufferA.failOnWrite = true
68
-	writer.Write([]byte("fail"))
69
-	if bufferA.String() != "foobar" {
70
-		t.Errorf("Buffer contains %v", bufferA.String())
71
-	}
72
-	if bufferC.String() != "barfail" {
73
-		t.Errorf("Buffer contains %v", bufferC.String())
74
-	}
75
-	// Even though we reset the flag, no more writes should go in there
76
-	bufferA.failOnWrite = false
77
-	writer.Write([]byte("test"))
78
-	if bufferA.String() != "foobar" {
79
-		t.Errorf("Buffer contains %v", bufferA.String())
80
-	}
81
-	if bufferC.String() != "barfailtest" {
82
-		t.Errorf("Buffer contains %v", bufferC.String())
83
-	}
84
-
85
-	writer.Clean()
86
-}
87
-
88
-type devNullCloser int
89
-
90
-func (d devNullCloser) Close() error {
91
-	return nil
92
-}
93
-
94
-func (d devNullCloser) Write(buf []byte) (int, error) {
95
-	return len(buf), nil
96
-}
97
-
98
-// This test checks for races. It is only useful when run with the race detector.
99
-func TestRaceBroadcastWriter(t *testing.T) {
100
-	writer := New()
101
-	c := make(chan bool)
102
-	go func() {
103
-		writer.AddWriter(devNullCloser(0), "")
104
-		c <- true
105
-	}()
106
-	writer.Write([]byte("hello"))
107
-	<-c
108
-}
109
-
110
-func BenchmarkBroadcastWriter(b *testing.B) {
111
-	writer := New()
112
-	setUpWriter := func() {
113
-		for i := 0; i < 100; i++ {
114
-			writer.AddWriter(devNullCloser(0), "stdout")
115
-			writer.AddWriter(devNullCloser(0), "stderr")
116
-			writer.AddWriter(devNullCloser(0), "")
117
-		}
118
-	}
119
-	testLine := "Line that thinks that it is log line from docker"
120
-	var buf bytes.Buffer
121
-	for i := 0; i < 100; i++ {
122
-		buf.Write([]byte(testLine + "\n"))
123
-	}
124
-	// line without eol
125
-	buf.Write([]byte(testLine))
126
-	testText := buf.Bytes()
127
-	b.SetBytes(int64(5 * len(testText)))
128
-	b.ResetTimer()
129
-	for i := 0; i < b.N; i++ {
130
-		b.StopTimer()
131
-		setUpWriter()
132
-		b.StartTimer()
133
-
134
-		for j := 0; j < 5; j++ {
135
-			if _, err := writer.Write(testText); err != nil {
136
-				b.Fatal(err)
137
-			}
138
-		}
139
-
140
-		b.StopTimer()
141
-		writer.Clean()
142
-		b.StartTimer()
143
-	}
144
-}