Browse code

Revert "use pubsub instead of filenotify to follow json logs"

This reverts commit b1594c59f5e0d1ac898eacde8d91b1ba33c2b626.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2016/02/24 11:07:38
Showing 3 changed files
... ...
@@ -14,7 +14,6 @@ import (
14 14
 	"github.com/docker/docker/daemon/logger"
15 15
 	"github.com/docker/docker/daemon/logger/loggerutils"
16 16
 	"github.com/docker/docker/pkg/jsonlog"
17
-	"github.com/docker/docker/pkg/pubsub"
18 17
 	"github.com/docker/go-units"
19 18
 )
20 19
 
... ...
@@ -23,13 +22,12 @@ const Name = "json-file"
23 23
 
24 24
 // JSONFileLogger is Logger implementation for default Docker logging.
25 25
 type JSONFileLogger struct {
26
-	buf           *bytes.Buffer
27
-	writer        *loggerutils.RotateFileWriter
28
-	mu            sync.Mutex
29
-	ctx           logger.Context
30
-	readers       map[*logger.LogWatcher]struct{} // stores the active log followers
31
-	extra         []byte                          // json-encoded extra attributes
32
-	writeNotifier *pubsub.Publisher
26
+	buf     *bytes.Buffer
27
+	writer  *loggerutils.RotateFileWriter
28
+	mu      sync.Mutex
29
+	ctx     logger.Context
30
+	readers map[*logger.LogWatcher]struct{} // stores the active log followers
31
+	extra   []byte                          // json-encoded extra attributes
33 32
 }
34 33
 
35 34
 func init() {
... ...
@@ -79,11 +77,10 @@ func New(ctx logger.Context) (logger.Logger, error) {
79 79
 	}
80 80
 
81 81
 	return &JSONFileLogger{
82
-		buf:           bytes.NewBuffer(nil),
83
-		writer:        writer,
84
-		readers:       make(map[*logger.LogWatcher]struct{}),
85
-		extra:         extra,
86
-		writeNotifier: pubsub.NewPublisher(0, 10),
82
+		buf:     bytes.NewBuffer(nil),
83
+		writer:  writer,
84
+		readers: make(map[*logger.LogWatcher]struct{}),
85
+		extra:   extra,
87 86
 	}, nil
88 87
 }
89 88
 
... ...
@@ -107,7 +104,6 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
107 107
 
108 108
 	l.buf.WriteByte('\n')
109 109
 	_, err = l.writer.Write(l.buf.Bytes())
110
-	l.writeNotifier.Publish(struct{}{})
111 110
 	l.buf.Reset()
112 111
 
113 112
 	return err
... ...
@@ -141,7 +137,6 @@ func (l *JSONFileLogger) Close() error {
141 141
 		r.Close()
142 142
 		delete(l.readers, r)
143 143
 	}
144
-	l.writeNotifier.Close()
145 144
 	l.mu.Unlock()
146 145
 	return err
147 146
 }
... ...
@@ -10,11 +10,14 @@ import (
10 10
 
11 11
 	"github.com/Sirupsen/logrus"
12 12
 	"github.com/docker/docker/daemon/logger"
13
+	"github.com/docker/docker/pkg/filenotify"
13 14
 	"github.com/docker/docker/pkg/ioutils"
14 15
 	"github.com/docker/docker/pkg/jsonlog"
15 16
 	"github.com/docker/docker/pkg/tailfile"
16 17
 )
17 18
 
19
+const maxJSONDecodeRetry = 20000
20
+
18 21
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
19 22
 	l.Reset()
20 23
 	if err := dec.Decode(l); err != nil {
... ...
@@ -32,6 +35,7 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
32 32
 // created by this driver.
33 33
 func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
34 34
 	logWatcher := logger.NewLogWatcher()
35
+
35 36
 	go l.readLogs(logWatcher, config)
36 37
 	return logWatcher
37 38
 }
... ...
@@ -81,7 +85,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
81 81
 	l.mu.Unlock()
82 82
 
83 83
 	notifyRotate := l.writer.NotifyRotate()
84
-	l.followLogs(latestFile, logWatcher, notifyRotate, config.Since)
84
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
85 85
 
86 86
 	l.mu.Lock()
87 87
 	delete(l.readers, logWatcher)
... ...
@@ -117,81 +121,95 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
117 117
 	}
118 118
 }
119 119
 
120
-func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
121
-	var (
122
-		rotated bool
120
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
121
+	dec := json.NewDecoder(f)
122
+	l := &jsonlog.JSONLog{}
123 123
 
124
-		dec         = json.NewDecoder(f)
125
-		log         = &jsonlog.JSONLog{}
126
-		writeNotify = l.writeNotifier.Subscribe()
127
-		watchClose  = logWatcher.WatchClose()
128
-	)
124
+	fileWatcher, err := filenotify.New()
125
+	if err != nil {
126
+		logWatcher.Err <- err
127
+	}
128
+	defer fileWatcher.Close()
129 129
 
