Browse code

Extract event processing to a common function for testing.

We keep only one logic to test event related behavior that will help us
diagnose flacky event errors.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/12/22 05:34:08
Showing 4 changed files
... ...
@@ -97,7 +97,7 @@ func (daemon *Daemon) ImportImage(src string, newRef reference.Named, msg string
97 97
 		}
98 98
 	}
99 99
 
100
-	outStream.Write(sf.FormatStatus("", id.String()))
101 100
 	daemon.EventsService.Log("import", id.String(), "")
101
+	outStream.Write(sf.FormatStatus("", id.String()))
102 102
 	return nil
103 103
 }
... ...
@@ -7,7 +7,6 @@ import (
7 7
 	"encoding/json"
8 8
 	"fmt"
9 9
 	"io/ioutil"
10
-	"net/http"
11 10
 	"os"
12 11
 	"os/exec"
13 12
 	"path/filepath"
... ...
@@ -1882,45 +1881,14 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
1882 1882
 	}
1883 1883
 	defer ctx.Close()
1884 1884
 
1885
-	finish := make(chan struct{})
1886
-	defer close(finish)
1887
-
1888 1885
 	eventStart := make(chan struct{})
1889 1886
 	eventDie := make(chan struct{})
1890
-	containerID := make(chan string)
1891
-
1892
-	startEpoch := daemonTime(c).Unix()
1893
-	// Watch for events since epoch.
1894
-	eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10))
1895
-	stdout, err := eventsCmd.StdoutPipe()
1896
-	if err != nil {
1897
-		c.Fatal(err)
1898
-	}
1899
-	if err := eventsCmd.Start(); err != nil {
1900
-		c.Fatal(err)
1901
-	}
1902
-	defer eventsCmd.Process.Kill()
1903 1887
 
1904
-	// Goroutine responsible for watching start/die events from `docker events`
1905
-	go func() {
1906
-		cid := <-containerID
1907
-
1908
-		matchStart := regexp.MustCompile(cid + `(.*) start$`)
1909
-		matchDie := regexp.MustCompile(cid + `(.*) die$`)
1910
-
1911
-		//
1912
-		// Read lines of `docker events` looking for container start and stop.
1913
-		//
1914
-		scanner := bufio.NewScanner(stdout)
1915
-		for scanner.Scan() {
1916
-			switch {
1917
-			case matchStart.MatchString(scanner.Text()):
1918
-				close(eventStart)
1919
-			case matchDie.MatchString(scanner.Text()):
1920
-				close(eventDie)
1921
-			}
1922
-		}
1923
-	}()
1888
+	observer, err := newEventObserver(c)
1889
+	c.Assert(err, checker.IsNil)
1890
+	err = observer.Start()
1891
+	c.Assert(err, checker.IsNil)
1892
+	defer observer.Stop()
1924 1893
 
1925 1894
 	buildCmd := exec.Command(dockerBinary, "build", "-t", name, ".")
1926 1895
 	buildCmd.Dir = ctx.Dir
... ...
@@ -1932,17 +1900,39 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
1932 1932
 
1933 1933
 	matchCID := regexp.MustCompile("Running in (.+)")
1934 1934
 	scanner := bufio.NewScanner(stdoutBuild)
1935
+
1936
+	outputBuffer := new(bytes.Buffer)
1937
+	var buildID string
1935 1938
 	for scanner.Scan() {
1936 1939
 		line := scanner.Text()
1940
+		outputBuffer.WriteString(line)
1941
+		outputBuffer.WriteString("\n")
1937 1942
 		if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 {
1938
-			containerID <- matches[1]
1943
+			buildID = matches[1]
1939 1944
 			break
1940 1945
 		}
1941 1946
 	}
1942 1947
 
1948
+	if buildID == "" {
1949
+		c.Fatalf("Unable to find build container id in build output:\n%s", outputBuffer.String())
1950
+	}
1951
+
1952
+	matchStart := regexp.MustCompile(buildID + `.* start\z`)
1953
+	matchDie := regexp.MustCompile(buildID + `.* die\z`)
1954
+
1955
+	matcher := func(text string) {
1956
+		switch {
1957
+		case matchStart.MatchString(text):
1958
+			close(eventStart)
1959
+		case matchDie.MatchString(text):
1960
+			close(eventDie)
1961
+		}
1962
+	}
1963
+	go observer.Match(matcher)
1964
+
1943 1965
 	select {
1944
-	case <-time.After(5 * time.Second):
1945
-		c.Fatal("failed to observe build container start in timely fashion")
1966
+	case <-time.After(10 * time.Second):
1967
+		c.Fatal(observer.TimeoutError(buildID, "start"))
1946 1968
 	case <-eventStart:
1947 1969
 		// Proceeds from here when we see the container fly past in the
1948 1970
 		// output of "docker events".
... ...
@@ -1961,9 +1951,9 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
1961 1961
 	}
