Signed-off-by: Victor Vieux <victorvieux@gmail.com>
Victor Vieux authored on 2017/06/02 08:22:001 | 1 |
new file mode 100644 |
... | ... |
@@ -0,0 +1,224 @@ |
0 |
+package multireader |
|
1 |
+ |
|
2 |
+import ( |
|
3 |
+ "bytes" |
|
4 |
+ "fmt" |
|
5 |
+ "io" |
|
6 |
+ "os" |
|
7 |
+) |
|
8 |
+ |
|
9 |
+type pos struct { |
|
10 |
+ idx int |
|
11 |
+ offset int64 |
|
12 |
+} |
|
13 |
+ |
|
14 |
+type multiReadSeeker struct { |
|
15 |
+ readers []io.ReadSeeker |
|
16 |
+ pos *pos |
|
17 |
+ posIdx map[io.ReadSeeker]int |
|
18 |
+} |
|
19 |
+ |
|
20 |
+func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { |
|
21 |
+ var tmpOffset int64 |
|
22 |
+ switch whence { |
|
23 |
+ case os.SEEK_SET: |
|
24 |
+ for i, rdr := range r.readers { |
|
25 |
+ // get size of the current reader |
|
26 |
+ s, err := rdr.Seek(0, os.SEEK_END) |
|
27 |
+ if err != nil { |
|
28 |
+ return -1, err |
|
29 |
+ } |
|
30 |
+ |
|
31 |
+ if offset > tmpOffset+s { |
|
32 |
+ if i == len(r.readers)-1 { |
|
33 |
+ rdrOffset := s + (offset - tmpOffset) |
|
34 |
+ if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { |
|
35 |
+ return -1, err |
|
36 |
+ } |
|
37 |
+ r.pos = &pos{i, rdrOffset} |
|
38 |
+ return offset, nil |
|
39 |
+ } |
|
40 |
+ |
|
41 |
+ tmpOffset += s |
|
42 |
+ continue |
|
43 |
+ } |
|
44 |
+ |
|
45 |
+ rdrOffset := offset - tmpOffset |
|
46 |
+ idx := i |
|
47 |
+ |
|
48 |
+ rdr.Seek(rdrOffset, os.SEEK_SET) |
|
49 |
+ // make sure all following readers are at 0 |
|
50 |
+ for _, rdr := range r.readers[i+1:] { |
|
51 |
+ rdr.Seek(0, os.SEEK_SET) |
|
52 |
+ } |
|
53 |
+ |
|
54 |
+ if rdrOffset == s && i != len(r.readers)-1 { |
|
55 |
+ idx++ |
|
56 |
+ rdrOffset = 0 |
|
57 |
+ } |
|
58 |
+ r.pos = &pos{idx, rdrOffset} |
|
59 |
+ return offset, nil |
|
60 |
+ } |
|
61 |
+ case os.SEEK_END: |
|
62 |
+ for _, rdr := range r.readers { |
|
63 |
+ s, err := rdr.Seek(0, os.SEEK_END) |
|
64 |
+ if err != nil { |
|
65 |
+ return -1, err |
|
66 |
+ } |
|
67 |
+ tmpOffset += s |
|
68 |
+ } |
|
69 |
+ r.Seek(tmpOffset+offset, os.SEEK_SET) |
|
70 |
+ return tmpOffset + offset, nil |
|
71 |
+ case os.SEEK_CUR: |
|
72 |
+ if r.pos == nil { |
|
73 |
+ return r.Seek(offset, os.SEEK_SET) |
|
74 |
+ } |
|
75 |
+ // Just return the current offset |
|
76 |
+ if offset == 0 { |
|
77 |
+ return r.getCurOffset() |
|
78 |
+ } |
|
79 |
+ |
|
80 |
+ curOffset, err := r.getCurOffset() |
|
81 |
+ if err != nil { |
|
82 |
+ return -1, err |
|
83 |
+ } |
|
84 |
+ rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) |
|
85 |
+ if err != nil { |
|
86 |
+ return -1, err |
|
87 |
+ } |
|
88 |
+ |
|
89 |
+ r.pos = &pos{r.posIdx[rdr], rdrOffset} |
|
90 |
+ return curOffset + offset, nil |
|
91 |
+ default: |
|
92 |
+ return -1, fmt.Errorf("Invalid whence: %d", whence) |
|
93 |
+ } |
|
94 |
+ |
|
95 |
+ return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) |
|
96 |
+} |
|
97 |
+ |
|
98 |
+func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { |
|
99 |
+ |
|
100 |
+ var offsetTo int64 |
|
101 |
+ |
|
102 |
+ for _, rdr := range r.readers { |
|
103 |
+ size, err := getReadSeekerSize(rdr) |
|
104 |
+ if err != nil { |
|
105 |
+ return nil, -1, err |
|
106 |
+ } |
|
107 |
+ if offsetTo+size > offset { |
|
108 |
+ return rdr, offset - offsetTo, nil |
|
109 |
+ } |
|
110 |
+ if rdr == r.readers[len(r.readers)-1] { |
|
111 |
+ return rdr, offsetTo + offset, nil |
|
112 |
+ } |
|
113 |
+ offsetTo += size |
|
114 |
+ } |
|
115 |
+ |
|
116 |
+ return nil, 0, nil |
|
117 |
+} |
|
118 |
+ |
|
119 |
+func (r *multiReadSeeker) getCurOffset() (int64, error) { |
|
120 |
+ var totalSize int64 |
|
121 |
+ for _, rdr := range r.readers[:r.pos.idx+1] { |
|
122 |
+ if r.posIdx[rdr] == r.pos.idx { |
|
123 |
+ totalSize += r.pos.offset |
|
124 |
+ break |
|
125 |
+ } |
|
126 |
+ |
|
127 |
+ size, err := getReadSeekerSize(rdr) |
|
128 |
+ if err != nil { |
|
129 |
+ return -1, fmt.Errorf("error getting seeker size: %v", err) |
|
130 |
+ } |
|
131 |
+ totalSize += size |
|
132 |
+ } |
|
133 |
+ return totalSize, nil |
|
134 |
+} |
|
135 |
+ |
|
136 |
+func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { |
|
137 |
+ var offset int64 |
|
138 |
+ for _, r := range r.readers { |
|
139 |
+ if r == rdr { |
|
140 |
+ break |
|
141 |
+ } |
|
142 |
+ |
|
143 |
+ size, err := getReadSeekerSize(rdr) |
|
144 |
+ if err != nil { |
|
145 |
+ return -1, err |
|
146 |
+ } |
|
147 |
+ offset += size |
|
148 |
+ } |
|
149 |
+ return offset, nil |
|
150 |
+} |
|
151 |
+ |
|
152 |
+func (r *multiReadSeeker) Read(b []byte) (int, error) { |
|
153 |
+ if r.pos == nil { |
|
154 |
+ // make sure all readers are at 0 |
|
155 |
+ r.Seek(0, os.SEEK_SET) |
|
156 |
+ } |
|
157 |
+ |
|
158 |
+ bLen := int64(len(b)) |
|
159 |
+ buf := bytes.NewBuffer(nil) |
|
160 |
+ var rdr io.ReadSeeker |
|
161 |
+ |
|
162 |
+ for _, rdr = range r.readers[r.pos.idx:] { |
|
163 |
+ readBytes, err := io.CopyN(buf, rdr, bLen) |
|
164 |
+ if err != nil && err != io.EOF { |
|
165 |
+ return -1, err |
|
166 |
+ } |
|
167 |
+ bLen -= readBytes |
|
168 |
+ |
|
169 |
+ if bLen == 0 { |
|
170 |
+ break |
|
171 |
+ } |
|
172 |
+ } |
|
173 |
+ |
|
174 |
+ rdrPos, err := rdr.Seek(0, os.SEEK_CUR) |
|
175 |
+ if err != nil { |
|
176 |
+ return -1, err |
|
177 |
+ } |
|
178 |
+ r.pos = &pos{r.posIdx[rdr], rdrPos} |
|
179 |
+ return buf.Read(b) |
|
180 |
+} |
|
181 |
+ |
|
182 |
+func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { |
|
183 |
+ // save the current position |
|
184 |
+ pos, err := rdr.Seek(0, os.SEEK_CUR) |
|
185 |
+ if err != nil { |
|
186 |
+ return -1, err |
|
187 |
+ } |
|
188 |
+ |
|
189 |
+ // get the size |
|
190 |
+ size, err := rdr.Seek(0, os.SEEK_END) |
|
191 |
+ if err != nil { |
|
192 |
+ return -1, err |
|
193 |
+ } |
|
194 |
+ |
|
195 |
+ // reset the position |
|
196 |
+ if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { |
|
197 |
+ return -1, err |
|
198 |
+ } |
|
199 |
+ return size, nil |
|
200 |
+} |
|
201 |
+ |
|
202 |
+// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided |
|
203 |
+// input readseekers. After calling this method the initial position is set to the |
|
204 |
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances |
|
205 |
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. |
|
206 |
+// Seek can be used over the sum of lengths of all readseekers. |
|
207 |
+// |
|
208 |
+// When a MultiReadSeeker is used, no Read and Seek operations should be made on |
|
209 |
+// its ReadSeeker components. Also, users should make no assumption on the state |
|
210 |
+// of individual readseekers while the MultiReadSeeker is used. |
|
211 |
+func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { |
|
212 |
+ if len(readers) == 1 { |
|
213 |
+ return readers[0] |
|
214 |
+ } |
|
215 |
+ idx := make(map[io.ReadSeeker]int) |
|
216 |
+ for i, rdr := range readers { |
|
217 |
+ idx[rdr] = i |
|
218 |
+ } |
|
219 |
+ return &multiReadSeeker{ |
|
220 |
+ readers: readers, |
|
221 |
+ posIdx: idx, |
|
222 |
+ } |
|
223 |
+} |
0 | 224 |
new file mode 100644 |
... | ... |
@@ -0,0 +1,225 @@ |
0 |
+package multireader |
|
1 |
+ |
|
2 |
+import ( |
|
3 |
+ "bytes" |
|
4 |
+ "encoding/binary" |
|
5 |
+ "fmt" |
|
6 |
+ "io" |
|
7 |
+ "io/ioutil" |
|
8 |
+ "os" |
|
9 |
+ "strings" |
|
10 |
+ "testing" |
|
11 |
+) |
|
12 |
+ |
|
13 |
+func TestMultiReadSeekerReadAll(t *testing.T) { |
|
14 |
+ str := "hello world" |
|
15 |
+ s1 := strings.NewReader(str + " 1") |
|
16 |
+ s2 := strings.NewReader(str + " 2") |
|
17 |
+ s3 := strings.NewReader(str + " 3") |
|
18 |
+ mr := MultiReadSeeker(s1, s2, s3) |
|
19 |
+ |
|
20 |
+ expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) |
|
21 |
+ |
|
22 |
+ b, err := ioutil.ReadAll(mr) |
|
23 |
+ if err != nil { |
|
24 |
+ t.Fatal(err) |
|
25 |
+ } |
|
26 |
+ |
|
27 |
+ expected := "hello world 1hello world 2hello world 3" |
|
28 |
+ if string(b) != expected { |
|
29 |
+ t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
30 |
+ } |
|
31 |
+ |
|
32 |
+ size, err := mr.Seek(0, os.SEEK_END) |
|
33 |
+ if err != nil { |
|
34 |
+ t.Fatal(err) |
|
35 |
+ } |
|
36 |
+ if size != expectedSize { |
|
37 |
+ t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) |
|
38 |
+ } |
|
39 |
+ |
|
40 |
+ // Reset the position and read again |
|
41 |
+ pos, err := mr.Seek(0, os.SEEK_SET) |
|
42 |
+ if err != nil { |
|
43 |
+ t.Fatal(err) |
|
44 |
+ } |
|
45 |
+ if pos != 0 { |
|
46 |
+ t.Fatalf("expected position to be set to 0, got %d", pos) |
|
47 |
+ } |
|
48 |
+ |
|
49 |
+ b, err = ioutil.ReadAll(mr) |
|
50 |
+ if err != nil { |
|
51 |
+ t.Fatal(err) |
|
52 |
+ } |
|
53 |
+ |
|
54 |
+ if string(b) != expected { |
|
55 |
+ t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
56 |
+ } |
|
57 |
+ |
|
58 |
+ // The positions of some readers are not 0 |
|
59 |
+ s1.Seek(0, os.SEEK_SET) |
|
60 |
+ s2.Seek(0, os.SEEK_END) |
|
61 |
+ s3.Seek(0, os.SEEK_SET) |
|
62 |
+ mr = MultiReadSeeker(s1, s2, s3) |
|
63 |
+ b, err = ioutil.ReadAll(mr) |
|
64 |
+ if err != nil { |
|
65 |
+ t.Fatal(err) |
|
66 |
+ } |
|
67 |
+ |
|
68 |
+ if string(b) != expected { |
|
69 |
+ t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
70 |
+ } |
|
71 |
+} |
|
72 |
+ |
|
73 |
+func TestMultiReadSeekerReadEach(t *testing.T) { |
|
74 |
+ str := "hello world" |
|
75 |
+ s1 := strings.NewReader(str + " 1") |
|
76 |
+ s2 := strings.NewReader(str + " 2") |
|
77 |
+ s3 := strings.NewReader(str + " 3") |
|
78 |
+ mr := MultiReadSeeker(s1, s2, s3) |
|
79 |
+ |
|
80 |
+ var totalBytes int64 |
|
81 |
+ for i, s := range []*strings.Reader{s1, s2, s3} { |
|
82 |
+ sLen := int64(s.Len()) |
|
83 |
+ buf := make([]byte, s.Len()) |
|
84 |
+ expected := []byte(fmt.Sprintf("%s %d", str, i+1)) |
|
85 |
+ |
|
86 |
+ if _, err := mr.Read(buf); err != nil && err != io.EOF { |
|
87 |
+ t.Fatal(err) |
|
88 |
+ } |
|
89 |
+ |
|
90 |
+ if !bytes.Equal(buf, expected) { |
|
91 |
+ t.Fatalf("expected %q to be %q", string(buf), string(expected)) |
|
92 |
+ } |
|
93 |
+ |
|
94 |
+ pos, err := mr.Seek(0, os.SEEK_CUR) |
|
95 |
+ if err != nil { |
|
96 |
+ t.Fatalf("iteration: %d, error: %v", i+1, err) |
|
97 |
+ } |
|
98 |
+ |
|
99 |
+ // check that the total bytes read is the current position of the seeker |
|
100 |
+ totalBytes += sLen |
|
101 |
+ if pos != totalBytes { |
|
102 |
+ t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) |
|
103 |
+ } |
|
104 |
+ |
|
105 |
+ // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well |
|
106 |
+ newPos, err := mr.Seek(pos, os.SEEK_SET) |
|
107 |
+ if err != nil { |
|
108 |
+ t.Fatal(err) |
|
109 |
+ } |
|
110 |
+ if newPos != pos { |
|
111 |
+ t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) |
|
112 |
+ } |
|
113 |
+ } |
|
114 |
+} |
|
115 |
+ |
|
116 |
+func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { |
|
117 |
+ str := "hello world" |
|
118 |
+ s1 := strings.NewReader(str + " 1") |
|
119 |
+ s2 := strings.NewReader(str + " 2") |
|
120 |
+ s3 := strings.NewReader(str + " 3") |
|
121 |
+ mr := MultiReadSeeker(s1, s2, s3) |
|
122 |
+ |
|
123 |
+ buf := make([]byte, s1.Len()+3) |
|
124 |
+ _, err := mr.Read(buf) |
|
125 |
+ if err != nil { |
|
126 |
+ t.Fatal(err) |
|
127 |
+ } |
|
128 |
+ |
|
129 |
+ // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string |
|
130 |
+ expected := "hello world 1hel" |
|
131 |
+ if string(buf) != expected { |
|
132 |
+ t.Fatalf("expected %s to be %s", string(buf), expected) |
|
133 |
+ } |
|
134 |
+} |
|
135 |
+ |
|
136 |
+func TestMultiReadSeekerNegativeSeek(t *testing.T) { |
|
137 |
+ str := "hello world" |
|
138 |
+ s1 := strings.NewReader(str + " 1") |
|
139 |
+ s2 := strings.NewReader(str + " 2") |
|
140 |
+ s3 := strings.NewReader(str + " 3") |
|
141 |
+ mr := MultiReadSeeker(s1, s2, s3) |
|
142 |
+ |
|
143 |
+ s1Len := s1.Len() |
|
144 |
+ s2Len := s2.Len() |
|
145 |
+ s3Len := s3.Len() |
|
146 |
+ |
|
147 |
+ s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) |
|
148 |
+ if err != nil { |
|
149 |
+ t.Fatal(err) |
|
150 |
+ } |
|
151 |
+ if s != int64(s1Len+s2Len) { |
|
152 |
+ t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) |
|
153 |
+ } |
|
154 |
+ |
|
155 |
+ buf := make([]byte, s3Len) |
|
156 |
+ if _, err := mr.Read(buf); err != nil && err != io.EOF { |
|
157 |
+ t.Fatal(err) |
|
158 |
+ } |
|
159 |
+ expected := fmt.Sprintf("%s %d", str, 3) |
|
160 |
+ if string(buf) != fmt.Sprintf("%s %d", str, 3) { |
|
161 |
+ t.Fatalf("expected %q to be %q", string(buf), expected) |
|
162 |
+ } |
|
163 |
+} |
|
164 |
+ |
|
165 |
+func TestMultiReadSeekerCurAfterSet(t *testing.T) { |
|
166 |
+ str := "hello world" |
|
167 |
+ s1 := strings.NewReader(str + " 1") |
|
168 |
+ s2 := strings.NewReader(str + " 2") |
|
169 |
+ s3 := strings.NewReader(str + " 3") |
|
170 |
+ mr := MultiReadSeeker(s1, s2, s3) |
|
171 |
+ |
|
172 |
+ mid := int64(s1.Len() + s2.Len()/2) |
|
173 |
+ |
|
174 |
+ size, err := mr.Seek(mid, os.SEEK_SET) |
|
175 |
+ if err != nil { |
|
176 |
+ t.Fatal(err) |
|
177 |
+ } |
|
178 |
+ if size != mid { |
|
179 |
+ t.Fatalf("reader size does not match, got %d, expected %d", size, mid) |
|
180 |
+ } |
|
181 |
+ |
|
182 |
+ size, err = mr.Seek(3, os.SEEK_CUR) |
|
183 |
+ if err != nil { |
|
184 |
+ t.Fatal(err) |
|
185 |
+ } |
|
186 |
+ if size != mid+3 { |
|
187 |
+ t.Fatalf("reader size does not match, got %d, expected %d", size, mid+3) |
|
188 |
+ } |
|
189 |
+ size, err = mr.Seek(5, os.SEEK_CUR) |
|
190 |
+ if err != nil { |
|
191 |
+ t.Fatal(err) |
|
192 |
+ } |
|
193 |
+ if size != mid+8 { |
|
194 |
+ t.Fatalf("reader size does not match, got %d, expected %d", size, mid+8) |
|
195 |
+ } |
|
196 |
+ |
|
197 |
+ size, err = mr.Seek(10, os.SEEK_CUR) |
|
198 |
+ if err != nil { |
|
199 |
+ t.Fatal(err) |
|
200 |
+ } |
|
201 |
+ if size != mid+18 { |
|
202 |
+ t.Fatalf("reader size does not match, got %d, expected %d", size, mid+18) |
|
203 |
+ } |
|
204 |
+} |
|
205 |
+ |
|
206 |
+func TestMultiReadSeekerSmallReads(t *testing.T) { |
|
207 |
+ readers := []io.ReadSeeker{} |
|
208 |
+ for i := 0; i < 10; i++ { |
|
209 |
+ integer := make([]byte, 4) |
|
210 |
+ binary.BigEndian.PutUint32(integer, uint32(i)) |
|
211 |
+ readers = append(readers, bytes.NewReader(integer)) |
|
212 |
+ } |
|
213 |
+ |
|
214 |
+ reader := MultiReadSeeker(readers...) |
|
215 |
+ for i := 0; i < 10; i++ { |
|
216 |
+ var integer uint32 |
|
217 |
+ if err := binary.Read(reader, binary.BigEndian, &integer); err != nil { |
|
218 |
+ t.Fatalf("Read from NewMultiReadSeeker failed: %v", err) |
|
219 |
+ } |
|
220 |
+ if uint32(i) != integer { |
|
221 |
+ t.Fatalf("Read wrong value from NewMultiReadSeeker: %d != %d", i, integer) |
|
222 |
+ } |
|
223 |
+ } |
|
224 |
+} |
... | ... |
@@ -14,8 +14,8 @@ import ( |
14 | 14 |
|
15 | 15 |
"github.com/Sirupsen/logrus" |
16 | 16 |
"github.com/docker/docker/daemon/logger" |
17 |
+ "github.com/docker/docker/daemon/logger/jsonfilelog/multireader" |
|
17 | 18 |
"github.com/docker/docker/pkg/filenotify" |
18 |
- "github.com/docker/docker/pkg/ioutils" |
|
19 | 19 |
"github.com/docker/docker/pkg/jsonlog" |
20 | 20 |
"github.com/docker/docker/pkg/tailfile" |
21 | 21 |
) |
... | ... |
@@ -77,7 +77,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R |
77 | 77 |
defer latestFile.Close() |
78 | 78 |
|
79 | 79 |
if config.Tail != 0 { |
80 |
- tailer := ioutils.MultiReadSeeker(append(files, latestFile)...) |
|
80 |
+ tailer := multireader.MultiReadSeeker(append(files, latestFile)...) |
|
81 | 81 |
tailFile(tailer, logWatcher, config.Tail, config.Since) |
82 | 82 |
} |
83 | 83 |
|
84 | 84 |
deleted file mode 100644 |
... | ... |
@@ -1,224 +0,0 @@ |
1 |
-package ioutils |
|
2 |
- |
|
3 |
-import ( |
|
4 |
- "bytes" |
|
5 |
- "fmt" |
|
6 |
- "io" |
|
7 |
- "os" |
|
8 |
-) |
|
9 |
- |
|
10 |
-type pos struct { |
|
11 |
- idx int |
|
12 |
- offset int64 |
|
13 |
-} |
|
14 |
- |
|
15 |
-type multiReadSeeker struct { |
|
16 |
- readers []io.ReadSeeker |
|
17 |
- pos *pos |
|
18 |
- posIdx map[io.ReadSeeker]int |
|
19 |
-} |
|
20 |
- |
|
21 |
-func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { |
|
22 |
- var tmpOffset int64 |
|
23 |
- switch whence { |
|
24 |
- case os.SEEK_SET: |
|
25 |
- for i, rdr := range r.readers { |
|
26 |
- // get size of the current reader |
|
27 |
- s, err := rdr.Seek(0, os.SEEK_END) |
|
28 |
- if err != nil { |
|
29 |
- return -1, err |
|
30 |
- } |
|
31 |
- |
|
32 |
- if offset > tmpOffset+s { |
|
33 |
- if i == len(r.readers)-1 { |
|
34 |
- rdrOffset := s + (offset - tmpOffset) |
|
35 |
- if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { |
|
36 |
- return -1, err |
|
37 |
- } |
|
38 |
- r.pos = &pos{i, rdrOffset} |
|
39 |
- return offset, nil |
|
40 |
- } |
|
41 |
- |
|
42 |
- tmpOffset += s |
|
43 |
- continue |
|
44 |
- } |
|
45 |
- |
|
46 |
- rdrOffset := offset - tmpOffset |
|
47 |
- idx := i |
|
48 |
- |
|
49 |
- rdr.Seek(rdrOffset, os.SEEK_SET) |
|
50 |
- // make sure all following readers are at 0 |
|
51 |
- for _, rdr := range r.readers[i+1:] { |
|
52 |
- rdr.Seek(0, os.SEEK_SET) |
|
53 |
- } |
|
54 |
- |
|
55 |
- if rdrOffset == s && i != len(r.readers)-1 { |
|
56 |
- idx++ |
|
57 |
- rdrOffset = 0 |
|
58 |
- } |
|
59 |
- r.pos = &pos{idx, rdrOffset} |
|
60 |
- return offset, nil |
|
61 |
- } |
|
62 |
- case os.SEEK_END: |
|
63 |
- for _, rdr := range r.readers { |
|
64 |
- s, err := rdr.Seek(0, os.SEEK_END) |
|
65 |
- if err != nil { |
|
66 |
- return -1, err |
|
67 |
- } |
|
68 |
- tmpOffset += s |
|
69 |
- } |
|
70 |
- r.Seek(tmpOffset+offset, os.SEEK_SET) |
|
71 |
- return tmpOffset + offset, nil |
|
72 |
- case os.SEEK_CUR: |
|
73 |
- if r.pos == nil { |
|
74 |
- return r.Seek(offset, os.SEEK_SET) |
|
75 |
- } |
|
76 |
- // Just return the current offset |
|
77 |
- if offset == 0 { |
|
78 |
- return r.getCurOffset() |
|
79 |
- } |
|
80 |
- |
|
81 |
- curOffset, err := r.getCurOffset() |
|
82 |
- if err != nil { |
|
83 |
- return -1, err |
|
84 |
- } |
|
85 |
- rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) |
|
86 |
- if err != nil { |
|
87 |
- return -1, err |
|
88 |
- } |
|
89 |
- |
|
90 |
- r.pos = &pos{r.posIdx[rdr], rdrOffset} |
|
91 |
- return curOffset + offset, nil |
|
92 |
- default: |
|
93 |
- return -1, fmt.Errorf("Invalid whence: %d", whence) |
|
94 |
- } |
|
95 |
- |
|
96 |
- return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) |
|
97 |
-} |
|
98 |
- |
|
99 |
-func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { |
|
100 |
- |
|
101 |
- var offsetTo int64 |
|
102 |
- |
|
103 |
- for _, rdr := range r.readers { |
|
104 |
- size, err := getReadSeekerSize(rdr) |
|
105 |
- if err != nil { |
|
106 |
- return nil, -1, err |
|
107 |
- } |
|
108 |
- if offsetTo+size > offset { |
|
109 |
- return rdr, offset - offsetTo, nil |
|
110 |
- } |
|
111 |
- if rdr == r.readers[len(r.readers)-1] { |
|
112 |
- return rdr, offsetTo + offset, nil |
|
113 |
- } |
|
114 |
- offsetTo += size |
|
115 |
- } |
|
116 |
- |
|
117 |
- return nil, 0, nil |
|
118 |
-} |
|
119 |
- |
|
120 |
-func (r *multiReadSeeker) getCurOffset() (int64, error) { |
|
121 |
- var totalSize int64 |
|
122 |
- for _, rdr := range r.readers[:r.pos.idx+1] { |
|
123 |
- if r.posIdx[rdr] == r.pos.idx { |
|
124 |
- totalSize += r.pos.offset |
|
125 |
- break |
|
126 |
- } |
|
127 |
- |
|
128 |
- size, err := getReadSeekerSize(rdr) |
|
129 |
- if err != nil { |
|
130 |
- return -1, fmt.Errorf("error getting seeker size: %v", err) |
|
131 |
- } |
|
132 |
- totalSize += size |
|
133 |
- } |
|
134 |
- return totalSize, nil |
|
135 |
-} |
|
136 |
- |
|
137 |
-func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { |
|
138 |
- var offset int64 |
|
139 |
- for _, r := range r.readers { |
|
140 |
- if r == rdr { |
|
141 |
- break |
|
142 |
- } |
|
143 |
- |
|
144 |
- size, err := getReadSeekerSize(rdr) |
|
145 |
- if err != nil { |
|
146 |
- return -1, err |
|
147 |
- } |
|
148 |
- offset += size |
|
149 |
- } |
|
150 |
- return offset, nil |
|
151 |
-} |
|
152 |
- |
|
153 |
-func (r *multiReadSeeker) Read(b []byte) (int, error) { |
|
154 |
- if r.pos == nil { |
|
155 |
- // make sure all readers are at 0 |
|
156 |
- r.Seek(0, os.SEEK_SET) |
|
157 |
- } |
|
158 |
- |
|
159 |
- bLen := int64(len(b)) |
|
160 |
- buf := bytes.NewBuffer(nil) |
|
161 |
- var rdr io.ReadSeeker |
|
162 |
- |
|
163 |
- for _, rdr = range r.readers[r.pos.idx:] { |
|
164 |
- readBytes, err := io.CopyN(buf, rdr, bLen) |
|
165 |
- if err != nil && err != io.EOF { |
|
166 |
- return -1, err |
|
167 |
- } |
|
168 |
- bLen -= readBytes |
|
169 |
- |
|
170 |
- if bLen == 0 { |
|
171 |
- break |
|
172 |
- } |
|
173 |
- } |
|
174 |
- |
|
175 |
- rdrPos, err := rdr.Seek(0, os.SEEK_CUR) |
|
176 |
- if err != nil { |
|
177 |
- return -1, err |
|
178 |
- } |
|
179 |
- r.pos = &pos{r.posIdx[rdr], rdrPos} |
|
180 |
- return buf.Read(b) |
|
181 |
-} |
|
182 |
- |
|
183 |
-func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { |
|
184 |
- // save the current position |
|
185 |
- pos, err := rdr.Seek(0, os.SEEK_CUR) |
|
186 |
- if err != nil { |
|
187 |
- return -1, err |
|
188 |
- } |
|
189 |
- |
|
190 |
- // get the size |
|
191 |
- size, err := rdr.Seek(0, os.SEEK_END) |
|
192 |
- if err != nil { |
|
193 |
- return -1, err |
|
194 |
- } |
|
195 |
- |
|
196 |
- // reset the position |
|
197 |
- if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { |
|
198 |
- return -1, err |
|
199 |
- } |
|
200 |
- return size, nil |
|
201 |
-} |
|
202 |
- |
|
203 |
-// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided |
|
204 |
-// input readseekers. After calling this method the initial position is set to the |
|
205 |
-// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances |
|
206 |
-// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. |
|
207 |
-// Seek can be used over the sum of lengths of all readseekers. |
|
208 |
-// |
|
209 |
-// When a MultiReadSeeker is used, no Read and Seek operations should be made on |
|
210 |
-// its ReadSeeker components. Also, users should make no assumption on the state |
|
211 |
-// of individual readseekers while the MultiReadSeeker is used. |
|
212 |
-func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { |
|
213 |
- if len(readers) == 1 { |
|
214 |
- return readers[0] |
|
215 |
- } |
|
216 |
- idx := make(map[io.ReadSeeker]int) |
|
217 |
- for i, rdr := range readers { |
|
218 |
- idx[rdr] = i |
|
219 |
- } |
|
220 |
- return &multiReadSeeker{ |
|
221 |
- readers: readers, |
|
222 |
- posIdx: idx, |
|
223 |
- } |
|
224 |
-} |
225 | 1 |
deleted file mode 100644 |
... | ... |
@@ -1,225 +0,0 @@ |
1 |
-package ioutils |
|
2 |
- |
|
3 |
-import ( |
|
4 |
- "bytes" |
|
5 |
- "encoding/binary" |
|
6 |
- "fmt" |
|
7 |
- "io" |
|
8 |
- "io/ioutil" |
|
9 |
- "os" |
|
10 |
- "strings" |
|
11 |
- "testing" |
|
12 |
-) |
|
13 |
- |
|
14 |
-func TestMultiReadSeekerReadAll(t *testing.T) { |
|
15 |
- str := "hello world" |
|
16 |
- s1 := strings.NewReader(str + " 1") |
|
17 |
- s2 := strings.NewReader(str + " 2") |
|
18 |
- s3 := strings.NewReader(str + " 3") |
|
19 |
- mr := MultiReadSeeker(s1, s2, s3) |
|
20 |
- |
|
21 |
- expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) |
|
22 |
- |
|
23 |
- b, err := ioutil.ReadAll(mr) |
|
24 |
- if err != nil { |
|
25 |
- t.Fatal(err) |
|
26 |
- } |
|
27 |
- |
|
28 |
- expected := "hello world 1hello world 2hello world 3" |
|
29 |
- if string(b) != expected { |
|
30 |
- t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
31 |
- } |
|
32 |
- |
|
33 |
- size, err := mr.Seek(0, os.SEEK_END) |
|
34 |
- if err != nil { |
|
35 |
- t.Fatal(err) |
|
36 |
- } |
|
37 |
- if size != expectedSize { |
|
38 |
- t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) |
|
39 |
- } |
|
40 |
- |
|
41 |
- // Reset the position and read again |
|
42 |
- pos, err := mr.Seek(0, os.SEEK_SET) |
|
43 |
- if err != nil { |
|
44 |
- t.Fatal(err) |
|
45 |
- } |
|
46 |
- if pos != 0 { |
|
47 |
- t.Fatalf("expected position to be set to 0, got %d", pos) |
|
48 |
- } |
|
49 |
- |
|
50 |
- b, err = ioutil.ReadAll(mr) |
|
51 |
- if err != nil { |
|
52 |
- t.Fatal(err) |
|
53 |
- } |
|
54 |
- |
|
55 |
- if string(b) != expected { |
|
56 |
- t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
57 |
- } |
|
58 |
- |
|
59 |
- // The positions of some readers are not 0 |
|
60 |
- s1.Seek(0, os.SEEK_SET) |
|
61 |
- s2.Seek(0, os.SEEK_END) |
|
62 |
- s3.Seek(0, os.SEEK_SET) |
|
63 |
- mr = MultiReadSeeker(s1, s2, s3) |
|
64 |
- b, err = ioutil.ReadAll(mr) |
|
65 |
- if err != nil { |
|
66 |
- t.Fatal(err) |
|
67 |
- } |
|
68 |
- |
|
69 |
- if string(b) != expected { |
|
70 |
- t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) |
|
71 |
- } |
|
72 |
-} |
|
73 |
- |
|
74 |
-func TestMultiReadSeekerReadEach(t *testing.T) { |
|
75 |
- str := "hello world" |
|
76 |
- s1 := strings.NewReader(str + " 1") |
|
77 |
- s2 := strings.NewReader(str + " 2") |
|
78 |
- s3 := strings.NewReader(str + " 3") |
|
79 |
- mr := MultiReadSeeker(s1, s2, s3) |
|
80 |
- |
|
81 |
- var totalBytes int64 |
|
82 |
- for i, s := range []*strings.Reader{s1, s2, s3} { |
|
83 |
- sLen := int64(s.Len()) |
|
84 |
- buf := make([]byte, s.Len()) |
|
85 |
- expected := []byte(fmt.Sprintf("%s %d", str, i+1)) |
|
86 |
- |
|
87 |
- if _, err := mr.Read(buf); err != nil && err != io.EOF { |
|
88 |
- t.Fatal(err) |
|
89 |
- } |
|
90 |
- |
|
91 |
- if !bytes.Equal(buf, expected) { |
|
92 |
- t.Fatalf("expected %q to be %q", string(buf), string(expected)) |
|
93 |
- } |
|
94 |
- |
|
95 |
- pos, err := mr.Seek(0, os.SEEK_CUR) |
|
96 |
- if err != nil { |
|
97 |
- t.Fatalf("iteration: %d, error: %v", i+1, err) |
|
98 |
- } |
|
99 |
- |
|
100 |
- // check that the total bytes read is the current position of the seeker |
|
101 |
- totalBytes += sLen |
|
102 |
- if pos != totalBytes { |
|
103 |
- t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) |
|
104 |
- } |
|
105 |
- |
|
106 |
- // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well |
|
107 |
- newPos, err := mr.Seek(pos, os.SEEK_SET) |
|
108 |
- if err != nil { |
|
109 |
- t.Fatal(err) |
|
110 |
- } |
|
111 |
- if newPos != pos { |
|
112 |
- t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) |
|
113 |
- } |
|
114 |
- } |
|
115 |
-} |
|
116 |
- |
|
117 |
-func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { |
|
118 |
- str := "hello world" |
|
119 |
- s1 := strings.NewReader(str + " 1") |
|
120 |
- s2 := strings.NewReader(str + " 2") |
|
121 |
- s3 := strings.NewReader(str + " 3") |
|
122 |
- mr := MultiReadSeeker(s1, s2, s3) |
|
123 |
- |
|
124 |
- buf := make([]byte, s1.Len()+3) |
|
125 |
- _, err := mr.Read(buf) |
|
126 |
- if err != nil { |
|
127 |
- t.Fatal(err) |
|
128 |
- } |
|
129 |
- |
|
130 |
- // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string |
|
131 |
- expected := "hello world 1hel" |
|
132 |
- if string(buf) != expected { |
|
133 |
- t.Fatalf("expected %s to be %s", string(buf), expected) |
|
134 |
- } |
|
135 |
-} |
|
136 |
- |
|
137 |
-func TestMultiReadSeekerNegativeSeek(t *testing.T) { |
|
138 |
- str := "hello world" |
|
139 |
- s1 := strings.NewReader(str + " 1") |
|
140 |
- s2 := strings.NewReader(str + " 2") |
|
141 |
- s3 := strings.NewReader(str + " 3") |
|
142 |
- mr := MultiReadSeeker(s1, s2, s3) |
|
143 |
- |
|
144 |
- s1Len := s1.Len() |
|
145 |
- s2Len := s2.Len() |
|
146 |
- s3Len := s3.Len() |
|
147 |
- |
|
148 |
- s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) |
|
149 |
- if err != nil { |
|
150 |
- t.Fatal(err) |
|
151 |
- } |
|
152 |
- if s != int64(s1Len+s2Len) { |
|
153 |
- t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) |
|
154 |
- } |
|
155 |
- |
|
156 |
- buf := make([]byte, s3Len) |
|
157 |
- if _, err := mr.Read(buf); err != nil && err != io.EOF { |
|
158 |
- t.Fatal(err) |
|
159 |
- } |
|
160 |
- expected := fmt.Sprintf("%s %d", str, 3) |
|
161 |
- if string(buf) != fmt.Sprintf("%s %d", str, 3) { |
|
162 |
- t.Fatalf("expected %q to be %q", string(buf), expected) |
|
163 |
- } |
|
164 |
-} |
|
165 |
- |
|
166 |
-func TestMultiReadSeekerCurAfterSet(t *testing.T) { |
|
167 |
- str := "hello world" |
|
168 |
- s1 := strings.NewReader(str + " 1") |
|
169 |
- s2 := strings.NewReader(str + " 2") |
|
170 |
- s3 := strings.NewReader(str + " 3") |
|
171 |
- mr := MultiReadSeeker(s1, s2, s3) |
|
172 |
- |
|
173 |
- mid := int64(s1.Len() + s2.Len()/2) |
|
174 |
- |
|
175 |
- size, err := mr.Seek(mid, os.SEEK_SET) |
|
176 |
- if err != nil { |
|
177 |
- t.Fatal(err) |
|
178 |
- } |
|
179 |
- if size != mid { |
|
180 |
- t.Fatalf("reader size does not match, got %d, expected %d", size, mid) |
|
181 |
- } |
|
182 |
- |
|
183 |
- size, err = mr.Seek(3, os.SEEK_CUR) |
|
184 |
- if err != nil { |
|
185 |
- t.Fatal(err) |
|
186 |
- } |
|
187 |
- if size != mid+3 { |
|
188 |
- t.Fatalf("reader size does not match, got %d, expected %d", size, mid+3) |
|
189 |
- } |
|
190 |
- size, err = mr.Seek(5, os.SEEK_CUR) |
|
191 |
- if err != nil { |
|
192 |
- t.Fatal(err) |
|
193 |
- } |
|
194 |
- if size != mid+8 { |
|
195 |
- t.Fatalf("reader size does not match, got %d, expected %d", size, mid+8) |
|
196 |
- } |
|
197 |
- |
|
198 |
- size, err = mr.Seek(10, os.SEEK_CUR) |
|
199 |
- if err != nil { |
|
200 |
- t.Fatal(err) |
|
201 |
- } |
|
202 |
- if size != mid+18 { |
|
203 |
- t.Fatalf("reader size does not match, got %d, expected %d", size, mid+18) |
|
204 |
- } |
|
205 |
-} |
|
206 |
- |
|
207 |
-func TestMultiReadSeekerSmallReads(t *testing.T) { |
|
208 |
- readers := []io.ReadSeeker{} |
|
209 |
- for i := 0; i < 10; i++ { |
|
210 |
- integer := make([]byte, 4) |
|
211 |
- binary.BigEndian.PutUint32(integer, uint32(i)) |
|
212 |
- readers = append(readers, bytes.NewReader(integer)) |
|
213 |
- } |
|
214 |
- |
|
215 |
- reader := MultiReadSeeker(readers...) |
|
216 |
- for i := 0; i < 10; i++ { |
|
217 |
- var integer uint32 |
|
218 |
- if err := binary.Read(reader, binary.BigEndian, &integer); err != nil { |
|
219 |
- t.Fatalf("Read from NewMultiReadSeeker failed: %v", err) |
|
220 |
- } |
|
221 |
- if uint32(i) != integer { |
|
222 |
- t.Fatalf("Read wrong value from NewMultiReadSeeker: %d != %d", i, integer) |
|
223 |
- } |
|
224 |
- } |
|
225 |
-} |