Browse code

Move json log reading into log file object

This allows much of the read logic to be shared for other things,
especially for the new log driver proposed in
https://github.com/moby/moby/issues/33475

The only logic for reads in the json logger is around decoding log
messages, which gets passed into the log file object.

This also helps with implementing compression as it allows us to
simplify locking strategies.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2017/07/19 00:54:03
Showing 11 changed files
... ...
@@ -23,9 +23,9 @@ const Name = "json-file"
23 23
 
24 24
 // JSONFileLogger is Logger implementation for default Docker logging.
25 25
 type JSONFileLogger struct {
26
-	mu      sync.RWMutex
26
+	mu      sync.Mutex
27 27
 	closed  bool
28
-	writer  *loggerutils.RotateFileWriter
28
+	writer  *loggerutils.LogFile
29 29
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
30 30
 }
31 31
 
... ...
@@ -83,7 +83,8 @@ func New(info logger.Info) (logger.Logger, error) {
83 83
 		buf.Reset()
84 84
 		return b, nil
85 85
 	}
86
-	writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles, marshalFunc)
86
+
87
+	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc)
87 88
 	if err != nil {
88 89
 		return nil, err
89 90
 	}
... ...
@@ -82,7 +82,7 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
82 82
 	}
83 83
 
84 84
 	buf := bytes.NewBuffer(nil)
85
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
85
+	require.NoError(b, marshalMessage(msg, nil, buf))
86 86
 	b.SetBytes(int64(buf.Len()))
87 87
 
88 88
 	b.ResetTimer()
