Browse code

Fix 19575: Docker events doesn't work with authorization plugin

To support the requirement of blocking the request after the daemon
responded the authorization plugin use a `response recorder` that replay
the response after the flow ends.

This commit adds support for commands that hijack the connection and
flushes data via the http.Flusher interface. This resolves the error
with the event endpoint.

Signed-off-by: Liron Levin <liron@twistlock.com>

Liron Levin authored on 2016/02/05 00:41:41
Showing 4 changed files
... ...
@@ -11,10 +11,15 @@ import (
11 11
 	"os"
12 12
 	"strings"
13 13
 
14
+	"bufio"
15
+	"bytes"
14 16
 	"github.com/docker/docker/pkg/authorization"
15 17
 	"github.com/docker/docker/pkg/integration/checker"
16 18
 	"github.com/docker/docker/pkg/plugins"
17 19
 	"github.com/go-check/check"
20
+	"os/exec"
21
+	"strconv"
22
+	"time"
18 23
 )
19 24
 
20 25
 const (
... ...
@@ -221,6 +226,71 @@ func (s *DockerAuthzSuite) TestAuthZPluginDenyResponse(c *check.C) {
221 221
 	c.Assert(res, check.Equals, fmt.Sprintf("Error response from daemon: authorization denied by plugin %s: %s\n", testAuthZPlugin, unauthorizedMessage))
222 222
 }
223 223
 
224
+// TestAuthZPluginAllowEventStream verifies event stream propogates correctly after request pass through by the authorization plugin
225
+func (s *DockerAuthzSuite) TestAuthZPluginAllowEventStream(c *check.C) {
226
+	testRequires(c, DaemonIsLinux)
227
+
228
+	// Start the authorization plugin
229
+	err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
230
+	c.Assert(err, check.IsNil)
231
+	s.ctrl.reqRes.Allow = true
232
+	s.ctrl.resRes.Allow = true
233
+
234
+	startTime := strconv.FormatInt(daemonTime(c).Unix(), 10)
235
+	// Add another command to to enable event pipelining
236
+	eventsCmd := exec.Command(s.d.cmd.Path, "--host", s.d.sock(), "events", "--since", startTime)
237
+	stdout, err := eventsCmd.StdoutPipe()
238
+	if err != nil {
239
+		c.Assert(err, check.IsNil)
240
+	}
241
+
242
+	observer := eventObserver{
243
+		buffer:    new(bytes.Buffer),
244
+		command:   eventsCmd,
245
+		scanner:   bufio.NewScanner(stdout),
246
+		startTime: startTime,
247
+	}
248
+
249
+	err = observer.Start()
250
+	c.Assert(err, checker.IsNil)
251
+	defer observer.Stop()
252
+
253
+	// Create a container and wait for the creation events
254
+	_, err = s.d.Cmd("pull", "busybox")
255
+	c.Assert(err, check.IsNil)
256
+	out, err := s.d.Cmd("run", "-d", "busybox", "top")
257
+	c.Assert(err, check.IsNil)
258
+
259
+	containerID := strings.TrimSpace(out)
260
+
261
+	events := map[string]chan bool{
262
+		"create": make(chan bool),
263
+		"start":  make(chan bool),
264
+	}
265
+
266
+	matcher := matchEventLine(containerID, "container", events)
267
+	processor := processEventMatch(events)
268
+	go observer.Match(matcher, processor)
269
+
270
+	// Ensure all events are received
271
+	for event, eventChannel := range events {
272
+
273
+		select {
274
+		case <-time.After(5 * time.Second):
275
+			// Fail the test
276
+			observer.CheckEventError(c, containerID, event, matcher)
277
+			c.FailNow()
278
+		case <-eventChannel:
279
+			// Ignore, event received
280
+		}
281
+	}
282
+
283
+	// Ensure both events and container endpoints are passed to the authorization plugin
284
+	assertURIRecorded(c, s.ctrl.requestsURIs, "/events")
285
+	assertURIRecorded(c, s.ctrl.requestsURIs, "/containers/create")
286
+	assertURIRecorded(c, s.ctrl.requestsURIs, fmt.Sprintf("/containers/%s/start", containerID))
287
+}
288
+
224 289
 func (s *DockerAuthzSuite) TestAuthZPluginErrorResponse(c *check.C) {
225 290
 	err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
226 291
 	c.Assert(err, check.IsNil)
... ...
@@ -116,7 +116,7 @@ func (ctx *Ctx) AuthZResponse(rm ResponseModifier, r *http.Request) error {
116 116
 		}
117 117
 	}
118 118
 
119
-	rm.Flush()
119
+	rm.FlushAll()
120 120
 
121 121
 	return nil
122 122
 }
