Browse code

journald/read: simplify/fix followJournal()

TL;DR: simplify the code, fix --follow hanging indefinitely

Do the following to simplify the followJournal() code:

1. Use Go-native select instead of C-native polling.

2. Use Watch{Producer,Consumer}Gone(), eliminating the need
for the journald.closed variable and an extra goroutine.

3. Use sd_journal_wait(). In the words of its own man page:

> A synchronous alternative for using sd_journal_get_fd(),
> sd_journal_get_events(), sd_journal_get_timeout() and
> sd_journal_process() is sd_journal_wait().

Unfortunately, the logic is still not as simple as it
could be. The reason is that once the container has exited,
journald might still be writing some logs from its internal
buffers to the journal file(s), and there is no way to
tell when it has finished, so we cannot be sure that we
have read everything back. The resulting loss of trailing
log lines can be reproduced with something like

> $ ID=$(docker run -d busybox seq 1 150000); docker logs --follow $ID
> ...
> 128123
> $

(The last output line is expected to be `150000`.)

To avoid exiting from followJournal() early, add the
following logic: once the container is gone, keep trying
to drain the journal until there has been no new data for
at least one `waitTimeout` period.

Should fix https://github.com/docker/for-linux/issues/575

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
(cherry picked from commit f091febc942859ffbc881f3a3aa327366603ae65)
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>

Kir Kolyshkin authored on 2019/03/12 09:20:56
Showing 2 changed files
... ...
@@ -21,7 +21,6 @@ type journald struct {
21 21
 	mu      sync.Mutex
22 22
 	vars    map[string]string // additional variables and values to send to the journal along with the log message
23 23
 	readers map[*logger.LogWatcher]struct{}
24
-	closed  bool
25 24
 }
26 25
 
27 26
 func init() {
... ...
@@ -101,55 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
101 101
 //	}
102 102
 //	return rc;
103 103
 //}
104
-//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
105
-//{
106
-//	struct pollfd fds[2];
107
-//	uint64_t when = 0;
108
-//	int timeout, jevents, i;
109
-//	struct timespec ts;
110
-//	uint64_t now;
111
-//
112
-//	memset(&fds, 0, sizeof(fds));
113
-//	fds[0].fd = pipefd;
114
-//	fds[0].events = POLLHUP;
115
-//	fds[1].fd = sd_journal_get_fd(j);
116
-//	if (fds[1].fd < 0) {
117
-//		return fds[1].fd;
118
-//	}
119
-//
120
-//	do {
121
-//		jevents = sd_journal_get_events(j);
122
-//		if (jevents < 0) {
123
-//			return jevents;
124
-//		}
125
-//		fds[1].events = jevents;
126
-//		sd_journal_get_timeout(j, &when);
127
-//		if (when == -1) {
128
-//			timeout = -1;
129
-//		} else {
130
-//			clock_gettime(CLOCK_MONOTONIC, &ts);
131
-//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
132
-//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
133
-//		}
134
-//		i = poll(fds, 2, timeout);
135
-//		if ((i == -1) && (errno != EINTR)) {
136
-//			/* An unexpected error. */
137
-//			return (errno != 0) ? -errno : -EINTR;
138
-//		}
139
-//		if (fds[0].revents & POLLHUP) {
140
-//			/* The close notification pipe was closed. */
141
-//			return 0;
142
-//		}
143
-//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
144
-//			/* Data, which we might care about, was appended. */
145
-//			return 1;
146
-//		}
147
-//	} while ((fds[0].revents & POLLHUP) == 0);
148
-//	return 0;
149
-//}
150 104
 import "C"
151 105
 
152 106
 import (
107
+	"errors"
153 108
 	"fmt"
154 109
 	"strings"
155 110
 	"time"
... ...
@@ -158,27 +113,33 @@ import (
158 158
 	"github.com/coreos/go-systemd/journal"
159 159
 	"github.com/docker/docker/api/types/backend"
160 160
 	"github.com/docker/docker/daemon/logger"
161
-	"github.com/sirupsen/logrus"
162 161
 )
163 162
 
164 163
 func (s *journald) Close() error {
165 164
 	s.mu.Lock()
166
-	s.closed = true
167 165
 	for r := range s.readers {
168 166
 		r.ProducerGone()
169 167
 		delete(s.readers, r)
170
-
171 168
 	}
172 169
 	s.mu.Unlock()
173 170
 	return nil
174 171
 }
175 172
 
176
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
177
-	var msg, data, cursor *C.char
178
-	var length C.size_t
179
-	var stamp C.uint64_t
180
-	var priority, partial C.int
181
-	var done bool
173
+// convert error code returned from a sd_journal_* function
174
+// (which returns -errno) to a string
175
+func CErr(ret C.int) string {
176
+	return C.GoString(C.strerror(C.int(-ret)))
177
+}
178
+
179
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
180
+	var (
181
+		msg, data, cursor *C.char
182
+		length            C.size_t
183
+		stamp             C.uint64_t
184
+		priority, partial C.int
185
+		done              bool
186
+		shown             int
187
+	)
182 188
 
183 189
 	// Walk the journal from here forward until we run out of new entries
184 190
 	// or we reach the until value (if provided).
... ...
@@ -230,6 +191,7 @@ drain:
230 230
 				kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
231 231
 				attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
232 232
 			}
233
+
233 234
 			// Send the log message, unless the consumer is gone
234 235
 			select {
235 236
 			case <-logWatcher.WatchConsumerGone():
... ...
@@ -241,6 +203,7 @@ drain:
241 241
 				Timestamp: timestamp.In(time.UTC),
242 242
 				Attrs:     attrs,
243 243
 			}:
244
+				shown++
244 245
 			}