89 89
deleted file mode 100644
... ...
@@ -1,228 +0,0 @@
1
-package multireader
2
-
3
-import (
4
-	"bytes"
5
-	"fmt"
6
-	"io"
7
-	"os"
8
-)
9
-
10
-type pos struct {
11
-	idx    int
12
-	offset int64
13
-}
14
-
15
-type multiReadSeeker struct {
16
-	readers []io.ReadSeeker
17
-	pos     *pos
18
-	posIdx  map[io.ReadSeeker]int
19
-}
20
-
21
-func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
22
-	var tmpOffset int64
23
-	switch whence {
24
-	case os.SEEK_SET:
25
-		for i, rdr := range r.readers {
26
-			// get size of the current reader
27
-			s, err := rdr.Seek(0, os.SEEK_END)
28
-			if err != nil {
29
-				return -1, err
30
-			}
31
-
32
-			if offset > tmpOffset+s {
33
-				if i == len(r.readers)-1 {
34
-					rdrOffset := s + (offset - tmpOffset)
35
-					if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
36
-						return -1, err
37
-					}
38
-					r.pos = &pos{i, rdrOffset}
39
-					return offset, nil
40
-				}
41
-
42
-				tmpOffset += s
43
-				continue
44
-			}
45
-
46
-			rdrOffset := offset - tmpOffset
47
-			idx := i
48
-
49
-			if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
50
-				return -1, err
51
-			}
52
-			// make sure all following readers are at 0
53
-			for _, rdr := range r.readers[i+1:] {
54
-				rdr.Seek(0, os.SEEK_SET)
55
-			}
56
-
57
-			if rdrOffset == s && i != len(r.readers)-1 {
58
-				idx++
59
-				rdrOffset = 0
60
-			}
61
-			r.pos = &pos{idx, rdrOffset}
62
-			return offset, nil
63
-		}
64
-	case os.SEEK_END:
65
-		for _, rdr := range r.readers {
66
-			s, err := rdr.Seek(0, os.SEEK_END)
67
-			if err != nil {
68
-				return -1, err
69
-			}
70
-			tmpOffset += s
71
-		}
72
-		if _, err := r.Seek(tmpOffset+offset, os.SEEK_SET); err != nil {
73
-			return -1, err
74
-		}
75
-		return tmpOffset + offset, nil
76
-	case os.SEEK_CUR:
77
-		if r.pos == nil {
78
-			return r.Seek(offset, os.SEEK_SET)
79
-		}
80
-		// Just return the current offset
81
-		if offset == 0 {
82
-			return r.getCurOffset()
83
-		}
84
-
85
-		curOffset, err := r.getCurOffset()
86
-		if err != nil {
87
-			return -1, err
88
-		}
89
-		rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
90
-		if err != nil {
91
-			return -1, err
92
-		}
93
-
94
-		r.pos = &pos{r.posIdx[rdr], rdrOffset}
95
-		return curOffset + offset, nil
96
-	default:
97
-		return -1, fmt.Errorf("Invalid whence: %d", whence)
98
-	}
99
-
100
-	return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
101
-}
102
-
103
-func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
104
-
105
-	var offsetTo int64
106
-
107
-	for _, rdr := range r.readers {
108
-		size, err := getReadSeekerSize(rdr)
109
-		if err != nil {
110
-			return nil, -1, err
111
-		}
112
-		if offsetTo+size > offset {
113
-			return rdr, offset - offsetTo, nil
114
-		}
115
-		if rdr == r.readers[len(r.readers)-1] {
116
-			return rdr, offsetTo + offset, nil
117
-		}
118
-		offsetTo += size
119
-	}
120
-
121
-	return nil, 0, nil
122
-}
123
-
124
-func (r *multiReadSeeker) getCurOffset() (int64, error) {
125
-	var totalSize int64
126
-	for _, rdr := range r.readers[:r.pos.idx+1] {
127
-		if r.posIdx[rdr] == r.pos.idx {
128
-			totalSize += r.pos.offset
129
-			break
130
-		}
131
-
132
-		size, err := getReadSeekerSize(rdr)
133
-		if err != nil {
134
-			return -1, fmt.Errorf("error getting seeker size: %v", err)
135
-		}
136
-		totalSize += size
137
-	}
138
-	return totalSize, nil
139
-}
140
-
141
-func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
142
-	var offset int64
143
-	for _, r := range r.readers {
144
-		if r == rdr {
145
-			break
146
-		}
147
-
148
-		size, err := getReadSeekerSize(rdr)
149
-		if err != nil {
150
-			return -1, err
151
-		}
152
-		offset += size
153
-	}
154
-	return offset, nil
155
-}
156
-
157
-func (r *multiReadSeeker) Read(b []byte) (int, error) {
158
-	if r.pos == nil {
159
-		// make sure all readers are at 0
160
-		r.Seek(0, os.SEEK_SET)
161
-	}
162
-
163
-	bLen := int64(len(b))
164
-	buf := bytes.NewBuffer(nil)
165
-	var rdr io.ReadSeeker
166
-
167
-	for _, rdr = range r.readers[r.pos.idx:] {
168
-		readBytes, err := io.CopyN(buf, rdr, bLen)
169
-		if err != nil && err != io.EOF {
170
-			return -1, err
171
-		}
172
-		bLen -= readBytes
173
-
174
-		if bLen == 0 {
175
-			break
176
-		}
177
-	}
178
-
179
-	rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
180
-	if err != nil {
181
-		return -1, err
182
-	}
183
-	r.pos = &pos{r.posIdx[rdr], rdrPos}
184
-	return buf.Read(b)
185
-}
186
-
187
-func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
188
-	// save the current position
189
-	pos, err := rdr.Seek(0, os.SEEK_CUR)
190
-	if err != nil {
191
-		return -1, err
192
-	}
193
-
194
-	// get the size
195
-	size, err := rdr.Seek(0, os.SEEK_END)
196
-	if err != nil {
197
-		return -1, err
198
-	}
199
-
200
-	// reset the position
201
-	if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
202
-		return -1, err
203
-	}
204
-	return size, nil
205
-}
206
-
207
-// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
208
-// input readseekers. After calling this method the initial position is set to the
209
-// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
210
-// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
211
-// Seek can be used over the sum of lengths of all readseekers.
212
-//
213
-// When a MultiReadSeeker is used, no Read and Seek operations should be made on
214
-// its ReadSeeker components. Also, users should make no assumption on the state
215
-// of individual readseekers while the MultiReadSeeker is used.
216
-func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
217
-	if len(readers) == 1 {
218
-		return readers[0]
219
-	}
220
-	idx := make(map[io.ReadSeeker]int)
221
-	for i, rdr := range readers {
222
-		idx[rdr] = i
223
-	}
224
-	return &multiReadSeeker{
225
-		readers: readers,
226
-		posIdx:  idx,
227
-	}
228
-}
229 1
deleted file mode 100644
... ...
@@ -1,225 +0,0 @@
1
-package multireader
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/binary"
6
-	"fmt"
7
-	"io"
8
-	"io/ioutil"
9
-	"os"
10
-	"strings"
11
-	"testing"
12
-)
13
-
14
-func TestMultiReadSeekerReadAll(t *testing.T) {
15
-	str := "hello world"
16
-	s1 := strings.NewReader(str + " 1")
17
-	s2 := strings.NewReader(str + " 2")
18
-	s3 := strings.NewReader(str + " 3")
19
-	mr := MultiReadSeeker(s1, s2, s3)
20
-
21
-	expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
22
-
23
-	b, err := ioutil.ReadAll(mr)
24
-	if err != nil {
25
-		t.Fatal(err)
26
-	}
27
-
28
-	expected := "hello world 1hello world 2hello world 3"
29
-	if string(b) != expected {
30
-		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
31
-	}
32
-
33
-	size, err := mr.Seek(0, os.SEEK_END)
34
-	if err != nil {
35
-		t.Fatal(err)
36
-	}
37
-	if size != expectedSize {
38
-		t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
39
-	}
40
-
41
-	// Reset the position and read again
42
-	pos, err := mr.Seek(0, os.SEEK_SET)
43
-	if err != nil {
44
-		t.Fatal(err)
45
-	}
46
-	if pos != 0 {
47
-		t.Fatalf("expected position to be set to 0, got %d", pos)
48
-	}
49
-
50
-	b, err = ioutil.ReadAll(mr)
51
-	if err != nil {
52
-		t.Fatal(err)
53
-	}
54
-
55
-	if string(b) != expected {
56
-		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
57
-	}
58
-
59
-	// The positions of some readers are not 0
60
-	s1.Seek(0, os.SEEK_SET)
61
-	s2.Seek(0, os.SEEK_END)
62
-	s3.Seek(0, os.SEEK_SET)
63
-	mr = MultiReadSeeker(s1, s2, s3)
64
-	b, err = ioutil.ReadAll(mr)
65
-	if err != nil {
66
-		t.Fatal(err)
67
-	}
68
-
69
-	if string(b) != expected {
70
-		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
71
-	}
72
-}
73
-
74
-func TestMultiReadSeekerReadEach(t *testing.T) {
75
-	str := "hello world"
76
-	s1 := strings.NewReader(str + " 1")
77
-	s2 := strings.NewReader(str + " 2")
78
-	s3 := strings.NewReader(str + " 3")
79
-	mr := MultiReadSeeker(s1, s2, s3)
80
-
81
-	var totalBytes int64
82
-	for i, s := range []*strings.Reader{s1, s2, s3} {
83
-		sLen := int64(s.Len())
84
-		buf := make([]byte, s.Len())
85
-		expected := []byte(fmt.Sprintf("%s %d", str, i+1))
86
-
87
-		if _, err := mr.Read(buf); err != nil && err != io.EOF {
88
-			t.Fatal(err)
89
-		}
90
-
91
-		if !bytes.Equal(buf, expected) {
92
-			t.Fatalf("expected %q to be %q", string(buf), string(expected))
93
-		}
94
-
95
-		pos, err := mr.Seek(0, os.SEEK_CUR)
96
-		if err != nil {
97
-			t.Fatalf("iteration: %d, error: %v", i+1, err)
98
-		}
99
-
100
-		// check that the total bytes read is the current position of the seeker
101
-		totalBytes += sLen
102
-		if pos != totalBytes {
103
-			t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
104
-		}
105
-
106
-		// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
107
-		newPos, err := mr.Seek(pos, os.SEEK_SET)
108
-		if err != nil {
109
-			t.Fatal(err)
110
-		}
111
-		if newPos != pos {
112
-			t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
113
-		}
114
-	}
115
-}
116
-
117
-func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
118
-	str := "hello world"
119
-	s1 := strings.NewReader(str + " 1")
120
-	s2 := strings.NewReader(str + " 2")
121
-	s3 := strings.NewReader(str + " 3")
122
-	mr := MultiReadSeeker(s1, s2, s3)
123
-
124
-	buf := make([]byte, s1.Len()+3)
125
-	_, err := mr.Read(buf)
126
-	if err != nil {
127
-		t.Fatal(err)
128
-	}
129
-
130
-	// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
131
-	expected := "hello world 1hel"
132
-	if string(buf) != expected {
133
-		t.Fatalf("expected %s to be %s", string(buf), expected)
134
-	}
135
-}
136
-
137
-func TestMultiReadSeekerNegativeSeek(t *testing.T) {
138
-	str := "hello world"
139
-	s1 := strings.NewReader(str + " 1")
140
-	s2 := strings.NewReader(str + " 2")
141
-	s3 := strings.NewReader(str + " 3")
142
-	mr := MultiReadSeeker(s1, s2, s3)
143
-
144
-	s1Len := s1.Len()
145
-	s2Len := s2.Len()
146
-	s3Len := s3.Len()
147
-
148
-	s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
149
-	if err != nil {
150
-		t.Fatal(err)
151
-	}
152
-	if s != int64(s1Len+s2Len) {
153
-		t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
154
-	}
155
-
156
-	buf := make([]byte, s3Len)
157
-	if _, err := mr.Read(buf); err != nil && err != io.EOF {
158
-		t.Fatal(err)
159
-	}
160
-	expected := fmt.Sprintf("%s %d", str, 3)
161
-	if string(buf) != fmt.Sprintf("%s %d", str, 3) {
162
-		t.Fatalf("expected %q to be %q", string(buf), expected)
163
-	}
164
-}
165
-
166
-func TestMultiReadSeekerCurAfterSet(t *testing.T) {
167
-	str := "hello world"
168
-	s1 := strings.NewReader(str + " 1")
169
-	s2 := strings.NewReader(str + " 2")
170
-	s3 := strings.NewReader(str + " 3")
171
-	mr := MultiReadSeeker(s1, s2, s3)
172
-
173
-	mid := int64(s1.Len() + s2.Len()/2)
174
-
175
-	size, err := mr.Seek(mid, os.SEEK_SET)
176
-	if err != nil {
177
-		t.Fatal(err)
178
-	}
179
-	if size != mid {
180
-		t.Fatalf("reader size does not match, got %d, expected %d", size, mid)
181
-	}
182
-
183
-	size, err = mr.Seek(3, os.SEEK_CUR)
184
-	if err != nil {
185
-		t.Fatal(err)
186
-	}
187
-	if size != mid+3 {
188
-		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+3)
189
-	}
190
-	size, err = mr.Seek(5, os.SEEK_CUR)
191
-	if err != nil {
192
-		t.Fatal(err)
193
-	}
194
-	if size != mid+8 {
195
-		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+8)
196
-	}
197
-
198
-	size, err = mr.Seek(10, os.SEEK_CUR)
199
-	if err != nil {
200
-		t.Fatal(err)
201
-	}
202
-	if size != mid+18 {
203
-		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+18)
204
-	}
205
-}
206
-
207
-func TestMultiReadSeekerSmallReads(t *testing.T) {
208
-	readers := []io.ReadSeeker{}
209
-	for i := 0; i < 10; i++ {
210
-		integer := make([]byte, 4)
211
-		binary.BigEndian.PutUint32(integer, uint32(i))
212
-		readers = append(readers, bytes.NewReader(integer))
213
-	}
214
-
215
-	reader := MultiReadSeeker(readers...)
216
-	for i := 0; i < 10; i++ {
217
-		var integer uint32
218
-		if err := binary.Read(reader, binary.BigEndian, &integer); err != nil {
219
-			t.Fatalf("Read from NewMultiReadSeeker failed: %v", err)
220
-		}
221
-		if uint32(i) != integer {
222
-			t.Fatalf("Read wrong value from NewMultiReadSeeker: %d != %d", i, integer)
223
-		}
224
-	}
225
-}
... ...
@@ -1,49 +1,16 @@
1 1
 package jsonfilelog
