Browse code

daemon.ContainerLogs(): fix resource leak on follow

When daemon.ContainerLogs() is called with options.follow=true
(as in "docker logs --follow"), the "loggerutils.followLogs()"
function never returns (even when the logs consumer is gone).
As a result, all the resources associated with it (including
an opened file descriptor for the log file being read, two FDs
for a pipe, and two FDs for inotify watch) are never released.

If this is repeated (such as by running "docker logs --follow"
and pressing Ctrl-C a few times), this results in DoS caused by
either hitting the limit of inotify watches, or the limit of
opened files. The only cure is daemon restart.

Apparently, what happens is:

1. logs producer (a container) is gone, calling (*LogWatcher).Close()
for all its readers (daemon/logger/jsonfilelog/jsonfilelog.go:175).

2. WatchClose() is properly handled by a dedicated goroutine in
followLogs(), cancelling the context.

3. Upon receiving the ctx.Done(), the code in followLogs()
(daemon/logger/loggerutils/logfile.go#L626-L638) keeps
sending messages _synchronously_ (which is OK for now).

4. Logs consumer is gone (Ctrl-C is pressed on a terminal running
"docker logs --follow"). Method (*LogWatcher).Close() is properly
called (see daemon/logs.go:114). Since it was called before and
due to once.Do(), nothing happens (which is kinda good, as
otherwise it will panic on closing a closed channel).

5. A goroutine (see item 3 above) keeps sending log messages
synchronously to the logWatcher.Msg channel. Since the
channel reader is gone, the channel send operation blocks forever,
and resource cleanup set up in defer statements at the beginning
of followLogs() never happens.

Alas, the fix is somewhat complicated:

1. Distinguish between close from logs producer and logs consumer.
To that effect,
- yet another channel is added to LogWatcher();
- {Watch,}Close() are renamed to {Watch,}ProducerGone();
- {Watch,}ConsumerGone() are added;

*NOTE* that ProducerGone()/WatchProducerGone() pair is ONLY needed
in order to stop ContainerLogs(follow=true) when a container is stopped;
otherwise we're not interested in it. In other words, we're only
using it in followLogs().

2. Code that was doing (logWatcher*).Close() is modified to either call
ProducerGone() or ConsumerGone(), depending on the context.

3. Code that was waiting for WatchClose() is modified to wait for
either ConsumerGone() or ProducerGone(), or both, depending on the
context.

4. followLogs() are modified accordingly:
- context cancellation is happening on WatchProducerGone(),
and once it's received the FileWatcher is closed and waitRead()
returns errDone on EOF (i.e. log rotation handling logic is disabled);
- due to this, code that was writing synchronously to logWatcher.Msg
can be and is removed as the code above it handles this case;
- function returns once ConsumerGone is received, freeing all the
resources -- this is the bugfix itself.

While at it,

1. Let's also remove the ctx usage to simplify the code a bit.
It was introduced by commit a69a59ffc7e3d ("Decouple removing the
fileWatcher from reading") in order to fix a bug. The bug was actually
a deadlock in fsnotify, and the fix was just a workaround. Since then
the fsnotify bug has been fixed, and a new fsnotify was vendored in.
For more details, please see
https://github.com/moby/moby/pull/27782#issuecomment-416794490

2. Since `(*filePoller).Close()` is fixed to remove all the files
being watched, there is no need to explicitly call
fileWatcher.Remove(name) anymore, so get rid of the extra code.

Should fix https://github.com/moby/moby/issues/37391

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>

Kir Kolyshkin authored on 2018/08/01 13:03:55
Showing 11 changed files
... ...
@@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
123 123
 			return logger.ErrReadLogsNotSupported{}
124 124
 		}
125 125
 		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
126
-		defer logs.Close()
126
+		defer logs.ConsumerGone()
127 127
 
128 128
 	LogLoop:
129 129
 		for {
... ...
@@ -93,21 +93,12 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
93 93
 
94 94
 		dec := logdriver.NewLogEntryDecoder(stream)
95 95
 		for {
96
-			select {
97
-			case <-watcher.WatchClose():
98
-				return
99
-			default:
100
-			}
101
-
102 96
 			var buf logdriver.LogEntry
103 97
 			if err := dec.Decode(&buf); err != nil {
104 98
 				if err == io.EOF {
105 99
 					return
106 100
 				}
107
-				select {
108
-				case watcher.Err <- errors.Wrap(err, "error decoding log message"):
109
-				case <-watcher.WatchClose():
110
-				}
101
+				watcher.Err <- errors.Wrap(err, "error decoding log message")
111 102
 				return
112 103
 			}
113 104
 
... ...
@@ -125,11 +116,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
125 125
 				return
126 126
 			}
127 127
 
128
+			// send the message unless the consumer is gone
128 129
 			select {
129 130
 			case watcher.Msg <- msg:
130
-			case <-watcher.WatchClose():
131
-				// make sure the message we consumed is sent
132
-				watcher.Msg <- msg
131
+			case <-watcher.WatchConsumerGone():
133 132
 				return
134 133
 			}
135 134
 		}
... ...
@@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
174 174
 		t.Fatal("timeout waiting for message channel to close")
175 175
 
176 176
 	}
