Browse code

Refactor to new events api

Signed-off-by: Josh Horwitz <horwitzja@gmail.com>

Josh Horwitz authored on 2016/08/10 05:34:07
Showing 9 changed files
... ...
@@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions
211 211
 		})
212 212
 	}
213 213
 
214
-	statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove)
215
-	if err != nil {
216
-		return fmt.Errorf("Error waiting container's exit code: %v", err)
217
-	}
214
+	statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove)
218 215
 
219 216
 	//start the container
220 217
 	if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {
... ...
@@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
108 108
 
109 109
 		// 3. We should open a channel for receiving status code of the container
110 110
 		// no matter it's detached, removed on daemon side(--rm) or exit normally.
111
-		statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove)
111
+		statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove)
112 112
 		startOptions := types.ContainerStartOptions{
113 113
 			CheckpointID: opts.checkpoint,
114 114
 		}
... ...
@@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
117 117
 		if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil {
118 118
 			cancelFun()
119 119
 			<-cErr
120
-			if c.HostConfig.AutoRemove && statusErr == nil {
120
+			if c.HostConfig.AutoRemove {
121 121
 				// wait container to be removed
122 122
 				<-statusChan
123 123
 			}
... ...
@@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
134 134
 			return attchErr
135 135
 		}
136 136
 
137
-		if statusErr != nil {
138
-			return fmt.Errorf("can't get container's exit code: %v", statusErr)
139
-		}
140
-
141 137
 		if status := <-statusChan; status != 0 {
142 138
 			return cli.StatusError{StatusCode: status}
143 139
 		}
... ...
@@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error {
63 63
 		options := types.EventsOptions{
64 64
 			Filters: f,
65 65
 		}
66
-		resBody, err := dockerCli.Client().Events(ctx, options)
67
-		// Whether we successfully subscribed to events or not, we can now
66
+
67
+		eventq, errq := dockerCli.Client().Events(ctx, options)
68
+
69
+		// Whether we successfully subscribed to eventq or not, we can now
68 70
 		// unblock the main goroutine.
69 71
 		close(started)
70
-		if err != nil {
71
-			closeChan <- err
72
-			return
73
-		}
74
-		defer resBody.Close()
75 72
 
76
-		system.DecodeEvents(resBody, func(event events.Message, err error) error {
77
-			if err != nil {
73
+		for {
74
+			select {
75
+			case event := <-eventq:
76
+				c <- event
77
+			case err := <-errq:
78 78
 				closeChan <- err
79
-				return nil
79
+				return
80 80
 			}
81
-			c <- event
82
-			return nil
83
-		})
81
+		}
84 82
 	}
85 83
 
86 84
 	// waitFirst is a WaitGroup to wait first stat data's reach for each container
... ...
@@ -1,7 +1,6 @@
1 1
 package container
2 2
 
3 3
 import (
4
-	"fmt"
5 4
 	"strconv"
6 5
 
7 6
 	"golang.org/x/net/context"
... ...
@@ -11,11 +10,10 @@ import (
11 11
 	"github.com/docker/docker/api/types/events"
12 12
 	"github.com/docker/docker/api/types/filters"
13 13
 	"github.com/docker/docker/cli/command"
14
-	"github.com/docker/docker/cli/command/system"
15 14
 	clientapi "github.com/docker/docker/client"
16 15
 )
17 16
 
18
-func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) {
17
+func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int {
19 18
 	if len(containerID) == 0 {
20 19
 		// containerID can never be empty
21 20
 		panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
... ...
@@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
24 24
 	statusChan := make(chan int)
25 25
 	exitCode := 125
26 26
 
27
-	eventProcessor := func(e events.Message, err error) error {
28
-		if err != nil {
29
-			statusChan <- exitCode
30
-			return fmt.Errorf("failed to decode event: %v", err)
31
-		}
27
+	eventProcessor := func(e events.Message) bool {
32 28
 
33 29
 		stopProcessing := false
34 30
 		switch e.Status {
... ...
@@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
53 53
 
54 54
 		if stopProcessing {
55 55
 			statusChan <- exitCode
56
-			// stop the loop processing
57
-			return fmt.Errorf("done")
56
+			return true
58 57
 		}
59 58
 
60
-		return nil
59
+		return false
61 60
 	}
62 61
 
63 62
 	// Get events via Events API
... ...
@@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
67 67
 	options := types.EventsOptions{
68 68
 		Filters: f,
69 69
 	}
70
-	resBody, err := dockerCli.Client().Events(ctx, options)
71
-	if err != nil {
72
-		return nil, fmt.Errorf("can't get events from daemon: %v", err)
73
-	}
74 70
 
75
-	go system.DecodeEvents(resBody, eventProcessor)
71
+	eventCtx, cancel := context.WithCancel(ctx)
72
+	eventq, errq := dockerCli.Client().Events(eventCtx, options)
73
+
74
+	go func() {
75
+		defer cancel()
76
+
77
+		for {
78
+			select {
79
+			case evt := <-eventq:
80
+				if eventProcessor(evt) {
81
+					return
82
+				}
83
+
84
+			case err := <-errq:
85
+				logrus.Errorf("error getting events from daemon: %v", err)
86
+				statusChan <- exitCode
87
+				return
88
+			}
89
+		}
90
+	}()
76 91
 
77
-	return statusChan, nil
92
+	return statusChan
78 93
 }
79 94
 
80 95
 // getExitCode performs an inspect on the container. It returns
... ...
@@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error {
63 63
 		Filters: opts.filter.Value(),
64 64
 	}
65 65
 
66
-	responseBody, err := dockerCli.Client().Events(context.Background(), options)
67
-	if err != nil {
68
-		return err
66
+	ctx, cancel := context.WithCancel(context.Background())
67
+	events, errs := dockerCli.Client().Events(ctx, options)
68
+	defer cancel()
69
+
70
+	out := dockerCli.Out()
71
+
72
+	for {
73
+		select {
74
+		case event := <-events:
75
+			if err := handleEvent(out, event, tmpl); err != nil {
76
+				return err
77
+			}
78
+		case err := <-errs:
79
+			if err == io.EOF {
80
+				return nil
81
+			}
82
+			return err
83
+		}
84
+	}
85
+}
86
+
87
+func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error {
88
+	if tmpl == nil {
89
+		return prettyPrintEvent(out, event)
69 90
 	}
70
-	defer responseBody.Close()
71 91
 
72
-	return streamEvents(dockerCli.Out(), responseBody, tmpl)
92
+	return formatEvent(out, event, tmpl)
73 93
 }
74 94
 
75 95
 func makeTemplate(format string) (*template.Template, error) {
... ...
@@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) {
85 85
 	return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{})
86 86
 }
87 87
 
88
-// streamEvents decodes prints the incoming events in the provided output.
89
-func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error {
90
-	return DecodeEvents(input, func(event eventtypes.Message, err error) error {
91
-		if err != nil {
92
-			return err
93
-		}
94
-		if tmpl == nil {
95
-			return prettyPrintEvent(out, event)
96
-		}
97
-		return formatEvent(out, event, tmpl)
98
-	})
99
-}
100
-
101
-type eventProcessor func(event eventtypes.Message, err error) error
102
-
103 88
 // prettyPrintEvent prints all types of event information.
104 89
 // Each output includes the event type, actor id, name and action.
105 90
 // Actor attributes are printed at the end if the actor has any.
... ...
@@ -1,14 +1,14 @@
1 1
 package system
2 2
 
3 3
 import (
4
-	"encoding/json"
5
-	"io"
6 4
 	"sync"
7 5
 
8 6
 	"github.com/Sirupsen/logrus"
9 7
 	eventtypes "github.com/docker/docker/api/types/events"
10 8
 )
11 9
 
10
+type eventProcessor func(eventtypes.Message, error) error
11
+
12 12
 // EventHandler is abstract interface for user to customize
13 13
 // own handle functions of each type of events
14 14
 type EventHandler interface {
... ...
@@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
47 47
 		go h(e)
48 48
 	}
49 49
 }
50
-
51
-// DecodeEvents decodes event from input stream
52
-func DecodeEvents(input io.Reader, ep eventProcessor) error {
53
-	dec := json.NewDecoder(input)
54
-	for {
55
-		var event eventtypes.Message
56
-		err := dec.Decode(&event)
57
-		if err != nil && err == io.EOF {
58
-			break
59
-		}
60
-
61
-		if procErr := ep(event, err); procErr != nil {
62
-			return procErr
63
-		}
64
-	}
65
-	return nil
66
-}
... ...
@@ -1,20 +1,71 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"io"
4
+	"encoding/json"
5 5
 	"net/url"
6 6
 	"time"
7 7
 
8 8
 	"golang.org/x/net/context"
9 9
 
10 10
 	"github.com/docker/docker/api/types"
11
+	"github.com/docker/docker/api/types/events"
11 12
 	"github.com/docker/docker/api/types/filters"
12 13
 	timetypes "github.com/docker/docker/api/types/time"
13 14
 )
14 15
 
15
-// Events returns a stream of events in the daemon in a ReadCloser.
16
-// It's up to the caller to close the stream.
17
-func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) {
16
+// Events returns a stream of events in the daemon. It's up to the caller to close the stream
17
+// by cancelling the context. Once the stream has been completely read an io.EOF error will
18
+// be sent over the error channel. If an error is sent all processing will be stopped. It's up
19
+// to the caller to reopen the stream in the event of an error by reinvoking this method.
20
+func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
21
+
22
+	messages := make(chan events.Message)
23
+	errs := make(chan error, 1)
24
+
25
+	go func() {
26
+		defer close(errs)
27
+
28
+		query, err := buildEventsQueryParams(cli.version, options)
29
+		if err != nil {
30
+			errs <- err
31
+			return
32
+		}
33
+
34
+		resp, err := cli.get(ctx, "/events", query, nil)
35
+		if err != nil {
36
+			errs <- err
37
+			return
38
+		}
39
+		defer resp.body.Close()
40
+
41
+		decoder := json.NewDecoder(resp.body)
42
+
43
+		for {
44
+			select {
45
+			case <-ctx.Done():
46
+				errs <- ctx.Err()
47
+				return
48
+			default:
49
+				var event events.Message
50
+				if err := decoder.Decode(&event); err != nil {
51
+					errs <- err
52
+					return
53
+				}
54
+
55
+				select {
56
+				case messages <- event:
57
+				case <-ctx.Done():
58
+					errs <- ctx.Err()
59
+					return
60
+				}
61
+			}
62
+		}
63
+	}()
64
+
65
+	return messages, errs
66
+}
67
+
68
+func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
18 69
 	query := url.Values{}
19 70
 	ref := time.Now()
20 71
 
... ...
@@ -25,6 +76,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
25 25
 		}
26 26
 		query.Set("since", ts)
27 27
 	}
