Browse code

Fix some issues in logfile reader and rotation

- Check errors.Cause(err) when comparing errors
- Fix bug where oldest log file is not actually removed. This in
particular causes issues when compression is enabled. On rotate it just
overwrites the data in the log file corrupting it.
- Use O_TRUNC to open new gzip files to ensure we don't corrupt log
files as happens without the above fix.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2018/05/08 00:25:41
Showing 1 changed files
... ...
@@ -97,7 +97,7 @@ type LogFile struct {
97 97
 
98 98
 type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
99 99
 
100
-//NewLogFile creates new LogFile
100
+// NewLogFile creates new LogFile
101 101
 func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
102 102
 	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
103 103
 	if err != nil {
... ...
@@ -201,6 +201,13 @@ func rotate(name string, maxFiles int, compress bool) error {
201 201
 	if compress {
202 202
 		extension = ".gz"
203 203
 	}
204
+
205
+	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
206
+	err := os.Remove(lastFile)
207
+	if err != nil && !os.IsNotExist(err) {
208
+		return errors.Wrap(err, "error removing oldest log file")
209
+	}
210
+
204 211
 	for i := maxFiles - 1; i > 1; i-- {
205 212
 		toPath := name + "." + strconv.Itoa(i) + extension
206 213
 		fromPath := name + "." + strconv.Itoa(i-1) + extension
... ...
@@ -230,7 +237,7 @@ func compressFile(fileName string, lastTimestamp time.Time) {
230 230
 		}
231 231
 	}()
232 232
 
233
-	outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640)
233
+	outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
234 234
 	if err != nil {
235 235
 		logrus.Errorf("Failed to open or create gzip log file: %v", err)
236 236
 		return
... ...
@@ -251,7 +258,7 @@ func compressFile(fileName string, lastTimestamp time.Time) {
251 251
 	compressWriter.Header.Extra, err = json.Marshal(&extra)
252 252
 	if err != nil {
253 253
 		// Here log the error only and don't return since this is just an optimization.
254
-		logrus.Warningf("Failed to marshal JSON: %v", err)
254
+		logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
255 255
 	}
256 256
 
257 257
 	_, err = pools.Copy(compressWriter, file)
... ...
@@ -281,6 +288,9 @@ func (w *LogFile) Close() error {
281 281
 }
282 282
 
283 283
 // ReadLogs decodes entries from log files and sends them the passed in watcher
284
+//
285
+// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
286
+// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
284 287
 func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
285 288
 	w.mu.RLock()
286 289
 	currentFile, err := os.Open(w.f.Name())
... ...
@@ -364,7 +374,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
364 364
 		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
365 365
 		if err != nil {
366 366
 			if !os.IsNotExist(err) {
367
-				return nil, err
367
+				return nil, errors.Wrap(err, "error opening rotated log file")
368 368
 			}
369 369
 
370 370
 			fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
... ...
@@ -377,8 +387,8 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
377 377
 			})
378 378
 
379 379
 			if err != nil {
380
-				if !os.IsNotExist(err) {
381
-					return nil, err
380
+				if !os.IsNotExist(errors.Cause(err)) {
381
+					return nil, errors.Wrap(err, "error getting reference to decompressed log file")
382 382
 				}
383 383
 				continue
384 384
 			}
... ...
@@ -399,13 +409,13 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
399 399
 func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
400 400
 	cf, err := os.Open(fileName)
401 401
 	if err != nil {
402
-		return nil, err
402
+		return nil, errors.Wrap(err, "error opening file for decompression")
403 403
 	}
404 404
 	defer cf.Close()
405 405
 
406 406
 	rc, err := gzip.NewReader(cf)
407 407
 	if err != nil {
408
-		return nil, err
408
+		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
409 409
 	}
410 410
 	defer rc.Close()
411 411
 
... ...
@@ -418,17 +428,17 @@ func decompressfile(fileName, destFileName string, since time.Time) (*os.File, e
418 418
 
419 419
 	rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
420 420
 	if err != nil {
421
-		return nil, err
421
+		return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
422 422
 	}
423 423
 
424 424
 	_, err = pools.Copy(rs, rc)
425 425
 	if err != nil {
426 426
 		rs.Close()
427 427
 		rErr := os.Remove(rs.Name())
428
-		if rErr != nil && os.IsNotExist(rErr) {
428
+		if rErr != nil && !os.IsNotExist(rErr) {
429 429
 			logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
430 430
 		}
431
-		return nil, err
431
+		return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
432 432
 	}
433 433
 
434 434
 	return rs, nil
... ...
@@ -461,7 +471,7 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec
461 461
 	for {
462 462
 		msg, err := decodeLogLine()
463 463
 		if err != nil {
464
-			if err != io.EOF {
464
+			if errors.Cause(err) != io.EOF {
465 465
 				watcher.Err <- err
466 466
 			}
467 467
 			return
... ...
@@ -569,7 +579,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
569 569
 	}
570 570
 
571 571
 	handleDecodeErr := func(err error) error {
572
-		if err != io.EOF {
572
+		if errors.Cause(err) != io.EOF {
573 573
 			return err
574 574
 		}
575 575