
Merge pull request #24514 from cpuguy83/fix_log_follow_rotate

Fix issues with tailing rotated jsonlog file

Aaron Lehmann authored on 2016/08/27 01:47:40
Showing 1 changed file
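
The core of the change: log reads now hold the logger mutex so rotation cannot race a follower, a new watchFile helper falls back to a polling watcher when inotify fails, and on a rename/remove event the follower reopens the log path and rebuilds its JSON decoder instead of giving up. The snippet below is a minimal, self-contained sketch of that reopen-and-redecode pattern, not Docker's code: the followJSON helper and the app.log path are made up for illustration, it uses gopkg.in/fsnotify.v1 directly rather than Docker's filenotify wrapper, and it omits the locking, open retries, and partial-entry (io.ErrUnexpectedEOF) handling that the real patch adds.

// follow.go: hedged sketch of following a JSON-lines log across rotation.
// Assumes the writer renames/removes the file on rotation and recreates it
// at the same path shortly afterwards.
package main

import (
	"encoding/json"
	"fmt"
	"os"

	"gopkg.in/fsnotify.v1"
)

func followJSON(path string) error {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer w.Close()
	if err := w.Add(path); err != nil {
		return err
	}

	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer func() { f.Close() }()
	dec := json.NewDecoder(f)

	for {
		var entry map[string]interface{}
		if err := dec.Decode(&entry); err == nil {
			fmt.Println(entry) // deliver the decoded log entry
			continue
		}
		// EOF (or a torn write): wait for the file to change before retrying.
		select {
		case e := <-w.Events:
			if e.Op&(fsnotify.Rename|fsnotify.Remove) != 0 {
				// Rotated: reopen the recreated file and re-arm the watch.
				// (The real patch retries the open because the new file may
				// not exist yet.)
				f.Close()
				if f, err = os.Open(path); err != nil {
					return err
				}
				if err := w.Add(path); err != nil {
					return err
				}
			}
			// On a plain write the same *os.File sees the appended bytes;
			// either way, resume decoding from the current offset.
			dec = json.NewDecoder(f)
		case err := <-w.Errors:
			return err
		}
	}
}

func main() {
	if err := followJSON("app.log"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
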
@@ -3,11 +3,14 @@ package jsonfilelog
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"time"
 
+	"gopkg.in/fsnotify.v1"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 
+	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+	// This will block writes!!!
+	l.mu.Lock()
+
 	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -63,6 +70,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	latestFile, err := os.Open(pth)
 	if err != nil {
 		logWatcher.Err <- err
+		l.mu.Unlock()
 		return
 	}
 	defer latestFile.Close()
@@ -83,6 +91,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		if err := latestFile.Close(); err != nil {
 			logrus.Errorf("Error closing file: %v", err)
 		}
+		l.mu.Unlock()
 		return
 	}
 
@@ -90,7 +99,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		latestFile.Seek(0, os.SEEK_END)
 	}
 
-	l.mu.Lock()
 	l.readers[logWatcher] = struct{}{}
 	l.mu.Unlock()
 
@@ -131,92 +139,148 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
 	}
 }
 
+func watchFile(name string) (filenotify.FileWatcher, error) {
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		return nil, err
+	}
+
+	if err := fileWatcher.Add(name); err != nil {
+		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+		fileWatcher.Close()
+		fileWatcher = filenotify.NewPollingWatcher()
+
+		if err := fileWatcher.Add(name); err != nil {
+			fileWatcher.Close()
+			logrus.Debugf("error watching log file for modifications: %v", err)
+			return nil, err
+		}
+	}
+	return fileWatcher, nil
+}
+
 func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
 	dec := json.NewDecoder(f)
 	l := &jsonlog.JSONLog{}
 
-	fileWatcher, err := filenotify.New()
+	name := f.Name()
+	fileWatcher, err := watchFile(name)
 	if err != nil {
 		logWatcher.Err <- err
+		return
 	}
 	defer func() {
 		f.Close()
 		fileWatcher.Close()
 	}()
-	name := f.Name()
 
-	if err := fileWatcher.Add(name); err != nil {
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-		fileWatcher.Close()
-		fileWatcher = filenotify.NewPollingWatcher()
+	var retries int
+	handleRotate := func() error {
+		f.Close()
+		fileWatcher.Remove(name)
 
+		// retry when the file doesn't exist
+		for retries := 0; retries <= 5; retries++ {
+			f, err = os.Open(name)
+			if err == nil || !os.IsNotExist(err) {
+				break
+			}
+		}
+		if err != nil {
+			return err
+		}
 		if err := fileWatcher.Add(name); err != nil {
-			logrus.Debugf("error watching log file for modifications: %v", err)
-			logWatcher.Err <- err
-			return
+			return err
 		}
+		dec = json.NewDecoder(f)
+		return nil
 	}
 
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
+	errRetry := errors.New("retry")
+	errDone := errors.New("done")
+	waitRead := func() error {
+		select {
+		case e := <-fileWatcher.Events():
+			switch e.Op {
+			case fsnotify.Write:
+				dec = json.NewDecoder(f)
+				return nil
+			case fsnotify.Rename, fsnotify.Remove:
+				select {
+				case <-notifyRotate:
+				case <-logWatcher.WatchClose():
+					fileWatcher.Remove(name)
+					return errDone
 				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race condition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
+				if err := handleRotate(); err != nil {
+					return err
 				}
-
-				return
+				return nil
 			}
-
-			select {
-			case <-fileWatcher.Events():
-				dec = json.NewDecoder(f)
-				continue
-			case <-fileWatcher.Errors():
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				fileWatcher.Remove(name)
-				return
-			case <-notifyRotate:
-				f.Close()
-				fileWatcher.Remove(name)
-
-				// retry when the file doesn't exist
-				for retries := 0; retries <= 5; retries++ {
-					f, err = os.Open(name)
-					if err == nil || !os.IsNotExist(err) {
-						break
-					}
+			return errRetry
+		case err := <-fileWatcher.Errors():
+			logrus.Debugf("logger got error watching file: %v", err)
+			// Something happened, let's try and stay alive and create a new watcher
+			if retries <= 5 {
+				fileWatcher.Close()
+				fileWatcher, err = watchFile(name)
+				if err != nil {
+					return err
 				}
+				retries++
+				return errRetry
+			}
+			return err
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			return errDone
+		}
+	}
 
-				if err = fileWatcher.Add(name); err != nil {
-					logWatcher.Err <- err
-					return
+	handleDecodeErr := func(err error) error {
+		if err == io.EOF {
+			for err := waitRead(); err != nil; {
+				if err == errRetry {
+					// retry the waitRead
+					continue
 				}
-				if err != nil {
-					logWatcher.Err <- err
+				return err
+			}
+			return nil
+		}
+		// try again because this shouldn't happen
+		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+			dec = json.NewDecoder(f)
+			retries++
+			return nil
+		}
+		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+		// remaining data in the parser's buffer while an io.EOF occurs.
+		// If the json logger writes a partial json log entry to the disk
+		// while at the same time the decoder tries to decode it, the race condition happens.
+		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+			reader := io.MultiReader(dec.Buffered(), f)
+			dec = json.NewDecoder(reader)
+			retries++
+			return nil
+		}
+		return err
+	}
+
+	// main loop
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err := handleDecodeErr(err); err != nil {
+				if err == errDone {
 					return
 				}
-
-				dec = json.NewDecoder(f)
-				continue
+				// we got an unrecoverable error, so return
+				logWatcher.Err <- err
+				return
 			}
+			// ready to try again
+			continue
 		}
 
 		retries = 0 // reset retries since we've succeeded