Browse code

Merge pull request #7329 from erikh/move_broadcastwriter

Cleanup: utils/broadcastwriter -> pkg/broadcastwriter

Michael Crosby authored on 2014/08/08 06:51:42
Showing 10 changed files
... ...
@@ -8,6 +8,7 @@ import (
8 8
 	"time"
9 9
 
10 10
 	"github.com/docker/docker/engine"
11
+	"github.com/docker/docker/pkg/jsonlog"
11 12
 	"github.com/docker/docker/utils"
12 13
 )
13 14
 
... ...
@@ -57,7 +58,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
57 57
 		} else {
58 58
 			dec := json.NewDecoder(cLog)
59 59
 			for {
60
-				l := &utils.JSONLog{}
60
+				l := &jsonlog.JSONLog{}
61 61
 
62 62
 				if err := dec.Decode(l); err == io.EOF {
63 63
 					break
... ...
@@ -22,12 +22,12 @@ import (
22 22
 	"github.com/docker/docker/image"
23 23
 	"github.com/docker/docker/links"
24 24
 	"github.com/docker/docker/nat"
25
+	"github.com/docker/docker/pkg/broadcastwriter"
25 26
 	"github.com/docker/docker/pkg/networkfs/etchosts"
26 27
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
27 28
 	"github.com/docker/docker/pkg/symlink"
28 29
 	"github.com/docker/docker/runconfig"
29 30
 	"github.com/docker/docker/utils"
30
-	"github.com/docker/docker/utils/broadcastwriter"
31 31
 	"github.com/docker/libcontainer/devices"
32 32
 	"github.com/docker/libcontainer/label"
33 33
 )
... ...
@@ -26,6 +26,7 @@ import (
26 26
 	"github.com/docker/docker/engine"
27 27
 	"github.com/docker/docker/graph"
28 28
 	"github.com/docker/docker/image"
29
+	"github.com/docker/docker/pkg/broadcastwriter"
29 30
 	"github.com/docker/docker/pkg/graphdb"
30 31
 	"github.com/docker/docker/pkg/namesgenerator"
31 32
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
... ...
@@ -35,7 +36,6 @@ import (
35 35
 	"github.com/docker/docker/pkg/truncindex"
36 36
 	"github.com/docker/docker/runconfig"
37 37
 	"github.com/docker/docker/utils"
38
-	"github.com/docker/docker/utils/broadcastwriter"
39 38
 	"github.com/docker/libcontainer/label"
40 39
 )
41 40
 
... ...
@@ -12,6 +12,7 @@ import (
12 12
 	"github.com/docker/docker/pkg/tailfile"
13 13
 
14 14
 	"github.com/docker/docker/engine"
15
+	"github.com/docker/docker/pkg/jsonlog"
15 16
 	"github.com/docker/docker/utils"
16 17
 )
17 18
 
... ...
@@ -89,7 +90,7 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
89 89
 			}
90 90
 			dec := json.NewDecoder(cLog)
91 91
 			for {
92
-				l := &utils.JSONLog{}
92
+				l := &jsonlog.JSONLog{}
93 93
 
94 94
 				if err := dec.Decode(l); err == io.EOF {
95 95
 					break
... ...
@@ -115,13 +116,13 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
115 115
 		if stdout {
116 116
 			stdoutPipe := container.StdoutLogPipe()
117 117
 			go func() {
118
-				errors <- utils.WriteLog(stdoutPipe, job.Stdout, format)
118
+				errors <- jsonlog.WriteLog(stdoutPipe, job.Stdout, format)
119 119
 			}()
120 120
 		}
121 121
 		if stderr {
122 122
 			stderrPipe := container.StderrLogPipe()
123 123
 			go func() {
124
-				errors <- utils.WriteLog(stderrPipe, job.Stderr, format)
124
+				errors <- jsonlog.WriteLog(stderrPipe, job.Stderr, format)
125 125
 			}()
126 126
 		}
127 127
 		err := <-errors
128 128
new file mode 100644
... ...
@@ -0,0 +1,93 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/json"
5
+	"io"
6
+	"log"
7
+	"sync"
8
+	"time"
9
+
10
+	"github.com/docker/docker/pkg/jsonlog"
11
+)
12
+
13
+// BroadcastWriter accumulate multiple io.WriteCloser by stream.
14
+type BroadcastWriter struct {
15
+	sync.Mutex
16
+	buf     *bytes.Buffer
17
+	streams map[string](map[io.WriteCloser]struct{})
18
+}
19
+
20
+// AddWriter adds new io.WriteCloser for stream.
21
+// If stream is "", then all writes proceed as is. Otherwise every line from
22
+// input will be packed to serialized jsonlog.JSONLog.
23
+func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
24
+	w.Lock()
25
+	if _, ok := w.streams[stream]; !ok {
26
+		w.streams[stream] = make(map[io.WriteCloser]struct{})
27
+	}
28
+	w.streams[stream][writer] = struct{}{}
29
+	w.Unlock()
30
+}
31
+
32
+// Write writes bytes to all writers. Failed writers will be evicted during
33
+// this call.
34
+func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
35
+	created := time.Now().UTC()
36
+	w.Lock()
37
+	if writers, ok := w.streams[""]; ok {
38
+		for sw := range writers {
39
+			if n, err := sw.Write(p); err != nil || n != len(p) {
40
+				// On error, evict the writer
41
+				delete(writers, sw)
42
+			}
43
+		}
44
+	}
45
+	w.buf.Write(p)
46
+	for {
47
+		line, err := w.buf.ReadString('\n')
48
+		if err != nil {
49
+			w.buf.Write([]byte(line))
50
+			break
51
+		}
52
+		for stream, writers := range w.streams {
53
+			if stream == "" {
54
+				continue
55
+			}
56
+			b, err := json.Marshal(jsonlog.JSONLog{Log: line, Stream: stream, Created: created})
57
+			if err != nil {
58
+				log.Printf("Error making JSON log line: %s", err)
59
+				continue
60
+			}
61
+			b = append(b, '\n')
62
+			for sw := range writers {
63
+				if _, err := sw.Write(b); err != nil {
64
+					delete(writers, sw)
65
+				}
66
+			}
67
+		}
68
+	}
69
+	w.Unlock()
70
+	return len(p), nil
71
+}
72
+
73
+// Clean closes and removes all writers. Last non-eol-terminated part of data
74
+// will be saved.
75
+func (w *BroadcastWriter) Clean() error {
76
+	w.Lock()
77
+	for _, writers := range w.streams {
78
+		for w := range writers {
79
+			w.Close()
80
+		}
81
+	}
82
+	w.streams = make(map[string](map[io.WriteCloser]struct{}))
83
+	w.Unlock()
84
+	return nil
85
+}
86
+
87
+func New() *BroadcastWriter {
88
+	return &BroadcastWriter{
89
+		streams: make(map[string](map[io.WriteCloser]struct{})),
90
+		buf:     bytes.NewBuffer(nil),
91
+	}
92
+}
0 93
new file mode 100644
... ...
@@ -0,0 +1,144 @@
0
+package broadcastwriter
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+
6
+	"testing"
7
+)
8
+
9
+type dummyWriter struct {
10
+	buffer      bytes.Buffer
11
+	failOnWrite bool
12
+}
13
+
14
+func (dw *dummyWriter) Write(p []byte) (n int, err error) {
15
+	if dw.failOnWrite {
16
+		return 0, errors.New("Fake fail")
17
+	}
18
+	return dw.buffer.Write(p)
19
+}
20
+
21
+func (dw *dummyWriter) String() string {
22
+	return dw.buffer.String()
23
+}
24
+
25
+func (dw *dummyWriter) Close() error {
26
+	return nil
27
+}
28
+
29
+func TestBroadcastWriter(t *testing.T) {
30
+	writer := New()
31
+
32
+	// Test 1: Both bufferA and bufferB should contain "foo"
33
+	bufferA := &dummyWriter{}
34
+	writer.AddWriter(bufferA, "")
35
+	bufferB := &dummyWriter{}
36
+	writer.AddWriter(bufferB, "")
37
+	writer.Write([]byte("foo"))
38
+
39
+	if bufferA.String() != "foo" {
40
+		t.Errorf("Buffer contains %v", bufferA.String())
41
+	}
42
+
43
+	if bufferB.String() != "foo" {
44
+		t.Errorf("Buffer contains %v", bufferB.String())
45
+	}
46
+
47
+	// Test2: bufferA and bufferB should contain "foobar",
48
+	// while bufferC should only contain "bar"
49
+	bufferC := &dummyWriter{}
50
+	writer.AddWriter(bufferC, "")
51
+	writer.Write([]byte("bar"))
52
+
53
+	if bufferA.String() != "foobar" {
54
+		t.Errorf("Buffer contains %v", bufferA.String())
55
+	}
56
+
57
+	if bufferB.String() != "foobar" {
58
+		t.Errorf("Buffer contains %v", bufferB.String())
59
+	}
60
+
61
+	if bufferC.String() != "bar" {
62
+		t.Errorf("Buffer contains %v", bufferC.String())
63
+	}
64
+
65
+	// Test3: Test eviction on failure
66
+	bufferA.failOnWrite = true
67
+	writer.Write([]byte("fail"))
68
+	if bufferA.String() != "foobar" {
69
+		t.Errorf("Buffer contains %v", bufferA.String())
70
+	}
71
+	if bufferC.String() != "barfail" {
72
+		t.Errorf("Buffer contains %v", bufferC.String())
73
+	}
74
+	// Even though we reset the flag, no more writes should go in there
75
+	bufferA.failOnWrite = false
76
+	writer.Write([]byte("test"))
77
+	if bufferA.String() != "foobar" {
78
+		t.Errorf("Buffer contains %v", bufferA.String())
79
+	}
80
+	if bufferC.String() != "barfailtest" {
81
+		t.Errorf("Buffer contains %v", bufferC.String())
82
+	}
83
+
84
+	writer.Clean()
85
+}
86
+
87
+type devNullCloser int
88
+
89
+func (d devNullCloser) Close() error {
90
+	return nil
91
+}
92
+
93
+func (d devNullCloser) Write(buf []byte) (int, error) {
94
+	return len(buf), nil
95
+}
96
+
97
+// This test checks for races. It is only useful when run with the race detector.
98
+func TestRaceBroadcastWriter(t *testing.T) {
99
+	writer := New()
100
+	c := make(chan bool)
101
+	go func() {
102
+		writer.AddWriter(devNullCloser(0), "")
103
+		c <- true
104
+	}()
105
+	writer.Write([]byte("hello"))
106
+	<-c
107
+}
108
+
109
+func BenchmarkBroadcastWriter(b *testing.B) {
110
+	writer := New()
111
+	setUpWriter := func() {
112
+		for i := 0; i < 100; i++ {
113
+			writer.AddWriter(devNullCloser(0), "stdout")
114
+			writer.AddWriter(devNullCloser(0), "stderr")
115
+			writer.AddWriter(devNullCloser(0), "")
116
+		}
117
+	}
118
+	testLine := "Line that thinks that it is log line from docker"
119
+	var buf bytes.Buffer
120
+	for i := 0; i < 100; i++ {
121
+		buf.Write([]byte(testLine + "\n"))
122
+	}
123
+	// line without eol
124
+	buf.Write([]byte(testLine))
125
+	testText := buf.Bytes()
126
+	b.SetBytes(int64(5 * len(testText)))
127
+	b.ResetTimer()
128
+	for i := 0; i < b.N; i++ {
129
+		b.StopTimer()
130
+		setUpWriter()
131
+		b.StartTimer()
132
+
133
+		for j := 0; j < 5; j++ {
134
+			if _, err := writer.Write(testText); err != nil {
135
+				b.Fatal(err)
136
+			}
137
+		}
138
+
139
+		b.StopTimer()
140
+		writer.Clean()
141
+		b.StartTimer()
142
+	}
143
+}
0 144
new file mode 100644
... ...
@@ -0,0 +1,45 @@
0
+package jsonlog
1
+
2
+import (
3
+	"encoding/json"
4
+	"fmt"
5
+	"io"
6
+	"log"
7
+	"time"
8
+)
9
+
10
+type JSONLog struct {
11
+	Log     string    `json:"log,omitempty"`
12
+	Stream  string    `json:"stream,omitempty"`
13
+	Created time.Time `json:"time"`
14
+}
15
+
16
+func (jl *JSONLog) Format(format string) (string, error) {
17
+	if format == "" {
18
+		return jl.Log, nil
19
+	}
20
+	if format == "json" {
21
+		m, err := json.Marshal(jl)
22
+		return string(m), err
23
+	}
24
+	return fmt.Sprintf("[%s] %s", jl.Created.Format(format), jl.Log), nil
25
+}
26
+
27
+func WriteLog(src io.Reader, dst io.WriteCloser, format string) error {
28
+	dec := json.NewDecoder(src)
29
+	for {
30
+		l := &JSONLog{}
31
+
32
+		if err := dec.Decode(l); err == io.EOF {
33
+			return nil
34
+		} else if err != nil {
35
+			log.Printf("Error streaming logs: %s", err)
36
+			return err
37
+		}
38
+		line, err := l.Format(format)
39
+		if err != nil {
40
+			return err
41
+		}
42
+		fmt.Fprintf(dst, "%s", line)
43
+	}
44
+}
0 45
deleted file mode 100644
... ...
@@ -1,92 +0,0 @@
1
-package broadcastwriter
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/json"
6
-	"io"
7
-	"sync"
8
-	"time"
9
-
10
-	"github.com/docker/docker/utils"
11
-)
12
-
13
-// BroadcastWriter accumulate multiple io.WriteCloser by stream.
14
-type BroadcastWriter struct {
15
-	sync.Mutex
16
-	buf     *bytes.Buffer
17
-	streams map[string](map[io.WriteCloser]struct{})
18
-}
19
-
20
-// AddWriter adds new io.WriteCloser for stream.
21
-// If stream is "", then all writes proceed as is. Otherwise every line from
22
-// input will be packed to serialized utils.JSONLog.
23
-func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
24
-	w.Lock()
25
-	if _, ok := w.streams[stream]; !ok {
26
-		w.streams[stream] = make(map[io.WriteCloser]struct{})
27
-	}
28
-	w.streams[stream][writer] = struct{}{}
29
-	w.Unlock()
30
-}
31
-
32
-// Write writes bytes to all writers. Failed writers will be evicted during
33
-// this call.
34
-func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
35
-	created := time.Now().UTC()
36
-	w.Lock()
37
-	if writers, ok := w.streams[""]; ok {
38
-		for sw := range writers {
39
-			if n, err := sw.Write(p); err != nil || n != len(p) {
40
-				// On error, evict the writer
41
-				delete(writers, sw)
42
-			}
43
-		}
44
-	}
45
-	w.buf.Write(p)
46
-	for {
47
-		line, err := w.buf.ReadString('\n')
48
-		if err != nil {
49
-			w.buf.Write([]byte(line))
50
-			break
51
-		}
52
-		for stream, writers := range w.streams {
53
-			if stream == "" {
54
-				continue
55
-			}
56
-			b, err := json.Marshal(utils.JSONLog{Log: line, Stream: stream, Created: created})
57
-			if err != nil {
58
-				utils.Errorf("Error making JSON log line: %s", err)
59
-				continue
60
-			}
61
-			b = append(b, '\n')
62
-			for sw := range writers {
63
-				if _, err := sw.Write(b); err != nil {
64
-					delete(writers, sw)
65
-				}
66
-			}
67
-		}
68
-	}
69
-	w.Unlock()
70
-	return len(p), nil
71
-}
72
-
73
-// Clean closes and removes all writers. Last non-eol-terminated part of data
74
-// will be saved.
75
-func (w *BroadcastWriter) Clean() error {
76
-	w.Lock()
77
-	for _, writers := range w.streams {
78
-		for w := range writers {
79
-			w.Close()
80
-		}
81
-	}
82
-	w.streams = make(map[string](map[io.WriteCloser]struct{}))
83
-	w.Unlock()
84
-	return nil
85
-}
86
-
87
-func New() *BroadcastWriter {
88
-	return &BroadcastWriter{
89
-		streams: make(map[string](map[io.WriteCloser]struct{})),
90
-		buf:     bytes.NewBuffer(nil),
91
-	}
92
-}
93 1
deleted file mode 100644
... ...
@@ -1,144 +0,0 @@
1
-package broadcastwriter
2
-
3
-import (
4
-	"bytes"
5
-	"errors"
6
-
7
-	"testing"
8
-)
9
-
10
-type dummyWriter struct {
11
-	buffer      bytes.Buffer
12
-	failOnWrite bool
13
-}
14
-
15
-func (dw *dummyWriter) Write(p []byte) (n int, err error) {
16
-	if dw.failOnWrite {
17
-		return 0, errors.New("Fake fail")
18
-	}
19
-	return dw.buffer.Write(p)
20
-}
21
-
22
-func (dw *dummyWriter) String() string {
23
-	return dw.buffer.String()
24
-}
25
-
26
-func (dw *dummyWriter) Close() error {
27
-	return nil
28
-}
29
-
30
-func TestBroadcastWriter(t *testing.T) {
31
-	writer := New()
32
-
33
-	// Test 1: Both bufferA and bufferB should contain "foo"
34
-	bufferA := &dummyWriter{}
35
-	writer.AddWriter(bufferA, "")
36
-	bufferB := &dummyWriter{}
37
-	writer.AddWriter(bufferB, "")
38
-	writer.Write([]byte("foo"))
39
-
40
-	if bufferA.String() != "foo" {
41
-		t.Errorf("Buffer contains %v", bufferA.String())
42
-	}
43
-
44
-	if bufferB.String() != "foo" {
45
-		t.Errorf("Buffer contains %v", bufferB.String())
46
-	}
47
-
48
-	// Test2: bufferA and bufferB should contain "foobar",
49
-	// while bufferC should only contain "bar"
50
-	bufferC := &dummyWriter{}
51
-	writer.AddWriter(bufferC, "")
52
-	writer.Write([]byte("bar"))
53
-
54
-	if bufferA.String() != "foobar" {
55
-		t.Errorf("Buffer contains %v", bufferA.String())
56
-	}
57
-
58
-	if bufferB.String() != "foobar" {
59
-		t.Errorf("Buffer contains %v", bufferB.String())
60
-	}
61
-
62
-	if bufferC.String() != "bar" {
63
-		t.Errorf("Buffer contains %v", bufferC.String())
64
-	}
65
-
66
-	// Test3: Test eviction on failure
67
-	bufferA.failOnWrite = true
68
-	writer.Write([]byte("fail"))
69
-	if bufferA.String() != "foobar" {
70
-		t.Errorf("Buffer contains %v", bufferA.String())
71
-	}
72
-	if bufferC.String() != "barfail" {
73
-		t.Errorf("Buffer contains %v", bufferC.String())
74
-	}
75
-	// Even though we reset the flag, no more writes should go in there
76
-	bufferA.failOnWrite = false
77
-	writer.Write([]byte("test"))
78
-	if bufferA.String() != "foobar" {
79
-		t.Errorf("Buffer contains %v", bufferA.String())
80
-	}
81
-	if bufferC.String() != "barfailtest" {
82
-		t.Errorf("Buffer contains %v", bufferC.String())
83
-	}
84
-
85
-	writer.Clean()
86
-}
87
-
88
-type devNullCloser int
89
-
90
-func (d devNullCloser) Close() error {
91
-	return nil
92
-}
93
-
94
-func (d devNullCloser) Write(buf []byte) (int, error) {
95
-	return len(buf), nil
96
-}
97
-
98
-// This test checks for races. It is only useful when run with the race detector.
99
-func TestRaceBroadcastWriter(t *testing.T) {
100
-	writer := New()
101
-	c := make(chan bool)
102
-	go func() {
103
-		writer.AddWriter(devNullCloser(0), "")
104
-		c <- true
105
-	}()
106
-	writer.Write([]byte("hello"))
107
-	<-c
108
-}
109
-
110
-func BenchmarkBroadcastWriter(b *testing.B) {
111
-	writer := New()
112
-	setUpWriter := func() {
113
-		for i := 0; i < 100; i++ {
114
-			writer.AddWriter(devNullCloser(0), "stdout")
115
-			writer.AddWriter(devNullCloser(0), "stderr")
116
-			writer.AddWriter(devNullCloser(0), "")
117
-		}
118
-	}
119
-	testLine := "Line that thinks that it is log line from docker"
120
-	var buf bytes.Buffer
121
-	for i := 0; i < 100; i++ {
122
-		buf.Write([]byte(testLine + "\n"))
123
-	}
124
-	// line without eol
125
-	buf.Write([]byte(testLine))
126
-	testText := buf.Bytes()
127
-	b.SetBytes(int64(5 * len(testText)))
128
-	b.ResetTimer()
129
-	for i := 0; i < b.N; i++ {
130
-		b.StopTimer()
131
-		setUpWriter()
132
-		b.StartTimer()
133
-
134
-		for j := 0; j < 5; j++ {
135
-			if _, err := writer.Write(testText); err != nil {
136
-				b.Fatal(err)
137
-			}
138
-		}
139
-
140
-		b.StopTimer()
141
-		writer.Clean()
142
-		b.StartTimer()
143
-	}
144
-}
... ...
@@ -6,7 +6,6 @@ import (
6 6
 	"crypto/sha1"
7 7
 	"crypto/sha256"
8 8
 	"encoding/hex"
9
-	"encoding/json"
10 9
 	"fmt"
11 10
 	"io"
12 11
 	"io/ioutil"
... ...
@@ -19,7 +18,6 @@ import (
19 19
 	"strings"
20 20
 	"sync"
21 21
 	"syscall"
22
-	"time"
23 22
 
24 23
 	"github.com/docker/docker/dockerversion"
25 24
 )
... ...
@@ -264,42 +262,6 @@ func (r *bufReader) Close() error {
264 264
 	return closer.Close()
265 265
 }
266 266
 
267
-type JSONLog struct {
268
-	Log     string    `json:"log,omitempty"`
269
-	Stream  string    `json:"stream,omitempty"`
270
-	Created time.Time `json:"time"`
271
-}
272
-
273
-func (jl *JSONLog) Format(format string) (string, error) {
274
-	if format == "" {
275
-		return jl.Log, nil
276
-	}
277
-	if format == "json" {
278
-		m, err := json.Marshal(jl)
279
-		return string(m), err
280
-	}
281
-	return fmt.Sprintf("[%s] %s", jl.Created.Format(format), jl.Log), nil
282
-}
283
-
284
-func WriteLog(src io.Reader, dst io.WriteCloser, format string) error {
285
-	dec := json.NewDecoder(src)
286
-	for {
287
-		l := &JSONLog{}
288
-
289
-		if err := dec.Decode(l); err == io.EOF {
290
-			return nil
291
-		} else if err != nil {
292
-			Errorf("Error streaming logs: %s", err)
293
-			return err
294
-		}
295
-		line, err := l.Format(format)
296
-		if err != nil {
297
-			return err
298
-		}
299
-		fmt.Fprintf(dst, "%s", line)
300
-	}
301
-}
302
-
303 267
 func GetTotalUsedFds() int {
304 268
 	if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
305 269
 		Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)