177
-	lw.Close()
177
+	lw.ProducerGone()
178 178
 
179 179
 	lw = lr.ReadLogs(ReadConfig{Follow: true})
180 180
 	for _, x := range testMsg {
... ...
@@ -165,7 +165,7 @@ func (s *journald) Close() error {
165 165
 	s.mu.Lock()
166 166
 	s.closed = true
167 167
 	for reader := range s.readers.readers {
168
-		reader.Close()
168
+		reader.ProducerGone()
169 169
 	}
170 170
 	s.mu.Unlock()
171 171
 	return nil
... ...
@@ -299,7 +299,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
299 299
 	// Wait until we're told to stop.
300 300
 	select {
301 301
 	case cursor = <-newCursor:
302
-	case <-logWatcher.WatchClose():
302
+	case <-logWatcher.WatchConsumerGone():
303 303
 		// Notify the other goroutine that its work is done.
304 304
 		C.close(pfd[1])
305 305
 		cursor = <-newCursor
... ...
@@ -166,13 +166,14 @@ func ValidateLogOpt(cfg map[string]string) error {
166 166
 	return nil
167 167
 }
168 168
 
169
-// Close closes underlying file and signals all readers to stop.
169
+// Close closes underlying file and signals all the readers
170
+// that the logs producer is gone.
170 171
 func (l *JSONFileLogger) Close() error {
171 172
 	l.mu.Lock()
172 173
 	l.closed = true
173 174
 	err := l.writer.Close()
174 175
 	for r := range l.readers {
175
-		r.Close()
176
+		r.ProducerGone()
176 177
 		delete(l.readers, r)
177 178
 	}
178 179
 	l.mu.Unlock()
... ...
@@ -50,11 +50,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
50 50
 	}()
51 51
 
52 52
 	lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
53
-	watchClose := lw.WatchClose()
54 53
 	for {
55 54
 		select {
56 55
 		case <-lw.Msg:
57
-		case <-watchClose:
56
+		case <-lw.WatchProducerGone():
58 57
 			return
59 58
 		case err := <-chError:
60 59
 			if err != nil {
... ...
@@ -166,7 +166,7 @@ func (d *driver) Close() error {
166 166
 	d.closed = true
167 167
 	err := d.logfile.Close()
168 168
 	for r := range d.readers {
169
-		r.Close()
169
+		r.ProducerGone()
170 170
 		delete(d.readers, r)
171 171
 	}
172 172
 	d.mu.Unlock()
... ...
@@ -104,33 +104,50 @@ type LogWatcher struct {
104 104
 	// For sending log messages to a reader.
105 105
 	Msg chan *Message
106 106
 	// For sending error messages that occur while while reading logs.
107
-	Err           chan error
108
-	closeOnce     sync.Once
109
-	closeNotifier chan struct{}
107
+	Err          chan error
108
+	producerOnce sync.Once
109
+	producerGone chan struct{}
110
+	consumerOnce sync.Once
111
+	consumerGone chan struct{}
110 112
 }
111 113
 
112 114
 // NewLogWatcher returns a new LogWatcher.
113 115
 func NewLogWatcher() *LogWatcher {
114 116
 	return &LogWatcher{
115
-		Msg:           make(chan *Message, logWatcherBufferSize),
116
-		Err:           make(chan error, 1),
117
-		closeNotifier: make(chan struct{}),
117
+		Msg:          make(chan *Message, logWatcherBufferSize),
118
+		Err:          make(chan error, 1),
119
+		producerGone: make(chan struct{}),
120
+		consumerGone: make(chan struct{}),
118 121
 	}
119 122
 }
120 123
 
121
-// Close notifies the underlying log reader to stop.
122
-func (w *LogWatcher) Close() {
124
+// ProducerGone notifies the underlying log reader that
125
+// the logs producer (a container) is gone.
126
+func (w *LogWatcher) ProducerGone() {
123 127
 	// only close if not already closed
124
-	w.closeOnce.Do(func() {
125
-		close(w.closeNotifier)
128
+	w.producerOnce.Do(func() {
129
+		close(w.producerGone)
126 130
 	})
127 131
 }
128 132
 
129
-// WatchClose returns a channel receiver that receives notification
130
-// when the watcher has been closed. This should only be called from
131
-// one goroutine.
132
-func (w *LogWatcher) WatchClose() <-chan struct{} {
133
-	return w.closeNotifier
133
+// WatchProducerGone returns a channel receiver that receives notification
134
+// once the logs producer (a container) is gone.
135
+func (w *LogWatcher) WatchProducerGone() <-chan struct{} {
136
+	return w.producerGone
137
+}
138
+
139
+// ConsumerGone notifies that the logs consumer is gone.
140
+func (w *LogWatcher) ConsumerGone() {
141
+	// only close if not already closed
142
+	w.consumerOnce.Do(func() {
143
+		close(w.consumerGone)
144
+	})
145
+}
146
+
147
+// WatchConsumerGone returns a channel receiver that receives notification
148
+// when the log watcher consumer is gone.
149
+func (w *LogWatcher) WatchConsumerGone() <-chan struct{} {
150
+	return w.consumerGone
134 151
 }
135 152
 
136 153
 // Capability defines the list of capabilities that a driver can implement
... ...
@@ -488,7 +488,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
488 488
 	go func() {
489 489
 		select {
490 490
 		case <-ctx.Done():
491
-		case <-watcher.WatchClose():
491
+		case <-watcher.WatchConsumerGone():
492 492
 			cancel()
493 493
 		}
494 494
 	}()
... ...
@@ -546,22 +546,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
546 546
 	}
547 547
 	defer func() {
548 548
 		f.Close()
549
-		fileWatcher.Remove(name)
550 549
 		fileWatcher.Close()
551 550
 	}()
552 551
 
553
-	ctx, cancel := context.WithCancel(context.Background())
554
-	defer cancel()
555
-	go func() {
556
-		select {
557
-		case <-logWatcher.WatchClose():
558
-			fileWatcher.Remove(name)
559
-			cancel()
560
-		case <-ctx.Done():
561
-			return
562
-		}
563
-	}()
564
-
565 552
 	var retries int
566 553
 	handleRotate := func() error {
567 554
 		f.Close()
... ...
@@ -596,7 +583,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
596 596
 			case fsnotify.Rename, fsnotify.Remove:
597 597
 				select {
598 598
 				case <-notifyRotate:
599
-				case <-ctx.Done():
599
+				case <-logWatcher.WatchProducerGone():
600
+					return errDone
601
+				case <-logWatcher.WatchConsumerGone():
600 602
 					return errDone
601 603
 				}
602 604
 				if err := handleRotate(); err != nil {
... ...
@@ -618,7 +607,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
618 618
 				return errRetry
619 619
 			}
620 620
 			return err
621
-		case <-ctx.Done():
621
+		case <-logWatcher.WatchProducerGone():
622
+			return errDone
623
+		case <-logWatcher.WatchConsumerGone():
622 624
 			return errDone
623 625
 		}
624 626
 	}
... ...
@@ -664,23 +655,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
664 664
 		if !until.IsZero() && msg.Timestamp.After(until) {
665 665
 			return
666 666
 		}
667
+		// send the message, unless the consumer is gone
667 668
 		select {
668 669
 		case logWatcher.Msg <- msg:
669
-		case <-ctx.Done():
670
-			logWatcher.Msg <- msg
671
-			for {
672
-				msg, err := decodeLogLine()
673
-				if err != nil {
674
-					return
675
-				}
676
-				if !since.IsZero() && msg.Timestamp.Before(since) {
677
-					continue
678
-				}
679
-				if !until.IsZero() && msg.Timestamp.After(until) {
680
-					return
681
-				}
682
-				logWatcher.Msg <- msg
683
-			}
670
+		case <-logWatcher.WatchConsumerGone():
671
+			return
684 672
 		}
685 673
 	}
686 674
 }
... ...
@@ -77,7 +77,7 @@ func TestTailFiles(t *testing.T) {
77 77
 	}
78 78
 }
79 79
 
80
-func TestFollowLogsClose(t *testing.T) {
80
+func TestFollowLogsConsumerGone(t *testing.T) {
81 81
 	lw := logger.NewLogWatcher()
82 82
 
83 83
 	f, err := ioutil.TempFile("", t.Name())
... ...
@@ -110,7 +110,7 @@ func TestFollowLogsClose(t *testing.T) {
110 110
 		t.Fatal("timeout waiting for log message")
111 111
 	}
112 112
 
113
-	lw.Close()
113
+	lw.ConsumerGone()
114 114
 	select {
115 115
 	case <-followLogsDone:
116 116
 	case <-time.After(20 * time.Second):
... ...
@@ -110,8 +110,8 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
110 110
 				}
111 111
 			}()
112 112
 		}
113
-		// set up some defers
114
-		defer logs.Close()
113
+		// signal that the log reader is gone
114
+		defer logs.ConsumerGone()
115 115
 
116 116
 		// close the messages channel. closing is the only way to signal above
117 117
 		// that we're doing with logs (other than context cancel i guess).