Browse code

Merge pull request #40595 from sfzhu93/GL_2_25

[integration] add buffer to prevent goroutine leak

Sebastiaan van Stijn authored on 2020/05/08 23:00:52
Showing 22 changed files
... ...
@@ -43,14 +43,14 @@ func (s *DockerSuite) TestGetContainersAttachWebsocket(c *testing.T) {
43 43
 	expected := []byte("hello")
44 44
 	actual := make([]byte, len(expected))
45 45
 
46
-	outChan := make(chan error)
46
+	outChan := make(chan error, 1)
47 47
 	go func() {
48 48
 		_, err := io.ReadFull(ws, actual)
49 49
 		outChan <- err
50 50
 		close(outChan)
51 51
 	}()
52 52
 
53
-	inChan := make(chan error)
53
+	inChan := make(chan error, 1)
54 54
 	go func() {
55 55
 		_, err := ws.Write(expected)
56 56
 		inChan <- err
... ...
@@ -279,7 +279,7 @@ func bodyIsWritable(r *http.Response) bool {
279 279
 
280 280
 // readTimeout read from io.Reader with timeout
281 281
 func readTimeout(r io.Reader, buf []byte, timeout time.Duration) (n int, err error) {
282
-	ch := make(chan bool)
282
+	ch := make(chan bool, 1)
283 283
 	go func() {
284 284
 		n, err = io.ReadFull(r, buf)
285 285
 		ch <- true
... ...
@@ -338,7 +338,7 @@ func (s *DockerSuite) TestGetStoppedContainerStats(c *testing.T) {
338 338
 	name := "statscontainer"
339 339
 	dockerCmd(c, "create", "--name", name, "busybox", "ps")
340 340
 
341
-	chResp := make(chan error)
341
+	chResp := make(chan error, 1)
342 342
 
343 343
 	// We expect an immediate response, but if it's not immediate, the test would hang, so put it in a goroutine
344 344
 	// below we'll check this on a timeout.
... ...
@@ -30,7 +30,7 @@ func (s *DockerSuite) TestLogsAPIWithStdout(c *testing.T) {
30 30
 		err error
31 31
 	}
32 32
 
33
-	chLog := make(chan logOut)
33
+	chLog := make(chan logOut, 1)
34 34
 	res, body, err := request.Get(fmt.Sprintf("/containers/%s/logs?follow=1&stdout=1&timestamps=1", id))
35 35
 	assert.NilError(c, err)
36 36
 	assert.Equal(c, res.StatusCode, http.StatusOK)
... ...
@@ -116,6 +116,8 @@ func (s *DockerSuite) TestLogsAPIUntilFutureFollow(c *testing.T) {
116 116
 	}
117 117
 
118 118
 	chLog := make(chan logOut)
119
+	stop := make(chan struct{})
120
+	defer close(stop)
119 121
 
120 122
 	go func() {
121 123
 		bufReader := bufio.NewReader(reader)
... ...
@@ -126,11 +128,20 @@ func (s *DockerSuite) TestLogsAPIUntilFutureFollow(c *testing.T) {
126 126
 				if err == io.EOF {
127 127
 					return
128 128
 				}
129
-				chLog <- logOut{"", err}
129
+				select {
130
+				case <-stop:
131
+					return
132
+				case chLog <- logOut{"", err}:
133
+				}
134
+
130 135
 				return
131 136
 			}
132 137
 
133
-			chLog <- logOut{strings.TrimSpace(string(out)), err}
138
+			select {
139
+			case <-stop:
140
+				return
141
+			case chLog <- logOut{strings.TrimSpace(string(out)), err}:
142
+			}
134 143
 		}
135 144
 	}()
136 145
 
... ...
@@ -102,7 +102,7 @@ func (s *DockerSuite) TestAttachTTYWithoutStdin(c *testing.T) {
102 102
 	id := strings.TrimSpace(out)
103 103
 	assert.NilError(c, waitRun(id))
104 104
 
105
-	done := make(chan error)
105
+	done := make(chan error, 1)
106 106
 	go func() {
107 107
 		defer close(done)
108 108
 
... ...
@@ -33,7 +33,7 @@ func (s *DockerSuite) TestAttachClosedOnContainerStop(c *testing.T) {
33 33
 	err = attachCmd.Start()
34 34
 	assert.NilError(c, err)
35 35
 
36
-	errChan := make(chan error)
36
+	errChan := make(chan error, 1)
37 37
 	go func() {
38 38
 		time.Sleep(300 * time.Millisecond)
39 39
 		defer close(errChan)
... ...
@@ -68,7 +68,7 @@ func (s *DockerSuite) TestAttachAfterDetach(c *testing.T) {
68 68
 	cmd.Stdout = tty
69 69
 	cmd.Stderr = tty
70 70
 
71
-	cmdExit := make(chan error)
71
+	cmdExit := make(chan error, 1)
72 72
 	go func() {
73 73
 		cmdExit <- cmd.Run()
74 74
 		close(cmdExit)
... ...
@@ -490,7 +490,7 @@ func (s *DockerSuite) TestBuildAddSingleFileToWorkdir(c *testing.T) {
490 490
 		}))
491 491
 	defer ctx.Close()
492 492
 
493
-	errChan := make(chan error)
493
+	errChan := make(chan error, 1)
494 494
 	go func() {
495 495
 		errChan <- buildImage(name, build.WithExternalBuildContext(ctx)).Error
496 496
 		close(errChan)
... ...
@@ -833,7 +833,7 @@ COPY test_file .`),
833 833
 		}))
834 834
 	defer ctx.Close()
835 835
 
836
-	errChan := make(chan error)
836
+	errChan := make(chan error, 1)
837 837
 	go func() {
838 838
 		errChan <- buildImage(name, build.WithExternalBuildContext(ctx)).Error
839 839
 		close(errChan)
... ...
@@ -1246,7 +1246,7 @@ func (s *DockerDaemonSuite) TestDaemonRestartKillWait(c *testing.T) {
1246 1246
 
1247 1247
 	s.d.Restart(c)
1248 1248
 
1249
-	errchan := make(chan error)
1249
+	errchan := make(chan error, 1)
1250 1250
 	go func() {
1251 1251
 		if out, err := s.d.Cmd("wait", containerID); err != nil {
1252 1252
 			errchan <- fmt.Errorf("%v:\n%s", err, out)
... ...
@@ -1599,15 +1599,17 @@ func (s *DockerDaemonSuite) TestDaemonRestartWithPausedContainer(c *testing.T) {
1599 1599
 	}
1600 1600
 	s.d.Restart(c)
1601 1601
 
1602
-	errchan := make(chan error)
1602
+	errchan := make(chan error, 1)
1603 1603
 	go func() {
1604 1604
 		out, err := s.d.Cmd("start", "test")
1605 1605
 		if err != nil {
1606 1606
 			errchan <- fmt.Errorf("%v:\n%s", err, out)
1607
+			return
1607 1608
 		}
1608 1609
 		name := strings.TrimSpace(out)
1609 1610
 		if name != "test" {
1610 1611
 			errchan <- fmt.Errorf("Paused container start error on docker daemon restart, expected 'test' but got '%s'", name)
1612
+			return
1611 1613
 		}
1612 1614
 		close(errchan)
1613 1615
 	}()
... ...
@@ -51,7 +51,7 @@ func (s *DockerSuite) TestEventsRedirectStdout(c *testing.T) {
51 51
 func (s *DockerSuite) TestEventsOOMDisableFalse(c *testing.T) {
52 52
 	testRequires(c, DaemonIsLinux, oomControl, memoryLimitSupport, swapMemorySupport, NotPpc64le)
53 53
 
54
-	errChan := make(chan error)
54
+	errChan := make(chan error, 1)
55 55
 	go func() {
56 56
 		defer close(errChan)
57 57
 		out, exitCode, _ := dockerCmdWithError("run", "--name", "oomFalse", "-m", "10MB", "busybox", "sh", "-c", "x=a; while true; do x=$x$x$x$x; done")
... ...
@@ -81,7 +81,7 @@ func (s *DockerSuite) TestEventsOOMDisableFalse(c *testing.T) {
81 81
 func (s *DockerSuite) TestEventsOOMDisableTrue(c *testing.T) {
82 82
 	testRequires(c, DaemonIsLinux, oomControl, memoryLimitSupport, NotArm, swapMemorySupport, NotPpc64le)
83 83
 
84
-	errChan := make(chan error)
84
+	errChan := make(chan error, 1)
85 85
 	observer, err := newEventObserver(c)
86 86
 	assert.NilError(c, err)
87 87
 	err = observer.Start()
... ...
@@ -53,7 +53,7 @@ func (s *DockerSuite) TestExecInteractive(c *testing.T) {
53 53
 	assert.Equal(c, line, "test")
54 54
 	err = stdin.Close()
55 55
 	assert.NilError(c, err)
56
-	errChan := make(chan error)
56
+	errChan := make(chan error, 1)
57 57
 	go func() {
58 58
 		errChan <- execCmd.Wait()
59 59
 		close(errChan)
... ...
@@ -170,7 +170,7 @@ func (s *DockerSuite) TestExecTTYWithoutStdin(c *testing.T) {
170 170
 	id := strings.TrimSpace(out)
171 171
 	assert.NilError(c, waitRun(id))
172 172
 
173
-	errChan := make(chan error)
173
+	errChan := make(chan error, 1)
174 174
 	go func() {
175 175
 		defer close(errChan)
176 176
 
... ...
@@ -230,7 +230,7 @@ func (s *DockerSuite) TestExecStopNotHanging(c *testing.T) {
230 230
 		out string
231 231
 		err error
232 232
 	}
233
-	ch := make(chan dstop)
233
+	ch := make(chan dstop, 1)
234 234
 	go func() {
235 235
 		result := icmd.RunCommand(dockerBinary, "stop", "testing")
236 236
 		ch <- dstop{result.Combined(), result.Error}
... ...
@@ -256,11 +256,12 @@ func (s *DockerSuite) TestExecCgroup(c *testing.T) {
256 256
 	var wg sync.WaitGroup
257 257
 	var mu sync.Mutex
258 258
 	var execCgroups []sort.StringSlice
259
-	errChan := make(chan error)
259
+	errChan := make(chan error, 5)
260 260
 	// exec a few times concurrently to get consistent failure
261 261
 	for i := 0; i < 5; i++ {
262 262
 		wg.Add(1)
263 263
 		go func() {
264
+			defer wg.Done()
264 265
 			out, _, err := dockerCmdWithError("exec", "testing", "cat", "/proc/self/cgroup")
265 266
 			if err != nil {
266 267
 				errChan <- err
... ...
@@ -271,7 +272,6 @@ func (s *DockerSuite) TestExecCgroup(c *testing.T) {
271 271
 			mu.Lock()
272 272
 			execCgroups = append(execCgroups, cg)
273 273
 			mu.Unlock()
274
-			wg.Done()
275 274
 		}()
276 275
 	}
277 276
 	wg.Wait()
... ...
@@ -26,7 +26,7 @@ func (s *DockerSuite) TestExecInteractiveStdinClose(c *testing.T) {
26 26
 
27 27
 	b := bytes.NewBuffer(nil)
28 28
 
29
-	ch := make(chan error)
29
+	ch := make(chan error, 1)
30 30
 	go func() { ch <- cmd.Wait() }()
31 31
 
32 32
 	select {
... ...
@@ -56,7 +56,7 @@ func (s *DockerSuite) TestExecTTY(c *testing.T) {
56 56
 	_, err = p.Write([]byte("cat /foo && exit\n"))
57 57
 	assert.NilError(c, err)
58 58
 
59
-	chErr := make(chan error)
59
+	chErr := make(chan error, 1)
60 60
 	go func() {
61 61
 		chErr <- cmd.Wait()
62 62
 	}()
... ...
@@ -365,7 +365,7 @@ func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverLookupNotBlocked(c *
365 365
 	defer os.RemoveAll(specPath)
366 366
 
367 367
 	chCmd1 := make(chan struct{})
368
-	chCmd2 := make(chan error)
368
+	chCmd2 := make(chan error, 1)
369 369
 	cmd1 := exec.Command(dockerBinary, "volume", "create", "-d", "down-driver")
370 370
 	cmd2 := exec.Command(dockerBinary, "volume", "create")
371 371
 
... ...
@@ -398,7 +398,7 @@ func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverRetryNotImmediatelyE
398 398
 	s.d.StartWithBusybox(c)
399 399
 	driverName := "test-external-volume-driver-retry"
400 400
 
401
-	errchan := make(chan error)
401
+	errchan := make(chan error, 1)
402 402
 	started := make(chan struct{})
403 403
 	go func() {
404 404
 		close(started)
... ...
@@ -128,7 +128,7 @@ func (s *DockerSuite) TestLogsFollowStopped(c *testing.T) {
128 128
 	logsCmd := exec.Command(dockerBinary, "logs", "-f", id)
129 129
 	assert.NilError(c, logsCmd.Start())
130 130
 
131
-	errChan := make(chan error)
131
+	errChan := make(chan error, 1)
132 132
 	go func() {
133 133
 		errChan <- logsCmd.Wait()
134 134
 		close(errChan)
... ...
@@ -82,8 +82,8 @@ func testConcurrentPullWholeRepo(c *testing.T) {
82 82
 	dockerCmd(c, args...)
83 83
 
84 84
 	// Run multiple re-pulls concurrently
85
-	results := make(chan error)
86 85
 	numPulls := 3
86
+	results := make(chan error, numPulls)
87 87
 
88 88
 	for i := 0; i != numPulls; i++ {
89 89
 		go func() {
... ...
@@ -120,8 +120,8 @@ func testConcurrentFailingPull(c *testing.T) {
120 120
 	repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
121 121
 
122 122
 	// Run multiple pulls concurrently
123
-	results := make(chan error)
124 123
 	numPulls := 3
124
+	results := make(chan error, numPulls)
125 125
 
126 126
 	for i := 0; i != numPulls; i++ {
127 127
 		go func() {
... ...
@@ -170,7 +170,7 @@ func testConcurrentPullMultipleTags(c *testing.T) {
170 170
 	dockerCmd(c, args...)
171 171
 
172 172
 	// Re-pull individual tags, in parallel
173
-	results := make(chan error)
173
+	results := make(chan error, len(repos))
174 174
 
175 175
 	for _, repo := range repos {
176 176
 		go func(repo string) {
... ...
@@ -162,7 +162,7 @@ func testConcurrentPush(c *testing.T) {
162 162
 	}
163 163
 
164 164
 	// Push tags, in parallel
165
-	results := make(chan error)
165
+	results := make(chan error, len(repos))
166 166
 
167 167
 	for _, repo := range repos {
168 168
 		go func(repo string) {
... ...
@@ -1765,7 +1765,7 @@ func (s *DockerSuite) TestRunExitOnStdinClose(c *testing.T) {
1765 1765
 	if err := stdin.Close(); err != nil {
1766 1766
 		c.Fatal(err)
1767 1767
 	}
1768
-	finish := make(chan error)
1768
+	finish := make(chan error, 1)
1769 1769
 	go func() {
1770 1770
 		finish <- runCmd.Wait()
1771 1771
 		close(finish)
... ...
@@ -2523,7 +2523,7 @@ func (s *DockerSuite) TestRunPortFromDockerRangeInUse(c *testing.T) {
2523 2523
 }
2524 2524
 
2525 2525
 func (s *DockerSuite) TestRunTTYWithPipe(c *testing.T) {
2526
-	errChan := make(chan error)
2526
+	errChan := make(chan error, 1)
2527 2527
 	go func() {
2528 2528
 		defer close(errChan)
2529 2529
 
... ...
@@ -2810,7 +2810,7 @@ func (s *DockerSuite) TestRunPIDHostWithChildIsKillable(c *testing.T) {
2810 2810
 
2811 2811
 	assert.Assert(c, waitRun(name) == nil)
2812 2812
 
2813
-	errchan := make(chan error)
2813
+	errchan := make(chan error, 1)
2814 2814
 	go func() {
2815 2815
 		if out, _, err := dockerCmdWithError("kill", name); err != nil {
2816 2816
 			errchan <- fmt.Errorf("%v:\n%s", err, out)
... ...
@@ -3622,7 +3622,7 @@ func (s *DockerSuite) TestRunStdinBlockedAfterContainerExit(c *testing.T) {
3622 3622
 	cmd.Stderr = stdout
3623 3623
 	assert.Assert(c, cmd.Start() == nil)
3624 3624
 
3625
-	waitChan := make(chan error)
3625
+	waitChan := make(chan error, 1)
3626 3626
 	go func() {
3627 3627
 		waitChan <- cmd.Wait()
3628 3628
 	}()
... ...
@@ -40,7 +40,7 @@ func (s *DockerSuite) TestRunRedirectStdout(c *testing.T) {
40 40
 		cmd.Stdout = tty
41 41
 		cmd.Stderr = tty
42 42
 		assert.NilError(c, cmd.Start())
43
-		ch := make(chan error)
43
+		ch := make(chan error, 1)
44 44
 		go func() {
45 45
 			ch <- cmd.Wait()
46 46
 			close(ch)
... ...
@@ -122,7 +122,7 @@ func (s *DockerSuite) TestRunAttachDetach(c *testing.T) {
122 122
 	_, err = cpty.Write([]byte{17})
123 123
 	assert.NilError(c, err)
124 124
 
125
-	ch := make(chan struct{})
125
+	ch := make(chan struct{}, 1)
126 126
 	go func() {
127 127
 		cmd.Wait()
128 128
 		ch <- struct{}{}
... ...
@@ -188,7 +188,7 @@ func (s *DockerSuite) TestRunAttachDetachFromFlag(c *testing.T) {
188 188
 		c.Fatal(err)
189 189
 	}
190 190
 
191
-	ch := make(chan struct{})
191
+	ch := make(chan struct{}, 1)
192 192
 	go func() {
193 193
 		cmd.Wait()
194 194
 		ch <- struct{}{}
... ...
@@ -304,7 +304,7 @@ func (s *DockerSuite) TestRunAttachDetachFromConfig(c *testing.T) {
304 304
 		c.Fatal(err)
305 305
 	}
306 306
 
307
-	ch := make(chan struct{})
307
+	ch := make(chan struct{}, 1)
308 308
 	go func() {
309 309
 		cmd.Wait()
310 310
 		ch <- struct{}{}
... ...
@@ -387,7 +387,7 @@ func (s *DockerSuite) TestRunAttachDetachKeysOverrideConfig(c *testing.T) {
387 387
 		c.Fatal(err)
388 388
 	}
389 389
 
390
-	ch := make(chan struct{})
390
+	ch := make(chan struct{}, 1)
391 391
 	go func() {
392 392
 		cmd.Wait()
393 393
 		ch <- struct{}{}
... ...
@@ -615,7 +615,7 @@ func (s *DockerSuite) TestRunWithInvalidPathforBlkioDeviceWriteIOps(c *testing.T
615 615
 
616 616
 func (s *DockerSuite) TestRunOOMExitCode(c *testing.T) {
617 617
 	testRequires(c, memoryLimitSupport, swapMemorySupport, NotPpc64le)
618
-	errChan := make(chan error)
618
+	errChan := make(chan error, 1)
619 619
 	go func() {
620 620
 		defer close(errChan)
621 621
 		// memory limit lower than 8MB will raise an error of "device or resource busy" from docker-runc.
... ...
@@ -177,6 +177,8 @@ func (s *DockerSwarmSuite) TestServiceLogsFollow(c *testing.T) {
177 177
 	// Make sure pipe is written to
178 178
 	ch := make(chan *logMessage)
179 179
 	done := make(chan struct{})
180
+	stop := make(chan struct{})
181
+	defer close(stop)
180 182
 	go func() {
181 183
 		reader := bufio.NewReader(r)
182 184
 		for {
... ...
@@ -184,6 +186,8 @@ func (s *DockerSwarmSuite) TestServiceLogsFollow(c *testing.T) {
184 184
 			msg.data, _, msg.err = reader.ReadLine()
185 185
 			select {
186 186
 			case ch <- msg:
187
+			case <-stop:
188
+				return
187 189
 			case <-done:
188 190
 				return
189 191
 			}
... ...
@@ -26,7 +26,7 @@ func (s *DockerSuite) TestStartAttachReturnsOnError(c *testing.T) {
26 26
 	// err shouldn't be nil because container test2 try to link to stopped container
27 27
 	assert.Assert(c, err != nil, "out: %s", out)
28 28
 
29
-	ch := make(chan error)
29
+	ch := make(chan error, 1)
30 30
 	go func() {
31 31
 		// Attempt to start attached to the container that won't start
32 32
 		// This should return an error immediately since the container can't be started
... ...
@@ -26,7 +26,7 @@ func (s *DockerSuite) TestStatsNoStream(c *testing.T) {
26 26
 		err error
27 27
 	}
28 28
 
29
-	ch := make(chan output)
29
+	ch := make(chan output, 1)
30 30
 	go func() {
31 31
 		out, err := statsCmd.Output()
32 32
 		ch <- output{out, err}
... ...
@@ -53,7 +53,7 @@ func TestExecWithCloseStdin(t *testing.T) {
53 53
 		resCh  = make(chan struct {
54 54
 			content string
55 55
 			err     error
56
-		})
56
+		}, 1)
57 57
 	)
58 58
 
59 59
 	go func() {
... ...
@@ -47,7 +47,7 @@ func TestContinueAfterPluginCrash(t *testing.T) {
47 47
 	attach, err := client.ContainerAttach(context.Background(), id, types.ContainerAttachOptions{Stream: true, Stdout: true})
48 48
 	assert.NilError(t, err)
49 49
 
50
-	chErr := make(chan error)
50
+	chErr := make(chan error, 1)
51 51
 	go func() {
52 52
 		defer close(chErr)
53 53
 		rdr := bufio.NewReader(attach.Reader)
... ...
@@ -645,13 +645,14 @@ func (d *Daemon) ReloadConfig() error {
645 645
 		return errors.New("daemon is not running")
646 646
 	}
647 647
 
648
-	errCh := make(chan error)
648
+	errCh := make(chan error, 1)
649 649
 	started := make(chan struct{})
650 650
 	go func() {
651 651
 		_, body, err := request.Get("/events", request.Host(d.Sock()))
652 652
 		close(started)
653 653
 		if err != nil {
654 654
 			errCh <- err
655
+			return
655 656
 		}
656 657
 		defer body.Close()
657 658
 		dec := json.NewDecoder(body)