130
-	reopenLogFile := func() error {
131
-		f.Close()
132
-		f, err := os.Open(f.Name())
130
+	var retries int
131
+	for {
132
+		msg, err := decodeLogLine(dec, l)
133 133
 		if err != nil {
134
-			return err
135
-		}
136
-		dec = json.NewDecoder(f)
137
-		rotated = true
138
-		return nil
139
-	}
134
+			if err != io.EOF {
135
+				// try again because this shouldn't happen
136
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
137
+					dec = json.NewDecoder(f)
138
+					retries++
139
+					continue
140
+				}
141
+
142
+				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
143
+				// remaining data in the parser's buffer while an io.EOF occurs.
144
+				// If the json logger writes a partial json log entry to the disk
145
+				// while at the same time the decoder tries to decode it, the race condition happens.
146
+				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
147
+					reader := io.MultiReader(dec.Buffered(), f)
148
+					dec = json.NewDecoder(reader)
149
+					retries++
150
+					continue
151
+				}
152
+				logWatcher.Err <- err
153
+				return
154
+			}
140 155
 
141
-	readToEnd := func() error {
142
-		for {
143
-			msg, err := decodeLogLine(dec, log)
144
-			if err != nil {
145
-				return err
156
+			logrus.WithField("logger", "json-file").Debugf("waiting for events")
157
+			if err := fileWatcher.Add(f.Name()); err != nil {
158
+				logrus.WithField("logger", "json-file").Warn("falling back to file poller")
159
+				fileWatcher.Close()
160
+				fileWatcher = filenotify.NewPollingWatcher()
161
+				if err := fileWatcher.Add(f.Name()); err != nil {
162
+					logrus.Errorf("error watching log file for modifications: %v", err)
163
+					logWatcher.Err <- err
164
+				}
146 165
 			}
147
-			if !since.IsZero() && msg.Timestamp.Before(since) {
166
+			select {
167
+			case <-fileWatcher.Events():
168
+				dec = json.NewDecoder(f)
169
+				fileWatcher.Remove(f.Name())
170
+				continue
171
+			case <-fileWatcher.Errors():
172
+				fileWatcher.Remove(f.Name())
173
+				logWatcher.Err <- err
174
+				return
175
+			case <-logWatcher.WatchClose():
176
+				fileWatcher.Remove(f.Name())
177
+				return
178
+			case <-notifyRotate:
179
+				f, err = os.Open(f.Name())
180
+				if err != nil {
181
+					logWatcher.Err <- err
182
+					return
183
+				}
184
+
185
+				dec = json.NewDecoder(f)
186
+				fileWatcher.Remove(f.Name())
187
+				fileWatcher.Add(f.Name())
148 188
 				continue
149 189
 			}
150
-			logWatcher.Msg <- msg
151 190
 		}
152
-	}
153 191
 
154
-	defer func() {
155
-		l.writeNotifier.Evict(writeNotify)
156
-		if rotated {
157
-			f.Close()
192
+		retries = 0 // reset retries since we've succeeded
193
+		if !since.IsZero() && msg.Timestamp.Before(since) {
194
+			continue
158 195
 		}
159
-	}()
160
-
161
-	for {
162 196
 		select {
163
-		case <-watchClose:
164
-			readToEnd()
165
-			return
166
-		case <-notifyRotate:
167
-			readToEnd()
168
-			if err := reopenLogFile(); err != nil {
169
-				logWatcher.Err <- err
170
-				return
171
-			}
172
-		case _, ok := <-writeNotify:
173
-			if err := readToEnd(); err == io.EOF {
174
-				if !ok {
175
-					// The writer is closed, no new logs will be generated.
197
+		case logWatcher.Msg <- msg:
198
+		case <-logWatcher.WatchClose():
199
+			logWatcher.Msg <- msg
200
+			for {
201
+				msg, err := decodeLogLine(dec, l)
202
+				if err != nil {
176 203
 					return
177 204
 				}
178
-
179
-				select {
180
-				case <-notifyRotate:
181
-					if err := reopenLogFile(); err != nil {
182
-						logWatcher.Err <- err
183
-						return
184
-					}
185
-				default:
186
-					dec = json.NewDecoder(f)
205
+				if !since.IsZero() && msg.Timestamp.Before(since) {
206
+					continue
187 207
 				}
188
-
189
-			} else if err == io.ErrUnexpectedEOF {
190
-				dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f))
191
-			} else {
192
-				logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err)
193
-				logWatcher.Err <- err
194
-				return
208
+				logWatcher.Msg <- msg
195 209
 			}
196 210
 		}
197 211
 	}
... ...
@@ -56,10 +56,8 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
56 56
 // Evict removes the specified subscriber from receiving any more messages.
57 57
 func (p *Publisher) Evict(sub chan interface{}) {
58 58
 	p.m.Lock()
59
-	if _, ok := p.subscribers[sub]; ok {
60
-		delete(p.subscribers, sub)
61
-		close(sub)
62
-	}
59
+	delete(p.subscribers, sub)
60
+	close(sub)
63 61
 	p.m.Unlock()
64 62
 }
65 63