Browse code

Make sure log pipes are closed

Pipes are still not closed (and goroutines leaked) if neither pipe is
used.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2015/05/07 10:09:27
Showing 3 changed files
... ...
@@ -5,9 +5,10 @@ import (
5 5
 	"encoding/json"
6 6
 	"fmt"
7 7
 	"io"
8
+	"net"
8 9
 	"os"
9 10
 	"strconv"
10
-	"sync"
11
+	"syscall"
11 12
 	"time"
12 13
 
13 14
 	"github.com/Sirupsen/logrus"
... ...
@@ -132,9 +133,10 @@ func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) er
132 132
 			}
133 133
 		}
134 134
 	}
135
+
135 136
 	if config.Follow && container.IsRunning() {
136
-		errors := make(chan error, 2)
137
-		wg := sync.WaitGroup{}
137
+		chErr := make(chan error)
138
+		var stdoutPipe, stderrPipe io.ReadCloser
138 139
 
139 140
 		// write an empty chunk of data (this is to ensure that the
140 141
 		// HTTP Response is sent immediatly, even if the container has
... ...
@@ -142,33 +144,36 @@ func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) er
142 142
 		outStream.Write(nil)
143 143
 
144 144
 		if config.UseStdout {
145
-			wg.Add(1)
146
-			stdoutPipe := container.StdoutLogPipe()
147
-			defer stdoutPipe.Close()
145
+			stdoutPipe = container.StdoutLogPipe()
148 146
 			go func() {
149
-				errors <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
150
-				wg.Done()
147
+				logrus.Debug("logs: stdout stream begin")
148
+				chErr <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
149
+				logrus.Debug("logs: stdout stream end")
151 150
 			}()
152 151
 		}
153 152
 		if config.UseStderr {
154
-			wg.Add(1)
155
-			stderrPipe := container.StderrLogPipe()
156
-			defer stderrPipe.Close()
153
+			stderrPipe = container.StderrLogPipe()
157 154
 			go func() {
158
-				errors <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
159
-				wg.Done()
155
+				logrus.Debug("logs: stderr stream begin")
156
+				chErr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
157
+				logrus.Debug("logs: stderr stream end")
160 158
 			}()
161 159
 		}
162 160
 
163
-		wg.Wait()
164
-		close(errors)
161
+		err = <-chErr
162
+		if stdoutPipe != nil {
163
+			stdoutPipe.Close()
164
+		}
165
+		if stderrPipe != nil {
166
+			stderrPipe.Close()
167
+		}
168
+		<-chErr // wait for 2nd goroutine to exit, otherwise bad things will happen
165 169
 
166
-		for err := range errors {
167
-			if err != nil {
168
-				logrus.Errorf("%s", err)
170
+		if err != nil && err != io.EOF && err != io.ErrClosedPipe {
171
+			if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
172
+				logrus.Errorf("error streaming logs: %v", err)
169 173
 			}
170 174
 		}
171
-
172 175
 	}
173 176
 	return nil
174 177
 }
... ...
@@ -1,7 +1,9 @@
1 1
 package main
2 2
 
3 3
 import (
4
+	"encoding/json"
4 5
 	"fmt"
6
+	"io"
5 7
 	"os/exec"
6 8
 	"regexp"
7 9
 	"strconv"
... ...
@@ -393,3 +395,52 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
393 393
 	}
394 394
 
395 395
 }
396
+
397
+func (s *DockerSuite) TestLogsFollowGoroutinesWithStdout(c *check.C) {
398
+	out, _ := dockerCmd(c, "run", "-d", "busybox", "/bin/sh", "-c", "while true; do echo hello; sleep 2; done")
399
+	id := strings.TrimSpace(out)
400
+	c.Assert(waitRun(id), check.IsNil)
401
+
402
+	type info struct {
403
+		NGoroutines int
404
+	}
405
+	getNGoroutines := func() int {
406
+		var i info
407
+		status, b, err := sockRequest("GET", "/info", nil)
408
+		c.Assert(err, check.IsNil)
409
+		c.Assert(status, check.Equals, 200)
410
+		c.Assert(json.Unmarshal(b, &i), check.IsNil)
411
+		return i.NGoroutines
412
+	}
413
+
414
+	nroutines := getNGoroutines()
415
+
416
+	cmd := exec.Command(dockerBinary, "logs", "-f", id)
417
+	r, w := io.Pipe()
418
+	cmd.Stdout = w
419
+	c.Assert(cmd.Start(), check.IsNil)
420
+
421
+	// Make sure pipe is written to
422
+	chErr := make(chan error)
423
+	go func() {
424
+		b := make([]byte, 1)
425
+		_, err := r.Read(b)
426
+		chErr <- err
427
+	}()
428
+	c.Assert(<-chErr, check.IsNil)
429
+	c.Assert(cmd.Process.Kill(), check.IsNil)
430
+
431
+	// NGoroutines is not updated right away, so we need to wait before failing
432
+	t := time.After(5 * time.Second)
433
+	for {
434
+		select {
435
+		case <-t:
436
+			c.Assert(nroutines, check.Equals, getNGoroutines())
437
+		default:
438
+			if nroutines == getNGoroutines() {
439
+				return
440
+			}
441
+			time.Sleep(100 * time.Millisecond)
442
+		}
443
+	}
444
+}
... ...
@@ -5,8 +5,6 @@ import (
5 5
 	"fmt"
6 6
 	"io"
7 7
 	"time"
8
-
9
-	"github.com/Sirupsen/logrus"
10 8
 )
11 9
 
12 10
 type JSONLog struct {
... ...
@@ -37,15 +35,16 @@ func WriteLog(src io.Reader, dst io.Writer, format string, since time.Time) erro
37 37
 	l := &JSONLog{}
38 38
 	for {
39 39
 		l.Reset()
40
-		if err := dec.Decode(l); err == io.EOF {
41
-			return nil
42
-		} else if err != nil {
43
-			logrus.Printf("Error streaming logs: %s", err)
40
+		if err := dec.Decode(l); err != nil {
41
+			if err == io.EOF {
42
+				return nil
43
+			}
44 44
 			return err
45 45
 		}
46 46
 		if !since.IsZero() && l.Created.Before(since) {
47 47
 			continue
48 48
 		}
49
+
49 50
 		line, err := l.Format(format)
50 51
 		if err != nil {
51 52
 			return err