2 2
 
3 3
 import (
4
-	"bytes"
5 4
 	"encoding/json"
6
-	"fmt"
7 5
 	"io"
8
-	"os"
9
-	"time"
10
-
11
-	"github.com/fsnotify/fsnotify"
12
-	"golang.org/x/net/context"
13 6
 
14 7
 	"github.com/docker/docker/api/types/backend"
15 8
 	"github.com/docker/docker/daemon/logger"
16 9
 	"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
17
-	"github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
18
-	"github.com/docker/docker/pkg/filenotify"
19
-	"github.com/docker/docker/pkg/tailfile"
20
-	"github.com/pkg/errors"
21
-	"github.com/sirupsen/logrus"
22 10
 )
23 11
 
24 12
 const maxJSONDecodeRetry = 20000
25 13
 
26
-func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
27
-	l.Reset()
28
-	if err := dec.Decode(l); err != nil {
29
-		return nil, err
30
-	}
31
-	var attrs []backend.LogAttr
32
-	if len(l.Attrs) != 0 {
33
-		attrs = make([]backend.LogAttr, 0, len(l.Attrs))
34
-		for k, v := range l.Attrs {
35
-			attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
36
-		}
37
-	}
38
-	msg := &logger.Message{
39
-		Source:    l.Stream,
40
-		Timestamp: l.Created,
41
-		Line:      []byte(l.Log),
42
-		Attrs:     attrs,
43
-	}
44
-	return msg, nil
45
-}
46
-
47 14
 // ReadLogs implements the logger's LogReader interface for the logs
48 15
 // created by this driver.
49 16
 func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
... ...
@@ -53,309 +20,70 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
53 53
 	return logWatcher
54 54
 }
55 55
 
56
-func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
57
-	defer close(logWatcher.Msg)
58
-
59
-	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
60
-	// This will block writes!!!
61
-	l.mu.RLock()
62
-
63
-	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
64
-	pth := l.writer.LogPath()
65
-	var files []io.ReadSeeker
66
-	for i := l.writer.MaxFiles(); i > 1; i-- {
67
-		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
68
-		if err != nil {
69
-			if !os.IsNotExist(err) {
70
-				logWatcher.Err <- err
71
-				l.mu.RUnlock()
72
-				return
73
-			}
74
-			continue
75
-		}
76
-		defer f.Close()
77
-		files = append(files, f)
78
-	}
79
-
80
-	latestFile, err := os.Open(pth)
81
-	if err != nil {
82
-		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
83
-		l.mu.RUnlock()
84
-		return
85
-	}
86
-	defer latestFile.Close()
87
-
88
-	latestChunk, err := newSectionReader(latestFile)
89
-
90
-	// Now we have the reader sectioned, all fd's opened, we can unlock.
91
-	// New writes/rotates will not affect seeking through these files
92
-	l.mu.RUnlock()
93
-
94
-	if err != nil {
95
-		logWatcher.Err <- err
96
-		return
97
-	}
98
-
99
-	if config.Tail != 0 {
100
-		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
101
-		tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
102
-	}
103
-
104
-	// close all the rotated files
105
-	for _, f := range files {
106
-		if err := f.(io.Closer).Close(); err != nil {
107
-			logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
108
-		}
109
-	}
110
-
111
-	if !config.Follow || l.closed {
112
-		return
113
-	}
114
-
115
-	notifyRotate := l.writer.NotifyRotate()
116
-	defer l.writer.NotifyRotateEvict(notifyRotate)
56
+func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
57
+	defer close(watcher.Msg)
117 58
 
118 59
 	l.mu.Lock()
119
-	l.readers[logWatcher] = struct{}{}
60
+	l.readers[watcher] = struct{}{}
120 61
 	l.mu.Unlock()
121 62
 
122
-	followLogs(latestFile, logWatcher, notifyRotate, config)
63
+	l.writer.ReadLogs(config, watcher)
123 64
 
124 65
 	l.mu.Lock()
125
-	delete(l.readers, logWatcher)
66
+	delete(l.readers, watcher)
126 67
 	l.mu.Unlock()
127 68
 }
128 69
 
