Browse code

Merge pull request #319 from kolyshkin/19.03-journald

[19.03] backport journald reading fixes (ENGCORE-941)

Andrew Hsu authored on 2019/08/14 03:48:05
Showing 2 changed files
... ...
@@ -21,7 +21,6 @@ type journald struct {
21 21
 	mu      sync.Mutex
22 22
 	vars    map[string]string // additional variables and values to send to the journal along with the log message
23 23
 	readers map[*logger.LogWatcher]struct{}
24
-	closed  bool
25 24
 }
26 25
 
27 26
 func init() {
... ...
@@ -101,56 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
101 101
 //	}
102 102
 //	return rc;
103 103
 //}
104
-//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
105
-//{
106
-//	struct pollfd fds[2];
107
-//	uint64_t when = 0;
108
-//	int timeout, jevents, i;
109
-//	struct timespec ts;
110
-//	uint64_t now;
111
-//
112
-//	memset(&fds, 0, sizeof(fds));
113
-//	fds[0].fd = pipefd;
114
-//	fds[0].events = POLLHUP;
115
-//	fds[1].fd = sd_journal_get_fd(j);
116
-//	if (fds[1].fd < 0) {
117
-//		return fds[1].fd;
118
-//	}
119
-//
120
-//	do {
121
-//		jevents = sd_journal_get_events(j);
122
-//		if (jevents < 0) {
123
-//			return jevents;
124
-//		}
125
-//		fds[1].events = jevents;
126
-//		sd_journal_get_timeout(j, &when);
127
-//		if (when == -1) {
128
-//			timeout = -1;
129
-//		} else {
130
-//			clock_gettime(CLOCK_MONOTONIC, &ts);
131
-//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
132
-//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
133
-//		}
134
-//		i = poll(fds, 2, timeout);
135
-//		if ((i == -1) && (errno != EINTR)) {
136
-//			/* An unexpected error. */
137
-//			return (errno != 0) ? -errno : -EINTR;
138
-//		}
139
-//		if (fds[0].revents & POLLHUP) {
140
-//			/* The close notification pipe was closed. */
141
-//			return 0;
142
-//		}
143
-//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
144
-//			/* Data, which we might care about, was appended. */
145
-//			return 1;
146
-//		}
147
-//	} while ((fds[0].revents & POLLHUP) == 0);
148
-//	return 0;
149
-//}
150 104
 import "C"
151 105
 
152 106
 import (
153
-	"fmt"
107
+	"errors"
154 108
 	"strings"
155 109
 	"time"
156 110
 	"unsafe"
... ...
@@ -163,22 +117,29 @@ import (
163 163
 
164 164
 func (s *journald) Close() error {
165 165
 	s.mu.Lock()
166
-	s.closed = true
167 166
 	for r := range s.readers {
168 167
 		r.ProducerGone()
169 168
 		delete(s.readers, r)
170
-
171 169
 	}
172 170
 	s.mu.Unlock()
173 171
 	return nil
174 172
 }
175 173
 
176
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
177
-	var msg, data, cursor *C.char
178
-	var length C.size_t
179
-	var stamp C.uint64_t
180
-	var priority, partial C.int
181
-	var done bool
174
+// CErr converts an error code returned from an sd_journal_* function
175
+// (which returns -errno) to a string
176
+func CErr(ret C.int) string {
177
+	return C.GoString(C.strerror(C.int(-ret)))
178
+}
179
+
180
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
181
+	var (
182
+		msg, data, cursor *C.char
183
+		length            C.size_t
184
+		stamp             C.uint64_t
185
+		priority, partial C.int
186
+		done              bool
187
+		shown             int
188
+	)
182 189
 
183 190
 	// Walk the journal from here forward until we run out of new entries
184 191
 	// or we reach the until value (if provided).
... ...
@@ -216,12 +177,12 @@ drain:
216 216
 			// the stream that we would have
217 217
 			// assigned that value.
218 218
 			source := ""
219
-			if C.get_priority(j, &priority) != 0 {
220
-				source = ""
221
-			} else if priority == C.int(journal.PriErr) {
222
-				source = "stderr"
223
-			} else if priority == C.int(journal.PriInfo) {
224
-				source = "stdout"
219
+			if C.get_priority(j, &priority) == 0 {
220
+				if priority == C.int(journal.PriErr) {
221
+					source = "stderr"
222
+				} else if priority == C.int(journal.PriInfo) {
223
+					source = "stdout"
224
+				}
225 225
 			}
226 226
 			// Retrieve the values of any variables we're adding to the journal.
227 227
 			var attrs []backend.LogAttr
... ...
@@ -230,12 +191,29 @@ drain:
230 230
 				kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
231 231
 				attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
232 232
 			}
233
-			// Send the log message.
234
-			logWatcher.Msg <- &logger.Message{
233
+
234
+			// Send the log message, unless the consumer is gone
235
+			select {
236
+			case <-logWatcher.WatchConsumerGone():
237
+				done = true // we won't be able to write anything anymore
238
+				break drain
239
+			case logWatcher.Msg <- &logger.Message{
235 240
 				Line:      line,
236 241
 				Source:    source,
237 242
 				Timestamp: timestamp.In(time.UTC),
238 243
 				Attrs:     attrs,
244
+			}:
245
+				shown++
246
+			}
247
+			// Call sd_journal_process() periodically during the processing loop
248
+			// to close any opened file descriptors for rotated (deleted) journal files.
249
+			if shown%1024 == 0 {
250
+				if ret := C.sd_journal_process(j); ret < 0 {
251
+					// log a warning but ignore it for now
252
+					logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
253
+						WithField("error", CErr(ret)).
254
+						Warn("journald: error processing journal")
255
+				}
239 256
 			}
240 257
 		}
241 258
 		// If we're at the end of the journal, we're done (for now).
... ...
@@ -250,104 +228,93 @@ drain:
250 250
 		// ensure that we won't be freeing an address that's invalid
251 251
 		cursor = nil
252 252
 	}
253
-	return cursor, done
253
+	return cursor, done, shown
254 254
 }
255 255
 
256
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
256
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
257 257
 	s.mu.Lock()
258 258
 	s.readers[logWatcher] = struct{}{}
259
-	if s.closed {
260
-		// the journald Logger is closed, presumably because the container has been
261
-		// reset.  So we shouldn't follow, because we'll never be woken up.  But we
262
-		// should make one more drainJournal call to be sure we've got all the logs.
263
-		// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
264
-		C.close(pfd[1])
265
-	}
266 259
 	s.mu.Unlock()
267 260
 
268
-	newCursor := make(chan *C.char)
269
-
270
-	go func() {
271
-		for {
272
-			// Keep copying journal data out until we're notified to stop
273
-			// or we hit an error.
274
-			status := C.wait_for_data_cancelable(j, pfd[0])
275
-			if status < 0 {
276
-				cerrstr := C.strerror(C.int(-status))
277
-				errstr := C.GoString(cerrstr)
278
-				fmtstr := "error %q while attempting to follow journal for container %q"
279
-				logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
280
-				break
281
-			}
261
+	waitTimeout := C.uint64_t(250000) // 0.25s
282 262
 
283
-			var done bool
284
-			cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
285
-
286
-			if status != 1 || done {
287
-				// We were notified to stop
288
-				break
263
+	for {
264
+		status := C.sd_journal_wait(j, waitTimeout)
265
+		if status < 0 {
266
+			logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
267
+			goto cleanup
268
+		}
269
+		select {
270
+		case <-logWatcher.WatchConsumerGone():
271
+			goto cleanup // won't be able to write anything anymore
272
+		case <-logWatcher.WatchProducerGone():
273
+			// container is gone, drain journal
274
+		default:
275
+			// container is still alive
276
+			if status == C.SD_JOURNAL_NOP {
277
+				// no new data -- keep waiting
278
+				continue
289 279
 			}
290 280
 		}
291
-
292
-		// Clean up.
293
-		C.close(pfd[0])
294
-		s.mu.Lock()
295
-		delete(s.readers, logWatcher)
296
-		s.mu.Unlock()
297
-		close(logWatcher.Msg)
298
-		newCursor <- cursor
299
-	}()
300
-
301
-	// Wait until we're told to stop.
302
-	select {
303
-	case cursor = <-newCursor:
304
-	case <-logWatcher.WatchConsumerGone():
305
-		// Notify the other goroutine that its work is done.
306
-		C.close(pfd[1])
307
-		cursor = <-newCursor
281
+		newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
282
+		cursor = newCursor
283
+		if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
284
+			break
285
+		}
308 286
 	}
309 287
 
288
+cleanup:
289
+	s.mu.Lock()
290
+	delete(s.readers, logWatcher)
291
+	s.mu.Unlock()
292
+	close(logWatcher.Msg)
310 293
 	return cursor
311 294
 }
312 295
 
313 296
 func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
314
-	var j *C.sd_journal
315
-	var cmatch, cursor *C.char
316
-	var stamp C.uint64_t
317
-	var sinceUnixMicro uint64
318
-	var untilUnixMicro uint64
319
-	var pipes [2]C.int
297
+	var (
298
+		j              *C.sd_journal
299
+		cmatch, cursor *C.char
300
+		stamp          C.uint64_t
301
+		sinceUnixMicro uint64
302
+		untilUnixMicro uint64
303
+	)
320 304
 
321 305
 	// Get a handle to the journal.
322
-	rc := C.sd_journal_open(&j, C.int(0))
323
-	if rc != 0 {
324
-		logWatcher.Err <- fmt.Errorf("error opening journal")
306
+	if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 {
307
+		logWatcher.Err <- errors.New("error opening journal: " + CErr(rc))
325 308
 		close(logWatcher.Msg)
326 309
 		return
327 310
 	}
311
+	if config.Follow {
312
+		// Initialize library inotify watches early
313
+		if rc := C.sd_journal_get_fd(j); rc < 0 {
314
+			logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc))
315
+			close(logWatcher.Msg)
316
+			return
317
+		}
318
+	}
328 319
 	// If we end up following the log, we can set the journal context
329 320
 	// pointer and the channel pointer to nil so that we won't close them
330 321
 	// here, potentially while the goroutine that uses them is still
331 322
 	// running.  Otherwise, close them when we return from this function.
332 323
 	following := false
333
-	defer func(pfollowing *bool) {
334
-		if !*pfollowing {
324
+	defer func() {
325
+		if !following {
335 326
 			close(logWatcher.Msg)
336 327
 		}
337 328
 		C.sd_journal_close(j)
338
-	}(&following)
329
+	}()
339 330
 	// Remove limits on the size of data items that we'll retrieve.
340
-	rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
341
-	if rc != 0 {
342
-		logWatcher.Err <- fmt.Errorf("error setting journal data threshold")
331
+	if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 {
332
+		logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc))
343 333
 		return
344 334
 	}
345 335
 	// Add a match to have the library do the searching for us.
346 336
 	cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
347 337
 	defer C.free(unsafe.Pointer(cmatch))
348
-	rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch))
349
-	if rc != 0 {
350
-		logWatcher.Err <- fmt.Errorf("error setting journal match")
338
+	if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 {
339
+		logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc))
351 340
 		return