28
+
28 29
 	if options.Until != "" {
29 30
 		ts, err := timetypes.GetTimestamp(options.Until, ref)
30 31
 		if err != nil {
... ...
@@ -32,17 +84,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
32 32
 		}
33 33
 		query.Set("until", ts)
34 34
 	}
35
+
35 36
 	if options.Filters.Len() > 0 {
36
-		filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters)
37
+		filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
37 38
 		if err != nil {
38 39
 			return nil, err
39 40
 		}
40 41
 		query.Set("filters", filterJSON)
41 42
 	}
42 43
 
43
-	serverResponse, err := cli.get(ctx, "/events", query, nil)
44
-	if err != nil {
45
-		return nil, err
46
-	}
47
-	return serverResponse.body, nil
44
+	return query, nil
48 45
 }
... ...
@@ -2,7 +2,9 @@ package client
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"encoding/json"
5 6
 	"fmt"
7
+	"io"
6 8
 	"io/ioutil"
7 9
 	"net/http"
8 10
 	"strings"
... ...
@@ -11,6 +13,7 @@ import (
11 11
 	"golang.org/x/net/context"
12 12
 
13 13
 	"github.com/docker/docker/api/types"
14
+	"github.com/docker/docker/api/types/events"
14 15
 	"github.com/docker/docker/api/types/filters"
15 16
 )
