Fixes #8832
All stdio streams need to finish writing before the
connection can be closed.
Signed-off-by: Tõnis Tiigi <tonistiigi@gmail.com> (github: tonistiigi)
| ... | ... |
@@ -7,6 +7,7 @@ import ( |
| 7 | 7 |
"io" |
| 8 | 8 |
"os" |
| 9 | 9 |
"strconv" |
| 10 |
+ "sync" |
|
| 10 | 11 |
|
| 11 | 12 |
log "github.com/Sirupsen/logrus" |
| 12 | 13 |
"github.com/docker/docker/engine" |
| ... | ... |
@@ -112,24 +113,36 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
|
| 112 | 112 |
} |
| 113 | 113 |
if follow && container.IsRunning() {
|
| 114 | 114 |
errors := make(chan error, 2) |
| 115 |
+ wg := sync.WaitGroup{}
|
|
| 116 |
+ |
|
| 115 | 117 |
if stdout {
|
| 118 |
+ wg.Add(1) |
|
| 116 | 119 |
stdoutPipe := container.StdoutLogPipe() |
| 117 | 120 |
defer stdoutPipe.Close() |
| 118 | 121 |
go func() {
|
| 119 | 122 |
errors <- jsonlog.WriteLog(stdoutPipe, job.Stdout, format) |
| 123 |
+ wg.Done() |
|
| 120 | 124 |
}() |
| 121 | 125 |
} |
| 122 | 126 |
if stderr {
|
| 127 |
+ wg.Add(1) |
|
| 123 | 128 |
stderrPipe := container.StderrLogPipe() |
| 124 | 129 |
defer stderrPipe.Close() |
| 125 | 130 |
go func() {
|
| 126 | 131 |
errors <- jsonlog.WriteLog(stderrPipe, job.Stderr, format) |
| 132 |
+ wg.Done() |
|
| 127 | 133 |
}() |
| 128 | 134 |
} |
| 129 |
- err := <-errors |
|
| 130 |
- if err != nil {
|
|
| 131 |
- log.Errorf("%s", err)
|
|
| 135 |
+ |
|
| 136 |
+ wg.Wait() |
|
| 137 |
+ close(errors) |
|
| 138 |
+ |
|
| 139 |
+ for err := range errors {
|
|
| 140 |
+ if err != nil {
|
|
| 141 |
+ log.Errorf("%s", err)
|
|
| 142 |
+ } |
|
| 132 | 143 |
} |
| 144 |
+ |
|
| 133 | 145 |
} |
| 134 | 146 |
return engine.StatusOK |
| 135 | 147 |
} |
| ... | ... |
@@ -284,3 +284,54 @@ func TestLogsFollowStopped(t *testing.T) {
|
| 284 | 284 |
deleteContainer(cleanedContainerID) |
| 285 | 285 |
logDone("logs - logs follow stopped container")
|
| 286 | 286 |
} |
| 287 |
+ |
|
| 288 |
+// Regression test for #8832 |
|
| 289 |
+func TestLogsFollowSlowStdoutConsumer(t *testing.T) {
|
|
| 290 |
+ runCmd := exec.Command(dockerBinary, "run", "-d", "busybox", "/bin/sh", "-c", `usleep 200000;yes X | head -c 200000`) |
|
| 291 |
+ |
|
| 292 |
+ out, _, _, err := runCommandWithStdoutStderr(runCmd) |
|
| 293 |
+ if err != nil {
|
|
| 294 |
+ t.Fatalf("run failed with errors: %s, %v", out, err)
|
|
| 295 |
+ } |
|
| 296 |
+ |
|
| 297 |
+ cleanedContainerID := stripTrailingCharacters(out) |
|
| 298 |
+ defer deleteContainer(cleanedContainerID) |
|
| 299 |
+ |
|
| 300 |
+ stopSlowRead := make(chan bool) |
|
| 301 |
+ |
|
| 302 |
+ go func() {
|
|
| 303 |
+ exec.Command(dockerBinary, "wait", cleanedContainerID).Run() |
|
| 304 |
+ stopSlowRead <- true |
|
| 305 |
+ }() |
|
| 306 |
+ |
|
| 307 |
+ logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID) |
|
| 308 |
+ |
|
| 309 |
+ stdout, err := logCmd.StdoutPipe() |
|
| 310 |
+ if err != nil {
|
|
| 311 |
+ t.Fatal(err) |
|
| 312 |
+ } |
|
| 313 |
+ |
|
| 314 |
+ if err := logCmd.Start(); err != nil {
|
|
| 315 |
+ t.Fatal(err) |
|
| 316 |
+ } |
|
| 317 |
+ |
|
| 318 |
+ // First read slowly |
|
| 319 |
+ bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead) |
|
| 320 |
+ if err != nil {
|
|
| 321 |
+ t.Fatal(err) |
|
| 322 |
+ } |
|
| 323 |
+ |
|
| 324 |
+ // After the container has finished we can continue reading fast |
|
| 325 |
+ bytes2, err := consumeWithSpeed(stdout, 32*1024, 0, nil) |
|
| 326 |
+ if err != nil {
|
|
| 327 |
+ t.Fatal(err) |
|
| 328 |
+ } |
|
| 329 |
+ |
|
| 330 |
+ actual := bytes1 + bytes2 |
|
| 331 |
+ expected := 200000 |
|
| 332 |
+ if actual != expected {
|
|
| 333 |
+ t.Fatalf("Invalid bytes read: %d, expected %d", actual, expected)
|
|
| 334 |
+ } |
|
| 335 |
+ |
|
| 336 |
+ logDone("logs - follow slow consumer")
|
|
| 337 |
+} |