Browse code

Cleanup WriteFlusher

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2015/12/19 06:06:28
Showing 1 changed files
... ...
@@ -1,9 +1,7 @@
1 1
 package ioutils
2 2
 
3 3
 import (
4
-	"errors"
5 4
 	"io"
6
-	"net/http"
7 5
 	"sync"
8 6
 )
9 7
 
... ...
@@ -11,45 +9,43 @@ import (
11 11
 // is a flush. In addition, the Close method can be called to intercept
12 12
 // Read/Write calls if the targets lifecycle has already ended.
13 13
 type WriteFlusher struct {
14
-	mu      sync.Mutex
15
-	w       io.Writer
16
-	flusher http.Flusher
17
-	flushed bool
18
-	closed  error
14
+	w           io.Writer
15
+	flusher     flusher
16
+	flushed     chan struct{}
17
+	flushedOnce sync.Once
18
+	closed      chan struct{}
19
+	closeLock   sync.Mutex
20
+}
19 21
 
20
-	// TODO(stevvooe): Use channel for closed instead, remove mutex. Using a
21
-	// channel will allow one to properly order the operations.
22
+type flusher interface {
23
+	Flush()
22 24
 }
23 25
 
24
-var errWriteFlusherClosed = errors.New("writeflusher: closed")
26
+var errWriteFlusherClosed = io.EOF
25 27
 
26 28
 func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
27
-	wf.mu.Lock()
28
-	defer wf.mu.Unlock()
29
-	if wf.closed != nil {
30
-		return 0, wf.closed
29
+	select {
30
+	case <-wf.closed:
31
+		return 0, errWriteFlusherClosed
32
+	default:
31 33
 	}
32 34
 
33 35
 	n, err = wf.w.Write(b)
34
-	wf.flush() // every write is a flush.
36
+	wf.Flush() // every write is a flush.
35 37
 	return n, err
36 38
 }
37 39
 
38 40
 // Flush the stream immediately.
39 41
 func (wf *WriteFlusher) Flush() {
40
-	wf.mu.Lock()
41
-	defer wf.mu.Unlock()
42
-
43
-	wf.flush()
44
-}
45
-
46
-// flush the stream immediately without taking a lock. Used internally.
47
-func (wf *WriteFlusher) flush() {
48
-	if wf.closed != nil {
42
+	select {
43
+	case <-wf.closed:
49 44
 		return
45
+	default:
50 46
 	}
51 47
 
52
-	wf.flushed = true
48
+	wf.flushedOnce.Do(func() {
49
+		close(wf.flushed)
50
+	})
53 51
 	wf.flusher.Flush()
54 52
 }
55 53
 
... ...
@@ -59,34 +55,38 @@ func (wf *WriteFlusher) Flushed() bool {
59 59
 	// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
60 60
 	// be used to detect whether or a response code has been issued or not.
61 61
 	// Another hook should be used instead.
62
-	wf.mu.Lock()
63
-	defer wf.mu.Unlock()
64
-
65
-	return wf.flushed
62
+	var flushed bool
63
+	select {
64
+	case <-wf.flushed:
65
+		flushed = true
66
+	default:
67
+	}
68
+	return flushed
66 69
 }
67 70
 
68 71
 // Close closes the write flusher, disallowing any further writes to the
69 72
 // target. After the flusher is closed, all calls to write or flush will
70 73
 // result in an error.
71 74
 func (wf *WriteFlusher) Close() error {
72
-	wf.mu.Lock()
73
-	defer wf.mu.Unlock()
74
-
75
-	if wf.closed != nil {
76
-		return wf.closed
75
+	wf.closeLock.Lock()
76
+	defer wf.closeLock.Unlock()
77
+
78
+	select {
79
+	case <-wf.closed:
80
+		return errWriteFlusherClosed
81
+	default:
82
+		close(wf.closed)
77 83
 	}
78
-
79
-	wf.closed = errWriteFlusherClosed
80 84
 	return nil
81 85
 }
82 86
 
83 87
 // NewWriteFlusher returns a new WriteFlusher.
84 88
 func NewWriteFlusher(w io.Writer) *WriteFlusher {
85
-	var flusher http.Flusher
86
-	if f, ok := w.(http.Flusher); ok {
87
-		flusher = f
89
+	var fl flusher
90
+	if f, ok := w.(flusher); ok {
91
+		fl = f
88 92
 	} else {
89
-		flusher = &NopFlusher{}
93
+		fl = &NopFlusher{}
90 94
 	}
91
-	return &WriteFlusher{w: w, flusher: flusher}
95
+	return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
92 96
 }