245 246
 		}
246 247
 		// If we're at the end of the journal, we're done (for now).
... ...
@@ -255,73 +218,57 @@ drain:
255 255
 		// ensure that we won't be freeing an address that's invalid
256 256
 		cursor = nil
257 257
 	}
258
-	return cursor, done
258
+	return cursor, done, shown
259 259
 }
260 260
 
261
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
261
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
262 262
 	s.mu.Lock()
263 263
 	s.readers[logWatcher] = struct{}{}
264
-	if s.closed {
265
-		// the journald Logger is closed, presumably because the container has been
266
-		// reset.  So we shouldn't follow, because we'll never be woken up.  But we
267
-		// should make one more drainJournal call to be sure we've got all the logs.
268
-		// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
269
-		C.close(pfd[1])
270
-	}
271 264
 	s.mu.Unlock()
272 265
 
273
-	newCursor := make(chan *C.char)
266
+	waitTimeout := C.uint64_t(250000) // 0.25s
274 267
 
275
-	go func() {
276
-		for {
277
-			// Keep copying journal data out until we're notified to stop
278
-			// or we hit an error.
279
-			status := C.wait_for_data_cancelable(j, pfd[0])
280
-			if status < 0 {
281
-				cerrstr := C.strerror(C.int(-status))
282
-				errstr := C.GoString(cerrstr)
283
-				fmtstr := "error %q while attempting to follow journal for container %q"
284
-				logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
285
-				break
286
-			}
287
-
288
-			var done bool
289
-			cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
290
-
291
-			if status != 1 || done {
292
-				// We were notified to stop
293
-				break
268
+	for {
269
+		status := C.sd_journal_wait(j, waitTimeout)
270
+		if status < 0 {
271
+			logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
272
+			goto cleanup
273
+		}
274
+		select {
275
+		case <-logWatcher.WatchConsumerGone():
276
+			goto cleanup // won't be able to write anything anymore
277
+		case <-logWatcher.WatchProducerGone():
278
+			// container is gone, drain journal
279
+		default:
280
+			// container is still alive
281
+			if status == C.SD_JOURNAL_NOP {
282
+				// no new data -- keep waiting
283
+				continue
294 284
 			}
295 285
 		}
296
-
297
-		// Clean up.
298
-		C.close(pfd[0])
299
-		s.mu.Lock()
300
-		delete(s.readers, logWatcher)
301
-		s.mu.Unlock()
302
-		close(logWatcher.Msg)
303
-		newCursor <- cursor
304
-	}()
305
-
306
-	// Wait until we're told to stop.
307
-	select {
308
-	case cursor = <-newCursor:
309
-	case <-logWatcher.WatchConsumerGone():
310
-		// Notify the other goroutine that its work is done.
311
-		C.close(pfd[1])
312
-		cursor = <-newCursor
286
+		newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
287
+		cursor = newCursor
288
+		if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
289
+			break
290
+		}
313 291
 	}
314 292
 
293
+cleanup:
294
+	s.mu.Lock()
295
+	delete(s.readers, logWatcher)
296
+	s.mu.Unlock()
297
+	close(logWatcher.Msg)
315 298
 	return cursor
316 299
 }
317 300
 
318 301
 func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
319
-	var j *C.sd_journal
320
-	var cmatch, cursor *C.char
321
-	var stamp C.uint64_t
322
-	var sinceUnixMicro uint64
323
-	var untilUnixMicro uint64
324
-	var pipes [2]C.int
302
+	var (
303
+		j              *C.sd_journal
304
+		cmatch, cursor *C.char
305
+		stamp          C.uint64_t
306
+		sinceUnixMicro uint64
307
+		untilUnixMicro uint64
308
+	)
325 309
 
326 310
 	// Get a handle to the journal.
327 311
 	rc := C.sd_journal_open(&j, C.int(0))
... ...
@@ -409,19 +356,12 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
409 409
 			return
410 410
 		}
411 411
 	}
412
-	cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
412
+	cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
413 413
 	if config.Follow {
414
-		// Create a pipe that we can poll at the same time as
415
-		// the journald descriptor.
416
-		ret := C.pipe(&pipes[0])
417
-		if ret < 0 {
418
-			logWatcher.Err <- fmt.Errorf("error creating journald notification pipe")
419
-		} else {
420
-			cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
421
-			// Let followJournal handle freeing the journal context
422
-			// object and closing the channel.
423
-			following = true
424
-		}
414
+		cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
415
+		// Let followJournal handle freeing the journal context
416
+		// object and closing the channel.
417
+		following = true
425 418
 	}
426 419
 
427 420
 	C.free(unsafe.Pointer(cursor))