Browse code

Add error detection and UT to image progress

Cesar Wong authored on 2016/06/18 11:19:56
Showing 3 changed files
... ...
@@ -1,9 +1,14 @@
1 1
 package imageprogress
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"encoding/json"
6
+	"errors"
7
+	"fmt"
5 8
 	"io"
6 9
 	"regexp"
10
+	"strings"
11
+	"sync"
7 12
 	"time"
8 13
 )
9 14
 
... ...
@@ -17,6 +22,7 @@ type progressLine struct {
17 17
 	ID     string          `json:"id"`
18 18
 	Status string          `json:"status"`
19 19
 	Detail *progressDetail `json:"progressDetail"`
20
+	Error  string          `json:"error"`
20 21
 }
21 22
 
22 23
 // progressDetail is the progressDetail structure in a Docker pull progress line
... ...
@@ -94,33 +100,47 @@ func (r report) totalCount() int {
94 94
 	return cnt
95 95
 }
96 96
 
97
+// String is used for test output
98
+func (r report) String() string {
99
+	result := &bytes.Buffer{}
100
+	fmt.Fprintf(result, "{")
101
+	for k := range r {
102
+		var status string
103
+		switch k {
104
+		case statusPending:
105
+			status = "pending"
106
+		case statusDownloading:
107
+			status = "downloading"
108
+		case statusExtracting:
109
+			status = "extracting"
110
+		case statusComplete:
111
+			status = "complete"
112
+		}
113
+		fmt.Fprintf(result, "%s:{Count: %d, Current: %d, Total: %d}, ", status, r[k].Count, r[k].Current, r[k].Total)
114
+	}
115
+	fmt.Fprintf(result, "}")
116
+	return result.String()
117
+}
118
+
97 119
 // newWriter creates a writer that periodically reports
98 120
 // on pull/push progress of a Docker image. It only reports when the state of the
99 121
 // different layers has changed and uses time thresholds to limit the
100 122
 // rate of the reports.
101 123
 func newWriter(reportFn func(report), layersChangedFn func(report, report) bool) io.Writer {
102
-	pipeIn, pipeOut := io.Pipe()
103 124
 	writer := &imageProgressWriter{
104
-		Writer:                 pipeOut,
105
-		decoder:                json.NewDecoder(pipeIn),
125
+		mutex:                  &sync.Mutex{},
106 126
 		layerStatus:            map[string]progressLine{},
107 127
 		reportFn:               reportFn,
108 128
 		layersChangedFn:        layersChangedFn,
109 129
 		progressTimeThreshhold: defaultProgressTimeThreshhold,
110 130
 		stableThreshhold:       defaultStableThreshhold,
111 131
 	}
112
-	go func() {
113
-		err := writer.readProgress()
114
-		if err != nil {
115
-			pipeIn.CloseWithError(err)
116
-		}
117
-	}()
118 132
 	return writer
119 133
 }
120 134
 