352 341
 	}
353 342
 	// If we have a cutoff time, convert it to Unix time once.
... ...
@@ -360,76 +327,53 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
360 360
 		nano := config.Until.UnixNano()
361 361
 		untilUnixMicro = uint64(nano / 1000)
362 362
 	}
363
-	if config.Tail > 0 {
364
-		lines := config.Tail
363
+	if config.Tail >= 0 {
365 364
 		// If until time provided, start from there.
366 365
 		// Otherwise start at the end of the journal.
367
-		if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 {
368
-			logWatcher.Err <- fmt.Errorf("error seeking provided until value")
369
-			return
370
-		} else if C.sd_journal_seek_tail(j) < 0 {
371
-			logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
372
-			return
373
-		}
374
-		if C.sd_journal_previous(j) < 0 {
375
-			logWatcher.Err <- fmt.Errorf("error backtracking to previous journal entry")
366
+		if untilUnixMicro != 0 {
367
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 {
368
+				logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc))
369
+				return
370
+			}
371
+		} else if rc := C.sd_journal_seek_tail(j); rc != 0 {
372
+			logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc))
376 373
 			return
377 374
 		}
378
-		// Walk backward.
379
-		for lines > 0 {
380
-			// Stop if the entry time is before our cutoff.
381
-			// We'll need the entry time if it isn't, so go
382
-			// ahead and parse it now.
383
-			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
384
-				break
385
-			} else {
386
-				// Compare the timestamp on the entry to our threshold value.
387
-				if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
388
-					break
389
-				}
390
-			}
391
-			lines--
392
-			// If we're at the start of the journal, or
393
-			// don't need to back up past any more entries,
394
-			// stop.
395
-			if lines == 0 || C.sd_journal_previous(j) <= 0 {
396
-				break
375
+		// (Try to) skip backwards by the requested number of lines...
376
+		if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 {
377
+			// ...but not before "since"
378
+			if sinceUnixMicro != 0 &&
379
+				C.sd_journal_get_realtime_usec(j, &stamp) == 0 &&
380
+				uint64(stamp) < sinceUnixMicro {
381
+				C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro))
397 382
 			}