16 17
 
... ...
@@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) {
36 36
 		client := &Client{
37 37
 			client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
38 38
 		}
39
-		_, err := client.Events(context.Background(), e.options)
39
+		_, errs := client.Events(context.Background(), e.options)
40
+		err := <-errs
40 41
 		if err == nil || !strings.Contains(err.Error(), e.expectedError) {
41 42
 			t.Fatalf("expected an error %q, got %v", e.expectedError, err)
42 43
 		}
... ...
@@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) {
47 47
 	client := &Client{
48 48
 		client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
49 49
 	}
50
-	_, err := client.Events(context.Background(), types.EventsOptions{})
50
+	_, errs := client.Events(context.Background(), types.EventsOptions{})
51
+	err := <-errs
51 52
 	if err == nil || err.Error() != "Error response from daemon: Server error" {
52 53
 		t.Fatalf("expected a Server Error, got %v", err)
53 54
 	}
54 55
 }
55 56
 
56 57
 func TestEvents(t *testing.T) {
58
+
57 59
 	expectedURL := "/events"
58 60
 
59 61
 	filters := filters.NewArgs()
60
-	filters.Add("label", "label1")
61
-	filters.Add("label", "label2")
62
-	expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}`
62
+	filters.Add("type", events.ContainerEventType)
63
+	expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)
63 64
 
64 65
 	eventsCases := []struct {
65 66
 		options             types.EventsOptions
67
+		events              []events.Message
68
+		expectedEvents      map[string]bool
66 69
 		expectedQueryParams map[string]string
67 70
 	}{
68 71
 		{
69 72
 			options: types.EventsOptions{
70
-				Since: "invalid but valid",
71
-			},
72
-			expectedQueryParams: map[string]string{
73
-				"since": "invalid but valid",
74
-			},
75
-		},
76
-		{
77
-			options: types.EventsOptions{
78
-				Until: "invalid but valid",
73
+				Filters: filters,
79 74
 			},
80 75
 			expectedQueryParams: map[string]string{
81
-				"until": "invalid but valid",
76
+				"filters": expectedFiltersJSON,
82 77
 			},
78
+			events:         []events.Message{},
79
+			expectedEvents: make(map[string]bool),
83 80
 		},
84 81
 		{
85 82
 			options: types.EventsOptions{
... ...
@@ -88,6 +89,28 @@ func TestEvents(t *testing.T) {
88 88
 			expectedQueryParams: map[string]string{
89 89
 				"filters": expectedFiltersJSON,
90 90
 			},
91
+			events: []events.Message{
92
+				{
93
+					Type:   "container",
94
+					ID:     "1",
95
+					Action: "create",
96
+				},
97
+				{
98
+					Type:   "container",
99
+					ID:     "2",
100
+					Action: "die",
101
+				},
102
+				{
103
+					Type:   "container",
104
+					ID:     "3",
105
+					Action: "create",
106
+				},
107
+			},
108
+			expectedEvents: map[string]bool{
109
+				"1": true,
110
+				"2": true,
111
+				"3": true,
112
+			},
91 113
 		},
92 114
 	}
93 115
 
... ...
@@ -98,29 +121,45 @@ func TestEvents(t *testing.T) {
98 98
 					return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
99 99
 				}
100 100
 				query := req.URL.Query()
101
+
101 102
 				for key, expected := range eventsCase.expectedQueryParams {
102 103
 					actual := query.Get(key)
103 104
 					if actual != expected {
104 105
 						return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
105 106
 					}
106 107
 				}
108
+
109
+				buffer := new(bytes.Buffer)
110
+
111
+				for _, e := range eventsCase.events {
112
+					b, _ := json.Marshal(e)
113
+					buffer.Write(b)
114
+				}
115
+
107 116
 				return &http.Response{
108 117
 					StatusCode: http.StatusOK,
109
-					Body:       ioutil.NopCloser(bytes.NewReader([]byte("response"))),
118
+					Body:       ioutil.NopCloser(buffer),
110 119
 				}, nil
111 120
 			}),
112 121
 		}
113
-		body, err := client.Events(context.Background(), eventsCase.options)
114
-		if err != nil {
115
-			t.Fatal(err)
116
-		}
117
-		defer body.Close()
118
-		content, err := ioutil.ReadAll(body)
119
-		if err != nil {
120
-			t.Fatal(err)
121
-		}
122
-		if string(content) != "response" {
123
-			t.Fatalf("expected response to contain 'response', got %s", string(content))
122
+
123
+		messages, errs := client.Events(context.Background(), eventsCase.options)
124
+
125
+	loop:
126
+		for {
127
+			select {
128
+			case err := <-errs:
129
+				if err != nil && err != io.EOF {
130
+					t.Fatal(err)
131
+				}
132
+
133
+				break loop
134
+			case e := <-messages:
135
+				_, ok := eventsCase.expectedEvents[e.ID]
136
+				if !ok {
137
+					t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
138
+				}
139
+			}
124 140
 		}
125 141
 	}
126 142
 }
... ...
@@ -6,6 +6,7 @@ import (
6 6
 
7 7
 	"github.com/docker/docker/api/types"
8 8
 	"github.com/docker/docker/api/types/container"
9
+	"github.com/docker/docker/api/types/events"
9 10
 	"github.com/docker/docker/api/types/filters"
10 11
 	"github.com/docker/docker/api/types/network"
11 12
 	"github.com/docker/docker/api/types/registry"
... ...
@@ -120,7 +121,7 @@ type SwarmAPIClient interface {
120 120
 
121 121
 // SystemAPIClient defines API client methods for the system
122 122
 type SystemAPIClient interface {
123
-	Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error)
123
+	Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
124 124
 	Info(ctx context.Context) (types.Info, error)
125 125
 	RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
126 126
 }