129
-func newSectionReader(f *os.File) (*io.SectionReader, error) {
130
-	// seek to the end to get the size
131
-	// we'll leave this at the end of the file since section reader does not advance the reader
132
-	size, err := f.Seek(0, os.SEEK_END)
133
-	if err != nil {
134
-		return nil, errors.Wrap(err, "error getting current file size")
135
-	}
136
-	return io.NewSectionReader(f, 0, size), nil
137
-}
138
-
139
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
140
-	rdr := io.Reader(f)
141
-	if tail > 0 {
142
-		ls, err := tailfile.TailFile(f, tail)
143
-		if err != nil {
144
-			logWatcher.Err <- err
145
-			return
146
-		}
147
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
148
-	}
149
-	dec := json.NewDecoder(rdr)
150
-	for {
151
-		msg, err := decodeLogLine(dec, &jsonlog.JSONLog{})
152
-		if err != nil {
153
-			if err != io.EOF {
154
-				logWatcher.Err <- err
155
-			}
156
-			return
157
-		}
158
-		if !since.IsZero() && msg.Timestamp.Before(since) {
159
-			continue
160
-		}
161
-		if !until.IsZero() && msg.Timestamp.After(until) {
162
-			return
163
-		}
164
-		select {
165
-		case <-logWatcher.WatchClose():
166
-			return
167
-		case logWatcher.Msg <- msg:
168
-		}
169
-	}
170
-}
171
-
172
-func watchFile(name string) (filenotify.FileWatcher, error) {
173
-	fileWatcher, err := filenotify.New()
174
-	if err != nil {
70
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
71
+	l.Reset()
72
+	if err := dec.Decode(l); err != nil {
175 73
 		return nil, err
176 74
 	}
177 75
 
178
-	if err := fileWatcher.Add(name); err != nil {
179
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
180
-		fileWatcher.Close()
181
-		fileWatcher = filenotify.NewPollingWatcher()
182
-
183
-		if err := fileWatcher.Add(name); err != nil {
184
-			fileWatcher.Close()
185
-			logrus.Debugf("error watching log file for modifications: %v", err)
186
-			return nil, err
76
+	var attrs []backend.LogAttr
77
+	if len(l.Attrs) != 0 {
78
+		attrs = make([]backend.LogAttr, 0, len(l.Attrs))
79
+		for k, v := range l.Attrs {
80
+			attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
187 81
 		}
188 82
 	}
189
-	return fileWatcher, nil
83
+	msg := &logger.Message{
84
+		Source:    l.Stream,
85
+		Timestamp: l.Created,
86
+		Line:      []byte(l.Log),
87
+		Attrs:     attrs,
88
+	}
89
+	return msg, nil
190 90
 }
191 91
 
192
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
193
-	dec := json.NewDecoder(f)
92
+// decodeFunc is used to create a decoder for the log file reader
93
+func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
194 94
 	l := &jsonlog.JSONLog{}
195
-
196
-	name := f.Name()
197
-	fileWatcher, err := watchFile(name)
198
-	if err != nil {
199
-		logWatcher.Err <- err
200
-		return
201
-	}
202
-	defer func() {
203
-		f.Close()
204
-		fileWatcher.Remove(name)
205
-		fileWatcher.Close()
206
-	}()
207
-
208
-	ctx, cancel := context.WithCancel(context.Background())
209
-	defer cancel()
210
-	go func() {
211
-		select {
212
-		case <-logWatcher.WatchClose():
213
-			fileWatcher.Remove(name)
214
-			cancel()
215
-		case <-ctx.Done():
216
-			return
217
-		}
218
-	}()
219
-
220
-	var retries int
221
-	handleRotate := func() error {
222
-		f.Close()
223
-		fileWatcher.Remove(name)
224
-
225
-		// retry when the file doesn't exist
226
-		for retries := 0; retries <= 5; retries++ {
227
-			f, err = os.Open(name)
228
-			if err == nil || !os.IsNotExist(err) {
95
+	dec := json.NewDecoder(rdr)
96
+	return func() (msg *logger.Message, err error) {
97
+		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
98
+			msg, err = decodeLogLine(dec, l)
99
+			if err == nil {
229 100
 				break
230 101
 			}
231
-		}
232
-		if err != nil {
233
-			return err
234
-		}
235
-		if err := fileWatcher.Add(name); err != nil {
236
-			return err
237
-		}
238
-		dec = json.NewDecoder(f)
239
-		return nil
240
-	}
241 102
 
242
-	errRetry := errors.New("retry")
243
-	errDone := errors.New("done")
244
-	waitRead := func() error {
245
-		select {
246
-		case e := <-fileWatcher.Events():
247
-			switch e.Op {
248
-			case fsnotify.Write:
249
-				dec = json.NewDecoder(f)
250
-				return nil
251
-			case fsnotify.Rename, fsnotify.Remove:
252
-				select {
253
-				case <-notifyRotate:
254
-				case <-ctx.Done():
255
-					return errDone
256
-				}
257
-				if err := handleRotate(); err != nil {
258
-					return err
259
-				}
260
-				return nil
261
-			}
262
-			return errRetry
263
-		case err := <-fileWatcher.Errors():
264
-			logrus.Debug("logger got error watching file: %v", err)
265
-			// Something happened, let's try and stay alive and create a new watcher
266
-			if retries <= 5 {
267
-				fileWatcher.Close()
268
-				fileWatcher, err = watchFile(name)
269
-				if err != nil {
270
-					return err
271
-				}
103
+			// try again, could be due to a an incomplete json object as we read
104
+			if _, ok := err.(*json.SyntaxError); ok {
105
+				dec = json.NewDecoder(rdr)
272 106
 				retries++
273
-				return errRetry
274
-			}
275
-			return err
276
-		case <-ctx.Done():
277
-			return errDone
278
-		}
279
-	}
280
-
281
-	handleDecodeErr := func(err error) error {
282
-		if err == io.EOF {
283
-			for {
284
-				err := waitRead()
285
-				if err == nil {
286
-					break
287
-				}
288
-				if err == errRetry {
289
-					continue
290
-				}
291
-				return err
292
-			}
293
-			return nil
294
-		}
295
-		// try again because this shouldn't happen
296
-		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
297
-			dec = json.NewDecoder(f)
298
-			retries++
299
-			return nil
300
-		}
301
-		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
302
-		// remaining data in the parser's buffer while an io.EOF occurs.
303
-		// If the json logger writes a partial json log entry to the disk
304
-		// while at the same time the decoder tries to decode it, the race condition happens.
305
-		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
306
-			reader := io.MultiReader(dec.Buffered(), f)
307
-			dec = json.NewDecoder(reader)
308
-			retries++
309
-			return nil
310
-		}
311
-		return err
312
-	}
313
-
314
-	// main loop
315
-	for {
316
-		msg, err := decodeLogLine(dec, l)
317
-		if err != nil {
318
-			if err := handleDecodeErr(err); err != nil {
319
-				if err == errDone {
320
-					return
321
-				}
322
-				// we got an unrecoverable error, so return
323
-				logWatcher.Err <- err
324
-				return
107
+				continue
325 108
 			}
326
-			// ready to try again
327
-			continue
328
-		}
329
-
330
-		since := config.Since
331
-		until := config.Until
332 109
 
333
-		retries = 0 // reset retries since we've succeeded
334
-		if !since.IsZero() && msg.Timestamp.Before(since) {
335
-			continue
336
-		}
337
-		if !until.IsZero() && msg.Timestamp.After(until) {
338
-			return
339
-		}
340
-		select {
341
-		case logWatcher.Msg <- msg:
342
-		case <-ctx.Done():
343
-			logWatcher.Msg <- msg
344
-			// This for loop is used when the logger is closed (ie, container
345
-			// stopped) but the consumer is still waiting for logs.
346
-			for {
347
-				msg, err := decodeLogLine(dec, l)
348
-				if err != nil {
349
-					return
350
-				}
351
-				if !since.IsZero() && msg.Timestamp.Before(since) {
352
-					continue
353
-				}
354
-				if !until.IsZero() && msg.Timestamp.After(until) {
355
-					return
356
-				}
357
-				logWatcher.Msg <- msg
110
+			// io.ErrUnexpectedEOF is returned from json.Decoder when there is
111
+			// remaining data in the parser's buffer while an io.EOF occurs.
112
+			// If the json logger writes a partial json log entry to the disk
113
+			// while at the same time the decoder tries to decode it, the race condition happens.
114
+			if err == io.ErrUnexpectedEOF {
115
+				reader := io.MultiReader(dec.Buffered(), rdr)
116
+				dec = json.NewDecoder(reader)
117
+				retries++
358 118
 			}
359 119
 		}
120
+		return msg, err
360 121
 	}
361 122
 }
... ...
@@ -35,7 +35,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
35 35
 	}
36 36
 
37 37
 	buf := bytes.NewBuffer(nil)
38
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
38
+	require.NoError(b, marshalMessage(msg, nil, buf))
39 39
 	b.SetBytes(int64(buf.Len()))
40 40
 
41 41
 	b.ResetTimer()
42 42
new file mode 100644
... ...
@@ -0,0 +1,454 @@
0
+package loggerutils
1
+
2
+import (
3
+	"bytes"
4
+	"context"
5
+	"fmt"
6
+	"io"
7
+	"os"
8
+	"strconv"
9
+	"sync"
10
+	"time"
11
+
12
+	"github.com/docker/docker/daemon/logger"
13
+	"github.com/docker/docker/daemon/logger/loggerutils/multireader"
14
+	"github.com/docker/docker/pkg/filenotify"
15
+	"github.com/docker/docker/pkg/pubsub"
16
+	"github.com/docker/docker/pkg/tailfile"
17
+	"github.com/fsnotify/fsnotify"
18
+	"github.com/pkg/errors"
19
+	"github.com/sirupsen/logrus"
20
+)
21
+
22
+// LogFile is Logger implementation for default Docker logging.
23
+type LogFile struct {
24
+	f             *os.File // store for closing
25
+	closed        bool
26
+	mu            sync.RWMutex
27
+	capacity      int64 //maximum size of each file
28
+	currentSize   int64 // current size of the latest file
29
+	maxFiles      int   //maximum number of files
30
+	notifyRotate  *pubsub.Publisher
31
+	marshal       logger.MarshalFunc
32
+	createDecoder makeDecoderFunc
33
+}
34
+
35
+type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
36
+
37
+//NewLogFile creates new LogFile
38
+func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
39
+	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
40
+	if err != nil {
41
+		return nil, err
42
+	}
43
+
44
+	size, err := log.Seek(0, os.SEEK_END)
45
+	if err != nil {
46
+		return nil, err
47
+	}
48
+
49
+	return &LogFile{
50
+		f:             log,
51
+		capacity:      capacity,
52
+		currentSize:   size,
53
+		maxFiles:      maxFiles,
54
+		notifyRotate:  pubsub.NewPublisher(0, 1),
55
+		marshal:       marshaller,
56
+		createDecoder: decodeFunc,
57
+	}, nil
58
+}
59
+
60
+// WriteLogEntry writes the provided log message to the current log file.
61
+// This may trigger a rotation event if the max file/capacity limits are hit.
62
+func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
63
+	b, err := w.marshal(msg)
64
+	if err != nil {
65
+		return errors.Wrap(err, "error marshalling log message")
66
+	}
67
+
68
+	logger.PutMessage(msg)
69
+
70
+	w.mu.Lock()
71
+	if w.closed {
72
+		w.mu.Unlock()
73
+		return errors.New("cannot write because the output file was closed")
74
+	}
75
+
76
+	if err := w.checkCapacityAndRotate(); err != nil {
77
+		w.mu.Unlock()
78
+		return err
79
+	}
80
+
81
+	n, err := w.f.Write(b)
82
+	if err == nil {
83
+		w.currentSize += int64(n)
84
+	}
85
+	w.mu.Unlock()
86
+	return err
87
+}
88
+
89
+func (w *LogFile) checkCapacityAndRotate() error {
90
+	if w.capacity == -1 {
91
+		return nil
92
+	}
93
+
94
+	if w.currentSize >= w.capacity {
95
+		name := w.f.Name()
96
+		if err := w.f.Close(); err != nil {
97
+			return errors.Wrap(err, "error closing file")
98
+		}
99
+		if err := rotate(name, w.maxFiles); err != nil {
100
+			return err
101
+		}
102
+		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
103
+		if err != nil {
104
+			return err
105
+		}
106
+		w.f = file
107
+		w.currentSize = 0
108
+		w.notifyRotate.Publish(struct{}{})
109
+	}
110
+
111
+	return nil
112
+}
113
+
114
+func rotate(name string, maxFiles int) error {
115
+	if maxFiles < 2 {
116
+		return nil
117
+	}
118
+	for i := maxFiles - 1; i > 1; i-- {
119
+		toPath := name + "." + strconv.Itoa(i)
120
+		fromPath := name + "." + strconv.Itoa(i-1)
121
+		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
122
+			return errors.Wrap(err, "error rotating old log entries")
123
+		}
124
+	}
125
+
126
+	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
127
+		return errors.Wrap(err, "error rotating current log")
128
+	}
129
+	return nil
130
+}
131
+
132
+// LogPath returns the location the given writer logs to.
133
+func (w *LogFile) LogPath() string {
134
+	w.mu.Lock()
135
+	defer w.mu.Unlock()
136
+	return w.f.Name()
137
+}
138
+
139
+// MaxFiles return maximum number of files
140
+func (w *LogFile) MaxFiles() int {
141
+	return w.maxFiles
142
+}
143
+
144
+// Close closes underlying file and signals all readers to stop.
145
+func (w *LogFile) Close() error {
146
+	w.mu.Lock()
147
+	defer w.mu.Unlock()
148
+	if w.closed {
149
+		return nil
150
+	}
151
+	if err := w.f.Close(); err != nil {
152
+		return err
153
+	}
154
+	w.closed = true
155
+	return nil
156
+}
157
+
158
+// ReadLogs decodes entries from log files and sends them the passed in watcher
159
+func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
160
+	w.mu.RLock()
161
+	files, err := w.openRotatedFiles()
162
+	if err != nil {
163
+		w.mu.RUnlock()
164
+		watcher.Err <- err
165
+		return
166
+	}
167
+	defer func() {
168
+		for _, f := range files {
169
+			f.Close()
170
+		}
171
+	}()
172
+
173
+	currentFile, err := os.Open(w.f.Name())
174
+	if err != nil {
175
+		w.mu.RUnlock()
176
+		watcher.Err <- err
177
+		return
178
+	}
179
+	defer currentFile.Close()
180
+
181
+	currentChunk, err := newSectionReader(currentFile)
182
+	w.mu.RUnlock()
183
+
184
+	if err != nil {
185
+		watcher.Err <- err
186
+		return
187
+	}
188
+
189
+	if config.Tail != 0 {
190
+		seekers := make([]io.ReadSeeker, 0, len(files)+1)
191
+		for _, f := range files {
192
+			seekers = append(seekers, f)
193
+		}
194
+		seekers = append(seekers, currentChunk)
195
+		tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
196
+	}
197
+
198
+	w.mu.RLock()
199
+	if !config.Follow || w.closed {
200
+		w.mu.RUnlock()
201
+		return
202
+	}
203
+	w.mu.RUnlock()
204
+
205
+	notifyRotate := w.notifyRotate.Subscribe()
206
+	defer w.notifyRotate.Evict(notifyRotate)
207
+	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
208
+}
209
+
210
+func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
211
+	defer func() {
212
+		if err == nil {
213
+			return
214
+		}
215
+		for _, f := range files {
216
+			f.Close()
217
+		}
218
+	}()
219
+
220
+	for i := w.maxFiles; i > 1; i-- {
221
+		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
222
+		if err != nil {
223
+			if !os.IsNotExist(err) {
224
+				return nil, err
225
+			}
226
+			continue
227
+		}
228
+		files = append(files, f)
229
+	}
230
+
231
+	return files, nil
232
+}
233
+
234
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
235
+	// seek to the end to get the size
236
+	// we'll leave this at the end of the file since section reader does not advance the reader
237
+	size, err := f.Seek(0, os.SEEK_END)
238
+	if err != nil {
239
+		return nil, errors.Wrap(err, "error getting current file size")
240
+	}
241
+	return io.NewSectionReader(f, 0, size), nil
242
+}
243
+
244
+type decodeFunc func() (*logger.Message, error)
245
+
246
+func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
247
+	var rdr io.Reader = f
248
+	if config.Tail > 0 {
249
+		ls, err := tailfile.TailFile(f, config.Tail)
250
+		if err != nil {
251
+			watcher.Err <- err
252
+			return
253
+		}
254
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
255
+	}
256
+
257
+	decodeLogLine := createDecoder(rdr)
258
+	for {
259
+		msg, err := decodeLogLine()
260
+		if err != nil {
261
+			if err != io.EOF {
262
+				watcher.Err <- err
263
+			}
264
+			return
265
+		}
266
+		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
267
+			continue
268
+		}
269
+		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
270
+			return
271
+		}
272
+		select {
273
+		case <-watcher.WatchClose():
274
+			return
275
+		case watcher.Msg <- msg:
276
+		}
277
+	}
278
+}
279
+
280
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
281
+	decodeLogLine := createDecoder(f)
282
+
283
+	name := f.Name()
284
+	fileWatcher, err := watchFile(name)
285
+	if err != nil {
286
+		logWatcher.Err <- err
287
+		return
288
+	}
289
+	defer func() {
290
+		f.Close()
291
+		fileWatcher.Remove(name)
292
+		fileWatcher.Close()
293
+	}()
294
+
295
+	ctx, cancel := context.WithCancel(context.Background())
296
+	defer cancel()
297
+	go func() {
298
+		select {
299
+		case <-logWatcher.WatchClose():
300
+			fileWatcher.Remove(name)
301
+			cancel()
302
+		case <-ctx.Done():
303
+			return
304
+		}
305
+	}()
306
+
307
+	var retries int
308
+	handleRotate := func() error {
309
+		f.Close()
310
+		fileWatcher.Remove(name)
311
+
312
+		// retry when the file doesn't exist
313
+		for retries := 0; retries <= 5; retries++ {
314
+			f, err = os.Open(name)
315
+			if err == nil || !os.IsNotExist(err) {
316
+				break
317
+			}
318
+		}
319
+		if err != nil {
320
+			return err
321
+		}
322
+		if err := fileWatcher.Add(name); err != nil {
323
+			return err
324
+		}
325
+		decodeLogLine = createDecoder(f)
326
+		return nil
327
+	}
328
+
329
+	errRetry := errors.New("retry")
330
+	errDone := errors.New("done")
331
+	waitRead := func() error {
332
+		select {
333
+		case e := <-fileWatcher.Events():
334
+			switch e.Op {
335
+			case fsnotify.Write:
336
+				decodeLogLine = createDecoder(f)
337
+				return nil
338
+			case fsnotify.Rename, fsnotify.Remove:
339
+				select {
340
+				case <-notifyRotate:
341
+				case <-ctx.Done():
342
+					return errDone
343
+				}
344
+				if err := handleRotate(); err != nil {
345
+					return err
346
+				}
347
+				return nil
348
+			}
349
+			return errRetry
350
+		case err := <-fileWatcher.Errors():
351
+			logrus.Debug("logger got error watching file: %v", err)
352
+			// Something happened, let's try and stay alive and create a new watcher
353
+			if retries <= 5 {
354
+				fileWatcher.Close()
355
+				fileWatcher, err = watchFile(name)
356
+				if err != nil {
357
+					return err
358
+				}
359
+				retries++
360
+				return errRetry
361
+			}
362
+			return err
363
+		case <-ctx.Done():
364
+			return errDone
365
+		}
366
+	}
367
+
368
+	handleDecodeErr := func(err error) error {
369
+		if err != io.EOF {
370
+			return err
371
+		}
372
+
373
+		for {
374
+			err := waitRead()
375
+			if err == nil {
376
+				break
377
+			}
378
+			if err == errRetry {
379
+				continue
380
+			}
381
+			return err
382
+		}
383
+		return nil
384
+	}
385
+
386
+	// main loop
387
+	for {
388
+		msg, err := decodeLogLine()
389
+		if err != nil {
390
+			if err := handleDecodeErr(err); err != nil {
391
+				if err == errDone {
392
+					return
393
+				}
394
+				// we got an unrecoverable error, so return
395
+				logWatcher.Err <- err
396
+				return
397
+			}
398
+			// ready to try again
399
+			continue
400
+		}
401
+
402
+		retries = 0 // reset retries since we've succeeded
403
+		if !since.IsZero() && msg.Timestamp.Before(since) {
404
+			continue
405
+		}
406
+		if !until.IsZero() && msg.Timestamp.After(until) {
407
+			return
408
+		}
409
+		select {
410
+		case logWatcher.Msg <- msg:
411
+		case <-ctx.Done():
412
+			logWatcher.Msg <- msg
413
+			for {
414
+				msg, err := decodeLogLine()
415
+				if err != nil {
416
+					return
417
+				}
418
+				if !since.IsZero() && msg.Timestamp.Before(since) {
419
+					continue
420
+				}
421
+				if !until.IsZero() && msg.Timestamp.After(until) {
422
+					return
423
+				}
424
+				logWatcher.Msg <- msg
425
+			}
426
+		}
427
+	}
428
+}
429
+
430
+func watchFile(name string) (filenotify.FileWatcher, error) {
431
+	fileWatcher, err := filenotify.New()
432
+	if err != nil {
433
+		return nil, err
434
+	}
435
+
436
+	logger := logrus.WithFields(logrus.Fields{
437
+		"module": "logger",
438
+		"fille":  name,
439
+	})
440
+
441
+	if err := fileWatcher.Add(name); err != nil {
442
+		logger.WithError(err).Warnf("falling back to file poller")
443
+		fileWatcher.Close()
444
+		fileWatcher = filenotify.NewPollingWatcher()
445
+
446
+		if err := fileWatcher.Add(name); err != nil {
447
+			fileWatcher.Close()
448
+			logger.WithError(err).Debugf("error watching log file for modifications")
449
+			return nil, err
450
+		}
451
+	}
452
+	return fileWatcher, nil
453
+}
0 454
new file mode 100644
... ...
@@ -0,0 +1,228 @@
0
+package multireader
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"os"
7
+)
8
+
9
+type pos struct {
10
+	idx    int
11
+	offset int64
12
+}
13
+
14
+type multiReadSeeker struct {
15
+	readers []io.ReadSeeker
16
+	pos     *pos
17
+	posIdx  map[io.ReadSeeker]int
18
+}
19
+
20
+func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
21
+	var tmpOffset int64
22
+	switch whence {
23
+	case os.SEEK_SET:
24
+		for i, rdr := range r.readers {
25
+			// get size of the current reader
26
+			s, err := rdr.Seek(0, os.SEEK_END)
27
+			if err != nil {
28
+				return -1, err
29
+			}
30
+
31
+			if offset > tmpOffset+s {
32
+				if i == len(r.readers)-1 {
33
+					rdrOffset := s + (offset - tmpOffset)
34
+					if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
35
+						return -1, err
36
+					}
37
+					r.pos = &pos{i, rdrOffset}
38
+					return offset, nil
39
+				}
40
+
41
+				tmpOffset += s
42
+				continue
43
+			}
44
+
45
+			rdrOffset := offset - tmpOffset
46
+			idx := i
47
+
48
+			if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
49
+				return -1, err
50
+			}
51
+			// make sure all following readers are at 0
52
+			for _, rdr := range r.readers[i+1:] {
53
+				rdr.Seek(0, os.SEEK_SET)
54
+			}
55
+
56
+			if rdrOffset == s && i != len(r.readers)-1 {
57
+				idx++
58
+				rdrOffset = 0
59
+			}
60
+			r.pos = &pos{idx, rdrOffset}
61
+			return offset, nil
62
+		}
63
+	case os.SEEK_END:
64
+		for _, rdr := range r.readers {
65
+			s, err := rdr.Seek(0, os.SEEK_END)
66
+			if err != nil {
67
+				return -1, err
68
+			}
69
+			tmpOffset += s
70
+		}
71
+		if _, err := r.Seek(tmpOffset+offset, os.SEEK_SET); err != nil {
72
+			return -1, err
73
+		}
74
+		return tmpOffset + offset, nil
75
+	case os.SEEK_CUR:
76
+		if r.pos == nil {
77
+			return r.Seek(offset, os.SEEK_SET)
78
+		}
79
+		// Just return the current offset
80
+		if offset == 0 {
81
+			return r.getCurOffset()
82
+		}
83
+
84
+		curOffset, err := r.getCurOffset()
85
+		if err != nil {
86
+			return -1, err
87
+		}
88
+		rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
89
+		if err != nil {
90
+			return -1, err
91
+		}
92
+
93
+		r.pos = &pos{r.posIdx[rdr], rdrOffset}
94
+		return curOffset + offset, nil
95
+	default:
96
+		return -1, fmt.Errorf("Invalid whence: %d", whence)
97
+	}
98
+
99
+	return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
100
+}
101
+
102
+func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
103
+
104
+	var offsetTo int64
105
+
106
+	for _, rdr := range r.readers {
107
+		size, err := getReadSeekerSize(rdr)
108
+		if err != nil {
109
+			return nil, -1, err
110
+		}
111
+		if offsetTo+size > offset {
112
+			return rdr, offset - offsetTo, nil
113
+		}
114
+		if rdr == r.readers[len(r.readers)-1] {
115
+			return rdr, offsetTo + offset, nil
116
+		}
117
+		offsetTo += size
118
+	}
119
+
120
+	return nil, 0, nil
121
+}
122
+
123
+func (r *multiReadSeeker) getCurOffset() (int64, error) {
124
+	var totalSize int64
125
+	for _, rdr := range r.readers[:r.pos.idx+1] {
126
+		if r.posIdx[rdr] == r.pos.idx {
127
+			totalSize += r.pos.offset
128
+			break
129
+		}
130
+
131
+		size, err := getReadSeekerSize(rdr)
132
+		if err != nil {
133
+			return -1, fmt.Errorf("error getting seeker size: %v", err)
134
+		}
135
+		totalSize += size
136
+	}
137
+	return totalSize, nil
138
+}
139
+
140
+func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
141
+	var offset int64
142
+	for _, r := range r.readers {
143
+		if r == rdr {
144
+			break
145
+		}
146
+
147
+		size, err := getReadSeekerSize(rdr)
148
+		if err != nil {
149
+			return -1, err
150
+		}
151
+		offset += size
152
+	}
153
+	return offset, nil
154
+}
155
+
156
+func (r *multiReadSeeker) Read(b []byte) (int, error) {
157
+	if r.pos == nil {
158
+		// make sure all readers are at 0
159
+		r.Seek(0, os.SEEK_SET)
160
+	}
161
+
162
+	bLen := int64(len(b))
163
+	buf := bytes.NewBuffer(nil)
164
+	var rdr io.ReadSeeker
165
+
166
+	for _, rdr = range r.readers[r.pos.idx:] {
167
+		readBytes, err := io.CopyN(buf, rdr, bLen)
168
+		if err != nil && err != io.EOF {
169
+			return -1, err
170
+		}
171
+		bLen -= readBytes
172
+
173
+		if bLen == 0 {
174
+			break
175
+		}
176
+	}
177
+
178
+	rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
179
+	if err != nil {
180
+		return -1, err
181
+	}
182
+	r.pos = &pos{r.posIdx[rdr], rdrPos}
183
+	return buf.Read(b)
184
+}
185
+
186
+func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
187
+	// save the current position
188
+	pos, err := rdr.Seek(0, os.SEEK_CUR)
189
+	if err != nil {
190
+		return -1, err
191
+	}
192
+
193
+	// get the size
194
+	size, err := rdr.Seek(0, os.SEEK_END)
195
+	if err != nil {
196
+		return -1, err
197
+	}
198
+
199
+	// reset the position
200
+	if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
201
+		return -1, err
202
+	}
203
+	return size, nil
204
+}
205
+
206
+// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
207
+// input readseekers. After calling this method the initial position is set to the
208
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
209
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
210
+// Seek can be used over the sum of lengths of all readseekers.
211
+//
212
+// When a MultiReadSeeker is used, no Read and Seek operations should be made on
213
+// its ReadSeeker components. Also, users should make no assumption on the state
214
+// of individual readseekers while the MultiReadSeeker is used.
215
+func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
216
+	if len(readers) == 1 {
217
+		return readers[0]
218
+	}
219
+	idx := make(map[io.ReadSeeker]int)
220
+	for i, rdr := range readers {
221
+		idx[rdr] = i
222
+	}
223
+	return &multiReadSeeker{
224
+		readers: readers,
225
+		posIdx:  idx,
226
+	}
227
+}
0 228
new file mode 100644
... ...
@@ -0,0 +1,225 @@
0
+package multireader
1
+
2
+import (
3
+	"bytes"
4
+	"encoding/binary"
5
+	"fmt"
6
+	"io"
7
+	"io/ioutil"
8
+	"os"
9
+	"strings"
10
+	"testing"
11
+)
12
+
13
+func TestMultiReadSeekerReadAll(t *testing.T) {
14
+	str := "hello world"
15
+	s1 := strings.NewReader(str + " 1")
16
+	s2 := strings.NewReader(str + " 2")
17
+	s3 := strings.NewReader(str + " 3")
18
+	mr := MultiReadSeeker(s1, s2, s3)
19
+
20
+	expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
21
+
22
+	b, err := ioutil.ReadAll(mr)
23
+	if err != nil {
24
+		t.Fatal(err)
25
+	}
26
+
27
+	expected := "hello world 1hello world 2hello world 3"
28
+	if string(b) != expected {
29
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
30
+	}
31
+
32
+	size, err := mr.Seek(0, os.SEEK_END)
33
+	if err != nil {
34
+		t.Fatal(err)
35
+	}
36
+	if size != expectedSize {
37
+		t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
38
+	}
39
+
40
+	// Reset the position and read again
41
+	pos, err := mr.Seek(0, os.SEEK_SET)
42
+	if err != nil {
43
+		t.Fatal(err)
44
+	}
45
+	if pos != 0 {
46
+		t.Fatalf("expected position to be set to 0, got %d", pos)
47
+	}
48
+
49
+	b, err = ioutil.ReadAll(mr)
50
+	if err != nil {
51
+		t.Fatal(err)
52
+	}
53
+
54
+	if string(b) != expected {
55
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
56
+	}
57
+
58
+	// The positions of some readers are not 0
59
+	s1.Seek(0, os.SEEK_SET)
60
+	s2.Seek(0, os.SEEK_END)
61
+	s3.Seek(0, os.SEEK_SET)
62
+	mr = MultiReadSeeker(s1, s2, s3)
63
+	b, err = ioutil.ReadAll(mr)
64
+	if err != nil {
65
+		t.Fatal(err)
66
+	}
67
+
68
+	if string(b) != expected {
69
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
70
+	}
71
+}
72
+
73
+func TestMultiReadSeekerReadEach(t *testing.T) {
74
+	str := "hello world"
75
+	s1 := strings.NewReader(str + " 1")
76
+	s2 := strings.NewReader(str + " 2")
77
+	s3 := strings.NewReader(str + " 3")
78
+	mr := MultiReadSeeker(s1, s2, s3)
79
+
80
+	var totalBytes int64
81
+	for i, s := range []*strings.Reader{s1, s2, s3} {
82
+		sLen := int64(s.Len())
83
+		buf := make([]byte, s.Len())
84
+		expected := []byte(fmt.Sprintf("%s %d", str, i+1))
85
+
86
+		if _, err := mr.Read(buf); err != nil && err != io.EOF {
87
+			t.Fatal(err)
88
+		}
89
+
90
+		if !bytes.Equal(buf, expected) {
91
+			t.Fatalf("expected %q to be %q", string(buf), string(expected))
92
+		}
93
+
94
+		pos, err := mr.Seek(0, os.SEEK_CUR)
95
+		if err != nil {
96
+			t.Fatalf("iteration: %d, error: %v", i+1, err)
97
+		}
98
+
99
+		// check that the total bytes read is the current position of the seeker
100
+		totalBytes += sLen
101
+		if pos != totalBytes {
102
+			t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
103
+		}
104
+
105
+		// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
106
+		newPos, err := mr.Seek(pos, os.SEEK_SET)
107
+		if err != nil {
108
+			t.Fatal(err)
109
+		}
110
+		if newPos != pos {
111
+			t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
112
+		}
113
+	}
114
+}
115
+
116
+func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
117
+	str := "hello world"
118
+	s1 := strings.NewReader(str + " 1")
119
+	s2 := strings.NewReader(str + " 2")
120
+	s3 := strings.NewReader(str + " 3")
121
+	mr := MultiReadSeeker(s1, s2, s3)
122
+
123
+	buf := make([]byte, s1.Len()+3)
124
+	_, err := mr.Read(buf)
125
+	if err != nil {
126
+		t.Fatal(err)
127
+	}
128
+
129
+	// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
130
+	expected := "hello world 1hel"
131
+	if string(buf) != expected {
132
+		t.Fatalf("expected %s to be %s", string(buf), expected)
133
+	}
134
+}
135
+
136
+func TestMultiReadSeekerNegativeSeek(t *testing.T) {
137
+	str := "hello world"
138
+	s1 := strings.NewReader(str + " 1")
139
+	s2 := strings.NewReader(str + " 2")
140
+	s3 := strings.NewReader(str + " 3")
141
+	mr := MultiReadSeeker(s1, s2, s3)
142
+
143
+	s1Len := s1.Len()
144
+	s2Len := s2.Len()
145
+	s3Len := s3.Len()
146
+
147
+	s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
148
+	if err != nil {
149
+		t.Fatal(err)
150
+	}
151
+	if s != int64(s1Len+s2Len) {
152
+		t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
153
+	}
154
+
155
+	buf := make([]byte, s3Len)
156
+	if _, err := mr.Read(buf); err != nil && err != io.EOF {
157
+		t.Fatal(err)
158
+	}
159
+	expected := fmt.Sprintf("%s %d", str, 3)
160
+	if string(buf) != fmt.Sprintf("%s %d", str, 3) {
161
+		t.Fatalf("expected %q to be %q", string(buf), expected)
162
+	}
163
+}
164
+
165
+func TestMultiReadSeekerCurAfterSet(t *testing.T) {
166
+	str := "hello world"
167
+	s1 := strings.NewReader(str + " 1")
168
+	s2 := strings.NewReader(str + " 2")
169
+	s3 := strings.NewReader(str + " 3")
170
+	mr := MultiReadSeeker(s1, s2, s3)
171
+
172
+	mid := int64(s1.Len() + s2.Len()/2)
173
+
174
+	size, err := mr.Seek(mid, os.SEEK_SET)
175
+	if err != nil {
176
+		t.Fatal(err)
177
+	}
178
+	if size != mid {
179
+		t.Fatalf("reader size does not match, got %d, expected %d", size, mid)
180
+	}
181
+
182
+	size, err = mr.Seek(3, os.SEEK_CUR)
183
+	if err != nil {
184
+		t.Fatal(err)
185
+	}
186
+	if size != mid+3 {
187
+		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+3)
188
+	}
189
+	size, err = mr.Seek(5, os.SEEK_CUR)
190
+	if err != nil {
191
+		t.Fatal(err)
192
+	}
193
+	if size != mid+8 {
194
+		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+8)
195
+	}
196
+
197
+	size, err = mr.Seek(10, os.SEEK_CUR)
198
+	if err != nil {
199
+		t.Fatal(err)
200
+	}
201
+	if size != mid+18 {
202
+		t.Fatalf("reader size does not match, got %d, expected %d", size, mid+18)
203
+	}
204
+}
205
+
206
+func TestMultiReadSeekerSmallReads(t *testing.T) {
207
+	readers := []io.ReadSeeker{}
208
+	for i := 0; i < 10; i++ {
209
+		integer := make([]byte, 4)
210
+		binary.BigEndian.PutUint32(integer, uint32(i))
211
+		readers = append(readers, bytes.NewReader(integer))
212
+	}
213
+
214
+	reader := MultiReadSeeker(readers...)
215
+	for i := 0; i < 10; i++ {
216
+		var integer uint32
217
+		if err := binary.Read(reader, binary.BigEndian, &integer); err != nil {
218
+			t.Fatalf("Read from NewMultiReadSeeker failed: %v", err)
219
+		}
220
+		if uint32(i) != integer {
221
+			t.Fatalf("Read wrong value from NewMultiReadSeeker: %d != %d", i, integer)
222
+		}
223
+	}
224
+}
0 225
deleted file mode 100644
... ...
@@ -1,153 +0,0 @@
1
-package loggerutils
2
-
3
-import (
4
-	"os"
5
-	"strconv"
6
-	"sync"
7
-
8
-	"github.com/docker/docker/daemon/logger"
9
-	"github.com/docker/docker/pkg/pubsub"
10
-	"github.com/pkg/errors"
11
-)
12
-
13
-// RotateFileWriter is Logger implementation for default Docker logging.
14
-type RotateFileWriter struct {
15
-	f            *os.File // store for closing
16
-	closed       bool
17
-	mu           sync.Mutex
18
-	capacity     int64 //maximum size of each file
19
-	currentSize  int64 // current size of the latest file
20
-	maxFiles     int   //maximum number of files
21
-	notifyRotate *pubsub.Publisher
22
-	marshal      logger.MarshalFunc
23
-}
24
-
25
-//NewRotateFileWriter creates new RotateFileWriter
26
-func NewRotateFileWriter(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc) (*RotateFileWriter, error) {
27
-	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
28
-	if err != nil {
29
-		return nil, err
30
-	}
31
-
32
-	size, err := log.Seek(0, os.SEEK_END)
33
-	if err != nil {
34
-		return nil, err
35
-	}
36
-
37
-	return &RotateFileWriter{
38
-		f:            log,
39
-		capacity:     capacity,
40
-		currentSize:  size,
41
-		maxFiles:     maxFiles,
42
-		notifyRotate: pubsub.NewPublisher(0, 1),
43
-		marshal:      marshaller,
44
-	}, nil
45
-}
46
-
47
-// WriteLogEntry writes the provided log message to the current log file.
48
-// This may trigger a rotation event if the max file/capacity limits are hit.
49
-func (w *RotateFileWriter) WriteLogEntry(msg *logger.Message) error {
50
-	b, err := w.marshal(msg)
51
-	if err != nil {
52
-		return errors.Wrap(err, "error marshalling log message")
53
-	}
54
-
55
-	logger.PutMessage(msg)
56
-
57
-	w.mu.Lock()
58
-	if w.closed {
59
-		w.mu.Unlock()
60
-		return errors.New("cannot write because the output file was closed")
61
-	}
62
-
63
-	if err := w.checkCapacityAndRotate(); err != nil {
64
-		w.mu.Unlock()
65
-		return err
66
-	}
67
-
68
-	n, err := w.f.Write(b)
69
-	if err == nil {
70
-		w.currentSize += int64(n)
71
-	}
72
-	w.mu.Unlock()
73
-	return err
74
-}
75
-
76
-func (w *RotateFileWriter) checkCapacityAndRotate() error {
77
-	if w.capacity == -1 {
78
-		return nil
79
-	}
80
-
81
-	if w.currentSize >= w.capacity {
82
-		name := w.f.Name()
83
-		if err := w.f.Close(); err != nil {
84
-			return errors.Wrap(err, "error closing file")
85
-		}
86
-		if err := rotate(name, w.maxFiles); err != nil {
87
-			return err
88
-		}
89
-		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
90
-		if err != nil {
91
-			return err
92
-		}
93
-		w.f = file
94
-		w.currentSize = 0
95
-		w.notifyRotate.Publish(struct{}{})
96
-	}
97
-
98
-	return nil
99
-}
100
-
101
-func rotate(name string, maxFiles int) error {
102
-	if maxFiles < 2 {
103
-		return nil
104
-	}
105
-	for i := maxFiles - 1; i > 1; i-- {
106
-		toPath := name + "." + strconv.Itoa(i)
107
-		fromPath := name + "." + strconv.Itoa(i-1)
108
-		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
109
-			return errors.Wrap(err, "error rotating old log entries")
110
-		}
111
-	}
112
-
113
-	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
114
-		return errors.Wrap(err, "error rotating current log")
115
-	}
116
-	return nil
117
-}
118
-
119
-// LogPath returns the location the given writer logs to.
120
-func (w *RotateFileWriter) LogPath() string {
121
-	w.mu.Lock()
122
-	defer w.mu.Unlock()
123
-	return w.f.Name()
124
-}
125
-
126
-// MaxFiles return maximum number of files
127
-func (w *RotateFileWriter) MaxFiles() int {
128
-	return w.maxFiles
129
-}
130
-
131
-//NotifyRotate returns the new subscriber
132
-func (w *RotateFileWriter) NotifyRotate() chan interface{} {
133
-	return w.notifyRotate.Subscribe()
134
-}
135
-
136
-//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
137
-func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
138
-	w.notifyRotate.Evict(sub)
139
-}
140
-
141
-// Close closes underlying file and signals all readers to stop.
142
-func (w *RotateFileWriter) Close() error {
143
-	w.mu.Lock()
144
-	defer w.mu.Unlock()
145
-	if w.closed {
146
-		return nil
147
-	}
148
-	if err := w.f.Close(); err != nil {
149
-		return err
150
-	}
151
-	w.closed = true
152
-	return nil
153
-}
... ...
@@ -123,7 +123,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
123 123
 				}
124 124
 				return
125 125
 			case <-ctx.Done():
126
-				lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
126
+				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
127 127
 				return
128 128
 			case msg, ok := <-logs.Msg:
129 129
 				// there is some kind of pool or ring buffer in the logger that