1962 1962
 
1963 1963
 	select {
1964
-	case <-time.After(5 * time.Second):
1964
+	case <-time.After(10 * time.Second):
1965 1965
 		// If we don't get here in a timely fashion, it wasn't killed.
1966
-		c.Fatal("container cancel did not succeed")
1966
+		c.Fatal(observer.TimeoutError(buildID, "die"))
1967 1967
 	case <-eventDie:
1968 1968
 		// We saw the container shut down in the `docker events` stream,
1969 1969
 		// as expected.
... ...
@@ -6498,22 +6488,12 @@ func (s *DockerSuite) TestBuildNoNamedVolume(c *check.C) {
6498 6498
 
6499 6499
 func (s *DockerSuite) TestBuildTagEvent(c *check.C) {
6500 6500
 	testRequires(c, DaemonIsLinux)
6501
-	resp, rc, err := sockRequestRaw("GET", `/events?filters={"event":["tag"]}`, nil, "application/json")
6502
-	c.Assert(err, check.IsNil)
6503
-	defer rc.Close()
6504
-	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
6505 6501
 
6506
-	type event struct {
6507
-		Status string `json:"status"`
6508
-		ID     string `json:"id"`
6509
-	}
6510
-	ch := make(chan event)
6511
-	go func() {
6512
-		ev := event{}
6513
-		if err := json.NewDecoder(rc).Decode(&ev); err == nil {
6514
-			ch <- ev
6515
-		}
6516
-	}()
6502
+	observer, err := newEventObserver(c, "--filter", "event=tag")
6503
+	c.Assert(err, check.IsNil)
6504
+	err = observer.Start()
6505
+	c.Assert(err, check.IsNil)
6506
+	defer observer.Stop()
6517 6507
 
6518 6508
 	dockerFile := `FROM busybox
6519 6509
 	RUN echo events
... ...
@@ -6521,12 +6501,20 @@ func (s *DockerSuite) TestBuildTagEvent(c *check.C) {
6521 6521
 	_, err = buildImage("test", dockerFile, false)
6522 6522
 	c.Assert(err, check.IsNil)
6523 6523
 
6524
+	matchTag := regexp.MustCompile("test:latest")
6525
+	eventTag := make(chan bool)
6526
+	matcher := func(text string) {
6527
+		if matchTag.MatchString(text) {
6528
+			close(eventTag)
6529
+		}
6530
+	}
6531
+	go observer.Match(matcher)
6532
+
6524 6533
 	select {
6525
-	case ev := <-ch:
6526
-		c.Assert(ev.Status, check.Equals, "tag")
6527
-		c.Assert(ev.ID, check.Equals, "test:latest")
6528
-	case <-time.After(5 * time.Second):
6529
-		c.Fatal("The 'tag' event not heard from the server")
6534
+	case <-time.After(10 * time.Second):
6535
+		c.Fatal(observer.TimeoutError("test:latest", "tag"))
6536
+	case <-eventTag:
6537
+		// We saw the tag event as expected.
6530 6538
 	}
6531 6539
 }
6532 6540
 
... ...
@@ -2,7 +2,6 @@ package main
2 2
 
3 3
 import (
4 4
 	"bufio"
5
-	"bytes"
6 5
 	"fmt"
7 6
 	"io/ioutil"
8 7
 	"net/http"
... ...
@@ -216,27 +215,13 @@ func (s *DockerSuite) TestEventsImagePull(c *check.C) {
216 216
 
217 217
 func (s *DockerSuite) TestEventsImageImport(c *check.C) {
218 218
 	testRequires(c, DaemonIsLinux)
219
-	since := daemonTime(c).Unix()
220 219
 
221
-	id := make(chan string)
222
-	eventImport := make(chan struct{})
223
-	eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(since, 10))
224
-	stdout, err := eventsCmd.StdoutPipe()
220
+	observer, err := newEventObserver(c)
225 221
 	c.Assert(err, checker.IsNil)
226
-	c.Assert(eventsCmd.Start(), checker.IsNil)
227
-	defer eventsCmd.Process.Kill()
228
-
229
-	go func() {
230
-		containerID := <-id
231 222
 
232
-		matchImport := regexp.MustCompile(containerID + `: import$`)
233
-		scanner := bufio.NewScanner(stdout)
234
-		for scanner.Scan() {
235
-			if matchImport.MatchString(scanner.Text()) {
236
-				close(eventImport)
237
-			}
238
-		}
239
-	}()
223
+	err = observer.Start()
224
+	c.Assert(err, checker.IsNil)
225
+	defer observer.Stop()
240 226
 
241 227
 	out, _ := dockerCmd(c, "run", "-d", "busybox", "true")
242 228
 	cleanedContainerID := strings.TrimSpace(out)
... ...
@@ -246,12 +231,20 @@ func (s *DockerSuite) TestEventsImageImport(c *check.C) {
246 246
 		exec.Command(dockerBinary, "import", "-"),
247 247
 	)
248 248
 	c.Assert(err, checker.IsNil, check.Commentf("import failed with output: %q", out))
249
-	newContainerID := strings.TrimSpace(out)
250
-	id <- newContainerID
249
+	imageRef := strings.TrimSpace(out)
250
+
251
+	eventImport := make(chan bool)
252
+	matchImport := regexp.MustCompile(imageRef + `: import\z`)
253
+	matcher := func(text string) {
254
+		if matchImport.MatchString(text) {
255
+			close(eventImport)
256
+		}
257
+	}
258
+	go observer.Match(matcher)
251 259
 
252 260
 	select {
253 261
 	case <-time.After(5 * time.Second):
254
-		c.Fatal("failed to observe image import in timely fashion")
262
+		c.Fatal(observer.TimeoutError(imageRef, "import"))
255 263
 	case <-eventImport:
256 264
 		// ignore, done
257 265
 	}
... ...
@@ -421,76 +414,65 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) {
421 421
 
422 422
 func (s *DockerSuite) TestEventsStreaming(c *check.C) {
423 423
 	testRequires(c, DaemonIsLinux)
424
-	start := daemonTime(c).Unix()
425 424
 
426
-	id := make(chan string)
427 425
 	eventCreate := make(chan struct{})
428 426
 	eventStart := make(chan struct{})
429 427
 	eventDie := make(chan struct{})
430 428
 	eventDestroy := make(chan struct{})
431 429
 
432
-	eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(start, 10))
433
-	stdout, err := eventsCmd.StdoutPipe()
430
+	observer, err := newEventObserver(c)
434 431
 	c.Assert(err, checker.IsNil)
435
-	c.Assert(eventsCmd.Start(), checker.IsNil, check.Commentf("failed to start 'docker events'"))
436
-	defer eventsCmd.Process.Kill()
437
-
438
-	buffer := new(bytes.Buffer)
439
-	go func() {
440
-		containerID := <-id
441
-
442
-		matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`)
443
-		matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`)
444
-		matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`)
445
-		matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`)
446
-
447
-		scanner := bufio.NewScanner(stdout)
448
-		for scanner.Scan() {
449
-			text := scanner.Text()
450
-			buffer.WriteString(text + "\n")
451
-			switch {
452
-			case matchCreate.MatchString(text):
453
-				close(eventCreate)
454
-			case matchStart.MatchString(text):
455
-				close(eventStart)
456
-			case matchDie.MatchString(text):
457
-				close(eventDie)
458
-			case matchDestroy.MatchString(text):
459
-				close(eventDestroy)
460
-			}
461
-		}
462
-	}()
432
+	err = observer.Start()
433
+	c.Assert(err, checker.IsNil)
434
+	defer observer.Stop()
463 435
 
464 436
 	out, _ := dockerCmd(c, "run", "-d", "busybox:latest", "true")
465
-	cleanedContainerID := strings.TrimSpace(out)
466
-	id <- cleanedContainerID
437
+	containerID := strings.TrimSpace(out)
438
+	matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`)
439
+	matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`)
440
+	matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`)
441
+	matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`)
442
+
443
+	matcher := func(text string) {
444
+		switch {
445
+		case matchCreate.MatchString(text):
446
+			close(eventCreate)
447
+		case matchStart.MatchString(text):
448
+			close(eventStart)
449
+		case matchDie.MatchString(text):
450
+			close(eventDie)
451
+		case matchDestroy.MatchString(text):
452
+			close(eventDestroy)
453
+		}
454
+	}
455
+	go observer.Match(matcher)
467 456
 
468 457
 	select {
469 458
 	case <-time.After(5 * time.Second):
470
-		c.Fatal("failed to observe container create in timely fashion", "\n", buffer.String())
459
+		c.Fatal(observer.TimeoutError(containerID, "create"))
471 460
 	case <-eventCreate:
472 461
 		// ignore, done
473 462
 	}
474 463
 
475 464
 	select {
476 465
 	case <-time.After(5 * time.Second):
477
-		c.Fatal("failed to observe container start in timely fashion", "\n", buffer.String())
466
+		c.Fatal(observer.TimeoutError(containerID, "start"))
478 467
 	case <-eventStart:
479 468
 		// ignore, done
480 469
 	}
481 470
 
482 471
 	select {
483 472
 	case <-time.After(5 * time.Second):
484
-		c.Fatal("failed to observe container die in timely fashion", "\n", buffer.String())
473
+		c.Fatal(observer.TimeoutError(containerID, "die"))
485 474
 	case <-eventDie:
486 475
 		// ignore, done
487 476
 	}
488 477
 
489
-	dockerCmd(c, "rm", cleanedContainerID)
478
+	dockerCmd(c, "rm", containerID)
490 479
 
491 480
 	select {
492 481
 	case <-time.After(5 * time.Second):
493
-		c.Fatal("failed to observe container destroy in timely fashion", "\n", buffer.String())
482
+		c.Fatal(observer.TimeoutError(containerID, "destroy"))
494 483
 	case <-eventDestroy:
495 484
 		// ignore, done
496 485
 	}
497 486
new file mode 100644
... ...
@@ -0,0 +1,78 @@
0
+package main
1
+
2
+import (
3
+	"bufio"
4
+	"bytes"
5
+	"fmt"
6
+	"io"
7
+	"os/exec"
8
+	"strconv"
9
+
10
+	"github.com/go-check/check"
11
+)
12
+
13
+// eventMatcher is a function that tries to match an event input.
14
+type eventMatcher func(text string)
15
+
16
+// eventObserver runs an events commands and observes its output.
17
+type eventObserver struct {
18
+	buffer  *bytes.Buffer
19
+	command *exec.Cmd
20
+	stdout  io.Reader
21
+}
22
+
23
+// newEventObserver creates the observer and initializes the command
24
+// without running it. Users must call `eventObserver.Start` to start the command.
25
+func newEventObserver(c *check.C, args ...string) (*eventObserver, error) {
26
+	since := daemonTime(c).Unix()
27
+
28
+	cmdArgs := []string{"events", "--since", strconv.FormatInt(since, 10)}
29
+	if len(args) > 0 {
30
+		cmdArgs = append(cmdArgs, args...)
31
+	}
32
+	eventsCmd := exec.Command(dockerBinary, cmdArgs...)
33
+	stdout, err := eventsCmd.StdoutPipe()
34
+	if err != nil {
35
+		return nil, err
36
+	}
37
+
38
+	return &eventObserver{
39
+		buffer:  new(bytes.Buffer),
40
+		command: eventsCmd,
41
+		stdout:  stdout,
42
+	}, nil
43
+}
44
+
45
+// Start starts the events command.
46
+func (e *eventObserver) Start() error {
47
+	return e.command.Start()
48
+}
49
+
50
+// Stop stops the events command.
51
+func (e *eventObserver) Stop() {
52
+	e.command.Process.Kill()
53
+}
54
+
55
+// Match tries to match the events output with a given matcher.
56
+func (e *eventObserver) Match(match eventMatcher) {
57
+	scanner := bufio.NewScanner(e.stdout)
58
+
59
+	for scanner.Scan() {
60
+		text := scanner.Text()
61
+		e.buffer.WriteString(text)
62
+		e.buffer.WriteString("\n")
63
+
64
+		match(text)
65
+	}
66
+}
67
+
68
+// TimeoutError generates an error for a given containerID and event type.
69
+// It attaches the events command output to the error.
70
+func (e *eventObserver) TimeoutError(id, event string) error {
71
+	return fmt.Errorf("failed to observe event `%s` for %s\n%v", event, id, e.output())
72
+}
73
+
74
+// output returns the events command output read until now by the Match goroutine.
75
+func (e *eventObserver) output() string {
76
+	return e.buffer.String()
77
+}