... ...
@@ -118,7 +118,7 @@ func TestResponseModifier(t *testing.T) {
118 118
 	m.Write([]byte("body"))
119 119
 	m.WriteHeader(500)
120 120
 
121
-	m.Flush()
121
+	m.FlushAll()
122 122
 	if r.Header().Get("h1") != "v1" {
123 123
 		t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
124 124
 	}
... ...
@@ -147,7 +147,7 @@ func TestResponseModifierOverride(t *testing.T) {
147 147
 	m.OverrideHeader(overrideHeaderBytes)
148 148
 	m.OverrideBody([]byte("override body"))
149 149
 	m.OverrideStatusCode(404)
150
-	m.Flush()
150
+	m.FlushAll()
151 151
 	if r.Header().Get("h1") != "v2" {
152 152
 		t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
153 153
 	}
... ...
@@ -5,6 +5,7 @@ import (
5 5
 	"bytes"
6 6
 	"encoding/json"
7 7
 	"fmt"
8
+	"github.com/Sirupsen/logrus"
8 9
 	"net"
9 10
 	"net/http"
10 11
 )
... ...
@@ -12,6 +13,8 @@ import (
12 12
 // ResponseModifier allows authorization plugins to read and modify the content of the http.response
13 13
 type ResponseModifier interface {
14 14
 	http.ResponseWriter
15
+	http.Flusher
16
+	http.CloseNotifier
15 17
 
16 18
 	// RawBody returns the current http content
17 19
 	RawBody() []byte
... ...
@@ -32,7 +35,10 @@ type ResponseModifier interface {
32 32
 	OverrideStatusCode(statusCode int)
33 33
 
34 34
 	// Flush flushes all data to the HTTP response
35
-	Flush() error
35
+	FlushAll() error
36
+
37
+	// Hijacked indicates the response has been hijacked by the Docker daemon
38
+	Hijacked() bool
36 39
 }
37 40
 
38 41
 // NewResponseModifier creates a wrapper to an http.ResponseWriter to allow inspecting and modifying the content
... ...
@@ -44,7 +50,10 @@ func NewResponseModifier(rw http.ResponseWriter) ResponseModifier {
44 44
 // the http request/response from docker daemon
45 45
 type responseModifier struct {
46 46
 	// The original response writer
47
-	rw     http.ResponseWriter
47
+	rw http.ResponseWriter
48
+
49
+	r *http.Request
50
+
48 51
 	status int
49 52
 	// body holds the response body
50 53
 	body []byte
... ...
@@ -52,15 +61,34 @@ type responseModifier struct {
52 52
 	header http.Header
53 53
 	// statusCode holds the response status code
54 54
 	statusCode int
55
+	// hijacked indicates the request has been hijacked
56
+	hijacked bool
57
+}
58
+
59
+func (rm *responseModifier) Hijacked() bool {
60
+	return rm.hijacked
55 61
 }
56 62
 
57 63
 // WriterHeader stores the http status code
58 64
 func (rm *responseModifier) WriteHeader(s int) {
65
+
66
+	// Use original request if hijacked
67
+	if rm.hijacked {
68
+		rm.rw.WriteHeader(s)
69
+		return
70
+	}
71
+
59 72
 	rm.statusCode = s
60 73
 }
61 74
 
62 75
 // Header returns the internal http header
63 76
 func (rm *responseModifier) Header() http.Header {
77
+
78
+	// Use original header if hijacked
79
+	if rm.hijacked {
80
+		return rm.rw.Header()
81
+	}
82
+
64 83
 	return rm.header
65 84
 }
66 85
 
... ...
@@ -90,6 +118,11 @@ func (rm *responseModifier) OverrideHeader(b []byte) error {
90 90
 
91 91
 // Write stores the byte array inside content
92 92
 func (rm *responseModifier) Write(b []byte) (int, error) {
93
+
94
+	if rm.hijacked {
95
+		return rm.rw.Write(b)
96
+	}
97
+
93 98
 	rm.body = append(rm.body, b...)
94 99
 	return len(b), nil
95 100
 }
... ...
@@ -109,6 +142,10 @@ func (rm *responseModifier) RawHeaders() ([]byte, error) {
109 109
 
110 110
 // Hijack returns the internal connection of the wrapped http.ResponseWriter
111 111
 func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
112
+
113
+	rm.hijacked = true
114
+	rm.FlushAll()
115
+
112 116
 	hijacker, ok := rm.rw.(http.Hijacker)
113 117
 	if !ok {
114 118
 		return nil, nil, fmt.Errorf("Internal reponse writer doesn't support the Hijacker interface")
... ...
@@ -116,8 +153,30 @@ func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
116 116
 	return hijacker.Hijack()
117 117
 }
118 118
 
119
-// Flush flushes all data to the HTTP response
120
-func (rm *responseModifier) Flush() error {
119
+// CloseNotify uses the internal close notify API of the wrapped http.ResponseWriter
120
+func (rm *responseModifier) CloseNotify() <-chan bool {
121
+	closeNotifier, ok := rm.rw.(http.CloseNotifier)
122
+	if !ok {
123
+		logrus.Errorf("Internal reponse writer doesn't support the CloseNotifier interface")
124
+		return nil
125
+	}
126
+	return closeNotifier.CloseNotify()
127
+}
128
+
129
+// Flush uses the internal flush API of the wrapped http.ResponseWriter
130
+func (rm *responseModifier) Flush() {
131
+	flusher, ok := rm.rw.(http.Flusher)
132
+	if !ok {
133
+		logrus.Errorf("Internal reponse writer doesn't support the Flusher interface")
134
+		return
135
+	}
136
+
137
+	rm.FlushAll()
138
+	flusher.Flush()
139
+}
140
+
141
+// FlushAll flushes all data to the HTTP response
142
+func (rm *responseModifier) FlushAll() error {
121 143
 	// Copy the status code
122 144
 	if rm.statusCode > 0 {
123 145
 		rm.rw.WriteHeader(rm.statusCode)
... ...
@@ -130,7 +189,15 @@ func (rm *responseModifier) Flush() error {
130 130
 		}
131 131
 	}
132 132
 
133
-	// Write body
134
-	_, err := rm.rw.Write(rm.body)
133
+	var err error
134
+	if len(rm.body) > 0 {
135
+		// Write body
136
+		_, err = rm.rw.Write(rm.body)
137
+	}
138
+
139
+	// Clean previous data
140
+	rm.body = nil
141
+	rm.statusCode = 0
142
+	rm.header = http.Header{}
135 143
 	return err
136 144
 }