121 135
 type imageProgressWriter struct {
122
-	io.Writer
123
-	decoder                *json.Decoder
136
+	mutex                  *sync.Mutex
137
+	internalWriter         io.Writer
124 138
 	layerStatus            map[string]progressLine
125 139
 	lastLayerCount         int
126 140
 	stableLines            int
... ...
@@ -132,10 +152,32 @@ type imageProgressWriter struct {
132 132
 	layersChangedFn        func(report, report) bool
133 133
 }
134 134
 
135
-func (w *imageProgressWriter) readProgress() error {
135
+func (w *imageProgressWriter) ReadFrom(reader io.Reader) (int64, error) {
136
+	decoder := json.NewDecoder(reader)
137
+	return 0, w.readProgress(decoder)
138
+}
139
+
140
+func (w *imageProgressWriter) Write(data []byte) (int, error) {
141
+	w.mutex.Lock()
142
+	defer w.mutex.Unlock()
143
+	if w.internalWriter == nil {
144
+		var pipeIn *io.PipeReader
145
+		pipeIn, w.internalWriter = io.Pipe()
146
+		decoder := json.NewDecoder(pipeIn)
147
+		go func() {
148
+			err := w.readProgress(decoder)
149
+			if err != nil {
150
+				pipeIn.CloseWithError(err)
151
+			}
152
+		}()
153
+	}
154
+	return w.internalWriter.Write(data)
155
+}
156
+
157
+func (w *imageProgressWriter) readProgress(decoder *json.Decoder) error {
136 158
 	for {
137 159
 		line := &progressLine{}
138
-		err := w.decoder.Decode(line)
160
+		err := decoder.Decode(line)
139 161
 		if err == io.EOF {
140 162
 			break
141 163
 		}
... ...
@@ -151,6 +193,11 @@ func (w *imageProgressWriter) readProgress() error {
151 151
 }
152 152
 
153 153
 func (w *imageProgressWriter) processLine(line *progressLine) error {
154
+
155
+	if err := getError(line); err != nil {
156
+		return err
157
+	}
158
+
154 159
 	// determine if it's a line we want to process
155 160
 	if !islayerStatus(line) {
156 161
 		return nil
... ...
@@ -212,9 +259,20 @@ func islayerStatus(line *progressLine) bool {
212 212
 	if !layerIDRegexp.MatchString(line.ID) {
213 213
 		return false
214 214
 	}
215
+	// ignore retrying status
216
+	if strings.HasPrefix(line.Status, "Retrying") {
217
+		return false
218
+	}
215 219
 	return true
216 220
 }
217 221
 
222
+func getError(line *progressLine) error {
223
+	if len(line.Error) > 0 {
224
+		return errors.New(line.Error)
225
+	}
226
+	return nil
227
+}
228
+
218 229
 func createReport(dockerProgress map[string]progressLine) report {
219 230
 	r := report{}
220 231
 	for _, line := range dockerProgress {
221 232
new file mode 100644
... ...
@@ -0,0 +1,183 @@
0
+package imageprogress
1
+
2
+import (
3
+	"encoding/json"
4
+	"io"
5
+	"reflect"
6
+	"testing"
7
+)
8
+
9
+func TestReports(t *testing.T) {
10
+	tests := []struct {
11
+		name        string
12
+		gen         func(*progressGenerator)
13
+		errExpected bool
14
+		expected    report
15
+	}{
16
+		{
17
+			name: "simple report",
18
+			gen: func(p *progressGenerator) {
19
+				p.status("1", "Extracting")
20
+				p.status("2", "Downloading")
21
+				p.status("1", "Downloading")
22
+				p.status("2", "Pull complete")
23
+			},
24
+			expected: report{
25
+				statusDownloading: &layerDetail{Count: 1},
26
+				statusComplete:    &layerDetail{Count: 1},
27
+			},
28
+		},
29
+		{
30
+			name: "ignore invalid layer id",
31
+			gen: func(p *progressGenerator) {
32
+				p.status("1", "Downloading")
33
+				p.status("hello", "testing")
34
+				p.status("1", "Downloading")
35
+			},
36
+			expected: report{
37
+				statusDownloading: &layerDetail{Count: 1},
38
+			},
39
+		},
40
+		{
41
+			name: "ignore retrying status",
42
+			gen: func(p *progressGenerator) {
43
+				p.status("1", "Downloading")
44
+				p.status("2", "Pull complete")
45
+				p.status("1", "Downloading")
46
+				p.status("3", "Retrying")
47
+			},
48
+			expected: report{
49
+				statusDownloading: &layerDetail{Count: 1},
50
+				statusComplete:    &layerDetail{Count: 1},
51
+			},
52
+		},
53
+		{
54
+			name: "detect error",
55
+			gen: func(p *progressGenerator) {
56
+				p.status("1", "Downloading")
57
+				p.err("an error")
58
+			},
59
+			errExpected: true,
60
+		},
61
+	}
62
+
63
+	for _, test := range tests {
64
+		pipeIn, pipeOut := io.Pipe()
65
+		go func() {
66
+			p := newProgressGenerator(pipeOut)
67
+			test.gen(p)
68
+			pipeOut.Close()
69
+		}()
70
+		var lastReport report
71
+		w := newWriter(
72
+			func(r report) {
73
+				lastReport = r
74
+			},
75
+			func(a report, b report) bool {
76
+				return true
77
+			},
78
+		)
79
+		w.(*imageProgressWriter).stableThreshhold = 0
80
+		_, err := io.Copy(w, pipeIn)
81
+		if err != nil {
82
+			if !test.errExpected {
83
+				t.Errorf("%s: unexpected: %v", test.name, err)
84
+			}
85
+			continue
86
+		}
87
+		if test.errExpected {
88
+			t.Errorf("%s: did not get expected error", test.name)
89
+			continue
90
+		}
91
+		if !compareReport(lastReport, test.expected) {
92
+			t.Errorf("%s: unexpected report, got: %v, expected: %v", test.name, lastReport, test.expected)
93
+		}
94
+	}
95
+}
96
+
97
+func TestErrorOnCopy(t *testing.T) {
98
+	// Producer pipe
99
+	genIn, genOut := io.Pipe()
100
+	p := newProgressGenerator(genOut)
101
+
102
+	// generate some data
103
+	go func() {
104
+		for i := 0; i < 100; i++ {
105
+			p.status("1", "Downloading")
106
+			p.status("2", "Downloading")
107
+			p.status("3", "Downloading")
108
+		}
109
+		p.err("data error")
110
+		genOut.Close()
111
+	}()
112
+
113
+	w := newWriter(func(r report) {}, func(a, b report) bool { return true })
114
+
115
+	// Ensure that the error is propagated to the copy
116
+	_, err := io.Copy(w, genIn)
117
+	if err == nil {
118
+		t.Errorf("Did not get an error when copying to writer")
119
+	}
120
+	if err.Error() != "data error" {
121
+		t.Errorf("Did not get expected error: %v", err)
122
+	}
123
+}
124
+
125
+func TestStableLayerCount(t *testing.T) {
126
+	var result report
127
+	w := newWriter(func(r report) { result = r }, func(a, b report) bool { return true })
128
+	w.(*imageProgressWriter).stableThreshhold = 3 // This means that the number of layers must be stable for at least 3 lines
129
+	p := newProgressGenerator(w)
130
+
131
+	// Increase number of layers by one each time
132
+	p.status("1", "one")
133
+	p.status("2", "two")
134
+	p.status("3", "three")
135
+	if result != nil {
136
+		t.Errorf("do not expect any reports at this point")
137
+		return
138
+	}
139
+
140
+	// Report on the same layers as before, keeping the
141
+	// the number stable
142
+	p.status("1", "one-a")
143
+	p.status("2", "two-a")
144
+	p.status("3", "three-a")
145
+	expected := report{
146
+		statusPending: &layerDetail{Count: 3},
147
+	}
148
+	if !compareReport(result, expected) {
149
+		t.Errorf("did not get expected report")
150
+	}
151
+}
152
+
153
+func compareReport(a, b report) bool {
154
+	if len(a) != len(b) {
155
+		return false
156
+	}
157
+	for k := range a {
158
+		if _, ok := b[k]; !ok {
159
+			return false
160
+		}
161
+		if !reflect.DeepEqual(*a[k], *b[k]) {
162
+			return false
163
+		}
164
+	}
165
+	return true
166
+}
167
+
168
+type progressGenerator json.Encoder
169
+
170
+func newProgressGenerator(w io.Writer) *progressGenerator {
171
+	return (*progressGenerator)(json.NewEncoder(w))
172
+}
173
+
174
+func (p *progressGenerator) status(id, status string) {
175
+	(*json.Encoder)(p).Encode(&progressLine{ID: id, Status: status})
176
+}
177
+func (p *progressGenerator) detail(id, status string, current, total int64) {
178
+	(*json.Encoder)(p).Encode(&progressLine{ID: id, Status: status, Detail: &progressDetail{Current: current, Total: total}})
179
+}
180
+func (p *progressGenerator) err(msg string) {
181
+	(*json.Encoder)(p).Encode(&progressLine{Error: msg})
182
+}
... ...
@@ -78,7 +78,7 @@ type file struct {
78 78
 }
79 79
 
80 80
 func (f file) Is(level int) bool {
81
-	return level <= f.level
81
+	return level <= f.level || bool(glog.V(glog.Level(level)))
82 82
 }
83 83
 
84 84
 func (f file) V(level int) Logger {