398 383
 		}
399 384
 	} else {
400 385
 		// Start at the beginning of the journal.
401
-		if C.sd_journal_seek_head(j) < 0 {
402
-			logWatcher.Err <- fmt.Errorf("error seeking to start of journal")
386
+		if rc := C.sd_journal_seek_head(j); rc != 0 {
387
+			logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc))
403 388
 			return
404 389
 		}
405 390
 		// If we have a cutoff date, fast-forward to it.
406
-		if sinceUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) != 0 {
407
-			logWatcher.Err <- fmt.Errorf("error seeking to start time in journal")
408
-			return
391
+		if sinceUnixMicro != 0 {
392
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 {
393
+				logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc))
394
+				return
395
+			}
409 396
 		}
410
-		if C.sd_journal_next(j) < 0 {
411
-			logWatcher.Err <- fmt.Errorf("error skipping to next journal entry")
397
+		if rc := C.sd_journal_next(j); rc < 0 {
398
+			logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc))
412 399
 			return
413 400
 		}
414 401
 	}
415
-	cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
402
+	if config.Tail != 0 { // special case for --tail 0
403
+		cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
404
+	}
416 405
 	if config.Follow {
417
-		// Allocate a descriptor for following the journal, if we'll
418
-		// need one.  Do it here so that we can report if it fails.
419
-		if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
420
-			logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
421
-		} else {
422
-			// Create a pipe that we can poll at the same time as
423
-			// the journald descriptor.
424
-			if C.pipe(&pipes[0]) == C.int(-1) {
425
-				logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
426
-			} else {
427
-				cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
428
-				// Let followJournal handle freeing the journal context
429
-				// object and closing the channel.
430
-				following = true
431
-			}
432
-		}
406
+		cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
407
+		// followJournal has already closed logWatcher.Msg, so flag that
408
+		// here to keep the deferred cleanup from closing it a second time.
409
+		following = true
433 410
 	}
434 411
 
435 412
 	C.free(unsafe.Pointer(cursor))